Python · 64557 bytes Raw Blame History
1 """Tool-batch execution and recovery bookkeeping for the typed runtime."""
2
3 from __future__ import annotations
4
5 from collections.abc import Awaitable, Callable
6 from dataclasses import dataclass, field
7 from pathlib import Path
8 from typing import Any
9
10 from ..llm.base import ToolCall
11 from .compaction import infer_preferred_next_step, summarize_confirmed_facts
12 from .context import RuntimeContext
13 from .dod import (
14 DefinitionOfDone,
15 DefinitionOfDoneStore,
16 all_planned_artifacts_exist,
17 begin_new_verification_attempt,
18 collect_planned_artifact_targets,
19 derive_verification_commands,
20 ensure_active_verification_attempt,
21 infer_next_output_file,
22 is_state_mutating_tool_call,
23 planned_artifact_target_satisfied,
24 record_successful_tool_call,
25 synthesize_todo_items,
26 )
27 from .events import AgentEvent, TurnSummary
28 from .evidence_provenance import EvidenceProvenance, EvidenceProvenanceStatus
29 from .executor import ToolExecutionState, ToolExecutor
30 from .logging import get_runtime_logger
31 from .policy_timeline import append_verification_timeline_entry
32 from .repair_focus import extract_active_repair_context
33 from .safeguard_services import extract_shell_text_rewrite_target
34 from .tool_batch_checks import ToolBatchConfidenceGate, ToolBatchVerificationGate
35 from .tool_batch_recovery import ToolBatchRecoveryController
36 from .verification_observations import (
37 VerificationObservation,
38 VerificationObservationStatus,
39 )
40 from .workflow import (
41 advance_todos_from_tool_call,
42 effective_pending_todo_items,
43 infer_pending_todo_output_target,
44 preferred_pending_todo_item,
45 reconcile_aggregate_completion_steps,
46 sync_todos_to_definition_of_done,
47 )
48
# Async callback that receives each AgentEvent emitted while a batch runs.
EventSink = Callable[[AgentEvent], Awaitable[None]]
# Optional async confirmation callback that resolves to a bool. The meaning of
# the three string arguments and the optional dict is decided by the caller and
# is not visible in this module.
ConfirmationHandler = (
    Callable[[str, str, str, dict[str, Any] | None], Awaitable[bool]] | None
)
# Optional async callback that resolves to a string answer; presumably takes the
# question text plus an optional list of choices — confirm against the executor.
UserQuestionHandler = Callable[[str, list[str] | None], Awaitable[str]] | None
54
# Todo label of the verification-evidence step.
_VERIFY_ITEM = "Collect verification evidence"

# Todo labels that are skipped when picking the "just completed" item for a
# progress nudge.
_TODO_NUDGE_EXCLUDED_ITEMS = {_VERIFY_ITEM, "Complete the requested work"}

# Verb / gerund pairs hinting that a todo item is a mutation step. Flattened
# in pair order so the resulting tuple keeps base form then "-ing" form.
_MUTATION_TODO_HINTS = tuple(
    form
    for verb_forms in (
        ("create", "creating"),
        ("update", "updating"),
        ("edit", "editing"),
        ("write", "writing"),
        ("fix", "fixing"),
        ("modify", "modifying"),
        ("change", "changing"),
        ("patch", "patching"),
        ("replace", "replacing"),
        ("correct", "correcting"),
        ("rewrite", "rewriting"),
    )
    for form in verb_forms
)

# Phrases hinting that a todo item is a consistency / review step.
_CONSISTENCY_REVIEW_HINTS = (
    "consistent", "consistently",
    "formatted",
    "link", "linked",
    "navigation",
    "work properly",
    "all files", "every file",
    "ensure",
)

# Note-taking tools that count as bookkeeping rather than task progress.
_BOOKKEEPING_NOTE_TOOL_NAMES = {
    "notepad_append",
    "notepad_write_manual",
    "notepad_write_priority",
    "notepad_write_working",
}
102
103
@dataclass
class ToolBatchResult:
    """Summary handed back to the caller once a tool batch has been processed."""

    # One "<tool name>: <arguments truncated to 100 chars>" entry per attempted call.
    actions_taken: list[str] = field(default_factory=list)
    # Running count of back-to-back tool errors; seeded by the caller.
    consecutive_errors: int = 0
    # Set when the batch decided the turn should stop.
    halted: bool = False
    # Response text that accompanies ``halted``; empty otherwise.
    final_response: str = ""
