| 1 |
"""Hypothesis property tests for invariants across dlm.* (audit-04 T4). |
| 2 |
|
| 3 |
These aren't replacements for example-based tests — they're guardrails |
| 4 |
for properties that hold for *all* inputs. Keep them cheap: the suite |
| 5 |
runs inside the default pytest invocation, so each property's shrinking |
| 6 |
budget is tuned small. |
| 7 |
""" |
| 8 |
|
| 9 |
from __future__ import annotations |
| 10 |
|
| 11 |
import string |
| 12 |
from pathlib import Path |
| 13 |
|
| 14 |
import pytest |
| 15 |
from hypothesis import HealthCheck, given, settings |
| 16 |
from hypothesis import strategies as st |
| 17 |
|
| 18 |
from dlm.export.errors import PreflightError, UnsafeMergeError |
| 19 |
from dlm.export.merge import check_merge_safety |
| 20 |
from dlm.export.plan import ExportPlan |
| 21 |
from dlm.export.tokenizer_sync import read_gguf_vocab_size |
| 22 |
from dlm.io.ulid import mint_ulid |
| 23 |
from dlm.pack.integrity import rollup_sha256 |
| 24 |
|
| 25 |
# --- ULID --------------------------------------------------------------------- |
| 26 |
|
| 27 |
|
| 28 |
class TestUlidMonotonicity:
    """Shape, uniqueness and alphabet checks on `mint_ulid()` output.

    Each test calls `mint_ulid()` with no arguments and inspects only the
    returned value. Note: despite the class name, time-prefix ordering is
    not asserted by any test in this class.
    """

    @given(n=st.integers(min_value=2, max_value=32))
    def test_each_ulid_is_26_chars(self, n: int) -> None:
        """Every one of `n` freshly minted ULIDs has length 26."""
        for ulid in (mint_ulid() for _ in range(n)):
            assert len(ulid) == 26

    @given(n=st.integers(min_value=2, max_value=16))
    def test_two_ulids_minted_in_sequence_are_distinct(self, n: int) -> None:
        """Minting `n` ULIDs back to back yields `n` distinct values."""
        # Collect into a set: any repeated value collapses and shrinks the
        # set below `n`, which is what the final assertion detects.
        minted = set()
        for _ in range(n):
            minted.add(mint_ulid())
        assert len(minted) == n

    def test_alphabet_is_crockford(self) -> None:
        """Every character of 32 sampled ULIDs is in the Crockford alphabet."""
        # Digits plus uppercase letters with I, L, O and U left out.
        crockford = frozenset(string.digits + "ABCDEFGHJKMNPQRSTVWXYZ")
        for _ in range(32):
            u = mint_ulid()
            assert crockford.issuperset(u), f"ULID {u} has non-Crockford char"
| 51 |
|
| 52 |
|
| 53 |
# --- rollup_sha256 ------------------------------------------------------------ |
| 54 |
|
| 55 |
|
| 56 |
# Exactly 64 lowercase hexadecimal characters per generated string.
_hex64 = st.text(alphabet="0123456789abcdef", min_size=64, max_size=64)


def _is_clean_relpath(candidate: str) -> bool:
    """Return True when `candidate` has no `..` and no leading/trailing `/`.

    The `..` check is defensive only: the alphabet used by `_relpath`
    contains no `.` character, so it can never match there.
    """
    if ".." in candidate:
        return False
    return not (candidate.startswith("/") or candidate.endswith("/"))


# 1-20 characters drawn from lowercase letters, digits and "/", keeping
# only the strings accepted by `_is_clean_relpath`.
_relpath = st.text(
    alphabet=string.ascii_lowercase + string.digits + "/",
    min_size=1,
    max_size=20,
).filter(_is_clean_relpath)
| 62 |
|
| 63 |
|
| 64 |
class TestRollupDeterminism:
    """Properties of `rollup_sha256` over generated path -> hex-digest dicts.

    Inputs are dictionaries whose keys come from `_relpath` and whose values
    come from `_hex64`.
    """

    @given(st.dictionaries(_relpath, _hex64, min_size=1, max_size=12))
    def test_rollup_is_deterministic(self, d: dict[str, str]) -> None:
        """Two calls on the same dict return equal results."""
        first = rollup_sha256(d)
        second = rollup_sha256(d)
        assert first == second

    @given(st.dictionaries(_relpath, _hex64, min_size=2, max_size=12))
    def test_rollup_is_order_independent(self, d: dict[str, str]) -> None:
        """Reversing the dict's insertion order leaves the rollup unchanged."""
        # Same key/value pairs, opposite insertion order.
        flipped = dict(list(d.items())[::-1])
        assert rollup_sha256(d) == rollup_sha256(flipped)

    @given(st.dictionaries(_relpath, _hex64, min_size=1, max_size=8))
    def test_rollup_changes_with_content(self, d: dict[str, str]) -> None:
        """Adding one more entry produces a different rollup."""
        original_digest = rollup_sha256(d)
        # "extra_path" contains "_", which is outside the `_relpath`
        # alphabet, so this is always a new key rather than an overwrite.
        grown = dict(d)
        grown["extra_path"] = "0" * 64
        assert rollup_sha256(grown) != original_digest
