Python · 57750 bytes Raw Blame History
1 """Tool lifecycle hooks for Loader runtime execution."""
2
3 from __future__ import annotations
4
5 import shlex
6 from collections.abc import Iterable
7 from dataclasses import dataclass, field
8 from enum import StrEnum
9 from pathlib import Path
10 from typing import Any, Protocol
11
12 from ..llm.base import ToolCall
13 from ..tools.base import Tool, ToolRegistry
14 from ..tools.base import ToolResult as RegistryToolResult
15 from .dod import (
16 DefinitionOfDoneStore,
17 collect_missing_declared_html_output_files,
18 collect_planned_artifact_targets,
19 planned_artifact_target_satisfied,
20 )
21 from .memory import MemoryStore
22 from .path_display import display_runtime_path
23 from .permissions import PermissionOverride, PermissionPolicy
24 from .repair_focus import (
25 extract_active_repair_context,
26 normalize_repair_path,
27 path_matches_allowed_paths,
28 path_within_allowed_roots,
29 )
30 from .rollback import RollbackPlan, create_rollback_plan_for_action, is_destructive_tool
31 from .safeguard_services import (
32 ActionTracker,
33 PreActionValidator,
34 extract_shell_text_rewrite_target,
35 )
36 from .workflow import infer_output_outline_label
37
38
class HookEvent(StrEnum):
    """Lifecycle hook events for one tool call."""

    # Emitted before the tool executes.
    PRE_TOOL_USE = "pre_tool_use"
    # Emitted after the tool executed successfully.
    POST_TOOL_USE = "post_tool_use"
    # Emitted after the tool execution failed.
    POST_TOOL_USE_FAILURE = "post_tool_use_failure"
45
46
class HookDecision(StrEnum):
    """Terminal and non-terminal hook decisions."""

    # Non-terminal: processing proceeds to the next hook / the tool itself.
    CONTINUE = "continue"
    # Terminal: block this tool call (used by the scope hooks in this module).
    DENY = "deny"
    # Terminal: cancel the call.
    CANCEL = "cancel"
    # Terminal: treat the call as failed.
    FAIL = "fail"
54
55
@dataclass(slots=True)
class HookContext:
    """Context passed to hook implementations."""

    # The tool invocation being processed.
    tool_call: ToolCall
    # Resolved tool instance; may be None when no instance is available.
    tool: Tool | None
    registry: ToolRegistry
    permission_policy: PermissionPolicy
    # Origin of the call; hooks in this module skip checks when it is
    # "verification".
    source: str
    skip_duplicate_check: bool = False
    record_action: bool = True
    # Execution outcome fields — only meaningful for post-execution events.
    result: RegistryToolResult | None = None
    output: str | None = None
    is_error: bool = False
70
71
@dataclass(slots=True)
class HookResult:
    """Result from one hook invocation."""

    # CONTINUE is non-terminal; other values end the call (see HookDecision).
    decision: HookDecision = HookDecision.CONTINUE
    # Human-readable explanation accompanying a non-CONTINUE decision.
    message: str | None = None
    # Extra messages for the hook runner to surface to the model.
    injected_messages: list[str] = field(default_factory=list)
    permission_override: PermissionOverride | None = None
    permission_reason: str | None = None
    # Replacement arguments for the tool call (pre-execution hooks).
    updated_arguments: dict[str, Any] | None = None
    # Replacement for the tool output (post-execution hooks).
    output_override: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
    # e.g. "blocked" — set alongside DENY by the scope hooks in this module.
    terminal_state: str | None = None
85
86
@dataclass(slots=True)
class HookRunSummary:
    """Aggregated result across all hooks for one lifecycle stage."""

    # The tool call the summary refers to (arguments may have been rewritten).
    tool_call: ToolCall
    # Overall decision after combining every hook's result.
    decision: HookDecision = HookDecision.CONTINUE
    message: str | None = None
    # Concatenation of the hooks' injected messages.
    injected_messages: list[str] = field(default_factory=list)
    permission_override: PermissionOverride | None = None
    permission_reason: str | None = None
    output_override: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
    terminal_state: str | None = None
100
101
class ToolHook(Protocol):
    """Protocol for async tool lifecycle hooks."""

    # Called before the tool executes; may deny or rewrite via HookResult.
    async def pre_tool_use(self, context: HookContext) -> HookResult: ...

    # Called after a successful execution.
    async def post_tool_use(self, context: HookContext) -> HookResult: ...

    # Called after a failed execution.
    async def post_tool_use_failure(self, context: HookContext) -> HookResult: ...
110
111
class BaseToolHook:
    """Default no-op implementation for tool hooks."""

    async def pre_tool_use(self, context: HookContext) -> HookResult:
        """Return a pass-through result; subclasses override to intervene."""
        return HookResult()

    async def post_tool_use(self, context: HookContext) -> HookResult:
        """Return a pass-through result; subclasses override to observe outcomes."""
        return HookResult()

    async def post_tool_use_failure(self, context: HookContext) -> HookResult:
        """Return a pass-through result; subclasses override to handle failures."""
        return HookResult()
123
124
class FilePathAliasHook(BaseToolHook):
    """Normalize common file-path aliases before validation and execution."""

    _FILE_TOOLS = frozenset({"read", "write", "edit", "patch"})
    _ALIASES = ("filepath", "filePath", "file", "filename", "path")

    async def pre_tool_use(self, context: HookContext) -> HookResult:
        # Only file-oriented tools carry a ``file_path`` argument.
        if context.tool_call.name not in self._FILE_TOOLS:
            return HookResult()

        arguments = context.tool_call.arguments
        # A caller that already supplied a usable ``file_path`` needs no help.
        if str(arguments.get("file_path", "")).strip():
            return HookResult()

        # Promote the first non-blank alias to the canonical key.
        chosen = next(
            (
                arguments.get(alias)
                for alias in self._ALIASES
                if str(arguments.get(alias) or "").strip()
            ),
            None,
        )
        if chosen is None:
            return HookResult()

        # Drop every alias spelling so stale keys cannot shadow the value.
        rewritten = {
            key: value for key, value in arguments.items() if key not in self._ALIASES
        }
        rewritten["file_path"] = chosen
        return HookResult(updated_arguments=rewritten)
