Python · 16172 bytes Raw Blame History
1 """Resolve `training.sources` directives into synthesized Sections.
2
Entry point: `expand_sources(parsed, base_path)`. Walks every
`SourceDirective` on the parsed frontmatter, reads matching text
files through `dlm.io.text.read_text` (so UTF-8 strict + BOM + CRLF
hygiene are identical to in-body sections), dispatches image/audio
files to the blob store, and returns Sections (PROSE for text,
IMAGE/AUDIO for media) ready to be concatenated with
`parsed.sections` before `build_dataset`.
9
10 Discovery + merge: when a directive points at a directory,
11 `discover_configs` finds every `.dlm/training.yaml` + `.dlm/ignore`
12 inside the tree; for each candidate file, `effective_config_for`
13 resolves the merged include/exclude verdict and the metadata tags to
14 flow onto the synthesized Section.
15
16 Content-hash collision defense: every synthesized Section's content
17 is prefixed with a canonical `# source: <relpath>\\n\\n` header. Two
18 different files with identical bodies therefore produce distinct
19 `section_id`s — the path becomes part of identity, matching the
20 file-granular semantics users expect for tree ingestion. Tags are
21 not part of `section_id`, so metadata churn doesn't invalidate the
22 replay corpus.
23
24 Per-source provenance is returned alongside the sections so
25 `TrainingRunSummary.source_directives` can record what was ingested,
26 what was skipped, and why.
27 """
28
29 from __future__ import annotations
30
31 import logging
32 from collections.abc import Iterator
33 from dataclasses import dataclass
34 from pathlib import Path
35 from typing import Final
36
37 from dlm.directives.discovery import DiscoveredConfig, discover_configs
38 from dlm.directives.errors import DirectivePathError
39 from dlm.directives.merge import effective_config_for
40 from dlm.directives.safety import confine_path, is_probably_binary
41 from dlm.doc.parser import ParsedDlm
42 from dlm.doc.schema import SourceDirective
43 from dlm.doc.sections import Section, SectionType
44 from dlm.io.text import DlmEncodingError, read_text
45 from dlm.store.blobs import BlobStore
46
# Module-level logger; all skip/truncate decisions below are logged here
# so `dlm -v` surfaces why a directive produced fewer sections than expected.
_LOG = logging.getLogger(__name__)

# File extensions dispatched to the blob store as IMAGE sections.
# Kept lowercase; comparison lowers the observed suffix.
_IMAGE_EXTENSIONS: Final[frozenset[str]] = frozenset(
    {".png", ".jpg", ".jpeg", ".webp", ".gif", ".bmp", ".tiff"}
)

# File extensions dispatched to the blob store as AUDIO sections.
# `.mp3` / `.m4a` deferred — soundfile can't decode them without
# libsndfile MP3 support, which isn't in our runtime dep tree.
# Users with mp3 corpora re-encode to wav/flac first.
_AUDIO_EXTENSIONS: Final[frozenset[str]] = frozenset({".wav", ".flac", ".ogg"})

# Sidecar transcript filename suffix: `clips/hello.wav` pairs with
# `clips/hello.txt`. Audio without text supervision has no training
# signal, so a missing sidecar makes `_expand_one` skip the file
# (counted in `skipped_audio_no_transcript`) rather than emit a
# section that'd fail at row-emission time.
_TRANSCRIPT_SIDECAR_SUFFIX: Final[str] = ".txt"
67
68
@dataclass(frozen=True)
class SourceProvenance:
    """Per-directive bookkeeping for the training summary.

    `path` is the directive's raw path (user-facing, before ~ or
    symlink resolution) so it matches the frontmatter on disk.
    `file_count` / `total_bytes` reflect text sections that made it
    into the Section list. `image_count` / `image_bytes` /
    `audio_count` / `audio_bytes` reflect media sections ingested
    through the blob store. `skipped_binary` / `skipped_encoding` /
    `skipped_over_size` / `skipped_by_descent` break down the drops —
    if a directive yields zero sections, the skip counts let the user
    see why. `skipped_image_no_store` / `skipped_audio_no_store`
    count media hits dropped because no BlobStore was passed to
    `expand_sources` (tests, `dlm show`). `skipped_audio_no_transcript`
    counts audio files missing their `<stem>.txt` sidecar.
    """

    # Raw directive path, exactly as written in the frontmatter.
    path: str
    # Text sections emitted, and their raw on-disk byte total.
    file_count: int
    total_bytes: int
    # Media sections ingested through the blob store, per modality.
    image_count: int = 0
    image_bytes: int = 0
    audio_count: int = 0
    audio_bytes: int = 0
    # Drop breakdown — one counter per skip reason (see docstring).
    skipped_binary: int = 0
    skipped_encoding: int = 0
    skipped_over_size: int = 0
    skipped_by_descent: int = 0
    skipped_image_no_store: int = 0
    skipped_audio_no_store: int = 0
    skipped_audio_no_transcript: int = 0
