Python · 52884 bytes Raw Blame History
1 """Tool-batch execution and recovery bookkeeping for the typed runtime."""
2
3 from __future__ import annotations
4
5 from collections.abc import Awaitable, Callable
6 from dataclasses import dataclass, field
7 from pathlib import Path
8 from typing import Any
9
10 from ..llm.base import ToolCall
11 from .compaction import infer_preferred_next_step, summarize_confirmed_facts
12 from .context import RuntimeContext
13 from .dod import (
14 DefinitionOfDone,
15 DefinitionOfDoneStore,
16 all_planned_artifacts_exist,
17 begin_new_verification_attempt,
18 collect_planned_artifact_targets,
19 derive_verification_commands,
20 ensure_active_verification_attempt,
21 infer_next_declared_html_output_file,
22 is_state_mutating_tool_call,
23 planned_artifact_target_satisfied,
24 record_successful_tool_call,
25 synthesize_todo_items,
26 )
27 from .events import AgentEvent, TurnSummary
28 from .evidence_provenance import EvidenceProvenance, EvidenceProvenanceStatus
29 from .executor import ToolExecutionState, ToolExecutor
30 from .logging import get_runtime_logger
31 from .policy_timeline import append_verification_timeline_entry
32 from .repair_focus import extract_active_repair_context
33 from .safeguard_services import extract_shell_text_rewrite_target
34 from .tool_batch_checks import ToolBatchConfidenceGate, ToolBatchVerificationGate
35 from .tool_batch_recovery import ToolBatchRecoveryController
36 from .verification_observations import (
37 VerificationObservation,
38 VerificationObservationStatus,
39 )
40 from .workflow import (
41 advance_todos_from_tool_call,
42 effective_pending_todo_items,
43 reconcile_aggregate_completion_steps,
44 sync_todos_to_definition_of_done,
45 )
46
# Async callable that is awaited with a single AgentEvent; this is the type of
# the `emit` callbacks used in this module.
EventSink = Callable[[AgentEvent], Awaitable[None]]
# Optional async confirmation callback that resolves to a bool.
# NOTE(review): the meaning of the three str arguments and the optional dict is
# not visible in this module — confirm against the executor that invokes it.
ConfirmationHandler = (
    Callable[[str, str, str, dict[str, Any] | None], Awaitable[bool]] | None
)
# Optional async callback that resolves to a str.
# NOTE(review): presumably (question, choices) -> answer — confirm against the executor.
UserQuestionHandler = Callable[[str, list[str] | None], Awaitable[str]] | None

# Todo label for the verification step; it is part of the exclusion set below.
_VERIFY_ITEM = "Collect verification evidence"
# Todo labels the nudge builders skip when choosing the "next pending item".
_TODO_NUDGE_EXCLUDED_ITEMS = {
    "Complete the requested work",
    _VERIFY_ITEM,
}
# NOTE(review): presumably matched against todo labels to recognise steps that
# change files; the consumer is outside this view — confirm.
_MUTATION_TODO_HINTS = (
    "create",
    "creating",
    "update",
    "updating",
    "edit",
    "editing",
    "write",
    "writing",
    "fix",
    "fixing",
    "modify",
    "modifying",
    "change",
    "changing",
    "patch",
    "patching",
    "replace",
    "replacing",
    "correct",
    "correcting",
    "rewrite",
    "rewriting",
)
# Lower-case substrings; `_todo_is_consistency_review_step` treats a todo label
# containing any of them (case-insensitively) as a consistency-review step.
_CONSISTENCY_REVIEW_HINTS = (
    "consistent",
    "consistently",
    "formatted",
    "link",
    "linked",
    "navigation",
    "work properly",
    "all files",
    "every file",
    "ensure",
)
# Tool names that `_queue_bookkeeping_resume_nudge` treats as note-taking calls.
_BOOKKEEPING_NOTE_TOOL_NAMES = {
    "notepad_write_working",
    "notepad_append",
    "notepad_write_priority",
    "notepad_write_manual",
}
100
101
@dataclass
class ToolBatchResult:
    """Outcome of running one assistant-proposed tool batch.

    Every field has a default, so ``ToolBatchResult()`` describes a batch that
    dispatched nothing, saw no errors, and did not halt.
    """

    # One "<tool name>: <arguments truncated to 100 chars>" entry per dispatched call.
    actions_taken: list[str] = field(default_factory=list)
    # Running count of back-to-back tool errors; a non-error outcome resets it to 0.
    consecutive_errors: int = 0
    # True when the batch ended with a final response instead of running to completion.
    halted: bool = False
    # Text of that final response; only assigned where ``halted`` is set.
    final_response: str = ""
110
111
112 class ToolBatchRunner:
113 """Owns tool-batch execution, recovery, and post-tool bookkeeping."""
114
def __init__(
    self,
    context: RuntimeContext,
    dod_store: DefinitionOfDoneStore,
    *,
    confidence_gate: ToolBatchConfidenceGate | None = None,
    recovery_controller: ToolBatchRecoveryController | None = None,
    verification_gate: ToolBatchVerificationGate | None = None,
) -> None:
    """Bind the runner to a runtime context and wire up its collaborators.

    Any collaborator that is omitted (or passed as a falsy value) is replaced
    by a default instance constructed from ``context``.
    """
    self.context = context
    self.dod_store = dod_store

    # Resolve each collaborator in turn, falling back to the context-bound default.
    if not confidence_gate:
        confidence_gate = ToolBatchConfidenceGate(context)
    self.confidence_gate = confidence_gate

    if not recovery_controller:
        recovery_controller = ToolBatchRecoveryController(context)
    self.recovery_controller = recovery_controller

    if not verification_gate:
        verification_gate = ToolBatchVerificationGate(context)
    self.verification_gate = verification_gate