112
113
114 class ToolBatchRunner:
115 """Owns tool-batch execution, recovery, and post-tool bookkeeping."""
116
117 def __init__(
118 self,
119 context: RuntimeContext,
120 dod_store: DefinitionOfDoneStore,
121 *,
122 confidence_gate: ToolBatchConfidenceGate | None = None,
123 recovery_controller: ToolBatchRecoveryController | None = None,
124 verification_gate: ToolBatchVerificationGate | None = None,
125 ) -> None:
126 self.context = context
127 self.dod_store = dod_store
128 self.confidence_gate = confidence_gate or ToolBatchConfidenceGate(context)
129 self.recovery_controller = recovery_controller or ToolBatchRecoveryController(context)
130 self.verification_gate = verification_gate or ToolBatchVerificationGate(context)
131
async def execute_batch(
    self,
    *,
    tool_calls: list[ToolCall],
    tool_source: str,
    pending_tool_calls_seen: set[str],
    emit: EventSink,
    summary: TurnSummary,
    dod: DefinitionOfDone,
    executor: ToolExecutor,
    on_confirmation: ConfirmationHandler,
    on_user_question: UserQuestionHandler,
    emit_confirmation,
    consecutive_errors: int,
) -> ToolBatchResult:
    """Run one assistant tool batch through the shared executor seam.

    For every tool call, in order: optionally let the confidence gate skip
    it, emit a ``tool_call`` event (unless its id is already in
    ``pending_tool_calls_seen``), execute it through ``executor``, run
    recovery or success bookkeeping, emit a ``tool_result`` event, append
    the result message to the session and to ``summary``, queue steering
    nudges for duplicate/blocked outcomes, and consult the verification gate.

    Args:
        tool_calls: Calls proposed by the assistant for this batch.
        tool_source: Passed to the executor as ``source``.
        pending_tool_calls_seen: Ids whose ``tool_call`` event must not be
            emitted again here.
        emit: Async sink for every ``AgentEvent`` produced.
        summary: Turn summary that collects result messages and failures.
        dod: Definition-of-done state updated by successful calls.
        executor: Executes each individual tool call.
        on_confirmation: Forwarded unchanged to the executor.
        on_user_question: Forwarded unchanged to the executor.
        emit_confirmation: Forwarded unchanged to the executor.
        consecutive_errors: Error streak carried in from earlier batches.

    Returns:
        A ``ToolBatchResult`` with the actions taken, the updated error
        streak, and the halt flag / final response when the batch stopped.
    """

    result = ToolBatchResult(consecutive_errors=consecutive_errors)

    # Pre-populate planned items for the entire batch so the todo
    # widget shows what's coming, not just what's done.
    planned_labels = _batch_planned_labels(tool_calls)
    completed_labels: list[str] = []

    async def _emit_batch_todos() -> None:
        """Emit a todo update combining DoD state with batch progress."""
        items = synthesize_todo_items(dod)
        for label in planned_labels:
            if label in completed_labels:
                continue
            # Don't duplicate items already in DoD
            if any(item["content"] == label for item in items):
                continue
            items.append({"content": label, "status": "in_progress", "active_form": label})
        if items:
            await emit(AgentEvent(type="todo_update", todo_items=items))

    await _emit_batch_todos()

    for tool_call in tool_calls:
        cfg = self.context.config.reasoning

        # A skipped call produces no tool_call/tool_result event in this method.
        if cfg.confidence_scoring:
            should_skip = await self.confidence_gate.should_skip(
                tool_call=tool_call,
                emit=emit,
            )
            if should_skip:
                continue

        if tool_call.id not in pending_tool_calls_seen:
            await emit(
                AgentEvent(
                    type="tool_call",
                    tool_name=tool_call.name,
                    tool_call_id=tool_call.id,
                    tool_args=tool_call.arguments,
                    phase="assistant",
                )
            )

        # Keep the action log compact: arguments are cut to 100 characters.
        result.actions_taken.append(
            f"{tool_call.name}: {str(tool_call.arguments)[:100]}"
        )

        outcome = await executor.execute_tool_call(
            tool_call,
            on_confirmation=on_confirmation,
            on_user_question=on_user_question,
            emit_confirmation=emit_confirmation,
            source=tool_source,
        )
        # The outcome carries its own tool call object; bookkeeping and
        # recovery below use that one rather than the loop variable.
        executed_tool_call = outcome.tool_call
        if (
            outcome.rollback_action is not None
            and self.context.config.reasoning.show_rollback_plan
        ):
            await emit(
                AgentEvent(
                    type="rollback",
                    content=(
                        f"Rollback tracked: {outcome.rollback_action.description}"
                    ),
                    rollback_action=outcome.rollback_action,
                )
            )

        if (
            outcome.state == ToolExecutionState.EXECUTED
            and outcome.is_error
            and self.context.config.auto_recover
        ):
            recovery_result = await self.recovery_controller.build_follow_up(
                tool_call=executed_tool_call,
                outcome=outcome,
                emit=emit,
            )
            if recovery_result is not None:
                # The recovery follow-up replaces the normal result handling:
                # the error streak is not advanced and ``outcome.message``
                # is not appended for this call.
                summary.tool_result_messages.append(recovery_result)
                self.context.session.append(recovery_result)
                continue

        if outcome.state == ToolExecutionState.EXECUTED and not outcome.is_error:
            loop_response = await self._record_successful_execution(
                tool_call=executed_tool_call,
                outcome=outcome,
                dod=dod,
                emit=emit,
                summary=summary,
            )
            # Mark this tool's label as completed and emit live progress
            label = _tool_call_label(executed_tool_call)
            if label:
                completed_labels.append(label)
                await _emit_batch_todos()
            if loop_response is not None:
                result.halted = True
                result.final_response = loop_response
                return result

        if outcome.is_error:
            result.consecutive_errors += 1
        else:
            result.consecutive_errors = 0

        await emit(
            AgentEvent(
                type="tool_result",
                content=outcome.event_content,
                tool_name=executed_tool_call.name,
                tool_call_id=outcome.tool_call.id,
                tool_metadata=(
                    outcome.registry_result.metadata
                    if outcome.registry_result is not None
                    else None
                ),
                is_error=outcome.is_error,
                phase="assistant",
            )
        )

        # Always append tool results to the session so the model sees
        # its own output. The verification gate may inject a correction
        # prompt, but the original result must still be in context —
        # otherwise the model operates blind and loops.
        self.context.session.append(outcome.message)
        summary.tool_result_messages.append(outcome.message)
        if outcome.state == ToolExecutionState.DUPLICATE:
            self._queue_duplicate_observation_nudge(tool_call, dod=dod)
        elif outcome.state == ToolExecutionState.BLOCKED:
            # Each helper checks for its own marker / tool name and is a
            # no-op when it does not apply.
            self._queue_blocked_active_repair_nudge(outcome.event_content)
            self._queue_blocked_active_repair_mutation_nudge(outcome.event_content)
            self._queue_blocked_completed_artifact_scope_nudge(
                outcome.event_content,
                dod=dod,
            )
            self._queue_blocked_late_reference_drift_nudge(
                outcome.event_content,
                dod=dod,
            )
            self._queue_blocked_shell_rewrite_nudge(tool_call)
            self._queue_blocked_html_edit_nudge(tool_call, outcome.event_content)

        should_continue = await self.verification_gate.should_continue(
            tool_call=tool_call,
            outcome=outcome,
            emit=emit,
        )

        rlog = get_runtime_logger()
        rlog.tool_exec(
            name=tool_call.name,
            state=outcome.state.value,
            is_error=outcome.is_error,
            result_preview=outcome.event_content,
            appended_to_session=True,
        )
        if should_continue:
            rlog.verification_gate(tool_call.name, should_continue=True)
            continue

    # NOTE(review): the source this was recovered from had lost its
    # indentation. This check is placed after the loop (one halt decision per
    # batch); confirm it was not originally nested inside the loop body.
    if result.consecutive_errors >= 3:
        final_response = (
            "I ran into some issues. "
            "Let me know if you'd like me to try a different approach."
        )
        summary.final_response = final_response
        summary.failures.append("three consecutive tool errors")
        await emit(AgentEvent(type="response", content=final_response))
        result.halted = True
        result.final_response = final_response

    return result
