| 1 | """Auto-generate a ``sway.yaml`` from a ``.dlm`` document. |
| 2 | |
Walks the parsed sections and emits one entry per primitive that sway
ships: the full probe battery wired up against the document's own
| 5 | content. The result is a YAML artifact the user commits alongside their |
| 6 | ``.dlm`` and diffs in PRs. |
| 7 | |
| 8 | The generated spec includes a ``dlm_source`` field that the suite loader |
| 9 | uses to pick up :class:`~dlm_sway.core.sections.Section` data at run |
| 10 | time — probes that need sections (B1, B3, C3) then work against the |
| 11 | typed structure instead of re-parsing text. |
| 12 | """ |
| 13 | |
| 14 | from __future__ import annotations |
| 15 | |
| 16 | from pathlib import Path |
| 17 | from typing import Any |
| 18 | |
| 19 | import yaml |
| 20 | |
| 21 | from dlm_sway.core.errors import SwayError |
| 22 | from dlm_sway.core.sections import Section |
| 23 | from dlm_sway.integrations.dlm.resolver import DlmHandle, resolve_dlm |
| 24 | |
#: Stylistic-elicitation prompts used by the generated ``style_fingerprint``
#: probe (B8). Picked to be open-ended and content-neutral — the model's
#: voice under the adapter is the signal we want, *not* its ability to
#: continue a sentence the doc already wrote. Each prompt deliberately
#: invites prose-shaped output of moderate length (one paragraph).
#: ``_build_suite`` copies this tuple verbatim into the emitted spec's
#: ``style_shift`` entry, so editing it changes every subsequently
#: generated ``sway.yaml``.
_STYLE_ELICITATION_PROMPTS: tuple[str, ...] = (
    "Write a short paragraph explaining your approach to a difficult problem.",
    "Describe what you find most interesting about a topic you know well.",
    "Summarize an important idea for a curious novice.",
    "Reflect on a small lesson you learned recently.",
    "Explain a concept using a concrete example.",
    "Tell a brief story that illustrates a single point.",
)
| 38 | |
| 39 | |
| 40 | #: Per-probe intent one-liners (D5). Keyed by probe ``kind``. Used to |
| 41 | #: prepend a ``#``-comment above each suite entry in the generated |
| 42 | #: YAML so a first-time reader understands what each probe is for |
| 43 | #: without cross-referencing the docs. |
| 44 | _PROBE_INTENT: dict[str, str] = { |
| 45 | "null_adapter": ( |
| 46 | "Calibration baseline — runs first so downstream probes have " |
| 47 | "per-kind null stats for z-scores." |
| 48 | ), |
| 49 | "delta_kl": ( |
| 50 | "A1: mean JS divergence of next-token distributions between " |
| 51 | "base and ft. Did the adapter move the model on doc content?" |
| 52 | ), |
| 53 | "adapter_revert": ( |
| 54 | "A2: does the ft model drift back to base under adversarial " |
| 55 | "paraphrase? Needs the [semsim] extra." |
| 56 | ), |
| 57 | "prompt_collapse": ( |
| 58 | "A3: fit exponential decay of divergence over context length. " |
| 59 | "Catches adapters whose influence evaporates with context." |
| 60 | ), |
| 61 | "section_internalization": ( |
| 62 | "B1 (flagship): per-section attribution with leak-check. " |
| 63 | "Which parts of the doc actually moved the model?" |
| 64 | ), |
| 65 | "paraphrase_invariance": ( |
| 66 | "B2: memorization vs generalization — does the adapter lift " |
| 67 | "the verbatim prompt more than paraphrased variants?" |
| 68 | ), |
| 69 | "preference_flip": ( |
| 70 | "B3: on DPO/ORPO triples, did ft flip the chosen/rejected ranking relative to base?" |
| 71 | ), |
| 72 | "style_fingerprint": ( |
| 73 | "C1: stylistic shift toward the doc's fingerprint. Uses 9-dim " |
| 74 | "extended vector when [style] installed; 6-dim otherwise." |
| 75 | ), |
| 76 | "calibration_drift": ( |
| 77 | "C2: general-knowledge regression check. Did the fine-tune " |
| 78 | "forget the world while learning the doc?" |
| 79 | ), |
| 80 | "external_perplexity": ( |
| 81 | "F3: diffuse-forgetting check — rolling-logprob delta on " |
| 82 | "held-out public-domain English. Complements calibration_drift " |
| 83 | "(the point-factual counterpart)." |
| 84 | ), |
| 85 | "leakage": ( |
| 86 | "C3: verbatim-recital + perturbation-fragility check. High " |
| 87 | "recall + low fragility → memorization, not generalization." |
| 88 | ), |
| 89 | "adapter_ablation": ( |
| 90 | "N2 (signature): λ-scaled divergence curve. A healthy adapter " |
| 91 | "shows a smooth, non-saturated response; a degenerate one is " |
| 92 | "a step function." |
| 93 | ), |
| 94 | } |
| 95 | |
| 96 | |
def write_sway_yaml(dlm_path: Path, out: Path) -> None:
    """Resolve the .dlm, build a spec dict, write it as YAML to ``out``.

    Raises :class:`SwayError` when the document has no trained adapter,
    since every probe in the generated suite compares base vs ft.
    """
    resolved = resolve_dlm(dlm_path)
    if resolved.adapter_path is None:
        raise SwayError(
            f"{dlm_path}: no trained adapter found at ~/.dlm/store/{resolved.dlm_id}/adapter; "
            "train the document with `dlm train` before generating a sway suite."
        )
    spec = build_spec_dict(resolved, dlm_source=_portable_dlm_source(dlm_path))
    rendered = _render_annotated_yaml(
        spec,
        resolved,
        dlm_path,
        skipped=collect_skipped_probe_reasons(resolved),
    )
    out.write_text(rendered, encoding="utf-8")
