Python · 12920 bytes Raw Blame History
1 """Cross-run comparison of saved ``sway run --json`` outputs (S11 / F5).
2
3 Answers the CI question: *"did last night's run regress anything vs
4 the baseline?"* The user hands :func:`build_matrix` a sequence of
5 ``(SuiteResult, SwayScore)`` pairs — typically rehydrated from JSON
6 files via :func:`dlm_sway.suite.report.from_json` — and gets back a
7 :class:`CompareMatrix` whose cells line up probe-by-probe across runs.
8 Probes that were added, renamed, or removed between runs show as
9 ``None`` cells so a renderer can mark them with the em-dash sentinel
10 every sway surface uses for "no value" (S06.10).
11
12 This module deliberately owns no IO: the CLI reads the files and
13 ``report.from_json`` rehydrates them; this module just does the
14 matrix/delta math and the rendering. That separation keeps
15 ``build_matrix`` unit-testable without touching the filesystem.
16 """
17
18 from __future__ import annotations
19
20 import json
21 import math
22 from dataclasses import dataclass, field
23 from io import StringIO
24 from typing import Any
25
26 from rich.console import Console
27 from rich.table import Table
28 from rich.text import Text
29
30 from dlm_sway.core.result import SuiteResult, SwayScore
31
#: Em-dash sentinel every sway surface prints for "no value" (S06.10).
#: Held as a local literal because the delta cells need it directly —
#: there, ``None`` means "no prior run to diff against".
_NONE_GLYPH = "—"

#: Default threshold for ``--fail-on-regression``: a probe whose score
#: dropped by ≥ this value between the newest run and the previous one
#: counts as a regression. Same 0..1 scale as the scores rendered by
#: ``_format_cell``.
DEFAULT_REGRESSION_THRESHOLD: float = 0.10
42
43
44 @dataclass(frozen=True, slots=True)
45 class CompareMatrix:
46 """Cross-run score matrix produced by :func:`build_matrix`.
47
48 Attributes
49 ----------
50 labels:
51 Human-readable label per run, in the order they were passed in
52 (the CLI forwards filenames or timestamps). Column headers in
53 every renderer.
54 timestamps:
55 ``finished_at`` ISO-8601 strings per run. Shown in the markdown
56 and JSON renderings as metadata — terminal output prefers the
57 shorter ``labels`` for column headers.
58 probe_names:
59 Union of probe names across every run, sorted alphabetically so
60 the row order is deterministic across runs (it isn't tied to any
61 one run's spec order).
62 scores:
63 ``name → [score_per_run]``. A cell is ``None`` when the probe
64 didn't appear in that run (added later, removed, renamed).
65 deltas:
66 ``name → [delta_i]`` where ``delta_i = scores[i] - scores[i-1]``.
67 ``deltas`` has ``len(labels) - 1`` entries per probe; cells are
68 ``None`` when either neighbor is ``None`` or the delta is
69 non-finite.
70 composite_series:
71 Per-run overall ``SwayScore.overall``. Parallel to ``labels``.
72 """
73
74 labels: tuple[str, ...]
75 timestamps: tuple[str, ...]
76 probe_names: tuple[str, ...]
77 scores: dict[str, list[float | None]] = field(default_factory=dict)
78 deltas: dict[str, list[float | None]] = field(default_factory=dict)
79 composite_series: list[float | None] = field(default_factory=list)
80
81 @property
82 def n_runs(self) -> int:
83 return len(self.labels)
84
85 def latest_regressions(self, threshold: float) -> list[tuple[str, float]]:
86 """Probes whose newest-run score dropped ≥ ``threshold`` vs the prior run.
87
88 Returns an empty list when there are fewer than 2 runs (no prior
89 to compare) or when no probe regressed that hard. Each entry is
90 ``(probe_name, delta)`` with ``delta <= -threshold``. Sorted by
91 most severe (most negative delta) first.
92 """
93 if self.n_runs < 2 or threshold <= 0.0:
94 return []
95 out: list[tuple[str, float]] = []
96 for name in self.probe_names:
97 series = self.deltas.get(name, [])
98 if not series:
99 continue
100 last = series[-1]
101 if last is None:
102 continue
103 if last <= -threshold:
104 out.append((name, last))
105 out.sort(key=lambda pair: pair[1])
106 return out
107
108
def build_matrix(
    results: list[tuple[SuiteResult, SwayScore]],
    *,
    labels: list[str] | None = None,
) -> CompareMatrix:
    """Fold an N-run history into a :class:`CompareMatrix`.

    ``labels`` lets the CLI pass filenames or short identifiers; when
    omitted the per-run ``finished_at`` timestamp is used (or a
    ``run-<i>`` placeholder if that is missing). The order of
    ``results`` is the column order — runs are never sorted.

    Raises :class:`ValueError` on an empty history or a label/results
    length mismatch.
    """
    if not results:
        raise ValueError("build_matrix requires at least one run")

    if labels is None:
        label_tuple = tuple(
            suite.finished_at.isoformat() if suite.finished_at else f"run-{idx}"
            for idx, (suite, _) in enumerate(results)
        )
    else:
        if len(labels) != len(results):
            raise ValueError(f"labels length {len(labels)} != results length {len(results)}")
        label_tuple = tuple(labels)

    timestamp_tuple = tuple(
        suite.finished_at.isoformat() if suite.finished_at else ""
        for suite, _ in results
    )

    def _clean(raw: float | None) -> float | None:
        # Normalize one raw probe score: finite values pass through as
        # float, missing or non-finite values collapse to None.
        if raw is None or not math.isfinite(raw):
            return None
        return float(raw)

    # One name→score map per run; a probe absent from a run simply has
    # no key here and becomes a None cell in the matrix below.
    per_run: list[dict[str, float | None]] = [
        {probe.name: _clean(probe.score) for probe in suite.probes}
        for suite, _ in results
    ]

    # Sorted union of probe names keeps row order stable regardless of
    # which run appears first.
    union_names = sorted(set().union(*per_run))

    scores: dict[str, list[float | None]] = {
        name: [run.get(name) for run in per_run] for name in union_names
    }

    def _diff(prev: float | None, cur: float | None) -> float | None:
        # Neighbor delta, guarded against missing cells and non-finite math.
        if prev is None or cur is None:
            return None
        step = cur - prev
        return step if math.isfinite(step) else None

    # Delta index 0 is the change between run 0 and run 1.
    deltas: dict[str, list[float | None]] = {
        name: [_diff(a, b) for a, b in zip(series, series[1:])]
        for name, series in scores.items()
    }

    composite_series: list[float | None] = [
        (
            float(score.overall)
            if score is not None and math.isfinite(float(score.overall))
            else None
        )
        for _, score in results
    ]

    return CompareMatrix(
        labels=label_tuple,
        timestamps=timestamp_tuple,
        probe_names=tuple(union_names),
        scores=scores,
        deltas=deltas,
        composite_series=composite_series,
    )
