| 1 |
"""Provenance chain: dataclass + digest + TOFU + verify.""" |
| 2 |
|
| 3 |
from __future__ import annotations |
| 4 |
|
| 5 |
import hashlib |
| 6 |
import json |
| 7 |
from pathlib import Path |
| 8 |
|
| 9 |
import pytest |
| 10 |
|
| 11 |
from dlm.share.provenance import ( |
| 12 |
Provenance, |
| 13 |
ProvenanceChainBroken, |
| 14 |
ProvenanceError, |
| 15 |
ProvenanceSchemaError, |
| 16 |
ProvenanceVerifyResult, |
| 17 |
UnknownSignerError, |
| 18 |
canonical_json_bytes, |
| 19 |
dump_provenance_json, |
| 20 |
find_matching_trusted_key, |
| 21 |
iso_utc_now, |
| 22 |
load_provenance_json, |
| 23 |
pubkey_fingerprint, |
| 24 |
recompute_chain_consistency, |
| 25 |
record_trusted_key, |
| 26 |
verify_provenance, |
| 27 |
) |
| 28 |
|
| 29 |
# Sample signer key text shared by the tests below: a minisign-style
# "untrusted comment" line followed by a key line. The key body is filler,
# not a real key. `.strip()` removes the trailing newline so the value is
# exactly two lines.
_SAMPLE_PUBKEY = (
    "untrusted comment: minisign public key ABCDEF1234567890\n"
    "RWSABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmno+/=\n"
).strip()
| 33 |
|
| 34 |
|
| 35 |
def _sample_provenance(**overrides: str) -> Provenance:
    """Build a ``Provenance`` populated with fixed sample values.

    Args:
        **overrides: Field name / value pairs that replace the matching
            sample values before the ``Provenance`` is constructed.

    Returns:
        A ``Provenance`` built from the sample values with ``overrides``
        applied on top.
    """
    base: dict[str, str] = {
        "adapter_sha256": "a" * 64,
        "base_revision": "b" * 40,
        "corpus_root_sha256": "c" * 64,
        "env_lock_digest": "d" * 64,
        "signed_at": "2026-04-21T12:00:00Z",
        "signer_public_key": _SAMPLE_PUBKEY,
        "signature": "untrusted comment: signature\nabcxyz=\ntrusted comment: signed",
    }
    base.update(overrides)
    return Provenance(**base)  # type: ignore[arg-type]
| 47 |
|
| 48 |
|
| 49 |
class TestCanonicalJsonBytes:
    """Checks on the byte output of ``canonical_json_bytes``."""

    def test_sorts_keys(self) -> None:
        """Input key order does not affect the produced bytes."""
        out_a = canonical_json_bytes({"b": "2", "a": "1"})
        out_b = canonical_json_bytes({"a": "1", "b": "2"})
        assert out_a == out_b

    def test_compact_separators(self) -> None:
        """The produced bytes contain neither spaces nor newlines."""
        # No whitespace — signature determinism depends on this.
        out = canonical_json_bytes({"a": "1", "b": "2"})
        assert b" " not in out
        assert b"\n" not in out

    def test_utf8_preserved_roundtrip(self) -> None:
        """The bytes decode as UTF-8 and parse back to the input mapping."""
        out = canonical_json_bytes({"a": "1"})
        assert json.loads(out.decode("utf-8")) == {"a": "1"}