129
async def execute_batch(
    self,
    *,
    tool_calls: list[ToolCall],
    tool_source: str,
    pending_tool_calls_seen: set[str],
    emit: EventSink,
    summary: TurnSummary,
    dod: DefinitionOfDone,
    executor: ToolExecutor,
    on_confirmation: ConfirmationHandler,
    on_user_question: UserQuestionHandler,
    emit_confirmation,
    consecutive_errors: int,
) -> ToolBatchResult:
    """Run one assistant tool batch through the shared executor seam.

    Tool calls are processed one at a time in the order given. For each call
    the method optionally consults the confidence gate, emits a ``tool_call``
    event, executes the call, runs recovery or success bookkeeping, emits a
    ``tool_result`` event, appends the result message to the session and to
    ``summary``, queues steering nudges for duplicate/blocked outcomes, and
    finally consults the verification gate.

    Args:
        tool_calls: Calls proposed by the assistant for this batch.
        tool_source: Forwarded to the executor as ``source``.
        pending_tool_calls_seen: Ids of calls for which no ``tool_call``
            event is emitted here.
        emit: Async sink for every event produced while the batch runs.
        summary: Receives tool result messages and, on halt after repeated
            errors, the final response and a failure note.
        dod: Definition-of-done state updated after successful calls.
        executor: Performs the actual tool execution.
        on_confirmation: Forwarded unchanged to the executor.
        on_user_question: Forwarded unchanged to the executor.
        emit_confirmation: Forwarded unchanged to the executor.
        consecutive_errors: Error streak carried in from earlier batches.

    Returns:
        A ``ToolBatchResult`` with the recorded actions, the updated error
        streak, and the halt flag / final response when the batch stopped.

    NOTE(review): the nesting below was reconstructed from a rendering of
    this file that had lost its indentation. Confirm the placement of the
    statements flagged with NOTE(review) against the real file.
    """

    result = ToolBatchResult(consecutive_errors=consecutive_errors)

    # Pre-populate planned items for the entire batch so the todo
    # widget shows what's coming, not just what's done.
    planned_labels = _batch_planned_labels(tool_calls)
    completed_labels: list[str] = []

    async def _emit_batch_todos() -> None:
        """Emit a todo update combining DoD state with batch progress."""
        items = synthesize_todo_items(dod)
        for label in planned_labels:
            # Labels already finished in this batch are not shown as in progress.
            if label in completed_labels:
                continue
            # Don't duplicate items already in DoD
            if any(item["content"] == label for item in items):
                continue
            items.append({"content": label, "status": "in_progress", "active_form": label})
        # Nothing is emitted when there is nothing to show.
        if items:
            await emit(AgentEvent(type="todo_update", todo_items=items))

    await _emit_batch_todos()

    for tool_call in tool_calls:
        cfg = self.context.config.reasoning

        # A call skipped by the confidence gate is neither announced nor executed.
        if cfg.confidence_scoring:
            should_skip = await self.confidence_gate.should_skip(
                tool_call=tool_call,
                emit=emit,
            )
            if should_skip:
                continue

        # Only announce calls whose ids are not in the already-seen set.
        if tool_call.id not in pending_tool_calls_seen:
            await emit(
                AgentEvent(
                    type="tool_call",
                    tool_name=tool_call.name,
                    tool_call_id=tool_call.id,
                    tool_args=tool_call.arguments,
                    phase="assistant",
                )
            )

        # NOTE(review): assumed to run for every dispatched call, not only
        # for newly announced ones — confirm.
        result.actions_taken.append(
            f"{tool_call.name}: {str(tool_call.arguments)[:100]}"
        )

        outcome = await executor.execute_tool_call(
            tool_call,
            on_confirmation=on_confirmation,
            on_user_question=on_user_question,
            emit_confirmation=emit_confirmation,
            source=tool_source,
        )
        # Bookkeeping below uses the tool call carried on the outcome rather
        # than the one that was submitted.
        executed_tool_call = outcome.tool_call
        if (
            outcome.rollback_action is not None
            and self.context.config.reasoning.show_rollback_plan
        ):
            await emit(
                AgentEvent(
                    type="rollback",
                    content=(
                        f"Rollback tracked: {outcome.rollback_action.description}"
                    ),
                    rollback_action=outcome.rollback_action,
                )
            )

        if (
            outcome.state == ToolExecutionState.EXECUTED
            and outcome.is_error
            and self.context.config.auto_recover
        ):
            recovery_result = await self.recovery_controller.build_follow_up(
                tool_call=executed_tool_call,
                outcome=outcome,
                emit=emit,
            )
            # NOTE(review): assumed nesting — when a follow-up message is
            # produced it is recorded and the rest of this iteration (error
            # counting, tool_result event, verification gate) is skipped.
            if recovery_result is not None:
                summary.tool_result_messages.append(recovery_result)
                self.context.session.append(recovery_result)
                continue

        if outcome.state == ToolExecutionState.EXECUTED and not outcome.is_error:
            loop_response = await self._record_successful_execution(
                tool_call=executed_tool_call,
                outcome=outcome,
                dod=dod,
                emit=emit,
                summary=summary,
            )
            # Mark this tool's label as completed and emit live progress
            label = _tool_call_label(executed_tool_call)
            if label:
                completed_labels.append(label)
                await _emit_batch_todos()
            # A non-None response from the bookkeeping step halts the batch at once.
            if loop_response is not None:
                result.halted = True
                result.final_response = loop_response
                return result

        # Track the error streak: any non-error outcome resets it.
        if outcome.is_error:
            result.consecutive_errors += 1
        else:
            result.consecutive_errors = 0

        await emit(
            AgentEvent(
                type="tool_result",
                content=outcome.event_content,
                tool_name=executed_tool_call.name,
                tool_call_id=outcome.tool_call.id,
                tool_metadata=(
                    outcome.registry_result.metadata
                    if outcome.registry_result is not None
                    else None
                ),
                is_error=outcome.is_error,
                phase="assistant",
            )
        )

        # Always append tool results to the session so the model sees
        # its own output. The verification gate may inject a correction
        # prompt, but the original result must still be in context —
        # otherwise the model operates blind and loops.
        self.context.session.append(outcome.message)
        summary.tool_result_messages.append(outcome.message)
        if outcome.state == ToolExecutionState.DUPLICATE:
            self._queue_duplicate_observation_nudge(tool_call, dod=dod)
        elif outcome.state == ToolExecutionState.BLOCKED:
            # Each helper checks for its own marker/condition and is a no-op
            # when it does not apply.
            self._queue_blocked_active_repair_nudge(outcome.event_content)
            self._queue_blocked_active_repair_mutation_nudge(outcome.event_content)
            self._queue_blocked_completed_artifact_scope_nudge(
                outcome.event_content,
                dod=dod,
            )
            self._queue_blocked_late_reference_drift_nudge(
                outcome.event_content,
                dod=dod,
            )
            self._queue_blocked_shell_rewrite_nudge(tool_call)
            self._queue_blocked_html_edit_nudge(tool_call, outcome.event_content)

        should_continue = await self.verification_gate.should_continue(
            tool_call=tool_call,
            outcome=outcome,
            emit=emit,
        )

        rlog = get_runtime_logger()
        rlog.tool_exec(
            name=tool_call.name,
            state=outcome.state.value,
            is_error=outcome.is_error,
            result_preview=outcome.event_content,
            appended_to_session=True,
        )
        if should_continue:
            rlog.verification_gate(tool_call.name, should_continue=True)
            continue

    # NOTE(review): assumed to run once after the loop (there is no break or
    # return inside this branch) — confirm against the real file.
    if result.consecutive_errors >= 3:
        final_response = (
            "I ran into some issues. "
            "Let me know if you'd like me to try a different approach."
        )
        summary.final_response = final_response
        summary.failures.append("three consecutive tool errors")
        await emit(AgentEvent(type="response", content=final_response))
        result.halted = True
        result.final_response = final_response

    return result
