Python · 23887 bytes Raw Blame History
1 """Parse `.dlm` files into validated `ParsedDlm` values.
2
3 Flow:
4
5 read bytes → dlm.io.text.read_text (UTF-8 strict, BOM strip, CRLF→LF)
6 → split frontmatter and body on the two `---` delimiters
7 → YAML-parse the frontmatter
8 → Pydantic validate → DlmFrontmatter
9 → check dlm_version (delegates to the migration registry)
10 → tokenize body into Section list (code-fence aware)
11 → return ParsedDlm(frozen)
12
13 Errors are always typed (`DlmParseError` subclasses) and carry
14 `path:line:col` location info for the CLI reporter.
15 """
16
17 from __future__ import annotations
18
19 import math
20 import re
21 from dataclasses import dataclass
22 from datetime import datetime
23 from pathlib import Path
24 from typing import Final
25
26 import yaml
27
28 from dlm.doc.errors import (
29 FenceError,
30 FrontmatterError,
31 )
32 from dlm.doc.schema import DlmFrontmatter
33 from dlm.doc.sections import Section, SectionType
34 from dlm.doc.versioned import validate_versioned
35 from dlm.io.text import read_text
36
# Schema v7 marker: sections written back by `dlm harvest` carry a
# magic-comment first line inside the fenced section body. The parser
# recognizes it and moves the metadata to `Section.auto_harvest` +
# `Section.harvest_source`; it is not user-authored content.
_HARVEST_MARKER_RE: Final[re.Pattern[str]] = re.compile(
    r'^<!-- dlm-auto-harvest: source="([^"]*)" -->$'
)
# Auto-mined / auto-synth markers use the same magic-comment convention
# but carry a `key="value"` attribute blob (captured in group 1) that is
# parsed by `_parse_auto_mined_marker` / `_parse_auto_synth_marker`.
_AUTO_MINED_MARKER_RE: Final[re.Pattern[str]] = re.compile(r"^<!-- dlm-auto-mined:(.*) -->$")
_AUTO_SYNTH_MARKER_RE: Final[re.Pattern[str]] = re.compile(r"^<!-- dlm-auto-synth:(.*) -->$")
# Whole-blob shape check: one or more space-prefixed key="value" pairs;
# keys are lowercase snake_case, values contain no quotes or newlines.
_MARKER_ATTR_BLOB_RE: Final[re.Pattern[str]] = re.compile(r'(?:\s+[a-z][a-z0-9_]*="[^"\n]*")+')
# Exact attribute set of the auto-mined marker: every key is required
# and any other key is rejected (see `_parse_auto_mined_marker`).
_AUTO_MINED_KEYS: Final[frozenset[str]] = frozenset(
    {
        "judge_name",
        "judge_score_chosen",
        "judge_score_rejected",
        "mined_at",
        "mined_run_id",
    }
)
# Exact attribute set of the auto-synth marker: every key is required
# and any other key is rejected (see `_parse_auto_synth_marker`).
_AUTO_SYNTH_KEYS: Final[frozenset[str]] = frozenset(
    {
        "synth_teacher",
        "synth_strategy",
        "synth_at",
        "source_section_id",
    }
)
64
65 # --- public surface -----------------------------------------------------------
66
67
@dataclass(frozen=True)
class ParsedDlm:
    """Immutable result of parsing a `.dlm` document."""

    # Validated frontmatter (already migrated to the current schema shape).
    frontmatter: DlmFrontmatter
    # Body sections in document order; empty PROSE runs are elided.
    sections: tuple[Section, ...]
    # Originating file, or None when parsed from an in-memory string.
    source_path: Path | None = None
75
76
def parse_file(path: Path) -> ParsedDlm:
    """Read `path` as UTF-8 (via `dlm.io.text.read_text`) and parse it."""
    return parse_text(read_text(path), path=path)
