# (removed non-Python viewer chrome: "Python · 9219 bytes Raw Blame History")
1 """Shared helpers for extracting and enforcing active repair focus."""
2
3 from __future__ import annotations
4
5 import re
6 from dataclasses import dataclass
7 from os import sep
8 from pathlib import Path
9
10 from ..llm.base import Message
11
# Lower-cased substrings of tool/verifier feedback indicating that a prior
# mutation attempt failed because its remembered file context went stale
# (e.g. an old_string no longer matches, or structured patch hunks no longer
# line up with the file on disk).
_STALE_REPAIR_MUTATION_MARKERS = (
    "old_string not found",
    "old_string was stale",
    "do not retry the same remembered text",
    "patch hunks are missing",
    "provide structured patch hunks",
    "hunks must not be empty",
    "structured patch context mismatch",
    "structured patch hunk consumed",
    "structured patch references lines past the end",
    "structured patch hunks overlap",
    "failed to complete the operation after",
)
# Lower-cased substrings of repair-focus lines that describe generated-HTML
# quality problems (thin content, structural-tag issues, missing headings).
_HTML_REPAIR_ISSUE_MARKERS = (
    "thin content",
    "insufficient structured content",
    "content-quality",
    "content quality",
    "quality target",
    "html guide content quality",
    "expected exactly one closing </body>",
    "expected exactly one closing </html>",
    "content appears after closing </html>",
    "closing </body> appears after closing </html>",
    "missing <h1>",
)
# The subset of HTML issue markers that are about document *structure*
# (duplicate/misordered closing tags) rather than content quality.
_HTML_STRUCTURAL_REPAIR_MARKERS = (
    "expected exactly one closing </body>",
    "expected exactly one closing </html>",
    "content appears after closing </html>",
    "closing </body> appears after closing </html>",
)
# Case-insensitive matchers for closing tags, tolerating whitespace before ">".
_HTML_CLOSE_RE = re.compile(r"</html\s*>", re.IGNORECASE)
_BODY_CLOSE_RE = re.compile(r"</body\s*>", re.IGNORECASE)
46
47
@dataclass(frozen=True)
class ActiveRepairContext:
    """Concrete repair focus extracted from recent verification feedback."""

    # Primary file named by the "Immediate next step" bullet ("" when absent).
    artifact_path: str
    # Raw "- " bullet lines captured under the "Repair focus:" header.
    repair_lines: list[str]
    # Every absolute path mentioned in the bullets, primary target first.
    allowed_paths: tuple[str, ...]
    # Collapsed parent directories of the allowed paths (nested roots dropped).
    allowed_roots: tuple[str, ...]
56
57
def extract_active_repair_context(
    messages: list[Message],
) -> ActiveRepairContext | None:
    """Return the most recent concrete repair target from session history.

    Scans *messages* newest-first for a "Repair focus:" section, captures its
    "- " bullet lines, and derives:

    - ``artifact_path``: the file named by the first
      "Immediate next step: <verb> `path`" bullet, if any;
    - ``allowed_paths``: every absolute (or ``~``-prefixed) backtick-quoted
      path found in the bullets, with the artifact path forced first;
    - ``allowed_roots``: the collapsed parent directories of those paths.

    Returns ``None`` when no message contains a capturable repair focus.
    """

    for message in reversed(messages):
        # Message content may be None or a non-str; coerce defensively.
        content = str(getattr(message, "content", "") or "")
        if "Repair focus:" not in content:
            continue

        repair_lines: list[str] = []
        artifact_path = ""
        absolute_paths: list[str] = []
        capture = False
        for raw_line in content.splitlines():
            line = raw_line.strip()
            if not capture:
                # Skip everything until the exact "Repair focus:" header.
                if line == "Repair focus:":
                    capture = True
                continue
            if not line:
                # A blank line ends the bullet list once it has started.
                if repair_lines:
                    break
                continue
            if not line.startswith("- "):
                # A non-bullet line likewise terminates an in-progress list.
                if repair_lines:
                    break
                continue

            repair_lines.append(line)
            if not artifact_path:
                # Only the first "Immediate next step" bullet sets the target.
                match = re.search(
                    r"Immediate next step: (?:edit|write|patch|create|update|replace) `([^`]+)`",
                    line,
                )
                if match:
                    artifact_path = normalize_repair_path(match.group(1))

            # Collect every absolute/home-relative backticked path, deduped,
            # preserving first-seen order.
            for candidate in re.findall(r"`([^`]+)`", line):
                if not candidate.startswith(("/", "~")):
                    continue
                normalized = normalize_repair_path(candidate)
                if normalized not in absolute_paths:
                    absolute_paths.append(normalized)

        if repair_lines:
            if artifact_path:
                if artifact_path not in absolute_paths:
                    absolute_paths.insert(0, artifact_path)
            allowed_paths = _ordered_allowed_paths(
                absolute_paths,
                primary_path=artifact_path,
            )
            allowed_roots = _collapse_roots(_path_roots(set(absolute_paths)))
            return ActiveRepairContext(
                artifact_path=artifact_path,
                repair_lines=repair_lines,
                allowed_paths=allowed_paths,
                allowed_roots=allowed_roots,
            )
    return None
