Python · 19495 bytes Raw Blame History
1 """Auto-generate a ``sway.yaml`` from a ``.dlm`` document.
2
3 Walks the parsed sections and emits one entry per primitive sway ships:
4 the full 11-primitive battery wired up against the document's own
5 content. The result is a YAML artifact the user commits alongside their
6 ``.dlm`` and diffs in PRs.
7
8 The generated spec includes a ``dlm_source`` field that the suite loader
9 uses to pick up :class:`~dlm_sway.core.sections.Section` data at run
10 time — probes that need sections (B1, B3, C3) then work against the
11 typed structure instead of re-parsing text.
12 """
13
14 from __future__ import annotations
15
16 from pathlib import Path
17 from typing import Any
18
19 import yaml
20
21 from dlm_sway.core.errors import SwayError
22 from dlm_sway.core.sections import Section
23 from dlm_sway.integrations.dlm.resolver import DlmHandle, resolve_dlm
24
#: Stylistic-elicitation prompts used by the generated ``style_fingerprint``
#: probe (B8). Picked to be open-ended and content-neutral — the model's
#: voice under the adapter is the signal we want, *not* its ability to
#: continue a sentence the doc already wrote. Each prompt deliberately
#: invites prose-shaped output of moderate length (one paragraph).
#: ``_build_suite`` copies this tuple into a fresh list per generated
#: spec, so the module-level constant itself is never mutated.
_STYLE_ELICITATION_PROMPTS: tuple[str, ...] = (
    "Write a short paragraph explaining your approach to a difficult problem.",
    "Describe what you find most interesting about a topic you know well.",
    "Summarize an important idea for a curious novice.",
    "Reflect on a small lesson you learned recently.",
    "Explain a concept using a concrete example.",
    "Tell a brief story that illustrates a single point.",
)
38
39
40 #: Per-probe intent one-liners (D5). Keyed by probe ``kind``. Used to
41 #: prepend a ``#``-comment above each suite entry in the generated
42 #: YAML so a first-time reader understands what each probe is for
43 #: without cross-referencing the docs.
44 _PROBE_INTENT: dict[str, str] = {
45 "null_adapter": (
46 "Calibration baseline — runs first so downstream probes have "
47 "per-kind null stats for z-scores."
48 ),
49 "delta_kl": (
50 "A1: mean JS divergence of next-token distributions between "
51 "base and ft. Did the adapter move the model on doc content?"
52 ),
53 "adapter_revert": (
54 "A2: does the ft model drift back to base under adversarial "
55 "paraphrase? Needs the [semsim] extra."
56 ),
57 "prompt_collapse": (
58 "A3: fit exponential decay of divergence over context length. "
59 "Catches adapters whose influence evaporates with context."
60 ),
61 "section_internalization": (
62 "B1 (flagship): per-section attribution with leak-check. "
63 "Which parts of the doc actually moved the model?"
64 ),
65 "paraphrase_invariance": (
66 "B2: memorization vs generalization — does the adapter lift "
67 "the verbatim prompt more than paraphrased variants?"
68 ),
69 "preference_flip": (
70 "B3: on DPO/ORPO triples, did ft flip the chosen/rejected ranking relative to base?"
71 ),
72 "style_fingerprint": (
73 "C1: stylistic shift toward the doc's fingerprint. Uses 9-dim "
74 "extended vector when [style] installed; 6-dim otherwise."
75 ),
76 "calibration_drift": (
77 "C2: general-knowledge regression check. Did the fine-tune "
78 "forget the world while learning the doc?"
79 ),
80 "external_perplexity": (
81 "F3: diffuse-forgetting check — rolling-logprob delta on "
82 "held-out public-domain English. Complements calibration_drift "
83 "(the point-factual counterpart)."
84 ),
85 "leakage": (
86 "C3: verbatim-recital + perturbation-fragility check. High "
87 "recall + low fragility → memorization, not generalization."
88 ),
89 "adapter_ablation": (
90 "N2 (signature): λ-scaled divergence curve. A healthy adapter "
91 "shows a smooth, non-saturated response; a degenerate one is "
92 "a step function."
93 ),
94 }
95
96
def write_sway_yaml(dlm_path: Path, out: Path) -> None:
    """Generate the annotated ``sway.yaml`` for ``dlm_path`` and write it to ``out``.

    Resolves the document, builds the spec dict (with a portable
    ``dlm_source``), collects the skip-reason records, and writes the
    fully annotated YAML in one UTF-8 ``write_text`` call.

    :raises SwayError: when the resolved handle has no trained adapter
        on disk — every probe needs a ft model to compare against.
    """
    resolved = resolve_dlm(dlm_path)
    if resolved.adapter_path is None:
        raise SwayError(
            f"{dlm_path}: no trained adapter found at ~/.dlm/store/{resolved.dlm_id}/adapter; "
            "train the document with `dlm train` before generating a sway suite."
        )
    omitted = collect_skipped_probe_reasons(resolved)
    spec = build_spec_dict(resolved, dlm_source=_portable_dlm_source(dlm_path))
    rendered = _render_annotated_yaml(spec, resolved, dlm_path, skipped=omitted)
    out.write_text(rendered, encoding="utf-8")