323
def _queue_duplicate_observation_nudge(
    self,
    tool_call: ToolCall,
    *,
    dod: DefinitionOfDone,
) -> None:
    """Steer the model to a concrete next action after a repeated observation.

    Only ``read``/``glob``/``grep``/``bash`` calls are considered. Exactly one
    steering message is queued, chosen by the first rule that applies:
    a missing planned artifact that should take priority, the next pending
    todo item, any missing planned artifact, a fully built artifact set, an
    inferred preferred next step, the path the call targeted, and finally a
    generic "do something different" fallback.
    """

    if tool_call.name not in {"read", "glob", "grep", "bash"}:
        return

    ctx = self.context
    reuse = "Reuse the earlier observation instead of repeating it. "

    current_task = getattr(ctx.session, "current_task", None)
    missing_artifact = _next_missing_planned_artifact(
        dod,
        project_root=ctx.project_root,
    )

    # First pending todo that is not one of the generic, excluded labels.
    next_pending = None
    for candidate in effective_pending_todo_items(dod, project_root=ctx.project_root):
        if candidate not in _TODO_NUDGE_EXCLUDED_ITEMS:
            next_pending = candidate
            break

    confirmed_facts = summarize_confirmed_facts(ctx.session.messages, max_items=2)
    # Optional clause shared by every message that can quote confirmed facts.
    facts_clause = f"Confirmed facts: {confirmed_facts}. " if confirmed_facts else ""

    if _should_prioritize_missing_artifact(
        next_pending=next_pending,
        missing_artifact=missing_artifact,
    ):
        resume = _missing_artifact_resume_suffix(
            missing_artifact,
            project_root=ctx.project_root,
        )
        ctx.queue_steering_message(
            reuse
            + facts_clause
            + "An explicitly planned artifact is still missing."
            + resume
            + " Do not switch into review or consistency-check mode until the missing artifact exists."
        )
        return

    if next_pending:
        tail = ""
        if _todo_is_mutation_step(next_pending):
            # Prefer the artifact-specific resume text; otherwise push the
            # model to stop collecting evidence and make the change.
            tail = _missing_artifact_resume_suffix(
                missing_artifact,
                project_root=ctx.project_root,
            ) or (
                " You already have enough evidence for that step, so stop gathering "
                "more reference material and perform the change now."
            )
        ctx.queue_steering_message(
            reuse
            + facts_clause
            + f"Continue with the next pending item: `{next_pending}`. "
            + "Only gather more evidence if a specific fact required for that step is still unknown."
            + tail
        )
        return

    if missing_artifact is not None:
        resume = _missing_artifact_resume_suffix(
            missing_artifact,
            project_root=ctx.project_root,
        )
        ctx.queue_steering_message(reuse + resume.strip())
        return

    if all_planned_artifacts_exist(dod, project_root=ctx.project_root):
        verification_commands = dod.verification_commands or derive_verification_commands(
            dod,
            project_root=ctx.project_root,
            task_statement=current_task,
            supplement_existing=True,
        )
        if verification_commands:
            closing = "Move to verification or final confirmation using the files already on disk."
        else:
            closing = "Finish the current review using the files already on disk."
        ctx.queue_steering_message(
            reuse
            + "All explicitly planned artifacts already exist. "
            + "Use the current task artifacts as the source of truth and do not reopen "
            + "reference materials unless one specific gap is still unknown. "
            + closing
        )
        return

    detail_hint = (
        "Only gather more evidence if a specific filename, href, or title is still unknown."
    )
    preferred_next_step = infer_preferred_next_step(
        ctx.session.messages,
        current_task=current_task,
    )
    if preferred_next_step:
        ctx.queue_steering_message(
            reuse + facts_clause + f"{preferred_next_step} " + detail_hint
        )
        return

    arguments = tool_call.arguments
    target_path = str(arguments.get("file_path") or arguments.get("path") or "").strip()
    if target_path:
        ctx.queue_steering_message(
            reuse
            + f"Use the current contents of `{target_path}` and take a different next step. "
            + detail_hint
        )
        return

    ctx.queue_steering_message(
        reuse + "Choose a different next step that makes progress."
    )
470
def _queue_blocked_shell_rewrite_nudge(self, tool_call: ToolCall) -> None:
    """Point the model at the file tools when a ``bash`` call was a text rewrite.

    Nothing is queued unless the call is ``bash`` and a rewrite target can be
    extracted from its command. The richer message is used only when both a
    preferred next step and confirmed facts are available.
    """

    if tool_call.name != "bash":
        return

    command = str(tool_call.arguments.get("command", ""))
    target = extract_shell_text_rewrite_target(command)
    if target is None:
        return

    session = self.context.session
    current_task = getattr(session, "current_task", None)
    confirmed_facts = summarize_confirmed_facts(session.messages, max_items=2)
    preferred_next_step = infer_preferred_next_step(
        session.messages,
        current_task=current_task,
    )

    lead = "Use Loader's file tools for this text edit instead of a shell rewrite. "
    if not (preferred_next_step and confirmed_facts):
        message = lead + f"Apply the change to `{target}` with edit/patch/write."
    else:
        message = (
            lead
            + f"Confirmed facts: {confirmed_facts}. "
            + f"{preferred_next_step} "
            + f"Target `{target}` with edit/patch/write rather than `bash`."
        )
    self.context.queue_steering_message(message)
506
def _queue_blocked_active_repair_nudge(self, event_content: str) -> None:
    """Remind the model of the active repair target after a scope block.

    Acts only when ``event_content`` carries the active-repair-scope marker
    and a repair context can be extracted from the session messages. The
    message names up to three allowed paths when there are any, otherwise up
    to two allowed roots.
    """

    if "[Blocked - active repair scope:" not in event_content:
        return

    repair = extract_active_repair_context(self.context.session.messages)
    if repair is None:
        return

    def _preview(entries, limit: int) -> str:
        # Back-ticked, comma-separated head of the list, with an ellipsis
        # marker when entries were left out.
        shown = ", ".join(f"`{entry}`" for entry in entries[:limit])
        if len(entries) > limit:
            shown += ", ..."
        return shown

    if repair.allowed_paths:
        scope = f"Stay on the concrete repair files {_preview(repair.allowed_paths, 3)} "
    else:
        scope = (
            f"Stay within the current artifact set under {_preview(repair.allowed_roots, 2)} "
        )

    self.context.queue_steering_message(
        "Verification already identified the active repair target. "
        + scope
        + f"and repair `{repair.artifact_path}` directly. "
        + "Do not reopen unrelated reference materials while this repair target is unresolved."
    )