| 64 |
|
| 65 |
|
| 66 |
class TestProvenanceDigest:
    """Checks on ``Provenance.chain_bytes`` and ``compute_chain_digest``."""

    def test_chain_bytes_excludes_signature(self) -> None:
        """``chain_bytes`` is JSON that omits the ``signature`` key."""
        prov = _sample_provenance()
        fields = json.loads(prov.chain_bytes().decode("utf-8"))
        assert "signature" not in fields
        # Sanity — the non-signature fields ARE present.
        assert fields["adapter_sha256"] == prov.adapter_sha256

    def test_chain_digest_is_deterministic(self) -> None:
        """Two provenances with equal fields yield the same digest."""
        prov_a = _sample_provenance()
        prov_b = _sample_provenance()
        assert prov_a.compute_chain_digest() == prov_b.compute_chain_digest()

    def test_chain_digest_changes_on_any_field(self) -> None:
        """Changing any single non-signature field changes the digest."""
        base = _sample_provenance().compute_chain_digest()
        for field, new in (
            ("adapter_sha256", "z" * 64),
            ("base_revision", "z" * 40),
            ("corpus_root_sha256", "z" * 64),
            ("env_lock_digest", "z" * 64),
            ("signed_at", "2025-01-01T00:00:00Z"),
            ("signer_public_key", "different-key"),
        ):
            alt = _sample_provenance(**{field: new}).compute_chain_digest()
            assert alt != base, f"{field} change didn't affect digest"

    def test_signature_change_does_not_change_digest(self) -> None:
        """The digest is over the SIGNED fields — the signature itself
        is not part of what gets hashed, or verify would be circular."""
        base = _sample_provenance().compute_chain_digest()
        alt = _sample_provenance(signature="different-sig").compute_chain_digest()
        assert base == alt
| 98 |
|
| 99 |
|
| 100 |
class TestJsonIO:
    """Checks on ``dump_provenance_json`` / ``load_provenance_json``.

    Each failure case expects ``ProvenanceSchemaError`` with a message
    matching the pattern given to ``pytest.raises``.
    """

    def test_roundtrip(self, tmp_path: Path) -> None:
        """A dumped provenance loads back equal to the original."""
        prov = _sample_provenance()
        path = tmp_path / "provenance.json"
        dump_provenance_json(prov, path)
        loaded = load_provenance_json(path)
        assert loaded == prov

    def test_missing_file_raises(self, tmp_path: Path) -> None:
        """Loading a path that does not exist reports "not found"."""
        with pytest.raises(ProvenanceSchemaError, match="not found"):
            load_provenance_json(tmp_path / "does-not-exist.json")

    def test_malformed_json_raises(self, tmp_path: Path) -> None:
        """Loading syntactically invalid JSON reports "unreadable"."""
        path = tmp_path / "bad.json"
        path.write_text("{ not json", encoding="utf-8")
        with pytest.raises(ProvenanceSchemaError, match="unreadable"):
            load_provenance_json(path)

    def test_missing_field_raises_with_names(self, tmp_path: Path) -> None:
        """A JSON object lacking fields reports "missing required fields"."""
        path = tmp_path / "partial.json"
        path.write_text(
            json.dumps({"adapter_sha256": "x" * 64, "signature": "sig"}),
            encoding="utf-8",
        )
        with pytest.raises(ProvenanceSchemaError, match="missing required fields"):
            load_provenance_json(path)

    def test_non_string_field_raises(self, tmp_path: Path) -> None:
        """A non-string field value is rejected and named in the error."""
        path = tmp_path / "typed.json"
        payload = {
            "adapter_sha256": 12345,  # int, not str
            "base_revision": "b" * 40,
            "corpus_root_sha256": "c" * 64,
            "env_lock_digest": "d" * 64,
            "signed_at": "2026-04-21T12:00:00Z",
            "signer_public_key": "key",
            "signature": "sig",
        }
        path.write_text(json.dumps(payload), encoding="utf-8")
        with pytest.raises(ProvenanceSchemaError, match="adapter_sha256"):
            load_provenance_json(path)

    def test_non_object_root_raises(self, tmp_path: Path) -> None:
        """A JSON array at the root is rejected with a "JSON object" message."""
        path = tmp_path / "array.json"
        path.write_text("[]", encoding="utf-8")
        with pytest.raises(ProvenanceSchemaError, match="JSON object"):
            load_provenance_json(path)
| 147 |
|
| 148 |
|
| 149 |
class TestIsoUtcNow:
    """Checks on the string layout returned by ``iso_utc_now``."""

    def test_format_matches_signed_at(self) -> None:
        """Separators sit at the fixed offsets of ``YYYY-MM-DDTHH:MM:SSZ``."""
        s = iso_utc_now()
        # Pattern: `YYYY-MM-DDTHH:MM:SSZ`
        assert s.endswith("Z")
        assert s[4] == "-"
        assert s[7] == "-"
        assert s[10] == "T"
        assert s[13] == ":"
        assert s[16] == ":"