111
112
def collect_skipped_probe_reasons(handle: DlmHandle) -> list[tuple[str, str]]:
    """Return ``(probe_kind, reason)`` tuples for every probe
    ``_build_suite`` intentionally omitted for this ``.dlm``.

    F07 (Audit 03) — the emitted YAML previously had no record of
    which probes were skipped and why. Users had to diff the autogen
    output against the intent docstring to know. This surface is the
    input to the YAML-comment block the renderer prepends.

    Mirrors the conditional logic inside :func:`_build_suite` — any
    change to that function's gating must update this function too.

    :param handle: resolved ``.dlm`` handle; only ``handle.sections``
        is consulted.
    :returns: skip records in the order the corresponding probes appear
        in :func:`_build_suite`.
    """
    sections = handle.sections
    instruction_probes = [
        (p.prompt, p.gold) for s in sections if s.kind == "instruction" for p in s.probes
    ]
    # BUGFIX: restore the trailing period so these strings are
    # byte-identical to the prose prompts _build_suite builds. The
    # pre-fix version dropped the period, so a prose leading sentence
    # equal to an instruction prompt deduped *here* but not in
    # _build_suite, making ``cluster_pool_size`` disagree with the real
    # cluster pool for documents sitting near the 20-prompt floor.
    prose_prompts = [
        f"{first_sentence}."
        for s in sections
        if s.kind == "prose" and s.content.strip()
        if (first_sentence := s.content.split(".")[0].strip())
    ]
    has_instruction_probes = bool(instruction_probes)
    has_prose = any(s.kind == "prose" for s in sections)
    has_preferences = any(s.kind == "preference" and s.preferences for s in sections)

    # Same priority + cap as _build_suite: instruction prompts first,
    # prose leads as fallback, truncated to 16 for delta_kl.
    kl_prompts = [q for q, _ in instruction_probes][:16] or prose_prompts[:16]
    all_instruction_prompts = [q for q, _ in instruction_probes]
    cluster_pool_size = len({*all_instruction_prompts, *prose_prompts})

    skipped: list[tuple[str, str]] = []
    if not kl_prompts:
        skipped.append(("delta_kl", "no instruction probes or prose sections"))
    if not has_instruction_probes:
        skipped.append(("adapter_revert", "no !probe markers in INSTRUCTION sections"))
        skipped.append(("paraphrase_invariance", "no !probe markers in INSTRUCTION sections"))
    if not kl_prompts:
        skipped.append(("prompt_collapse", "no prompts available to score"))
    if len(sections) < 2:
        skipped.append(("section_internalization", "document has fewer than 2 sections"))
    if not has_preferences:
        skipped.append(("preference_flip", "no PREFERENCE sections with populated triples"))
    if not has_prose:
        skipped.append(
            ("external_perplexity", "no PROSE sections to measure external-corpus drift against")
        )
        skipped.append(("leakage", "no PROSE sections to extract prefix/continuation windows from"))
    if cluster_pool_size < 20:
        skipped.append(
            (
                "cluster_kl",
                f"only {cluster_pool_size} distinct prompts in pool (need ≥ 20 for stable clustering)",
            )
        )
    if not kl_prompts:
        skipped.append(("adapter_ablation", "no prompts available to score"))
    return skipped