152
153
class SearchPathAliasHook(BaseToolHook):
    """Normalize common search-path aliases before validation and execution."""

    _SEARCH_TOOLS = frozenset({"glob", "grep"})
    _ALIASES = ("directory", "dir", "folder")

    async def pre_tool_use(self, context: HookContext) -> HookResult:
        if context.tool_call.name not in self._SEARCH_TOOLS:
            return HookResult()

        arguments = context.tool_call.arguments
        # An explicit search path wins; nothing to normalize.
        if str(arguments.get("path", "")).strip():
            return HookResult()

        for alias in self._ALIASES:
            value = arguments.get(alias)
            if not str(value or "").strip():
                continue
            rewritten = dict(arguments)
            rewritten["path"] = value
            # Remove every alias spelling to avoid stale duplicates.
            for stale in self._ALIASES:
                rewritten.pop(stale, None)
            return HookResult(updated_arguments=rewritten)

        # glob calls sometimes smuggle the directory inside the pattern;
        # split it back out into path + basename when that is unambiguous.
        if context.tool_call.name == "glob":
            split_arguments = self._normalize_glob_pattern_path(arguments)
            if split_arguments is not None:
                return HookResult(updated_arguments=split_arguments)

        return HookResult()

    def _normalize_glob_pattern_path(
        self,
        arguments: dict[str, Any],
    ) -> dict[str, Any] | None:
        """Split a directory-carrying glob pattern into path + basename."""
        pattern = str(arguments.get("pattern", "")).strip()
        if not pattern:
            return None

        if pattern.startswith(("/", "~", "./", "../")):
            as_path = Path(pattern)
            parent = str(as_path.parent).strip()
            basename = as_path.name.strip()
        else:
            split = self._split_implicit_glob_parent(pattern)
            if split is None:
                return None
            parent, basename = split
        if not parent or not basename:
            return None
        # Never split when the parent itself contains glob syntax.
        if any(marker in parent for marker in ("*", "?", "[")):
            return None

        rewritten = dict(arguments)
        rewritten["path"] = parent
        rewritten["pattern"] = basename
        return rewritten

    def _split_implicit_glob_parent(self, pattern: str) -> tuple[str, str] | None:
        """Derive (parent, basename) from a relative, slash-separated pattern."""
        if "/" not in pattern:
            return None

        segments = [piece for piece in pattern.split("/") if piece]
        # Leading pure-wildcard segments carry no directory information.
        while segments and self._is_wildcard_segment(segments[0]):
            del segments[0]
        if len(segments) < 2:
            return None

        *parent_segments, tail = segments
        basename = tail.strip()
        if not basename or not parent_segments:
            return None
        if any(self._segment_contains_glob(piece) for piece in parent_segments):
            return None
        return "/".join(parent_segments), basename

    def _is_wildcard_segment(self, segment: str) -> bool:
        # True for segments made entirely of glob punctuation, e.g. "*" or "**".
        return bool(segment) and all(char in "*?[]" for char in segment)

    def _segment_contains_glob(self, segment: str) -> bool:
        return any(marker in segment for marker in ("*", "?", "["))
239
240
class RelativePathContextHook(BaseToolHook):
    """Recover relative file/search paths against recently-used external directories."""

    _FILE_TOOLS = frozenset({"read", "write", "edit", "patch"})
    _SEARCH_TOOLS = frozenset({"glob", "grep"})

    def __init__(self, action_tracker: ActionTracker, workspace_root: Path) -> None:
        # Supplies recently-used base directories via recent_path_contexts().
        self.action_tracker = action_tracker
        # Canonical workspace root; used to tell workspace paths from external ones.
        self.workspace_root = workspace_root.expanduser().resolve()

    async def pre_tool_use(self, context: HookContext) -> HookResult:
        """Rewrite the call's path argument when a better-known anchor applies."""
        argument_key = self._argument_key(context.tool_call.name)
        if argument_key is None:
            return HookResult()

        arguments = context.tool_call.arguments
        raw_path = str(arguments.get(argument_key, "")).strip()
        if not raw_path:
            return HookResult()

        # These tools fail on missing paths, so a rewrite must point at
        # something that already exists.
        require_existing = context.tool_call.name in {"read", "glob", "grep", "edit", "patch"}
        resolved: str | None = None
        injected_messages: list[str] = []
        if raw_path.startswith("/"):
            # Absolute path: possibly a workspace-local mirror of an external root.
            resolved = self._resolve_workspace_mirror_path(
                raw_path,
                require_existing=require_existing,
            )
            if resolved is not None:
                injected_messages.append(
                    self._workspace_mirror_correction_message(raw_path, resolved)
                )
        elif not raw_path.startswith("~"):
            # Relative path: try anchoring it in a recently-used directory.
            resolved = self._resolve_recent_context_path(
                raw_path,
                require_existing=require_existing,
                prefer_external_ancestor=context.tool_call.name in self._SEARCH_TOOLS,
            )
        if resolved is None:
            return HookResult()

        updated_arguments = dict(arguments)
        updated_arguments[argument_key] = resolved
        return HookResult(
            updated_arguments=updated_arguments,
            injected_messages=injected_messages,
        )

    def _argument_key(self, tool_name: str) -> str | None:
        """Map a tool name to the argument that carries its path, if any."""
        if tool_name in self._FILE_TOOLS:
            return "file_path"
        if tool_name in self._SEARCH_TOOLS:
            return "path"
        return None

    def _resolve_recent_context_path(
        self,
        raw_path: str,
        *,
        require_existing: bool,
        prefer_external_ancestor: bool,
    ) -> str | None:
        """Resolve *raw_path* against recent base directories; None keeps it as-is."""
        workspace_candidate = (self.workspace_root / raw_path).expanduser()
        if workspace_candidate.exists():
            # Already resolvable inside the workspace; search tools may still
            # prefer an external ancestor carrying the same trailing parts.
            if prefer_external_ancestor:
                anchored = self._resolve_recent_context_ancestor(
                    raw_path,
                    require_existing=require_existing,
                )
                if anchored is not None:
                    return anchored
            return None

        for base_dir in self.action_tracker.recent_path_contexts():
            candidate = (Path(base_dir) / raw_path).expanduser()
            if require_existing:
                if candidate.exists():
                    return str(candidate)
                continue
            # For non-read tools an existing parent directory is enough.
            if candidate.exists() or candidate.parent.exists():
                return str(candidate)
        if prefer_external_ancestor:
            return self._resolve_recent_context_ancestor(
                raw_path,
                require_existing=require_existing,
            )
        return None

    def _resolve_recent_context_ancestor(
        self,
        raw_path: str,
        *,
        require_existing: bool,
    ) -> str | None:
        """Find an external base directory whose trailing parts equal *raw_path*."""
        raw_parts = tuple(part for part in Path(raw_path).parts if part not in {"."})
        if not raw_parts:
            return None

        for base_dir in self.action_tracker.recent_path_contexts():
            base_path = Path(base_dir).expanduser()
            try:
                resolved_base = base_path.resolve(strict=False)
            except Exception:
                # resolve() can fail on malformed paths; keep the raw form.
                resolved_base = base_path
            if resolved_base == self.workspace_root:
                continue
            try:
                # Skip bases inside the workspace: only external roots qualify.
                resolved_base.relative_to(self.workspace_root)
                continue
            except ValueError:
                pass

            matched = self._match_recent_context_ancestor(
                resolved_base,
                raw_parts,
            )
            if matched is None:
                continue
            if require_existing and not matched.exists():
                continue
            return str(matched)
        return None

    def _match_recent_context_ancestor(
        self,
        base_path: Path,
        raw_parts: tuple[str, ...],
    ) -> Path | None:
        """Return *base_path* or the nearest ancestor ending in *raw_parts*."""
        candidates = [base_path, *base_path.parents]
        for candidate in candidates:
            if len(candidate.parts) < len(raw_parts):
                continue
            if candidate.parts[-len(raw_parts) :] == raw_parts:
                return candidate
        return None

    def _resolve_workspace_mirror_path(
        self,
        raw_path: str,
        *,
        require_existing: bool,
    ) -> str | None:
        """Remap a workspace-relative absolute path onto an external output root.

        The first component of the workspace-relative path acts as an anchor;
        when a recent external base directory contains that anchor component,
        the path is re-rooted under that external location.
        """
        candidate = Path(raw_path).expanduser()
        try:
            resolved = candidate.resolve(strict=False)
        except Exception:
            resolved = candidate

        try:
            relative = resolved.relative_to(self.workspace_root)
        except ValueError:
            # Not under the workspace: nothing to remap.
            return None
        if not relative.parts:
            return None

        anchor = relative.parts[0]
        for base_dir in self.action_tracker.recent_path_contexts():
            base_path = Path(base_dir).expanduser()
            try:
                resolved_base = base_path.resolve(strict=False)
            except Exception:
                resolved_base = base_path
            if resolved_base == self.workspace_root:
                continue
            try:
                # Only consider bases that are outside the workspace.
                resolved_base.relative_to(self.workspace_root)
                continue
            except ValueError:
                pass

            try:
                anchor_index = resolved_base.parts.index(anchor)
            except ValueError:
                continue
            if anchor_index <= 0:
                continue

            anchor_root = Path(*resolved_base.parts[: anchor_index + 1])
            remapped = Path(*resolved_base.parts[:anchor_index]).joinpath(*relative.parts)
            if remapped == resolved:
                # Remapping reproduced the same path; no correction needed.
                continue
            if require_existing:
                if remapped.exists():
                    return str(remapped)
                continue
            if remapped.exists() or remapped.parent.exists() or anchor_root.exists():
                return str(remapped)
        return None

    def _workspace_mirror_correction_message(self, raw_path: str, resolved_path: str) -> str:
        """Build the injected notice explaining the mirror-path correction."""
        raw_name = Path(str(raw_path)).name or str(raw_path)
        resolved_root = self._describe_anchor_root(resolved_path)
        return (
            "[Path anchor correction] A repo-local mirror path was remapped to the established "
            f"output root under `{resolved_root}`. Keep future file/search tool calls on that "
            f"external root and use `{raw_name}` there instead of re-anchoring work to the "
            "workspace checkout."
        )

    def _describe_anchor_root(self, path_value: str) -> str:
        """Describe the root of *path_value* for user-facing messages."""
        resolved = Path(path_value).expanduser()
        try:
            candidate = resolved.resolve(strict=False)
        except Exception:
            candidate = resolved

        parts = candidate.parts
        # Prefer truncating at a "Loader" path component when one is present.
        if "Loader" in parts:
            loader_index = parts.index("Loader")
            return str(Path(*parts[: loader_index + 1]))
        return str(candidate.parent)
