Python · 12781 bytes Raw Blame History
1 """Provenance chain: dataclass + digest + TOFU + verify."""
2
3 from __future__ import annotations
4
5 import hashlib
6 import json
7 from pathlib import Path
8
9 import pytest
10
11 from dlm.share.provenance import (
12 Provenance,
13 ProvenanceChainBroken,
14 ProvenanceError,
15 ProvenanceSchemaError,
16 ProvenanceVerifyResult,
17 UnknownSignerError,
18 canonical_json_bytes,
19 dump_provenance_json,
20 find_matching_trusted_key,
21 iso_utc_now,
22 load_provenance_json,
23 pubkey_fingerprint,
24 recompute_chain_consistency,
25 record_trusted_key,
26 verify_provenance,
27 )
28
# Sample signer public key shared by the tests below: a minisign-style
# "untrusted comment" line followed by the key line. The trailing ``.strip()``
# drops the final newline, so the value carries no surrounding whitespace.
_SAMPLE_PUBKEY = "".join(
    [
        "untrusted comment: minisign public key ABCDEF1234567890\n",
        "RWSABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmno+/=\n",
    ]
).strip()
33
34
def _sample_provenance(**overrides: str) -> Provenance:
    """Return a ``Provenance`` filled with fixed sample values.

    Each keyword argument replaces the sample value of the field with the
    same name, which lets a test vary exactly one field at a time.
    """
    defaults = {
        "adapter_sha256": "a" * 64,
        "base_revision": "b" * 40,
        "corpus_root_sha256": "c" * 64,
        "env_lock_digest": "d" * 64,
        "signed_at": "2026-04-21T12:00:00Z",
        "signer_public_key": _SAMPLE_PUBKEY,
        "signature": "untrusted comment: signature\nabcxyz=\ntrusted comment: signed",
    }
    # Later entries win, so caller-supplied overrides replace the defaults.
    merged = {**defaults, **overrides}
    return Provenance(**merged)  # type: ignore[arg-type]
47
48
class TestCanonicalJsonBytes:
    """Checks that ``canonical_json_bytes`` yields one stable encoding per mapping."""

    def test_sorts_keys(self) -> None:
        """The insertion order of the input mapping must not change the output."""
        out_a = canonical_json_bytes({"b": "2", "a": "1"})
        out_b = canonical_json_bytes({"a": "1", "b": "2"})
        assert out_a == out_b

    def test_compact_separators(self) -> None:
        """The encoded bytes contain neither spaces nor newlines."""
        # No whitespace — signature determinism depends on this.
        out = canonical_json_bytes({"a": "1", "b": "2"})
        assert b" " not in out
        assert b"\n" not in out

    def test_utf8_preserved_roundtrip(self) -> None:
        """Decoding the output as UTF-8 JSON gives back the input mapping.

        The payload includes non-ASCII text so the UTF-8 path is actually
        exercised; the round-trip must hold whether the implementation
        escapes such characters or emits them as raw UTF-8.
        """
        payload = {"a": "1", "name": "h\u00e9llo \u2713"}
        out = canonical_json_bytes(payload)
        assert json.loads(out.decode("utf-8")) == payload
64
65
class TestProvenanceDigest:
    """Checks on ``Provenance.chain_bytes`` and ``Provenance.compute_chain_digest``."""

    def test_chain_bytes_excludes_signature(self) -> None:
        """The chain bytes decode to JSON without a ``signature`` entry."""
        sample = _sample_provenance()
        decoded = json.loads(sample.chain_bytes().decode("utf-8"))
        assert "signature" not in decoded
        # Sanity — the non-signature fields ARE present.
        assert decoded["adapter_sha256"] == sample.adapter_sha256

    def test_chain_digest_is_deterministic(self) -> None:
        """Two identically built provenances produce the same digest."""
        first = _sample_provenance().compute_chain_digest()
        second = _sample_provenance().compute_chain_digest()
        assert first == second

    def test_chain_digest_changes_on_any_field(self) -> None:
        """Replacing any single non-signature field must change the digest."""
        reference = _sample_provenance().compute_chain_digest()
        replacements = {
            "adapter_sha256": "z" * 64,
            "base_revision": "z" * 40,
            "corpus_root_sha256": "z" * 64,
            "env_lock_digest": "z" * 64,
            "signed_at": "2025-01-01T00:00:00Z",
            "signer_public_key": "different-key",
        }
        for field, new in replacements.items():
            changed = _sample_provenance(**{field: new}).compute_chain_digest()
            assert changed != reference, f"{field} change didn't affect digest"

    def test_signature_change_does_not_change_digest(self) -> None:
        """The digest is over the SIGNED fields — the signature itself
        is not part of what gets hashed, or verify would be circular."""
        reference = _sample_provenance().compute_chain_digest()
        resigned = _sample_provenance(signature="different-sig").compute_chain_digest()
        assert reference == resigned
