Python · 12776 bytes Raw Blame History
1 """Serialize a `ParsedDlm` back to canonical `.dlm` text.
2
3 Contract:
4
5 - `serialize(parse_text(t))` may differ from `t` (whitespace/quoting
6 normalization), but applying the pipeline a second time is a no-op:
7 `serialize(parse_text(serialize(parse_text(t)))) == serialize(parse_text(t))`.
8 - Frontmatter key order is deterministic (see `_FRONTMATTER_ORDER`).
9 - Nested mappings (`training`, `export`) preserve the schema's declared
10 field order.
11 - Section content is emitted verbatim; fence lines are regenerated.
12 - Output uses LF line endings and ends with a single trailing newline.
13 """
14
15 from __future__ import annotations
16
17 from collections.abc import Iterable
18 from typing import Final
19
20 from pydantic import BaseModel
21
22 from dlm.doc.parser import ParsedDlm
23 from dlm.doc.schema import DlmFrontmatter, ExportConfig, TrainingConfig
24 from dlm.doc.sections import Section, SectionType
25
# Top-level frontmatter key order. `_serialize_frontmatter` iterates this
# tuple, which is what makes key order deterministic (module-docstring
# contract). `system_prompt` is last so the multiline block scalar sits
# at the bottom of the frontmatter.
_FRONTMATTER_ORDER: Final[tuple[str, ...]] = (
    "dlm_id",
    "dlm_version",
    "base_model",
    "training",
    "export",
    "system_prompt",
)
35
36
def serialize(parsed: ParsedDlm) -> str:
    """Produce canonical `.dlm` text for `parsed`.

    Frontmatter first, then each section, with a single blank line
    between consecutive sections. Always ends with `\\n`.
    """
    chunks: list[str] = [_serialize_frontmatter(parsed.frontmatter), "\n"]
    for index, sec in enumerate(parsed.sections):
        if index:
            # Blank-line separator between adjacent sections only.
            chunks.append("\n")
        chunks.append(_serialize_section(sec))
    text = "".join(chunks)
    return text if text.endswith("\n") else text + "\n"
51
52
53 # --- frontmatter --------------------------------------------------------------
54
55
def _serialize_frontmatter(fm: DlmFrontmatter) -> str:
    """Emit the `---`-delimited YAML frontmatter block for `fm`.

    Keys follow `_FRONTMATTER_ORDER`; `system_prompt` becomes a `|`
    block scalar, nested config models become indented mappings, and
    everything else is a flow scalar.
    """
    out: list[str] = ["---"]
    for key in _FRONTMATTER_ORDER:
        value = getattr(fm, key, None)
        if key == "system_prompt":
            # Optional multiline field — omit entirely when unset.
            if value is not None:
                out.extend(_emit_block_scalar(key, value))
        elif isinstance(value, TrainingConfig | ExportConfig):
            body = _emit_nested_mapping(value, indent=2)
            # An all-default nested block would leave a dangling
            # `training:` header, so skip the header too.
            if body:
                out.append(f"{key}:")
                out.extend(body)
        else:
            out.append(f"{key}: {_scalar(value)}")
    out.append("---")
    return "\n".join(out) + "\n"