81
82
def parse_text(text: str, *, path: Path | None = None) -> ParsedDlm:
    """Parse the already-decoded text body of a `.dlm` document.

    The caller is expected to have decoded UTF-8 and normalized line
    endings to LF beforehand; `dlm.io.text.read_text` does both.
    """
    fm_yaml, body, first_body_line = _split_frontmatter(text, path=path)
    return ParsedDlm(
        frontmatter=_validate_frontmatter(fm_yaml, path=path),
        sections=tuple(
            _tokenize_body(body, body_start_line=first_body_line, path=path)
        ),
        source_path=path,
    )
97
98
99 # --- internals ----------------------------------------------------------------
100
101
# Opening/closing delimiter line of the frontmatter block.
_FRONTMATTER_DELIM: Final = "---"

# A fence line is one of:
# `::<type>::` — bare fence
# `::<type>#<adapter>::` — adapter-routed fence
# `::<type> key="val" key="val"::` — attribute fence (IMAGE, schema v10+)
#
# - `<type>` is one of `SectionType` (validated in `_resolve_fence_type`).
# - `<adapter>` matches the schema's adapter-name grammar: lowercase
#   alpha start + `[a-z0-9_]` tail, ≤32 chars. Keeps store paths safe
#   and log output readable.
# - Attribute form values are double-quoted, ASCII-only, and cannot
#   contain newlines (enforced at parse time). Currently only the
#   IMAGE type uses attributes (`path`, `alt`); adding more is a
#   compatible extension.
_FENCE_RE: Final[re.Pattern[str]] = re.compile(r"^::([A-Za-z0-9_#-]+)::$")
_ATTR_FENCE_RE: Final[re.Pattern[str]] = re.compile(
    r'^::([a-z][a-z0-9_]*)((?:\s+[a-z][a-z0-9_]*="[^"\n]*")+)\s*::$'
)
# Single key="value" pair; iterated with finditer over an attr blob.
_ATTR_KV_RE: Final[re.Pattern[str]] = re.compile(r'([a-z][a-z0-9_]*)="([^"\n]*)"')
_ADAPTER_SUFFIX_RE: Final[re.Pattern[str]] = re.compile(r"^[a-z][a-z0-9_]{0,31}$")
# Matches any line *starting* with ``` — including info-string openers
# like ```python — used to toggle code-block state in `_tokenize_body`.
_CODE_FENCE_RE: Final[re.Pattern[str]] = re.compile(r"^```")

# Per-type attribute grammar: type name → (required keys, allowed keys).
# Keys marked required must appear; unknown keys raise a FenceError.
# Expanded by future multi-modal types.
_FENCE_ATTR_SPEC: Final[dict[str, tuple[frozenset[str], frozenset[str]]]] = {
    # IMAGE: `path` required, `alt` optional.
    "image": (frozenset({"path"}), frozenset({"path", "alt"})),
    # AUDIO: `path` + `transcript` both required. Transcript carries
    # the text-side supervision at train time; audio without text has
    # no training signal, so the attribute is mandatory.
    "audio": (frozenset({"path", "transcript"}), frozenset({"path", "transcript"})),
}
135
136
def _split_frontmatter(text: str, *, path: Path | None) -> tuple[str, str, int]:
    """Return (frontmatter_yaml, body_text, body_start_line_1indexed).

    Line 1 must be exactly `---`; the next `---` line closes the block
    and everything after it is body. A missing opener or closer raises
    `FrontmatterError`.
    """
    lines = text.split("\n")
    if not lines or lines[0] != _FRONTMATTER_DELIM:
        observed = repr(lines[0]) if lines else "''"
        raise FrontmatterError(
            f"expected '---' on line 1 to open frontmatter, got {observed}",
            path=path,
            line=1,
            col=1,
        )
    # Locate the closing delimiter; everything between is the YAML blob.
    closer = next(
        (i for i, candidate in enumerate(lines[1:], start=1) if candidate == _FRONTMATTER_DELIM),
        None,
    )
    if closer is None:
        raise FrontmatterError(
            "no closing '---' found for frontmatter block",
            path=path,
            line=1,
            col=1,
        )
    yaml_text = "\n".join(lines[1:closer])
    body = "\n".join(lines[closer + 1 :])
    # The body begins on the line after the closer (1-indexed).
    return yaml_text, body, closer + 2