325
326 def _queue_duplicate_observation_nudge(
327 self,
328 tool_call: ToolCall,
329 *,
330 dod: DefinitionOfDone,
331 ) -> None:
332 """Queue a concrete next-step nudge after duplicate observational actions."""
333
334 if tool_call.name not in {"read", "glob", "grep", "bash"}:
335 return
336
337 current_task = getattr(self.context.session, "current_task", None)
338 missing_artifact = _next_missing_planned_artifact(
339 dod,
340 project_root=self.context.project_root,
341 messages=list(getattr(self.context.session, "messages", []) or []),
342 )
343 next_pending = preferred_pending_todo_item(
344 dod,
345 project_root=self.context.project_root,
346 missing_artifact=missing_artifact,
347 )
348 confirmed_facts = summarize_confirmed_facts(
349 self.context.session.messages,
350 max_items=2,
351 )
352 if _should_prioritize_missing_artifact(
353 dod=dod,
354 next_pending=next_pending,
355 missing_artifact=missing_artifact,
356 project_root=self.context.project_root,
357 ):
358 prefix = "Reuse the earlier observation instead of repeating it. "
359 if confirmed_facts:
360 prefix += f"Confirmed facts: {confirmed_facts}. "
361 self.context.queue_steering_message(
362 prefix
363 + "A declared output artifact is still missing."
364 + _missing_artifact_resume_suffix(
365 missing_artifact,
366 project_root=self.context.project_root,
367 messages=list(getattr(self.context.session, "messages", []) or []),
368 )
369 + " Do not switch into review or consistency-check mode until the missing artifact exists."
370 )
371 return
372 if next_pending:
373 mutation_suffix = ""
374 if _todo_is_mutation_step(next_pending):
375 mutation_suffix = _missing_artifact_resume_suffix(
376 missing_artifact,
377 project_root=self.context.project_root,
378 messages=list(getattr(self.context.session, "messages", []) or []),
379 )
380 if not mutation_suffix:
381 mutation_suffix = (
382 " You already have enough evidence for that step, so stop gathering "
383 "more reference material and perform the change now."
384 )
385 if confirmed_facts:
386 self.context.queue_steering_message(
387 "Reuse the earlier observation instead of repeating it. "
388 f"Confirmed facts: {confirmed_facts}. "
389 f"Continue with the next pending item: `{next_pending}`. "
390 "Only gather more evidence if a specific fact required for that step is still unknown."
391 + mutation_suffix
392 )
393 else:
394 self.context.queue_steering_message(
395 "Reuse the earlier observation instead of repeating it. "
396 f"Continue with the next pending item: `{next_pending}`. "
397 "Only gather more evidence if a specific fact required for that step is still unknown."
398 + mutation_suffix
399 )
400 return
401
402 if missing_artifact is not None:
403 self.context.queue_steering_message(
404 "Reuse the earlier observation instead of repeating it. "
405 + _missing_artifact_resume_suffix(
406 missing_artifact,
407 project_root=self.context.project_root,
408 messages=list(getattr(self.context.session, "messages", []) or []),
409 ).strip()
410 )
411 return
412
413 if all_planned_artifacts_exist(dod, project_root=self.context.project_root):
414 verification_commands = dod.verification_commands or derive_verification_commands(
415 dod,
416 project_root=self.context.project_root,
417 task_statement=current_task,
418 supplement_existing=True,
419 )
420 verification_suffix = (
421 "Move to verification or final confirmation using the files already on disk."
422 if verification_commands
423 else "Finish the current review using the files already on disk."
424 )
425 self.context.queue_steering_message(
426 "Reuse the earlier observation instead of repeating it. "
427 "All explicitly planned artifacts already exist. "
428 "Use the current task artifacts as the source of truth and do not reopen "
429 "reference materials unless one specific gap is still unknown. "
430 + verification_suffix
431 )
432 return
433
434 preferred_next_step = infer_preferred_next_step(
435 self.context.session.messages,
436 current_task=current_task,
437 )
438 if preferred_next_step and confirmed_facts:
439 self.context.queue_steering_message(
440 "Reuse the earlier observation instead of repeating it. "
441 f"Confirmed facts: {confirmed_facts}. "
442 f"{preferred_next_step} "
443 "Only gather more evidence if a specific filename, href, or title is still unknown."
444 )
445 return
446
447 if preferred_next_step:
448 self.context.queue_steering_message(
449 "Reuse the earlier observation instead of repeating it. "
450 f"{preferred_next_step} "
451 "Only gather more evidence if a specific filename, href, or title is still unknown."
452 )
453 return
454
455 target_path = str(
456 tool_call.arguments.get("file_path")
457 or tool_call.arguments.get("path")
458 or ""
459 ).strip()
460 if target_path:
461 self.context.queue_steering_message(
462 "Reuse the earlier observation instead of repeating it. "
463 f"Use the current contents of `{target_path}` and take a different next step. "
464 "Only gather more evidence if a specific filename, href, or title is still unknown."
465 )
466 return
467
468 self.context.queue_steering_message(
469 "Reuse the earlier observation instead of repeating it. "
470 "Choose a different next step that makes progress."
471 )
472
473 def _queue_blocked_shell_rewrite_nudge(self, tool_call: ToolCall) -> None:
474 """Steer the model back to file tools after a blocked shell text rewrite."""
475
476 if tool_call.name != "bash":
477 return
478
479 target = extract_shell_text_rewrite_target(
480 str(tool_call.arguments.get("command", ""))
481 )
482 if target is None:
483 return
484
485 current_task = getattr(self.context.session, "current_task", None)
486 confirmed_facts = summarize_confirmed_facts(
487 self.context.session.messages,
488 max_items=2,
489 )
490 preferred_next_step = infer_preferred_next_step(
491 self.context.session.messages,
492 current_task=current_task,
493 )
494
495 if preferred_next_step and confirmed_facts:
496 self.context.queue_steering_message(
497 "Use Loader's file tools for this text edit instead of a shell rewrite. "
498 f"Confirmed facts: {confirmed_facts}. "
499 f"{preferred_next_step} "
500 f"Target `{target}` with edit/patch/write rather than `bash`."
501 )
502 return
503
504 self.context.queue_steering_message(
505 "Use Loader's file tools for this text edit instead of a shell rewrite. "
506 f"Apply the change to `{target}` with edit/patch/write."
507 )
508
509 def _queue_blocked_active_repair_nudge(self, event_content: str) -> None:
510 """Reinforce active repair focus after an out-of-scope blocked observation."""
511
512 if "[Blocked - active repair scope:" not in event_content:
513 return
514
515 repair = extract_active_repair_context(self.context.session.messages)
516 if repair is None:
517 return
518
519 if repair.allowed_paths:
520 allowed_preview = ", ".join(f"`{path}`" for path in repair.allowed_paths[:3])
521 if len(repair.allowed_paths) > 3:
522 allowed_preview += ", ..."
523 self.context.queue_steering_message(
524 "Verification already identified the active repair target. "
525 f"Stay on the concrete repair files {allowed_preview} "
526 f"and repair `{repair.artifact_path}` directly. "
527 "Do not reopen unrelated reference materials while this repair target is unresolved."
528 )
529 return
530
531 roots_preview = ", ".join(f"`{root}`" for root in repair.allowed_roots[:2])
532 if len(repair.allowed_roots) > 2:
533 roots_preview += ", ..."
534 self.context.queue_steering_message(
535 "Verification already identified the active repair target. "
536 f"Stay within the current artifact set under {roots_preview} "
537 f"and repair `{repair.artifact_path}` directly. "
538 "Do not reopen unrelated reference materials while this repair target is unresolved."
539 )
540
541 def _queue_blocked_active_repair_mutation_nudge(self, event_content: str) -> None:
542 """Keep repair-phase mutations pinned to the named repair files."""
543
544 if "[Blocked - active repair mutation scope:" not in event_content:
545 return
546
547 repair = extract_active_repair_context(self.context.session.messages)
548 if repair is None or not repair.allowed_paths:
549 return
550
551 allowed_preview = ", ".join(f"`{path}`" for path in repair.allowed_paths[:3])
552 if len(repair.allowed_paths) > 3:
553 allowed_preview += ", ..."
554 self.context.queue_steering_message(
555 "Verification already identified the concrete repair files. "
556 f"Keep mutations pinned to {allowed_preview} "
557 f"and repair `{repair.artifact_path}` before widening the change set."
558 )
559
560 def _queue_blocked_late_reference_drift_nudge(
561 self,
562 event_content: str,
563 *,
564 dod: DefinitionOfDone,
565 ) -> None:
566 """Reinforce missing-artifact progress after late-stage reference drift is blocked."""
567
568 if "[Blocked - late reference drift:" not in event_content:
569 return
570
571 missing_artifact = _next_missing_planned_artifact(
572 dod,
573 project_root=self.context.project_root,
574 messages=list(getattr(self.context.session, "messages", []) or []),
575 )
576 if missing_artifact is None:
577 return
578
579 planned_roots: list[str] = []
580 seen_roots: set[str] = set()
581 for target, expect_directory in collect_planned_artifact_targets(
582 dod,
583 project_root=self.context.project_root,
584 ):
585 root = str(target if expect_directory else target.parent)
586 if root in seen_roots:
587 continue
588 seen_roots.add(root)
589 planned_roots.append(root)
590
591 roots_preview = ", ".join(f"`{root}`" for root in planned_roots[:2])
592 if len(planned_roots) > 2:
593 roots_preview += ", ..."
594 self.context.queue_steering_message(
595 "Late-stage reference rereads are no longer helping. "
596 "One explicitly planned artifact is still missing."
597 + _missing_artifact_resume_suffix(
598 missing_artifact,
599 project_root=self.context.project_root,
600 messages=list(getattr(self.context.session, "messages", []) or []),
601 )
602 + f" Stay within the current output roots under {roots_preview}"
603 + " and finish that artifact before reopening older reference materials."
604 )
605
606 def _queue_blocked_completed_artifact_scope_nudge(
607 self,
608 event_content: str,
609 *,
610 dod: DefinitionOfDone,
611 ) -> None:
612 """Keep post-build review anchored to the generated artifact set."""
613
614 if "[Blocked - completed artifact set scope:" not in event_content:
615 return
616
617 planned_roots: list[str] = []
618 seen_roots: set[str] = set()
619 for target, expect_directory in collect_planned_artifact_targets(
620 dod,
621 project_root=self.context.project_root,
622 ):
623 root = str(target if expect_directory else target.parent)
624 if root in seen_roots:
625 continue
626 seen_roots.add(root)
627 planned_roots.append(root)
628
629 next_pending = preferred_pending_todo_item(
630 dod,
631 project_root=self.context.project_root,
632 )
633 roots_preview = ", ".join(f"`{root}`" for root in planned_roots[:2])
634 if len(planned_roots) > 2:
635 roots_preview += ", ..."
636 if next_pending and _todo_is_consistency_review_step(next_pending):
637 self.context.queue_steering_message(
638 "All explicitly planned artifacts already exist. "
639 f"Stay within the current output roots under {roots_preview} and continue "
640 f"with `{next_pending}` using the generated files as the source of truth. "
641 "Do not reopen earlier reference materials."
642 )
643 return
644
645 self.context.queue_steering_message(
646 "All explicitly planned artifacts already exist. "
647 f"Stay within the current output roots under {roots_preview} "
648 "and move to verification or final confirmation using the generated files. "
649 "Do not reopen earlier reference materials."
650 )
651
652 def _queue_blocked_html_edit_nudge(self, tool_call: ToolCall, event_content: str) -> None:
653 """Keep blocked edit feedback generic; avoid task-class-specific steering."""
654
655 if tool_call.name != "edit":
656 return
657 if "old_string and new_string are identical - no change would occur" not in event_content:
658 return
659
660 repair = extract_active_repair_context(self.context.session.messages)
661 if repair is None:
662 return
663
664 target = (
665 str(tool_call.arguments.get("file_path") or "").strip() or repair.artifact_path
666 )
667 if not target:
668 return
669
670 self.context.queue_steering_message(
671 "That edit would make no on-disk change. "
672 f"Stay on `{target}` and use the current file contents as the source of truth. "
673 "Read the exact current text you need to change, then submit one `edit`, `patch`, "
674 "or `write` call that actually changes the file. "
675 "If a narrow single-line edit keeps bouncing, replace the surrounding block in one "
676 "mutation instead of retrying the same no-op edit. "
677 "Do not reopen unrelated reference materials while this concrete repair target is unresolved."
678 )
679
async def _record_successful_execution(
    self,
    *,
    tool_call: ToolCall,
    outcome,
    dod: DefinitionOfDone,
    emit: EventSink,
    summary: TurnSummary,
) -> str | None:
    """Update DoD bookkeeping after a successful tool execution.

    Records the call on ``dod``, marks verification stale or planned when
    the call mutates state, syncs or advances todos, queues follow-up
    steering nudges, persists ``dod`` through the store, and updates the
    recovery context.

    Args:
        tool_call: The call that was executed.
        outcome: Executor outcome; only ``registry_result`` is read here.
        dod: Definition-of-done state to update and save.
        emit: Accepted for interface symmetry; not used in this body.
        summary: Turn summary forwarded to the verification markers.

    Returns:
        Always ``None`` in the code shown here. The caller treats a
        non-``None`` value as a final response that halts the batch.
    """

    is_mutating = is_state_mutating_tool_call(tool_call)
    # Capture the verification status before recording the call, so a
    # mutation after a passed verification can be flagged as stale.
    previously_verified = dod.last_verification_result == "passed"
    record_successful_tool_call(dod, tool_call)
    if previously_verified and is_mutating:
        _mark_verification_stale(
            context=self.context,
            summary=summary,
            dod=dod,
            tool_call=tool_call,
        )
    elif is_mutating:
        _mark_verification_planned(
            context=self.context,
            summary=summary,
            dod=dod,
            tool_call=tool_call,
        )
    # NOTE(review): the source this was recovered from had lost its
    # indentation; the nesting of the nudge calls below is a best-effort
    # reconstruction and should be confirmed against the repository.
    if tool_call.name == "TodoWrite" and outcome.registry_result is not None:
        new_todos = outcome.registry_result.metadata.get("new_todos", [])
        if isinstance(new_todos, list):
            sync_todos_to_definition_of_done(
                dod,
                new_todos,
                project_root=self.context.project_root,
            )
            self._queue_todowrite_resume_nudge(dod=dod)
    else:
        # Snapshot the pending list so the nudge can tell which item the
        # call just completed.
        pending_before = list(dod.pending_items)
        if advance_todos_from_tool_call(dod, tool_call):
            reconcile_aggregate_completion_steps(
                dod,
                project_root=self.context.project_root,
            )
            self._queue_next_pending_todo_nudge(
                tool_call=tool_call,
                pending_before=pending_before,
                dod=dod,
            )
        self._queue_bookkeeping_resume_nudge(
            tool_call=tool_call,
            dod=dod,
        )
        self._queue_missing_artifact_progress_nudge(
            tool_call=tool_call,
            dod=dod,
        )
        self._queue_planned_artifact_handoff_nudge(
            tool_call=tool_call,
            dod=dod,
        )
    self.dod_store.save(dod)
    recovery_context = self.context.recovery_context
    if recovery_context is not None:
        recovery_context.note_success(tool_call.name, tool_call.arguments)
        # Drop the recovery context once it reports that this success
        # resolves it.
        if recovery_context.should_clear_after_success(
            tool_call.name,
            tool_call.arguments,
        ):
            self.context.recovery_context = None
    return None