77
78
def _emit_nested_mapping(model: BaseModel, *, indent: int) -> list[str]:
    """Emit a nested training/export/dpo block.

    Suppress fields that equal their schema default so
    re-serializing a minimal `.dlm` doesn't bloat it with every
    inlined default. Idempotency is preserved — the
    parser's defaults match the suppressed values, so round-trip
    stability holds at the model level.

    Nested `BaseModel` values (e.g. `TrainingConfig.preference`)
    recurse with deeper indent; all-default sub-blocks are skipped.

    Args:
        model: The pydantic model whose non-default fields to emit.
        indent: Number of leading spaces for this mapping level.

    Returns:
        YAML lines (no trailing newlines); may be empty when every
        field equals its default.
    """
    pad = " " * indent
    lines: list[str] = []
    # model_fields preserves declaration order. Required fields (no
    # default / default_factory) must always emit; optional fields are
    # suppressed when they equal their schema default. Constructing
    # `model.__class__()` would fail for models with required fields
    # (e.g. SourceDirective.path).
    from pydantic_core import PydanticUndefined

    for field_name, field_info in model.__class__.model_fields.items():
        value = getattr(model, field_name)
        # Plain default present and matched — suppress the field.
        if field_info.default is not PydanticUndefined and value == field_info.default:
            continue
        # Factory default (only checked when there is no plain default):
        # call it and compare against the live value.
        if (
            field_info.default is PydanticUndefined
            and field_info.default_factory is not None
            and value == field_info.default_factory()  # type: ignore[call-arg]
        ):
            continue
        if isinstance(value, BaseModel):
            # Sub-model: recurse two spaces deeper; drop the header when
            # the sub-block would be empty.
            nested = _emit_nested_mapping(value, indent=indent + 2)
            if not nested:
                continue
            lines.append(f"{pad}{field_name}:")
            lines.extend(nested)
            continue
        if (
            isinstance(value, dict)
            and value
            and all(isinstance(v, BaseModel) for v in value.values())
        ):
            # `dict[str, BaseModel]` (e.g. training.adapters) — emit
            # each entry as a nested mapping. The key is the dict
            # key; the value is the BaseModel's non-default fields.
            lines.append(f"{pad}{field_name}:")
            for k, v in value.items():
                lines.append(f"{pad}  {k}:")
                nested = _emit_nested_mapping(v, indent=indent + 4)
                if nested:
                    lines.extend(nested)
                else:
                    # All-default AdapterConfig: emit explicit `{}` so
                    # YAML has a valid mapping value rather than bare key.
                    lines[-1] = f"{pad}  {k}: {{}}"
            continue
        if (
            isinstance(value, list | tuple)
            and value
            and all(isinstance(v, BaseModel) for v in value)
        ):
            # `tuple[BaseModel, ...]` / `list[BaseModel]` (e.g.
            # training.sources). YAML list of nested mappings — each
            # entry's first field emits with the `-` marker, subsequent
            # fields indent aligned.
            lines.append(f"{pad}{field_name}:")
            for item in value:
                nested = _emit_nested_mapping(item, indent=indent + 4)
                if not nested:
                    lines.append(f"{pad}  - {{}}")
                    continue
                # Replace the first-field indent with `  - ` to start
                # the list item; keep the rest at `indent + 4`.
                first = nested[0]
                prefix = f"{pad}  - "
                lines.append(prefix + first[len(pad) + 4 :])
                lines.extend(nested[1:])
            continue
        # Plain scalar (or flow list) field.
        lines.append(f"{pad}{field_name}: {_scalar(value)}")
    return lines
160
161
162 def _emit_block_scalar(key: str, value: str) -> list[str]:
163 """YAML `|` block scalar: preserves line breaks verbatim."""
164 lines: list[str] = [f"{key}: |"]
165 for line in value.splitlines():
166 lines.append(f" {line}")
167 return lines
168
169
170 def _scalar(value: object) -> str:
171 """Render a scalar value in YAML-compatible form.
172
173 Conservative quoting: quote strings that could be misparsed (contain
174 whitespace, `:`, `#`, or look like a reserved scalar).
175 """
176 if isinstance(value, bool):
177 return "true" if value else "false"
178 if isinstance(value, int | float):
179 return _format_number(value)
180 if isinstance(value, str):
181 return _format_string(value)
182 if isinstance(value, list | tuple):
183 return _format_list(value)
184 if value is None:
185 return "null"
186 return str(value)
187
188
189 def _format_number(value: float | int) -> str:
190 """Render a numeric YAML scalar.
191
192 Integers serialize via `str()`; floats via `repr()` so `2e-4` round-trips
193 to `0.0002` cleanly. The `_scalar` dispatcher routes bools away before
194 we get here, so no bool guard is needed.
195 """
196 if isinstance(value, int):
197 return str(value)
198 if value == 0:
199 return "0.0"
200 return repr(value)
201
202
def _format_string(value: str) -> str:
    """Return `value` quoted only when YAML would otherwise misread it."""
    if not value:
        return '""'
    if not _needs_quoting(value):
        return value
    # Backslashes first, then quotes, so escapes don't double up.
    body = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{body}"'
210
211
# Bare words a YAML 1.1 loader would read as booleans/null instead of
# strings; `_needs_quoting` checks membership case-insensitively (it
# lowercases first) and forces quoting.
_RESERVED_UNQUOTED = frozenset(
    {
        "true",
        "false",
        "null",
        "yes",
        "no",
        "on",
        "off",
        "~",
    }
)
224
225
def _needs_quoting(value: str) -> bool:
    """True when `value` must be double-quoted to survive a YAML parse."""
    if value.lower() in _RESERVED_UNQUOTED:
        return True
    special = " \t\n#\"':&*!|>?%@`{}[]"
    if any(ch in special for ch in value):
        return True
    # A leading `-` or `,` would read as a list marker / separator.
    # (`value[:1]` keeps the empty string safely out of the set.)
    return value[:1] in {"-", ","}
233
234
def _format_list(items: Iterable[object]) -> str:
    """Render `items` as a YAML flow-style sequence: `[a, b, c]`."""
    return f"[{', '.join(map(_scalar, items))}]"
