1 """Semantic artifact invalidation and recovery-strategy selection."""
2
3 from __future__ import annotations
4
5 import re
6 from enum import StrEnum
7 from pathlib import Path
8
9 from .workflow_policy import (
10 ArtifactEvidence,
11 ArtifactEvidenceKind,
12 ArtifactFreshness,
13 )
14
15
16 class WorkflowRecoveryStrategy(StrEnum):
17 """Next workflow move when persisted artifacts drift."""
18
19 NONE = "none"
20 CLARIFY_REENTRY = "clarify_reentry"
21 PLAN_REFRESH = "plan_refresh"
22 FULL_REPLAN = "full_replan"
23
24
class ArtifactInvalidationAssessor:
    """Assess whether workflow artifacts still match the current task state."""

    def assess(
        self,
        *,
        task_statement: str,
        clarify_text: str | None,
        implementation_text: str | None,
        verification_text: str | None,
        acceptance_criteria: list[str],
        touched_files: list[str],
        last_verification_result: str | None,
        retry_count: int = 0,
        planned_artifacts_complete: bool = False,
    ) -> ArtifactFreshness:
        """Return stale-artifact state and the recommended recovery strategy.

        Args:
            task_statement: The current framing of the task.
            clarify_text: Persisted clarify brief, if any.
            implementation_text: Persisted implementation plan, if any.
            verification_text: Persisted verification plan, if any.
            acceptance_criteria: Active acceptance criteria for the task.
            touched_files: Paths already modified during execution.
            last_verification_result: Outcome of the most recent verification
                run; only the literal value "failed" influences staleness here.
            retry_count: Number of retries performed so far.
            planned_artifacts_complete: Whether the originally planned
                artifacts finished before any repair work started.
        """

        # No artifacts persisted at all -> nothing can be stale.
        if not clarify_text and not implementation_text and not verification_text:
            return ArtifactFreshness()

        # Case-folded corpora used for all coverage checks below.
        plan_text = f"{implementation_text or ''}\n{verification_text or ''}".lower()
        brief_text = (clarify_text or "").lower()
        reasons: list[str] = []
        reason_codes: list[str] = []
        evidence: list[ArtifactEvidence] = []

        # Once the planned artifacts are complete and we are retrying,
        # touching files outside the plan is treated as repair, not drift.
        allow_repair_local_touchpoints = planned_artifacts_complete and retry_count > 0
        # Touched files whose basename the plan text never references.
        unexpected_paths = [
            name
            for path in touched_files
            if (name := _path_name(path))
            and not _text_covers_path_reference(plan_text, path)
        ]
        # Basenames of everything actually touched (order preserved).
        confirmed_touchpoints = [
            name
            for path in touched_files
            if (name := _path_name(path))
        ]
        # Canonical identities of touched files, for cheap membership tests.
        confirmed_touchpoint_keys = {
            _path_reference_identity(path)
            for path in touched_files
            if _path_reference_identity(path)
        }
        # File mentions in the artifacts that were never actually touched.
        inferred_touchpoints = [
            item
            for item in _extract_path_mentions(
                clarify_text,
                implementation_text,
                verification_text,
            )
            if _path_reference_identity(item) not in confirmed_touchpoint_keys
        ]
        stale_plan = False
        stale_brief = False

        # Record touchpoint evidence (dict.fromkeys deduplicates, keeps order).
        for item in dict.fromkeys(confirmed_touchpoints):
            _append_evidence(
                evidence,
                ArtifactEvidenceKind.CONFIRMED_TOUCHPOINT,
                f"`{item}` was already touched during execution.",
            )
        for item in dict.fromkeys(inferred_touchpoints):
            _append_evidence(
                evidence,
                ArtifactEvidenceKind.INFERRED_TOUCHPOINT,
                f"Persisted artifacts still point at `{item}`.",
            )

        # Files outside the plan mark the plan stale, unless repair is allowed.
        if unexpected_paths and not allow_repair_local_touchpoints:
            stale_plan = True
            reason_codes.append("touched_files_outside_plan")
            reasons.append(
                "Touched files outside the current plan: "
                + ", ".join(dict.fromkeys(unexpected_paths))
            )
        elif unexpected_paths:
            # Repair mode: log the supplemental files as evidence only.
            for item in dict.fromkeys(unexpected_paths):
                _append_evidence(
                    evidence,
                    ArtifactEvidenceKind.CONFIRMED_TOUCHPOINT,
                    "Verification repair touched supplemental file "
                    f"`{item}` after the originally planned artifacts were complete.",
                )

        # Acceptance criteria that carry real signal: non-blank, not a mere
        # restatement of the task, and not the boilerplate runtime-evidence one.
        acceptance_anchors = [
            item
            for item in acceptance_criteria
            if item.strip()
            and item.strip().lower() != task_statement.strip().lower()
            and "runtime verification evidence" not in item.strip().lower()
        ]
        for item in acceptance_anchors[:2]:
            _append_evidence(
                evidence,
                ArtifactEvidenceKind.ACCEPTANCE_ANCHOR,
                f"Current acceptance anchor: `{_short_requirement(item)}`.",
            )

        # Anchors the plan text does not cover -> stale plan.
        uncovered_criteria = [
            item
            for item in acceptance_anchors
            if not _text_covers_requirement(plan_text, item)
        ]
        if uncovered_criteria:
            stale_plan = True
            reason_codes.append("acceptance_criteria_outside_plan")
            reasons.append(
                "Acceptance criteria are missing from the current plan: "
                + "; ".join(uncovered_criteria[:2])
            )
            for item in uncovered_criteria[:2]:
                _append_evidence(
                    evidence,
                    ArtifactEvidenceKind.ACCEPTANCE_ANCHOR,
                    f"Plan coverage is missing acceptance anchor `{_short_requirement(item)}`.",
                )

        # Brief-staleness checks only apply when a clarify brief exists.
        if brief_text:
            # Same anchor filter as above, plus "not covered by the brief".
            brief_gaps = [
                item
                for item in acceptance_criteria
                if item.strip()
                and item.strip().lower() != task_statement.strip().lower()
                and "runtime verification evidence" not in item.strip().lower()
                and not _text_covers_requirement(brief_text, item)
            ]
            # Gaps only count once a verification run has actually failed.
            if brief_gaps and last_verification_result == "failed":
                stale_brief = True
                reason_codes.append("brief_missing_acceptance_scope")
                reasons.append(
                    "The clarify brief no longer captures the active acceptance criteria: "
                    + "; ".join(brief_gaps[:2])
                )
                for item in brief_gaps[:2]:
                    _append_evidence(
                        evidence,
                        ArtifactEvidenceKind.VERIFICATION_CONTRADICTION,
                        "Failed verification exposed missing brief coverage for "
                        f"`{_short_requirement(item)}`.",
                    )

            # Files that were both outside the plan AND unmentioned by the
            # brief contradict the clarified scope.
            out_of_brief_paths = [
                name
                for path in touched_files
                if (name := _path_name(path))
                and name in unexpected_paths
                and not _text_covers_path_reference(brief_text, path)
            ]
            if out_of_brief_paths:
                stale_brief = True
                reason_codes.append("touchpoints_outside_brief")
                reasons.append(
                    "The clarify brief no longer matches the touched files: "
                    + ", ".join(dict.fromkeys(out_of_brief_paths))
                )
                for item in dict.fromkeys(out_of_brief_paths):
                    _append_evidence(
                        evidence,
                        ArtifactEvidenceKind.CONTRADICTED_ASSUMPTION,
                        f"Clarify scope assumed `{item}` stayed out of scope.",
                    )

            # The brief must still cover the task statement itself.
            if not _text_covers_requirement(brief_text, task_statement):
                stale_brief = True
                reason_codes.append("task_drifted_beyond_brief")
                reasons.append(
                    "The clarify brief no longer reflects the current task framing."
                )
                _append_evidence(
                    evidence,
                    ArtifactEvidenceKind.TASK_BOUNDARY_CHANGE,
                    "The active task framing outgrew the persisted clarify brief.",
                )

        # Map the two staleness flags onto a single recovery strategy.
        recovery_strategy = WorkflowRecoveryStrategy.NONE
        if stale_brief and stale_plan:
            recovery_strategy = WorkflowRecoveryStrategy.FULL_REPLAN
        elif stale_brief:
            recovery_strategy = WorkflowRecoveryStrategy.CLARIFY_REENTRY
        elif stale_plan:
            recovery_strategy = WorkflowRecoveryStrategy.PLAN_REFRESH

        return ArtifactFreshness(
            stale_brief=stale_brief,
            stale_plan=stale_plan,
            reasons=list(dict.fromkeys(reasons)),
            reason_codes=list(dict.fromkeys(reason_codes)),
            recovery_strategy=recovery_strategy.value,
            evidence=evidence,
        )