169
170
171 def _portable_dlm_source(dlm_path: Path) -> str:
172 """Return a ``dlm_source`` string that survives cross-machine checkout.
173
174 F09 (Audit 03) — the pre-fix code unconditionally wrote an
175 absolute path (``/Users/mfwolffe/.../fortran.dlm``) which breaks
176 when the autogen'd ``sway.yaml`` is committed to a repo and
177 re-run from a different working tree (CI agents, another dev's
178 checkout). The cwd-relative form is round-trippable across
179 machines; only fall back to absolute when the ``.dlm`` lives
180 outside the cwd (e.g. a global user dir) where relativization
181 doesn't resolve on a fresh checkout.
182 """
183 abs_path = dlm_path.resolve()
184 cwd = Path.cwd().resolve()
185 try:
186 # ``is_relative_to`` lands in 3.9+; this path is guaranteed
187 # to exist because sway requires ``>=3.11``.
188 if abs_path.is_relative_to(cwd):
189 return str(abs_path.relative_to(cwd))
190 except ValueError:
191 pass
192 return str(abs_path)
193
194
def _render_annotated_yaml(
    spec: dict[str, Any],
    handle: DlmHandle,
    dlm_path: Path,
    *,
    skipped: list[tuple[str, str]] | None = None,
) -> str:
    """Render the spec as YAML with a provenance header + per-probe intent lines (D5).

    Uses pyyaml (already a hard dep) and post-processes the output to
    insert ``#``-comments above each suite entry. Avoids the
    ``ruamel.yaml`` dep the sprint contemplated — the annotation here
    is structural (position-based), not round-trippable, so the lighter
    approach is sufficient.

    F07 (Audit 03) — when ``skipped`` is non-empty, the header gains a
    ``# skipped: <probe> (<reason>)`` block so users see which probes
    the autogen intentionally omitted, without diffing the autogen
    module's docstring.
    """
    import datetime as _dt

    from dlm_sway import __version__

    annotated = _inject_probe_intent_comments(yaml.safe_dump(spec, sort_keys=False))
    timestamp = _dt.datetime.now(_dt.UTC).isoformat(timespec="seconds")

    header: list[str] = [
        "# sway.yaml — auto-generated by `sway autogen`",
        f"# source: {dlm_path.resolve()}",
        f"# dlm_id: {handle.dlm_id}",
        f"# base: {handle.base_model}",
        f"# adapter: {handle.adapter_path}",
        f"# generated: {timestamp}",
        f"# sway: {__version__}",
        "#",
        "# Edit freely — this file is your checked-in contract. Re-running",
        "# `sway autogen` overwrites it; commit the generated file so your",
        "# test suite is diffable in PRs.",
    ]
    if skipped:
        header.append("#")
        header.append(f"# {len(skipped)} probe(s) intentionally omitted for this document:")
        for kind, reason in skipped:
            header.append(f"# skipped: {kind} ({reason})")
        header.append("# (sway gate will still pass — missing probes don't fail the gate.)")
    # Trailing empty element so the join ends with a newline before the body.
    header.append("")
    return "\n".join(header) + annotated
246
247
def _inject_probe_intent_comments(yaml_body: str) -> str:
    """Walk the rendered YAML; prepend a ``#`` intent line above each suite entry.

    :param yaml_body: output of ``yaml.safe_dump`` for the spec dict.
    :returns: the same text with one ``# <intent>`` comment (at list-item
        indent) inserted above every entry under the top-level ``suite:``
        key whose ``kind:`` value maps to :data:`_PROBE_INTENT`; entries
        with no ``kind:`` line or an unmapped kind pass through unchanged.
        The trailing newline is preserved iff the input had one.
    """
    import re as _re

    # Each suite entry begins with ``- name: <value>`` at the same
    # indent. We scan the lines, track the indent of the first list
    # item we see under ``suite:``, and insert intent comments there.

    lines = yaml_body.splitlines()
    out: list[str] = []
    in_suite = False
    # Each ``- name:`` marks the start of a suite entry. We buffer the
    # lines of that entry and peek at the ``kind:`` value to pick the
    # right intent comment to insert before the ``- name:`` line. A
    # one-line "index where the intent goes" pointer is simpler than
    # doing a two-pass rewrite.
    entry_start: int | None = None
    entry_indent = 0

    def _flush_entry_header(entry_start_idx: int | None) -> None:
        # Insert the intent comment for the entry that started at
        # ``entry_start_idx``. No-op when there is no open entry, the
        # entry has no recognizable ``kind:`` line, or the kind has no
        # mapping in _PROBE_INTENT.
        if entry_start_idx is None:
            return
        # ``out[entry_start_idx:]`` is exactly the lines appended since
        # this entry opened — the flush always happens before a new
        # entry's start index is recorded.
        entry_lines = out[entry_start_idx:]
        kind: str | None = None
        for elt in entry_lines:
            match = _re.search(r"\bkind:\s*([A-Za-z_][A-Za-z0-9_]*)", elt)
            if match is not None:
                kind = match.group(1)
                break
        if kind is None:
            return
        intent = _PROBE_INTENT.get(kind)
        if intent is None:
            return
        # Mutating ``out`` here is safe: the caller re-reads len(out)
        # *after* flushing, so this insert cannot stale a pending index.
        out.insert(entry_start_idx, " " * entry_indent + f"# {intent}")

    for line in lines:
        stripped = line.lstrip()
        # Top-level keys toggle the suite scope.
        if line and not line[0].isspace() and not line.startswith("- "):
            # Close the previous entry (if any) before switching scope.
            _flush_entry_header(entry_start)
            entry_start = None
            in_suite = stripped == "suite:"
            out.append(line)
            continue

        if in_suite and stripped.startswith("- "):
            # New entry — flush any pending comment for the previous.
            _flush_entry_header(entry_start)
            entry_start = len(out)
            entry_indent = len(line) - len(stripped)

        out.append(line)

    # Flush the final entry.
    _flush_entry_header(entry_start)
    # ``splitlines`` dropped the trailing newline; restore it to keep
    # the tail byte-stable with the input.
    return "\n".join(out) + ("\n" if yaml_body.endswith("\n") else "")