165
166
def _validate_frontmatter(yaml_text: str, *, path: Path | None) -> DlmFrontmatter:
    """YAML-parse → migrate-if-needed → Pydantic-validate.

    The migration dispatcher runs between YAML parse and Pydantic
    validate, so an older-but-known schema is upgraded to the current
    shape before `extra="forbid"` enforcement.
    """
    if not yaml_text.strip():
        # A whitespace-only block is treated as an empty mapping.
        raw = {}
    else:
        try:
            raw = yaml.safe_load(yaml_text)
        except yaml.YAMLError as exc:
            line, col = _yaml_error_location(exc)
            raise FrontmatterError(f"invalid YAML: {exc}", path=path, line=line, col=col) from exc

    if not isinstance(raw, dict):
        raise FrontmatterError(
            f"frontmatter must be a mapping, got {type(raw).__name__}",
            path=path,
            line=2,
        )

    return validate_versioned(raw, path=path)
188
189
190 def _yaml_error_location(exc: yaml.YAMLError) -> tuple[int, int]:
191 """Extract 1-indexed (line, col) from a YAMLError, if present.
192
193 The exception's `problem_mark` attribute is 0-indexed internally.
194 """
195 mark = getattr(exc, "problem_mark", None) or getattr(exc, "context_mark", None)
196 if mark is None:
197 return 0, 0
198 # The frontmatter parser feeds yaml the content *without* its
199 # delimiters, so the reported line is offset by 1 (the opening `---`).
200 return mark.line + 2, mark.column + 1
201
202
def _tokenize_body(body: str, *, body_start_line: int, path: Path | None) -> list[Section]:
    """Split body into Section list.

    Rules:

    - Active type starts as PROSE.
    - A bare fence line is exactly `^::<type>::$` or `^::<type>#<adapter>::$`.
    - An attribute fence line is `^::<type> key="val" ...::$` — used by
      the types listed in `_FENCE_ATTR_SPEC` (IMAGE, AUDIO).
    - Unknown fences raise `FenceError`.
    - Triple-backtick code blocks (```...```) suppress fence
      interpretation for their contents.
    - Empty PROSE sections (between two fences back-to-back) are elided.
    """
    # The canonical layout emits a single blank line between the closing
    # `---` and the first body line. Strip one leading LF so section
    # content doesn't accumulate that separator line on every round-trip.
    if body.startswith("\n"):
        body = body[1:]
        body_start_line += 1

    # Likewise, files canonically end with a trailing LF; `split("\n")`
    # would otherwise produce a spurious empty trailing element.
    if body.endswith("\n"):
        body = body[:-1]

    lines = body.split("\n") if body else []
    sections: list[Section] = []
    in_code_block = False
    current_type = SectionType.PROSE
    current_adapter: str | None = None
    current_media_path: str | None = None
    current_media_alt: str | None = None
    current_media_transcript: str | None = None
    current_lines: list[str] = []
    current_start_line = body_start_line

    def flush() -> None:
        # Emit the accumulated section. Reads the enclosing `current_*`
        # state and appends to `sections`; never mutates that state.
        #
        # Schema v7: non-PROSE sections may carry a harvest marker as
        # the first body line. Lift it into Section fields before the
        # content hash sees it, so a harvested section's `section_id`
        # matches a hand-authored section with identical content —
        # provenance is metadata, not identity.
        lines_for_content = list(current_lines)
        auto_harvest = False
        harvest_source: str | None = None
        auto_mined = False
        judge_name: str | None = None
        judge_score_chosen: float | None = None
        judge_score_rejected: float | None = None
        mined_at: str | None = None
        mined_run_id: int | None = None
        auto_synth = False
        synth_teacher: str | None = None
        synth_strategy: str | None = None
        synth_at: str | None = None
        source_section_id: str | None = None
        # PROSE never carries provenance markers, and IMAGE/AUDIO bodies
        # are exempt from marker lifting as well.
        marker_exempt_types = (SectionType.PROSE, SectionType.IMAGE, SectionType.AUDIO)
        if current_type not in marker_exempt_types:
            while lines_for_content:
                first = lines_for_content[0]
                marker_match = _HARVEST_MARKER_RE.match(first)
                if marker_match:
                    auto_harvest = True
                    harvest_source = marker_match.group(1)
                    lines_for_content = lines_for_content[1:]
                    continue
                # Mined markers are only recognized on PREFERENCE sections.
                mined_match = (
                    _AUTO_MINED_MARKER_RE.match(first)
                    if current_type == SectionType.PREFERENCE
                    else None
                )
                if mined_match:
                    (
                        judge_name,
                        judge_score_chosen,
                        judge_score_rejected,
                        mined_at,
                        mined_run_id,
                    ) = _parse_auto_mined_marker(
                        mined_match.group(1),
                        path=path,
                        line=current_start_line,
                    )
                    auto_mined = True
                    lines_for_content = lines_for_content[1:]
                    continue
                # Synth markers are only recognized on INSTRUCTION sections.
                synth_match = (
                    _AUTO_SYNTH_MARKER_RE.match(first)
                    if current_type == SectionType.INSTRUCTION
                    else None
                )
                if synth_match:
                    (
                        synth_teacher,
                        synth_strategy,
                        synth_at,
                        source_section_id,
                    ) = _parse_auto_synth_marker(
                        synth_match.group(1),
                        path=path,
                        line=current_start_line,
                    )
                    auto_synth = True
                    lines_for_content = lines_for_content[1:]
                    continue
                break
        content = "\n".join(lines_for_content)
        # Elide PROSE sections with no meaningful content. This single
        # check covers both the back-to-back-fence case (no lines at
        # all) and purely-whitespace prose between fences; the previous
        # extra `not current_lines` check was strictly subsumed by it.
        if current_type == SectionType.PROSE and not content.strip():
            return
        sections.append(
            Section(
                type=current_type,
                content=content,
                start_line=current_start_line,
                adapter=current_adapter,
                auto_harvest=auto_harvest,
                harvest_source=harvest_source,
                auto_mined=auto_mined,
                judge_name=judge_name,
                judge_score_chosen=judge_score_chosen,
                judge_score_rejected=judge_score_rejected,
                mined_at=mined_at,
                mined_run_id=mined_run_id,
                auto_synth=auto_synth,
                synth_teacher=synth_teacher,
                synth_strategy=synth_strategy,
                synth_at=synth_at,
                source_section_id=source_section_id,
                media_path=current_media_path,
                media_alt=current_media_alt,
                media_transcript=current_media_transcript,
            ),
        )

    for idx, line in enumerate(lines):
        source_line = body_start_line + idx

        # Track fenced code blocks to suppress fence interpretation.
        # Note: any line starting with ``` toggles the state, including
        # info-string openers such as ```python.
        if _CODE_FENCE_RE.match(line):
            in_code_block = not in_code_block
            current_lines.append(line)
            continue

        if not in_code_block:
            attr_match = _ATTR_FENCE_RE.match(line)
            if attr_match:
                fence_type, attrs = _resolve_attr_fence(attr_match, source_line, path)
                flush()
                # Sections start at the fence line; media metadata comes
                # from the fence attributes, not from the body.
                current_type = fence_type
                current_adapter = None
                current_media_path = attrs.get("path")
                current_media_alt = attrs.get("alt")
                current_media_transcript = attrs.get("transcript")
                current_lines = []
                current_start_line = source_line
                continue
            match = _FENCE_RE.match(line)
            if match:
                fence_name = match.group(1)
                fence_type, fence_adapter = _resolve_fence_type(fence_name, source_line, path)
                # Compare by `.value`: `_FENCE_ATTR_SPEC` keys are plain
                # strings, so this is correct whether or not SectionType
                # is a str-enum.
                if fence_type.value in _FENCE_ATTR_SPEC:
                    raise FenceError(
                        f"fence '::{fence_name}::' requires attributes "
                        f'(expected e.g. `::{fence_type.value} path="..."::`)',
                        path=path,
                        line=source_line,
                        col=1,
                    )
                flush()
                current_type = fence_type
                current_adapter = fence_adapter
                current_media_path = None
                current_media_alt = None
                current_media_transcript = None
                current_lines = []
                current_start_line = source_line
                continue

        current_lines.append(line)

    if in_code_block:
        raise FenceError(
            "unterminated triple-backtick code block in body",
            path=path,
            line=current_start_line,
        )

    flush()
    return sections
