# Python · 10280 bytes (GitHub page-header artifact from scraping — not source code)
1 """Shared read models projected from the canonical workflow timeline."""
2
3 from __future__ import annotations
4
5 from dataclasses import dataclass, field
6
7 from .evidence_provenance import EvidenceProvenanceRollup, rollup_evidence_provenance
8 from .verification_observations import (
9 VerificationObservation,
10 describe_verification_attempt,
11 )
12 from .workflow_ledger import WorkflowLedger, workflow_ledger_highlights
13 from .workflow_policy import WorkflowTimelineEntry
14
15
@dataclass(slots=True)
class WorkflowTimelineProjection:
    """Projected views over persisted workflow timeline entries.

    Built by ``project_workflow_timeline``; bundles the filtered timeline
    views together with pre-computed summaries for the latest
    policy-accountability entry.
    """

    # Number of entries in the unfiltered source timeline.
    total_entries: int
    # Entries remaining after the mode/kind/accountability filters and limit.
    entries: list[WorkflowTimelineEntry] = field(default_factory=list)
    # Subset of the source timeline that is policy-accountability events.
    policy_entries: list[WorkflowTimelineEntry] = field(default_factory=list)
    # Newest policy-accountability entry, or None when the timeline has none.
    latest_policy_entry: WorkflowTimelineEntry | None = None
    # Compact rendered explanation of that entry (None when absent).
    latest_policy_summary: str | None = None
    # Grouped evidence rollup for that entry (None when absent).
    latest_policy_evidence: EvidenceProvenanceRollup | None = None
    # Observed-verification summaries for that entry (empty when absent).
    latest_policy_observed_verification: list[str] = field(default_factory=list)
    # Deduplicated operator-facing highlight strings.
    highlights: list[str] = field(default_factory=list)
28
29
def project_workflow_timeline(
    entries: list[WorkflowTimelineEntry],
    *,
    workflow_ledger: WorkflowLedger | None = None,
    mode: str | None = None,
    kind: str | None = None,
    accountability_only: bool = False,
    limit: int | None = None,
) -> WorkflowTimelineProjection:
    """Project reusable filtered views from canonical workflow timeline entries.

    Args:
        entries: Canonical timeline entries; "latest" lookups scan from the
            end, so entries are presumably ordered oldest-first — TODO confirm
            against the writer.
        workflow_ledger: Optional ledger whose highlights are appended after
            the timeline highlights.
        mode: When truthy, keep only entries whose ``mode`` matches exactly.
        kind: When truthy, keep only entries whose ``kind`` matches exactly.
        accountability_only: When true, start from the policy-accountability
            subset instead of the full timeline.
        limit: When not None, keep only the most recent ``limit`` filtered
            entries; ``limit=0`` keeps none.

    Returns:
        A :class:`WorkflowTimelineProjection` with the filtered views, the
        latest policy-accountability entry plus derived summaries, and
        deduplicated highlight strings.
    """

    source_entries = list(entries)
    policy_entries = filter_policy_accountability_entries(source_entries)
    filtered_entries = list(policy_entries if accountability_only else source_entries)
    if mode:
        filtered_entries = [entry for entry in filtered_entries if entry.mode == mode]
    if kind:
        filtered_entries = [entry for entry in filtered_entries if entry.kind == kind]

    # Highlights are built from the full filtered view BEFORE any limit is
    # applied, so truncation never hides an operator-facing signal.
    highlights = workflow_timeline_highlights(filtered_entries)
    if workflow_ledger is not None:
        highlights.extend(workflow_ledger_highlights(workflow_ledger))
    if limit is not None:
        # Guard limit <= 0 explicitly: the previous bare [-limit:] slice
        # returned the WHOLE list for limit == 0 (since [-0:] == [0:]).
        filtered_entries = filtered_entries[-limit:] if limit > 0 else []

    latest_policy_entry = _latest_matching_entry(
        source_entries,
        is_policy_accountability_entry,
    )

    return WorkflowTimelineProjection(
        total_entries=len(source_entries),
        entries=filtered_entries,
        policy_entries=policy_entries,
        latest_policy_entry=latest_policy_entry,
        # Derive the summary from the entry we already found instead of
        # re-scanning the timeline via latest_policy_accountability_summary
        # (same result: that helper locates the same latest entry and calls
        # workflow_entry_explanation on it).
        latest_policy_summary=(
            workflow_entry_explanation(latest_policy_entry)
            if latest_policy_entry is not None
            else None
        ),
        latest_policy_evidence=(
            workflow_entry_evidence_rollup(latest_policy_entry)
            if latest_policy_entry is not None
            else None
        ),
        latest_policy_observed_verification=(
            summarize_observed_verification(
                latest_policy_entry.verification_observations,
            )
            if latest_policy_entry is not None
            else []
        ),
        # dict.fromkeys keeps first-seen order while dropping duplicates.
        highlights=list(dict.fromkeys(highlights)),
    )