189
190
191 def _format_cell(v: float | None) -> str:
192 """Score cell: two decimals with em-dash for None."""
193 return _NONE_GLYPH if v is None else f"{v:.2f}"
194
195
196 def _format_delta(v: float | None) -> str:
197 """Delta cell: signed two decimals, em-dash for None, explicit sign."""
198 if v is None:
199 return _NONE_GLYPH
200 if abs(v) < 5e-5:
201 return "0.00"
202 return f"{v:+.2f}"
203
204
205 def _delta_style(v: float | None, threshold: float) -> str:
206 """Rich style for a delta cell. Red on regression, green on improvement."""
207 if v is None:
208 return "dim"
209 if v <= -threshold:
210 return "bold red"
211 if v >= threshold:
212 return "bold green"
213 return "dim"
214
215
def render_terminal(
    matrix: CompareMatrix,
    *,
    console: Console | None = None,
    regression_threshold: float = DEFAULT_REGRESSION_THRESHOLD,
) -> None:
    """Rich-formatted terminal output: one row per probe, then the composite.

    Parameters
    ----------
    matrix:
        The cross-run matrix to render.
    console:
        Target :class:`rich.console.Console`; a fresh one is created when
        omitted (passing one in lets tests capture the output).
    regression_threshold:
        Absolute drop that colors a delta cell red (an improvement of the
        same size goes green) and feeds the regressions footer.
    """
    c = console or Console()

    c.print(Text(f"sway compare — {matrix.n_runs} runs", style="bold"))
    c.print()

    table = Table(show_header=True, header_style="bold", box=None, padding=(0, 1))
    table.add_column("probe", style="cyan")
    for label in matrix.labels:
        table.add_column(label, justify="right")
    # One delta column per adjacent run pair, e.g. "base→nightly".
    # (Fixed: the f-string here was missing its opening quote.)
    for i in range(matrix.n_runs - 1):
        table.add_column(
            f"{matrix.labels[i]}→{matrix.labels[i + 1]}", justify="right", style="dim"
        )

    for name in matrix.probe_names:
        row: list[str | Text] = [name]
        row.extend(_format_cell(v) for v in matrix.scores[name])
        row.extend(
            Text(_format_delta(v), style=_delta_style(v, regression_threshold))
            for v in matrix.deltas[name]
        )
        table.add_row(*row)

    # Composite row lives below the probes; its deltas are computed here
    # with the same None-neighbor guard the per-probe deltas use.
    composite_row: list[str | Text] = [Text("composite (overall)", style="bold")]
    composite_row.extend(_format_cell(v) for v in matrix.composite_series)
    for prev, cur in zip(matrix.composite_series, matrix.composite_series[1:]):
        delta = cur - prev if (prev is not None and cur is not None) else None
        composite_row.append(
            Text(_format_delta(delta), style=_delta_style(delta, regression_threshold))
        )
    table.add_row(*composite_row)

    c.print(table)

    regressions = matrix.latest_regressions(regression_threshold)
    if regressions:
        c.print()
        c.print(
            Text(
                f"regressions (≥{regression_threshold:.2f} drop vs previous run):",
                style="bold red",
            )
        )
        for name, delta in regressions:
            c.print(f" {name}: {delta:+.3f}")