452
453
# Tools whose calls count as observations ("bash" only when the command is
# judged read-only by _is_read_only_bash).
_OBSERVATION_TOOLS = frozenset({"read", "glob", "grep", "bash"})
# Tools whose calls can mutate files ("bash" only when the command mutates,
# see _is_mutating_bash).
_MUTATION_TOOLS = frozenset({"write", "edit", "patch", "bash"})
# argv[0] values of shell commands treated as read-only.
_READ_ONLY_BASH_PREFIXES = frozenset(
    {"ls", "pwd", "find", "stat", "cat", "head", "tail", "rg", "grep"}
)
# Substrings whose presence marks a shell command as mutating: output
# redirection, tee, file management, in-place edits, and git write commands.
_MUTATING_BASH_FRAGMENTS = (
    " >",
    ">>",
    "| tee",
    "touch ",
    "mkdir ",
    "rm ",
    "mv ",
    "cp ",
    "sed -i",
    "perl -pi",
    "git add",
    "git commit",
    "git apply",
)
474
475
476 def _extract_observation_paths(tool_call: ToolCall) -> list[str]:
477 arguments = tool_call.arguments
478 if tool_call.name == "read":
479 file_path = str(arguments.get("file_path", "")).strip()
480 return [file_path] if file_path else []
481
482 if tool_call.name in {"glob", "grep"}:
483 candidates: list[str] = []
484 search_path = str(arguments.get("path", "")).strip()
485 if search_path:
486 anchored_path = _derive_search_anchor(search_path, str(arguments.get("pattern", "")).strip())
487 candidates.append(anchored_path or search_path)
488 pattern = str(arguments.get("pattern", "")).strip()
489 if not search_path and pattern.startswith(("/", "~")):
490 candidates.append(str(Path(pattern).expanduser().parent))
491 return candidates
492
493 command = str(arguments.get("command", "")).strip()
494 if not _is_read_only_bash(command):
495 return []
496 return _extract_bash_paths(command)
497
498
def _is_read_only_bash(command: str) -> bool:
    """Return True when *command* looks like a pure inspection command."""
    normalized = " ".join(command.split())
    if not normalized:
        return False
    # Any detected text-rewrite target or mutating fragment disqualifies it.
    if extract_shell_text_rewrite_target(normalized) is not None:
        return False
    if any(fragment in normalized for fragment in _MUTATING_BASH_FRAGMENTS):
        return False
    argv, _base = _extract_bash_command_context(normalized)
    return bool(argv) and argv[0] in _READ_ONLY_BASH_PREFIXES
511
512
def _extract_bash_paths(command: str) -> list[str]:
    """Pull path-like tokens out of a (read-only) shell command."""
    argv, base_dir = _extract_bash_command_context(command)
    if not argv:
        return []

    found: list[str] = []
    for raw in argv[1:]:
        token = raw.strip()
        if not token or token.startswith("-"):
            # Skip empty tokens and option flags.
            continue
        if token.startswith(("/", "~")):
            found.append(token)
        elif base_dir and (
            "/" in token
            or token.startswith(("./", "../"))
            or token in {".", ".."}
        ):
            # Resolve relative path-like tokens against the leading cd target.
            found.append(str(base_dir / token))
    if not found and base_dir is not None:
        # A bare "cd DIR && cmd" still observes DIR itself.
        found.append(str(base_dir))
    return found
534
535
536 def _extract_bash_command_context(command: str) -> tuple[list[str], Path | None]:
537 try:
538 argv = shlex.split(command)
539 except ValueError:
540 return [], None
541 if not argv:
542 return [], None
543 base_dir = _extract_bash_base_dir(argv)
544 if base_dir is None:
545 return argv, None
546 if len(argv) >= 4 and argv[2] in {"&&", ";"}:
547 return argv[3:], base_dir
548 return argv, base_dir
549
550
551 def _extract_bash_base_dir(argv: list[str]) -> Path | None:
552 if len(argv) < 2 or argv[0] != "cd":
553 return None
554 candidate = argv[1].strip()
555 if not candidate:
556 return None
557 if not candidate.startswith(("/", "~")):
558 return None
559 try:
560 return Path(candidate).expanduser()
561 except (OSError, RuntimeError, ValueError):
562 return None
563
564
565 def _derive_search_anchor(search_path: str, pattern: str) -> str:
566 normalized_search_path = str(search_path or "").strip()
567 normalized_pattern = str(pattern or "").strip()
568 if not normalized_search_path or not normalized_pattern:
569 return normalized_search_path
570
571 literal_segments: list[str] = []
572 for segment in normalized_pattern.split("/"):
573 cleaned = segment.strip()
574 if not cleaned or cleaned == ".":
575 continue
576 if any(token in cleaned for token in ("*", "?", "[")):
577 continue
578 literal_segments.append(cleaned)
579
580 if not literal_segments:
581 return normalized_search_path
582
583 if "." in literal_segments[-1]:
584 literal_segments = literal_segments[:-1]
585 if not literal_segments:
586 return normalized_search_path
587
588 try:
589 anchored = Path(normalized_search_path).expanduser().joinpath(*literal_segments)
590 except (OSError, RuntimeError, ValueError):
591 return normalized_search_path
592 return str(anchored)
593
594
595 def _extract_mutation_paths(tool_call: ToolCall) -> list[str]:
596 arguments = tool_call.arguments
597 if tool_call.name in {"write", "edit", "patch"}:
598 file_path = str(arguments.get("file_path", "")).strip()
599 return [file_path] if file_path else []
600
601 if tool_call.name != "bash":
602 return []
603
604 command = str(arguments.get("command", "")).strip()
605 if not command or not _is_mutating_bash(command):
606 return []
607 target = extract_shell_text_rewrite_target(command)
608 return [target] if target else []
609
610
def _is_mutating_bash(command: str) -> bool:
    """Return True when *command* appears to modify the filesystem."""
    normalized = " ".join(command.split())
    if not normalized:
        return False
    # A detected text-rewrite target is definitive evidence of mutation.
    if extract_shell_text_rewrite_target(normalized) is not None:
        return True
    if any(fragment in normalized for fragment in _MUTATING_BASH_FRAGMENTS):
        return True
    try:
        argv = shlex.split(normalized)
    except ValueError:
        return False
    # Fall back to the command word for common file-management tools.
    return bool(argv) and argv[0] in {"touch", "mkdir", "rm", "mv", "cp", "chmod", "chown"}