80
81
def filter_policy_accountability_entries(
    entries: list[WorkflowTimelineEntry],
) -> list[WorkflowTimelineEntry]:
    """Select the unified policy-accountability events from a workflow timeline."""

    return list(filter(is_policy_accountability_entry, entries))
88
89
def latest_policy_accountability_summary(
    entries: list[WorkflowTimelineEntry],
) -> str | None:
    """Explain the most recent canonical policy event, or None when there is none."""

    latest = _latest_matching_entry(entries, is_policy_accountability_entry)
    return None if latest is None else workflow_entry_explanation(latest)
99
100
def workflow_timeline_highlights(
    entries: list[WorkflowTimelineEntry],
) -> list[str]:
    """Build the operator-facing highlight list for timeline answers."""

    collected: list[str] = []

    def note(prefix: str, entry: WorkflowTimelineEntry) -> None:
        # Every highlight is "<prefix> <compact explanation>".
        collected.append(prefix + " " + workflow_entry_explanation(entry))

    # Latest clarify decision (continued vs. stopped).
    clarify = _latest_matching_entry(
        entries,
        lambda e: e.kind in {"clarify_continue", "clarify_exit"},
    )
    if clarify is not None:
        note(
            "Asked again:" if clarify.kind == "clarify_continue" else "Clarify stopped:",
            clarify,
        )

    # Latest workflow recovery (re-entry, plan refresh, or replan/refresh codes).
    recovery = _latest_matching_entry(
        entries,
        lambda e: e.kind in {"reentry", "plan_refresh"}
        or "replan" in e.reason_code
        or "refresh" in e.reason_code,
    )
    if recovery is not None:
        note("Recovered workflow:", recovery)

    # Latest repair event, flagged differently when the repair itself failed.
    repair = _latest_matching_entry(entries, lambda e: e.kind.startswith("repair_"))
    if repair is not None:
        note(
            "Repair failed:" if repair.kind == "repair_fail" else "Repair path:",
            repair,
        )

    # Latest completion decision of any kind.
    completion_kinds = {
        "completion_check",
        "completion_continue",
        "completion_complete",
        "completion_finalize",
    }
    completion = _latest_matching_entry(entries, lambda e: e.kind in completion_kinds)
    if completion is not None:
        note("Completion decision:", completion)

    # Latest verification event; the prefix reflects the policy outcome.
    verify = _latest_matching_entry(
        entries,
        lambda e: e.kind in {"verify_skip", "verify_observation"}
        or "verify_skip" in e.reason_code,
    )
    if verify is not None:
        if verify.kind == "verify_skip":
            verify_prefix = "Skipped verify:"
        else:
            verify_prefix = {
                "planned": "Verify planned:",
                "pending": "Verify pending:",
                "stale": "Verify stale:",
            }.get(verify.policy_outcome, "Verify observed:")
        note(verify_prefix, verify)

    # Deduplicate while keeping first-seen order.
    return list(dict.fromkeys(collected))
173
174
def is_policy_accountability_entry(entry: WorkflowTimelineEntry) -> bool:
    """Tell whether one workflow timeline entry is a policy-accountability event."""

    kind = entry.kind
    # Accountability events: any completion_*/repair_* kind, plus the two
    # explicit verification kinds.
    if kind in ("verify_skip", "verify_observation"):
        return True
    return kind.startswith("completion_") or kind.startswith("repair_")