538
def _queue_blocked_active_repair_mutation_nudge(self, event_content: str) -> None:
    """Tell the model to keep repair edits on the named repair files.

    Acts only when ``event_content`` carries the active-repair-mutation-scope
    marker and the extracted repair context lists at least one allowed path.
    """

    if "[Blocked - active repair mutation scope:" not in event_content:
        return

    repair = extract_active_repair_context(self.context.session.messages)
    allowed_paths = repair.allowed_paths if repair is not None else None
    if not allowed_paths:
        return

    # Show at most three paths; a trailing "..." signals that more exist.
    quoted = [f"`{path}`" for path in allowed_paths[:3]]
    if len(allowed_paths) > 3:
        quoted.append("...")
    allowed_preview = ", ".join(quoted)

    self.context.queue_steering_message(
        "Verification already identified the concrete repair files. "
        + f"Keep mutations pinned to {allowed_preview} "
        + f"and repair `{repair.artifact_path}` before widening the change set."
    )
557
def _queue_blocked_late_reference_drift_nudge(
    self,
    event_content: str,
    *,
    dod: DefinitionOfDone,
) -> None:
    """Push the model back to the missing artifact after a late-drift block.

    Acts only when ``event_content`` carries the late-reference-drift marker
    and a planned artifact is still missing. The message names up to two of
    the planned output roots.
    """

    if "[Blocked - late reference drift:" not in event_content:
        return

    project_root = self.context.project_root
    missing_artifact = _next_missing_planned_artifact(dod, project_root=project_root)
    if missing_artifact is None:
        return

    # A directory target is its own root; a file target contributes its parent.
    # dict.fromkeys drops repeats while keeping first-seen order.
    planned_roots = list(
        dict.fromkeys(
            str(target if expect_directory else target.parent)
            for target, expect_directory in collect_planned_artifact_targets(
                dod,
                project_root=project_root,
            )
        )
    )

    quoted = [f"`{root}`" for root in planned_roots[:2]]
    if len(planned_roots) > 2:
        quoted.append("...")
    roots_preview = ", ".join(quoted)

    resume = _missing_artifact_resume_suffix(
        missing_artifact,
        project_root=project_root,
    )
    self.context.queue_steering_message(
        "Late-stage reference rereads are no longer helping. "
        + "One explicitly planned artifact is still missing."
        + resume
        + f" Stay within the current output roots under {roots_preview}"
        + " and finish that artifact before reopening older reference materials."
    )
601
def _queue_blocked_completed_artifact_scope_nudge(
    self,
    event_content: str,
    *,
    dod: DefinitionOfDone,
) -> None:
    """Anchor post-build work to the generated files after a scope block.

    Acts only when ``event_content`` carries the completed-artifact-set-scope
    marker. If the next pending todo reads like a consistency review, the
    model is told to continue with it; otherwise it is told to move on to
    verification or final confirmation.
    """

    if "[Blocked - completed artifact set scope:" not in event_content:
        return

    project_root = self.context.project_root

    # A directory target is its own root; a file target contributes its parent.
    # dict.fromkeys drops repeats while keeping first-seen order.
    planned_roots = list(
        dict.fromkeys(
            str(target if expect_directory else target.parent)
            for target, expect_directory in collect_planned_artifact_targets(
                dod,
                project_root=project_root,
            )
        )
    )

    # First pending todo that is not one of the generic, excluded labels.
    next_pending = None
    for candidate in effective_pending_todo_items(dod, project_root=project_root):
        if candidate not in _TODO_NUDGE_EXCLUDED_ITEMS:
            next_pending = candidate
            break

    quoted = [f"`{root}`" for root in planned_roots[:2]]
    if len(planned_roots) > 2:
        quoted.append("...")
    roots_preview = ", ".join(quoted)

    opening = "All explicitly planned artifacts already exist. "
    closing = "Do not reopen earlier reference materials."
    if next_pending and _todo_is_consistency_review_step(next_pending):
        middle = (
            f"Stay within the current output roots under {roots_preview} and continue "
            f"with `{next_pending}` using the generated files as the source of truth. "
        )
    else:
        middle = (
            f"Stay within the current output roots under {roots_preview} "
            "and move to verification or final confirmation using the generated files. "
        )
    self.context.queue_steering_message(opening + middle + closing)
654
655 def _queue_blocked_html_edit_nudge(self, tool_call: ToolCall, event_content: str) -> None:
656 """Keep blocked edit feedback generic; avoid task-class-specific steering."""
657
658 _ = tool_call, event_content
659 return
660
async def _record_successful_execution(
    self,
    *,
    tool_call: ToolCall,
    outcome,
    dod: DefinitionOfDone,
    emit: EventSink,
    summary: TurnSummary,
) -> str | None:
    """Update DoD bookkeeping after a successful tool execution.

    Records the call on ``dod``, adjusts verification state for mutating
    calls, syncs or advances todo items, queues follow-up steering nudges,
    persists ``dod`` through the store, and updates the recovery context.

    Args:
        tool_call: The call that was executed.
        outcome: Execution outcome; only ``registry_result`` is read here.
        dod: Definition-of-done state to update and save.
        emit: Event sink; not used by the code in this method.
        summary: Passed through to the verification-state helpers.

    Returns:
        Always ``None`` in the code shown. The caller treats a non-None
        value as a final response that halts the batch.

    NOTE(review): the nesting of the todo branch below was reconstructed
    from a rendering of this file that had lost its indentation — confirm
    against the real file.
    """

    is_mutating = is_state_mutating_tool_call(tool_call)
    # Capture the verification verdict before the call is recorded on the DoD.
    previously_verified = dod.last_verification_result == "passed"
    record_successful_tool_call(dod, tool_call)
    if previously_verified and is_mutating:
        # A mutation after a passing verification makes that result stale.
        _mark_verification_stale(
            context=self.context,
            summary=summary,
            dod=dod,
            tool_call=tool_call,
        )
    elif is_mutating:
        _mark_verification_planned(
            context=self.context,
            summary=summary,
            dod=dod,
            tool_call=tool_call,
        )
    if tool_call.name == "TodoWrite" and outcome.registry_result is not None:
        new_todos = outcome.registry_result.metadata.get("new_todos", [])
        # Metadata that is not a list is ignored rather than synced.
        if isinstance(new_todos, list):
            sync_todos_to_definition_of_done(
                dod,
                new_todos,
                project_root=self.context.project_root,
            )
            # NOTE(review): assumed to be queued only after a successful sync — confirm.
            self._queue_todowrite_resume_nudge(dod=dod)
    else:
        # Snapshot the pending list so the nudge can tell which item was completed.
        pending_before = list(dod.pending_items)
        if advance_todos_from_tool_call(dod, tool_call):
            reconcile_aggregate_completion_steps(
                dod,
                project_root=self.context.project_root,
            )
            # NOTE(review): assumed to run only when a todo actually advanced — confirm.
            self._queue_next_pending_todo_nudge(
                tool_call=tool_call,
                pending_before=pending_before,
                dod=dod,
            )
        # NOTE(review): the three nudges below are assumed to belong to this
        # else branch; each one returns early when it does not apply.
        self._queue_bookkeeping_resume_nudge(
            tool_call=tool_call,
            dod=dod,
        )
        self._queue_missing_artifact_progress_nudge(
            tool_call=tool_call,
            dod=dod,
        )
        self._queue_planned_artifact_handoff_nudge(
            tool_call=tool_call,
            dod=dod,
        )
    self.dod_store.save(dod)
    recovery_context = self.context.recovery_context
    if recovery_context is not None:
        recovery_context.note_success(tool_call.name, tool_call.arguments)
        # Drop the recovery context once it reports that this success clears it.
        if recovery_context.should_clear_after_success(
            tool_call.name,
            tool_call.arguments,
        ):
            self.context.recovery_context = None
    return None