98
99
class TestJsonIO:
    """Checks on ``dump_provenance_json`` / ``load_provenance_json``."""

    def test_roundtrip(self, tmp_path: Path) -> None:
        """A dumped provenance loads back equal to the original."""
        original = _sample_provenance()
        target = tmp_path / "provenance.json"
        dump_provenance_json(original, target)
        assert load_provenance_json(target) == original

    def test_missing_file_raises(self, tmp_path: Path) -> None:
        """Loading a path that does not exist raises a schema error."""
        absent = tmp_path / "does-not-exist.json"
        with pytest.raises(ProvenanceSchemaError, match="not found"):
            load_provenance_json(absent)

    def test_malformed_json_raises(self, tmp_path: Path) -> None:
        """Text that is not valid JSON raises a schema error."""
        broken = tmp_path / "bad.json"
        broken.write_text("{ not json", encoding="utf-8")
        with pytest.raises(ProvenanceSchemaError, match="unreadable"):
            load_provenance_json(broken)

    def test_missing_field_raises_with_names(self, tmp_path: Path) -> None:
        """An object with only some of the fields raises a schema error."""
        incomplete = {"adapter_sha256": "x" * 64, "signature": "sig"}
        partial = tmp_path / "partial.json"
        partial.write_text(json.dumps(incomplete), encoding="utf-8")
        with pytest.raises(ProvenanceSchemaError, match="missing required fields"):
            load_provenance_json(partial)

    def test_non_string_field_raises(self, tmp_path: Path) -> None:
        """A non-string field value raises an error that names the field."""
        document = {
            "adapter_sha256": 12345,  # int, not str
            "base_revision": "b" * 40,
            "corpus_root_sha256": "c" * 64,
            "env_lock_digest": "d" * 64,
            "signed_at": "2026-04-21T12:00:00Z",
            "signer_public_key": "key",
            "signature": "sig",
        }
        typed = tmp_path / "typed.json"
        typed.write_text(json.dumps(document), encoding="utf-8")
        with pytest.raises(ProvenanceSchemaError, match="adapter_sha256"):
            load_provenance_json(typed)

    def test_non_object_root_raises(self, tmp_path: Path) -> None:
        """A JSON array at the root raises a schema error."""
        array_file = tmp_path / "array.json"
        array_file.write_text("[]", encoding="utf-8")
        with pytest.raises(ProvenanceSchemaError, match="JSON object"):
            load_provenance_json(array_file)
147
148
class TestIsoUtcNow:
    """Shape check on the timestamp string returned by ``iso_utc_now``."""

    def test_format_matches_signed_at(self) -> None:
        """The string ends in ``Z`` and has separators at the expected offsets."""
        stamp = iso_utc_now()
        # Pattern: `YYYY-MM-DDTHH:MM:SSZ`
        assert stamp.endswith("Z")
        expected_separators = ((4, "-"), (7, "-"), (10, "T"), (13, ":"), (16, ":"))
        for offset, separator in expected_separators:
            assert stamp[offset] == separator