626
627
def _tool_call_is_effective_mutation(tool_call: ToolCall) -> bool:
    """True when the call will actually mutate state (bash judged by its command)."""
    if tool_call.name == "bash":
        command = str(tool_call.arguments.get("command", "")).strip()
        return _is_mutating_bash(command)
    return tool_call.name in _MUTATION_TOOLS
633
634
def _repair_declared_output_paths(repair: Any, *, project_root: Path) -> set[str]:
    """Collect normalized paths of declared-but-missing HTML outputs for *repair*."""
    declared: set[str] = set()
    for raw_root in getattr(repair, "allowed_roots", ()) or ():
        root = normalize_repair_path(raw_root)
        if not root:
            continue
        missing = collect_missing_declared_html_output_files(
            target=Path(root),
            project_root=project_root,
        )
        declared.update(normalize_repair_path(str(path)) for path in missing)
    return declared
647
648
649 def _repair_uses_artifact_set_as_source_of_truth(repair: Any) -> bool:
650 return any(
651 "source of truth" in str(line).lower()
652 for line in getattr(repair, "repair_lines", ())
653 )
654
655
656 def _next_missing_repair_target(repair: Any) -> str:
657 for raw_path in getattr(repair, "allowed_paths", ()) or ():
658 path_text = str(raw_path or "").strip()
659 if not path_text:
660 continue
661 try:
662 if not Path(path_text).exists():
663 return path_text
664 except (OSError, RuntimeError, ValueError):
665 continue
666 return ""
667
668
def _planned_artifact_targets_satisfied(dod: Any, *, project_root: Path) -> bool:
    """True when planned artifact targets exist and every one is satisfied."""
    targets = collect_planned_artifact_targets(
        dod,
        project_root=project_root,
    )
    # An empty plan is never "satisfied" — there is nothing to verify.
    if not targets:
        return False
    for target, expect_directory in targets:
        if not planned_artifact_target_satisfied(
            dod,
            target=target,
            expect_directory=expect_directory,
            project_root=project_root,
        ):
            return False
    return True
685
686
def _planned_artifact_targets_declare_missing_html_outputs(
    dod: Any,
    *,
    project_root: Path,
) -> bool:
    """True when any planned target still declares missing HTML output files."""
    planned = collect_planned_artifact_targets(
        dod,
        project_root=project_root,
    )
    return any(
        collect_missing_declared_html_output_files(
            target=target,
            project_root=project_root,
        )
        for target, _unused_expect_directory in planned
    )
702
703
class ActiveRepairScopeHook(BaseToolHook):
    """Keep fix-mode observations anchored to the active artifact set."""

    # After this many consecutive in-scope observations without a concrete
    # mutation, further rereads of the artifact set are denied.
    _MAX_SOURCE_OF_TRUTH_OBSERVATIONS = 4

    def __init__(
        self,
        *,
        dod_store: DefinitionOfDoneStore,
        project_root: Path,
        session: Any,
    ) -> None:
        self.dod_store = dod_store
        self.project_root = project_root
        self.session = session
        # Scope key + counter implementing the audit-loop detector.
        self._source_of_truth_scope_key: tuple[str, ...] | None = None
        self._source_of_truth_observation_count = 0

    async def pre_tool_use(self, context: HookContext) -> HookResult:
        """Deny observations that leave the active repair's allowed scope."""
        if context.tool_call.name not in _OBSERVATION_TOOLS:
            return HookResult()
        if context.source == "verification":
            # Verification-driven calls are never constrained.
            return HookResult()

        repair = self._active_repair_context()
        if repair is None:
            return HookResult()

        observed_paths = _extract_observation_paths(context.tool_call)
        if not observed_paths:
            return HookResult()
        declared_output_paths = _repair_declared_output_paths(
            repair,
            project_root=self.project_root,
        )
        in_allowed_roots = bool(repair.allowed_roots) and all(
            path_within_allowed_roots(path, repair.allowed_roots) for path in observed_paths
        )
        # "Source of truth" mode: the repair notes declare the artifact set
        # authoritative AND every observed path stays inside its roots.
        source_of_truth_scope = (
            _repair_uses_artifact_set_as_source_of_truth(repair) and in_allowed_roots
        )
        if source_of_truth_scope:
            self._sync_source_of_truth_scope(repair.allowed_roots)
            if (
                self._source_of_truth_observation_count
                >= self._MAX_SOURCE_OF_TRUTH_OBSERVATIONS
            ):
                # Too many rereads without a mutation: steer toward an edit.
                next_missing_target = _next_missing_repair_target(repair)
                missing_target_suffix = (
                    f" or create `{next_missing_target}`"
                    if next_missing_target
                    else " or create the next missing repair target"
                )
                return HookResult(
                    decision=HookDecision.DENY,
                    message=(
                        "[Blocked - repair audit loop: the active repair artifact set has "
                        "already been inspected several times without a concrete mutation.] "
                        f"Suggestion: make one concrete edit, patch, or write to "
                        f"`{repair.artifact_path}`{missing_target_suffix} "
                        "instead of more rereads."
                    ),
                    terminal_state="blocked",
                )
        if repair.allowed_paths:
            # Explicit allow-list: every observed path must match it, be a
            # declared sibling output, or fall under source-of-truth scope.
            if all(path_matches_allowed_paths(path, repair.allowed_paths) for path in observed_paths):
                return HookResult()
            if declared_output_paths and all(
                normalize_repair_path(path) in declared_output_paths
                for path in observed_paths
            ):
                return HookResult()
            if source_of_truth_scope:
                return HookResult()

            allowed_preview = ", ".join(f"`{path}`" for path in repair.allowed_paths[:3])
            if len(repair.allowed_paths) > 3:
                allowed_preview += ", ..."
            declared_preview = ", ".join(
                f"`{Path(path).name or path}`"
                for path in sorted(declared_output_paths)[:3]
            )
            if len(declared_output_paths) > 3:
                declared_preview += ", ..."
            suggestion_suffix = (
                f" Declared sibling outputs currently allowed inside this repair set include: {declared_preview}."
                if declared_preview
                else ""
            )
            return HookResult(
                decision=HookDecision.DENY,
                message=(
                    "[Blocked - active repair scope: verification already identified "
                    f"`{repair.artifact_path}` as the current repair target. "
                    "Stay on the concrete repair files until that repair passes.] "
                    "Suggestion: inspect or edit only "
                    f"{allowed_preview} and do not reopen unrelated reference materials."
                    f"{suggestion_suffix}"
                ),
                terminal_state="blocked",
            )

        if not repair.allowed_roots:
            return HookResult()
        if all(path_within_allowed_roots(path, repair.allowed_roots) for path in observed_paths):
            return HookResult()

        # No explicit allow-list, but an observed path left the allowed roots.
        roots_preview = ", ".join(f"`{root}`" for root in repair.allowed_roots[:2])
        if len(repair.allowed_roots) > 2:
            roots_preview += ", ..."
        return HookResult(
            decision=HookDecision.DENY,
            message=(
                "[Blocked - active repair scope: verification already identified "
                f"`{repair.artifact_path}` as the current repair target. "
                "Stay inside the current artifact set until that repair passes.] "
                "Suggestion: inspect or edit files under "
                f"{roots_preview} and do not reopen unrelated reference materials."
            ),
            terminal_state="blocked",
        )

    async def post_tool_use(self, context: HookContext) -> HookResult:
        """Count in-scope observations; reset tracking on any real mutation."""
        if context.source == "verification":
            return HookResult()
        if _tool_call_is_effective_mutation(context.tool_call):
            # A concrete mutation breaks the audit loop.
            self._reset_source_of_truth_scope()
            return HookResult()
        if context.tool_call.name not in _OBSERVATION_TOOLS:
            return HookResult()

        repair = self._active_repair_context()
        if repair is None or not _repair_uses_artifact_set_as_source_of_truth(repair):
            self._reset_source_of_truth_scope()
            return HookResult()

        observed_paths = _extract_observation_paths(context.tool_call)
        if not observed_paths:
            return HookResult()
        if not repair.allowed_roots or not all(
            path_within_allowed_roots(path, repair.allowed_roots) for path in observed_paths
        ):
            return HookResult()

        # Record one more in-scope observation of the artifact set.
        self._sync_source_of_truth_scope(repair.allowed_roots)
        self._source_of_truth_observation_count += 1
        return HookResult()

    def _active_repair_context(self):
        """Return the active repair context, or None when no repair is pending."""
        dod_path = getattr(self.session, "active_dod_path", None)
        if not dod_path:
            return None
        path = Path(str(dod_path))
        if not path.exists():
            return None
        dod = self.dod_store.load(path)
        if dod.status == "done":
            # A finished definition-of-done means no repair scope applies.
            return None
        return extract_active_repair_context(getattr(self.session, "messages", []))

    def _sync_source_of_truth_scope(self, allowed_roots: tuple[str, ...]) -> None:
        """Reset the observation counter whenever the scoped roots change."""
        normalized = tuple(sorted(normalize_repair_path(root) for root in allowed_roots))
        if self._source_of_truth_scope_key == normalized:
            return
        self._source_of_truth_scope_key = normalized
        self._source_of_truth_observation_count = 0

    def _reset_source_of_truth_scope(self) -> None:
        """Clear all audit-loop tracking state."""
        self._source_of_truth_scope_key = None
        self._source_of_truth_observation_count = 0