101
102
@dataclass(frozen=True)
class ExpandResult:
    """Return value of `expand_sources` — sections + provenance.

    `sections` are ready to concatenate with `parsed.sections`;
    `provenance` carries one record per directive, in directive
    order; `discovered` aggregates every `.dlm/` config found
    across all directory directives.
    """

    # Synthesized sections across all directives.
    sections: tuple[Section, ...]
    # One SourceProvenance per directive, in frontmatter order.
    provenance: tuple[SourceProvenance, ...]
    # Every DiscoveredConfig found while walking directory directives.
    discovered: tuple[DiscoveredConfig, ...] = ()
110
111
def expand_sources(
    parsed: ParsedDlm,
    *,
    base_path: Path,
    blob_store: BlobStore | None = None,
) -> ExpandResult:
    """Synthesize Sections for every `training.sources` directive.

    Args:
        parsed: Parsed `.dlm` document whose frontmatter may carry
            source directives. No directives means an `ExpandResult`
            with empty tuples — callers can concatenate without a
            None check.
        base_path: The `.dlm` file's parent directory — anchor for
            relative directive paths and the strict-policy
            confinement root. Taken explicitly (not via
            `parsed.source_path`) because unit tests commonly build
            `ParsedDlm` with `source_path=None`. When the `.dlm`
            lives under a `.dlm/` metadata directory (the scaffolded
            `<corpus>/.dlm/corpus.dlm` shape), the grandparent — the
            corpus directory — becomes the effective anchor.
        blob_store: Destination for ingested media bytes. With
            `None`, media-extension files are counted as skipped and
            no IMAGE/AUDIO section is emitted — the right shape for
            read-only paths like `dlm show`.

    Raises:
        DirectivePathError: when a directive's resolved path does
            not exist.
    """
    training_cfg = parsed.frontmatter.training
    directive_list = training_cfg.sources or ()
    if not directive_list:
        return ExpandResult(sections=(), provenance=(), discovered=())

    # Scaffolded `.dlm/` metadata dirs anchor on their parent (the corpus).
    anchor = base_path.parent if base_path.name == ".dlm" else base_path
    strict_mode = training_cfg.sources_policy == "strict"

    collected_sections: list[Section] = []
    prov_records: list[SourceProvenance] = []
    configs_seen: list[DiscoveredConfig] = []

    for directive in directive_list:
        candidate = Path(directive.path)
        # Absolute and `~`-prefixed paths stand on their own; everything
        # else anchors on the effective base.
        if not (candidate.is_absolute() or directive.path.startswith("~")):
            candidate = anchor / candidate

        resolved_root = confine_path(candidate, anchor, strict=strict_mode)
        if not resolved_root.exists():
            raise DirectivePathError(resolved_root, "path does not exist")

        # Only a directory directive has a tree to discover configs in.
        found = discover_configs(resolved_root) if resolved_root.is_dir() else ()
        configs_seen.extend(found)

        new_sections, record = _expand_one(
            directive=directive,
            resolved_root=resolved_root,
            discovered=found,
            blob_store=blob_store,
        )
        collected_sections.extend(new_sections)
        prov_records.append(record)

    return ExpandResult(
        sections=tuple(collected_sections),
        provenance=tuple(prov_records),
        discovered=tuple(configs_seen),
    )