216
217
218 def _path_name(path: str) -> str:
219 normalized = str(path).strip()
220 if not normalized:
221 return ""
222 return normalized.rsplit("/", maxsplit=1)[-1].strip()
223
224
def _path_reference_identity(path: str) -> str:
    """Canonical comparison key for *path*'s basename ("" when blank)."""
    basename = _path_name(path)
    return _canonical_path_reference(basename) if basename else ""
230
231
def _text_covers_path_reference(text: str, path: str) -> bool:
    """Return True when *text* references *path* in any recognized form.

    Three passes, cheapest first: literal substring, canonicalized
    (punctuation-insensitive) substring, then directory-suffix overlap.
    """
    lowered = text.lower()
    names = [item for item in (str(path).strip(), _path_name(path)) if item]

    # 1) Literal case-insensitive mention of the path or its basename.
    if any(name.lower() in lowered for name in names):
        return True

    # 2) Canonicalized mention (punctuation collapsed to spaces).
    canonical_text = _canonical_path_reference(text)
    for name in names:
        canonical_name = _canonical_path_reference(name)
        if canonical_name and canonical_name in canonical_text:
            return True

    # 3) Directory mention matching a suffix of the path's directory.
    suffixes = _directory_reference_suffixes(path)
    for mention in _extract_directory_mentions(text):
        canonical_mention = _canonical_path_reference(mention.rstrip("/"))
        if canonical_mention and canonical_mention in suffixes:
            return True
    return False