119
120
def path_within_allowed_roots(path: str, allowed_roots: tuple[str, ...]) -> bool:
    """Return whether the normalized path stays within the repair artifact set."""

    candidate = normalize_repair_path(path)
    for raw_root in allowed_roots:
        # Ignore blank/whitespace-only root entries.
        if not str(raw_root).strip():
            continue
        root = normalize_repair_path(raw_root)
        # A match is either the root itself or anything nested beneath it.
        if candidate == root or candidate.startswith(root + sep):
            return True
    return False
132
133
def path_matches_allowed_paths(path: str, allowed_paths: tuple[str, ...]) -> bool:
    """Return whether the normalized path matches one concrete repair file."""

    target = normalize_repair_path(path)
    for candidate in allowed_paths:
        # Blank entries never match; everything else is compared normalized.
        if str(candidate).strip() and normalize_repair_path(candidate) == target:
            return True
    return False
142
143
def recent_repair_mutation_context_failed(
    messages: list[Message],
    target: str,
    *,
    lookback: int = 24,
) -> bool:
    """Return whether recent repair attempts proved the target context is stale."""

    tokens = _target_match_tokens(target)
    if not tokens:
        return False

    window = messages[-lookback:]
    for message in reversed(window):
        text = str(getattr(message, "content", "") or "")
        if not text:
            continue
        # The message must mention the target (exact-case token match) ...
        if not any(token and token in text for token in tokens):
            continue
        # ... and carry at least one staleness marker (case-insensitive).
        lowered = text.lower()
        for marker in _STALE_REPAIR_MUTATION_MARKERS:
            if marker in lowered:
                return True
    return False
166
167
def repair_line_is_html_quality(line: str) -> bool:
    """Return whether a repair-focus line describes generated HTML quality."""

    text = str(line or "").lower()
    for marker in _HTML_REPAIR_ISSUE_MARKERS:
        if marker in text:
            return True
    return False
173
174
def repair_line_matches_target(line: str, target: str) -> bool:
    """Return whether a repair-focus line refers to the target path."""

    line_text = str(line or "")
    target_text = str(target or "").strip()
    if not (line_text and target_text):
        return False

    normalized_target = normalize_repair_path(target_text)
    # Fast path: the raw or normalized target appears verbatim in the line.
    if target_text in line_text:
        return True
    if normalized_target and normalized_target in line_text:
        return True
    # Slow path: normalize each backtick-quoted path and compare.
    return any(
        normalize_repair_path(quoted) == normalized_target
        for quoted in re.findall(r"`([^`]+)`", line_text)
    )
189
190
def html_repair_issue_is_structural(line: str) -> bool:
    """Return whether an HTML quality issue is about document structure."""

    text = str(line or "").lower()
    for marker in _HTML_STRUCTURAL_REPAIR_MARKERS:
        if marker in text:
            return True
    return False