| 82 |
|
| 83 |
|
| 84 |
# --- merge-safety truth table ------------------------------------------------- |
| 85 |
|
| 86 |
|
| 87 |
class TestMergeSafetyTruthTable:
    """Walk every `(merged, dequantize, was_qlora)` combination.

    Expected outcomes (CLAUDE.md pitfall #3):
    - `merged=False`: accepted whatever the other flags are.
    - `merged=True, was_qlora=False`: accepted (plain LoRA merge).
    - `merged=True, was_qlora=True, dequantize=False`: must be refused.
    - `merged=True, was_qlora=True, dequantize=True`: accepted (user confirmed).
    """

    @pytest.mark.parametrize("merged", [False, True])
    @pytest.mark.parametrize("dequantize", [False, True])
    @pytest.mark.parametrize("was_qlora", [False, True])
    def test_truth_table(self, merged: bool, dequantize: bool, was_qlora: bool) -> None:
        """Assert `check_merge_safety` raises only for the single unsafe row."""
        # `dequantize_confirmed=True` has no meaning unless merged=True, and
        # `ExportPlan.__post_init__` rejects that pairing, so the plan could
        # never be built for it — skip instead of constructing.
        if dequantize and not merged:
            pytest.skip("invalid flag combination: --dequantize without --merged")

        plan = ExportPlan(merged=merged, dequantize_confirmed=dequantize)

        # The only refusing row: a merged QLoRA export with no confirmation.
        expect_refusal = merged and was_qlora and not dequantize
        if not expect_refusal:
            check_merge_safety(plan, was_qlora=was_qlora)  # no raise
            return

        with pytest.raises(UnsafeMergeError):
            check_merge_safety(plan, was_qlora=was_qlora)
| 112 |
|
| 113 |
|
| 114 |
# --- GGUF parser fuzz --------------------------------------------------------- |
| 115 |
|
| 116 |
|
| 117 |
class TestGgufParserFuzz:
    """Throw arbitrary bytes at `read_gguf_vocab_size`.

    The parser is expected to answer with a typed `PreflightError` and never
    let `struct.error` / `MemoryError` / a raw decoded string escape.
    Audit-04 T6 / B7 defense-in-depth.

    `tmp_path` is intentionally shared across hypothesis examples: the file
    is overwritten on every run, and creating a fresh directory per sample
    would dominate the test runtime. That is why the function-scoped-fixture
    health check is suppressed on both tests.
    """

    @settings(max_examples=50, suppress_health_check=[HealthCheck.function_scoped_fixture])
    @given(
        payload=st.binary(min_size=0, max_size=256).filter(
            # Drop anything carrying the real magic; inputs with a valid
            # header are covered by the other test in this class.
            lambda blob: blob[:4] != b"GGUF"
        )
    )
    def test_random_bytes_raise_preflight_error(self, payload: bytes, tmp_path: Path) -> None:
        """Bytes that do not begin with the GGUF magic raise `PreflightError`."""
        target = tmp_path / "fuzz.gguf"
        target.write_bytes(payload)
        with pytest.raises(PreflightError):
            read_gguf_vocab_size(target)

    @settings(max_examples=50, suppress_health_check=[HealthCheck.function_scoped_fixture])
    @given(body=st.binary(min_size=0, max_size=256))
    def test_random_body_after_magic_doesnt_crash(self, body: bytes, tmp_path: Path) -> None:
        """Even with valid magic, garbage body → typed error, no crash."""
        target = tmp_path / "fuzz.gguf"
        # Valid 4-byte magic followed by up to 256 arbitrary bytes.
        contents = b"GGUF" + body
        target.write_bytes(contents)
        with pytest.raises(PreflightError):
            read_gguf_vocab_size(target)