| 111 | |
| 112 | |
def collect_skipped_probe_reasons(handle: DlmHandle) -> list[tuple[str, str]]:
    """Return ``(probe_kind, reason)`` tuples for every probe
    ``_build_suite`` intentionally omitted for this ``.dlm``.

    F07 (Audit 03) — the emitted YAML previously had no record of
    which probes were skipped and why. Users had to diff the autogen
    output against the intent docstring to know. This surface is the
    input to the YAML-comment block the renderer prepends.

    Mirrors the conditional logic inside :func:`_build_suite` — any
    change to that function's gating must update this function too.
    The prose-prompt derivation below reproduces ``_build_suite``'s
    byte-for-byte (leading sentence *plus* trailing period) so the
    dedup pool here counts exactly the strings the emitted
    ``cluster_kl`` probe would receive.
    """
    sections = handle.sections
    instruction_probes = [
        (p.prompt, p.gold) for s in sections if s.kind == "instruction" for p in s.probes
    ]
    # Mirror _build_suite: the leading sentence with its period restored.
    # (The pre-fix version dropped the "." — a prompt present both as an
    # instruction probe and a prose lead was then counted twice in the
    # pool, mis-reporting the cluster_kl skip decision near the floor.)
    prose_prompts: list[str] = []
    for s in sections:
        if s.kind == "prose" and s.content.strip():
            first_sentence = s.content.split(".")[0].strip()
            if first_sentence:
                prose_prompts.append(first_sentence + ".")
    has_instruction_probes = bool(instruction_probes)
    has_prose = any(s.kind == "prose" for s in sections)
    has_preferences = any(s.kind == "preference" and s.preferences for s in sections)

    kl_prompts = [q for q, _ in instruction_probes][:16] or prose_prompts[:16]
    all_instruction_prompts = [q for q, _ in instruction_probes]
    cluster_pool_size = len({*all_instruction_prompts, *prose_prompts})

    # Ordered to match the emission order in _build_suite so the YAML
    # header's skip block reads in suite order.
    skipped: list[tuple[str, str]] = []
    if not kl_prompts:
        skipped.append(("delta_kl", "no instruction probes or prose sections"))
    if not has_instruction_probes:
        skipped.append(("adapter_revert", "no !probe markers in INSTRUCTION sections"))
        skipped.append(("paraphrase_invariance", "no !probe markers in INSTRUCTION sections"))
    if not kl_prompts:
        skipped.append(("prompt_collapse", "no prompts available to score"))
    if len(sections) < 2:
        skipped.append(("section_internalization", "document has fewer than 2 sections"))
    if not has_preferences:
        skipped.append(("preference_flip", "no PREFERENCE sections with populated triples"))
    if not has_prose:
        skipped.append(
            ("external_perplexity", "no PROSE sections to measure external-corpus drift against")
        )
        skipped.append(("leakage", "no PROSE sections to extract prefix/continuation windows from"))
    if cluster_pool_size < 20:
        skipped.append(
            (
                "cluster_kl",
                f"only {cluster_pool_size} distinct prompts in pool (need ≥ 20 for stable clustering)",
            )
        )
    if not kl_prompts:
        skipped.append(("adapter_ablation", "no prompts available to score"))
    return skipped