751
752 def _queue_next_pending_todo_nudge(
753 self,
754 *,
755 tool_call: ToolCall,
756 pending_before: list[str],
757 dod: DefinitionOfDone,
758 ) -> None:
759 if is_state_mutating_tool_call(tool_call):
760 return
761 if tool_call.name not in {"read", "glob", "grep", "bash"}:
762 return
763 if tool_call.name == "bash":
764 command = str(tool_call.arguments.get("command", "")).lower()
765 if not any(
766 token in command
767 for token in (
768 "ls ",
769 " ls",
770 "find ",
771 "grep ",
772 "rg ",
773 "cat ",
774 "sed ",
775 "head ",
776 "tail ",
777 )
778 ):
779 return
780
781 completed_label = next(
782 (
783 item
784 for item in pending_before
785 if item not in dod.pending_items
786 and item not in _TODO_NUDGE_EXCLUDED_ITEMS
787 ),
788 None,
789 )
790 missing_artifact = _next_missing_planned_artifact(
791 dod,
792 project_root=self.context.project_root,
793 messages=list(getattr(self.context.session, "messages", []) or []),
794 )
795 next_pending = preferred_pending_todo_item(
796 dod,
797 project_root=self.context.project_root,
798 missing_artifact=missing_artifact,
799 )
800 has_artifact_progress = _has_confirmed_artifact_progress(
801 dod,
802 project_root=self.context.project_root,
803 )
804 if not completed_label or not next_pending or next_pending == completed_label:
805 return
806 if _should_prioritize_missing_artifact(
807 dod=dod,
808 next_pending=next_pending,
809 missing_artifact=missing_artifact,
810 project_root=self.context.project_root,
811 ):
812 if not has_artifact_progress:
813 compact_handoff = _compact_missing_artifact_handoff(
814 missing_artifact,
815 project_root=self.context.project_root,
816 messages=list(getattr(self.context.session, "messages", []) or []),
817 )
818 if compact_handoff:
819 self.context.queue_steering_message(
820 f"Confirmed progress: `{completed_label}` is now satisfied by the successful "
821 f"`{tool_call.name}` result. {compact_handoff}"
822 " Do not reread reference material or spend the next turn on bookkeeping."
823 )
824 return
825 self.context.queue_steering_message(
826 f"Confirmed progress: `{completed_label}` is now satisfied by the successful "
827 f"`{tool_call.name}` result. One declared output artifact is still missing."
828 + _missing_artifact_resume_suffix(
829 missing_artifact,
830 project_root=self.context.project_root,
831 messages=list(getattr(self.context.session, "messages", []) or []),
832 )
833 + " Do not switch into review or consistency-check mode until the missing artifact exists."
834 )
835 return
836
837 mutation_suffix = ""
838 if _todo_is_mutation_step(next_pending):
839 mutation_suffix = _missing_artifact_resume_suffix(
840 missing_artifact,
841 project_root=self.context.project_root,
842 messages=list(getattr(self.context.session, "messages", []) or []),
843 )
844 if not mutation_suffix:
845 mutation_suffix = (
846 " You already have enough evidence for that step, so stop gathering "
847 "more reference material and perform the change now."
848 )
849
850 self.context.queue_steering_message(
851 f"Confirmed progress: `{completed_label}` is now satisfied by the successful "
852 f"`{tool_call.name}` result. Continue with the next pending item: "
853 f"`{next_pending}` instead of rereading the same evidence.{mutation_suffix}"
854 )
855
def _queue_planned_artifact_handoff_nudge(
    self,
    *,
    tool_call: ToolCall,
    dod: DefinitionOfDone,
) -> None:
    """Hand off to review or verification once every planned artifact exists.

    Acts only after a state-mutating call and only when all planned artifacts
    exist. A pending consistency-review todo is named in the message;
    otherwise a message is queued only if verification commands are available.
    """
    if not is_state_mutating_tool_call(tool_call) or not all_planned_artifacts_exist(
        dod, project_root=self.context.project_root
    ):
        return

    project_root = self.context.project_root
    next_pending = preferred_pending_todo_item(dod, project_root=project_root)
    verification_commands = dod.verification_commands or derive_verification_commands(
        dod,
        project_root=project_root,
        task_statement=getattr(self.context.session, "current_task", "") or "",
        supplement_existing=True,
    )

    opening = "All explicitly planned artifacts now exist. "
    if next_pending and _todo_is_consistency_review_step(next_pending):
        if verification_commands:
            closing = " Move to verification once no specific mismatch remains."
        else:
            closing = " Avoid another full reread unless one specific inconsistency is still unknown."
        message = (
            opening
            + f"Continue with the next pending item: `{next_pending}`. "
            + "Use the files already on disk as the source of truth instead of restarting "
            + "discovery or inventing alternate filenames."
            + closing
        )
    elif verification_commands:
        message = (
            opening
            + "Do not expand the artifact set or restart discovery unless a specific gap is "
            + "still known. Move to verification or final confirmation using the files that "
            + "already exist."
        )
    else:
        # Nothing to review and nothing to verify: stay silent.
        return

    self.context.queue_steering_message(message)