732
def _queue_next_pending_todo_nudge(
    self,
    *,
    tool_call: ToolCall,
    pending_before: list[str],
    dod: DefinitionOfDone,
) -> None:
    """Nudge toward the next todo once an observation has completed one.

    Applies only to non-mutating ``read``/``glob``/``grep``/``bash`` calls
    (for ``bash``, only commands containing an inspection-style token). A
    message is queued when some item from ``pending_before`` is no longer
    pending and a different pending item remains.
    """

    if is_state_mutating_tool_call(tool_call):
        return

    tool_name = tool_call.name
    if tool_name not in {"read", "glob", "grep", "bash"}:
        return
    if tool_name == "bash":
        inspection_tokens = (
            "ls ",
            " ls",
            "find ",
            "grep ",
            "rg ",
            "cat ",
            "sed ",
            "head ",
            "tail ",
        )
        command = str(tool_call.arguments.get("command", "")).lower()
        if all(token not in command for token in inspection_tokens):
            return

    project_root = self.context.project_root

    # First previously pending, non-generic item that is no longer pending.
    completed_label = None
    for candidate in pending_before:
        if candidate in dod.pending_items:
            continue
        if candidate in _TODO_NUDGE_EXCLUDED_ITEMS:
            continue
        completed_label = candidate
        break

    # First pending todo that is not one of the generic, excluded labels.
    next_pending = None
    for candidate in effective_pending_todo_items(dod, project_root=project_root):
        if candidate not in _TODO_NUDGE_EXCLUDED_ITEMS:
            next_pending = candidate
            break

    if not (completed_label and next_pending) or next_pending == completed_label:
        return

    progress = (
        f"Confirmed progress: `{completed_label}` is now satisfied by the successful "
        f"`{tool_call.name}` result. "
    )
    missing_artifact = _next_missing_planned_artifact(dod, project_root=project_root)

    if _should_prioritize_missing_artifact(
        next_pending=next_pending,
        missing_artifact=missing_artifact,
    ):
        resume = _missing_artifact_resume_suffix(
            missing_artifact,
            project_root=project_root,
        )
        self.context.queue_steering_message(
            progress
            + "One explicitly planned artifact is still missing."
            + resume
            + " Do not switch into review or consistency-check mode until the missing artifact exists."
        )
        return

    tail = ""
    if _todo_is_mutation_step(next_pending):
        # Prefer the artifact-specific resume text; otherwise push the model
        # to stop collecting evidence and make the change.
        tail = _missing_artifact_resume_suffix(
            missing_artifact,
            project_root=project_root,
        ) or (
            " You already have enough evidence for that step, so stop gathering "
            "more reference material and perform the change now."
        )

    self.context.queue_steering_message(
        progress
        + "Continue with the next pending item: "
        + f"`{next_pending}` instead of rereading the same evidence."
        + tail
    )
821
def _queue_planned_artifact_handoff_nudge(
    self,
    *,
    tool_call: ToolCall,
    dod: DefinitionOfDone,
) -> None:
    """Hand off to review or verification once every planned artifact exists.

    Applies only to mutating calls made when all planned artifacts exist. A
    pending consistency-review item gets a "continue with it" message;
    otherwise a "move to verification" message is queued, and only when
    verification commands are available.
    """

    project_root = self.context.project_root
    if not is_state_mutating_tool_call(tool_call):
        return
    if not all_planned_artifacts_exist(dod, project_root=project_root):
        return

    # First pending todo that is not one of the generic, excluded labels.
    next_pending = None
    for candidate in effective_pending_todo_items(dod, project_root=project_root):
        if candidate not in _TODO_NUDGE_EXCLUDED_ITEMS:
            next_pending = candidate
            break

    verification_commands = dod.verification_commands or derive_verification_commands(
        dod,
        project_root=project_root,
        task_statement=getattr(self.context.session, "current_task", "") or "",
        supplement_existing=True,
    )

    opening = "All explicitly planned artifacts now exist. "

    if next_pending and _todo_is_consistency_review_step(next_pending):
        if verification_commands:
            verification_suffix = " Move to verification once no specific mismatch remains."
        else:
            verification_suffix = (
                " Avoid another full reread unless one specific inconsistency is still unknown."
            )
        self.context.queue_steering_message(
            opening
            + f"Continue with the next pending item: `{next_pending}`. "
            + "Use the files already on disk as the source of truth instead of restarting "
            + "discovery or inventing alternate filenames."
            + verification_suffix
        )
        return

    if not verification_commands:
        return
    self.context.queue_steering_message(
        opening
        + "Do not expand the artifact set or restart discovery unless a specific gap is "
        + "still known. Move to verification or final confirmation using the files that "
        + "already exist."
    )
873
def _queue_missing_artifact_progress_nudge(
    self,
    *,
    tool_call: ToolCall,
    dod: DefinitionOfDone,
) -> None:
    """Acknowledge a mutation and point at the planned artifact still missing.

    Applies only to mutating calls made while at least one planned artifact
    is missing; otherwise nothing is queued.
    """

    if not is_state_mutating_tool_call(tool_call):
        return

    project_root = self.context.project_root
    missing_artifact = _next_missing_planned_artifact(dod, project_root=project_root)
    if missing_artifact is None:
        return

    current_label = _current_mutation_label(tool_call)
    todo_refresh = _todo_refresh_guidance(dod, project_root=project_root)
    resume = _missing_artifact_resume_suffix(
        missing_artifact,
        project_root=project_root,
    )

    parts = [
        f"Confirmed progress: {current_label} is now recorded.",
        " One explicitly planned artifact is still missing.",
        resume,
        todo_refresh,
        " Do not move to verification, final confirmation, or TodoWrite-only ",
        "bookkeeping until that artifact exists.",
        " Do not spend another turn on working notes or rediscovery alone.",
    ]
    self.context.queue_steering_message("".join(parts))