| 169 | |
| 170 | |
| 171 | def _portable_dlm_source(dlm_path: Path) -> str: |
| 172 | """Return a ``dlm_source`` string that survives cross-machine checkout. |
| 173 | |
| 174 | F09 (Audit 03) — the pre-fix code unconditionally wrote an |
| 175 | absolute path (``/Users/mfwolffe/.../fortran.dlm``) which breaks |
| 176 | when the autogen'd ``sway.yaml`` is committed to a repo and |
| 177 | re-run from a different working tree (CI agents, another dev's |
| 178 | checkout). The cwd-relative form is round-trippable across |
| 179 | machines; only fall back to absolute when the ``.dlm`` lives |
| 180 | outside the cwd (e.g. a global user dir) where relativization |
| 181 | doesn't resolve on a fresh checkout. |
| 182 | """ |
| 183 | abs_path = dlm_path.resolve() |
| 184 | cwd = Path.cwd().resolve() |
| 185 | try: |
| 186 | # ``is_relative_to`` lands in 3.9+; this path is guaranteed |
| 187 | # to exist because sway requires ``>=3.11``. |
| 188 | if abs_path.is_relative_to(cwd): |
| 189 | return str(abs_path.relative_to(cwd)) |
| 190 | except ValueError: |
| 191 | pass |
| 192 | return str(abs_path) |
| 193 | |
| 194 | |
def _render_annotated_yaml(
    spec: dict[str, Any],
    handle: DlmHandle,
    dlm_path: Path,
    *,
    skipped: list[tuple[str, str]] | None = None,
) -> str:
    """Render the spec as YAML with a provenance header + per-probe intent lines (D5).

    Serializes with pyyaml (already a hard dep) and then injects
    ``#``-comments above each suite entry as a text post-pass. The
    sprint's contemplated ``ruamel.yaml`` dependency is unnecessary:
    the annotation is structural (position-based), not round-trippable,
    so the lighter approach suffices.

    F07 (Audit 03) — a non-empty ``skipped`` adds a
    ``# skipped: <probe> (<reason>)`` block to the header, so readers
    see which probes the autogen intentionally omitted without diffing
    this module's docstrings.
    """
    import datetime as _dt

    from dlm_sway import __version__

    annotated = _inject_probe_intent_comments(yaml.safe_dump(spec, sort_keys=False))
    stamp = _dt.datetime.now(_dt.UTC).isoformat(timespec="seconds")

    header = [
        "# sway.yaml — auto-generated by `sway autogen`",
        f"# source: {dlm_path.resolve()}",
        f"# dlm_id: {handle.dlm_id}",
        f"# base: {handle.base_model}",
        f"# adapter: {handle.adapter_path}",
        f"# generated: {stamp}",
        f"# sway: {__version__}",
        "#",
        "# Edit freely — this file is your checked-in contract. Re-running",
        "# `sway autogen` overwrites it; commit the generated file so your",
        "# test suite is diffable in PRs.",
    ]
    if skipped:
        header.append("#")
        header.append(f"# {len(skipped)} probe(s) intentionally omitted for this document:")
        header.extend(f"# skipped: {kind} ({reason})" for kind, reason in skipped)
        header.append("# (sway gate will still pass — missing probes don't fail the gate.)")
    header.append("")
    return "\n".join(header) + annotated