397
398
def _resolve_attr_fence(
    match: re.Match[str], line: int, path: Path | None
) -> tuple[SectionType, dict[str, str]]:
    """Validate an attribute-form fence and return (type, attrs).

    The attribute grammar is type-specific — `_FENCE_ATTR_SPEC` names
    the required and allowed keys per type. Required-but-missing keys
    and unknown keys raise `FenceError`; duplicate keys raise too so
    `path="a" path="b"` can't silently pick one.
    """
    kind = match.group(1)
    blob = match.group(2)

    try:
        section_type = SectionType(kind)
    except ValueError as exc:
        raise FenceError(
            f"unknown attribute fence '::{kind}...::'; attribute form "
            f"is only valid for types {sorted(_FENCE_ATTR_SPEC)}",
            path=path,
            line=line,
            col=1,
        ) from exc
    if kind not in _FENCE_ATTR_SPEC:
        raise FenceError(
            f"fence '::{kind}...::' does not take attributes",
            path=path,
            line=line,
            col=1,
        )
    required, allowed = _FENCE_ATTR_SPEC[kind]

    attrs: dict[str, str] = {}
    for key, value in _ATTR_KV_RE.findall(blob):
        if key in attrs:
            raise FenceError(
                f"fence '::{kind}...::' repeats attribute {key!r}",
                path=path,
                line=line,
                col=1,
            )
        if key not in allowed:
            raise FenceError(
                f"fence '::{kind}...::' has unknown attribute {key!r} "
                f"(allowed: {sorted(allowed)})",
                path=path,
                line=line,
                col=1,
            )
        if not value.isascii():
            raise FenceError(
                f"fence '::{kind}...::' attribute {key!r} contains non-ASCII characters",
                path=path,
                line=line,
                col=1,
            )
        attrs[key] = value

    missing = required - attrs.keys()
    if missing:
        raise FenceError(
            f"fence '::{kind}...::' is missing required attribute(s) {sorted(missing)}",
            path=path,
            line=line,
            col=1,
        )
    return section_type, attrs