182
183
def _expand_one(
    *,
    directive: SourceDirective,
    resolved_root: Path,
    discovered: tuple[DiscoveredConfig, ...],
    blob_store: BlobStore | None,
) -> tuple[list[Section], SourceProvenance]:
    """Expand a single directive into sections + per-directive provenance.

    Walks `_iter_candidates(resolved_root)` in deterministic order and
    dispatches each file by suffix: image extensions → blob store +
    IMAGE section, audio extensions → blob store + AUDIO section (with
    a required `<stem>.txt` sidecar transcript), everything else →
    text ingestion through `read_text` with a `# source: <relpath>`
    header. Every skip increments exactly one counter so the returned
    `SourceProvenance` explains a zero-section outcome.

    Args:
        directive: The parent `SourceDirective` — supplies `max_files`,
            `max_bytes_per_file`, and the raw `path` recorded in
            provenance.
        resolved_root: Confinement-checked root (file or directory).
        discovered: `.dlm/` configs found under the root, merged per
            file via `effective_config_for`.
        blob_store: Media ingestion target; `None` skips media files
            with the `*_no_store` counters.

    Returns:
        `(sections, provenance)` — sections in traversal order and the
        fully populated per-directive `SourceProvenance`.
    """
    sections: list[Section] = []
    total_bytes = 0
    image_count = 0
    image_bytes = 0
    audio_count = 0
    audio_bytes = 0
    skipped_binary = 0
    skipped_encoding = 0
    skipped_over_size = 0
    skipped_by_descent = 0
    skipped_image_no_store = 0
    skipped_audio_no_store = 0
    skipped_audio_no_transcript = 0

    # Anchor for relpath-in-header. For a single-file directive the
    # header uses the file name; for a directory the relpath is the
    # tree-relative form.
    header_root = resolved_root if resolved_root.is_dir() else resolved_root.parent

    for file_path in _iter_candidates(resolved_root):
        # max_files caps total sections emitted so far (all modalities);
        # checked before each file so truncation is deterministic.
        if directive.max_files is not None and _section_cap_reached(sections, directive.max_files):
            _LOG.info(
                "directive: hit max_files=%d for %s; truncating deterministically",
                directive.max_files,
                directive.path,
            )
            break

        # Descent verdict: merge parent directive + discovered `.dlm/`
        # configs. `included=False` means a deeper config or default-
        # exclude ruled this file out.
        effective = effective_config_for(
            file_path,
            source_root=resolved_root,
            discovered=discovered,
            parent_directive=directive,
            is_dir=False,
        )
        if not effective.included:
            skipped_by_descent += 1
            continue

        # stat failures (races, permissions) are logged-and-skipped
        # rather than counted — there is no dedicated counter for them.
        try:
            size = file_path.stat().st_size
        except OSError as exc:
            _LOG.warning("directive: stat failed for %s: %s; skipping", file_path, exc)
            continue

        if directive.max_bytes_per_file is not None and size > directive.max_bytes_per_file:
            _LOG.info(
                "directive: %s (%d bytes) exceeds max_bytes_per_file=%d; skipping",
                file_path,
                size,
                directive.max_bytes_per_file,
            )
            skipped_over_size += 1
            continue

        # Image-extension dispatch: skips the text-read + binary-skip
        # path entirely. The blob store owns ingestion; the synthesized
        # section carries only the path + blob sha.
        if file_path.suffix.lower() in _IMAGE_EXTENSIONS:
            if blob_store is None:
                _LOG.info(
                    "directive: %s is an image but no blob_store supplied; skipping",
                    file_path,
                )
                skipped_image_no_store += 1
                continue
            handle = blob_store.put(file_path)
            relpath = file_path.relative_to(header_root).as_posix()
            alt = file_path.stem
            sections.append(
                Section(
                    type=SectionType.IMAGE,
                    content="",
                    media_path=relpath,
                    media_alt=alt,
                    media_blob_sha=handle.sha,
                    tags=effective.tags,
                ),
            )
            image_count += 1
            image_bytes += handle.size
            continue

        # Audio-extension dispatch: same shape as image but requires
        # a `<stem>.txt` sidecar for the transcript —
        # audio without text has no training signal. Missing sidecar
        # is a skip (with an explicit counter), not a hard raise,
        # because a mixed corpus may have both "for-training" audio
        # (has .txt) and "reference" audio (no .txt) side by side.
        if file_path.suffix.lower() in _AUDIO_EXTENSIONS:
            if blob_store is None:
                _LOG.info(
                    "directive: %s is audio but no blob_store supplied; skipping",
                    file_path,
                )
                skipped_audio_no_store += 1
                continue
            transcript = _read_audio_transcript(file_path)
            if transcript is None:
                _LOG.info(
                    "directive: %s has no %s sidecar; skipping "
                    "(audio without transcript has no training signal)",
                    file_path,
                    _TRANSCRIPT_SIDECAR_SUFFIX,
                )
                skipped_audio_no_transcript += 1
                continue
            handle = blob_store.put(file_path)
            relpath = file_path.relative_to(header_root).as_posix()
            sections.append(
                Section(
                    type=SectionType.AUDIO,
                    content="",
                    media_path=relpath,
                    media_blob_sha=handle.sha,
                    media_transcript=transcript,
                    tags=effective.tags,
                ),
            )
            audio_count += 1
            audio_bytes += handle.size
            continue

        # Text path: raw bytes first (for the binary sniff + byte
        # accounting), then the strict-UTF-8 decode.
        try:
            raw = file_path.read_bytes()
        except OSError as exc:
            _LOG.warning("directive: read failed for %s: %s; skipping", file_path, exc)
            continue

        if is_probably_binary(raw):
            _LOG.info("directive: %s looks binary (NUL in first KiB); skipping", file_path)
            skipped_binary += 1
            continue

        try:
            text = read_text(file_path)
        except DlmEncodingError:
            _LOG.warning("directive: %s is not UTF-8; skipping", file_path)
            skipped_encoding += 1
            continue

        # The relpath header makes the path part of section identity,
        # so identical bodies at different paths stay distinct.
        relpath = file_path.relative_to(header_root).as_posix()
        content = f"# source: {relpath}\n\n{text}"
        sections.append(Section(type=SectionType.PROSE, content=content, tags=effective.tags))
        total_bytes += len(raw)

    # Count prose-bearing sections separately from media so the
    # per-modality counters don't collide. PROSE + INSTRUCTION +
    # PREFERENCE → file_count; IMAGE → image_count; AUDIO →
    # audio_count.
    text_sections = sum(1 for s in sections if s.type not in (SectionType.IMAGE, SectionType.AUDIO))
    return sections, SourceProvenance(
        path=directive.path,
        file_count=text_sections,
        total_bytes=total_bytes,
        image_count=image_count,
        image_bytes=image_bytes,
        audio_count=audio_count,
        audio_bytes=audio_bytes,
        skipped_binary=skipped_binary,
        skipped_encoding=skipped_encoding,
        skipped_over_size=skipped_over_size,
        skipped_by_descent=skipped_by_descent,
        skipped_image_no_store=skipped_image_no_store,
        skipped_audio_no_store=skipped_audio_no_store,
        skipped_audio_no_transcript=skipped_audio_no_transcript,
    )