272
273
def render_markdown(
    matrix: CompareMatrix,
    *,
    regression_threshold: float = DEFAULT_REGRESSION_THRESHOLD,
) -> str:
    """Markdown rendering — same content as the terminal view, pipe-friendly.

    Returns the whole document as one string: a run-metadata table, the
    score/delta table, and a regressions section only when something
    regressed by at least ``regression_threshold``.
    """
    buf = StringIO()
    buf.write(f"# sway compare — {matrix.n_runs} runs\n\n")

    if matrix.timestamps:
        buf.write("## Runs\n\n")
        buf.write("| label | finished_at |\n|---|---|\n")
        for label, ts in zip(matrix.labels, matrix.timestamps, strict=True):
            buf.write(f"| {label} | {ts or _NONE_GLYPH} |\n")
        buf.write("\n")

    buf.write("## Scores\n\n")
    # Score columns first, then one delta column per adjacent run pair.
    # (Fixed: the delta-header f-string was missing its opening quote.)
    header = ["probe"] + list(matrix.labels)
    for i in range(matrix.n_runs - 1):
        header.append(f"{matrix.labels[i]}→{matrix.labels[i + 1]}")
    buf.write("| " + " | ".join(header) + " |\n")
    buf.write("|" + "|".join(["---"] * len(header)) + "|\n")

    for name in matrix.probe_names:
        cells = [name] + [_format_cell(v) for v in matrix.scores[name]]
        cells += [_format_delta(v) for v in matrix.deltas[name]]
        buf.write("| " + " | ".join(cells) + " |\n")

    # Composite row: overall score per run plus neighbor deltas, guarded
    # against None cells exactly like the per-probe deltas.
    cells = ["**composite**"] + [_format_cell(v) for v in matrix.composite_series]
    for prev, cur in zip(matrix.composite_series, matrix.composite_series[1:]):
        delta = cur - prev if (prev is not None and cur is not None) else None
        cells.append(_format_delta(delta))
    buf.write("| " + " | ".join(cells) + " |\n")

    regressions = matrix.latest_regressions(regression_threshold)
    if regressions:
        buf.write(f"\n## Regressions (≥{regression_threshold:.2f} drop vs previous run)\n\n")
        for name, delta in regressions:
            buf.write(f"- **{name}** — `{delta:+.3f}`\n")

    return buf.getvalue()
319
320
def render_json(
    matrix: CompareMatrix,
    *,
    regression_threshold: float = DEFAULT_REGRESSION_THRESHOLD,
) -> str:
    """Machine-readable JSON. Same field names as :class:`CompareMatrix`."""
    regressions = matrix.latest_regressions(regression_threshold)
    payload: dict[str, Any] = {
        "labels": list(matrix.labels),
        "timestamps": list(matrix.timestamps),
        "probe_names": list(matrix.probe_names),
        "scores": {probe: list(cells) for probe, cells in matrix.scores.items()},
        "deltas": {probe: list(cells) for probe, cells in matrix.deltas.items()},
        "composite_series": list(matrix.composite_series),
        "regression_threshold": regression_threshold,
        "latest_regressions": [
            {"probe": probe, "delta": delta} for probe, delta in regressions
        ],
    }
    return json.dumps(payload, indent=2, sort_keys=False)
341
342
# Explicit public surface: the matrix type, its default threshold, the
# fold that builds it, and the three renderers.
__all__ = [
    "CompareMatrix",
    "DEFAULT_REGRESSION_THRESHOLD",
    "build_matrix",
    "render_json",
    "render_markdown",
    "render_terminal",
]