467
468
def _resolve_fence_type(name: str, line: int, path: Path | None) -> tuple[SectionType, str | None]:
    """Map a fence name to `(SectionType, adapter_name|None)` or raise.

    Multi-adapter fences carry a `#<adapter>` suffix; the adapter part is
    validated against the same grammar the schema uses. A fence like
    `::instruction#::` (trailing hash but no name) or `::foo#bar::` (bad
    base) raises `FenceError`.
    """
    base, hash_sep, suffix = name.partition("#")
    adapter: str | None = None
    if hash_sep:
        if not suffix:
            raise FenceError(
                f"fence '::{name}::' has an empty adapter suffix after '#'",
                path=path,
                line=line,
                col=1,
            )
        if _ADAPTER_SUFFIX_RE.fullmatch(suffix) is None:
            raise FenceError(
                f"fence '::{name}::' has an invalid adapter name "
                f"{suffix!r} (must match {_ADAPTER_SUFFIX_RE.pattern})",
                path=path,
                line=line,
                col=1,
            )
        adapter = suffix

    try:
        section_type = SectionType(base)
    except ValueError as exc:
        raise FenceError(
            f"unknown section fence '::{name}::'; valid types are {[t.value for t in SectionType]}",
            path=path,
            line=line,
            col=1,
        ) from exc
    return section_type, adapter