900
def _queue_missing_artifact_progress_nudge(
    self,
    *,
    tool_call: ToolCall,
    dod: DefinitionOfDone,
) -> None:
    """Acknowledge a mutation and point at the artifact that is still missing.

    Acts only after a state-mutating call while a planned artifact is
    missing. A terse message is used for a late-stage build; otherwise the
    longer message with todo-refresh guidance is queued.
    """
    if not is_state_mutating_tool_call(tool_call):
        return

    project_root = self.context.project_root
    missing_artifact = _next_missing_planned_artifact(dod, project_root=project_root)
    if missing_artifact is None:
        return

    next_pending = preferred_pending_todo_item(
        dod,
        project_root=project_root,
        missing_artifact=missing_artifact,
    )
    # Let the pending item override which missing artifact is named.
    missing_artifact = _prefer_missing_artifact_for_pending_item(
        dod,
        missing_artifact=missing_artifact,
        next_pending=next_pending,
        project_root=project_root,
    )

    current_label = _current_mutation_label(tool_call)
    todo_refresh = _todo_refresh_guidance(dod, project_root=project_root)
    late_stage = _late_stage_missing_artifact_build(dod, project_root=project_root)
    resume = _missing_artifact_resume_suffix(
        missing_artifact,
        project_root=self.context.project_root,
        messages=list(getattr(self.context.session, "messages", []) or []),
    )

    opening = f"Confirmed progress: {current_label} is now recorded."
    if late_stage:
        message = (
            opening
            + resume
            + " No TodoWrite, no verification, no rereads until that artifact exists."
        )
    else:
        message = (
            opening
            + " One declared output artifact is still missing."
            + resume
            + todo_refresh
            + " Do not move to verification, final confirmation, or TodoWrite-only "
            + "bookkeeping until that artifact exists."
            + " Do not spend another turn on working notes or rediscovery alone."
        )
    self.context.queue_steering_message(message)
959
def _queue_todowrite_resume_nudge(
    self,
    *,
    dod: DefinitionOfDone,
) -> None:
    """Queue the steering message that follows a todo-tracking update.

    The wording depends on three things: whether a planned artifact is still
    reported missing, what the preferred pending todo item is, and whether
    every planned artifact already exists. No message is queued when nothing
    is reported missing, the pending item is neither a mutation step nor a
    consistency-review step, and ``all_planned_artifacts_exist`` is false.

    Args:
        dod: Definition-of-done state used to look up planned artifacts,
            pending todo items and verification commands.
    """
    context = self.context
    root = context.project_root

    missing_artifact = _next_missing_planned_artifact(
        dod,
        project_root=root,
        messages=list(getattr(context.session, "messages", []) or []),
    )
    next_pending = preferred_pending_todo_item(
        dod,
        project_root=root,
        missing_artifact=missing_artifact,
    )

    # Case 1: a declared artifact is still missing, so steer straight at it.
    if missing_artifact is not None:
        todo_refresh = _todo_refresh_guidance(dod, project_root=root)
        if next_pending:
            next_pending_suffix = f" Continue with the next pending item: `{next_pending}`."
        else:
            next_pending_suffix = ""
        context.queue_steering_message(
            "Todo tracking is updated. A declared output artifact is still missing."
            + next_pending_suffix
            + _missing_artifact_resume_suffix(
                missing_artifact,
                project_root=root,
                messages=list(getattr(context.session, "messages", []) or []),
            )
            + todo_refresh
            + " Do not spend the next turn on TodoWrite alone, bookkeeping notes, "
            "verification, or final confirmation until that artifact exists."
        )
        return

    # Case 2: nothing is missing, but the next todo item is itself a mutation.
    if next_pending and _todo_is_mutation_step(next_pending):
        pending_target = infer_pending_todo_output_target(
            dod,
            next_pending,
            project_root=root,
        )
        if pending_target is None:
            # No concrete output path could be inferred: generic reminder only.
            context.queue_steering_message(
                "Todo tracking is updated. Continue with the next pending item: "
                f"`{next_pending}`. Use the current output files as the source of "
                "truth, and do not reopen reference materials unless one specific "
                "fact required for that step is still unknown. Perform the mutation "
                "now instead of spending another turn on planning, rereads, or "
                "verification."
            )
            return

        pieces = [
            "Todo tracking is updated. Continue with the next pending item: "
            f"`{next_pending}`. Resume by creating `{pending_target.name}` now. "
            f"Prefer one `write` call for `{pending_target}` instead of more rereads. "
        ]
        if not pending_target.parent.exists():
            pieces.append(
                "The `write` tool can create that file's parent directories "
                "automatically, so do the write in one step instead of stopping "
                "for a separate mkdir. "
            )
        pieces.append(
            "Use the current output files as the source of truth, and do not "
            "reopen reference materials unless one specific fact required for "
            "that step is still unknown. Make your next response the concrete "
            "mutation tool call itself, not another bookkeeping-only turn. "
            "Perform the mutation now instead of spending another turn on "
            "planning, rereads, or verification."
        )
        context.queue_steering_message("".join(pieces))
        return

    # Case 3: consistency review requested while planned artifacts are incomplete.
    reviewing = bool(next_pending) and _todo_is_consistency_review_step(next_pending)
    if reviewing and not all_planned_artifacts_exist(dod, project_root=root):
        context.queue_steering_message(
            "Todo tracking is updated. Continue with the next pending item: "
            f"`{next_pending}`. Use the current output files as the source of "
            "truth, and do not reopen reference materials unless one specific "
            "mismatch is still unknown."
        )
        return

    if not all_planned_artifacts_exist(dod, project_root=root):
        return

    # Case 4: everything planned exists; hand over to review or verification.
    verification_commands = dod.verification_commands or derive_verification_commands(
        dod,
        project_root=root,
        task_statement=getattr(context.session, "current_task", "") or "",
        supplement_existing=True,
    )
    if reviewing:
        if verification_commands:
            verification_suffix = " Move to verification once no specific mismatch remains."
        else:
            verification_suffix = (
                " Finish the targeted consistency pass without reopening reference materials."
            )
        context.queue_steering_message(
            "Todo tracking is updated. All explicitly planned artifacts now exist. "
            f"Continue with the next pending item: `{next_pending}`. "
            "Use the current output files as the source of truth, and do not restart "
            "early discovery or reopen reference materials."
            + verification_suffix
        )
        return

    if verification_commands:
        verification_suffix = (
            " Move to verification or final confirmation using the files already on disk."
        )
    else:
        verification_suffix = " Finish the task using the files already on disk."
    context.queue_steering_message(
        "Todo tracking is updated. All explicitly planned artifacts now exist. "
        "Do not restart discovery, reopen reference materials, or spend another turn "
        "on TodoWrite alone."
        + verification_suffix
    )