159
160
class TestTrustedKeyRegistry:
    """Checks on ``pubkey_fingerprint``, ``record_trusted_key`` and
    ``find_matching_trusted_key``."""

    def test_fingerprint_is_stable(self) -> None:
        """The fingerprint is repeatable and equals a truncated SHA-256 hex digest."""
        assert pubkey_fingerprint(_SAMPLE_PUBKEY) == pubkey_fingerprint(_SAMPLE_PUBKEY)
        # sha256 first 12 hex chars.
        expected = hashlib.sha256(_SAMPLE_PUBKEY.encode("utf-8")).hexdigest()[:12]
        assert pubkey_fingerprint(_SAMPLE_PUBKEY) == expected

    def test_record_creates_file_with_fingerprint_name(self, tmp_path: Path) -> None:
        """The returned path ends in ``.pub`` and contains the key's fingerprint."""
        target = record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        assert target.name.endswith(".pub")
        assert pubkey_fingerprint(_SAMPLE_PUBKEY) in target.name

    def test_record_with_label(self, tmp_path: Path) -> None:
        """A label becomes the ``<label>-`` prefix of the file name."""
        target = record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path, label="alice")
        assert target.name.startswith("alice-")

    def test_record_is_idempotent(self, tmp_path: Path) -> None:
        """Recording the same key twice returns the same path both times."""
        first = record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        second = record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        assert first == second

    def test_record_refuses_to_overwrite_different_key_contents(self, tmp_path: Path) -> None:
        """A different key that maps to an existing file name must be rejected."""
        target = record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path, label="alice")

        # Pin the fingerprint to the part of the existing file name after the
        # "alice-" prefix, so the second (different) key is steered at the file
        # already written above.
        with pytest.MonkeyPatch.context() as mp:
            mp.setattr(
                "dlm.share.provenance.pubkey_fingerprint",
                lambda _key: target.stem.removeprefix("alice-"),
            )
            with pytest.raises(ProvenanceError, match="refusing to overwrite"):
                record_trusted_key(
                    _SAMPLE_PUBKEY + "\nDIFFERENT",
                    trusted_keys_dir=tmp_path,
                    label="alice",
                )
        assert target.is_file()

    def test_find_matching_returns_path(self, tmp_path: Path) -> None:
        """A recorded key is found again at the path ``record_trusted_key`` returned."""
        recorded = record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        found = find_matching_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        assert found is not None
        # "Not None" alone would also accept a lookup that returned the wrong
        # file; pin the result to the path that was actually recorded.
        assert found == recorded

    def test_find_matching_returns_none_on_miss(self, tmp_path: Path) -> None:
        """An empty directory yields no match."""
        found = find_matching_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        assert found is None

    def test_find_matching_handles_missing_dir(self, tmp_path: Path) -> None:
        """A directory that does not exist yields no match instead of an error."""
        found = find_matching_trusted_key(
            _SAMPLE_PUBKEY, trusted_keys_dir=tmp_path / "does-not-exist"
        )
        assert found is None

    def test_find_matching_skips_unreadable_pubkey_files(
        self,
        tmp_path: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """A ``.pub`` file whose read raises ``OSError`` must not hide the real match."""
        good = record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        # NOTE(review): the "000-" prefix presumably makes this file sort ahead
        # of the recorded key so it is visited first — confirm against the
        # iteration order used by find_matching_trusted_key.
        bad = tmp_path / "000-bad.pub"
        bad.write_text("broken", encoding="utf-8")
        path_type = type(bad)
        real_read_text = path_type.read_text

        def _maybe_broken(self: Path, *args: object, **kwargs: object) -> str:
            # Fail only for the planted file; every other path reads normally.
            if self == bad:
                raise OSError("boom")
            return real_read_text(self, *args, **kwargs)

        monkeypatch.setattr(path_type, "read_text", _maybe_broken)

        assert find_matching_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path) == good