| 159 |
|
| 160 |
|
| 161 |
class TestTrustedKeyRegistry:
    """Checks on ``pubkey_fingerprint``, ``record_trusted_key`` and
    ``find_matching_trusted_key`` against a temporary keys directory."""

    def test_fingerprint_is_stable(self) -> None:
        """The fingerprint is repeatable and equals a truncated SHA-256."""
        assert pubkey_fingerprint(_SAMPLE_PUBKEY) == pubkey_fingerprint(_SAMPLE_PUBKEY)
        # sha256 first 12 hex chars.
        expected = hashlib.sha256(_SAMPLE_PUBKEY.encode("utf-8")).hexdigest()[:12]
        assert pubkey_fingerprint(_SAMPLE_PUBKEY) == expected

    def test_record_creates_file_with_fingerprint_name(self, tmp_path: Path) -> None:
        """The returned path ends in ``.pub`` and embeds the fingerprint."""
        target = record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        assert target.name.endswith(".pub")
        assert pubkey_fingerprint(_SAMPLE_PUBKEY) in target.name

    def test_record_with_label(self, tmp_path: Path) -> None:
        """A ``label`` becomes a ``<label>-`` prefix on the file name."""
        target = record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path, label="alice")
        assert target.name.startswith("alice-")

    def test_record_is_idempotent(self, tmp_path: Path) -> None:
        """Recording the same key twice returns the same path."""
        first = record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        second = record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        assert first == second

    def test_record_refuses_to_overwrite_different_key_contents(self, tmp_path: Path) -> None:
        """A different key that maps to an existing file name is rejected."""
        target = record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path, label="alice")

        with pytest.MonkeyPatch.context() as mp:
            # Force a fingerprint collision: every key now reports the
            # fingerprint already used in the recorded file's name.
            mp.setattr(
                "dlm.share.provenance.pubkey_fingerprint",
                lambda _key: target.stem.removeprefix("alice-"),
            )
            with pytest.raises(ProvenanceError, match="refusing to overwrite"):
                record_trusted_key(
                    _SAMPLE_PUBKEY + "\nDIFFERENT",
                    trusted_keys_dir=tmp_path,
                    label="alice",
                )
        assert target.is_file()

    def test_find_matching_returns_path(self, tmp_path: Path) -> None:
        """A recorded key is found by a subsequent lookup."""
        record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        found = find_matching_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        assert found is not None

    def test_find_matching_returns_none_on_miss(self, tmp_path: Path) -> None:
        """Lookup in an empty directory returns ``None``."""
        found = find_matching_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        assert found is None

    def test_find_matching_handles_missing_dir(self, tmp_path: Path) -> None:
        """Lookup in a directory that does not exist returns ``None``."""
        found = find_matching_trusted_key(
            _SAMPLE_PUBKEY, trusted_keys_dir=tmp_path / "does-not-exist"
        )
        assert found is None

    def test_find_matching_skips_unreadable_pubkey_files(
        self,
        tmp_path: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """An ``OSError`` reading one ``.pub`` file does not hide a valid match."""
        good = record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        bad = tmp_path / "000-bad.pub"
        bad.write_text("broken", encoding="utf-8")
        # Patch on the concrete path class so the replacement applies to
        # every path object of that type.
        path_type = type(bad)
        real_read_text = path_type.read_text

        def _maybe_broken(self: Path, *args: object, **kwargs: object) -> str:
            # Fail only for the deliberately bad file; delegate otherwise.
            if self == bad:
                raise OSError("boom")
            return real_read_text(self, *args, **kwargs)

        monkeypatch.setattr(path_type, "read_text", _maybe_broken)

        assert find_matching_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path) == good