239
240
241 # --- sections -----------------------------------------------------------------
242
243
def _serialize_section(section: Section) -> str:
    """Render one section: fence line, optional markers, then body.

    PROSE emits raw content (no fence). IMAGE/AUDIO emit a fenced
    header carrying media attributes. Every other type emits
    `::type[#adapter]::` followed by any schema-v7 auto-* magic-comment
    markers so the parser can lift them back into `Section` fields.

    Raises:
        ValueError: when an AUDIO transcript or an auto-* metadata
            value contains `"` or a newline (the fence/marker grammars
            cannot round-trip them), or when an auto_mined/auto_synth
            section is missing required metadata.
    """
    if section.type == SectionType.PROSE:
        # Prose always gains a trailing newline, even when empty.
        body = section.content
        if not body.endswith("\n"):
            body += "\n"
        return body
    if section.type == SectionType.IMAGE:
        attrs: list[str] = []
        if section.media_path is not None:
            # NOTE(review): media_path/media_alt are interpolated without
            # the quote/newline validation AUDIO transcripts get — a `"`
            # in either would emit an unparseable fence. Confirm upstream
            # validation or tighten here.
            attrs.append(f'path="{section.media_path}"')
        if section.media_alt is not None:
            attrs.append(f'alt="{section.media_alt}"')
        return _attr_fence(section, attrs) + _terminated(section.content)
    if section.type == SectionType.AUDIO:
        attrs = []
        if section.media_path is not None:
            attrs.append(f'path="{section.media_path}"')
        if section.media_transcript is not None:
            transcript = section.media_transcript
            # Fence attribute grammar rejects `"` and `\n` at parse
            # time (the `_ATTR_KV_RE` character class is `[^"\n]*`).
            # Refuse to emit unparseable output rather than producing
            # something that survives serialization but fails re-read.
            if '"' in transcript or "\n" in transcript:
                raise ValueError(
                    "AUDIO transcript cannot contain double-quotes or "
                    "newlines — the fence attribute grammar disallows them. "
                    "Use curly quotes ('“'/'”') or rephrase. "
                    f"Offending transcript: {transcript!r}"
                )
            attrs.append(f'transcript="{transcript}"')
        return _attr_fence(section, attrs) + _terminated(section.content)
    # Generic fenced section, optionally scoped to an adapter.
    suffix = f"#{section.adapter}" if section.adapter else ""
    fence = f"::{section.type.value}{suffix}::\n"
    return fence + _auto_markers(section) + _terminated(section.content)


def _terminated(body: str) -> str:
    """Fenced-section body: ensure a trailing newline, but keep empty empty."""
    if body and not body.endswith("\n"):
        body += "\n"
    return body


def _attr_fence(section: Section, attrs: list[str]) -> str:
    """`::type attr…::` fence line for a media (IMAGE/AUDIO) section."""
    attr_blob = (" " + " ".join(attrs)) if attrs else ""
    return f"::{section.type.value}{attr_blob}::\n"


def _auto_markers(section: Section) -> str:
    """Schema-v7 magic-comment markers emitted immediately after the fence.

    The parser lifts these back into `Section.auto_harvest`,
    `Section.auto_mined`, and `Section.auto_synth` metadata; emitting
    them here keeps the round-trip symmetric.
    """
    markers: list[str] = []
    if section.auto_harvest:
        source = section.harvest_source or ""
        markers.append(f'<!-- dlm-auto-harvest: source="{source}" -->\n')
    if section.auto_mined:
        if (
            section.judge_name is None
            or section.judge_score_chosen is None
            or section.judge_score_rejected is None
            or section.mined_at is None
            or section.mined_run_id is None
        ):
            raise ValueError("auto_mined section is missing required metadata fields")
        attr_blob = " ".join(
            [
                f'judge_name="{_marker_attr_value(section.judge_name)}"',
                f'judge_score_chosen="{_format_number(section.judge_score_chosen)}"',
                f'judge_score_rejected="{_format_number(section.judge_score_rejected)}"',
                f'mined_at="{_marker_attr_value(section.mined_at)}"',
                # Fix: previously emitted unvalidated — a `"` or newline in
                # the run id would produce a marker the parser rejects,
                # violating this module's refuse-to-emit-unparseable policy.
                f'mined_run_id="{_marker_attr_value(section.mined_run_id)}"',
            ]
        )
        markers.append(f"<!-- dlm-auto-mined: {attr_blob} -->\n")
    if section.auto_synth:
        if (
            section.synth_teacher is None
            or section.synth_strategy is None
            or section.synth_at is None
            or section.source_section_id is None
        ):
            raise ValueError("auto_synth section is missing required metadata fields")
        attr_blob = " ".join(
            [
                f'synth_teacher="{_marker_attr_value(section.synth_teacher)}"',
                f'synth_strategy="{_marker_attr_value(section.synth_strategy)}"',
                f'synth_at="{_marker_attr_value(section.synth_at)}"',
                f'source_section_id="{_marker_attr_value(section.source_section_id)}"',
            ]
        )
        markers.append(f"<!-- dlm-auto-synth: {attr_blob} -->\n")
    return "".join(markers)
336
337
338 def _marker_attr_value(value: str) -> str:
339 """Reject metadata values the marker grammar cannot round-trip."""
340 if '"' in value or "\n" in value:
341 raise ValueError("metadata marker values cannot contain double-quotes or newlines")
342 return value