232
233
class TestVerifyProvenance:
    """Checks on ``verify_provenance`` using stand-in signature verifiers."""

    def _stub_verifier_accepts(self, chain: bytes, signature: str, pubkey_path: Path) -> None:
        """Pretend-verifier that always succeeds."""

    def _stub_verifier_rejects(self, chain: bytes, signature: str, pubkey_path: Path) -> None:
        """Pretend-verifier that always refuses."""
        from dlm.share.errors import ShareError

        raise ShareError("stub: refusing")

    def test_verified_happy_path(self, tmp_path: Path) -> None:
        """A provenance signed by an already-recorded key verifies without TOFU."""
        record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        outcome = verify_provenance(
            _sample_provenance(),
            trusted_keys_dir=tmp_path,
            signature_verifier=self._stub_verifier_accepts,
        )
        assert isinstance(outcome, ProvenanceVerifyResult)
        assert outcome.verified is True
        assert outcome.tofu_recorded is False
        assert outcome.signer_fingerprint == pubkey_fingerprint(_SAMPLE_PUBKEY)

    def test_unknown_signer_strict_raises(self, tmp_path: Path) -> None:
        """Without TOFU, an unrecorded signer raises an error naming its fingerprint."""
        candidate = _sample_provenance()
        fingerprint = pubkey_fingerprint(_SAMPLE_PUBKEY)
        with pytest.raises(UnknownSignerError, match=fingerprint):
            verify_provenance(
                candidate,
                trusted_keys_dir=tmp_path,
                signature_verifier=self._stub_verifier_accepts,
            )

    def test_unknown_signer_tofu_records(self, tmp_path: Path) -> None:
        """With TOFU, the first verify records the key and the second does not."""
        candidate = _sample_provenance()

        def _verify_with_tofu() -> ProvenanceVerifyResult:
            return verify_provenance(
                candidate,
                trusted_keys_dir=tmp_path,
                tofu=True,
                signature_verifier=self._stub_verifier_accepts,
            )

        initial = _verify_with_tofu()
        assert initial.verified is True
        assert initial.tofu_recorded is True
        # Second verify under TOFU is now just a regular match.
        repeat = _verify_with_tofu()
        assert repeat.tofu_recorded is False

    def test_bad_signature_raises(self, tmp_path: Path) -> None:
        """An error raised by the signature verifier propagates to the caller."""
        from dlm.share.errors import ShareError

        record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        candidate = _sample_provenance()

        with pytest.raises(ShareError):
            verify_provenance(
                candidate,
                trusted_keys_dir=tmp_path,
                signature_verifier=self._stub_verifier_rejects,
            )
297
298
class TestChainConsistency:
    """Checks on ``recompute_chain_consistency``."""

    def test_matching_sha_passes(self) -> None:
        """An adapter hash equal to the recorded one is accepted without error."""
        recorded_sha = "a" * 64
        sample = _sample_provenance(adapter_sha256=recorded_sha)
        recompute_chain_consistency(sample, adapter_sha256=recorded_sha)

    def test_mismatched_sha_raises(self) -> None:
        """An adapter hash that differs from the recorded one breaks the chain."""
        sample = _sample_provenance(adapter_sha256="a" * 64)
        other_sha = "b" * 64
        with pytest.raises(ProvenanceChainBroken, match="mismatch"):
            recompute_chain_consistency(sample, adapter_sha256=other_sha)
308
309
class TestDefaultSignatureVerifier:
    """Checks on the private ``_default_signature_verifier`` helper."""

    def test_default_signature_verifier_writes_temp_files_and_calls_minisign(
        self,
        monkeypatch: pytest.MonkeyPatch,
        tmp_path: Path,
    ) -> None:
        """The minisign hook receives readable payload/signature files and the pubkey path."""
        from dlm.share.provenance import _default_signature_verifier

        key_file = tmp_path / "key.pub"
        key_file.write_text("pub", encoding="utf-8")

        captured: dict[str, object] = {}

        def _recording_minisign_verify(payload: Path, sig: Path, pubkey: Path) -> None:
            # Read the files inside the call: they are only inspected here,
            # while the paths handed to the hook are known to be valid.
            captured["payload"] = payload.read_bytes()
            captured["signature"] = sig.read_text(encoding="utf-8")
            captured["pubkey"] = pubkey

        monkeypatch.setattr("dlm.share.signing._minisign_verify", _recording_minisign_verify)

        _default_signature_verifier(b"chain-bytes", "signature-block", key_file)

        assert captured["payload"] == b"chain-bytes"
        assert captured["signature"] == "signature-block"
        assert captured["pubkey"] == key_file
333 assert seen["pubkey"] == pubkey