1088
def _queue_bookkeeping_resume_nudge(
    self,
    *,
    tool_call: ToolCall,
    dod: DefinitionOfDone,
) -> None:
    """Queue a steering message after a bookkeeping-note tool call.

    Nothing is queued unless the tool is one of ``_BOOKKEEPING_NOTE_TOOL_NAMES``
    and a planned artifact is still missing. The message then either sends the
    model back to the pending evidence-gathering step or tells it to create
    the missing artifact.

    Args:
        tool_call: The tool call that was just executed.
        dod: Definition-of-done state for the current task.
    """
    if tool_call.name not in _BOOKKEEPING_NOTE_TOOL_NAMES:
        return

    root = self.context.project_root
    missing_artifact = _next_missing_planned_artifact(
        dod,
        project_root=root,
        messages=list(getattr(self.context.session, "messages", []) or []),
    )
    if missing_artifact is None:
        return

    next_pending = preferred_pending_todo_item(
        dod,
        project_root=root,
        missing_artifact=missing_artifact,
    )
    todo_refresh = _todo_refresh_guidance(dod, project_root=root)

    # The pending step wins only when it is neither a mutation nor a review
    # step and the missing artifact does not take priority over it.
    pending_step_first = False
    if (
        next_pending
        and not _todo_is_mutation_step(next_pending)
        and not _todo_is_consistency_review_step(next_pending)
    ):
        # The artifact only competes once some artifact progress is confirmed.
        competing_artifact = None
        if _has_confirmed_artifact_progress(dod, project_root=root):
            competing_artifact = missing_artifact
        pending_step_first = not _should_prioritize_missing_artifact(
            dod=dod,
            next_pending=next_pending,
            missing_artifact=competing_artifact,
            project_root=root,
        )

    if pending_step_first:
        message = (
            "Bookkeeping note is recorded. Continue with the next pending item: "
            f"`{next_pending}`. Make your next response one concrete evidence-gathering "
            "tool call that advances that step, not another bookkeeping-only turn."
            + todo_refresh
            + " Do not jump ahead to later artifact creation, verification, or final "
            "confirmation until that step is satisfied."
        )
    else:
        message = (
            "Bookkeeping note is recorded. A declared output artifact is still missing."
            + _missing_artifact_resume_suffix(
                missing_artifact,
                project_root=root,
                messages=list(getattr(self.context.session, "messages", []) or []),
            )
            + todo_refresh
            + " Do not spend the next turn on additional notes, rediscovery, "
            "verification, or final confirmation until that artifact exists."
        )
    self.context.queue_steering_message(message)
1154
1155
def _todo_is_consistency_review_step(item: str) -> bool:
    """Return True when the lowercased todo label contains a consistency-review hint."""
    lowered = item.lower()
    for hint in _CONSISTENCY_REVIEW_HINTS:
        if hint in lowered:
            return True
    return False
1159
1160
1161 def _should_prioritize_missing_artifact(
1162 *,
1163 dod: DefinitionOfDone,
1164 next_pending: str | None,
1165 missing_artifact: tuple[Path, bool] | None,
1166 project_root: Path,
1167 ) -> bool:
1168 if missing_artifact is None:
1169 return False
1170 if not next_pending:
1171 return True
1172 if _pending_todo_conflicts_with_missing_artifact(
1173 dod,
1174 item=next_pending,
1175 missing_artifact=missing_artifact,
1176 project_root=project_root,
1177 ):
1178 return True
1179 if _todo_is_consistency_review_step(next_pending):
1180 return True
1181 return not _todo_is_mutation_step(next_pending)
1182
1183
1184 def _pending_todo_conflicts_with_missing_artifact(
1185 dod: DefinitionOfDone,
1186 *,
1187 item: str,
1188 missing_artifact: tuple[Path, bool],
1189 project_root: Path,
1190 ) -> bool:
1191 text = item.strip().lower()
1192 if not text or item in _TODO_NUDGE_EXCLUDED_ITEMS:
1193 return False
1194
1195 target, expect_directory = missing_artifact
1196 inferred_target = infer_pending_todo_output_target(
1197 dod,
1198 item,
1199 project_root=project_root,
1200 )
1201 if inferred_target is None:
1202 return not expect_directory and _todo_is_mutation_step(item)
1203
1204 inferred_target = inferred_target.resolve(strict=False)
1205 target = target.resolve(strict=False)
1206 if expect_directory:
1207 return target != inferred_target and target not in inferred_target.parents
1208 return inferred_target != target
1209
1210
def _next_missing_planned_artifact(
    dod: DefinitionOfDone,
    *,
    project_root: Path,
    messages: list[Any] | None = None,
) -> tuple[Path, bool] | None:
    """Find the next planned artifact that still has to be produced.

    The first pass returns the first planned target that
    ``planned_artifact_target_satisfied`` rejects. If every target is
    satisfied, a second pass looks inside planned directories that exist and
    returns the next inferred output file that is not on disk yet.

    Args:
        dod: Definition-of-done state for the current task.
        project_root: Root directory used to resolve planned targets.
        messages: Session messages forwarded to ``infer_next_output_file``.

    Returns:
        ``(path, expect_directory)`` for the missing artifact, or ``None``.
    """
    first_unsatisfied = next(
        (
            (target, expect_directory)
            for target, expect_directory in collect_planned_artifact_targets(
                dod,
                project_root=project_root,
                max_paths=12,
            )
            if not planned_artifact_target_satisfied(
                dod,
                target=target,
                expect_directory=expect_directory,
                project_root=project_root,
            )
        ),
        None,
    )
    if first_unsatisfied is not None:
        return first_unsatisfied

    for directory, expect_directory in collect_planned_artifact_targets(
        dod,
        project_root=project_root,
        max_paths=12,
    ):
        if expect_directory and directory.is_dir():
            candidate, _source = infer_next_output_file(
                target=directory,
                project_root=project_root,
                messages=list(messages or []),
            )
            if candidate is not None and not candidate.exists():
                # An inferred file inside a directory is reported as a file.
                return candidate, False
    return None
1244
1245
1246 def _prefer_missing_artifact_for_pending_item(
1247 dod: DefinitionOfDone,
1248 *,
1249 missing_artifact: tuple[Path, bool] | None,
1250 next_pending: str | None,
1251 project_root: Path,
1252 ) -> tuple[Path, bool] | None:
1253 if missing_artifact is None or not next_pending:
1254 return missing_artifact
1255
1256 inferred_target = infer_pending_todo_output_target(
1257 dod,
1258 next_pending,
1259 project_root=project_root,
1260 )
1261 if inferred_target is None or inferred_target.exists():
1262 return missing_artifact
1263
1264 normalized_target = inferred_target.expanduser().resolve(strict=False)
1265 for planned_target, expect_directory in collect_planned_artifact_targets(
1266 dod,
1267 project_root=project_root,
1268 max_paths=12,
1269 ):
1270 normalized_planned = planned_target.expanduser().resolve(strict=False)
1271 if expect_directory:
1272 try:
1273 normalized_target.relative_to(normalized_planned)
1274 except ValueError:
1275 continue
1276 return normalized_target, False
1277 if normalized_planned == normalized_target:
1278 return normalized_target, False
1279 return missing_artifact
1280
1281
def _late_stage_missing_artifact_build(
    dod: DefinitionOfDone,
    *,
    project_root: Path,
) -> bool:
    """Return True when at least seven planned targets are satisfied and one is not.

    Args:
        dod: Definition-of-done state for the current task.
        project_root: Root directory used to resolve planned targets.
    """
    outcomes = [
        bool(
            planned_artifact_target_satisfied(
                dod,
                target=target,
                expect_directory=expect_directory,
                project_root=project_root,
            )
        )
        for target, expect_directory in collect_planned_artifact_targets(
            dod,
            project_root=project_root,
            max_paths=12,
        )
    ]
    completed = outcomes.count(True)
    missing = len(outcomes) - completed
    return completed >= 7 and missing > 0