874
875
class ActiveRepairMutationScopeHook(BaseToolHook):
    """Keep repair-phase mutations pinned to the concrete repair targets."""

    def __init__(
        self,
        *,
        dod_store: DefinitionOfDoneStore,
        project_root: Path,
        session: Any,
    ) -> None:
        self.dod_store = dod_store
        self.project_root = project_root
        self.session = session

    async def pre_tool_use(self, context: HookContext) -> HookResult:
        """Deny mutations whose targets stray outside the active repair set."""
        if context.tool_call.name not in _MUTATION_TOOLS:
            return HookResult()
        if context.source == "verification":
            # Verification-driven calls are never constrained.
            return HookResult()

        repair = self._active_repair_context()
        if repair is None or not repair.allowed_paths:
            return HookResult()
        allowed_paths = {normalize_repair_path(path) for path in repair.allowed_paths}

        mutation_paths = _extract_mutation_paths(context.tool_call)
        if not mutation_paths:
            # A mutating shell command with no identifiable target is too
            # broad to audit; force the model back to write/edit/patch.
            if context.tool_call.name == "bash" and _is_mutating_bash(
                str(context.tool_call.arguments.get("command", "")).strip()
            ):
                return HookResult(
                    decision=HookDecision.DENY,
                    message=(
                        "[Blocked - active repair mutation scope: the current repair already "
                        f"identifies `{repair.artifact_path}` as the concrete target.] "
                        "Suggestion: use write/edit/patch directly on one of the active repair "
                        "files instead of a broad shell mutation."
                    ),
                    terminal_state="blocked",
                )
            return HookResult()
        normalized_mutation_paths = [
            normalize_repair_path(path) for path in mutation_paths if str(path).strip()
        ]
        allowed_declared_outputs = _repair_declared_output_paths(
            repair,
            project_root=self.project_root,
        )

        # Allow the mutation when every target is either an explicit repair
        # file or a declared sibling output of the repair set. (Membership in
        # either set suffices, so a single combined pass covers both cases.)
        if normalized_mutation_paths and all(
            path in allowed_paths or path in allowed_declared_outputs
            for path in normalized_mutation_paths
        ):
            return HookResult()

        allowed_preview = ", ".join(f"`{path}`" for path in repair.allowed_paths[:3])
        if len(repair.allowed_paths) > 3:
            allowed_preview += ", ..."
        declared_preview = ", ".join(
            f"`{Path(path).name or path}`"
            for path in sorted(allowed_declared_outputs)[:3]
        )
        if len(allowed_declared_outputs) > 3:
            declared_preview += ", ..."
        suggestion_suffix = (
            f" Declared sibling outputs currently allowed inside this repair set include: {declared_preview}."
            if declared_preview
            else ""
        )
        return HookResult(
            decision=HookDecision.DENY,
            message=(
                "[Blocked - active repair mutation scope: verification already identified "
                f"`{repair.artifact_path}` as the current repair target.] Suggestion: keep "
                f"mutations on the active repair files only: {allowed_preview}."
                f"{suggestion_suffix}"
            ),
            terminal_state="blocked",
        )

    def _active_repair_context(self):
        """Return the active repair context, or None when no repair is pending."""
        dod_path = getattr(self.session, "active_dod_path", None)
        if not dod_path:
            return None
        path = Path(str(dod_path))
        if not path.exists():
            return None
        dod = self.dod_store.load(path)
        if dod.status == "done":
            # A finished definition-of-done means no repair scope applies.
            return None
        return extract_active_repair_context(getattr(self.session, "messages", []))