507
508
def _parse_auto_mined_marker(
    attr_blob: str, *, path: Path | None, line: int
) -> tuple[str, float, float, str, int]:
    """Parse the auto-mined metadata marker on preference sections."""

    def _err(message: str) -> FenceError:
        # All marker errors point at the section's start line, col 1.
        return FenceError(message, path=path, line=line, col=1)

    if not _MARKER_ATTR_BLOB_RE.fullmatch(attr_blob):
        raise _err("invalid dlm-auto-mined marker syntax")

    attrs: dict[str, str] = {}
    for key, value in _ATTR_KV_RE.findall(attr_blob):
        if key in attrs:
            raise _err(f"dlm-auto-mined marker repeats attribute {key!r}")
        if key not in _AUTO_MINED_KEYS:
            raise _err(
                f"dlm-auto-mined marker has unknown attribute {key!r} "
                f"(allowed: {sorted(_AUTO_MINED_KEYS)})"
            )
        attrs[key] = value

    missing = _AUTO_MINED_KEYS - attrs.keys()
    if missing:
        raise _err(
            f"dlm-auto-mined marker is missing required attribute(s) {sorted(missing)}"
        )

    try:
        score_chosen = float(attrs["judge_score_chosen"])
        score_rejected = float(attrs["judge_score_rejected"])
    except ValueError as exc:
        raise _err("dlm-auto-mined marker judge scores must be floats") from exc
    # `float()` accepts "nan"/"inf", so finiteness is checked separately.
    if not (math.isfinite(score_chosen) and math.isfinite(score_rejected)):
        raise _err("dlm-auto-mined marker judge scores must be finite")

    mined_at = attrs["mined_at"]
    try:
        # `Z` suffix normalized for pre-3.11 `fromisoformat` compatibility.
        datetime.fromisoformat(mined_at.replace("Z", "+00:00"))
    except ValueError as exc:
        raise _err("dlm-auto-mined marker mined_at must be ISO-8601") from exc

    try:
        mined_run_id = int(attrs["mined_run_id"])
    except ValueError as exc:
        raise _err("dlm-auto-mined marker mined_run_id must be an integer") from exc
    if mined_run_id < 1:
        raise _err("dlm-auto-mined marker mined_run_id must be >= 1")

    return (
        attrs["judge_name"],
        score_chosen,
        score_rejected,
        mined_at,
        mined_run_id,
    )
604
605
def _parse_auto_synth_marker(
    attr_blob: str, *, path: Path | None, line: int
) -> tuple[str, str, str, str]:
    """Parse the auto-synth metadata marker on instruction sections."""

    def _err(message: str) -> FenceError:
        # All marker errors point at the section's start line, col 1.
        return FenceError(message, path=path, line=line, col=1)

    if not _MARKER_ATTR_BLOB_RE.fullmatch(attr_blob):
        raise _err("invalid dlm-auto-synth marker syntax")

    attrs: dict[str, str] = {}
    for key, value in _ATTR_KV_RE.findall(attr_blob):
        if key in attrs:
            raise _err(f"dlm-auto-synth marker repeats attribute {key!r}")
        if key not in _AUTO_SYNTH_KEYS:
            raise _err(
                f"dlm-auto-synth marker has unknown attribute {key!r} "
                f"(allowed: {sorted(_AUTO_SYNTH_KEYS)})"
            )
        attrs[key] = value

    missing = _AUTO_SYNTH_KEYS - attrs.keys()
    if missing:
        raise _err(
            f"dlm-auto-synth marker is missing required attribute(s) {sorted(missing)}"
        )

    synth_at = attrs["synth_at"]
    try:
        # `Z` suffix normalized for pre-3.11 `fromisoformat` compatibility.
        datetime.fromisoformat(synth_at.replace("Z", "+00:00"))
    except ValueError as exc:
        raise _err("dlm-auto-synth marker synth_at must be ISO-8601") from exc

    source_section_id = attrs["source_section_id"]
    hex_digits = "0123456789abcdef"
    if len(source_section_id) != 16 or not all(
        ch in hex_digits for ch in source_section_id
    ):
        raise _err(
            "dlm-auto-synth marker source_section_id must be a 16-char lowercase hex section id"
        )

    return (
        attrs["synth_teacher"],
        attrs["synth_strategy"],
        synth_at,
        source_section_id,
    )