253
254
255 def _canonical_path_reference(value: str) -> str:
256 normalized = value.lower().strip()
257 normalized = re.sub(r"[^a-z0-9]+", " ", normalized)
258 return " ".join(normalized.split())
259
260
def _directory_reference_suffixes(path: str) -> tuple[str, ...]:
    """Return canonical trailing-component anchors for *path*'s directory.

    Anchors span from up to four trailing components down to two, longest
    first; fewer than two usable components yields an empty tuple.
    """
    raw = str(path).strip()
    if not raw:
        return ()

    as_path = Path(raw)
    # A suffix on the last component means it names a file; use its parent.
    directory = as_path.parent if as_path.suffix else as_path
    components = [part for part in directory.parts if part not in {"", "/", "~"}]
    if len(components) < 2:
        return ()

    suffixes: list[str] = []
    for size in range(min(4, len(components)), 1, -1):
        candidate = _canonical_path_reference("/".join(components[-size:]))
        if candidate and candidate not in suffixes:
            suffixes.append(candidate)
    return tuple(suffixes)
278
279
def _text_covers_requirement(text: str, requirement: str) -> bool:
    """Return True when *text* contains *requirement* verbatim or by keywords."""
    haystack = text.lower()
    needle = requirement.lower()
    if needle in haystack:
        return True

    # Keyword tokens: path-ish words longer than two chars, minus stop words.
    keywords = [
        token
        for token in re.findall(r"[a-z0-9_./-]+", needle)
        if len(token) > 2 and token not in _STOP_WORDS
    ]
    if not keywords:
        return needle.strip() in haystack
    hits = sum(1 for token in keywords if token in haystack)
    # A single-keyword requirement needs one hit; anything longer needs two.
    required = 1 if len(keywords) == 1 else 2
    return hits >= required
296
297
298 def _extract_path_mentions(*texts: str | None) -> list[str]:
299 mentions: list[str] = []
300 seen: set[str] = set()
301 for text in texts:
302 if not text:
303 continue
304 for match in re.findall(r"[\w./-]+\.[a-z0-9]+", text):
305 normalized = match.strip("`'\",.:;()[]{}")
306 if not normalized or normalized in seen:
307 continue
308 seen.add(normalized)
309 mentions.append(normalized)
310 return mentions
311
312
313 def _extract_directory_mentions(*texts: str | None) -> list[str]:
314 mentions: list[str] = []
315 seen: set[str] = set()
316 for text in texts:
317 if not text:
318 continue
319 for match in re.finditer(r"(?:~|/)?[\w./-]+/", text):
320 if match.end() < len(text) and re.match(r"[\w.-]", text[match.end()]):
321 continue
322 normalized = match.group(0).strip("`'\",.:;()[]{}")
323 if not normalized or normalized in seen:
324 continue
325 seen.add(normalized)
326 mentions.append(normalized)
327 return mentions
328
329
330 def _short_requirement(requirement: str, *, limit: int = 72) -> str:
331 normalized = " ".join(str(requirement).split()).strip()
332 if len(normalized) <= limit:
333 return normalized
334 return normalized[: limit - 3].rstrip() + "..."
335
336
def _append_evidence(
    evidence: list[ArtifactEvidence],
    kind: ArtifactEvidenceKind,
    summary: str,
) -> None:
    """Append a (kind, summary) evidence entry unless an identical one exists."""
    candidate = ArtifactEvidence(kind=kind.value, summary=summary)
    for existing in evidence:
        if existing.kind == candidate.kind and existing.summary == candidate.summary:
            return
    evidence.append(candidate)
349
350
351 _STOP_WORDS = {
352 "the",
353 "and",
354 "with",
355 "that",
356 "this",
357 "into",
358 "from",
359 "without",
360 "while",
361 "when",
362 "then",
363 "must",
364 "should",
365 "exists",
366 }