906
def _queue_todowrite_resume_nudge(
    self,
    *,
    dod: DefinitionOfDone,
) -> None:
    """Steer the model back to real work after a todo-list update.

    While a planned artifact is missing, the message directs the model to
    that artifact. Otherwise it continues with a pending mutation or
    consistency-review item, or — once every planned artifact exists — moves
    toward verification or completion. Nothing is queued when no artifact is
    reported missing yet not all planned artifacts exist and no mutation or
    review item is pending.
    """

    project_root = self.context.project_root
    missing_artifact = _next_missing_planned_artifact(dod, project_root=project_root)

    # First pending todo that is not one of the generic, excluded labels.
    next_pending = None
    for candidate in effective_pending_todo_items(dod, project_root=project_root):
        if candidate not in _TODO_NUDGE_EXCLUDED_ITEMS:
            next_pending = candidate
            break

    if missing_artifact is not None:
        todo_refresh = _todo_refresh_guidance(dod, project_root=project_root)
        if next_pending:
            next_pending_suffix = f" Continue with the next pending item: `{next_pending}`."
        else:
            next_pending_suffix = ""
        resume = _missing_artifact_resume_suffix(
            missing_artifact,
            project_root=project_root,
        )
        self.context.queue_steering_message(
            "Todo tracking is updated. An explicitly planned artifact is still missing."
            + next_pending_suffix
            + resume
            + todo_refresh
            + " Do not spend the next turn on TodoWrite alone, bookkeeping notes, "
            + "verification, or final confirmation until that artifact exists."
        )
        return

    # From here on no planned artifact is reported missing.
    continue_lead = (
        "Todo tracking is updated. Continue with the next pending item: "
        f"`{next_pending}`. Use the current output files as the source of "
        "truth, and do not reopen reference materials unless one specific "
    )

    if next_pending and _todo_is_mutation_step(next_pending):
        self.context.queue_steering_message(
            continue_lead
            + "fact required for that step is still unknown. Perform the mutation "
            + "now instead of spending another turn on planning, rereads, or "
            + "verification."
        )
        return

    is_review_step = bool(next_pending) and _todo_is_consistency_review_step(next_pending)

    if is_review_step and not all_planned_artifacts_exist(dod, project_root=project_root):
        self.context.queue_steering_message(
            continue_lead + "mismatch is still unknown."
        )
        return

    if not all_planned_artifacts_exist(dod, project_root=project_root):
        return

    verification_commands = dod.verification_commands or derive_verification_commands(
        dod,
        project_root=project_root,
        task_statement=getattr(self.context.session, "current_task", "") or "",
        supplement_existing=True,
    )
    all_exist_lead = "Todo tracking is updated. All explicitly planned artifacts now exist. "

    if is_review_step:
        if verification_commands:
            verification_suffix = " Move to verification once no specific mismatch remains."
        else:
            verification_suffix = (
                " Finish the targeted consistency pass without reopening reference materials."
            )
        self.context.queue_steering_message(
            all_exist_lead
            + f"Continue with the next pending item: `{next_pending}`. "
            + "Use the current output files as the source of truth, and do not restart "
            + "early discovery or reopen reference materials."
            + verification_suffix
        )
        return

    if verification_commands:
        verification_suffix = (
            " Move to verification or final confirmation using the files already on disk."
        )
    else:
        verification_suffix = " Finish the task using the files already on disk."
    self.context.queue_steering_message(
        all_exist_lead
        + "Do not restart discovery, reopen reference materials, or spend another turn "
        + "on TodoWrite alone."
        + verification_suffix
    )
1012
def _queue_bookkeeping_resume_nudge(
    self,
    *,
    tool_call: ToolCall,
    dod: DefinitionOfDone,
) -> None:
    """Queue a steering message after a bookkeeping-note tool call.

    Nothing is queued unless ``tool_call`` is one of the bookkeeping-note
    tools and at least one planned artifact is still unsatisfied. When the
    next pending todo is neither a mutation step nor a consistency-review
    step, the message points the model at that todo; otherwise it points
    at the missing artifact.
    """
    if tool_call.name not in _BOOKKEEPING_NOTE_TOOL_NAMES:
        return

    root = self.context.project_root
    artifact = _next_missing_planned_artifact(dod, project_root=root)
    if artifact is None:
        return

    # First pending todo that is not one of the excluded bookkeeping items.
    upcoming = None
    for candidate in effective_pending_todo_items(dod, project_root=root):
        if candidate not in _TODO_NUDGE_EXCLUDED_ITEMS:
            upcoming = candidate
            break
    refresh_hint = _todo_refresh_guidance(dod, project_root=root)

    follow_pending_step = bool(upcoming) and not (
        _todo_is_mutation_step(upcoming)
        or _todo_is_consistency_review_step(upcoming)
    )
    if follow_pending_step:
        message = (
            "Bookkeeping note is recorded. Continue with the next pending item: "
            f"`{upcoming}`. Make your next response one concrete evidence-gathering "
            "tool call that advances that step, not another bookkeeping-only turn."
            + refresh_hint
            + " Do not jump ahead to later artifact creation, verification, or final "
            "confirmation until that step is satisfied."
        )
    else:
        message = (
            "Bookkeeping note is recorded. An explicitly planned artifact is still missing."
            + _missing_artifact_resume_suffix(artifact, project_root=root)
            + refresh_hint
            + " Do not spend the next turn on additional notes, rediscovery, "
            "verification, or final confirmation until that artifact exists."
        )
    self.context.queue_steering_message(message)
1069
1070
def _todo_is_consistency_review_step(item: str) -> bool:
    """Return True when the lowercased todo text contains a consistency-review hint."""
    lowered = item.lower()
    for hint in _CONSISTENCY_REVIEW_HINTS:
        if hint in lowered:
            return True
    return False
1074
1075
def _should_prioritize_missing_artifact(
    *,
    next_pending: str | None,
    missing_artifact: tuple[Path, bool] | None,
) -> bool:
    """Decide whether a missing planned artifact outranks the next pending todo.

    Returns False when nothing is missing. Otherwise the artifact wins unless
    the next pending todo is a mutation step that is not also a
    consistency-review step.
    """
    if missing_artifact is None:
        return False
    return (
        not next_pending
        or _todo_is_consistency_review_step(next_pending)
        or not _todo_is_mutation_step(next_pending)
    )
1088
1089
def _next_missing_planned_artifact(
    dod: DefinitionOfDone,
    *,
    project_root: Path,
) -> tuple[Path, bool] | None:
    """Return the first planned artifact target that is not yet satisfied.

    At most 12 planned targets are requested from
    ``collect_planned_artifact_targets``. The result is a
    ``(target, expect_directory)`` pair, or ``None`` when every collected
    target is satisfied.
    """
    planned_targets = collect_planned_artifact_targets(
        dod,
        project_root=project_root,
        max_paths=12,
    )
    return next(
        (
            (target, expect_directory)
            for target, expect_directory in planned_targets
            if not planned_artifact_target_satisfied(
                dod,
                target=target,
                expect_directory=expect_directory,
                project_root=project_root,
            )
        ),
        None,
    )
