1 """Runtime-owned safeguard services shared by hooks and agent adapters."""
2
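# Illustrative wiring (an assumed caller, not part of this module): a pre-tool
# hook could combine the two services roughly like this:
#
#     validator = PreActionValidator()
#     tracker = ActionTracker()
#     result = validator.validate(tool_name, arguments)
#     duplicate, why = tracker.check_tool_call(tool_name, arguments)
#     if not result.valid or duplicate:
#         ...  # surface result.suggestion / why instead of executing the tool
#     else:
#         tracker.record_tool_call(tool_name, arguments)
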
3 from __future__ import annotations
4
5 import os
6 import re
7 import shlex
8 from dataclasses import dataclass
9 from difflib import get_close_matches
10 from pathlib import Path
11
12 from ..tools.fs_safety import coerce_structured_patch_payload
13
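# Text-like files the shell-rewrite guard cares about: rewriting these via bash
# redirects or in-place sed/perl is flagged so the structured file tools are
# used instead.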
14 TEXT_REWRITE_SUFFIXES = frozenset(
15 {
16 ".c",
17 ".cc",
18 ".cpp",
19 ".css",
20 ".csv",
21 ".go",
22 ".h",
23 ".hpp",
24 ".html",
25 ".htm",
26 ".java",
27 ".js",
28 ".json",
29 ".jsx",
30 ".md",
31 ".py",
32 ".rb",
33 ".rs",
34 ".sh",
35 ".sql",
36 ".svg",
37 ".toml",
38 ".ts",
39 ".tsx",
40 ".txt",
41 ".xml",
42 ".yaml",
43 ".yml",
44 }
45 )
46
47
48 def _html_target_tokens(target: str) -> set[str]:
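    # e.g. "03-Setup_Guide.html" -> {"03", "setup", "guide"} (illustrative filename).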
49 stem = Path(target).stem.lower()
50 return {token for token in re.split(r"[^a-z0-9]+", stem) if token}
51
52
53 def _ordered_html_target_number(target: str) -> int | None:
54 match = re.match(r"(\d+)[-_]", Path(target).name)
55 if match is None:
56 return None
57 try:
58 return int(match.group(1))
59 except ValueError:
60 return None
61
62
63 TEXT_REWRITE_FILENAMES = frozenset(
64 {
65 "dockerfile",
66 "index.html",
67 "makefile",
68 "package.json",
69 "pyproject.toml",
70 "readme",
71 "readme.md",
72 }
73 )
74
75
76 def _strip_shell_token(token: str) -> str:
77 return token.strip().strip("\"'").rstrip(";|&")
78
79
80 def _looks_like_text_rewrite_target(token: str) -> bool:
81 candidate = _strip_shell_token(token)
82 if not candidate or candidate in {"-", "/dev/null"}:
83 return False
84 if candidate.startswith("-"):
85 return False
86 lowered = Path(candidate).name.lower()
87 if lowered in TEXT_REWRITE_FILENAMES:
88 return True
89 return Path(candidate).suffix.lower() in TEXT_REWRITE_SUFFIXES
90
91
92 def _extract_redirect_target(argv: list[str]) -> str | None:
93 for index, token in enumerate(argv):
94 if token in {">", ">>"} and index + 1 < len(argv):
95 candidate = argv[index + 1]
96 if _looks_like_text_rewrite_target(candidate):
97 return _strip_shell_token(candidate)
98 if token == "tee":
99 for candidate in argv[index + 1 :]:
100 if candidate.startswith("-"):
101 continue
102 if _looks_like_text_rewrite_target(candidate):
103 return _strip_shell_token(candidate)
104 break
105 return None
106
107
108 def extract_shell_text_rewrite_target(command: str) -> str | None:
109 """Return the target file when bash is used as a brittle text editor."""
110
111 normalized = " ".join(str(command or "").split())
112 if not normalized:
113 return None
114
115 try:
116 argv = shlex.split(normalized)
117 except ValueError:
118 argv = []
119
120 if argv:
121 for index, token in enumerate(argv):
122 if token == "sed" and any(part.startswith("-i") for part in argv[index + 1 :]):
123 for candidate in reversed(argv[index + 1 :]):
124 if _looks_like_text_rewrite_target(candidate):
125 return _strip_shell_token(candidate)
126 if token == "perl" and any(
127 part.startswith("-p") or part.startswith("-0p") for part in argv[index + 1 :]
128 ):
129 for candidate in reversed(argv[index + 1 :]):
130 if _looks_like_text_rewrite_target(candidate):
131 return _strip_shell_token(candidate)
132
133 redirect_target = _extract_redirect_target(argv)
134 if redirect_target is not None:
135 return redirect_target
136
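    # Regex fallback: best-effort detection of in-place sed/perl rewrites for
    # commands the token-based pass above missed (including ones shlex could
    # not split).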
137 regex_match = re.search(
138 r"(?:sed\s+-i(?:\s+''|\s+\"\"|\s+'[^']*'|\s+\"[^\"]*\")?.*?|perl\s+-[0-9]*p[i0-9-]*.*?)\s+([^\s\"';|&]+(?:\.[A-Za-z0-9]+)?)",
139 normalized,
140 )
141 if regex_match:
142 candidate = _strip_shell_token(regex_match.group(1))
143 if _looks_like_text_rewrite_target(candidate):
144 return candidate
145
146 redirect_match = re.search(r"(?:>>?|tee(?:\s+-a)?)\s+([^\s\"';|&]+)", normalized)
147 if redirect_match:
148 candidate = _strip_shell_token(redirect_match.group(1))
149 if _looks_like_text_rewrite_target(candidate):
150 return candidate
151
152 return None
153
154
155 class ActionTracker:
156 """Tracks completed actions to prevent duplicates and detect loops."""
157
158 MAX_SEQUENCE_LENGTH = 20
159 LOOP_PATTERN_MIN = 2
160 LOOP_REPEAT_THRESHOLD = 2
161 MAX_RESPONSE_HISTORY = 5
162 OBSERVATION_REPEAT_WINDOW = 8
163 READ_REPEAT_THRESHOLD = 3
164 SEARCH_REPEAT_THRESHOLD = 2
165 BASH_OBSERVATION_REPEAT_THRESHOLD = 2
166 RECENT_PATH_CONTEXT_LIMIT = 12
167
168 def __init__(self) -> None:
169 self._file_writes: dict[str, list[str]] = {}
170 self._files_edited: dict[str, list[str]] = {}
171 self._commands_run: set[str] = set()
172 self._dirs_created: set[str] = set()
173 self._action_sequence: list[str] = []
174 self._response_history: list[str] = []
175 self._action_index = 0
176 self._mutation_epoch = 0
177 self._recent_reads: dict[str, tuple[int, int, int]] = {}
178 self._recent_searches: dict[str, tuple[int, int, int]] = {}
179 self._recent_bash_observations: dict[str, tuple[int, int, int]] = {}
180 self._recent_path_contexts: list[str] = []
181
182 def reset(self) -> None:
183 self._file_writes.clear()
184 self._files_edited.clear()
185 self._commands_run.clear()
186 self._dirs_created.clear()
187 self._action_sequence.clear()
188 self._response_history.clear()
189 self._action_index = 0
190 self._mutation_epoch = 0
191 self._recent_reads.clear()
192 self._recent_searches.clear()
193 self._recent_bash_observations.clear()
194 self._recent_path_contexts.clear()
195
196 def _normalize_path(self, path: str) -> str:
197 expanded = Path(path).expanduser()
198 try:
199 return str(expanded.resolve())
200 except Exception:
201 return str(expanded)
202
203 @staticmethod
204 def _make_edit_signature(old_string: str, new_string: str) -> str:
205 return f"{hash(old_string)}:{hash(new_string)}"
206
207 @staticmethod
208 def _make_write_signature(content: str) -> str:
209 return str(hash(content))
210
211 def would_duplicate_file_create(self, file_path: str, content: str) -> bool:
212 norm_path = self._normalize_path(file_path)
213 sig = self._make_write_signature(content)
214 return sig in self._file_writes.get(norm_path, [])
215
216 def would_duplicate_edit(self, file_path: str, old_string: str, new_string: str) -> bool:
217 norm_path = self._normalize_path(file_path)
218 sig = self._make_edit_signature(old_string, new_string)
219 return sig in self._files_edited.get(norm_path, [])
220
221 def would_duplicate_patch(self, file_path: str, hunks: list[dict]) -> bool:
222 norm_path = self._normalize_path(file_path)
223 sig = str(hash(str(hunks)))
224 return sig in self._files_edited.get(norm_path, [])
225
226 def would_duplicate_raw_patch(self, file_path: str, patch_text: str) -> bool:
227 norm_path = self._normalize_path(file_path)
228 sig = str(hash(patch_text))
229 return sig in self._files_edited.get(norm_path, [])
230
231 def would_duplicate_command(self, command: str) -> bool:
232 norm_cmd = self._normalize_command(command)
233 return norm_cmd in self._commands_run
234
235 def would_duplicate_mkdir(self, dir_path: str) -> bool:
236 norm_path = self._normalize_path(dir_path)
237 return norm_path in self._dirs_created
238
239 def record_file_create(self, file_path: str, content: str) -> None:
240 norm_path = self._normalize_path(file_path)
241 sig = self._make_write_signature(content)
242 self._file_writes.setdefault(norm_path, []).append(sig)
243
244 def record_edit(self, file_path: str, old_string: str, new_string: str) -> None:
245 norm_path = self._normalize_path(file_path)
246 sig = self._make_edit_signature(old_string, new_string)
247 self._files_edited.setdefault(norm_path, []).append(sig)
248
249 def record_command(self, command: str) -> None:
250 norm_cmd = self._normalize_command(command)
251 self._commands_run.add(norm_cmd)
252
253 mkdir_match = re.match(r'mkdir\s+(-p\s+)?(.+)', norm_cmd)
254 if mkdir_match:
255 dir_path = mkdir_match.group(2).strip().strip('"\'')
256 self._dirs_created.add(self._normalize_path(dir_path))
257
258 def record_mkdir(self, dir_path: str) -> None:
259 self._dirs_created.add(self._normalize_path(dir_path))
260
261 def recent_path_contexts(self) -> list[str]:
262 return list(self._recent_path_contexts)
263
264 def check_tool_call(self, tool_name: str, arguments: dict) -> tuple[bool, str]:
265 if tool_name == "write":
266 file_path = arguments.get("file_path", "")
267 content = arguments.get("content", "")
268 if self.would_duplicate_file_create(file_path, content):
269 return True, f"Same file content already written: {file_path}"
270
271 elif tool_name == "edit":
272 file_path = arguments.get("file_path", "")
273 old_string = arguments.get("old_string", "")
274 new_string = arguments.get("new_string", "")
275 if self.would_duplicate_edit(file_path, old_string, new_string):
276 return True, f"Same edit already applied to: {file_path}"
277
278 elif tool_name == "patch":
279 file_path = arguments.get("file_path", "")
280 hunks = arguments.get("hunks", [])
281 raw_patch = arguments.get("patch") or arguments.get("diff") or arguments.get("patch_text")
282 if isinstance(hunks, list) and hunks and self.would_duplicate_patch(file_path, hunks):
283 return True, f"Same patch already applied to: {file_path}"
284 if isinstance(raw_patch, str) and raw_patch.strip():
285 if self.would_duplicate_raw_patch(file_path, raw_patch):
286 return True, f"Same patch already applied to: {file_path}"
287
288 elif tool_name == "read":
289 read_key = self._make_read_key(arguments)
290 if read_key:
291 duplicate, reason = self._check_recent_observation(
292 self._recent_reads,
293 read_key,
294 (
295 "Already read "
296 f"{str(arguments.get('file_path', '')).strip()} "
297 "recently without any intervening changes; "
298 "reuse the earlier read result instead of rereading"
299 ),
300 repeat_threshold=self.READ_REPEAT_THRESHOLD,
301 )
302 if duplicate:
303 return True, reason
304
305 elif tool_name in {"glob", "grep"}:
306 observation_key = self._make_search_key(tool_name, arguments)
307 if observation_key:
308 duplicate, reason = self._check_recent_observation(
309 self._recent_searches,
310 observation_key,
311 (
312 "Already ran the same search recently without any intervening "
313 "changes; reuse the earlier search result instead of rerunning it"
314 ),
315 repeat_threshold=self.SEARCH_REPEAT_THRESHOLD,
316 )
317 if duplicate:
318 return True, reason
319
320 elif tool_name == "bash":
321 command = str(arguments.get("command", "")).strip()
322 if self._is_observational_bash(command):
323 duplicate, reason = self._check_recent_observation(
324 self._recent_bash_observations,
325 self._normalize_command(command),
326 (
327 "Already ran the same read-only shell probe recently without any "
328 "intervening changes; reuse the earlier shell output instead of rerunning it"
329 ),
330 repeat_threshold=self.BASH_OBSERVATION_REPEAT_THRESHOLD,
331 )
332 if duplicate:
333 return True, reason
334
335 # Bash commands intentionally skip exact-command dedupe here.
336 # Re-running the same shell probe after a filesystem change is often valid,
337 # and higher-level loop detection is a safer backstop than blocking `ls`.
338 return False, ""
339
340 def record_tool_call(self, tool_name: str, arguments: dict) -> None:
341 self._action_index += 1
342 self._action_sequence.append(tool_name)
343 if len(self._action_sequence) > self.MAX_SEQUENCE_LENGTH:
344 self._action_sequence.pop(0)
345
346 if tool_name == "write":
347 file_path = arguments.get("file_path", "")
348 content = arguments.get("content", "")
349 if file_path:
350 self.record_file_create(file_path, content)
351 self._record_path_context(file_path)
352 self._note_mutation()
353
354 elif tool_name == "edit":
355 file_path = arguments.get("file_path", "")
356 old_string = arguments.get("old_string", "")
357 new_string = arguments.get("new_string", "")
358 if file_path:
359 self.record_edit(file_path, old_string, new_string)
360 self._record_path_context(file_path)
361 self._note_mutation()
362
363 elif tool_name == "patch":
364 file_path = arguments.get("file_path", "")
365 hunks = arguments.get("hunks", [])
366 if file_path:
367 raw_patch = arguments.get("patch") or arguments.get("diff") or arguments.get("patch_text")
368 if isinstance(hunks, list) and hunks:
369 self.record_edit(file_path, str(hunks), "structured_patch")
370 elif isinstance(raw_patch, str) and raw_patch.strip():
371 self.record_edit(file_path, raw_patch, "raw_patch")
372 self._record_path_context(file_path)
373 self._note_mutation()
374
375 elif tool_name == "read":
376 read_key = self._make_read_key(arguments)
377 if read_key:
378 self._record_observation(
379 self._recent_reads,
380 read_key,
381 )
382 file_path = str(arguments.get("file_path", "")).strip()
383 if file_path:
384 self._record_path_context(file_path)
385
386 elif tool_name in {"glob", "grep"}:
387 observation_key = self._make_search_key(tool_name, arguments)
388 if observation_key:
389 self._record_observation(
390 self._recent_searches,
391 observation_key,
392 )
393 search_path = str(arguments.get("path", "")).strip()
394 if search_path:
395 self._record_path_context(search_path, is_directory_hint=True)
396
397 elif tool_name == "bash":
398 command = arguments.get("command", "")
399 if command:
400 self.record_command(command)
401 if self._is_mutating_bash(command):
402 self._note_mutation()
403 elif self._is_observational_bash(command):
404 self._record_observation(
405 self._recent_bash_observations,
406 self._normalize_command(command),
407 )
408
409 def detect_loop(self) -> tuple[bool, str]:
410 seq = self._action_sequence
411 if len(seq) < self.LOOP_PATTERN_MIN * self.LOOP_REPEAT_THRESHOLD:
412 return False, ""
413
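        # Look for the shortest tail pattern (2-5 actions) that repeats
        # back-to-back, without overlap, at the end of the action sequence.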
414 for pattern_len in range(self.LOOP_PATTERN_MIN, min(6, len(seq) // 2 + 1)):
415 pattern = seq[-pattern_len:]
416 repeats = 1
417 for i in range(len(seq) - pattern_len * 2, -1, -pattern_len):
418 if seq[i:i + pattern_len] == pattern:
419 repeats += 1
420 else:
421 break
422
423 if repeats >= self.LOOP_REPEAT_THRESHOLD:
424 pattern_str = " → ".join(pattern)
425 return True, f"Repeating pattern detected ({repeats}x): {pattern_str}"
426
427 return False, ""
428
429 @staticmethod
430 def _normalize_response(response: str) -> str:
431 normalized = response.strip().lower()[:200]
432 normalized = re.sub(r'/[\w/.-]+', '<PATH>', normalized)
433 normalized = re.sub(r'\d+', '<NUM>', normalized)
434 return normalized
435
436 def record_response(self, response: str) -> None:
437 normalized = self._normalize_response(response)
438 self._response_history.append(normalized)
439 if len(self._response_history) > self.MAX_RESPONSE_HISTORY:
440 self._response_history.pop(0)
441
442 def detect_text_loop(self, response: str) -> tuple[bool, str]:
443 if len(self._response_history) < 2:
444 return False, ""
445
446 normalized = self._normalize_response(response)
447 exact_matches = sum(1 for r in self._response_history if r == normalized)
448 if exact_matches >= 2:
449 return True, f"Agent repeated the same response {exact_matches + 1} times"
450
451 repetitive_phrases = [
452 "apologies for any confusion",
453 "let me proceed",
454 "i will now use the",
455 ]
456 response_lower = response.lower()
457 for phrase in repetitive_phrases:
458 if phrase in response_lower:
459 phrase_count = sum(1 for r in self._response_history if phrase in r)
460 if phrase_count >= 2:
461 return True, f"Agent is stuck repeating '{phrase}'"
462
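        # Near-duplicate check: word overlap relative to the larger of the two
        # responses, measured against the last few normalized responses.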
463 current_words = set(normalized.split())
464 similarity_matches = 0
465 for prev in self._response_history[-3:]:
466 prev_words = set(prev.split())
467 if len(current_words) > 10 and len(prev_words) > 10:
468 overlap = len(current_words & prev_words)
469 similarity = overlap / max(len(current_words), len(prev_words))
470 if similarity > 0.85:
471 similarity_matches += 1
472
473 if similarity_matches >= 2:
474 return True, "Agent responses are highly repetitive"
475
476 return False, ""
477
478 def reset_response_history(self) -> None:
479 """Clear response history between turns to prevent cross-turn false positives."""
480 self._response_history.clear()
481
482 @staticmethod
483 def _normalize_command(command: str) -> str:
484 return " ".join(command.split())
485
486 def _note_mutation(self) -> None:
487 self._mutation_epoch += 1
488
489 def _check_recent_observation(
490 self,
491 cache: dict[str, tuple[int, int, int]],
492 key: str,
493 reason: str,
494 *,
495 repeat_threshold: int,
496 ) -> tuple[bool, str]:
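        # A repeat only counts if no mutation happened since the last sighting
        # (same epoch) and that sighting is still within the repeat window.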
497 last_seen = cache.get(key)
498 if last_seen is None:
499 return False, ""
500
501 last_epoch, last_index, repeat_count = last_seen
502 if last_epoch != self._mutation_epoch:
503 return False, ""
504 gap = self._action_index - last_index
505 if gap > self.OBSERVATION_REPEAT_WINDOW:
506 return False, ""
507 if gap <= 0:
508 return True, reason
509 if repeat_count >= repeat_threshold:
510 return True, reason
511 return False, ""
512
513 def _record_observation(
514 self,
515 cache: dict[str, tuple[int, int, int]],
516 key: str,
517 ) -> None:
518 last_seen = cache.get(key)
519 if last_seen is None:
520 cache[key] = (self._mutation_epoch, self._action_index, 1)
521 return
522
523 last_epoch, last_index, repeat_count = last_seen
524 gap = self._action_index - last_index
525 if last_epoch != self._mutation_epoch or gap > self.OBSERVATION_REPEAT_WINDOW:
526 cache[key] = (self._mutation_epoch, self._action_index, 1)
527 return
528
529 cache[key] = (
530 self._mutation_epoch,
531 self._action_index,
532 repeat_count + 1,
533 )
534
535 def _make_search_key(self, tool_name: str, arguments: dict) -> str | None:
536 pattern = str(arguments.get("pattern", "")).strip()
537 if not pattern:
538 return None
539 path = str(arguments.get("path", "")).strip()
540 normalized_path = self._normalize_path(path) if path else ""
541 return f"{tool_name}:{normalized_path}:{pattern}"
542
543 def _make_read_key(self, arguments: dict) -> str | None:
544 file_path = str(arguments.get("file_path", "")).strip()
545 if not file_path:
546 return None
547 offset = str(arguments.get("offset", "")).strip()
548 limit = str(arguments.get("limit", "")).strip()
549 return (
550 f"{self._normalize_path(file_path)}"
551 f":offset={offset or 'full'}"
552 f":limit={limit or 'all'}"
553 )
554
555 def _is_observational_bash(self, command: str) -> bool:
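        # Only simple single commands with no pipes, redirects, or substitutions
        # count as read-only observational probes.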
556 norm_cmd = self._normalize_command(command)
557 if not norm_cmd:
558 return False
559 if any(token in norm_cmd for token in ("&&", "||", ";", ">", ">>", "|", "<", "$(", "`")):
560 return False
561 try:
562 argv = shlex.split(norm_cmd)
563 except ValueError:
564 return False
565 if not argv:
566 return False
567 return argv[0] in {"ls", "pwd", "find", "stat", "cat", "head", "tail", "rg"}
568
569 def _is_mutating_bash(self, command: str) -> bool:
570 norm_cmd = self._normalize_command(command)
571 if not norm_cmd:
572 return False
573 if extract_shell_text_rewrite_target(norm_cmd) is not None:
574 return True
575 mutating_fragments = (
576 " >",
577 ">>",
578 "| tee",
579 "touch ",
580 "mkdir ",
581 "rm ",
582 "mv ",
583 "cp ",
584 "sed -i",
585 "perl -pi",
586 "git add",
587 "git commit",
588 "git apply",
589 )
590 if any(fragment in norm_cmd for fragment in mutating_fragments):
591 return True
592 try:
593 argv = shlex.split(norm_cmd)
594 except ValueError:
595 return False
596 if not argv:
597 return False
598 return argv[0] in {"touch", "mkdir", "rm", "mv", "cp", "chmod", "chown"}
599
600 def _record_path_context(self, path_value: str, *, is_directory_hint: bool = False) -> None:
601 normalized = self._normalize_path(path_value)
602 path = Path(normalized)
603 primary_dir = path if is_directory_hint or path.is_dir() else path.parent
604 candidate_dirs = [primary_dir]
605 if primary_dir.parent != primary_dir:
606 candidate_dirs.append(primary_dir.parent)
607
608 for candidate_dir in candidate_dirs:
609 normalized_dir = self._normalize_path(str(candidate_dir))
610 if normalized_dir in self._recent_path_contexts:
611 self._recent_path_contexts.remove(normalized_dir)
612 self._recent_path_contexts.insert(0, normalized_dir)
613
614 if len(self._recent_path_contexts) > self.RECENT_PATH_CONTEXT_LIMIT:
615 del self._recent_path_contexts[self.RECENT_PATH_CONTEXT_LIMIT :]
616
617 @dataclass
618 class ValidationResult:
619 """Result of pre-action validation."""
620
621 valid: bool
622 reason: str = ""
623 suggestion: str = ""
624 severity: str = "warning"
625
626
627 class PreActionValidator:
628 """Validates tool calls before execution to catch problematic actions."""
629
630 HTML_PLACEHOLDER_PATTERNS = [
631 (
632 re.compile(r"\bstarter\s+(?:content|overview)\b", re.IGNORECASE),
633 "starter content",
634 ),
635 (
636 re.compile(r"\bkey\s+concepts\s+go\s+here\b", re.IGNORECASE),
637 "key concepts go here",
638 ),
639 (
640 re.compile(r"\bpractical\s+steps\s+go\s+here\b", re.IGNORECASE),
641 "practical steps go here",
642 ),
643 (
644 re.compile(r"\blorem\s+ipsum\b", re.IGNORECASE),
645 "lorem ipsum",
646 ),
647 (
648 re.compile(r"\bcoming\s+soon\b", re.IGNORECASE),
649 "coming soon",
650 ),
651 (
652 re.compile(r"\bto\s+be\s+(?:added|written|completed|filled\s+in)\b", re.IGNORECASE),
653 "to be added/written",
654 ),
655 ]
656
657 DANGEROUS_PATTERNS = [
658 (r'rm\s+(-[rf]+\s+)?/', "Dangerous: removing from root directory"),
659 (r'rm\s+-rf\s+~', "Dangerous: removing home directory"),
660 (r'>\s*/dev/sd[a-z]', "Dangerous: writing directly to disk device"),
661 (r'mkfs\.', "Dangerous: formatting filesystem"),
662 (r'dd\s+.*of=/dev/', "Dangerous: dd to device"),
663 (r'chmod\s+-R\s+777\s+/', "Dangerous: making everything world-writable"),
664 (r':\(\)\s*\{\s*:\|:\s*&\s*\}\s*;', "Dangerous: fork bomb"),
665 ]
666
667 SUSPICIOUS_PATTERNS = [
668 (r'rm\s+-rf\s+', "Warning: recursive force delete"),
669 (r'>\s*/etc/', "Warning: overwriting system config"),
670 (r'curl\s+.*\|\s*sh', "Warning: piping curl to shell"),
671 (r'wget\s+.*\|\s*sh', "Warning: piping wget to shell"),
672 (r'eval\s+', "Warning: using eval"),
673 (r'sudo\s+', "Warning: using sudo"),
674 ]
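    # DANGEROUS_PATTERNS block the command outright; SUSPICIOUS_PATTERNS only
    # attach a warning while letting the command proceed.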
675
676 def validate(self, tool_name: str, arguments: dict) -> ValidationResult:
677 if tool_name == "bash":
678 return self._validate_bash(arguments)
679 if tool_name == "write":
680 return self._validate_write(arguments)
681 if tool_name == "edit":
682 return self._validate_edit(arguments)
683 if tool_name == "patch":
684 return self._validate_patch(arguments)
685 if tool_name == "read":
686 return self._validate_read(arguments)
687 if tool_name in ("glob", "grep"):
688 return self._validate_search(tool_name, arguments)
689 return ValidationResult(valid=True)
690
691 def _validate_bash(self, arguments: dict) -> ValidationResult:
692 command = arguments.get("command", "")
693
694 if not command or not command.strip():
695 return ValidationResult(
696 valid=False,
697 reason="Empty command",
698 suggestion="Provide a valid command to execute",
699 severity="error",
700 )
701
702 for pattern, reason in self.DANGEROUS_PATTERNS:
703 if re.search(pattern, command):
704 return ValidationResult(
705 valid=False,
706 reason=reason,
707 suggestion="This command is too dangerous to execute",
708 severity="block",
709 )
710
711 rewrite_target = extract_shell_text_rewrite_target(str(command))
712 if rewrite_target is not None:
713 return ValidationResult(
714 valid=False,
715 reason="Shell-based text rewrites are brittle and bypass Loader's safer file tools",
716 suggestion=(
717 f"Use edit/patch/write for `{rewrite_target}` instead of rewriting it with bash"
718 ),
719 severity="error",
720 )
721
722 for pattern, reason in self.SUSPICIOUS_PATTERNS:
723 if re.search(pattern, command):
724 return ValidationResult(valid=True, reason=reason, severity="warning")
725
726 interactive_patterns = [
727 (r'\bnano\b', "nano requires interactive terminal"),
728 (r'\bvim?\b', "vim requires interactive terminal"),
729 (r'\bemacs\b', "emacs requires interactive terminal"),
730 (r'\bless\b', "less requires interactive terminal"),
731 (r'\bmore\b', "more requires interactive terminal"),
732 (r'\btop\b', "top requires interactive terminal"),
733 (r'\bhtop\b', "htop requires interactive terminal"),
734 ]
735 for pattern, reason in interactive_patterns:
736 if re.search(pattern, command):
737 return ValidationResult(
738 valid=False,
739 reason=reason,
740 suggestion=(
741 "Use non-interactive alternatives (cat, head, tail for viewing; "
742 "sed for editing)"
743 ),
744 severity="error",
745 )
746
747 return ValidationResult(valid=True)
748
749 def _validate_write(self, arguments: dict) -> ValidationResult:
750 file_path = arguments.get("file_path", "")
751 content = arguments.get("content", "")
752
753 if not file_path or not file_path.strip():
754 return ValidationResult(
755 valid=False,
756 reason="Empty file path",
757 suggestion="Provide a valid file path",
758 severity="error",
759 )
760
761 path_result = self._validate_path(file_path)
762 if not path_result.valid:
763 return path_result
764
765 sibling_result = self._validate_numbered_sibling_conflict(str(file_path))
766 if not sibling_result.valid:
767 return sibling_result
768
769 html_declared_file_result = self._validate_html_declared_file_creation(
770 str(file_path),
771 )
772 if not html_declared_file_result.valid:
773 return html_declared_file_result
774
775 if content is None or (isinstance(content, str) and not content.strip()):
776 return ValidationResult(
777 valid=True,
778 reason="Writing empty content to file",
779 severity="warning",
780 )
781
782 html_placeholder_result = self._validate_html_placeholder_content(
783 str(file_path),
784 str(content),
785 )
786 if not html_placeholder_result.valid:
787 return html_placeholder_result
788
789 sensitive_paths = ['/etc/', '/usr/', '/bin/', '/sbin/', '/boot/', '/sys/', '/proc/']
790 for sensitive in sensitive_paths:
791 if file_path.startswith(sensitive):
792 return ValidationResult(
793 valid=False,
794 reason=f"Cannot write to system directory: {sensitive}",
795 suggestion="Write to a user directory instead",
796 severity="block",
797 )
798
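        # The link-scope check below always runs for HTML targets; the broader
        # link-graph checks (index links, duplicate root links, declared targets,
        # local assets, root coverage) only run when the file already exists.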
799 html_link_scope_result = self._validate_html_write_local_link_scope(
800 str(file_path),
801 str(content),
802 )
803 if not html_link_scope_result.valid:
804 return html_link_scope_result
805
806 if Path(file_path).expanduser().exists():
807 html_index_result = self._validate_html_index_links(
808 str(file_path),
809 str(content),
810 )
811 if not html_index_result.valid:
812 return html_index_result
813
814 html_duplicate_root_links_result = (
815 self._validate_html_root_duplicate_local_links(
816 str(file_path),
817 str(content),
818 )
819 )
820 if not html_duplicate_root_links_result.valid:
821 return html_duplicate_root_links_result
822
823 html_declared_target_result = self._validate_html_declared_target_set(
824 str(file_path),
825 str(content),
826 )
827 if not html_declared_target_result.valid:
828 return html_declared_target_result
829
830 html_asset_result = self._validate_html_local_asset_links(
831 str(file_path),
832 str(content),
833 )
834 if not html_asset_result.valid:
835 return html_asset_result
836
837 html_root_coverage_result = self._validate_html_root_link_coverage(
838 str(file_path),
839 str(content),
840 )
841 if not html_root_coverage_result.valid:
842 return html_root_coverage_result
843
844 return ValidationResult(valid=True)
845
846 def _validate_edit(self, arguments: dict) -> ValidationResult:
847 file_path = arguments.get("file_path", "")
848 old_string = arguments.get("old_string", "")
849 new_string = arguments.get("new_string", "")
850
851 if not file_path or not file_path.strip():
852 return ValidationResult(
853 valid=False,
854 reason="Empty file path",
855 suggestion="Provide a valid file path",
856 severity="error",
857 )
858
859 path_result = self._validate_path(file_path)
860 if not path_result.valid:
861 return path_result
862
863 if old_string is None:
864 return ValidationResult(
865 valid=False,
866 reason="old_string is None",
867 suggestion="Provide the text to replace (can be empty string for prepend)",
868 severity="error",
869 )
870
871 if new_string is None:
872 return ValidationResult(
873 valid=False,
874 reason="new_string is None",
875 suggestion="Provide the replacement text (can be empty string for deletion)",
876 severity="error",
877 )
878
879 if old_string == new_string:
880 return ValidationResult(
881 valid=False,
882 reason="old_string and new_string are identical - no change would occur",
883 suggestion="Provide different old and new strings",
884 severity="error",
885 )
886
887 prospective_content = self._prospective_edit_content(
888 str(file_path),
889 str(old_string),
890 str(new_string),
891 )
892
893 html_placeholder_result = self._validate_html_placeholder_content(
894 str(file_path),
895 prospective_content,
896 )
897 if not html_placeholder_result.valid:
898 return html_placeholder_result
899
900 html_index_result = self._validate_html_index_links(
901 str(file_path),
902 prospective_content,
903 )
904 if not html_index_result.valid:
905 return html_index_result
906
907 html_duplicate_root_links_result = (
908 self._validate_html_root_duplicate_local_links(
909 str(file_path),
910 prospective_content,
911 )
912 )
913 if not html_duplicate_root_links_result.valid:
914 return html_duplicate_root_links_result
915
916 html_declared_target_result = self._validate_html_declared_target_set(
917 str(file_path),
918 prospective_content,
919 )
920 if not html_declared_target_result.valid:
921 return html_declared_target_result
922
923 html_asset_result = self._validate_html_local_asset_links(
924 str(file_path),
925 prospective_content,
926 )
927 if not html_asset_result.valid:
928 return html_asset_result
929
930 return ValidationResult(valid=True)
931
932 def _validate_patch(self, arguments: dict) -> ValidationResult:
933 file_path = arguments.get("file_path", "")
934 hunks = arguments.get("hunks", [])
935 raw_patch = arguments.get("patch") or arguments.get("diff") or arguments.get("patch_text")
936
937 if not file_path or not str(file_path).strip():
938 return ValidationResult(
939 valid=False,
940 reason="Empty file path",
941 suggestion="Provide a valid file path",
942 severity="error",
943 )
944
945 path_result = self._validate_path(str(file_path))
946 if not path_result.valid:
947 return path_result
948
949 sibling_result = self._validate_numbered_sibling_conflict(str(file_path))
950 if not sibling_result.valid:
951 return sibling_result
952
953 html_declared_file_result = self._validate_html_declared_file_creation(
954 str(file_path),
955 )
956 if not html_declared_file_result.valid:
957 return html_declared_file_result
958
959 structured_hunks = coerce_structured_patch_payload(hunks)
960 has_hunks = bool(structured_hunks)
961 has_raw_patch = isinstance(raw_patch, str) and bool(raw_patch.strip())
962 if not has_hunks and not has_raw_patch:
963 return ValidationResult(
964 valid=False,
965 reason="Patch hunks are missing",
966 suggestion="Provide structured patch hunks or a unified diff patch string",
967 severity="error",
968 )
969
970 html_placeholder_result = self._validate_html_placeholder_patch(
971 str(file_path),
972 structured_hunks,
973 raw_patch,
974 )
975 if not html_placeholder_result.valid:
976 return html_placeholder_result
977
978 return ValidationResult(valid=True)
979
980 def _validate_html_placeholder_content(
981 self,
982 file_path: str,
983 content: str,
984 ) -> ValidationResult:
985 normalized = Path(file_path).expanduser()
986 if normalized.suffix.lower() not in {".html", ".htm"}:
987 return ValidationResult(valid=True)
988
989 matched_labels = [
990 label
991 for pattern, label in self.HTML_PLACEHOLDER_PATTERNS
992 if pattern.search(content)
993 ]
994 if not matched_labels:
995 return ValidationResult(valid=True)
996
997 preview = ", ".join(matched_labels[:3])
998 if len(matched_labels) > 3:
999 preview += ", ..."
1000 return ValidationResult(
1001 valid=False,
1002 reason="HTML content contains placeholder or stub text",
1003 suggestion=(
1004 "Replace placeholder phrases with concrete user-facing content before "
1005 f"writing the HTML artifact. Placeholder phrase(s): {preview}. Include "
1006 "specific explanations, examples, commands, or structured prose instead."
1007 ),
1008 severity="error",
1009 )
1010
1011 def _validate_html_placeholder_patch(
1012 self,
1013 file_path: str,
1014 hunks: object,
1015 raw_patch: object,
1016 ) -> ValidationResult:
1017 normalized = Path(file_path).expanduser()
1018 if normalized.suffix.lower() not in {".html", ".htm"}:
1019 return ValidationResult(valid=True)
1020
1021 added_fragments: list[str] = []
1022 if isinstance(raw_patch, str):
1023 for line in raw_patch.splitlines():
1024 if line.startswith("+") and not line.startswith("+++"):
1025 added_fragments.append(line[1:])
1026
1027 if isinstance(hunks, list):
1028 for hunk in hunks:
1029 if not isinstance(hunk, dict):
1030 continue
1031 new_lines = hunk.get("new_lines")
1032 if isinstance(new_lines, list):
1033 added_fragments.extend(str(line) for line in new_lines)
1034 lines = hunk.get("lines")
1035 if isinstance(lines, list):
1036 for line in lines:
1037 text = str(line)
1038 if text.startswith("+") and not text.startswith("+++"):
1039 added_fragments.append(text[1:])
1040
1041 if not added_fragments:
1042 return ValidationResult(valid=True)
1043 return self._validate_html_placeholder_content(
1044 str(file_path),
1045 "\n".join(added_fragments),
1046 )
1047
1048 def _validate_html_write_local_link_scope(
1049 self,
1050 file_path: str,
1051 content: str,
1052 ) -> ValidationResult:
1053 normalized = Path(file_path).expanduser()
1054 if normalized.suffix.lower() not in {".html", ".htm"}:
1055 return ValidationResult(valid=True)
1056
1057 root = (
1058 normalized.parent
1059 if normalized.name.lower() in {"index.html", "index.htm"}
1060 else self._resolve_html_artifact_root(normalized)
1061 )
1062 outside_missing: list[str] = []
1063 for href, resolved in self._collect_local_html_targets(normalized, content):
1064 if resolved.exists():
1065 continue
1066 if self._relative_html_target(root, resolved) is not None:
1067 continue
1068 if href not in outside_missing:
1069 outside_missing.append(href)
1070
1071 if not outside_missing:
1072 return ValidationResult(valid=True)
1073
1074 preview = ", ".join(outside_missing[:3])
1075 if len(outside_missing) > 3:
1076 preview += ", ..."
1077 return ValidationResult(
1078 valid=False,
1079 reason="HTML page links outside the current artifact root",
1080 suggestion=(
1081 "Keep local HTML href values inside the generated artifact root. "
1082 f"Missing out-of-scope href(s): {preview}. Remove the parent/outside "
1083 "link or replace it with an existing in-scope local target."
1084 ),
1085 severity="error",
1086 )
1087
1088 def _validate_html_local_asset_links(
1089 self,
1090 file_path: str,
1091 content: str,
1092 ) -> ValidationResult:
1093 normalized = Path(file_path).expanduser()
1094 if normalized.suffix.lower() not in {".html", ".htm"}:
1095 return ValidationResult(valid=True)
1096
1097 missing: list[str] = []
1098 for href, resolved in self._collect_local_href_targets(normalized, content):
1099 target = self._strip_local_href_target(href)
1100 if target is None:
1101 continue
1102 if self._is_local_html_link_target(target):
1103 continue
1104 if not Path(target).suffix:
1105 continue
1106 if resolved.exists():
1107 continue
1108 if href not in missing:
1109 missing.append(href)
1110
1111 if not missing:
1112 return ValidationResult(valid=True)
1113
1114 preview = ", ".join(missing[:3])
1115 if len(missing) > 3:
1116 preview += ", ..."
1117 return ValidationResult(
1118 valid=False,
1119 reason="HTML local asset references do not exist",
1120 suggestion=(
1121 "Use only existing local assets for non-HTML href values. "
1122 f"Missing local asset href(s): {preview}. Remove the asset link, "
1123 "create the referenced asset first, inline the styling/content, or point "
1124 "the href at an existing local file."
1125 ),
1126 severity="error",
1127 )
1128
1129 def _validate_numbered_sibling_conflict(self, file_path: str) -> ValidationResult:
1130 path = Path(file_path).expanduser()
1131 if path.exists() or not path.suffix or not path.parent.exists():
1132 return ValidationResult(valid=True)
1133
1134 prefix_match = re.match(r"^(\d+)[-_]", path.name)
1135 if prefix_match is None:
1136 return ValidationResult(valid=True)
1137
1138 prefix = prefix_match.group(1)
1139 siblings = sorted(
1140 candidate
1141 for candidate in path.parent.iterdir()
1142 if (
1143 candidate.is_file()
1144 and candidate.suffix == path.suffix
1145 and candidate.name != path.name
1146 and re.match(rf"^{re.escape(prefix)}[-_]", candidate.name)
1147 )
1148 )
1149 if not siblings:
1150 return ValidationResult(valid=True)
1151
1152 preview = ", ".join(candidate.name for candidate in siblings[:3])
1153 if len(siblings) > 3:
1154 preview += ", ..."
1155 return ValidationResult(
1156 valid=False,
1157 reason="New file conflicts with an existing numbered sibling",
1158 suggestion=(
1159 f"Reuse the confirmed numbered file in `{path.parent}` instead of "
1160 f"creating an alternate filename for step {prefix}, for example: {preview}"
1161 ),
1162 severity="error",
1163 )
1164
1165 def _validate_html_declared_file_creation(
1166 self,
1167 file_path: str,
1168 ) -> ValidationResult:
1169 normalized = Path(file_path).expanduser()
1170 if normalized.exists():
1171 return ValidationResult(valid=True)
1172 if normalized.suffix.lower() not in {".html", ".htm"}:
1173 return ValidationResult(valid=True)
1174 if normalized.name.lower() == "index.html":
1175 return ValidationResult(valid=True)
1176
1177 root = self._resolve_html_artifact_root(normalized)
1178 current_relative = self._relative_html_target(root, normalized)
1179 if current_relative is None:
1180 return ValidationResult(valid=True)
1181
1182 declared_targets, authoritative_root_graph = self._collect_declared_html_targets(
1183 root,
1184 normalized,
1185 )
1186 if not declared_targets and not authoritative_root_graph:
1187 return ValidationResult(valid=True)
1188 if current_relative in declared_targets:
1189 return ValidationResult(valid=True)
1190
1191 declared_suggestions = self._suggest_declared_html_targets(
1192 declared_targets,
1193 [current_relative],
1194 )
1195 declared_preview = ", ".join(sorted(declared_targets)[:3])
1196 if authoritative_root_graph:
1197 if declared_suggestions:
1198 suggestion = (
1199 "Keep new non-root HTML files within the root-declared artifact set. "
1200 f"Do not create undeclared sibling page `{current_relative}`; "
1201 "use the closest declared local target instead"
1202 )
1203 else:
1204 root_index = (root / "index.html").resolve(strict=False)
1205 suggestion = (
1206 "Keep new non-root HTML files within the root-declared artifact set and "
1207 f"update the guide root `{root_index}` before creating undeclared sibling pages, "
1208 f"for example: {current_relative}"
1209 )
1210 else:
1211 suggestion = (
1212 "Keep new non-root HTML files within the current declared artifact set and "
1213 f"avoid creating undeclared sibling pages, for example: {current_relative}"
1214 )
1215 if declared_preview:
1216 suggestion += f". Already-declared local targets include: {declared_preview}"
1217 if declared_suggestions:
1218 suggestion += (
1219 ". Closest declared local targets include: "
1220 + ", ".join(declared_suggestions[:3])
1221 )
1222 return ValidationResult(
1223 valid=False,
1224 reason="HTML file creation falls outside the current declared artifact set",
1225 suggestion=suggestion,
1226 severity="error",
1227 )
1228
1229 def _validate_read(self, arguments: dict) -> ValidationResult:
1230 file_path = arguments.get("file_path", "")
1231
1232 if not file_path or not file_path.strip():
1233 return ValidationResult(
1234 valid=False,
1235 reason="Empty file path",
1236 suggestion="Provide a valid file path",
1237 severity="error",
1238 )
1239
1240 path_result = self._validate_path(file_path)
1241 if not path_result.valid:
1242 return path_result
1243
1244 sibling_result = self._validate_numbered_sibling_conflict(str(file_path))
1245 if not sibling_result.valid:
1246 return ValidationResult(
1247 valid=False,
1248 reason="Read target conflicts with an existing numbered sibling",
1249 suggestion=sibling_result.suggestion,
1250 severity="error",
1251 )
1252 return path_result
1253
1254 def _validate_search(self, tool_name: str, arguments: dict) -> ValidationResult:
1255 pattern = arguments.get("pattern", "")
1256
1257 if not pattern or not pattern.strip():
1258 return ValidationResult(
1259 valid=False,
1260 reason=f"Empty {tool_name} pattern",
1261 suggestion="Provide a valid search pattern",
1262 severity="error",
1263 )
1264
1265 return ValidationResult(valid=True)
1266
1267 def _validate_html_index_links(
1268 self,
1269 file_path: str,
1270 content: str,
1271 ) -> ValidationResult:
1272 normalized = Path(file_path).expanduser()
        if normalized.suffix.lower() not in {".html", ".htm"} or "<a " not in content:
1274 return ValidationResult(valid=True)
1275
1276 link_pairs = re.findall(r'<a\s+href="([^"]+)">([^<]+)</a>', content)
1277 if not link_pairs:
1278 return ValidationResult(valid=True)
1279
1280 root = normalized.parent
1281 missing: list[str] = []
1282 existing_local_targets: list[str] = []
1283 for href, _label in link_pairs:
1284 target_text = href.strip()
1285 if not target_text or target_text.startswith(("#", "mailto:", "tel:", "javascript:")):
1286 continue
1287 if "://" in target_text:
1288 continue
1289 target = (root / href).resolve(strict=False)
1290 if not target.exists():
1291 if href not in missing:
1292 missing.append(href)
1293 elif href not in existing_local_targets:
1294 existing_local_targets.append(href)
1295
1296 if missing:
1297 if self._allows_root_html_graph_seed(str(file_path), str(content), missing):
1298 return ValidationResult(valid=True)
1299 broken_preview = ", ".join(missing[:3])
1300 if len(missing) > 3:
1301 broken_preview += ", ..."
1302 suggestion = (
1303 "Use only existing local targets for href values and avoid introducing missing links. "
1304 f"Broken href(s): {broken_preview}. "
1305 )
1306 if existing_local_targets:
1307 example_targets = ", ".join(existing_local_targets[:3])
1308 if len(existing_local_targets) > 3:
1309 example_targets += ", ..."
1310 suggestion += (
1311 "Replace them with an existing local target such as "
1312 f"{example_targets}, or remove the broken link entirely."
1313 )
1314 else:
1315 suggestion += "Replace them with an existing local target or remove the broken link."
1316 return ValidationResult(
1317 valid=False,
1318 reason="Edited HTML links point to files that do not exist",
1319 suggestion=suggestion,
1320 severity="error",
1321 )
1322
1323 return ValidationResult(valid=True)
1324
1325 def _validate_html_root_duplicate_local_links(
1326 self,
1327 file_path: str,
1328 content: str,
1329 ) -> ValidationResult:
1330 normalized = Path(file_path).expanduser()
1331 if normalized.suffix.lower() != ".html" or normalized.name.lower() != "index.html":
1332 return ValidationResult(valid=True)
1333
1334 root = self._resolve_html_artifact_root(normalized)
1335 labels_by_target: dict[str, list[str]] = {}
1336 for _href, resolved, label in self._collect_local_html_link_labels(
1337 normalized,
1338 content,
1339 ):
1340 relative_target = self._relative_html_target(root, resolved)
1341 if relative_target is None:
1342 continue
1343 labels = labels_by_target.setdefault(relative_target, [])
1344 normalized_label = " ".join(label.split())
1345 if normalized_label and normalized_label not in labels:
1346 labels.append(normalized_label)
1347
1348 conflicting: list[str] = []
1349 for target, labels in labels_by_target.items():
1350 if len(labels) < 3:
1351 continue
1352 conflicting.append(f"{target} ({', '.join(labels[:3])})")
1353
1354 if not conflicting:
1355 return ValidationResult(valid=True)
1356
1357 preview = "; ".join(conflicting[:2])
1358 if len(conflicting) > 2:
1359 preview += "; ..."
1360 return ValidationResult(
1361 valid=False,
1362 reason="HTML root page repeats one local page as multiple distinct links",
1363 suggestion=(
1364 "Do not inflate a root index or table of contents by pointing many "
1365 "different entries at the same local page. Expand substantive body "
1366 "content in the target files, create any new pages before linking them, "
1367 f"or keep one accurate entry per local page. Repeated target(s): {preview}"
1368 ),
1369 severity="error",
1370 )
1371
1372 def _prospective_edit_content(
1373 self,
1374 file_path: str,
1375 old_string: str,
1376 new_string: str,
1377 ) -> str:
1378 if old_string == "":
1379 return new_string
1380
1381 normalized = Path(file_path).expanduser()
1382 try:
1383 current = normalized.read_text()
1384 except OSError:
1385 return new_string
1386
1387 if old_string not in current:
1388 return new_string
1389 return current.replace(old_string, new_string, 1)
1390
1391 def _allows_root_html_graph_seed(
1392 self,
1393 file_path: str,
1394 content: str,
1395 missing: list[str],
1396 ) -> bool:
1397 normalized = Path(file_path).expanduser()
1398 if normalized.suffix.lower() not in {".html", ".htm"}:
1399 return False
1400 if normalized.name.lower() != "index.html":
1401 return False
1402
1403 root = self._resolve_html_artifact_root(normalized)
1404 missing_after = self._collect_missing_local_html_targets(normalized, content)
1405 if not missing_after:
1406 return False
1407 existing_missing = self._collect_existing_missing_local_html_targets(normalized)
1408 if len(missing_after) > len(existing_missing):
1409 declared_targets, authoritative_root_graph = self._collect_declared_html_targets(
1410 root,
1411 normalized,
1412 )
1413 if not authoritative_root_graph:
1414 return False
1415 newly_missing = [
1416 href
1417 for href in missing_after
1418 if href not in existing_missing
1419 ]
1420 if not newly_missing:
1421 return False
1422 if any(
1423 not self._is_next_ordered_html_target(root, href, declared_targets)
1424 for href in newly_missing
1425 ):
1426 return False
1427
1428 for href in missing:
1429 resolved = (normalized.parent / href).resolve(strict=False)
1430 relative = self._relative_html_target(root, resolved)
1431 if relative is None:
1432 return False
1433 return True
1434
1435 def _is_next_ordered_html_target(
1436 self,
1437 root: Path,
1438 href: str,
1439 declared_targets: set[str],
1440 ) -> bool:
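        # Allow seeding only the next file in an ordered sequence: the declared
        # sibling numbers must be contiguous and the new href must be max + 1.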
1441 relative_href = self._relative_html_target(root, (root / href).resolve(strict=False))
1442 if relative_href is None:
1443 return False
1444
1445 expected_number = _ordered_html_target_number(relative_href)
1446 if expected_number is None:
1447 return False
1448
1449 parent = Path(relative_href).parent
1450 sibling_numbers = sorted(
1451 number
1452 for target in declared_targets
1453 if Path(target).parent == parent
1454 if (number := _ordered_html_target_number(target)) is not None
1455 )
1456 if not sibling_numbers:
1457 return False
1458
1459 min_number = sibling_numbers[0]
1460 max_number = sibling_numbers[-1]
1461 if expected_number != max_number + 1:
1462 return False
1463
1464 return sibling_numbers == list(range(min_number, max_number + 1))
1465
1466 def _collect_existing_missing_local_html_targets(self, file_path: Path) -> list[str]:
1467 try:
1468 current = file_path.read_text()
1469 except OSError:
1470 return []
1471 return self._collect_missing_local_html_targets(file_path, current)
1472
1473 def _collect_missing_local_html_targets(
1474 self,
1475 file_path: Path,
1476 content: str,
1477 ) -> list[str]:
1478 missing: list[str] = []
1479 for href, resolved in self._collect_local_html_targets(file_path, content):
1480 if resolved.exists():
1481 continue
1482 if href not in missing:
1483 missing.append(href)
1484 return missing
1485
1486 def _validate_html_declared_target_set(
1487 self,
1488 file_path: str,
1489 content: str,
1490 ) -> ValidationResult:
1491 normalized = Path(file_path).expanduser()
1492 if normalized.suffix.lower() != ".html" or normalized.name.lower() == "index.html":
1493 return ValidationResult(valid=True)
1494
1495 local_targets = self._collect_local_html_targets(normalized, content)
1496 if not local_targets:
1497 return ValidationResult(valid=True)
1498
1499 root = self._resolve_html_artifact_root(normalized)
1500 current_relative = self._relative_html_target(root, normalized)
1501 declared_targets, authoritative_root_graph = self._collect_declared_html_targets(root, normalized)
1502 if not declared_targets and not authoritative_root_graph:
1503 return ValidationResult(valid=True)
1504
1505 undeclared_targets: list[str] = []
1506 for href, resolved in local_targets:
1507 relative_target = self._relative_html_target(root, resolved)
1508 if relative_target is None:
1509 continue
1510 if relative_target == "index.html" or relative_target == current_relative:
1511 continue
1512 if relative_target in declared_targets:
1513 continue
1514 if not authoritative_root_graph and resolved.exists():
1515 continue
1516 if href not in undeclared_targets:
1517 undeclared_targets.append(href)
1518
1519 if not undeclared_targets:
1520 return ValidationResult(valid=True)
1521
1522 preview = ", ".join(undeclared_targets[:3])
1523 if len(undeclared_targets) > 3:
1524 preview += ", ..."
1525 declared_preview = ", ".join(sorted(declared_targets)[:3])
1526 if authoritative_root_graph:
1527 suggestion = (
1528 "Keep non-root HTML pages within the root-declared local-link set and "
1529 "avoid introducing new sibling targets that the guide root does not declare; "
1530 f"remove or replace undeclared hrefs like: {preview}"
1531 )
1532 else:
1533 suggestion = (
1534 "Keep non-root HTML pages within the current declared local-link set and "
1535 f"avoid introducing new missing sibling targets; remove or replace undeclared hrefs like: {preview}"
1536 )
1537 if declared_preview:
1538 suggestion += f". Already-declared local targets include: {declared_preview}"
1539 allowed_hrefs = self._declared_html_hrefs_for_file(
1540 root,
1541 normalized,
1542 declared_targets,
1543 )
1544 if allowed_hrefs:
1545 allowed_preview = ", ".join(allowed_hrefs[:6])
1546 if len(allowed_hrefs) > 6:
1547 allowed_preview += ", ..."
1548 suggestion += f". Allowed hrefs from this file include: {allowed_preview}"
1549 declared_suggestions = self._suggest_declared_html_targets(
1550 declared_targets,
1551 undeclared_targets,
1552 )
1553 if declared_suggestions:
1554 suggestion += (
1555 ". Closest declared local targets include: "
1556 + ", ".join(declared_suggestions[:3])
1557 )
1558 return ValidationResult(
1559 valid=False,
1560 reason="HTML page introduces new local targets outside the current declared artifact set",
1561 suggestion=suggestion,
1562 severity="error",
1563 )
1564
1565 def _collect_local_html_targets(
1566 self,
1567 file_path: Path,
1568 content: str,
1569 ) -> list[tuple[str, Path]]:
1570 targets: list[tuple[str, Path]] = []
1571 seen: set[str] = set()
1572 for href, resolved in self._collect_local_href_targets(file_path, content):
1573 target_text = self._strip_local_href_target(href)
1574 if target_text is None or not self._is_local_html_link_target(target_text):
1575 continue
1576 key = f"{target_text}::{resolved}"
1577 if key in seen:
1578 continue
1579 seen.add(key)
1580 targets.append((href, resolved))
1581 return targets
1582
1583 def _collect_local_html_link_labels(
1584 self,
1585 file_path: Path,
1586 content: str,
1587 ) -> list[tuple[str, Path, str]]:
1588 pattern = re.compile(
1589 r"<a\b[^>]*href\s*=\s*[\"']([^\"']+)[\"'][^>]*>(.*?)</a>",
1590 re.IGNORECASE | re.DOTALL,
1591 )
1592 targets: list[tuple[str, Path, str]] = []
1593 for href, raw_label in pattern.findall(content):
1594 target_text = self._strip_local_href_target(href)
1595 if target_text is None or not self._is_local_html_link_target(target_text):
1596 continue
1597 resolved = (file_path.parent / target_text).resolve(strict=False)
1598 label = re.sub(r"<[^>]+>", " ", raw_label)
1599 label = re.sub(r"\s+", " ", label).strip()
1600 targets.append((href, resolved, label))
1601 return targets
1602
1603 def _collect_local_href_targets(
1604 self,
1605 file_path: Path,
1606 content: str,
1607 ) -> list[tuple[str, Path]]:
1608 pattern = re.compile(r'href\s*=\s*["\']([^"\']+)["\']', re.IGNORECASE)
1609 targets: list[tuple[str, Path]] = []
1610 seen: set[str] = set()
1611 for href in pattern.findall(content):
1612 target_text = self._strip_local_href_target(href)
1613 if target_text is None:
1614 continue
1615 resolved = (file_path.parent / target_text).resolve(strict=False)
1616 key = f"{href}::{resolved}"
1617 if key in seen:
1618 continue
1619 seen.add(key)
1620 targets.append((href, resolved))
1621 return targets
1622
1623 def _collect_declared_html_targets(
1624 self,
1625 root: Path,
1626 current_file: Path,
1627 ) -> tuple[set[str], bool]:
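        # Returns (declared_targets, authoritative): authoritative is True when
        # the root index.html itself defines the link graph; otherwise fall back
        # to the union of local links across the other HTML files under root.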
1628 root_index = root / "index.html"
1629 if root_index.exists():
1630 try:
1631 root_text = root_index.read_text()
1632 except OSError:
1633 root_text = ""
1634 declared_from_root = {
1635 relative_target
1636 for _href, resolved in self._collect_local_html_targets(root_index, root_text)
1637 if (relative_target := self._relative_html_target(root, resolved)) is not None
1638 }
1639 if declared_from_root:
1640 return declared_from_root, True
1641
1642 html_files = [
1643 path
1644 for path in root.rglob("*.html")
1645 if path.is_file() and path != current_file
1646 ]
1647 declared: set[str] = set()
1648 for html_file in html_files:
1649 try:
1650 text = html_file.read_text()
1651 except OSError:
1652 continue
1653 for _href, resolved in self._collect_local_html_targets(html_file, text):
1654 relative_target = self._relative_html_target(root, resolved)
1655 if relative_target is not None:
1656 declared.add(relative_target)
1657 return declared, False
1658
1659 def _resolve_html_artifact_root(self, file_path: Path) -> Path:
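        # Walk upward from the file until a directory containing index.html is
        # found; that directory is treated as the artifact root.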
1660 for candidate in [file_path.parent, *file_path.parents]:
1661 if (candidate / "index.html").exists():
1662 return candidate
1663 return file_path.parent
1664
1665 def _relative_html_target(self, root: Path, target: Path) -> str | None:
1666 try:
1667 normalized_root = root.resolve(strict=False)
1668 except OSError:
1669 normalized_root = root.expanduser()
1670 try:
1671 normalized_target = target.resolve(strict=False)
1672 except OSError:
1673 normalized_target = target.expanduser()
1674 try:
1675 return str(normalized_target.relative_to(normalized_root))
1676 except ValueError:
1677 return None
1678
1679 @staticmethod
1680 def _is_local_html_link_target(href: str) -> bool:
1681 normalized = PreActionValidator._strip_local_href_target(href)
1682 return bool(normalized and normalized.lower().endswith((".html", ".htm")))
1683
1684 @staticmethod
1685 def _strip_local_href_target(href: str) -> str | None:
1686 target = href.strip()
1687 if not target:
1688 return None
1689 if target.startswith(("#", "mailto:", "tel:", "javascript:")):
1690 return None
1691 if "://" in target:
1692 return None
1693 normalized = target.split("#", 1)[0].split("?", 1)[0].strip()
1694 return normalized or None
1695
1696 def _suggest_existing_html_targets(self, root: Path, missing: list[str]) -> list[str]:
1697 available_by_directory: dict[Path, list[str]] = {}
1698 suggestions: list[str] = []
1699
1700 for href in missing:
1701 href_path = Path(href)
1702 directory = (root / href_path).parent
1703 if directory not in available_by_directory:
1704 available_by_directory[directory] = sorted(
1705 str(path.relative_to(root))
1706 for path in directory.glob("*.html")
1707 if path.is_file()
1708 )
1709
1710 available = available_by_directory[directory]
1711 if not available:
1712 continue
1713
1714 missing_name = href_path.name
1715 chapter_match = re.match(r"(\d+)-", missing_name)
1716 preferred = available
1717 if chapter_match is not None:
1718 prefix = f"{chapter_match.group(1)}-"
1719 same_prefix = [
1720 candidate
1721 for candidate in available
1722 if Path(candidate).name.startswith(prefix)
1723 ]
1724 if same_prefix:
1725 preferred = same_prefix
1726
1727 matched_names = get_close_matches(
1728 missing_name,
1729 [Path(candidate).name for candidate in preferred],
1730 n=1,
1731 cutoff=0.0,
1732 )
1733 if matched_names:
1734 matched_name = matched_names[0]
1735 candidate = next(
1736 (
1737 candidate
1738 for candidate in preferred
1739 if Path(candidate).name == matched_name
1740 ),
1741 None,
1742 )
1743 if candidate is not None and candidate not in suggestions:
1744 suggestions.append(candidate)
1745
1746 return suggestions
1747
1748 def _suggest_declared_html_targets(
1749 self,
1750 declared_targets: set[str],
1751 undeclared_targets: list[str],
1752 ) -> list[str]:
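        # Fuzzy-match each undeclared href against the declared set, preferring
        # candidates that share the same numeric chapter prefix and, when there
        # is no such prefix match, requiring at least one filename token in common.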
1753 suggestions: list[str] = []
1754 available = sorted(declared_targets)
1755 available_names = [Path(candidate).name for candidate in available]
1756
1757 for href in undeclared_targets:
1758 href_name = Path(href).name
1759 chapter_match = re.match(r"(\d+)[-_]", href_name)
1760 preferred = available
1761 preferred_names = available_names
1762 same_prefix_match = False
1763 if chapter_match is not None:
1764 prefix = f"{chapter_match.group(1)}-"
1765 filtered = [
1766 candidate
1767 for candidate in available
1768 if Path(candidate).name.startswith(prefix)
1769 ]
1770 if filtered:
1771 preferred = filtered
1772 preferred_names = [Path(candidate).name for candidate in filtered]
1773 same_prefix_match = True
1774
1775 matched_names = get_close_matches(
1776 href_name,
1777 preferred_names,
1778 n=1,
1779 cutoff=0.0,
1780 )
1781 if not matched_names:
1782 continue
1783
1784 candidate = next(
1785 (
1786 declared
1787 for declared in preferred
1788 if Path(declared).name == matched_names[0]
1789 ),
1790 None,
1791 )
1792 if candidate is not None and not same_prefix_match:
1793 href_tokens = _html_target_tokens(href)
1794 candidate_tokens = _html_target_tokens(candidate)
1795 if not href_tokens.intersection(candidate_tokens):
1796 continue
1797 if candidate is not None and candidate not in suggestions:
1798 suggestions.append(candidate)
1799
1800 return suggestions
1801
1802 def _declared_html_hrefs_for_file(
1803 self,
1804 root: Path,
1805 file_path: Path,
1806 declared_targets: set[str],
1807 ) -> list[str]:
1808 try:
1809 source_directory = file_path.parent.resolve(strict=False)
1810 root_index = (root / "index.html").resolve(strict=False)
1811 except OSError:
1812 source_directory = file_path.parent.expanduser()
1813 root_index = (root / "index.html").expanduser()
1814
1815 hrefs: list[str] = []
1816 if file_path.name.lower() != "index.html":
1817 hrefs.append(os.path.relpath(root_index, source_directory).replace(os.sep, "/"))
1818
1819 for target in sorted(declared_targets):
1820 target_path = (root / target).resolve(strict=False)
1821 href = os.path.relpath(target_path, source_directory).replace(os.sep, "/")
1822 if href == "." or href in hrefs:
1823 continue
1824 hrefs.append(href)
1825 return hrefs
1826
1827 def _validate_path(self, file_path: str) -> ValidationResult:
1828 if '\x00' in file_path:
1829 return ValidationResult(
1830 valid=False,
1831 reason="Path contains null byte",
1832 suggestion="Remove null bytes from path",
1833 severity="block",
1834 )
1835
1836 if '/../../../' in file_path or file_path.count('..') > 5:
1837 return ValidationResult(
1838 valid=False,
1839 reason="Excessive path traversal",
1840 suggestion="Use a direct path instead",
1841 severity="warning",
1842 )
1843
1844 return ValidationResult(valid=True)
1845
1846 def _validate_html_root_link_coverage(
1847 self,
1848 file_path: str,
1849 content: str,
1850 ) -> ValidationResult:
1851 normalized = Path(file_path).expanduser()
1852 if normalized.suffix.lower() != ".html" or normalized.name.lower() != "index.html":
1853 return ValidationResult(valid=True)
1854 if not normalized.exists():
1855 return ValidationResult(valid=True)
1856
1857 root = self._resolve_html_artifact_root(normalized)
1858 try:
1859 existing_text = normalized.read_text()
1860 except OSError:
1861 return ValidationResult(valid=True)
1862
1863 existing_targets = {
1864 relative_target
1865 for _href, resolved in self._collect_local_html_targets(normalized, existing_text)
1866 if (relative_target := self._relative_html_target(root, resolved)) is not None
1867 and resolved.exists()
1868 }
1869 if not existing_targets:
1870 return ValidationResult(valid=True)
1871
1872 new_targets = {
1873 relative_target
1874 for _href, resolved in self._collect_local_html_targets(normalized, content)
1875 if (relative_target := self._relative_html_target(root, resolved)) is not None
1876 }
1877 dropped_targets = sorted(existing_targets - new_targets)
1878 if not dropped_targets:
1879 return ValidationResult(valid=True)
1880
1881 preview = ", ".join(dropped_targets[:3])
1882 if len(dropped_targets) > 3:
1883 preview += ", ..."
1884 return ValidationResult(
1885 valid=False,
1886 reason="Edited HTML root page drops links to existing local pages",
1887 suggestion=(
1888 "Keep the existing local page set linked from the root HTML page "
1889 f"unless you are intentionally removing those files, for example restore: {preview}"
1890 ),
1891 severity="error",
1892 )