362
363
def _read_audio_transcript(audio_path: Path) -> str | None:
    """Return the stripped sibling `<stem>.txt` transcript, or None.

    The sidecar lives next to the audio file with the same stem —
    `clips/hello.wav` pairs with `clips/hello.txt` — and is decoded
    through `dlm.io.text.read_text` so the usual encoding contract
    (strict UTF-8, BOM/CRLF hygiene) applies, then stripped of
    leading/trailing whitespace.

    A missing or unreadable sidecar yields None, which the caller
    counts as a skip. An existing-but-empty sidecar yields "" —
    deliberately distinct from None, because an empty transcript is
    an author bug that later stages refuse loudly (it carries no
    training signal either).
    """
    sidecar = audio_path.with_suffix(_TRANSCRIPT_SIDECAR_SUFFIX)
    if sidecar.exists():
        try:
            return read_text(sidecar).strip()
        except (OSError, DlmEncodingError) as exc:
            _LOG.warning(
                "directive: transcript sidecar %s is unreadable: %s; skipping audio",
                sidecar,
                exc,
            )
    return None
393
394
395 def _section_cap_reached(sections: list[Section], max_files: int) -> bool:
396 """True when the cap has been hit for the user's intent.
397
398 `max_files` caps total ingested files (text + image) — the user
399 wrote one number; both modalities count against it.
400 """
401 return len(sections) >= max_files
402
403
404 def _iter_candidates(root: Path) -> Iterator[Path]:
405 """Yield every file under `root` in deterministic order.
406
407 Filtering moves into `effective_config_for` now that descent
408 protocol layers rules beyond the parent directive's include /
409 exclude. Single-file roots are yielded as the single file.
410 """
411 if root.is_file():
412 yield root
413 return
414 if not root.is_dir():
415 return
416 for candidate in sorted(root.rglob("*")):
417 if candidate.is_file():
418 yield candidate