971
class LateReferenceDriftHook(BaseToolHook):
    """Block reopening old reference paths once planned artifacts are well underway.

    Two guards share this hook:

    * ``pre_tool_use`` denies observation tools that either (a) keep rereading a
      fully completed output set (post-build audit loop), or (b) wander back to
      earlier reference materials while planned artifacts are still missing
      late in the build (late reference drift).
    * ``post_tool_use`` counts successful in-scope observations so the pre-hook
      can detect the audit loop; any effective mutation resets that counter.
    """

    # Minimum number of completed planned files before the late-reference-drift
    # denial can trigger (lowered to 1 in the special cases below).
    _MIN_COMPLETED_FILES = 3
    # How many in-scope observations are tolerated after every planned artifact
    # exists, before further rereads are denied as an audit loop.
    _MAX_COMPLETED_SCOPE_OBSERVATIONS = 4
    # Lowercase substrings that mark a completed DoD item as a reference-study
    # step; once one such item is done, a single completed file is enough.
    _REFERENCE_STUDY_HINTS = (
        "examine",
        "inspect",
        "study",
        "cadence",
        "format",
        "structure",
        "reference",
    )

    def __init__(self, *, dod_store: DefinitionOfDoneStore, project_root: Path, session: Any) -> None:
        self.dod_store = dod_store
        self.project_root = project_root
        self.session = session
        # Sorted tuple of the completed-scope roots last observed; used to
        # detect when the scope changes and the counter must reset.
        self._completed_scope_key: tuple[str, ...] | None = None
        # Number of consecutive in-scope observations since the last reset.
        self._completed_scope_observation_count = 0

    async def pre_tool_use(self, context: HookContext) -> HookResult:
        """Deny loop-like or out-of-scope observations; otherwise continue."""
        if context.tool_call.name not in _OBSERVATION_TOOLS:
            return HookResult()
        # Verification-sourced calls are exempt from drift policing.
        if context.source == "verification":
            return HookResult()

        completed_scope = self._completed_artifact_scope()
        if completed_scope is not None:
            observed_paths = _extract_observation_paths(context.tool_call)
            if not observed_paths:
                return HookResult()
            if all(path_within_allowed_roots(path, completed_scope) for path in observed_paths):
                # In-scope reread of a finished output set: allowed until the
                # observation budget is exhausted, then denied as an audit loop.
                self._sync_completed_scope_state(completed_scope)
                if (
                    self._completed_scope_observation_count
                    >= self._MAX_COMPLETED_SCOPE_OBSERVATIONS
                ):
                    roots_preview = ", ".join(f"`{root}`" for root in completed_scope[:2])
                    if len(completed_scope) > 2:
                        roots_preview += ", ..."
                    repair = extract_active_repair_context(
                        getattr(self.session, "messages", [])
                    )
                    # Tailor the suggestion: point at active repair files when a
                    # repair context exists, otherwise push toward finishing.
                    if repair is not None and repair.allowed_paths:
                        repair_preview = ", ".join(
                            f"`{path}`" for path in repair.allowed_paths[:3]
                        )
                        if len(repair.allowed_paths) > 3:
                            repair_preview += ", ..."
                        suggestion = (
                            "make one concrete edit, patch, or write to the active "
                            f"repair file(s) {repair_preview}. Do not finish with a "
                            "final response while these verification repair targets remain."
                        )
                    else:
                        suggestion = (
                            "finish with a final response so Loader can verify automatically, "
                            "or make one concrete edit for a specific mismatch inside "
                            f"{roots_preview} instead of more rereads."
                        )
                    return HookResult(
                        decision=HookDecision.DENY,
                        message=(
                            "[Blocked - post-build audit loop: all explicitly planned artifacts "
                            "already exist and the current output set has already been inspected "
                            f"several times.] Suggestion: {suggestion}"
                        ),
                        terminal_state="blocked",
                    )
                return HookResult()

            # Observation leaves the completed output roots entirely: deny.
            roots_preview = ", ".join(f"`{root}`" for root in completed_scope[:2])
            if len(completed_scope) > 2:
                roots_preview += ", ..."
            return HookResult(
                decision=HookDecision.DENY,
                message=(
                    "[Blocked - completed artifact set scope: all explicitly planned artifacts "
                    "already exist.] Suggestion: stay within the current output roots under "
                    f"{roots_preview} and use those files as the source of truth instead of "
                    "reopening earlier reference materials."
                ),
                terminal_state="blocked",
            )

        # No completed scope: check for late-stage drift while artifacts are missing.
        late_stage = self._late_stage_missing_artifact()
        if late_stage is None:
            return HookResult()
        missing_artifact, planned_roots = late_stage
        observed_paths = _extract_observation_paths(context.tool_call)
        if not observed_paths:
            return HookResult()
        if all(path_within_allowed_roots(path, planned_roots) for path in observed_paths):
            return HookResult()

        roots_preview = ", ".join(f"`{root}`" for root in planned_roots[:2])
        if len(planned_roots) > 2:
            roots_preview += ", ..."
        return HookResult(
            decision=HookDecision.DENY,
            message=(
                "[Blocked - late reference drift: several planned artifacts already exist and "
                f"`{missing_artifact}` is still missing.] Suggestion: finish the next missing "
                f"artifact inside {roots_preview} before reopening earlier reference materials."
            ),
            terminal_state="blocked",
        )

    def _late_stage_missing_artifact(self) -> tuple[str, tuple[str, ...]] | None:
        """Return (first missing artifact label, planned roots) when enough files are done.

        Returns None when there is no active DoD, nothing is missing, or too few
        planned files have been completed for the drift guard to apply.
        """
        dod_path = getattr(self.session, "active_dod_path", None)
        if not dod_path:
            return None
        path = Path(str(dod_path))
        if not path.exists():
            return None
        dod = self.dod_store.load(path)
        if dod.status == "done":
            return None

        planned_targets = collect_planned_artifact_targets(
            dod,
            project_root=self.project_root,
        )
        if not planned_targets:
            return None

        missing_label = ""
        declared_missing_label = ""
        completed_files = 0
        planned_roots: list[str] = []
        seen_roots: set[str] = set()
        for target, expect_directory in planned_targets:
            satisfied = planned_artifact_target_satisfied(
                dod,
                target=target,
                expect_directory=expect_directory,
                project_root=self.project_root,
            )
            if not expect_directory:
                if satisfied:
                    completed_files += 1
                elif not missing_label:
                    # Keep only the first missing target as the label.
                    missing_label = str(target)
                root = str(target.parent)
            else:
                if not satisfied and not missing_label:
                    missing_label = str(target)
                root = str(target)
            if root not in seen_roots:
                planned_roots.append(root)
                seen_roots.add(root)

        # All planned targets satisfied: fall back to HTML outputs that are
        # declared inside a planned target but not yet created.
        if not missing_label:
            for target, _expect_directory in planned_targets:
                declared_missing = collect_missing_declared_html_output_files(
                    target=target,
                    project_root=self.project_root,
                )
                if not declared_missing:
                    continue
                declared_missing_label = str(declared_missing[0])
                missing_label = declared_missing_label
                break

        if not missing_label:
            return None
        # Relax the completed-file threshold to 1 when the miss comes from a
        # declared HTML output, or when a reference-study step is already done.
        minimum_completed_files = self._MIN_COMPLETED_FILES
        if declared_missing_label and completed_files >= 1:
            minimum_completed_files = 1
        if completed_files >= 1 and self._reference_study_completed(dod):
            minimum_completed_files = 1
        if completed_files < minimum_completed_files:
            return None
        return missing_label, tuple(planned_roots)

    def _reference_study_completed(self, dod: Any) -> bool:
        """Return True when any completed DoD item reads like a reference-study step."""
        for item in dod.completed_items:
            text = str(item).strip().lower()
            if not text:
                continue
            if any(hint in text for hint in self._REFERENCE_STUDY_HINTS):
                return True
        return False

    async def post_tool_use(self, context: HookContext) -> HookResult:
        """Maintain the in-scope observation counter after a successful call."""
        if context.source == "verification":
            return HookResult()
        # Any real mutation means progress; forget prior observations.
        if _tool_call_is_effective_mutation(context.tool_call):
            self._reset_completed_scope_state()
            return HookResult()
        if context.tool_call.name not in _OBSERVATION_TOOLS:
            return HookResult()

        completed_scope = self._completed_artifact_scope()
        if completed_scope is None:
            self._reset_completed_scope_state()
            return HookResult()

        observed_paths = _extract_observation_paths(context.tool_call)
        if not observed_paths:
            return HookResult()
        if not all(path_within_allowed_roots(path, completed_scope) for path in observed_paths):
            return HookResult()

        # Count only observations that stayed fully inside the completed scope.
        self._sync_completed_scope_state(completed_scope)
        self._completed_scope_observation_count += 1
        return HookResult()

    def _completed_artifact_scope(self) -> tuple[str, ...] | None:
        """Return the deduplicated planned roots when every planned artifact exists.

        Returns None when no DoD is active, the DoD is done/fixing, any planned
        target is unsatisfied, or declared HTML outputs are still missing.
        """
        dod_path = getattr(self.session, "active_dod_path", None)
        if not dod_path:
            return None
        path = Path(str(dod_path))
        if not path.exists():
            return None
        dod = self.dod_store.load(path)
        if dod.status in {"done", "fixing"}:
            return None

        planned_targets = collect_planned_artifact_targets(
            dod,
            project_root=self.project_root,
        )
        if not planned_targets:
            return None
        if not _planned_artifact_targets_satisfied(
            dod,
            project_root=self.project_root,
        ):
            return None
        if _planned_artifact_targets_declare_missing_html_outputs(
            dod,
            project_root=self.project_root,
        ):
            return None

        # File targets contribute their parent directory; directory targets
        # contribute themselves. Order of first appearance is preserved.
        planned_roots: list[str] = []
        seen_roots: set[str] = set()
        for target, expect_directory in planned_targets:
            root = str(target if expect_directory else target.parent)
            if root in seen_roots:
                continue
            seen_roots.add(root)
            planned_roots.append(root)
        return tuple(planned_roots)

    def _sync_completed_scope_state(self, completed_scope: tuple[str, ...]) -> None:
        """Reset the observation counter whenever the completed scope changes."""
        normalized = tuple(sorted(completed_scope))
        if self._completed_scope_key == normalized:
            return
        self._completed_scope_key = normalized
        self._completed_scope_observation_count = 0

    def _reset_completed_scope_state(self) -> None:
        """Clear the remembered scope and its observation counter."""
        self._completed_scope_key = None
        self._completed_scope_observation_count = 0
