Python · 8239 bytes Raw Blame History
1 """Shared substrate for staged "pending plan" payloads under a store.
2
3 Both `dlm preference mine`/`apply` and `dlm synth instructions`/`apply`
4 need to stage a list of `Section` payloads on disk between two CLI
5 invocations, then read them back in the apply step. Each domain stores
6 the payload under a different store subdirectory and wraps validation
7 errors in its own typed exception, but the I/O shape is identical.
8
9 This module owns the I/O. The two domain modules
10 (`dlm.preference.pending`, `dlm.synth.pending`) supply their own
11 `PendingPlan` dataclass and error class via the small set of
12 parameterized functions below.
13
14 The on-disk format records the full optional-field surface of
15 ``Section`` so a domain that grows new optional fields tomorrow does
16 not need to bump ``schema_version``: load-time defaults absorb the
17 addition.
18 """
19
20 from __future__ import annotations
21
22 import json
23 from dataclasses import dataclass
24 from datetime import UTC, datetime
25 from pathlib import Path
26 from typing import TYPE_CHECKING, Any
27
28 from dlm.doc.sections import Section, SectionType
29 from dlm.io.atomic import write_text as atomic_write_text
30
31 if TYPE_CHECKING:
32 from collections.abc import Sequence
33
34 from dlm.store.paths import StorePath
35
36
# Version stamp written into every staged payload by `save_pending_plan`;
# `load_pending_plan` rejects a payload whose `schema_version` differs.
_SCHEMA_VERSION = 1
38
39
@dataclass(frozen=True)
class PendingSectionPlan:
    """Immutable record describing one staged plan.

    This is the generic base shape; domain modules subclass it so their own
    APIs carry a domain-specific type.
    """

    # Path of the source the plan was staged for (`save_pending_plan` stores
    # the resolved form of the path it was given).
    source_path: Path
    # Creation timestamp, kept as the string that is written to disk.
    created_at: str
    # The staged sections, in the order they were supplied.
    sections: tuple[Section, ...]
47
48
def pending_plan_path(store: StorePath, *, subdir: str) -> Path:
    """Return where the staged payload for `store` lives inside `subdir`.

    The file is always named ``pending.json`` and sits directly under
    ``store.root / subdir``. Nothing is created or checked on disk.
    """
    plan_dir = store.root / subdir
    return plan_dir / "pending.json"
52
53
def save_pending_plan(
    store: StorePath,
    *,
    source_path: Path,
    sections: Sequence[Section],
    subdir: str,
    plan_cls: type[PendingSectionPlan],
) -> PendingSectionPlan:
    """Stage `sections` under `store` and return the plan that was written.

    A `plan_cls` instance is built from the resolved `source_path`, the
    current UTC timestamp and a tuple copy of `sections`. It is then
    serialised as sorted, indented JSON (with a trailing newline) to the
    path given by `pending_plan_path`, creating the parent directory first
    when it does not exist yet.
    """
    staged = plan_cls(
        source_path=source_path.resolve(),
        created_at=_utcnow(),
        sections=tuple(sections),
    )

    destination = pending_plan_path(store, subdir=subdir)
    destination.parent.mkdir(parents=True, exist_ok=True)

    section_payloads = [_section_to_payload(item) for item in staged.sections]
    document = {
        "schema_version": _SCHEMA_VERSION,
        "source_path": str(staged.source_path),
        "created_at": staged.created_at,
        "sections": section_payloads,
    }
    serialized = json.dumps(document, indent=2, sort_keys=True)
    atomic_write_text(destination, serialized + "\n")
    return staged
77
78
def load_pending_plan(
    store: StorePath,
    *,
    subdir: str,
    plan_cls: type[PendingSectionPlan],
    error_cls: type[Exception],
    label: str,
) -> PendingSectionPlan | None:
    """Return the staged plan, or None when absent. Raises `error_cls` on corruption.

    `label` names the domain in error messages ("preference plan", "synth plan").

    The payload is read from the path given by `pending_plan_path`. Every
    failure to read, decode, parse or validate it is re-raised as
    `error_cls`, chained to the underlying exception where there is one:
    an unreadable file, bytes that are not valid UTF-8, malformed JSON, a
    non-object top level, an unexpected ``schema_version``, a missing or
    empty ``source_path`` / ``created_at``, a ``sections`` value that is not
    a list, or a section entry that `_section_from_payload` rejects.

    On success the result is a `plan_cls` instance whose ``source_path`` is
    the stored string wrapped in `Path` and whose ``sections`` is a tuple.
    """
    path = pending_plan_path(store, subdir=subdir)
    if not path.exists():
        return None

    # Read and parse in separate steps so each failure mode gets its own
    # message and each `try` body covers only the call that can raise.
    try:
        text = path.read_text(encoding="utf-8")
    except OSError as exc:
        raise error_cls(f"could not read staged {label}: {exc}") from exc
    except UnicodeDecodeError as exc:
        # Undecodable bytes are corruption as well; UnicodeDecodeError is a
        # ValueError, so without this clause it would escape unwrapped.
        raise error_cls(f"staged {label} is not valid UTF-8: {exc}") from exc

    try:
        raw = json.loads(text)
    except json.JSONDecodeError as exc:
        raise error_cls(f"staged {label} is not valid JSON: {exc}") from exc

    if not isinstance(raw, dict):
        raise error_cls(f"staged {label} must be a JSON object")
    if raw.get("schema_version") != _SCHEMA_VERSION:
        raise error_cls(f"unsupported staged {label} schema_version={raw.get('schema_version')!r}")

    source_path = raw.get("source_path")
    created_at = raw.get("created_at")
    sections_raw = raw.get("sections")
    if not isinstance(source_path, str) or not source_path:
        raise error_cls(f"staged {label} is missing source_path")
    if not isinstance(created_at, str) or not created_at:
        raise error_cls(f"staged {label} is missing created_at")
    if not isinstance(sections_raw, list):
        raise error_cls(f"staged {label} is missing sections")

    sections: list[Section] = []
    for idx, entry in enumerate(sections_raw):
        # Convert one entry at a time so the error can name the bad index.
        try:
            sections.append(_section_from_payload(entry))
        except (TypeError, ValueError, KeyError) as exc:
            raise error_cls(f"invalid section payload at index {idx}: {exc}") from exc

    return plan_cls(
        source_path=Path(source_path),
        created_at=created_at,
        sections=tuple(sections),
    )