| 246 | |
| 247 | |
def _inject_probe_intent_comments(yaml_body: str) -> str:
    """Walk the rendered YAML; prepend a ``#`` intent line above each suite entry.

    Purely positional: a column-0 ``suite:`` key opens the scope, and
    each ``- `` list item inside it is one probe entry. The entry's
    ``kind:`` value selects the intent text from :data:`_PROBE_INTENT`;
    entries whose kind has no intent mapping are left untouched. The
    input's trailing-newline presence is preserved in the output.
    """
    import re as _re

    # Each suite entry begins with ``- name: <value>`` at the same
    # indent. We scan the lines, track the indent of the first list
    # item we see under ``suite:``, and insert intent comments there.

    lines = yaml_body.splitlines()
    out: list[str] = []
    in_suite = False
    # Each ``- name:`` marks the start of a suite entry. We buffer the
    # lines of that entry and peek at the ``kind:`` value to pick the
    # right intent comment to insert before the ``- name:`` line. A
    # one-line "index where the intent goes" pointer is simpler than
    # doing a two-pass rewrite.
    entry_start: int | None = None
    entry_indent = 0

    def _flush_entry_header(entry_start_idx: int | None) -> None:
        # Insert the pending entry's intent comment at its recorded
        # position in ``out``. Reads ``entry_indent`` from the enclosing
        # scope at call time (set when the entry's ``- `` line was seen).
        if entry_start_idx is None:
            return
        entry_lines = out[entry_start_idx:]
        kind: str | None = None
        for elt in entry_lines:
            # First ``kind: <identifier>`` within the buffered entry wins.
            match = _re.search(r"\bkind:\s*([A-Za-z_][A-Za-z0-9_]*)", elt)
            if match is not None:
                kind = match.group(1)
                break
        if kind is None:
            return
        intent = _PROBE_INTENT.get(kind)
        if intent is None:
            # Unknown kind — leave the entry uncommented rather than guess.
            return
        out.insert(entry_start_idx, " " * entry_indent + f"# {intent}")

    for line in lines:
        stripped = line.lstrip()
        # Top-level keys toggle the suite scope.
        if line and not line[0].isspace() and not line.startswith("- "):
            # Close the previous entry (if any) before switching scope.
            _flush_entry_header(entry_start)
            entry_start = None
            in_suite = stripped == "suite:"
            out.append(line)
            continue

        if in_suite and stripped.startswith("- "):
            # New entry — flush any pending comment for the previous.
            _flush_entry_header(entry_start)
            entry_start = len(out)
            entry_indent = len(line) - len(stripped)

        out.append(line)

    # Flush the final entry.
    _flush_entry_header(entry_start)
    return "\n".join(out) + ("\n" if yaml_body.endswith("\n") else "")
| 306 | |
| 307 | |
def build_spec_dict(handle: DlmHandle, *, dlm_source: str | None = None) -> dict[str, Any]:
    """Build a sway.yaml-shaped dict from a :class:`DlmHandle`.

    ``dlm_source``, when given, is recorded verbatim so the suite
    loader can pick up typed Section data at run time.
    """
    adapter = str(handle.adapter_path) if handle.adapter_path else None
    spec: dict[str, Any] = {
        "version": 1,
        "models": {
            "base": {"kind": "hf", "base": handle.base_model},
            "ft": {"kind": "hf", "base": handle.base_model, "adapter": adapter},
        },
        "defaults": {"seed": 0, "differential": True},
        "suite": _build_suite(handle.sections),
    }
    if dlm_source is not None:
        spec["dlm_source"] = dlm_source
    return spec
