Python · 5908 bytes Raw Blame History
1 """Shared z-score math for numeric probes.
2
3 Every numeric probe computes ``(raw - mean) / std`` against a null-adapter
4 baseline and converts the z-score to a verdict + normalized score. S02
5 centralizes this math so probes don't each reinvent it (historical bug:
6 ``delta_kl`` had bespoke z-score code while every other numeric probe
7 ignored null stats entirely — Audit 01 finding P02).
8
9 The helpers here are tiny but load-bearing — they're the one place the
10 "null calibration won / fixed-threshold fallback" decision is made.
11 """
12
13 from __future__ import annotations
14
15 import math
16 from collections.abc import Mapping
17 from typing import TypedDict
18
19 from dlm_sway.core.result import Verdict
20
21
class NullStats(TypedDict):
    """Per-kind null-adapter baseline stats published by ``NullAdapterProbe``.

    These are the summary statistics a probe's raw metric is compared
    against: :func:`z_score` in this module subtracts ``mean`` and divides
    by ``std``.

    NOTE(review): :func:`z_score` also reads an optional ``"degenerate"``
    key (a 1.0 / 0.0 float flag) that is not declared here — confirm
    whether the publisher emits it and whether it belongs in this schema.
    """

    # Centre of the null distribution; subtracted from the raw metric.
    mean: float
    # Spread of the null distribution; the z-score denominator.
    std: float
    # Presumably the number of null runs behind ``mean``/``std``, stored as
    # a float so every value in the dict shares one type — TODO confirm
    # against ``NullAdapterProbe``.
    n: float
28
29
30 #: Minimum ``std`` the z-score path accepts. Below this we treat the null
31 #: distribution as too degenerate to divide by — probes fall back to the
32 #: fixed-threshold path rather than emit runaway z-scores.
33 MIN_STD: float = 1e-6
34
35
36 def z_score(raw: float, stats: Mapping[str, float] | None) -> float | None:
37 """Compute ``(raw - mean) / std`` against a null-adapter baseline.
38
39 Returns ``None`` when:
40
41 - ``stats`` is missing (no calibration ran for this kind)
42 - ``stats["degenerate"]`` is truthy (F02 Audit 03 — null ran but
43 was too narrow to calibrate against: ``runs: 1``, or multi-seed
44 raws that collapsed to an effectively-zero variance)
45 - ``std`` is below :data:`MIN_STD` (belt-and-suspenders guard
46 for stats dicts that predate the ``degenerate`` field)
47 - ``raw`` or ``mean`` is non-finite
48
49 Callers that get ``None`` are expected to fall back to their probe's
50 fixed-threshold path — and surface ``(no calibration)`` in the
51 report so the user knows the z-score path didn't fire.
52 """
53 if stats is None:
54 return None
55 mean = stats.get("mean")
56 std = stats.get("std", 0.0)
57 if mean is None or std is None:
58 return None
59 if not (math.isfinite(raw) and math.isfinite(mean) and math.isfinite(std)):
60 return None
61 # ``degenerate`` is stored as a float (1.0 / 0.0) so the stats
62 # dict stays Mapping[str, float] across every consumer.
63 if stats.get("degenerate", 0.0) >= 0.5:
64 return None
65 if std < MIN_STD:
66 return None
67 return float((raw - mean) / std)
68
69
def verdict_from_z(z: float | None, threshold: float) -> Verdict | None:
    """Turn a z-score into a ``PASS`` / ``FAIL`` verdict.

    Parameters
    ----------
    z:
        The calibrated z-score, or ``None`` when no calibration was
        available.
    threshold:
        Inclusive pass mark: ``z >= threshold`` passes.

    Returns
    -------
    ``None`` when ``z`` is ``None``, signalling the caller to use its
    fixed-threshold verdict path instead. Otherwise ``Verdict.PASS`` if
    ``z`` reaches ``threshold`` and ``Verdict.FAIL`` if it does not.

    The convention is higher-z-is-better: the adapter's raw metric should
    sit *above* the null distribution for the probe to pass.
    """
    if z is None:
        return None
    if z >= threshold:
        return Verdict.PASS
    # Anything that does not compare >= threshold (including NaN) fails.
    return Verdict.FAIL
82
83
84 def score_from_z(z: float | None) -> float | None:
85 """Map a z-score to a normalized ``[0, 1]`` composite score.
86
87 ``sigmoid(z / 3)`` is the shape: z=0 → 0.5, z=3 → ≈0.88, z=-3 → ≈0.12.
88 Returns ``None`` when ``z`` is ``None``.
89
90 The /3 divisor centers the knee at "3σ above null" — the convention
91 we publish in the README for "the adapter is significantly swayed".
92 """
93 if z is None:
94 return None
95 # Guard against extreme z values overflowing math.exp.
96 clamped = max(-50.0, min(50.0, z / 3.0))
97 return 1.0 / (1.0 + math.exp(-clamped))
98
99
def no_calibration_note(probe_kind: str) -> str:
    """Build the annotation a probe appends when it falls back.

    The returned text is ``(no calibration for <probe_kind>)``. It is
    shown in the terminal and markdown reports so users can tell which
    probes used fixed thresholds, and it is the string the S02.6 report
    code looks for when formatting rows — keep the wording stable.
    """
    note = f"(no calibration for {probe_kind})"
    return note
108
109
110 def z_scores_by_rank(
111 raw: float,
112 stats_by_rank: Mapping[str, Mapping[str, float]] | None,
113 *,
114 sign: int = 1,
115 ) -> dict[str, float] | None:
116 """Compute per-rank z-scores for a probe's raw metric.
117
118 Parameters
119 ----------
120 raw:
121 The probe's raw metric at the real adapter.
122 stats_by_rank:
123 ``{rank_key: null_stats}`` from
124 :func:`dlm_sway.probes.null_adapter.get_null_stats_by_rank`.
125 ``None`` short-circuits to ``None``.
126 sign:
127 ``+1`` for higher-is-better probes (default), ``-1`` for
128 lower-is-better. Applied after the raw z computation so each
129 probe keeps its existing sign convention unchanged.
130
131 Returns
132 -------
133 ``{rank_key: z}`` with only the ranks that produced a finite z
134 (divergent std or non-finite inputs drop out silently). ``None``
135 when ``stats_by_rank`` is ``None`` or empty.
136 """
137 if not stats_by_rank:
138 return None
139 out: dict[str, float] = {}
140 for rkey, s in stats_by_rank.items():
141 z = z_score(raw, s)
142 if z is None:
143 continue
144 out[rkey] = sign * z
145 return out or None
146
147
148 def format_z_profile(z_by_rank: Mapping[str, float] | None) -> str:
149 """Render ``{rank_key: z}`` as ``+4.2σ @ 1x / +6.8σ @ 0.5x / +2.1σ @ 2x``.
150
151 Rank labels are rendered as ``{multiplier}x`` (e.g. ``0.5x``) when
152 they parse as ``rank_<float>``; anything else is passed through
153 verbatim. ``None`` or empty input returns the empty string so
154 callers can unconditionally append with ``f"{z} {profile}".rstrip()``.
155 """
156 if not z_by_rank:
157 return ""
158 parts: list[str] = []
159 for rkey, z in z_by_rank.items():
160 if rkey.startswith("rank_"):
161 try:
162 mult = float(rkey.removeprefix("rank_"))
163 label = f"{mult:g}x"
164 except ValueError:
165 label = rkey
166 else:
167 label = rkey
168 parts.append(f"{z:+.2f}σ @ {label}")
169 return " / ".join(parts)