tenseleyflow/loader / e84c07c

Add persisted workflow ledger core

Authored by espadonne

SHA: e84c07cb6dad81923462c971ff40fe61ff85c7f1
Parents: 6a4ef72
Tree: b991c70

4 changed files

Status  File                                    +    -
M       src/loader/runtime/session.py           26   1
A       src/loader/runtime/workflow_ledger.py   336  0
M       tests/test_session_state.py             53   0
A       tests/test_workflow_ledger.py           63   0
src/loader/runtime/session.py  (modified)

@@ -18,9 +18,10 @@ from .compaction import (
     compact_session_messages,
     estimate_message_tokens,
 )
+from .workflow_ledger import WorkflowLedger
 from .workflow_policy import WorkflowTimelineEntry
 
-SESSION_VERSION = 5
+SESSION_VERSION = 6
 DEFAULT_ROTATE_AFTER_BYTES = 256 * 1024
 MAX_ROTATED_FILES = 3
 _UNSET = object()
@@ -114,6 +115,16 @@ def normalize_workflow_timeline(value: Any) -> list[WorkflowTimelineEntry]:
     return entries
 
 
+def normalize_workflow_ledger(value: Any) -> WorkflowLedger:
+    """Coerce persisted workflow-ledger state."""
+
+    if isinstance(value, WorkflowLedger):
+        return value.copy()
+    if isinstance(value, dict):
+        return WorkflowLedger.from_dict(value)
+    return WorkflowLedger()
+
+
 @dataclass(slots=True)
 class SessionCompaction:
     """Metadata describing the latest transcript compaction."""
@@ -175,6 +186,7 @@ class SessionSnapshot:
     last_turn_transition_kind: str | None = None
     last_turn_transition_reason_code: str | None = None
     workflow_timeline: list[WorkflowTimelineEntry] = field(default_factory=list)
+    workflow_ledger: WorkflowLedger = field(default_factory=WorkflowLedger)
     compaction: SessionCompaction | None = None
     version: int = SESSION_VERSION
 
@@ -206,6 +218,7 @@ class SessionSnapshot:
             "last_turn_transition_kind": self.last_turn_transition_kind,
             "last_turn_transition_reason_code": self.last_turn_transition_reason_code,
             "workflow_timeline": [entry.to_dict() for entry in self.workflow_timeline],
+            "workflow_ledger": self.workflow_ledger.to_dict(),
             "compaction": self.compaction.to_dict() if self.compaction else None,
         }
 
@@ -265,6 +278,7 @@ class SessionSnapshot:
             workflow_timeline=normalize_workflow_timeline(
                 data.get("workflow_timeline")
             ),
+            workflow_ledger=normalize_workflow_ledger(data.get("workflow_ledger")),
             compaction=(
                 SessionCompaction.from_dict(data["compaction"])
                 if data.get("compaction")
@@ -407,6 +421,7 @@ class ConversationSession:
     last_turn_transition_kind: str | None = None
     last_turn_transition_reason_code: str | None = None
     workflow_timeline: list[WorkflowTimelineEntry] = field(default_factory=list)
+    workflow_ledger: WorkflowLedger = field(default_factory=WorkflowLedger)
     compaction: SessionCompaction | None = None
     rotate_after_bytes: int = DEFAULT_ROTATE_AFTER_BYTES
     max_rotated_files: int = MAX_ROTATED_FILES
@@ -467,6 +482,7 @@ class ConversationSession:
         self.last_turn_transition_kind = None
         self.last_turn_transition_reason_code = None
         self.workflow_timeline = []
+        self.workflow_ledger = WorkflowLedger()
         self.compaction = None
         self.usage_totals = {}
         self.touch()
@@ -575,6 +591,13 @@ class ConversationSession:
         self.touch()
         self.persist()
 
+    def update_workflow_ledger(self, ledger: WorkflowLedger) -> None:
+        """Replace persisted workflow-ledger state."""
+
+        self.workflow_ledger = normalize_workflow_ledger(ledger)
+        self.touch()
+        self.persist()
+
     def maybe_compact(self) -> SessionCompactionResult | None:
         """Compact the transcript when the current request grows too large."""
 
@@ -655,6 +678,7 @@ class ConversationSession:
             last_turn_transition_kind=self.last_turn_transition_kind,
             last_turn_transition_reason_code=self.last_turn_transition_reason_code,
             workflow_timeline=list(self.workflow_timeline),
+            workflow_ledger=self.workflow_ledger.copy(),
             compaction=self.compaction,
         )
         return self.store.save(snapshot)
@@ -715,6 +739,7 @@ class ConversationSession:
             snapshot.last_turn_transition_reason_code
         )
         instance.workflow_timeline = list(snapshot.workflow_timeline)
+        instance.workflow_ledger = snapshot.workflow_ledger.copy()
         instance.compaction = snapshot.compaction
         instance.rotate_after_bytes = rotate_after_bytes
         instance.max_rotated_files = max_rotated_files
src/loader/runtime/workflow_ledger.py  (added)

@@ -0,0 +1,336 @@
+"""Durable workflow ledger state for assumptions, anchors, and boundaries."""
+
+from __future__ import annotations
+
+import re
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING, Any
+
+if TYPE_CHECKING:
+    from .workflow import ClarifyBrief
+    from .workflow_policy import ArtifactFreshness
+
+
+@dataclass(slots=True)
+class WorkflowLedgerItem:
+    """One durable workflow-ledger item."""
+
+    text: str
+    status: str
+    introduced_phase: str
+    updated_phase: str | None = None
+    evidence: list[str] = field(default_factory=list)
+
+    def to_dict(self) -> dict[str, Any]:
+        return {
+            "text": self.text,
+            "status": self.status,
+            "introduced_phase": self.introduced_phase,
+            "updated_phase": self.updated_phase,
+            "evidence": list(self.evidence),
+        }
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> WorkflowLedgerItem:
+        return cls(
+            text=str(data.get("text", "")).strip(),
+            status=str(data.get("status", "open")).strip() or "open",
+            introduced_phase=str(data.get("introduced_phase", "unknown")).strip()
+            or "unknown",
+            updated_phase=_optional_text(data.get("updated_phase")),
+            evidence=[str(item).strip() for item in data.get("evidence", []) if str(item).strip()],
+        )
+
+    def with_evidence(self, summary: str, *, phase: str, status: str | None = None) -> None:
+        """Update one item with fresh evidence."""
+
+        cleaned = summary.strip()
+        if cleaned and cleaned not in self.evidence:
+            self.evidence.append(cleaned)
+        if status is not None:
+            self.status = status
+        self.updated_phase = phase
+
+
+@dataclass(slots=True)
+class WorkflowLedger:
+    """Persisted semantic workflow state used for inspection and recovery."""
+
+    assumptions: list[WorkflowLedgerItem] = field(default_factory=list)
+    acceptance_anchors: list[WorkflowLedgerItem] = field(default_factory=list)
+    decision_boundaries: list[WorkflowLedgerItem] = field(default_factory=list)
+
+    def to_dict(self) -> dict[str, Any]:
+        return {
+            "assumptions": [item.to_dict() for item in self.assumptions],
+            "acceptance_anchors": [item.to_dict() for item in self.acceptance_anchors],
+            "decision_boundaries": [item.to_dict() for item in self.decision_boundaries],
+        }
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> WorkflowLedger:
+        return cls(
+            assumptions=_items_from_dict(data.get("assumptions")),
+            acceptance_anchors=_items_from_dict(data.get("acceptance_anchors")),
+            decision_boundaries=_items_from_dict(data.get("decision_boundaries")),
+        )
+
+    def copy(self) -> WorkflowLedger:
+        """Return a detached copy safe for mutation."""
+
+        return WorkflowLedger.from_dict(self.to_dict())
+
+    def has_items(self) -> bool:
+        """Return whether any semantic ledger state exists."""
+
+        return bool(
+            self.assumptions
+            or self.acceptance_anchors
+            or self.decision_boundaries
+        )
+
+
+def seed_workflow_ledger_from_brief(
+    ledger: WorkflowLedger,
+    brief: ClarifyBrief,
+    *,
+    phase: str = "clarify",
+) -> WorkflowLedger:
+    """Merge clarify-brief semantics into the durable workflow ledger."""
+
+    next_ledger = ledger.copy()
+    _merge_text_items(
+        next_ledger.assumptions,
+        brief.assumptions,
+        status="open",
+        phase=phase,
+    )
+    _merge_text_items(
+        next_ledger.acceptance_anchors,
+        brief.acceptance_criteria,
+        status="active",
+        phase=phase,
+    )
+    _merge_text_items(
+        next_ledger.decision_boundaries,
+        brief.decision_boundaries,
+        status="tracked",
+        phase=phase,
+    )
+    return next_ledger
+
+
+def seed_workflow_ledger_from_acceptance_criteria(
+    ledger: WorkflowLedger,
+    acceptance_criteria: list[str],
+    *,
+    phase: str = "plan",
+) -> WorkflowLedger:
+    """Merge acceptance anchors discovered during planning or verification."""
+
+    next_ledger = ledger.copy()
+    _merge_text_items(
+        next_ledger.acceptance_anchors,
+        acceptance_criteria,
+        status="active",
+        phase=phase,
+    )
+    return next_ledger
+
+
+def apply_freshness_to_workflow_ledger(
+    ledger: WorkflowLedger,
+    freshness: ArtifactFreshness,
+    *,
+    phase: str = "recovery",
+) -> WorkflowLedger:
+    """Apply drift evidence to the durable workflow ledger."""
+
+    next_ledger = ledger.copy()
+    for evidence in freshness.evidence:
+        summary = evidence.summary.strip()
+        if not summary:
+            continue
+
+        if evidence.kind == "contradicted_assumption":
+            item = _find_best_match(next_ledger.assumptions, summary)
+            if item is None:
+                item = WorkflowLedgerItem(
+                    text=_extract_focus_text(summary),
+                    status="contradicted",
+                    introduced_phase=phase,
+                )
+                next_ledger.assumptions.append(item)
+            item.with_evidence(summary, phase=phase, status="contradicted")
+            continue
+
+        if evidence.kind in {"acceptance_anchor", "verification_contradiction"}:
+            item = _find_best_match(next_ledger.acceptance_anchors, summary)
+            if item is None:
+                item = WorkflowLedgerItem(
+                    text=_extract_focus_text(summary),
+                    status="changed",
+                    introduced_phase=phase,
+                )
+                next_ledger.acceptance_anchors.append(item)
+            item.with_evidence(summary, phase=phase, status="changed")
+            continue
+
+        if evidence.kind == "task_boundary_change":
+            item = _find_best_match(next_ledger.decision_boundaries, summary)
+            if item is None:
+                item = WorkflowLedgerItem(
+                    text=_extract_focus_text(summary),
+                    status="reopened",
+                    introduced_phase=phase,
+                )
+                next_ledger.decision_boundaries.append(item)
+            item.with_evidence(summary, phase=phase, status="reopened")
+
+    return next_ledger
+
+
+def workflow_ledger_highlights(ledger: WorkflowLedger) -> list[str]:
+    """Return concise operator-facing highlights for one ledger."""
+
+    highlights: list[str] = []
+    contradicted = [item.text for item in ledger.assumptions if item.status == "contradicted"]
+    changed_anchors = [
+        item.text for item in ledger.acceptance_anchors if item.status == "changed"
+    ]
+    reopened = [
+        item.text for item in ledger.decision_boundaries if item.status == "reopened"
+    ]
+    if contradicted:
+        highlights.append(f"Contradicted assumptions: {', '.join(contradicted[:2])}")
+    if changed_anchors:
+        highlights.append(f"Changed acceptance anchors: {', '.join(changed_anchors[:2])}")
+    if reopened:
+        highlights.append(f"Reopened boundaries: {', '.join(reopened[:2])}")
+    return highlights
+
+
+def _items_from_dict(value: Any) -> list[WorkflowLedgerItem]:
+    if not isinstance(value, list):
+        return []
+    items: list[WorkflowLedgerItem] = []
+    for raw in value:
+        if not isinstance(raw, dict):
+            continue
+        item = WorkflowLedgerItem.from_dict(raw)
+        if item.text:
+            items.append(item)
+    return items
+
+
+def _merge_text_items(
+    items: list[WorkflowLedgerItem],
+    values: list[str],
+    *,
+    status: str,
+    phase: str,
+) -> None:
+    for text in values:
+        normalized = _normalized_text(text)
+        if not normalized:
+            continue
+        existing = _find_exact_match(items, normalized)
+        if existing is not None:
+            existing.updated_phase = phase
+            continue
+        items.append(
+            WorkflowLedgerItem(
+                text=text.strip(),
+                status=status,
+                introduced_phase=phase,
+            )
+        )
+
+
+def _find_exact_match(
+    items: list[WorkflowLedgerItem],
+    normalized_text: str,
+) -> WorkflowLedgerItem | None:
+    for item in items:
+        if _normalized_text(item.text) == normalized_text:
+            return item
+    return None
+
+
+def _find_best_match(
+    items: list[WorkflowLedgerItem],
+    summary: str,
+) -> WorkflowLedgerItem | None:
+    normalized_summary = _normalized_text(summary)
+    summary_tokens = _semantic_tokens(summary)
+    best_item: WorkflowLedgerItem | None = None
+    best_score = 0
+
+    for item in items:
+        normalized_item = _normalized_text(item.text)
+        if not normalized_item:
+            continue
+        if normalized_item in normalized_summary or normalized_summary in normalized_item:
+            return item
+
+        overlap = len(_semantic_tokens(item.text) & summary_tokens)
+        if overlap > best_score:
+            best_item = item
+            best_score = overlap
+
+    if best_score >= 2:
+        return best_item
+    return None
+
+
+def _extract_focus_text(summary: str) -> str:
+    quoted = re.findall(r"`([^`]+)`", summary)
+    if quoted:
+        return quoted[0].strip()
+    shortened = " ".join(summary.split()).strip()
+    if len(shortened) <= 96:
+        return shortened
+    return shortened[:93].rstrip() + "..."
+
+
+def _normalized_text(value: str | None) -> str:
+    if value is None:
+        return ""
+    return " ".join(re.sub(r"[`*_]+", "", str(value)).lower().split()).strip()
+
+
+def _optional_text(value: Any) -> str | None:
+    if value is None:
+        return None
+    text = str(value).strip()
+    return text or None
+
+
+def _semantic_tokens(text: str) -> set[str]:
+    return {
+        token
+        for token in re.findall(r"[a-z0-9_./-]+", _normalized_text(text))
+        if len(token) > 2 and token not in _STOP_WORDS
+    }
+
+
+_STOP_WORDS = {
+    "the",
+    "and",
+    "for",
+    "with",
+    "that",
+    "this",
+    "from",
+    "into",
+    "before",
+    "after",
+    "current",
+    "still",
+    "brief",
+    "plan",
+    "task",
+    "scope",
+    "exists",
+    "runtime",
+}
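Worth calling out in review: apply_freshness_to_workflow_ledger() leans on _find_best_match(), which first tries bidirectional substring containment on normalized text and only then falls back to counting shared semantic tokens, accepting a fuzzy match when at least two tokens overlap (best_score >= 2). A small sketch of that heuristic; it reuses the module's private helpers purely for illustration (they are internal, not a supported import surface):

    from loader.runtime.workflow_ledger import (
        WorkflowLedgerItem,
        _find_best_match,
        _semantic_tokens,
    )

    item = WorkflowLedgerItem(
        text="notes.txt stays out of scope unless clarified otherwise.",
        status="open",
        introduced_phase="clarify",
    )

    # Normalization strips backticks, so the quoted path still contributes a
    # token; "scope" is filtered out as a stop word.
    summary = "Clarify scope assumed `notes.txt` stayed out of scope."
    shared = _semantic_tokens(item.text) & _semantic_tokens(summary)
    print(shared)  # {"notes.txt", "out"} -- two shared tokens

    # Two shared tokens clear the best_score >= 2 bar, so drift evidence
    # updates the existing assumption instead of appending a duplicate.
    assert _find_best_match([item], summary) is item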
tests/test_session_state.py  (modified)

@@ -9,6 +9,7 @@ import pytest
 from loader.agent.loop import Agent, AgentConfig, ReasoningConfig
 from loader.llm.base import CompletionResponse, Message, Role, ToolCall
 from loader.runtime.session import ConversationSession
+from loader.runtime.workflow_ledger import WorkflowLedger, WorkflowLedgerItem
 from loader.runtime.workflow_policy import WorkflowTimelineEntry
 from tests.helpers.runtime_harness import ScriptedBackend
 
@@ -218,6 +219,58 @@ def test_session_persists_permission_policy_metadata(temp_dir: Path) -> None:
     ]
 
 
+def test_session_persists_workflow_ledger_state(temp_dir: Path) -> None:
+    session = ConversationSession(
+        system_message_factory=_dummy_system,
+        few_shot_factory=_dummy_few_shots,
+        project_root=temp_dir,
+    )
+
+    session.update_workflow_ledger(
+        WorkflowLedger(
+            assumptions=[
+                WorkflowLedgerItem(
+                    text="notes.txt stays out of scope unless clarified otherwise.",
+                    status="contradicted",
+                    introduced_phase="clarify",
+                    updated_phase="recovery",
+                    evidence=["Clarify scope assumed `notes.txt` stayed out of scope."],
+                )
+            ],
+            acceptance_anchors=[
+                WorkflowLedgerItem(
+                    text="notes.txt exists in the workspace root.",
+                    status="changed",
+                    introduced_phase="clarify",
+                    updated_phase="recovery",
+                )
+            ],
+            decision_boundaries=[
+                WorkflowLedgerItem(
+                    text="Escalate before broad UX changes.",
+                    status="tracked",
+                    introduced_phase="clarify",
+                )
+            ],
+        )
+    )
+
+    reloaded = ConversationSession.load(
+        project_root=temp_dir,
+        system_message_factory=_dummy_system,
+        few_shot_factory=_dummy_few_shots,
+        session_id=session.session_id,
+    )
+
+    assert reloaded is not None
+    assert reloaded.workflow_ledger.assumptions[0].status == "contradicted"
+    assert reloaded.workflow_ledger.assumptions[0].updated_phase == "recovery"
+    assert reloaded.workflow_ledger.acceptance_anchors[0].status == "changed"
+    assert reloaded.workflow_ledger.decision_boundaries[0].text == (
+        "Escalate before broad UX changes."
+    )
+
+
 @pytest.mark.asyncio
 async def test_turn_summary_usage_rolls_up_into_session_totals(temp_dir: Path) -> None:
     backend = ScriptedBackend(
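For reviewers tracing the round trip, this is roughly the dict shape the new test persists under the snapshot's "workflow_ledger" key, with field names taken directly from WorkflowLedgerItem.to_dict(); the enclosing snapshot envelope (session id, messages, version, and so on) is elided, and the sibling lists hold items of the same shape:

    # Shape of snapshot["workflow_ledger"] after the test's update.
    {
        "assumptions": [
            {
                "text": "notes.txt stays out of scope unless clarified otherwise.",
                "status": "contradicted",
                "introduced_phase": "clarify",
                "updated_phase": "recovery",
                "evidence": ["Clarify scope assumed `notes.txt` stayed out of scope."],
            }
        ],
        "acceptance_anchors": [...],
        "decision_boundaries": [...],
    }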
tests/test_workflow_ledger.py  (added)

@@ -0,0 +1,63 @@
+"""Tests for durable workflow-ledger semantics."""
+
+from __future__ import annotations
+
+from loader.runtime.workflow import ArtifactEvidence, ArtifactFreshness, ClarifyBrief
+from loader.runtime.workflow_ledger import (
+    WorkflowLedger,
+    apply_freshness_to_workflow_ledger,
+    seed_workflow_ledger_from_acceptance_criteria,
+    seed_workflow_ledger_from_brief,
+    workflow_ledger_highlights,
+)
+
+
+def test_workflow_ledger_seeds_from_brief_and_tracks_drift() -> None:
+    brief = ClarifyBrief(
+        task_statement="Keep the runtime artifact aligned with the actual work.",
+        assumptions=["notes.txt stays out of scope unless clarified otherwise."],
+        acceptance_criteria=["planned.txt exists in the workspace root."],
+        decision_boundaries=["Escalate before broad UX changes."],
+    )
+    brief.fill_defaults()
+
+    ledger = seed_workflow_ledger_from_brief(WorkflowLedger(), brief)
+    ledger = seed_workflow_ledger_from_acceptance_criteria(
+        ledger,
+        ["planned.txt exists in the workspace root."],
+        phase="plan",
+    )
+    freshness = ArtifactFreshness(
+        evidence=[
+            ArtifactEvidence(
+                kind="contradicted_assumption",
+                summary="Clarify scope assumed `notes.txt` stayed out of scope.",
+            ),
+            ArtifactEvidence(
+                kind="verification_contradiction",
+                summary=(
+                    "Failed verification exposed missing brief coverage for "
+                    "`notes.txt exists in the workspace root.`."
+                ),
+            ),
+            ArtifactEvidence(
+                kind="task_boundary_change",
+                summary="The active task framing outgrew the persisted clarify brief.",
+            ),
+        ]
+    )
+
+    updated = apply_freshness_to_workflow_ledger(ledger, freshness)
+
+    assert any(
+        item.text == "notes.txt stays out of scope unless clarified otherwise."
+        and item.status == "contradicted"
+        for item in updated.assumptions
+    )
+    assert any(
+        "notes.txt exists in the workspace root." in item.text
+        and item.status == "changed"
+        for item in updated.acceptance_anchors
+    )
+    assert any(item.status == "reopened" for item in updated.decision_boundaries)
+    assert workflow_ledger_highlights(updated)
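The closing workflow_ledger_highlights(updated) assertion only checks truthiness; per the f-strings in workflow_ledger.py, each category is truncated to its first two items. A sketch of the lines this scenario would produce (expected output inferred from the module's format strings and the evidence above, not asserted by the test itself):

    from loader.runtime.workflow_ledger import workflow_ledger_highlights

    for line in workflow_ledger_highlights(updated):
        print(line)
    # Expected, given the drift applied above:
    #   Contradicted assumptions: notes.txt stays out of scope unless clarified otherwise.
    #   Changed acceptance anchors: notes.txt exists in the workspace root.
    #   Reopened boundaries: The active task framing outgrew the persisted clarify brief.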