1108
1109
def _missing_artifact_resume_suffix(
    missing_artifact: tuple[Path, bool] | None,
    *,
    project_root: Path,
) -> str:
    """Build the steering-message suffix telling the model what to create next.

    ``missing_artifact`` is a ``(target, expect_directory)`` pair; ``None``
    yields an empty string. For a file target the suffix asks for one
    ``write`` call. For a directory target it names the next declared output
    file when one can be inferred, otherwise it asks for a file inside the
    existing directory or for the directory itself to be created.
    """
    if missing_artifact is None:
        return ""

    # Shared sentences appended by several branches below.
    parent_dirs_hint = (
        " The `write` tool can create that file's parent directories automatically,"
        " so do the write in one step instead of stopping for a separate mkdir."
    )
    mutate_now_hint = (
        " Make your next response the concrete mutation tool call itself, not another"
        " bookkeeping-only turn."
    )

    target, expect_directory = missing_artifact
    label = target.name or str(target)

    if not expect_directory:
        pieces = [
            f" Resume by creating `{label}` now. Prefer one `write` call for `{target}` "
            "instead of more rereads."
        ]
        if not target.parent.exists():
            pieces.append(parent_dirs_hint)
        pieces.append(mutate_now_hint)
        return "".join(pieces)

    # Directory targets are displayed with a trailing slash.
    if not label.endswith("/"):
        label += "/"

    next_output_file = infer_next_declared_html_output_file(
        target=target,
        project_root=project_root,
    )
    if next_output_file is not None:
        pieces = [
            f" Resume by creating `{next_output_file.name}` now. It is the next missing "
            f"declared output under `{label}`. Prefer one `write` call for "
            f"`{next_output_file}` instead of more rereads."
        ]
        if not next_output_file.parent.exists():
            pieces.append(parent_dirs_hint)
        pieces.append(mutate_now_hint)
        return "".join(pieces)

    if target.is_dir():
        # The directory exists but is still unsatisfied: ask for a file inside it.
        return (
            f" Resume by creating the next output file under `{label}` now. Prefer one "
            f"concrete `write` call for a file inside `{target}` instead of more rereads."
            + mutate_now_hint
        )
    return (
        f" Resume by creating `{label}` now. Prefer one concrete directory-creation "
        f"step for `{target}` instead of more rereads."
    )
1168
1169
def _todo_refresh_guidance(
    dod: DefinitionOfDone,
    *,
    project_root: Path | None = None,
) -> str:
    """Return a `TodoWrite` refresh hint once enough work has accumulated.

    The hint is returned when at least two files have been touched, or when
    at least three todo items (pending plus completed, ignoring the excluded
    bookkeeping items) are being tracked. Otherwise the result is empty.
    """

    def _is_tracked(item: str) -> bool:
        return item not in _TODO_NUDGE_EXCLUDED_ITEMS

    tracked_steps = sum(
        map(_is_tracked, effective_pending_todo_items(dod, project_root=project_root))
    )
    tracked_steps += sum(map(_is_tracked, dod.completed_items))

    if len(dod.touched_files) >= 2 or tracked_steps >= 3:
        return (
            " If the tracked steps no longer match the confirmed progress, refresh `TodoWrite` "
            "in the same response as the next concrete step instead of spending a full turn on "
            "bookkeeping alone."
        )
    return ""
1190
1191
def _mark_verification_stale(
    *,
    context: RuntimeContext,
    summary: TurnSummary,
    dod: DefinitionOfDone,
    tool_call: ToolCall,
) -> None:
    """Record that earlier verification is stale and re-queue the verify item.

    Starts a new verification attempt that supersedes the active one, appends
    a ``verification_stale`` timeline entry, clears ``dod.evidence``, and
    moves the verification todo from completed back to pending.
    """
    detail = _stale_verification_detail(tool_call)
    stale_attempt = ensure_active_verification_attempt(dod)
    next_attempt = begin_new_verification_attempt(
        dod,
        supersedes_attempt_id=stale_attempt.attempt_id,
    )

    # Build both records before dod.evidence is cleared below: the command
    # list they iterate over can fall back to the commands in dod.evidence.
    provenance = _stale_verification_provenance(dod, detail=detail)
    observations = _stale_verification_observations(
        dod,
        detail=detail,
        stale_attempt_id=stale_attempt.attempt_id,
        stale_attempt_number=stale_attempt.attempt_number,
        superseded_by_attempt_id=next_attempt.attempt_id,
    )
    append_verification_timeline_entry(
        context,
        summary,
        reason_code="verification_stale",
        reason_summary="previous verification became stale after new mutating work",
        evidence_summary=[f"fresh verification required after {detail}"],
        evidence_provenance=provenance,
        verification_observations=observations,
    )

    dod.last_verification_result = VerificationObservationStatus.STALE.value
    dod.evidence = []

    # Drop every completed copy of the verify item, then make sure exactly
    # one pending copy exists.
    completed = dod.completed_items
    while _VERIFY_ITEM in completed:
        completed.remove(_VERIFY_ITEM)
    pending = dod.pending_items
    if _VERIFY_ITEM not in pending:
        pending.append(_VERIFY_ITEM)
1226
1227
def _todo_is_mutation_step(label: str) -> bool:
    """Return True when the lowercased todo label contains a mutation hint token."""
    text = label.lower()
    for token in _MUTATION_TODO_HINTS:
        if token in text:
            return True
    return False