1304
1305
def _has_confirmed_artifact_progress(
    dod: DefinitionOfDone,
    *,
    project_root: Path,
) -> bool:
    """Return True when any planned target is satisfied or any file was touched.

    Args:
        dod: Definition-of-done state; ``dod.touched_files`` is the fallback
            signal when no planned target is satisfied.
        project_root: Root directory used to resolve planned targets.
    """
    any_satisfied = any(
        planned_artifact_target_satisfied(
            dod,
            target=target,
            expect_directory=expect_directory,
            project_root=project_root,
        )
        for target, expect_directory in collect_planned_artifact_targets(
            dod,
            project_root=project_root,
            max_paths=12,
        )
    )
    if any_satisfied:
        return True
    return bool(dod.touched_files)
1324
1325
1326 def _missing_artifact_resume_suffix(
1327 missing_artifact: tuple[Path, bool] | None,
1328 *,
1329 project_root: Path,
1330 messages: list[Any] | None = None,
1331 ) -> str:
1332 if missing_artifact is None:
1333 return ""
1334
1335 target, expect_directory = missing_artifact
1336 label = target.name or str(target)
1337 if expect_directory and not label.endswith("/"):
1338 label += "/"
1339 if expect_directory:
1340 next_output_file, next_output_source = infer_next_output_file(
1341 target=target,
1342 project_root=project_root,
1343 messages=list(messages or []),
1344 )
1345 if next_output_file is not None:
1346 guidance_origin = (
1347 f"It is the next missing declared output under `{label}`."
1348 if next_output_source == "declared"
1349 else (
1350 "It mirrors the observed filename pattern from another "
1351 f"`{label}` directory you already inspected."
1352 )
1353 )
1354 guidance = (
1355 f" Resume by creating `{next_output_file.name}` now. {guidance_origin} "
1356 f"Prefer one `write` call for "
1357 f"`{next_output_file}` instead of more rereads."
1358 )
1359 if not next_output_file.parent.exists():
1360 guidance += (
1361 " The `write` tool can create that file's parent directories automatically,"
1362 " so do the write in one step instead of stopping for a separate mkdir."
1363 )
1364 guidance += (
1365 " Make your next response the concrete mutation tool call itself, not another"
1366 " bookkeeping-only turn."
1367 )
1368 return guidance
1369 if target.is_dir():
1370 return (
1371 f" Resume by creating the next output file under `{label}` now. Prefer one "
1372 f"concrete `write` call for a file inside `{target}` instead of more rereads."
1373 " Make your next response the concrete mutation tool call itself, not another"
1374 " bookkeeping-only turn."
1375 )
1376 return (
1377 f" Resume by creating `{label}` now. Prefer one concrete directory-creation "
1378 f"step for `{target}` instead of more rereads."
1379 )
1380 guidance = (
1381 f" Resume by creating `{label}` now. Prefer one `write` call for `{target}` "
1382 "instead of more rereads."
1383 )
1384 if not target.parent.exists():
1385 guidance += (
1386 " The `write` tool can create that file's parent directories automatically,"
1387 " so do the write in one step instead of stopping for a separate mkdir."
1388 )
1389 guidance += (
1390 " Make your next response the concrete mutation tool call itself, not another"
1391 " bookkeeping-only turn."
1392 )
1393 return guidance
1394
1395
1396 def _compact_missing_artifact_handoff(
1397 missing_artifact: tuple[Path, bool] | None,
1398 *,
1399 project_root: Path,
1400 messages: list[Any] | None = None,
1401 ) -> str:
1402 """Build a shorter first-mutation handoff once the next output target is known."""
1403
1404 if missing_artifact is None:
1405 return ""
1406
1407 target, expect_directory = missing_artifact
1408 label = target.name or str(target)
1409 if expect_directory and not label.endswith("/"):
1410 label += "/"
1411 if expect_directory:
1412 next_output_file, _ = infer_next_output_file(
1413 target=target,
1414 project_root=project_root,
1415 messages=list(messages or []),
1416 )
1417 if next_output_file is None:
1418 if target.is_dir():
1419 return (
1420 f"Next step: create the next output file under `{label}`. Prefer one "
1421 f"concrete `write` call inside `{target}` now."
1422 )
1423 return (
1424 f"Next step: create `{label}`. Prefer one concrete directory-creation step "
1425 f"for `{target}` now."
1426 )
1427 guidance = (
1428 f"Next step: create `{next_output_file.name}`. Prefer one `write` call for "
1429 f"`{next_output_file}` now."
1430 )
1431 if not next_output_file.parent.exists():
1432 guidance += (
1433 " The `write` tool can create that file's parent directories automatically."
1434 )
1435 guidance += " Make your next response the concrete mutation tool call itself."
1436 return guidance
1437
1438 guidance = (
1439 f"Next step: create `{label}`. Prefer one `write` call for `{target}` now."
1440 )
1441 if not target.parent.exists():
1442 guidance += (
1443 " The `write` tool can create that file's parent directories automatically."
1444 )
1445 guidance += " Make your next response the concrete mutation tool call itself."
1446 return guidance
1447
1448
def _todo_refresh_guidance(
    dod: DefinitionOfDone,
    *,
    project_root: Path | None = None,
) -> str:
    """Return the optional reminder to refresh ``TodoWrite`` alongside real work.

    The reminder is emitted once at least two files have been touched, or at
    least three todo items (pending plus completed, ignoring the entries in
    ``_TODO_NUDGE_EXCLUDED_ITEMS``) are being tracked.

    Args:
        dod: Definition-of-done state for the current task.
        project_root: Root directory forwarded to ``effective_pending_todo_items``.

    Returns:
        The reminder sentence (with a leading space), or an empty string.
    """
    pending_count = sum(
        1
        for item in effective_pending_todo_items(dod, project_root=project_root)
        if item not in _TODO_NUDGE_EXCLUDED_ITEMS
    )
    completed_count = sum(
        1 for item in dod.completed_items if item not in _TODO_NUDGE_EXCLUDED_ITEMS
    )
    enough_activity = len(dod.touched_files) >= 2 or pending_count + completed_count >= 3
    if not enough_activity:
        return ""
    return (
        " If the tracked steps no longer match the confirmed progress, refresh `TodoWrite` "
        "in the same response as the next concrete step instead of spending a full turn on "
        "bookkeeping alone."
    )
1469
1470
def _mark_verification_stale(
    *,
    context: RuntimeContext,
    summary: TurnSummary,
    dod: DefinitionOfDone,
    tool_call: ToolCall,
) -> None:
    """Mark earlier verification as stale after a mutating tool call.

    Starts a new verification attempt that supersedes the active one, appends
    a ``verification_stale`` timeline entry, then resets the verification
    state on ``dod``: the last result becomes STALE, the evidence list is
    cleared, and ``_VERIFY_ITEM`` moves from the completed items back to the
    pending items.

    Args:
        context: Runtime context passed to the timeline helper.
        summary: Turn summary passed to the timeline helper.
        dod: Definition-of-done state that is mutated in place.
        tool_call: The mutating tool call that invalidated verification.
    """
    detail = _stale_verification_detail(tool_call)
    # Capture the attempt being invalidated before opening its successor.
    stale_attempt = ensure_active_verification_attempt(dod)
    next_attempt = begin_new_verification_attempt(
        dod,
        supersedes_attempt_id=stale_attempt.attempt_id,
    )
    # The entry is built while ``dod.evidence`` is still populated:
    # ``_stale_verification_commands`` falls back to the evidence commands
    # when no verification commands are declared.
    append_verification_timeline_entry(
        context,
        summary,
        reason_code="verification_stale",
        reason_summary="previous verification became stale after new mutating work",
        evidence_summary=[f"fresh verification required after {detail}"],
        evidence_provenance=_stale_verification_provenance(dod, detail=detail),
        verification_observations=_stale_verification_observations(
            dod,
            detail=detail,
            stale_attempt_id=stale_attempt.attempt_id,
            stale_attempt_number=stale_attempt.attempt_number,
            superseded_by_attempt_id=next_attempt.attempt_id,
        ),
    )
    dod.last_verification_result = VerificationObservationStatus.STALE.value
    dod.evidence = []
    # Remove every occurrence of the verify item, not just the first one.
    while _VERIFY_ITEM in dod.completed_items:
        dod.completed_items.remove(_VERIFY_ITEM)
    if _VERIFY_ITEM not in dod.pending_items:
        dod.pending_items.append(_VERIFY_ITEM)