306
307
def build_spec_dict(handle: DlmHandle, *, dlm_source: str | None = None) -> dict[str, Any]:
    """Build a sway.yaml-shaped dict from a :class:`DlmHandle`.

    :param handle: resolved document handle (base model, adapter, sections).
    :param dlm_source: optional source-path string recorded under the
        ``dlm_source`` key; omitted from the spec entirely when ``None``.
    :returns: plain dict ready for ``yaml.safe_dump``.
    """
    adapter = str(handle.adapter_path) if handle.adapter_path else None
    spec: dict[str, Any] = {
        "version": 1,
        "models": {
            "base": {"kind": "hf", "base": handle.base_model},
            "ft": {"kind": "hf", "base": handle.base_model, "adapter": adapter},
        },
        "defaults": {"seed": 0, "differential": True},
        "suite": _build_suite(handle.sections),
    }
    if dlm_source is not None:
        spec["dlm_source"] = dlm_source
    return spec
325
326
def _build_suite(sections: tuple[Section, ...]) -> list[dict[str, Any]]:
    """Assemble the full probe battery for the given sections.

    The ordering matters: ``null_adapter`` first so every downstream
    probe's z-score threshold has stats to consult.

    :param sections: parsed ``.dlm`` sections (typed ``Section`` records).
    :returns: suite-entry dicts in emission order; every value is plain
        YAML-serializable (str/int/float/bool/list/dict).

    NOTE(review): any change to the gating conditions below must be
    mirrored in :func:`collect_skipped_probe_reasons`.
    """
    instruction_probes: list[tuple[str, str]] = [
        (p.prompt, p.gold) for s in sections if s.kind == "instruction" for p in s.probes
    ]
    prose_prompts: list[str] = []
    for s in sections:
        if s.kind == "prose" and s.content.strip():
            # Use the section's leading sentence as a natural completion prompt.
            first_sentence = s.content.split(".")[0].strip()
            if first_sentence:
                prose_prompts.append(first_sentence + ".")

    # Instruction prompts take priority; prose leads are the fallback
    # pool. Capped at 16 to bound delta_kl's run time.
    kl_prompts = [q for q, _ in instruction_probes][:16] or prose_prompts[:16]
    # B8: style_fingerprint needs *stylistic elicitation* — open-ended
    # prompts that ask the model to write in its own voice — not the
    # leading sentence of a doc paragraph (which elicits continuation
    # of the doc itself, conflating style with content). The fixed set
    # below is intentionally generic so the model's stylistic shift
    # under the adapter is the only signal in play.
    style_prompts = list(_STYLE_ELICITATION_PROMPTS)

    suite: list[dict[str, Any]] = []

    # Baseline calibration — always first.
    suite.append({"name": "null_baseline", "kind": "null_adapter", "runs": 3})

    # Adherence.
    if kl_prompts:
        suite.append(
            {
                "name": "delta_kl_doc",
                "kind": "delta_kl",
                "prompts": kl_prompts,
                "assert_mean_gte": 0.02,
            }
        )
    if instruction_probes:
        suite.append(
            {
                "name": "revert_check",
                "kind": "adapter_revert",
                "cases": [
                    {"prompt": q, "gold": a, "paraphrases": _auto_paraphrases(q)}
                    for q, a in instruction_probes[:8]
                ],
                "assert_revert_rate_lt": 0.3,
            }
        )
    if kl_prompts:
        suite.append(
            {
                "name": "prompt_collapse",
                "kind": "prompt_collapse",
                "prompts": kl_prompts[:4],
                "context_lengths": [0, 256, 512, 1024],
                "assert_half_life_tokens": 300,
            }
        )

    # Attribution.
    if len(sections) >= 2:
        suite.append(
            {
                "name": "section_attribution",
                "kind": "section_internalization",
                "per_section_threshold": 0.05,
            }
        )
    if instruction_probes:
        suite.append(
            {
                "name": "paraphrase_invariance",
                "kind": "paraphrase_invariance",
                "cases": [
                    {"prompt": q, "gold": a, "paraphrases": _auto_paraphrases(q)}
                    for q, a in instruction_probes[:6]
                ],
            }
        )
    has_preferences = any(s.kind == "preference" and s.preferences for s in sections)
    if has_preferences:
        suite.append(
            {
                "name": "preference_flip",
                "kind": "preference_flip",
                "assert_flip_rate_gte": 0.7,
            }
        )

    # Calibration.
    # style_prompts is a copy of a non-empty module constant, so this
    # guard currently always holds; kept for symmetry with the other
    # prompt-gated probes.
    if style_prompts:
        suite.append(
            {
                "name": "style_shift",
                "kind": "style_fingerprint",
                "prompts": style_prompts,
            }
        )
    suite.append({"name": "general_knowledge", "kind": "calibration_drift"})
    # Emit the external_perplexity probe when the doc has any PROSE
    # content at all — the probe measures *external* prose degradation,
    # so the docs that benefit most are the ones where the adapter was
    # trained on text that might over-fit the base model's English
    # fluency.
    if any(s.kind == "prose" for s in sections):
        suite.append(
            {
                "name": "external_ppl",
                "kind": "external_perplexity",
                "corpus": "public_domain_en",
                "max_chunks": 8,  # half of default for faster autogen'd runs
            }
        )
        suite.append(
            {
                "name": "verbatim_leak",
                "kind": "leakage",
                "prefix_chars": 128,
                "continuation_chars": 256,
            }
        )

    # F07 — ``cluster_kl`` when the prompt pool clears the probe's
    # ``min_prompts`` floor. Pulls from the *full* instruction pool +
    # prose leading sentences (``kl_prompts`` is capped at 16 for
    # delta_kl; we want wider coverage for clustering). S16's scope
    # set a 20-prompt floor; mirror it so emission is stable across
    # documents of varying length.
    all_instruction_prompts = [q for q, _ in instruction_probes]
    # Order-preserving dedupe (instruction prompts first, then prose).
    cluster_prompts: list[str] = []
    seen: set[str] = set()
    for p in all_instruction_prompts + prose_prompts:
        if p not in seen:
            seen.add(p)
            cluster_prompts.append(p)
    if len(cluster_prompts) >= 20:
        suite.append(
            {
                "name": "cluster_kl_topics",
                "kind": "cluster_kl",
                "prompts": cluster_prompts[:64],
                "num_clusters": 5,
                "min_prompts": 20,
            }
        )

    # Signature ablation — goes last because it's the most expensive.
    if kl_prompts:
        suite.append(
            {
                "name": "adapter_ablation",
                "kind": "adapter_ablation",
                "prompts": kl_prompts[:6],
                "lambdas": [0.0, 0.25, 0.5, 0.75, 1.0, 1.25],
            }
        )

    return suite
490
491
492 def _auto_paraphrases(prompt: str) -> list[str]:
493 """Small, deterministic paraphrase set used when authors don't supply one.
494
495 Purely heuristic — good enough to detect "did the model memorize the
496 exact wording". Real paraphrase generation lives behind the
497 ``semsim`` extra.
498 """
499 variants: list[str] = []
500 stripped = prompt.rstrip("?. ")
501 variants.append(f"Could you explain: {stripped}?")
502 variants.append(f"I'd like to know — {stripped}.")
503 variants.append(f"Please describe: {stripped}.")
504 return variants[:3]