Python · 70385 bytes Raw Blame History
1 """Assistant-response repair and fallback helpers for the typed runtime."""
2
3 from __future__ import annotations
4
5 import json
6 import re
7 import shlex
8 from dataclasses import dataclass, field
9 from pathlib import Path
10
11 from ..llm.base import ToolCall
12 from .context import RuntimeContext
13 from .dod import (
14 DefinitionOfDone,
15 collect_planned_artifact_targets,
16 infer_next_output_file,
17 planned_artifact_target_satisfied,
18 )
19 from .parsing import parse_tool_calls
20 from .path_display import display_runtime_path
21 from .recovery import detect_missing_mutation_payload
22 from .workflow import (
23 infer_output_outline_label,
24 infer_pending_todo_output_target,
25 preferred_pending_todo_item,
26 reconcile_aggregate_completion_steps,
27 todo_describes_aggregate_mutation,
28 todo_describes_broad_setup_step,
29 todo_file_candidates,
30 )
31
32 _SPECIAL_DOD_ITEMS = {
33 "Complete the requested work",
34 "Collect verification evidence",
35 }
36 _FIRST_FILE_EMPTY_RETRY_EXTRA = 2
37 _LATE_STAGE_EMPTY_RETRY_EXTRA = 2
38 _MULTI_FILE_OUTPUT_EMPTY_RETRY_EXTRA = 2
39 _SUMMARY_ARTIFACT_NAMES = {
40 "index.html",
41 "index.htm",
42 "readme",
43 "readme.md",
44 "readme.rst",
45 "readme.txt",
46 }
47 _WORKING_NOTE_TOOL_NAMES = (
48 "notepad_write_working",
49 "notepad_append",
50 "notepad_write_priority",
51 "notepad_write_manual",
52 )
53 _MUTATION_TODO_HINTS = (
54 "create",
55 "creating",
56 "develop",
57 "developing",
58 "build",
59 "building",
60 "update",
61 "updating",
62 "edit",
63 "editing",
64 "write",
65 "writing",
66 "fix",
67 "fixing",
68 "modify",
69 "modifying",
70 "change",
71 "changing",
72 "patch",
73 "patching",
74 "replace",
75 "replacing",
76 "correct",
77 "correcting",
78 "rewrite",
79 "rewriting",
80 )
81 _CONSISTENCY_REVIEW_HINTS = (
82 "consistent",
83 "consistently",
84 "formatted",
85 "link",
86 "linked",
87 "navigation",
88 "work properly",
89 "all files",
90 "every file",
91 )
92
93
94 @dataclass(slots=True)
95 class EmptyResponseDecision:
96 """Decision for an empty assistant response."""
97
98 should_continue: bool
99 reason_code: str | None = None
100 reason_summary: str | None = None
101 retry_message: str | None = None
102 final_response: str | None = None
103 failure: str | None = None
104
105
106 @dataclass(slots=True)
107 class ToolCallAnalysis:
108 """Normalized assistant-output analysis for tool execution."""
109
110 content: str
111 response_content: str
112 tool_calls: list[ToolCall] = field(default_factory=list)
113 tool_source: str = "native"
114 clear_stream: bool = False
115 is_final_answer: bool = False
116 should_stop: bool = False
117 reason_code: str | None = None
118 reason_summary: str | None = None
119 final_response: str | None = None
120 failure: str | None = None
121 extracted_iterations: int = 0
122
123
124 class ResponseRepairer:
125 """Owns response-repair heuristics that used to live inline in the loop."""
126
127 def __init__(self, context: RuntimeContext) -> None:
128 self.context = context
129
130 def handle_empty_response(
131 self,
132 *,
133 task: str,
134 original_task: str | None,
135 empty_retry_count: int,
136 max_empty_retries: int,
137 dod: DefinitionOfDone | None = None,
138 ) -> EmptyResponseDecision:
139 """Return the next action when the assistant responds with empty content."""
140
141 _ = task, original_task
142 effective_max_empty_retries = self._effective_max_empty_retries(
143 dod,
144 base_max_empty_retries=max_empty_retries,
145 )
146 if empty_retry_count <= effective_max_empty_retries:
147 return EmptyResponseDecision(
148 should_continue=True,
149 reason_code="empty_response_retry",
150 reason_summary=(
151 "retried after the assistant returned an empty response"
152 ),
153 retry_message=self._build_empty_response_retry_message(
154 dod,
155 retry_number=empty_retry_count,
156 max_empty_retries=effective_max_empty_retries,
157 ),
158 )
159
160 return EmptyResponseDecision(
161 should_continue=False,
162 reason_code="empty_response_retry_exhausted",
163 reason_summary="stopped after the assistant returned empty responses repeatedly",
164 final_response=(
165 "I didn't get a usable response from the model after "
166 f"retrying {effective_max_empty_retries} times. Please try again or "
167 "switch to a different backend/model."
168 ),
169 failure="assistant returned empty output repeatedly",
170 )
171
172 def analyze_response(
173 self,
174 *,
175 content: str,
176 response_content: str,
177 tool_calls: list[ToolCall],
178 extracted_iterations: int,
179 max_extracted_iterations: int,
180 ) -> ToolCallAnalysis:
181 """Normalize assistant output into final-answer, tool, or repair outcomes."""
182
183 normalized_content = content
184 normalized_tool_calls = list(tool_calls)
185 tool_source = "native"
186
187 if self.context.use_react:
188 parsed = parse_tool_calls(content)
189 normalized_tool_calls = parsed.tool_calls
190 normalized_content = parsed.content
191
192 if parsed.is_final_answer and not normalized_tool_calls:
193 return ToolCallAnalysis(
194 content=normalized_content,
195 response_content=response_content,
196 is_final_answer=True,
197 final_response=normalized_content,
198 )
199
200 clear_stream = False
201 next_extracted_iterations = extracted_iterations
202 if not normalized_tool_calls:
203 raw_tool_calls = self._extract_raw_tool_calls(response_content)
204 if raw_tool_calls:
205 normalized_tool_calls = raw_tool_calls
206 tool_source = "raw_text"
207 clear_stream = True
208
209 if normalized_tool_calls and tool_source == "raw_text":
210 next_extracted_iterations += 1
211 if next_extracted_iterations > max_extracted_iterations:
212 return ToolCallAnalysis(
213 content=normalized_content,
214 response_content=response_content,
215 tool_calls=normalized_tool_calls,
216 tool_source=tool_source,
217 clear_stream=clear_stream,
218 extracted_iterations=next_extracted_iterations,
219 should_stop=True,
220 reason_code="raw_text_tool_recovery_exhausted",
221 reason_summary=(
222 "stopped after raw-text tool recovery budget was exhausted"
223 ),
224 final_response=(
225 "I couldn't safely continue because the model kept emitting "
226 "raw-text tool calls instead of proper tool invocations. "
227 "Please try again or switch to a different backend/model."
228 ),
229 failure="raw-text tool recovery budget exhausted",
230 )
231
232 return ToolCallAnalysis(
233 content=normalized_content,
234 response_content=response_content,
235 tool_calls=normalized_tool_calls,
236 tool_source=tool_source,
237 clear_stream=clear_stream,
238 reason_code=(
239 "raw_text_tool_recovered" if tool_source == "raw_text" else None
240 ),
241 reason_summary=(
242 "recovered raw-text tool calls into executable tool invocations"
243 if tool_source == "raw_text"
244 else None
245 ),
246 extracted_iterations=next_extracted_iterations,
247 )
248
249 def _extract_raw_tool_calls(self, response_content: str) -> list[ToolCall]:
250 """Recover raw-text tool calls from the runtime parser and registry."""
251
252 allowed_tool_names = [
253 tool.name for tool in self.context.registry.list_tools()
254 ]
255 parsed = parse_tool_calls(
256 response_content,
257 allowed_tool_names=allowed_tool_names,
258 )
259 return parsed.tool_calls
260
261 def _build_empty_response_retry_message(
262 self,
263 dod: DefinitionOfDone | None,
264 *,
265 retry_number: int,
266 max_empty_retries: int,
267 ) -> str:
268 if dod is not None:
269 minimal_retry_message = self._build_early_concrete_write_retry_message(
270 dod,
271 retry_number=retry_number,
272 max_empty_retries=max_empty_retries,
273 )
274 if minimal_retry_message is not None:
275 return minimal_retry_message
276 if dod is not None and self._should_compact_empty_retry_message(dod):
277 compact_lines: list[str] = []
278 compact_lines.extend(self._compact_planned_artifact_lines(dod))
279 compact_lines.extend(self._payload_retry_lines(dod))
280 compact_lines.extend(
281 self._next_step_resume_lines(
282 dod,
283 retry_number=retry_number,
284 )
285 )
286 return "\n".join(
287 [
288 "[EMPTY ASSISTANT RESPONSE]",
289 (
290 "Your last response was empty "
291 f"(retry {retry_number}/{max_empty_retries}). Continue from the "
292 "exact next step below."
293 ),
294 *[f"- {line}" for line in compact_lines],
295 "",
296 "Respond with that concrete mutation tool call now. Do not return an empty response.",
297 ]
298 )
299
300 progress_lines: list[str] = []
301 if dod is not None:
302 reconcile_aggregate_completion_steps(
303 dod,
304 project_root=self.context.project_root,
305 )
306 latest_working_note = self._latest_working_note()
307 if latest_working_note:
308 progress_lines.append(
309 "Latest working note: " + latest_working_note
310 )
311
312 planned_lines = self._planned_artifact_progress_lines(dod)
313 progress_lines.extend(planned_lines)
314 progress_lines.extend(self._payload_retry_lines(dod))
315 progress_lines.extend(
316 self._next_step_resume_lines(
317 dod,
318 retry_number=retry_number,
319 )
320 )
321
322 touched = [
323 f"`{Path(path).name or path}`"
324 for path in dod.touched_files[-3:]
325 if str(path).strip()
326 ]
327 if touched:
328 progress_lines.append(
329 "Confirmed touched files: " + ", ".join(touched)
330 )
331
332 completed = [
333 item
334 for item in dod.completed_items
335 if item not in _SPECIAL_DOD_ITEMS
336 ]
337 if completed:
338 progress_lines.append(
339 "Confirmed completed work: " + "; ".join(completed[-2:])
340 )
341
342 preferred_missing_artifact = self._preferred_resume_missing_artifact(dod)
343 next_pending = self._preferred_resume_pending_item(
344 dod,
345 missing_artifact=preferred_missing_artifact,
346 )
347 resume_already_names_pending = bool(
348 next_pending
349 and any(f"`{next_pending}`" in line for line in progress_lines)
350 )
351 if next_pending and not resume_already_names_pending:
352 progress_lines.append(f"Next pending item: {next_pending}")
353 todo_refresh = self._todo_refresh_retry_line(dod)
354 if todo_refresh:
355 progress_lines.append(todo_refresh)
356
357 if not progress_lines:
358 return (
359 "[EMPTY ASSISTANT RESPONSE]\n"
360 f"Your last response was empty (retry {retry_number}/{max_empty_retries}). "
361 "Respond directly to the task "
362 "or call tools if needed. Do not return an empty response."
363 )
364
365 return "\n".join(
366 [
367 "[EMPTY ASSISTANT RESPONSE]",
368 (
369 "Your last response was empty "
370 f"(retry {retry_number}/{max_empty_retries}). Continue from the "
371 "confirmed progress below instead of restarting."
372 ),
373 *[f"- {line}" for line in progress_lines],
374 "",
375 "Respond directly to the task or call tools if needed. Do not return an empty response.",
376 ]
377 )
378
379 def _build_early_concrete_write_retry_message(
380 self,
381 dod: DefinitionOfDone,
382 *,
383 retry_number: int,
384 max_empty_retries: int,
385 ) -> str | None:
386 if retry_number < 3:
387 return None
388 if not self._has_confirmed_output_file_progress(dod):
389 return None
390 if self._has_confirmed_substantive_output_file_progress(dod):
391 return None
392
393 next_missing_artifact = self._preferred_resume_missing_artifact(dod)
394 next_pending = self._preferred_resume_pending_item(
395 dod,
396 missing_artifact=next_missing_artifact,
397 )
398 inferred_pending_target = (
399 self._infer_pending_item_output_target(dod, next_pending)
400 if next_pending
401 else None
402 )
403 concrete_target: Path | None = None
404 if inferred_pending_target is not None and not inferred_pending_target.exists():
405 concrete_target = inferred_pending_target.expanduser().resolve(strict=False)
406 elif next_missing_artifact is not None and not next_missing_artifact[1]:
407 concrete_target = next_missing_artifact[0].expanduser().resolve(strict=False)
408 if concrete_target is None or not concrete_target.suffix:
409 return None
410
411 outline_label = infer_output_outline_label(
412 dod,
413 concrete_target,
414 project_root=self.context.project_root,
415 todo_label=next_pending or "",
416 )
417 if next_pending and _todo_is_mutation_step(next_pending):
418 first_line = (
419 f"Continue `{next_pending}` by creating `{concrete_target.name}`."
420 )
421 else:
422 first_line = f"Create `{concrete_target.name}` now."
423 compact_retry = retry_number >= 4
424
425 lines = [
426 first_line,
427 self._mutation_tool_scaffold(concrete_target, tool_name="write"),
428 ]
429 if outline_label:
430 lines.append(
431 f"Use the existing outline label `{outline_label}` for that file so it matches the current guide structure."
432 )
433 html_scaffold_line = self._known_existing_html_scaffold_line(
434 concrete_target,
435 require_first_substantive_output=True,
436 )
437 if html_scaffold_line:
438 lines.append(html_scaffold_line)
439 reference_line = self._known_reference_structure_line(
440 concrete_target,
441 require_first_substantive_output=True,
442 )
443 if reference_line and not compact_retry:
444 lines.append(reference_line)
445 reference_cues_line = self._known_reference_cues_line(
446 concrete_target,
447 require_first_substantive_output=True,
448 retry_number=retry_number,
449 )
450 if reference_cues_line and not compact_retry:
451 lines.append(reference_cues_line)
452 html_starter_line = self._known_html_starter_shape_line(
453 concrete_target,
454 require_first_substantive_output=True,
455 retry_number=retry_number,
456 outline_label=outline_label,
457 )
458 if html_starter_line:
459 lines.append(html_starter_line)
460 html_template_line = self._known_html_starter_template_line(
461 concrete_target,
462 require_first_substantive_output=True,
463 retry_number=retry_number,
464 outline_label=outline_label,
465 )
466 if html_template_line:
467 lines.append(html_template_line)
468 if (
469 not compact_retry
470 and _should_encourage_initial_version(
471 target=concrete_target,
472 has_confirmed_output_file_progress=True,
473 has_confirmed_substantive_output_file_progress=False,
474 )
475 ):
476 lines.append(
477 "Write a compact but real initial version of this file now, then refine or expand it in later edits."
478 )
479 lines.append(
480 "No narration, no TodoWrite, no rereads, and no empty response; emit the mutation tool call now."
481 )
482 return "\n".join(
483 [
484 "[EMPTY ASSISTANT RESPONSE]",
485 (
486 "Your last response was empty "
487 f"(retry {retry_number}/{max_empty_retries}). Emit the exact next mutation now."
488 ),
489 *[f"- {line}" for line in lines],
490 ]
491 )
492
493 def _payload_retry_lines(self, dod: DefinitionOfDone | None) -> list[str]:
494 recovery_context = self.context.recovery_context
495 if recovery_context is None or not recovery_context.attempts:
496 return []
497 attempt = recovery_context.attempts[-1]
498 fix = detect_missing_mutation_payload(
499 attempt.tool_name,
500 attempt.arguments,
501 attempt.error,
502 )
503 if fix is None:
504 return []
505
506 target = fix["file_path"] or self._preferred_retry_target(dod)
507 invalid = ", ".join(f"`{field}`" for field in fix["invalid_fields"])
508 display_target = display_runtime_path(target) if target else None
509 if fix.get("kind") == "missing_target":
510 if attempt.tool_name == "write":
511 target_line = (
512 f"Last tool failure: resend `write` for `{display_target}` with a valid `file_path` and real `content`."
513 if display_target
514 else "Last tool failure: resend `write` with a valid `file_path` and real `content`."
515 )
516 return [
517 target_line,
518 "Do not leave `file_path` empty; point it at the concrete next output file.",
519 self._mutation_tool_scaffold(
520 Path(target),
521 tool_name="write",
522 )
523 if target
524 else "Emit the `write(file_path=..., content=\"...\")` call with the real target path now.",
525 ]
526 if attempt.tool_name == "edit":
527 target_line = (
528 f"Last tool failure: resend `edit` for `{display_target}` with a valid `file_path` plus real `old_string`/`new_string`."
529 if display_target
530 else "Last tool failure: resend `edit` with a valid `file_path` plus real `old_string`/`new_string`."
531 )
532 return [
533 target_line,
534 "Do not leave `file_path` empty; point it at the concrete file you already know needs the edit.",
535 self._mutation_tool_scaffold(
536 Path(target),
537 tool_name="edit",
538 )
539 if target
540 else "Emit the `edit(file_path=..., old_string=\"...\", new_string=\"...\")` call with the real target path now.",
541 ]
542 if attempt.tool_name == "patch":
543 target_line = (
544 f"Last tool failure: resend `patch` for `{display_target}` with a valid `file_path` and real patch text or `hunks`."
545 if display_target
546 else "Last tool failure: resend `patch` with a valid `file_path` and real patch text or `hunks`."
547 )
548 return [
549 target_line,
550 "Do not leave `file_path` empty; point it at the concrete file you already know needs the patch.",
551 self._mutation_tool_scaffold(
552 Path(target),
553 tool_name="patch",
554 )
555 if target
556 else "Emit the `patch(file_path=..., patch=\"...\")` call with the real target path now.",
557 ]
558 if attempt.tool_name == "write":
559 lines = [
560 (
561 f"Last tool failure: resend `write` for `{display_target}` with real `content`, not just summary fields."
562 if display_target
563 else "Last tool failure: resend `write` with real `content`, not just summary fields."
564 ),
565 ]
566 lines.append(f"Do not use {invalid} in place of the actual file body.")
567 if target:
568 lines.append(
569 self._mutation_tool_scaffold(
570 Path(target),
571 tool_name="write",
572 )
573 )
574 return lines
575 if attempt.tool_name == "edit":
576 lines = [
577 (
578 f"Last tool failure: resend `edit` for `{display_target}` with the real text payload."
579 if display_target
580 else "Last tool failure: resend `edit` with the real text payload."
581 ),
582 f"Do not use {invalid} in place of `old_string`/`new_string`.",
583 ]
584 if target:
585 lines.append(
586 self._mutation_tool_scaffold(
587 Path(target),
588 tool_name="edit",
589 )
590 )
591 return lines
592 if attempt.tool_name == "patch":
593 lines = [
594 (
595 f"Last tool failure: resend `patch` for `{display_target}` with real patch text or structured hunks."
596 if display_target
597 else "Last tool failure: resend `patch` with real patch text or structured hunks."
598 ),
599 f"Do not use {invalid} in place of the real patch payload.",
600 ]
601 if target:
602 lines.append(
603 self._mutation_tool_scaffold(
604 Path(target),
605 tool_name="patch",
606 )
607 )
608 return lines
609 return []
610
611 def _todo_refresh_retry_line(self, dod: DefinitionOfDone) -> str | None:
612 non_special_pending = [
613 item for item in dod.pending_items if item not in _SPECIAL_DOD_ITEMS
614 ]
615 non_special_completed = [
616 item for item in dod.completed_items if item not in _SPECIAL_DOD_ITEMS
617 ]
618 if len(dod.touched_files) < 2 and (len(non_special_pending) + len(non_special_completed)) < 3:
619 return None
620 return (
621 "If the tracked steps are stale, refresh `TodoWrite` alongside the next "
622 "concrete mutation instead of spending a full turn on bookkeeping alone."
623 )
624
625 def _effective_max_empty_retries(
626 self,
627 dod: DefinitionOfDone | None,
628 *,
629 base_max_empty_retries: int,
630 ) -> int:
631 if dod is None:
632 return base_max_empty_retries
633 completed_artifacts, missing_artifacts = self._planned_artifact_counts(dod)
634 if completed_artifacts >= 3 and missing_artifacts > 0:
635 return base_max_empty_retries + _LATE_STAGE_EMPTY_RETRY_EXTRA
636 if self._has_concrete_next_output_step(dod):
637 extra_retries = _LATE_STAGE_EMPTY_RETRY_EXTRA
638 if self._has_confirmed_substantive_output_file_progress(dod):
639 extra_retries += _MULTI_FILE_OUTPUT_EMPTY_RETRY_EXTRA
640 elif completed_artifacts > 0:
641 extra_retries += _FIRST_FILE_EMPTY_RETRY_EXTRA
642 return base_max_empty_retries + extra_retries
643 return base_max_empty_retries
644
645 def _should_compact_empty_retry_message(self, dod: DefinitionOfDone) -> bool:
646 completed_artifacts, missing_artifacts = self._planned_artifact_counts(dod)
647 if completed_artifacts >= 3:
648 return missing_artifacts > 0
649 return self._has_concrete_next_output_step(dod)
650
651 def _planned_artifact_counts(self, dod: DefinitionOfDone) -> tuple[int, int]:
652 completed = 0
653 missing = 0
654 for target, expect_directory in collect_planned_artifact_targets(
655 dod,
656 project_root=self.context.project_root,
657 max_paths=12,
658 ):
659 if planned_artifact_target_satisfied(
660 dod,
661 target=target,
662 expect_directory=expect_directory,
663 project_root=self.context.project_root,
664 ):
665 completed += 1
666 else:
667 missing += 1
668 return completed, missing
669
670 def _has_concrete_next_output_step(self, dod: DefinitionOfDone) -> bool:
671 next_missing_artifact = next(
672 (
673 artifact
674 for artifact in collect_planned_artifact_targets(
675 dod,
676 project_root=self.context.project_root,
677 max_paths=12,
678 )
679 if not planned_artifact_target_satisfied(
680 dod,
681 target=artifact[0],
682 expect_directory=artifact[1],
683 project_root=self.context.project_root,
684 )
685 ),
686 None,
687 )
688 next_pending = self._preferred_resume_pending_item(
689 dod,
690 missing_artifact=next_missing_artifact,
691 )
692 if next_pending and self._infer_pending_item_output_target(dod, next_pending):
693 return True
694 if next_missing_artifact is None:
695 return False
696 target, expect_directory = next_missing_artifact
697 if not expect_directory:
698 return True
699 next_output_file, _ = infer_next_output_file(
700 target=target,
701 project_root=self.context.project_root,
702 messages=list(getattr(self.context.session, "messages", []) or []),
703 )
704 return next_output_file is not None
705
706 def _has_confirmed_output_file_progress(self, dod: DefinitionOfDone) -> bool:
707 return any(
708 not expect_directory
709 and planned_artifact_target_satisfied(
710 dod,
711 target=target,
712 expect_directory=False,
713 project_root=self.context.project_root,
714 )
715 for target, expect_directory in collect_planned_artifact_targets(
716 dod,
717 project_root=self.context.project_root,
718 max_paths=12,
719 )
720 )
721
722 def _has_confirmed_substantive_output_file_progress(
723 self,
724 dod: DefinitionOfDone,
725 ) -> bool:
726 for raw_path in dod.touched_files:
727 if not str(raw_path).strip():
728 continue
729 path = Path(raw_path).expanduser().resolve(strict=False)
730 if not path.suffix or _is_summary_artifact_path(path) or not path.is_file():
731 continue
732 return True
733 return any(
734 not expect_directory
735 and not _is_summary_artifact_path(target)
736 and planned_artifact_target_satisfied(
737 dod,
738 target=target,
739 expect_directory=False,
740 project_root=self.context.project_root,
741 )
742 for target, expect_directory in collect_planned_artifact_targets(
743 dod,
744 project_root=self.context.project_root,
745 max_paths=12,
746 )
747 )
748
749 def _planned_artifact_progress_lines(self, dod: DefinitionOfDone) -> list[str]:
750 targets = collect_planned_artifact_targets(
751 dod,
752 project_root=self.context.project_root,
753 max_paths=12,
754 )
755 if not targets:
756 return []
757
758 preferred_missing_artifact = self._preferred_resume_missing_artifact(dod)
759 missing_labels = [
760 self._format_artifact_label(target, expect_directory=expect_directory)
761 for target, expect_directory in targets
762 if not planned_artifact_target_satisfied(
763 dod,
764 target=target,
765 expect_directory=expect_directory,
766 project_root=self.context.project_root,
767 )
768 ]
769 if not missing_labels:
770 return []
771
772 if preferred_missing_artifact is not None:
773 preferred_label = self._format_artifact_label(
774 preferred_missing_artifact[0],
775 expect_directory=preferred_missing_artifact[1],
776 )
777 ordered_labels = [preferred_label, *missing_labels]
778 missing_labels = list(dict.fromkeys(ordered_labels))
779
780 lines = [f"Next missing planned artifact: {missing_labels[0]}"]
781 first_missing_target, first_missing_is_directory = next(
782 (
783 (target, expect_directory)
784 for target, expect_directory in targets
785 if not planned_artifact_target_satisfied(
786 dod,
787 target=target,
788 expect_directory=expect_directory,
789 project_root=self.context.project_root,
790 )
791 ),
792 (None, False),
793 )
794 detail_target = (
795 preferred_missing_artifact
796 if preferred_missing_artifact is not None
797 else (
798 (first_missing_target, first_missing_is_directory)
799 if first_missing_target is not None
800 else None
801 )
802 )
803 if detail_target is not None and detail_target[1]:
804 detail_path = detail_target[0]
805 next_output_file, next_output_source = infer_next_output_file(
806 target=detail_path,
807 project_root=self.context.project_root,
808 messages=list(getattr(self.context.session, "messages", []) or []),
809 )
810 if next_output_file is not None:
811 next_output_detail = (
812 "Next declared output under "
813 if next_output_source == "declared"
814 else "Next observed output pattern under "
815 )
816 lines.append(
817 next_output_detail
818 + f"{self._format_artifact_label(detail_path, expect_directory=True)}: "
819 f"{self._format_artifact_label(next_output_file, expect_directory=False)}"
820 )
821 if len(missing_labels) > 1:
822 preview = ", ".join(missing_labels[:3])
823 if len(missing_labels) > 3:
824 preview += ", ..."
825 lines.append("Remaining planned artifacts: " + preview)
826 return lines
827
828 def _compact_planned_artifact_lines(self, dod: DefinitionOfDone) -> list[str]:
829 lines = self._planned_artifact_progress_lines(dod)
830 if self._confirmed_output_file_count(dod) < 2:
831 return lines[:1]
832 return lines[:2]
833
834 def _confirmed_output_file_count(self, dod: DefinitionOfDone) -> int:
835 return sum(
836 1
837 for target, expect_directory in collect_planned_artifact_targets(
838 dod,
839 project_root=self.context.project_root,
840 max_paths=12,
841 )
842 if not expect_directory
843 and planned_artifact_target_satisfied(
844 dod,
845 target=target,
846 expect_directory=False,
847 project_root=self.context.project_root,
848 )
849 )
850
851 def _next_step_resume_lines(
852 self,
853 dod: DefinitionOfDone,
854 *,
855 retry_number: int,
856 ) -> list[str]:
857 completed_artifacts, _ = self._planned_artifact_counts(dod)
858 has_confirmed_output_file_progress = self._has_confirmed_output_file_progress(dod)
859 has_confirmed_substantive_output_file_progress = (
860 self._has_confirmed_substantive_output_file_progress(dod)
861 )
862 next_missing_artifact = self._preferred_resume_missing_artifact(dod)
863 next_pending = self._preferred_resume_pending_item(
864 dod,
865 missing_artifact=next_missing_artifact,
866 )
867 if (
868 completed_artifacts == 0
869 and next_pending
870 and not _todo_is_mutation_step(next_pending)
871 and not _todo_is_consistency_review_step(next_pending)
872 ):
873 lines = [f"Resume with this exact next step: advance `{next_pending}`."]
874 lines.append(
875 "Make the next response one concrete evidence-gathering tool call that "
876 "directly advances that step."
877 )
878 lines.append(
879 "Do not jump ahead to later artifact creation, verification, or a "
880 "completion summary until that discovery step is satisfied."
881 )
882 if retry_number >= 2:
883 lines.append(
884 "Do not restart from scratch or return another working note; emit the "
885 "next evidence-gathering tool call now."
886 )
887 else:
888 lines.append(
889 "Do not restart from scratch unless one specific missing fact blocks "
890 "that discovery step."
891 )
892 return lines
893
894 inferred_pending_target = (
895 self._infer_pending_item_output_target(dod, next_pending)
896 if next_pending
897 else None
898 )
899 if (
900 next_pending
901 and inferred_pending_target is None
902 and next_missing_artifact is not None
903 and not next_missing_artifact[1]
904 and todo_describes_aggregate_mutation(next_pending)
905 and not todo_describes_broad_setup_step(next_pending)
906 ):
907 concrete_target = next_missing_artifact[0]
908 outline_label = infer_output_outline_label(
909 dod,
910 concrete_target,
911 project_root=self.context.project_root,
912 todo_label=next_pending,
913 )
914 lines = [
915 f"Resume with this exact next step: create `{concrete_target.name}`.",
916 f"It is the next concrete output needed to continue `{next_pending}`.",
917 "Prefer one `write(content=...)` call for "
918 f"`{display_runtime_path(concrete_target)}` before more research.",
919 self._mutation_tool_scaffold(
920 concrete_target,
921 tool_name="write",
922 ),
923 ]
924 if not concrete_target.parent.exists():
925 lines.append(
926 "The `write` tool can create that file's parent directories "
927 "automatically, so do the write in one step instead of stopping "
928 "for a separate mkdir."
929 )
930 if outline_label:
931 lines.append(
932 f"Use the existing outline label `{outline_label}` for that file so it matches the current guide structure."
933 )
934 reference_line = self._known_reference_structure_line(
935 concrete_target,
936 require_first_substantive_output=(
937 has_confirmed_output_file_progress
938 and not has_confirmed_substantive_output_file_progress
939 ),
940 )
941 if reference_line:
942 lines.append(reference_line)
943 reference_cues_line = self._known_reference_cues_line(
944 concrete_target,
945 require_first_substantive_output=(
946 has_confirmed_output_file_progress
947 and not has_confirmed_substantive_output_file_progress
948 ),
949 retry_number=retry_number,
950 )
951 if reference_cues_line:
952 lines.append(reference_cues_line)
953 html_scaffold_line = self._known_existing_html_scaffold_line(
954 concrete_target,
955 require_first_substantive_output=(
956 has_confirmed_output_file_progress
957 and not has_confirmed_substantive_output_file_progress
958 ),
959 )
960 if html_scaffold_line:
961 lines.append(html_scaffold_line)
962 html_starter_line = self._known_html_starter_shape_line(
963 concrete_target,
964 require_first_substantive_output=(
965 has_confirmed_output_file_progress
966 and not has_confirmed_substantive_output_file_progress
967 ),
968 retry_number=retry_number,
969 outline_label=outline_label,
970 )
971 if html_starter_line:
972 lines.append(html_starter_line)
973 if _should_encourage_initial_version(
974 target=concrete_target,
975 has_confirmed_output_file_progress=has_confirmed_output_file_progress,
976 has_confirmed_substantive_output_file_progress=has_confirmed_substantive_output_file_progress,
977 ):
978 lines.append(
979 "Do not wait to perfect the entire multi-file output before this write. "
980 "Write a compact but real initial version of this file now, then refine "
981 "or expand it in later edits."
982 )
983 if has_confirmed_substantive_output_file_progress:
984 lines.append(
985 "Follow the same full-payload one-file-at-a-time write pattern that "
986 "already created the confirmed output files."
987 )
988 if retry_number >= 2:
989 lines.append(
990 "Do not return another working note or empty response; emit the "
991 "concrete mutation tool call now."
992 )
993 else:
994 lines.append(
995 "Do not restart discovery unless one specific missing fact blocks "
996 "that file write."
997 )
998 return lines
999 if next_pending and inferred_pending_target is not None:
1000 inferred_is_directory = not bool(inferred_pending_target.suffix)
1001 inferred_label = self._format_artifact_label(
1002 inferred_pending_target,
1003 expect_directory=inferred_is_directory,
1004 )
1005 outline_label = infer_output_outline_label(
1006 dod,
1007 inferred_pending_target,
1008 project_root=self.context.project_root,
1009 todo_label=next_pending,
1010 )
1011 lines = [
1012 "Resume with this exact next step: continue "
1013 f"`{next_pending}` by creating {inferred_label}."
1014 ]
1015 if inferred_is_directory:
1016 lines.append(
1017 "Prefer one concrete directory-creation step for "
1018 f"`{display_runtime_path(inferred_pending_target)}` before more research."
1019 )
1020 lines.append(
1021 self._directory_creation_scaffold(inferred_pending_target)
1022 )
1023 else:
1024 lines.append(
1025 "Prefer one `write(content=...)` call for "
1026 f"`{display_runtime_path(inferred_pending_target)}` before more research."
1027 )
1028 lines.append(
1029 self._mutation_tool_scaffold(
1030 inferred_pending_target,
1031 tool_name="write",
1032 )
1033 )
1034 if outline_label:
1035 lines.append(
1036 f"Use the existing outline label `{outline_label}` for that file so it matches the current guide structure."
1037 )
1038 self._append_concrete_html_write_cues(
1039 lines,
1040 target=inferred_pending_target,
1041 outline_label=outline_label,
1042 retry_number=retry_number,
1043 has_confirmed_output_file_progress=has_confirmed_output_file_progress,
1044 has_confirmed_substantive_output_file_progress=has_confirmed_substantive_output_file_progress,
1045 )
1046 if todo_describes_aggregate_mutation(next_pending):
1047 lines.insert(
1048 1,
1049 f"It is the next concrete output needed to continue `{next_pending}`.",
1050 )
1051 if (
1052 not inferred_is_directory
1053 and _should_encourage_initial_version(
1054 target=inferred_pending_target,
1055 has_confirmed_output_file_progress=has_confirmed_output_file_progress,
1056 has_confirmed_substantive_output_file_progress=has_confirmed_substantive_output_file_progress,
1057 )
1058 ):
1059 lines.append(
1060 "Do not wait to perfect the entire multi-file output before this write. "
1061 "Write a compact but real initial version of this file now, then refine "
1062 "or expand it in later edits."
1063 )
1064 if has_confirmed_substantive_output_file_progress:
1065 lines.append(
1066 "Follow the same full-payload one-file-at-a-time write pattern that "
1067 "already created the confirmed output files."
1068 )
1069 if retry_number >= 2:
1070 lines.append(
1071 "Do not return another working note or empty response; emit the "
1072 "concrete mutation tool call now."
1073 )
1074 else:
1075 lines.append(
1076 "Do not restart discovery unless one specific missing fact blocks "
1077 "that file write."
1078 )
1079 return lines
1080
1081 for target, expect_directory in collect_planned_artifact_targets(
1082 dod,
1083 project_root=self.context.project_root,
1084 max_paths=12,
1085 ):
1086 if planned_artifact_target_satisfied(
1087 dod,
1088 target=target,
1089 expect_directory=expect_directory,
1090 project_root=self.context.project_root,
1091 ):
1092 continue
1093 label = self._format_artifact_label(
1094 target,
1095 expect_directory=expect_directory,
1096 )
1097 if expect_directory:
1098 next_output_file, next_output_source = infer_next_output_file(
1099 target=target,
1100 project_root=self.context.project_root,
1101 messages=list(getattr(self.context.session, "messages", []) or []),
1102 )
1103 if next_output_file is not None:
1104 next_output_label = self._format_artifact_label(
1105 next_output_file,
1106 expect_directory=False,
1107 )
1108 outline_label = infer_output_outline_label(
1109 dod,
1110 next_output_file,
1111 project_root=self.context.project_root,
1112 todo_label=next_pending or "",
1113 )
1114 if next_pending and _todo_is_mutation_step(next_pending):
1115 lines = [
1116 "Resume with this exact next step: continue "
1117 f"`{next_pending}` by creating {next_output_label}."
1118 ]
1119 else:
1120 lines = [
1121 "Resume with this exact next step: create "
1122 f"{next_output_label}."
1123 ]
1124 lines.append(
1125 f"It is the next missing declared output under {label}."
1126 if next_output_source == "declared"
1127 else (
1128 "It mirrors the observed filename pattern from another "
1129 f"{label} directory you already inspected."
1130 )
1131 )
1132 lines.append(
1133 "Prefer one `write` call for "
1134 f"`{display_runtime_path(next_output_file)}` before more research."
1135 )
1136 lines.append(
1137 self._mutation_tool_scaffold(
1138 next_output_file,
1139 tool_name="write",
1140 )
1141 )
1142 if outline_label:
1143 lines.append(
1144 f"Use the existing outline label `{outline_label}` for that file so it matches the current guide structure."
1145 )
1146 self._append_concrete_html_write_cues(
1147 lines,
1148 target=next_output_file,
1149 outline_label=outline_label,
1150 retry_number=retry_number,
1151 has_confirmed_output_file_progress=has_confirmed_output_file_progress,
1152 has_confirmed_substantive_output_file_progress=has_confirmed_substantive_output_file_progress,
1153 )
1154 if _should_encourage_initial_version(
1155 target=next_output_file,
1156 has_confirmed_output_file_progress=has_confirmed_output_file_progress,
1157 has_confirmed_substantive_output_file_progress=has_confirmed_substantive_output_file_progress,
1158 ):
1159 lines.append(
1160 "Do not wait to perfect the entire multi-file output before this write. "
1161 "Write a compact but real initial version of this file now, then refine "
1162 "or expand it in later edits."
1163 )
1164 if not next_output_file.parent.exists():
1165 lines.append(
1166 "The `write` tool can create that file's parent directories "
1167 "automatically, so do the write in one step instead of stopping "
1168 "for a separate mkdir."
1169 )
1170 if retry_number >= 2:
1171 lines.append(
1172 "Do not restart discovery; emit the next mutation tool call now."
1173 )
1174 else:
1175 lines.append(
1176 "Do not restart discovery unless one specific missing fact blocks this step."
1177 )
1178 return lines
1179 if expect_directory and target.is_dir():
1180 if next_pending and _todo_is_mutation_step(next_pending):
1181 lines = [
1182 "Resume with this exact next step: continue "
1183 f"`{next_pending}` by creating the next output file under {label}."
1184 ]
1185 else:
1186 lines = [
1187 "Resume with this exact next step: create the next output file "
1188 f"under {label}."
1189 ]
1190 lines.append(
1191 "Prefer one concrete `write` call for a file inside "
1192 f"`{display_runtime_path(target)}` before more research."
1193 )
1194 else:
1195 lines = [f"Resume with this exact next step: create {label}."]
1196 if expect_directory and not target.is_dir():
1197 lines.append(
1198 "Prefer one concrete directory-creation step for "
1199 f"`{display_runtime_path(target)}` before more research."
1200 )
1201 elif not expect_directory:
1202 lines.append(
1203 "Prefer one `write` call for "
1204 f"`{display_runtime_path(target)}` before any more reference reads."
1205 )
1206 if not target.parent.exists():
1207 lines.append(
1208 "The `write` tool can create that file's parent directories "
1209 "automatically, so do the write in one step instead of stopping "
1210 "for a separate mkdir."
1211 )
1212 if _should_encourage_initial_version(
1213 target=target,
1214 has_confirmed_output_file_progress=has_confirmed_output_file_progress,
1215 has_confirmed_substantive_output_file_progress=has_confirmed_substantive_output_file_progress,
1216 ):
1217 lines.append(
1218 "Do not wait to perfect the entire multi-file output before this write. "
1219 "Write a compact but real initial version of this file now, then refine "
1220 "or expand it in later edits."
1221 )
1222 lines.append(
1223 self._mutation_tool_scaffold(
1224 target,
1225 tool_name="write",
1226 )
1227 )
1228 if completed_artifacts >= 3:
1229 lines.append(
1230 "Follow the same one-file-at-a-time mutation pattern that already "
1231 "created the confirmed planned artifacts."
1232 )
1233 lines.append(
1234 "Your next response should be the concrete mutation tool call itself, "
1235 "not TodoWrite alone, verification, or a completion summary."
1236 )
1237 if retry_number >= 2:
1238 lines.append(
1239 "Do not restart discovery; emit the next mutation tool call now."
1240 )
1241 else:
1242 lines.append(
1243 "Do not restart discovery unless one specific missing fact blocks this step."
1244 )
1245 return lines
1246 return []
1247
1248 def _append_concrete_html_write_cues(
1249 self,
1250 lines: list[str],
1251 *,
1252 target: Path,
1253 outline_label: str | None,
1254 retry_number: int,
1255 has_confirmed_output_file_progress: bool,
1256 has_confirmed_substantive_output_file_progress: bool,
1257 ) -> None:
1258 first_substantive_output = (
1259 has_confirmed_output_file_progress
1260 and not has_confirmed_substantive_output_file_progress
1261 )
1262 reference_line = self._known_reference_structure_line(
1263 target,
1264 require_first_substantive_output=first_substantive_output,
1265 )
1266 if reference_line:
1267 lines.append(reference_line)
1268 reference_cues_line = self._known_reference_cues_line(
1269 target,
1270 require_first_substantive_output=first_substantive_output,
1271 retry_number=retry_number,
1272 )
1273 if reference_cues_line:
1274 lines.append(reference_cues_line)
1275 html_scaffold_line = self._known_existing_html_scaffold_line(
1276 target,
1277 require_first_substantive_output=first_substantive_output,
1278 )
1279 if html_scaffold_line:
1280 lines.append(html_scaffold_line)
1281 sibling_scaffold_line = self._known_existing_html_sibling_scaffold_line(
1282 target,
1283 outline_label=outline_label,
1284 require_existing_substantive_output=has_confirmed_substantive_output_file_progress,
1285 )
1286 if sibling_scaffold_line:
1287 lines.append(sibling_scaffold_line)
1288 html_starter_line = self._known_html_starter_shape_line(
1289 target,
1290 require_first_substantive_output=(
1291 first_substantive_output
1292 or (
1293 has_confirmed_substantive_output_file_progress
1294 and retry_number >= 4
1295 )
1296 ),
1297 retry_number=retry_number,
1298 outline_label=outline_label,
1299 )
1300 if html_starter_line:
1301 lines.append(html_starter_line)
1302
1303 def _infer_pending_item_output_target(
1304 self,
1305 dod: DefinitionOfDone,
1306 item: str,
1307 ) -> Path | None:
1308 return infer_pending_todo_output_target(
1309 dod,
1310 item,
1311 project_root=self.context.project_root,
1312 )
1313
1314 def _preferred_resume_pending_item(
1315 self,
1316 dod: DefinitionOfDone,
1317 *,
1318 missing_artifact: tuple[Path, bool] | None,
1319 ) -> str | None:
1320 preferred = preferred_pending_todo_item(
1321 dod,
1322 project_root=self.context.project_root,
1323 missing_artifact=missing_artifact,
1324 )
1325 if preferred:
1326 return preferred
1327
1328 explicit_file_items = [
1329 item
1330 for item in dod.pending_items
1331 if item not in _SPECIAL_DOD_ITEMS
1332 and _todo_is_mutation_step(item)
1333 and todo_file_candidates(item)
1334 ]
1335 if explicit_file_items:
1336 return explicit_file_items[0]
1337
1338 return next(
1339 (item for item in dod.pending_items if item not in _SPECIAL_DOD_ITEMS),
1340 None,
1341 )
1342
1343 def _preferred_resume_missing_artifact(
1344 self,
1345 dod: DefinitionOfDone,
1346 ) -> tuple[Path, bool] | None:
1347 planned_targets = collect_planned_artifact_targets(
1348 dod,
1349 project_root=self.context.project_root,
1350 max_paths=12,
1351 )
1352 first_missing = next(
1353 (
1354 artifact
1355 for artifact in planned_targets
1356 if not planned_artifact_target_satisfied(
1357 dod,
1358 target=artifact[0],
1359 expect_directory=artifact[1],
1360 project_root=self.context.project_root,
1361 )
1362 ),
1363 None,
1364 )
1365 if first_missing is None:
1366 return None
1367
1368 next_pending = self._preferred_resume_pending_item(
1369 dod,
1370 missing_artifact=first_missing,
1371 )
1372 if next_pending is None:
1373 return self._concretize_directory_missing_artifact(
1374 dod,
1375 first_missing,
1376 planned_targets=planned_targets,
1377 )
1378
1379 inferred_target = self._infer_pending_item_output_target(dod, next_pending)
1380 if inferred_target is None or inferred_target.exists():
1381 return self._concretize_directory_missing_artifact(
1382 dod,
1383 first_missing,
1384 planned_targets=planned_targets,
1385 )
1386
1387 normalized_target = inferred_target.expanduser().resolve(strict=False)
1388 for planned_target, expect_directory in planned_targets:
1389 normalized_planned = planned_target.expanduser().resolve(strict=False)
1390 if expect_directory:
1391 try:
1392 normalized_target.relative_to(normalized_planned)
1393 except ValueError:
1394 continue
1395 return normalized_target, False
1396 if normalized_planned == normalized_target:
1397 return normalized_target, False
1398 return first_missing
1399
1400 def _preferred_retry_target(self, dod: DefinitionOfDone | None) -> str:
1401 if dod is None:
1402 return ""
1403
1404 missing_artifact = self._preferred_resume_missing_artifact(dod)
1405 next_pending = self._preferred_resume_pending_item(
1406 dod,
1407 missing_artifact=missing_artifact,
1408 )
1409 if next_pending:
1410 pending_target = self._infer_pending_item_output_target(dod, next_pending)
1411 if pending_target is not None and not pending_target.exists():
1412 return str(pending_target)
1413
1414 if missing_artifact is None:
1415 return ""
1416
1417 target, expect_directory = missing_artifact
1418 if not expect_directory:
1419 return str(target)
1420
1421 next_output_file, _ = infer_next_output_file(
1422 target=target,
1423 project_root=self.context.project_root,
1424 messages=list(getattr(self.context.session, "messages", []) or []),
1425 )
1426 if next_output_file is not None:
1427 return str(next_output_file)
1428 return str(target)
1429
1430 def _concretize_directory_missing_artifact(
1431 self,
1432 dod: DefinitionOfDone,
1433 missing_artifact: tuple[Path, bool],
1434 *,
1435 planned_targets: list[tuple[Path, bool]],
1436 ) -> tuple[Path, bool]:
1437 target, expect_directory = missing_artifact
1438 if not expect_directory:
1439 return missing_artifact
1440 if any(
1441 not expect_dir
1442 and not planned_artifact_target_satisfied(
1443 dod,
1444 target=planned_target,
1445 expect_directory=expect_dir,
1446 project_root=self.context.project_root,
1447 )
1448 for planned_target, expect_dir in planned_targets
1449 ):
1450 return missing_artifact
1451 next_output_file, _ = infer_next_output_file(
1452 target=target,
1453 project_root=self.context.project_root,
1454 messages=list(getattr(self.context.session, "messages", []) or []),
1455 )
1456 if next_output_file is None or next_output_file.exists():
1457 return missing_artifact
1458 return next_output_file, False
1459
1460 @staticmethod
1461 def _format_artifact_label(path: Path, *, expect_directory: bool) -> str:
1462 label = path.name or str(path)
1463 if expect_directory and not label.endswith("/"):
1464 label += "/"
1465 return f"`{label}`"
1466
1467 def _latest_working_note(self) -> str | None:
1468 messages = list(getattr(self.context.session, "messages", []) or [])
1469 for message in reversed(messages):
1470 content = str(getattr(message, "content", "") or "").strip()
1471 if not content:
1472 continue
1473 for tool_name in _WORKING_NOTE_TOOL_NAMES:
1474 prefix = f"Observation [{tool_name}]: Result:"
1475 if prefix not in content:
1476 continue
1477 note = content.split(prefix, 1)[1].strip()
1478 if not note:
1479 continue
1480 first_line = next(
1481 (line.strip() for line in note.splitlines() if line.strip()),
1482 "",
1483 )
1484 if not first_line:
1485 continue
1486 first_line = re.sub(r"^-\s*\[[^\]]+\]\s*", "", first_line).strip()
1487 return first_line or None
1488 return None
1489
1490 def _known_reference_structure_line(
1491 self,
1492 target: Path,
1493 *,
1494 require_first_substantive_output: bool,
1495 ) -> str | None:
1496 if not require_first_substantive_output:
1497 return None
1498 reference = self._best_known_reference_path(target)
1499 if reference is None:
1500 return None
1501 return (
1502 f"You already read `{display_runtime_path(reference)}`; reuse its overall "
1503 "structure as the starting pattern for this new file, then adapt the content "
1504 "to the current target."
1505 )
1506
1507 def _known_reference_cues_line(
1508 self,
1509 target: Path,
1510 *,
1511 require_first_substantive_output: bool,
1512 retry_number: int,
1513 ) -> str | None:
1514 if not require_first_substantive_output or retry_number < 2:
1515 return None
1516 reference = self._best_known_reference_path(target)
1517 if reference is None:
1518 return None
1519 cues = self._reference_content_cues(reference)
1520 if not cues:
1521 return None
1522 return f"Reference cues from `{display_runtime_path(reference)}`: {cues}"
1523
1524 def _known_existing_html_scaffold_line(
1525 self,
1526 target: Path,
1527 *,
1528 require_first_substantive_output: bool,
1529 ) -> str | None:
1530 if not require_first_substantive_output:
1531 return None
1532 if target.suffix.lower() not in {".html", ".htm"}:
1533 return None
1534 scaffold = self._best_known_root_html_scaffold(target)
1535 if scaffold is None:
1536 return None
1537 return (
1538 f"Reuse the existing `{display_runtime_path(scaffold)}` head/style/container "
1539 "pattern for this chapter so the guide stays visually consistent; only adapt "
1540 "the title, heading, and chapter body content."
1541 )
1542
1543 def _known_existing_html_sibling_scaffold_line(
1544 self,
1545 target: Path,
1546 *,
1547 outline_label: str | None,
1548 require_existing_substantive_output: bool,
1549 ) -> str | None:
1550 if not require_existing_substantive_output:
1551 return None
1552 if target.suffix.lower() not in {".html", ".htm"}:
1553 return None
1554 sibling = self._best_known_existing_html_sibling(target)
1555 if sibling is None:
1556 return None
1557 label = outline_label.strip() if outline_label and outline_label.strip() else target.stem
1558 return (
1559 f"Reuse the overall structure and navigation pattern from "
1560 f"`{display_runtime_path(sibling)}` as the starting pattern for `{label}`; "
1561 "adapt the title, heading, and body content to the new chapter."
1562 )
1563
1564 def _known_html_starter_shape_line(
1565 self,
1566 target: Path,
1567 *,
1568 require_first_substantive_output: bool,
1569 retry_number: int,
1570 outline_label: str | None,
1571 ) -> str | None:
1572 if not require_first_substantive_output or retry_number < 1:
1573 return None
1574 if target.suffix.lower() not in {".html", ".htm"}:
1575 return None
1576 label = outline_label.strip() if outline_label and outline_label.strip() else "this chapter"
1577 return (
1578 f"If you get stuck, start with `<title>{label}</title>`, "
1579 f"`<h1>{label}</h1>`, one introductory paragraph, a couple of `<h2>` "
1580 "sections with short body text, and a back link to `../index.html`."
1581 )
1582
1583 def _known_html_starter_template_line(
1584 self,
1585 target: Path,
1586 *,
1587 require_first_substantive_output: bool,
1588 retry_number: int,
1589 outline_label: str | None,
1590 ) -> str | None:
1591 if not require_first_substantive_output or retry_number < 4:
1592 return None
1593 if target.suffix.lower() not in {".html", ".htm"}:
1594 return None
1595 label = outline_label.strip() if outline_label and outline_label.strip() else "this chapter"
1596 snippet = (
1597 "<!DOCTYPE html> <html lang=\"en\"> <head> <meta charset=\"UTF-8\"> "
1598 f"<title>{label}</title> </head> <body> <div class=\"container\"> "
1599 f"<h1>{label}</h1> <p>...</p> <h2>Overview</h2> <p>...</p> "
1600 "<p><a href=\"../index.html\">← Back to Main Guide Index</a></p> "
1601 "</div> </body> </html>"
1602 )
1603 return (
1604 "If blanking continues, use this minimal HTML starter as the `content` value "
1605 f"and adapt it: `{snippet}`."
1606 )
1607
1608 def _best_known_root_html_scaffold(self, target: Path) -> Path | None:
1609 normalized_target = target.expanduser().resolve(strict=False)
1610 if normalized_target.suffix.lower() not in {".html", ".htm"}:
1611 return None
1612 candidate = normalized_target.parent.parent / "index.html"
1613 if candidate == normalized_target or not candidate.exists():
1614 return None
1615 return candidate
1616
1617 def _best_known_existing_html_sibling(self, target: Path) -> Path | None:
1618 normalized_target = target.expanduser().resolve(strict=False)
1619 if normalized_target.suffix.lower() not in {".html", ".htm"}:
1620 return None
1621 try:
1622 siblings = [
1623 candidate
1624 for candidate in normalized_target.parent.iterdir()
1625 if candidate.is_file()
1626 and candidate != normalized_target
1627 and candidate.suffix.lower() == normalized_target.suffix.lower()
1628 and not _is_summary_artifact_path(candidate)
1629 ]
1630 except OSError:
1631 return None
1632 if not siblings:
1633 return None
1634 siblings.sort(
1635 key=lambda candidate: (
1636 candidate.stat().st_mtime if candidate.exists() else 0.0,
1637 str(candidate),
1638 ),
1639 reverse=True,
1640 )
1641 return siblings[0]
1642
1643 def _best_known_reference_path(self, target: Path) -> Path | None:
1644 normalized_target = target.expanduser().resolve(strict=False)
1645 target_tokens = {
1646 token
1647 for token in re.split(r"[^a-z0-9]+", normalized_target.stem.lower())
1648 if token
1649 }
1650 target_number = _leading_numeric_prefix(normalized_target.stem)
1651 messages = list(getattr(self.context.session, "messages", []) or [])
1652 candidates: list[tuple[int, str, Path]] = []
1653
1654 for message in messages:
1655 for tool_call in getattr(message, "tool_calls", []) or []:
1656 if getattr(tool_call, "name", "") != "read":
1657 continue
1658 raw_path = str(tool_call.arguments.get("file_path") or "").strip()
1659 if not raw_path:
1660 continue
1661 candidate = Path(raw_path).expanduser().resolve(strict=False)
1662 if candidate == normalized_target or not candidate.suffix:
1663 continue
1664 if candidate.suffix.lower() != normalized_target.suffix.lower():
1665 continue
1666 score = 0
1667 if candidate.name.lower() == normalized_target.name.lower():
1668 score += 8
1669 if candidate.parent.name.lower() == normalized_target.parent.name.lower():
1670 score += 2
1671 if target_number and _leading_numeric_prefix(candidate.stem) == target_number:
1672 score += 3
1673 candidate_tokens = {
1674 token
1675 for token in re.split(r"[^a-z0-9]+", candidate.stem.lower())
1676 if token
1677 }
1678 score += min(3, len(target_tokens & candidate_tokens))
1679 if score <= 0:
1680 continue
1681 candidates.append((score, str(candidate), candidate))
1682
1683 if not candidates:
1684 return None
1685 candidates.sort(key=lambda item: (item[0], item[1]), reverse=True)
1686 return candidates[0][2]
1687
1688 def _reference_content_cues(self, reference: Path) -> str | None:
1689 try:
1690 content = reference.read_text()
1691 except OSError:
1692 return None
1693
1694 suffix = reference.suffix.lower()
1695 cues: list[str] = []
1696 if suffix in {".html", ".htm"}:
1697 for raw_line in content.splitlines():
1698 stripped = " ".join(raw_line.strip().split())
1699 if not stripped:
1700 continue
1701 lowered = stripped.lower()
1702 if not any(
1703 token in lowered
1704 for token in ("<title", "<h1", "<h2", "<p", "<li", "<a ")
1705 ):
1706 continue
1707 cues.append(_truncate_reference_cue(stripped))
1708 if len(cues) >= 3:
1709 break
1710 if not cues:
1711 for raw_line in content.splitlines():
1712 stripped = " ".join(raw_line.strip().split())
1713 if not stripped:
1714 continue
1715 if sum(ch.isalpha() for ch in stripped) < 6:
1716 continue
1717 cues.append(_truncate_reference_cue(stripped))
1718 if len(cues) >= 3:
1719 break
1720 if not cues:
1721 return None
1722 return " | ".join(cues)
1723
1724 @staticmethod
1725 def _mutation_tool_scaffold(path: Path, *, tool_name: str) -> str:
1726 normalized_path = json.dumps(display_runtime_path(path))
1727 if tool_name == "edit":
1728 signature = (
1729 f"edit(file_path={normalized_path}, old_string=\"...\", "
1730 'new_string="...")'
1731 )
1732 elif tool_name == "patch":
1733 signature = f"patch(file_path={normalized_path}, patch=\"...\")"
1734 else:
1735 signature = f"write(file_path={normalized_path}, content=\"...\")"
1736 return f"Emit this tool shape now: `{signature}`."
1737
1738 @staticmethod
1739 def _directory_creation_scaffold(path: Path) -> str:
1740 command = f"mkdir -p {shlex.quote(display_runtime_path(path))}"
1741 return f"Emit this tool shape now: `bash(command={json.dumps(command)})`."
1742
1743
1744 def _todo_is_mutation_step(label: str) -> bool:
1745 lowered = label.lower()
1746 return any(token in lowered for token in _MUTATION_TODO_HINTS)
1747
1748
1749 def _todo_is_consistency_review_step(label: str) -> bool:
1750 lowered = label.lower()
1751 return any(token in lowered for token in _CONSISTENCY_REVIEW_HINTS)
1752
1753
1754 def _is_summary_artifact_path(path: Path) -> bool:
1755 return path.name.lower() in _SUMMARY_ARTIFACT_NAMES
1756
1757
1758 def _should_encourage_initial_version(
1759 *,
1760 target: Path,
1761 has_confirmed_output_file_progress: bool,
1762 has_confirmed_substantive_output_file_progress: bool,
1763 ) -> bool:
1764 if not has_confirmed_output_file_progress:
1765 return True
1766 if _is_summary_artifact_path(target):
1767 return False
1768 return not has_confirmed_substantive_output_file_progress
1769
1770
1771 def _leading_numeric_prefix(stem: str) -> str:
1772 match = re.match(r"^(\d+)", stem)
1773 return match.group(1) if match else ""
1774
1775
1776 def _truncate_reference_cue(value: str, *, max_chars: int = 96) -> str:
1777 if len(value) <= max_chars:
1778 return value
1779 return value[: max_chars - 3].rstrip() + "..."