1230
1231
class MissingPlannedOutputReadHook(BaseToolHook):
    """Block rereads of planned outputs that have not been created yet."""

    def __init__(
        self,
        *,
        dod_store: DefinitionOfDoneStore,
        project_root: Path,
        session: Any,
    ) -> None:
        self.dod_store = dod_store
        self.project_root = project_root
        self.session = session

    async def pre_tool_use(self, context: HookContext) -> HookResult:
        """Deny `read` calls whose target is a planned output file that does not exist."""
        if context.tool_call.name != "read" or context.source == "verification":
            return HookResult()

        resolved = self._missing_planned_output_path(context.tool_call)
        if resolved is None:
            return HookResult()
        target_path, dod = resolved

        # Build the denial message piece by piece, appending optional hints.
        parts = [
            (
                "[Blocked - missing planned output artifact: "
                f"`{target_path}` has not been created yet.]"
            ),
            (
                "Suggestion: create it now with one "
                f"`write(file_path=\"{display_runtime_path(target_path)}\", content=\"...\")` "
                "call instead of reading it first."
            ),
        ]

        outline_label = infer_output_outline_label(
            dod,
            target_path,
            project_root=self.project_root,
        )
        if outline_label:
            parts.append(
                f"Use the existing outline label `{outline_label}` so the new file matches the current artifact graph."
            )

        hint = self._existing_sibling_html_hint(target_path)
        if hint:
            parts.append(hint)

        return HookResult(
            decision=HookDecision.DENY,
            message=" ".join(parts),
            terminal_state="blocked",
        )

    def _missing_planned_output_path(
        self,
        tool_call: ToolCall,
    ) -> tuple[Path, Any] | None:
        """Resolve the read target to a planned-but-absent output file, if any."""
        active_dod = getattr(self.session, "active_dod_path", None)
        if not active_dod:
            return None
        dod_file = Path(str(active_dod))
        if not dod_file.exists():
            return None
        dod = self.dod_store.load(dod_file)
        if dod.status in {"done", "fixing"}:
            return None

        requested = str(tool_call.arguments.get("file_path") or "").strip()
        if not requested:
            return None
        target_path = Path(requested).expanduser().resolve(strict=False)
        if target_path.exists():
            # The file is already on disk; reading it is fine.
            return None

        planned_targets = collect_planned_artifact_targets(
            dod,
            project_root=self.project_root,
        )
        if not planned_targets:
            return None

        # First: an exact match against a planned file target decides immediately.
        for planned_target, expect_directory in planned_targets:
            if expect_directory or planned_target != target_path:
                continue
            satisfied = planned_artifact_target_satisfied(
                dod,
                target=planned_target,
                expect_directory=False,
                project_root=self.project_root,
            )
            return None if satisfied else (target_path, dod)

        # Otherwise: an HTML output declared under any planned target but missing.
        for planned_target, _ in planned_targets:
            declared_missing = collect_missing_declared_html_output_files(
                target=planned_target,
                project_root=self.project_root,
            )
            if target_path in declared_missing:
                return target_path, dod
        return None

    def _existing_sibling_html_hint(self, target_path: Path) -> str | None:
        """Suggest reusing the structure of the last sibling HTML file, if one exists."""
        if target_path.suffix.lower() not in {".html", ".htm"}:
            return None
        parent = target_path.parent
        if not parent.is_dir():
            return None
        html_siblings = sorted(
            child
            for child in parent.iterdir()
            if child.is_file()
            and child != target_path
            and child.suffix.lower() in {".html", ".htm"}
        )
        if not html_siblings:
            return None
        reference = html_siblings[-1]
        return (
            "Reuse the overall structure and navigation pattern from "
            f"`{reference.name}` as the starting pattern for this file."
        )