| 232 |
|
| 233 |
|
| 234 |
class TestVerifyProvenance:
    """Checks on ``verify_provenance`` using stub signature verifiers.

    The stubs are passed as ``signature_verifier`` in place of the
    default verifier.
    """

    def _stub_verifier_accepts(self, chain: bytes, signature: str, pubkey_path: Path) -> None:
        """Pretend-verifier that always succeeds."""

    def _stub_verifier_rejects(self, chain: bytes, signature: str, pubkey_path: Path) -> None:
        """Pretend-verifier that always refuses."""
        from dlm.share.errors import ShareError

        raise ShareError("stub: refusing")

    def test_verified_happy_path(self, tmp_path: Path) -> None:
        """A pre-recorded signer plus an accepting verifier yields a verified result."""
        record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        prov = _sample_provenance()
        result = verify_provenance(
            prov,
            trusted_keys_dir=tmp_path,
            signature_verifier=self._stub_verifier_accepts,
        )
        assert isinstance(result, ProvenanceVerifyResult)
        assert result.verified is True
        assert result.tofu_recorded is False
        assert result.signer_fingerprint == pubkey_fingerprint(_SAMPLE_PUBKEY)

    def test_unknown_signer_strict_raises(self, tmp_path: Path) -> None:
        """Without ``tofu``, an unrecorded signer raises and names its fingerprint."""
        prov = _sample_provenance()
        with pytest.raises(UnknownSignerError, match=pubkey_fingerprint(_SAMPLE_PUBKEY)):
            verify_provenance(
                prov,
                trusted_keys_dir=tmp_path,
                signature_verifier=self._stub_verifier_accepts,
            )

    def test_unknown_signer_tofu_records(self, tmp_path: Path) -> None:
        """With ``tofu=True``, the first verify records the key; the second does not."""
        prov = _sample_provenance()
        result = verify_provenance(
            prov,
            trusted_keys_dir=tmp_path,
            tofu=True,
            signature_verifier=self._stub_verifier_accepts,
        )
        assert result.verified is True
        assert result.tofu_recorded is True
        # Second verify under TOFU is now just a regular match.
        second = verify_provenance(
            prov,
            trusted_keys_dir=tmp_path,
            tofu=True,
            signature_verifier=self._stub_verifier_accepts,
        )
        assert second.tofu_recorded is False

    def test_bad_signature_raises(self, tmp_path: Path) -> None:
        """A ``ShareError`` from the verifier propagates out of ``verify_provenance``."""
        from dlm.share.errors import ShareError

        record_trusted_key(_SAMPLE_PUBKEY, trusted_keys_dir=tmp_path)
        prov = _sample_provenance()

        with pytest.raises(ShareError):
            verify_provenance(
                prov,
                trusted_keys_dir=tmp_path,
                signature_verifier=self._stub_verifier_rejects,
            )
| 297 |
|
| 298 |
|
| 299 |
class TestChainConsistency:
    """Checks on ``recompute_chain_consistency`` for the adapter hash."""

    def test_matching_sha_passes(self) -> None:
        """An ``adapter_sha256`` equal to the recorded one raises nothing."""
        prov = _sample_provenance(adapter_sha256="a" * 64)
        recompute_chain_consistency(prov, adapter_sha256="a" * 64)

    def test_mismatched_sha_raises(self) -> None:
        """A differing ``adapter_sha256`` raises ``ProvenanceChainBroken``."""
        prov = _sample_provenance(adapter_sha256="a" * 64)
        with pytest.raises(ProvenanceChainBroken, match="mismatch"):
            recompute_chain_consistency(prov, adapter_sha256="b" * 64)
| 308 |
|
| 309 |
|
| 310 |
class TestDefaultSignatureVerifier:
    """Checks on ``_default_signature_verifier`` with minisign stubbed out."""

    def test_default_signature_verifier_writes_temp_files_and_calls_minisign(
        self,
        monkeypatch: pytest.MonkeyPatch,
        tmp_path: Path,
    ) -> None:
        """The verifier hands minisign readable payload/signature paths and the pubkey path."""
        from dlm.share.provenance import _default_signature_verifier

        seen: dict[str, object] = {}

        def _fake_minisign_verify(payload: Path, sig: Path, pubkey: Path) -> None:
            # Read file contents during the call and keep them for the
            # assertions below.
            seen["payload"] = payload.read_bytes()
            seen["signature"] = sig.read_text(encoding="utf-8")
            seen["pubkey"] = pubkey

        monkeypatch.setattr("dlm.share.signing._minisign_verify", _fake_minisign_verify)

        pubkey = tmp_path / "key.pub"
        pubkey.write_text("pub", encoding="utf-8")
        _default_signature_verifier(b"chain-bytes", "signature-block", pubkey)

        assert seen["payload"] == b"chain-bytes"
        assert seen["signature"] == "signature-block"
        assert seen["pubkey"] == pubkey