196
197
def html_quality_repair_insertion_anchor(raw_path: str) -> str | None:
    """Return an exact on-disk closing-tail anchor for bounded HTML expansion.

    Reads the file at *raw_path* and, when the document contains exactly one
    closing ``</body>`` and one closing ``</html>`` in the expected order with
    nothing but whitespace after the final ``</html>``, returns the trailing
    text from ``</body>`` onward (right-stripped) so callers can insert new
    content just before it. Returns ``None`` whenever the file is missing,
    unreadable, or structurally ambiguous — the safe fallback is a full
    rewrite rather than a bounded insertion.
    """

    normalized = normalize_repair_path(raw_path)
    if not normalized:
        return None
    path = Path(normalized)
    if not path.is_file():
        return None
    try:
        # Decode explicitly as UTF-8 so the anchor does not depend on the host
        # locale's default encoding (assumes generated guides are UTF-8 —
        # consistent with how HTML artifacts are typically produced here).
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return None

    body_matches = list(_BODY_CLOSE_RE.finditer(text))
    html_matches = list(_HTML_CLOSE_RE.finditer(text))
    # Zero or multiple closers means the structure is ambiguous: no anchor.
    if len(body_matches) != 1 or len(html_matches) != 1:
        return None
    body_match = body_matches[0]
    html_match = html_matches[0]
    # </body> must precede </html> ...
    if body_match.start() > html_match.start():
        return None
    # ... and nothing substantive may follow the final </html>.
    if text[html_match.end() :].strip():
        return None
    anchor = text[body_match.start() :].rstrip()
    if not anchor.strip():
        return None
    return anchor
226
227
def normalize_repair_path(raw_path: str) -> str:
    """Canonicalize a path string: strip, expand ``~``, and resolve best-effort.

    Falls back to the merely user-expanded path when resolution fails;
    empty/whitespace-only (or falsy) input yields "".
    """

    candidate = str(raw_path or "").strip()
    if not candidate:
        return ""
    expanded = Path(candidate).expanduser()
    try:
        return str(expanded.resolve(strict=False))
    except (OSError, RuntimeError, ValueError):
        return str(expanded)
236
237
def _target_match_tokens(raw_path: str) -> tuple[str, ...]:
    """Build ordered, deduplicated match tokens for a target path.

    Tokens are: the raw (stripped) text, its normalized form, and the final
    path component — each added only when non-empty and not already present.
    """

    stripped = str(raw_path or "").strip()
    if not stripped:
        return ()

    collected: list[str] = [stripped]
    resolved = normalize_repair_path(stripped)
    if resolved and resolved not in collected:
        collected.append(resolved)
    try:
        basename = Path(resolved or stripped).name
    except (OSError, RuntimeError, ValueError):
        basename = ""
    if basename and basename not in collected:
        collected.append(basename)
    return tuple(collected)
253
254
255 def _path_roots(paths: set[str]) -> set[str]:
256 roots: set[str] = set()
257 for raw_path in paths:
258 path = Path(raw_path)
259 roots.add(str(path.parent))
260 return roots
261
262
263 def _collapse_roots(roots: set[str]) -> tuple[str, ...]:
264 collapsed: list[str] = []
265 for root in sorted(roots, key=lambda item: (len(item), item)):
266 if any(root == candidate or root.startswith(f"{candidate}{sep}") for candidate in collapsed):
267 continue
268 collapsed.append(root)
269 return tuple(collapsed)
270
271
272 def _ordered_allowed_paths(paths: list[str], *, primary_path: str) -> tuple[str, ...]:
273 """Preserve repair-focus order with the immediate target first."""
274
275 ordered: list[str] = []
276
277 def add(path: str) -> None:
278 if not path or path in ordered:
279 return
280 ordered.append(path)
281
282 add(primary_path)
283 for path in paths:
284 add(path)
285 return tuple(ordered)