1359
1360
1361 class HookManager:
1362 """Runs tool hooks across Loader's three lifecycle events."""
1363
1364 def __init__(self, hooks: Iterable[ToolHook] | None = None) -> None:
1365 self.hooks = list(hooks or [])
1366
1367 async def run_pre_tool_use(self, context: HookContext) -> HookRunSummary:
1368 return await self._run_event(HookEvent.PRE_TOOL_USE, context)
1369
1370 async def run_post_tool_use(self, context: HookContext) -> HookRunSummary:
1371 return await self._run_event(HookEvent.POST_TOOL_USE, context)
1372
1373 async def run_post_tool_use_failure(self, context: HookContext) -> HookRunSummary:
1374 return await self._run_event(HookEvent.POST_TOOL_USE_FAILURE, context)
1375
1376 async def _run_event(
1377 self,
1378 event: HookEvent,
1379 context: HookContext,
1380 ) -> HookRunSummary:
1381 summary = HookRunSummary(tool_call=context.tool_call)
1382 for hook in self.hooks:
1383 if event == HookEvent.PRE_TOOL_USE:
1384 result = await hook.pre_tool_use(context)
1385 elif event == HookEvent.POST_TOOL_USE:
1386 result = await hook.post_tool_use(context)
1387 else:
1388 result = await hook.post_tool_use_failure(context)
1389
1390 summary.injected_messages.extend(result.injected_messages)
1391 summary.metadata.update(result.metadata)
1392 if result.permission_override is not None:
1393 summary.permission_override = result.permission_override
1394 summary.permission_reason = result.permission_reason
1395 if result.output_override is not None:
1396 summary.output_override = result.output_override
1397 context.output = result.output_override
1398 if result.updated_arguments is not None:
1399 updated_call = ToolCall(
1400 id=context.tool_call.id,
1401 name=context.tool_call.name,
1402 arguments=dict(result.updated_arguments),
1403 )
1404 context.tool_call = updated_call
1405 summary.tool_call = updated_call
1406 if result.message is not None:
1407 summary.message = result.message
1408 if result.terminal_state is not None:
1409 summary.terminal_state = result.terminal_state
1410 if result.decision != HookDecision.CONTINUE:
1411 summary.decision = result.decision
1412 return summary
1413
1414 return summary
1415
1416
class DuplicateActionHook(BaseToolHook):
    """Pre-hook that cancels already-completed duplicate actions."""

    def __init__(self, action_tracker: ActionTracker) -> None:
        self.action_tracker = action_tracker

    async def pre_tool_use(self, context: HookContext) -> HookResult:
        """Cancel the call when the tracker has already seen an identical action."""
        if context.skip_duplicate_check:
            return HookResult()
        call = context.tool_call
        duplicate, reason = self.action_tracker.check_tool_call(call.name, call.arguments)
        if duplicate:
            return HookResult(
                decision=HookDecision.CANCEL,
                message=f"[Skipped - duplicate action: {reason}]",
                terminal_state="duplicate",
            )
        return HookResult()
1437
1438
class ActionValidationHook(BaseToolHook):
    """Pre-hook that blocks invalid or dangerous tool arguments."""

    def __init__(self, validator: PreActionValidator) -> None:
        self.validator = validator

    async def pre_tool_use(self, context: HookContext) -> HookResult:
        """Run the validator: deny on failure, surface warnings on success."""
        call = context.tool_call
        verdict = self.validator.validate(call.name, call.arguments)
        if not verdict.valid:
            denial = f"[Blocked - {verdict.reason}]"
            if verdict.suggestion:
                denial = f"{denial} Suggestion: {verdict.suggestion}"
            return HookResult(
                decision=HookDecision.DENY,
                message=denial,
                terminal_state="blocked",
            )
        warnings: list[str] = []
        if verdict.reason and verdict.severity == "warning":
            warnings.append(f"[Validation warning] {verdict.reason}")
        return HookResult(injected_messages=warnings)
1464
1465
class RollbackTrackingHook(BaseToolHook):
    """Pre-hook that tracks rollback actions for destructive tools."""

    def __init__(
        self,
        registry: ToolRegistry,
        rollback_plan: RollbackPlan | None,
    ) -> None:
        self.registry = registry
        self.rollback_plan = rollback_plan

    async def pre_tool_use(self, context: HookContext) -> HookResult:
        """Record an undo action for a destructive call before it executes."""
        plan = self.rollback_plan
        call = context.tool_call
        if plan is None or not is_destructive_tool(call.name, call.arguments):
            return HookResult()

        async def snapshot_file(path: str) -> str:
            # Best-effort backup: a failed read backs up as empty content.
            outcome = await self.registry.execute("read", file_path=path)
            if outcome.is_error:
                return ""
            return outcome.output

        undo_action = await create_rollback_plan_for_action(
            call.name,
            call.arguments,
            snapshot_file,
        )
        if undo_action is None:
            return HookResult()
        plan.actions.append(undo_action)
        return HookResult(metadata={"rollback_action": undo_action})
1500
1501
class ActionHistoryHook(BaseToolHook):
    """Post-hook that records successful actions for deduplication and loop checks."""

    def __init__(self, action_tracker: ActionTracker) -> None:
        self.action_tracker = action_tracker

    async def post_tool_use(self, context: HookContext) -> HookResult:
        """Record the completed call unless the context opted out of tracking."""
        if context.record_action:
            call = context.tool_call
            self.action_tracker.record_tool_call(call.name, call.arguments)
        return HookResult()
1516
1517
class MemoryLifecycleHook(BaseToolHook):
    """Mirror durable memory updates into the session notepad."""

    async def post_tool_use(self, context: HookContext) -> HookResult:
        """After a successful memory tool call, echo the update into the notepad."""
        outcome = context.result
        if outcome is None or outcome.is_error:
            return HookResult()

        store = MemoryStore(context.permission_policy.workspace_root)
        tool_name = context.tool_call.name
        args = context.tool_call.arguments
        if tool_name == "project_memory_add_note":
            category = str(args.get("category", "")).strip()
            content = str(args.get("content", "")).strip()
            if category and content:
                store.append_notepad_working(
                    f"Remembered note [{category}]: {content}"
                )
        elif tool_name == "project_memory_add_directive":
            directive = str(args.get("directive", "")).strip()
            priority = str(args.get("priority", "normal")).strip()
            if directive:
                store.append_notepad_working(
                    f"Remembered directive [{priority}]: {directive}"
                )
        return HookResult()
1543
1544
def build_default_tool_hooks(
    *,
    action_tracker: ActionTracker,
    validator: PreActionValidator,
    registry: ToolRegistry,
    rollback_plan: RollbackPlan | None,
    workspace_root: Path,
    session: Any,
) -> HookManager:
    """Build Loader's default tool hook stack for one runtime turn.

    Hooks run in list order: path normalization first, then repair/drift scope
    guards, then duplicate/validation/rollback safeguards, and finally the
    post-hooks (history recording, memory mirroring).

    Args:
        action_tracker: Shared tracker for duplicate detection and history.
        validator: Pre-action argument validator.
        registry: Tool registry used for rollback file snapshots.
        rollback_plan: Optional plan that accumulates undo actions.
        workspace_root: Project root for path and DoD resolution.
        session: Runtime session carrying ``active_dod_path`` and ``messages``.

    Returns:
        A HookManager wired with Loader's default hook stack.
    """
    # One shared store instead of four identical instances: every DoD-aware
    # hook only loads from the same workspace root, so the store can be reused.
    dod_store = DefinitionOfDoneStore(workspace_root)
    return HookManager(
        [
            FilePathAliasHook(),
            SearchPathAliasHook(),
            RelativePathContextHook(action_tracker, workspace_root),
            ActiveRepairScopeHook(
                dod_store=dod_store,
                project_root=workspace_root,
                session=session,
            ),
            ActiveRepairMutationScopeHook(
                dod_store=dod_store,
                project_root=workspace_root,
                session=session,
            ),
            LateReferenceDriftHook(
                dod_store=dod_store,
                project_root=workspace_root,
                session=session,
            ),
            MissingPlannedOutputReadHook(
                dod_store=dod_store,
                project_root=workspace_root,
                session=session,
            ),
            DuplicateActionHook(action_tracker),
            ActionValidationHook(validator),
            RollbackTrackingHook(registry, rollback_plan),
            ActionHistoryHook(action_tracker),
            MemoryLifecycleHook(),
        ]
    )