1505
1506
def _todo_is_mutation_step(label: str) -> bool:
    """Return True when the lowercased todo label contains a mutation hint."""
    text = label.lower()
    for token in _MUTATION_TODO_HINTS:
        if token in text:
            return True
    return False
1510
1511
def _mark_verification_planned(
    *,
    context: RuntimeContext,
    summary: TurnSummary,
    dod: DefinitionOfDone,
    tool_call: ToolCall,
) -> None:
    """Record that verification is planned after a mutating tool call.

    Does nothing when the last verification result is already PLANNED,
    PENDING or STALE, or when no non-empty verification command is available.
    Otherwise it begins a new verification attempt, appends a
    ``verification_planned`` timeline entry, sets the last result to PLANNED,
    and moves ``_VERIFY_ITEM`` from the completed items back to the pending
    items.

    Args:
        context: Runtime context; its ``project_root`` is used when commands
            have to be derived.
        summary: Turn summary passed to the timeline helper.
        dod: Definition-of-done state that is mutated in place.
        tool_call: The mutating tool call that triggered the plan.
    """
    if dod.last_verification_result in {
        VerificationObservationStatus.PLANNED.value,
        VerificationObservationStatus.PENDING.value,
        VerificationObservationStatus.STALE.value,
    }:
        return
    if not dod.verification_commands:
        # Derived commands are stored back on ``dod``, not just used locally.
        dod.verification_commands = derive_verification_commands(
            dod,
            project_root=context.project_root,
            task_statement=dod.task_statement,
        )
    commands = [command for command in dod.verification_commands if command]
    if not commands:
        return

    attempt = begin_new_verification_attempt(dod)
    detail = _stale_verification_detail(tool_call)
    append_verification_timeline_entry(
        context,
        summary,
        reason_code="verification_planned",
        reason_summary="verification is planned after new mutating work",
        # The summary is capped at two commands; provenance and observations
        # below cover every command.
        evidence_summary=[f"verification planned for `{command}`" for command in commands[:2]],
        evidence_provenance=[
            EvidenceProvenance(
                category="verification",
                source="dod.verification_commands",
                summary=f"verification planned for `{command}`",
                status=EvidenceProvenanceStatus.MISSING.value,
                subject=command,
                detail=detail,
            )
            for command in commands
        ],
        verification_observations=[
            VerificationObservation(
                status=VerificationObservationStatus.PLANNED.value,
                summary=f"verification planned for `{command}`",
                command=command,
                kind="runtime",
                detail=detail,
                attempt_id=attempt.attempt_id,
                attempt_number=attempt.attempt_number,
            )
            for command in commands
        ],
    )
    dod.last_verification_result = VerificationObservationStatus.PLANNED.value
    # Remove every occurrence of the verify item, not just the first one.
    while _VERIFY_ITEM in dod.completed_items:
        dod.completed_items.remove(_VERIFY_ITEM)
    if _VERIFY_ITEM not in dod.pending_items:
        dod.pending_items.append(_VERIFY_ITEM)
1572
1573
def _stale_verification_observations(
    dod: DefinitionOfDone,
    *,
    detail: str,
    stale_attempt_id: str,
    stale_attempt_number: int,
    superseded_by_attempt_id: str,
) -> list[VerificationObservation]:
    """Build one STALE observation per command reported by ``_stale_verification_commands``.

    Args:
        dod: Definition-of-done state supplying the commands.
        detail: Description of the mutating work that caused staleness.
        stale_attempt_id: Id of the attempt that became stale.
        stale_attempt_number: Number of the attempt that became stale.
        superseded_by_attempt_id: Id of the attempt that replaces the stale one.

    Returns:
        The observations, in command order.
    """
    observations: list[VerificationObservation] = []
    for command in _stale_verification_commands(dod):
        observations.append(
            VerificationObservation(
                status=VerificationObservationStatus.STALE.value,
                summary=f"verification became stale for `{command}` after new mutating work",
                command=command,
                kind="runtime",
                detail=detail,
                attempt_id=stale_attempt_id,
                attempt_number=stale_attempt_number,
                # NOTE(review): the replacing attempt's id is stored in
                # ``supersedes_attempt_id``; confirm the field's direction
                # against ``VerificationObservation``.
                supersedes_attempt_id=superseded_by_attempt_id,
            )
        )
    return observations
1595
1596
def _stale_verification_provenance(
    dod: DefinitionOfDone,
    *,
    detail: str,
) -> list[EvidenceProvenance]:
    """Build one MISSING provenance record per command that needs fresh verification.

    Args:
        dod: Definition-of-done state supplying the commands.
        detail: Description of the mutating work that caused staleness.

    Returns:
        The provenance records, in command order.
    """

    def _record(command: str) -> EvidenceProvenance:
        return EvidenceProvenance(
            category="verification",
            source="tool_execution",
            summary=f"fresh verification required for `{command}` after new mutating work",
            status=EvidenceProvenanceStatus.MISSING.value,
            subject=command,
            detail=detail,
        )

    return [_record(command) for command in _stale_verification_commands(dod)]
1613
1614
1615 def _stale_verification_commands(dod: DefinitionOfDone) -> list[str]:
1616 commands = [command for command in dod.verification_commands if command]
1617 if commands:
1618 return commands
1619 observed = [evidence.command for evidence in dod.evidence if evidence.command]
1620 if observed:
1621 return observed
1622 return ["verification"]
1623
1624
1625 def _stale_verification_detail(tool_call: ToolCall) -> str:
1626 if tool_call.name in {"write", "edit", "patch"}:
1627 file_path = str(tool_call.arguments.get("file_path", "")).strip()
1628 if file_path:
1629 return f"{tool_call.name} changed {file_path}"
1630 if tool_call.name == "bash":
1631 command = str(tool_call.arguments.get("command", "")).strip()
1632 if command:
1633 return f"bash ran `{command}`"
1634 return f"{tool_call.name} changed the workspace"
1635
1636
1637 def _current_mutation_label(tool_call: ToolCall) -> str:
1638 if tool_call.name in {"write", "edit", "patch"}:
1639 file_path = str(tool_call.arguments.get("file_path", "")).strip()
1640 if file_path:
1641 return f"`{Path(file_path).name or file_path}`"
1642 if tool_call.name == "bash":
1643 command = str(tool_call.arguments.get("command", "")).strip()
1644 if command:
1645 return f"`{command}`"
1646 return f"the successful `{tool_call.name}` result"
1647
1648
1649 def _tool_call_label(tool_call: ToolCall) -> str:
1650 """Human-readable label for one tool call."""
1651 name = tool_call.name
1652 if name in ("write", "edit", "patch"):
1653 path = str(tool_call.arguments.get("file_path", "")).strip()
1654 if path:
1655 short = Path(path).name
1656 verb = "Write" if name == "write" else "Edit"
1657 return f"{verb} {short}"
1658 if name == "bash":
1659 cmd = str(tool_call.arguments.get("command", "")).strip()
1660 if cmd:
1661 return f"Run {cmd[:40]}"
1662 if name == "read":
1663 path = str(tool_call.arguments.get("file_path", "")).strip()
1664 if path:
1665 return f"Read {Path(path).name}"
1666 if name == "glob":
1667 pattern = str(tool_call.arguments.get("pattern", "")).strip()
1668 if pattern:
1669 return f"Search {pattern[:30]}"
1670 return ""
1671
1672
1673 def _batch_planned_labels(tool_calls: list[ToolCall]) -> list[str]:
1674 """Build labels for all tool calls in a batch (for upfront planning display)."""
1675 labels = []
1676 for tc in tool_calls:
1677 label = _tool_call_label(tc)
1678 if label and label not in labels:
1679 labels.append(label)
1680 return labels