128
129
def clear_pending_plan(store: StorePath, *, subdir: str) -> bool:
    """Remove the staged plan file and report whether there was one.

    Returns True when the file existed and was unlinked, False when there
    was nothing to remove.
    """
    target = pending_plan_path(store, subdir=subdir)
    existed = target.exists()
    if existed:
        target.unlink()
    return existed
137
138
139 def _utcnow() -> str:
140 return datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
141
142
def _section_to_payload(section: Section) -> dict[str, Any]:
    """Flatten `section` into the plain dict that is written to disk.

    ``type`` is stored as the enum's ``value`` and ``tags`` as a fresh dict
    copy; every other attribute is copied through unchanged under its own
    name. Keys are emitted in a fixed order.
    """
    # Attributes copied verbatim, split around "tags" to keep the key order.
    before_tags = ("content", "start_line", "adapter")
    after_tags = (
        "auto_harvest",
        "harvest_source",
        "auto_mined",
        "judge_name",
        "judge_score_chosen",
        "judge_score_rejected",
        "mined_at",
        "mined_run_id",
        "auto_synth",
        "synth_teacher",
        "synth_strategy",
        "synth_at",
        "source_section_id",
        "media_path",
        "media_alt",
        "media_blob_sha",
        "media_transcript",
    )

    payload: dict[str, Any] = {"type": section.type.value}
    for attr in before_tags:
        payload[attr] = getattr(section, attr)
    payload["tags"] = dict(section.tags)
    for attr in after_tags:
        payload[attr] = getattr(section, attr)
    return payload
168
169
def _section_from_payload(raw: object) -> Section:
    """Rebuild a `Section` from one decoded entry of the ``sections`` list.

    ``type`` and ``content`` are required, so a missing key raises
    `KeyError`. Every other key is optional: ``start_line`` defaults to 0,
    ``tags`` to an empty dict, boolean flags to False and the remaining
    fields to None.

    Raises `TypeError` when `raw` is not a dict, when ``tags`` is not a
    dict of string keys to string values, or when an optional field has the
    wrong JSON type. Whatever `SectionType(...)` or `int(...)` raise for an
    unusable value propagates unchanged.
    """
    if not isinstance(raw, dict):
        raise TypeError(f"expected object, got {type(raw).__name__}")
    fields = raw

    kind = SectionType(str(fields["type"]))

    tag_map = fields.get("tags", {})
    if not isinstance(tag_map, dict):
        raise TypeError("tags must be an object")
    for tag_key, tag_value in tag_map.items():
        if not (isinstance(tag_key, str) and isinstance(tag_value, str)):
            raise TypeError("tags keys and values must be strings")

    def text(name: str) -> str | None:
        # Optional string field: absent and null both become None.
        return _optional_str(fields.get(name))

    def flag(name: str) -> bool:
        # Optional boolean field: absent means False.
        return bool(fields.get(name, False))

    return Section(
        type=kind,
        content=str(fields["content"]),
        start_line=int(fields.get("start_line", 0)),
        adapter=text("adapter"),
        tags=dict(tag_map),
        auto_harvest=flag("auto_harvest"),
        harvest_source=text("harvest_source"),
        auto_mined=flag("auto_mined"),
        judge_name=text("judge_name"),
        judge_score_chosen=_optional_float(fields.get("judge_score_chosen")),
        judge_score_rejected=_optional_float(fields.get("judge_score_rejected")),
        mined_at=text("mined_at"),
        mined_run_id=_optional_int(fields.get("mined_run_id")),
        auto_synth=flag("auto_synth"),
        synth_teacher=text("synth_teacher"),
        synth_strategy=text("synth_strategy"),
        synth_at=text("synth_at"),
        source_section_id=text("source_section_id"),
        media_path=text("media_path"),
        media_alt=text("media_alt"),
        media_blob_sha=text("media_blob_sha"),
        media_transcript=text("media_transcript"),
    )
203
204
205 def _optional_str(value: object) -> str | None:
206 if value is None:
207 return None
208 if not isinstance(value, str):
209 raise TypeError(f"expected string or null, got {type(value).__name__}")
210 return value
211
212
213 def _optional_float(value: object) -> float | None:
214 if value is None:
215 return None
216 if isinstance(value, bool) or not isinstance(value, int | float):
217 raise TypeError(f"expected float or null, got {type(value).__name__}")
218 return float(value)
219
220
221 def _optional_int(value: object) -> int | None:
222 if value is None:
223 return None
224 if isinstance(value, bool) or not isinstance(value, int):
225 raise TypeError(f"expected int or null, got {type(value).__name__}")
226 return value