1231
1232
def _mark_verification_planned(
    *,
    context: RuntimeContext,
    summary: TurnSummary,
    dod: DefinitionOfDone,
    tool_call: ToolCall,
) -> None:
    """Record that verification is planned and queue the verify item.

    Does nothing when the last verification result is already planned,
    pending, or stale, or when no non-empty verification command is known
    after deriving them. Otherwise starts a new verification attempt,
    appends a ``verification_planned`` timeline entry, and moves the
    verification todo from completed back to pending.
    """
    already_tracked = {
        VerificationObservationStatus.PLANNED.value,
        VerificationObservationStatus.PENDING.value,
        VerificationObservationStatus.STALE.value,
    }
    if dod.last_verification_result in already_tracked:
        return

    if not dod.verification_commands:
        dod.verification_commands = derive_verification_commands(
            dod,
            project_root=context.project_root,
            task_statement=dod.task_statement,
        )
    commands = list(filter(None, dod.verification_commands))
    if not commands:
        return

    attempt = begin_new_verification_attempt(dod)
    detail = _stale_verification_detail(tool_call)

    # One summary line per command, shared by all three records below.
    planned = [(command, f"verification planned for `{command}`") for command in commands]
    provenance = [
        EvidenceProvenance(
            category="verification",
            source="dod.verification_commands",
            summary=text,
            status=EvidenceProvenanceStatus.MISSING.value,
            subject=command,
            detail=detail,
        )
        for command, text in planned
    ]
    observations = [
        VerificationObservation(
            status=VerificationObservationStatus.PLANNED.value,
            summary=text,
            command=command,
            kind="runtime",
            detail=detail,
            attempt_id=attempt.attempt_id,
            attempt_number=attempt.attempt_number,
        )
        for command, text in planned
    ]
    append_verification_timeline_entry(
        context,
        summary,
        reason_code="verification_planned",
        reason_summary="verification is planned after new mutating work",
        # Only the first two commands appear in the short summary.
        evidence_summary=[text for _, text in planned[:2]],
        evidence_provenance=provenance,
        verification_observations=observations,
    )

    dod.last_verification_result = VerificationObservationStatus.PLANNED.value
    completed = dod.completed_items
    while _VERIFY_ITEM in completed:
        completed.remove(_VERIFY_ITEM)
    pending = dod.pending_items
    if _VERIFY_ITEM not in pending:
        pending.append(_VERIFY_ITEM)
1293
1294
def _stale_verification_observations(
    dod: DefinitionOfDone,
    *,
    detail: str,
    stale_attempt_id: str,
    stale_attempt_number: int,
    superseded_by_attempt_id: str,
) -> list[VerificationObservation]:
    """Build one STALE observation per command from ``_stale_verification_commands``."""
    stale_status = VerificationObservationStatus.STALE.value
    observations: list[VerificationObservation] = []
    for command in _stale_verification_commands(dod):
        observations.append(
            VerificationObservation(
                status=stale_status,
                summary=f"verification became stale for `{command}` after new mutating work",
                command=command,
                kind="runtime",
                detail=detail,
                attempt_id=stale_attempt_id,
                attempt_number=stale_attempt_number,
                # NOTE(review): this passes the id of the attempt that replaces
                # the stale one into `supersedes_attempt_id`; confirm the field's
                # direction against VerificationObservation.
                supersedes_attempt_id=superseded_by_attempt_id,
            )
        )
    return observations
1316
1317
def _stale_verification_provenance(
    dod: DefinitionOfDone,
    *,
    detail: str,
) -> list[EvidenceProvenance]:
    """Build one MISSING provenance record per command from ``_stale_verification_commands``."""
    missing_status = EvidenceProvenanceStatus.MISSING.value
    records: list[EvidenceProvenance] = []
    for command in _stale_verification_commands(dod):
        records.append(
            EvidenceProvenance(
                category="verification",
                source="tool_execution",
                summary=f"fresh verification required for `{command}` after new mutating work",
                status=missing_status,
                subject=command,
                detail=detail,
            )
        )
    return records
1334
1335
def _stale_verification_commands(dod: DefinitionOfDone) -> list[str]:
    """Return the commands whose verification should be reported as stale.

    Prefers the non-empty entries of ``dod.verification_commands``, then the
    non-empty commands recorded on ``dod.evidence``, and finally the single
    placeholder ``"verification"``.
    """
    # Each operand is evaluated only when the previous list came back empty.
    return (
        [command for command in dod.verification_commands if command]
        or [entry.command for entry in dod.evidence if entry.command]
        or ["verification"]
    )
1344
1345
def _stale_verification_detail(tool_call: ToolCall) -> str:
    """Describe the mutating tool call that made earlier verification stale.

    File tools report the changed ``file_path``, ``bash`` reports the command
    it ran, and anything else (or a blank argument) gets a generic sentence.
    """
    tool = tool_call.name
    if tool == "bash":
        command = str(tool_call.arguments.get("command", "")).strip()
        if command:
            return f"bash ran `{command}`"
    elif tool in ("write", "edit", "patch"):
        changed_path = str(tool_call.arguments.get("file_path", "")).strip()
        if changed_path:
            return f"{tool} changed {changed_path}"
    return f"{tool} changed the workspace"
1356
1357
def _current_mutation_label(tool_call: ToolCall) -> str:
    """Return a short backtick-quoted label for a successful mutating tool call.

    File tools yield the file's base name (or the raw path when the base name
    is empty), ``bash`` yields the command, and everything else yields a
    generic phrase naming the tool.
    """
    tool = tool_call.name
    if tool == "bash":
        command = str(tool_call.arguments.get("command", "")).strip()
        if command:
            return f"`{command}`"
    elif tool in ("write", "edit", "patch"):
        raw_path = str(tool_call.arguments.get("file_path", "")).strip()
        if raw_path:
            base_name = Path(raw_path).name
            return f"`{base_name or raw_path}`"
    return f"the successful `{tool}` result"
1368
1369
def _tool_call_label(tool_call: ToolCall) -> str:
    """Human-readable label for one tool call.

    Returns ``"Write <name>"`` / ``"Edit <name>"`` for file tools,
    ``"Run <command>"`` (first 40 characters) for ``bash``,
    ``"Read <name>"`` for ``read``, and ``"Search <pattern>"`` (first 30
    characters) for ``glob``. Returns an empty string for any other tool or
    when the relevant argument is missing, ``None``, or blank.
    """

    def _text_arg(key: str) -> str:
        # An explicit None must count as "missing"; str(None) would otherwise
        # surface as the literal label text "None".
        value = tool_call.arguments.get(key)
        return "" if value is None else str(value).strip()

    def _short_path(path: str) -> str:
        # Path(".").name and Path("/").name are empty; fall back to the raw
        # path so the label never ends in a dangling verb.
        return Path(path).name or path

    name = tool_call.name
    if name in ("write", "edit", "patch"):
        path = _text_arg("file_path")
        if path:
            verb = "Write" if name == "write" else "Edit"
            return f"{verb} {_short_path(path)}"
    elif name == "bash":
        cmd = _text_arg("command")
        if cmd:
            return f"Run {cmd[:40]}"
    elif name == "read":
        path = _text_arg("file_path")
        if path:
            return f"Read {_short_path(path)}"
    elif name == "glob":
        pattern = _text_arg("pattern")
        if pattern:
            return f"Search {pattern[:30]}"
    return ""
1392
1393
def _batch_planned_labels(tool_calls: list[ToolCall]) -> list[str]:
    """Build labels for all tool calls in a batch (for upfront planning display).

    Empty labels are dropped and duplicates keep only their first occurrence,
    in batch order.
    """
    # dict keys keep first-insertion order, which gives ordered de-duplication.
    ordered = dict.fromkeys(_tool_call_label(call) for call in tool_calls)
    ordered.pop("", None)
    return list(ordered)