| 325 | |
| 326 | |
def _build_suite(sections: tuple[Section, ...]) -> list[dict[str, Any]]:
    """Assemble the full probe battery for the given sections.

    The ordering matters: ``null_adapter`` first so every downstream
    probe's z-score threshold has stats to consult.

    Each probe is gated on document content (instruction probes, prose
    sections, preference triples, prompt-pool size). Any change to the
    gating here must be mirrored in
    :func:`collect_skipped_probe_reasons`, which reports the omissions
    in the generated YAML header.
    """
    # (prompt, gold) pairs from every !probe marker in INSTRUCTION sections.
    instruction_probes: list[tuple[str, str]] = [
        (p.prompt, p.gold) for s in sections if s.kind == "instruction" for p in s.probes
    ]
    prose_prompts: list[str] = []
    for s in sections:
        if s.kind == "prose" and s.content.strip():
            # Use the section's leading sentence as a natural completion prompt.
            first_sentence = s.content.split(".")[0].strip()
            if first_sentence:
                prose_prompts.append(first_sentence + ".")

    # Shared pool for the KL-style probes: prefer real probe questions,
    # fall back to prose leads; capped at 16 to bound run time.
    kl_prompts = [q for q, _ in instruction_probes][:16] or prose_prompts[:16]
    # B8: style_fingerprint needs *stylistic elicitation* — open-ended
    # prompts that ask the model to write in its own voice — not the
    # leading sentence of a doc paragraph (which elicits continuation
    # of the doc itself, conflating style with content). The fixed set
    # below is intentionally generic so the model's stylistic shift
    # under the adapter is the only signal in play.
    style_prompts = list(_STYLE_ELICITATION_PROMPTS)

    suite: list[dict[str, Any]] = []

    # Baseline calibration — always first.
    suite.append({"name": "null_baseline", "kind": "null_adapter", "runs": 3})

    # Adherence.
    if kl_prompts:
        suite.append(
            {
                "name": "delta_kl_doc",
                "kind": "delta_kl",
                "prompts": kl_prompts,
                "assert_mean_gte": 0.02,
            }
        )
    if instruction_probes:
        suite.append(
            {
                "name": "revert_check",
                "kind": "adapter_revert",
                "cases": [
                    {"prompt": q, "gold": a, "paraphrases": _auto_paraphrases(q)}
                    for q, a in instruction_probes[:8]
                ],
                "assert_revert_rate_lt": 0.3,
            }
        )
    if kl_prompts:
        suite.append(
            {
                "name": "prompt_collapse",
                "kind": "prompt_collapse",
                "prompts": kl_prompts[:4],
                "context_lengths": [0, 256, 512, 1024],
                "assert_half_life_tokens": 300,
            }
        )

    # Attribution.
    if len(sections) >= 2:
        suite.append(
            {
                "name": "section_attribution",
                "kind": "section_internalization",
                "per_section_threshold": 0.05,
            }
        )
    if instruction_probes:
        suite.append(
            {
                "name": "paraphrase_invariance",
                "kind": "paraphrase_invariance",
                "cases": [
                    {"prompt": q, "gold": a, "paraphrases": _auto_paraphrases(q)}
                    for q, a in instruction_probes[:6]
                ],
            }
        )
    # preference_flip only fires when a PREFERENCE section actually
    # carries triples — an empty section shouldn't emit a no-op probe.
    has_preferences = any(s.kind == "preference" and s.preferences for s in sections)
    if has_preferences:
        suite.append(
            {
                "name": "preference_flip",
                "kind": "preference_flip",
                "assert_flip_rate_gte": 0.7,
            }
        )

    # Calibration.
    if style_prompts:
        suite.append(
            {
                "name": "style_shift",
                "kind": "style_fingerprint",
                "prompts": style_prompts,
            }
        )
    suite.append({"name": "general_knowledge", "kind": "calibration_drift"})
    # Emit the external_perplexity probe when the doc has any PROSE
    # content at all — the probe measures *external* prose degradation,
    # so the docs that benefit most are the ones where the adapter was
    # trained on text that might over-fit the base model's English
    # fluency.
    if any(s.kind == "prose" for s in sections):
        suite.append(
            {
                "name": "external_ppl",
                "kind": "external_perplexity",
                "corpus": "public_domain_en",
                "max_chunks": 8,  # half of default for faster autogen'd runs
            }
        )
        suite.append(
            {
                "name": "verbatim_leak",
                "kind": "leakage",
                "prefix_chars": 128,
                "continuation_chars": 256,
            }
        )

    # F07 — ``cluster_kl`` when the prompt pool clears the probe's
    # ``min_prompts`` floor. Pulls from the *full* instruction pool +
    # prose leading sentences (``kl_prompts`` is capped at 16 for
    # delta_kl; we want wider coverage for clustering). S16's scope
    # set a 20-prompt floor; mirror it so emission is stable across
    # documents of varying length.
    all_instruction_prompts = [q for q, _ in instruction_probes]
    cluster_prompts: list[str] = []
    seen: set[str] = set()
    # Order-preserving dedup: instruction prompts first, then prose leads.
    for p in all_instruction_prompts + prose_prompts:
        if p not in seen:
            seen.add(p)
            cluster_prompts.append(p)
    if len(cluster_prompts) >= 20:
        suite.append(
            {
                "name": "cluster_kl_topics",
                "kind": "cluster_kl",
                "prompts": cluster_prompts[:64],
                "num_clusters": 5,
                "min_prompts": 20,
            }
        )

    # Signature ablation — goes last because it's the most expensive.
    if kl_prompts:
        suite.append(
            {
                "name": "adapter_ablation",
                "kind": "adapter_ablation",
                "prompts": kl_prompts[:6],
                "lambdas": [0.0, 0.25, 0.5, 0.75, 1.0, 1.25],
            }
        )

    return suite
| 490 | |
| 491 | |
| 492 | def _auto_paraphrases(prompt: str) -> list[str]: |
| 493 | """Small, deterministic paraphrase set used when authors don't supply one. |
| 494 | |
| 495 | Purely heuristic — good enough to detect "did the model memorize the |
| 496 | exact wording". Real paraphrase generation lives behind the |
| 497 | ``semsim`` extra. |
| 498 | """ |
| 499 | variants: list[str] = [] |
| 500 | stripped = prompt.rstrip("?. ") |
| 501 | variants.append(f"Could you explain: {stripped}?") |
| 502 | variants.append(f"I'd like to know — {stripped}.") |
| 503 | variants.append(f"Please describe: {stripped}.") |
| 504 | return variants[:3] |