183
184
def workflow_entry_explanation(entry: WorkflowTimelineEntry) -> str:
    """Render one compact explanation for policy and workflow timeline surfaces."""

    rollup = workflow_entry_evidence_rollup(entry)
    parts: list[str] = [entry.summary]

    # Optional labelled fields, appended in a fixed display order; falsy
    # values are skipped entirely.
    for label, value in (
        ("code", entry.reason_code),
        ("stage", entry.clarify_stage),
        ("pressure", entry.clarify_pressure_kind),
        ("policy-stage", entry.policy_stage),
        ("policy-outcome", entry.policy_outcome),
    ):
        if value:
            parts.append(f"{label}={value}")
    if entry.missing_readiness_gates:
        parts.append("gates=" + ",".join(entry.missing_readiness_gates))
    if entry.unresolved_questions:
        parts.append(entry.unresolved_questions[0])
    if rollup.blocking:
        parts.append("needs=" + "; ".join(rollup.blocking[:2]))
    if rollup.supporting:
        parts.append("satisfied=" + "; ".join(rollup.supporting[:2]))
    # Fall back to the raw evidence summary only when the rollup surfaced
    # neither blocking nor supporting items.
    if entry.evidence_summary and not rollup.blocking and not rollup.supporting:
        parts.append("evidence=" + "; ".join(entry.evidence_summary[:2]))
    if entry.evidence_provenance:
        parts.append(
            "provenance=" + format_evidence_provenance_brief(entry.evidence_provenance)
        )
    observed = summarize_observed_verification(entry.verification_observations)
    if observed:
        parts.append("observed=" + "; ".join(observed))
    if entry.signal_summary:
        parts.append("; ".join(entry.signal_summary[:2]))
    return " | ".join(part for part in parts if part)
220
221
def _latest_matching_entry(
    entries: list[WorkflowTimelineEntry],
    predicate,
) -> WorkflowTimelineEntry | None:
    """Return the newest entry satisfying *predicate*, or None when none match."""
    return next((entry for entry in reversed(entries) if predicate(entry)), None)
230
231
def format_evidence_provenance_brief(entries, *, max_entries: int = 2) -> str:
    """Render a compact operator-facing provenance summary.

    Each item becomes ``status:category`` with optional ``@source`` and
    ``(subject)`` suffixes; at most *max_entries* items are rendered,
    joined with ``"; "``.
    """

    rendered: list[str] = []
    for item in list(entries)[:max_entries]:
        suffix = ""
        if item.source:
            suffix += f"@{item.source}"
        if item.subject:
            suffix += f"({item.subject})"
        rendered.append(f"{item.status}:{item.category}{suffix}")
    return "; ".join(rendered)
241
242
def summarize_observed_verification(
    entries: list[VerificationObservation],
    *,
    max_items: int = 2,
) -> list[str]:
    """Render observed verification facts for operator-facing inspection surfaces."""

    rendered: list[str] = []
    for observation in entries[:max_items]:
        text = observation.summary.strip()
        attempt = describe_verification_attempt(observation)
        # Fold the detail into a trailing bracketed note, unless it already
        # appears somewhere in the summary text.
        detail = observation.detail.strip() if observation.detail else ""
        if detail and detail not in text:
            text = f"{text} [{detail}]"
        if attempt:
            if "[" in text and text.endswith("]"):
                # Extend the existing bracketed note rather than adding a
                # second bracket pair.
                text = text[:-1] + f"; {attempt}]"
            elif attempt not in text:
                text = f"{text} [{attempt}]"
        # Drop empties and duplicates while keeping first-seen order.
        if text and text not in rendered:
            rendered.append(text)
    return rendered
266
267
def workflow_entry_evidence_rollup(
    entry: WorkflowTimelineEntry,
) -> EvidenceProvenanceRollup:
    """Return the grouped evidence view for one workflow timeline entry."""

    rollup = rollup_evidence_provenance(entry.evidence_provenance, max_items_per_status=2)
    grouped_anything = (
        rollup.supporting or rollup.missing or rollup.contradicted or rollup.context
    )
    if entry.evidence_summary and not grouped_anything:
        # Grouping produced nothing: surface the first raw summary items as
        # context, skipping duplicates.
        for item in entry.evidence_summary[:2]:
            if item not in rollup.context:
                rollup.context.append(item)
    return rollup