Python · 9637 bytes Raw Blame History
1 """Store inspection + orphan detection."""
2
3 from __future__ import annotations
4
5 from datetime import datetime, timedelta
6 from pathlib import Path
7
8 import pytest
9
10 from dlm.store.inspect import _directory_size, _discover_named_adapters, _max_version, inspect_store
11 from dlm.store.manifest import Manifest, TrainingRunSummary, save_manifest
12 from dlm.store.paths import StorePath, for_dlm
13 from tests.fixtures.dlm_factory import make_dlm
14
# Store identifier passed to ``for_dlm`` and ``Manifest`` by every test in this module.
# NOTE(review): looks like a 26-character ULID — confirm against the id validator.
VALID_ID = "01HZ4X7TGZM3J1A2B3C4D5E6F7"
# Differs from VALID_ID only in its final character; used to write a source
# file whose dlm_id does not match the store being inspected.
OTHER_ID = "01HZ4X7TGZM3J1A2B3C4D5E6F8"
17
18
@pytest.fixture
def populated_store(tmp_path: Path) -> StorePath:
    """Return a store under ``tmp_path`` that ``inspect_store`` can walk.

    The store has its layout created, a saved manifest describing two
    finished training runs, and a ``v2`` adapter directory holding a 1 KiB
    weights file that is marked as the current adapter.
    """
    store_path = for_dlm(VALID_ID, home=tmp_path)
    store_path.ensure_layout()

    first_run = TrainingRunSummary(
        run_id=1,
        started_at=datetime(2026, 4, 18, 10, 0),
        ended_at=datetime(2026, 4, 18, 10, 5),
        adapter_version=1,
        seed=42,
        steps=100,
    )
    second_run = TrainingRunSummary(
        run_id=2,
        started_at=datetime(2026, 4, 18, 11, 0),
        ended_at=datetime(2026, 4, 18, 11, 10),
        adapter_version=2,
        seed=42,
        steps=150,
    )
    save_manifest(
        store_path.manifest,
        Manifest(
            dlm_id=VALID_ID,
            base_model="smollm2-135m",
            base_model_revision="abc",
            adapter_version=2,
            training_runs=[first_run, second_run],
            content_hashes={"abc": "def"},
            pinned_versions={"torch": "2.11.0"},
        ),
    )

    # Populate an adapter version dir + pointer to simulate Sprint 09 output.
    adapter_dir = store_path.adapter_version(2)
    adapter_dir.mkdir(parents=True, exist_ok=True)
    weights_file = adapter_dir / "adapter_model.safetensors"
    weights_file.write_bytes(b"\x00" * 1024)
    store_path.set_current_adapter(adapter_dir)
    return store_path
57
58
class TestInspectBasic:
    """Checks of ``inspect_store`` against the ``populated_store`` fixture."""

    def test_reports_manifest_fields(self, populated_store: StorePath) -> None:
        """Scalar manifest fields are surfaced on the inspection result."""
        report = inspect_store(populated_store)
        expected_fields = {
            "dlm_id": VALID_ID,
            "base_model": "smollm2-135m",
            "base_model_revision": "abc",
            "adapter_version": 2,
            "training_runs": 2,
        }
        for field_name, expected in expected_fields.items():
            assert getattr(report, field_name) == expected, field_name

    def test_has_adapter_current_true(self, populated_store: StorePath) -> None:
        """The fixture sets a current adapter, so the flag is exactly ``True``."""
        report = inspect_store(populated_store)
        assert report.has_adapter_current is True

    def test_last_trained_at_is_max(self, populated_store: StorePath) -> None:
        """``last_trained_at`` equals the later of the two runs' end times."""
        latest_end = datetime(2026, 4, 18, 11, 10)
        report = inspect_store(populated_store)
        assert report.last_trained_at == latest_end

    def test_total_size_includes_adapter_bytes(self, populated_store: StorePath) -> None:
        """The total is at least the 1024 bytes written to the adapter file."""
        adapter_file_bytes = 1024
        report = inspect_store(populated_store)
        assert report.total_size_bytes >= adapter_file_bytes

    def test_replay_size_is_zero_when_empty(self, populated_store: StorePath) -> None:
        """Nothing was written for replay, so its reported size is zero."""
        report = inspect_store(populated_store)
        assert report.replay_size_bytes == 0

    def test_content_hashes_and_pins_propagate(self, populated_store: StorePath) -> None:
        """Both manifest mappings come back unchanged on the result."""
        report = inspect_store(populated_store)
        hashes, pins = report.content_hashes, report.pinned_versions
        assert hashes == {"abc": "def"}
        assert pins == {"torch": "2.11.0"}
88
89
class TestInspectNoAdapter:
    """Inspection of a store whose manifest has no runs and no current adapter."""

    def test_has_adapter_current_false_when_not_set(self, tmp_path: Path) -> None:
        """A freshly laid-out store reports no adapter, no runs, no timestamp."""
        bare_store = for_dlm(VALID_ID, home=tmp_path)
        bare_store.ensure_layout()
        save_manifest(bare_store.manifest, Manifest(dlm_id=VALID_ID, base_model="x"))

        report = inspect_store(bare_store)

        assert report.has_adapter_current is False
        assert report.last_trained_at is None
        assert report.training_runs == 0
100
101
class TestOrphanDetection:
    """How ``inspect_store`` sets ``orphaned`` for different source files."""

    def test_orphan_when_source_missing(self, populated_store: StorePath, tmp_path: Path) -> None:
        """A source path that does not exist on disk marks the store orphaned."""
        absent_source = tmp_path / "gone.dlm"
        report = inspect_store(populated_store, source_path=absent_source)
        assert report.orphaned is True
        assert report.source_path == absent_source

    def test_not_orphan_when_source_exists_and_matches(
        self, populated_store: StorePath, tmp_path: Path
    ) -> None:
        """A source file carrying the store's own id is not an orphan."""
        matching_source = tmp_path / "mine.dlm"
        document = make_dlm(dlm_id=VALID_ID)
        matching_source.write_text(document, encoding="utf-8")
        report = inspect_store(populated_store, source_path=matching_source)
        assert report.orphaned is False

    def test_orphan_when_source_has_wrong_dlm_id(
        self, populated_store: StorePath, tmp_path: Path
    ) -> None:
        """A source file carrying a different id marks the store orphaned."""
        foreign_source = tmp_path / "other.dlm"
        document = make_dlm(dlm_id=OTHER_ID)
        foreign_source.write_text(document, encoding="utf-8")
        report = inspect_store(populated_store, source_path=foreign_source)
        assert report.orphaned is True

    def test_not_orphan_when_source_path_unknown(self, populated_store: StorePath) -> None:
        """With no source path given or recorded, the store is not orphaned."""
        report = inspect_store(populated_store)
        assert report.orphaned is False
        assert report.source_path is None

    def test_manifest_source_path_used_when_no_arg(self, tmp_path: Path) -> None:
        """The manifest's ``source_path`` is used when no argument is passed."""
        recorded_source = tmp_path / "mine.dlm"
        recorded_source.write_text(make_dlm(dlm_id=VALID_ID), encoding="utf-8")

        store_path = for_dlm(VALID_ID, home=tmp_path)
        store_path.ensure_layout()
        manifest_with_source = Manifest(
            dlm_id=VALID_ID,
            base_model="x",
            source_path=recorded_source,
        )
        save_manifest(store_path.manifest, manifest_with_source)

        report = inspect_store(store_path)
        assert report.orphaned is False
        assert report.source_path == recorded_source

    def test_corrupt_source_file_treated_as_orphan(
        self, populated_store: StorePath, tmp_path: Path
    ) -> None:
        """A source file whose content is not a dlm document counts as orphaned."""
        unparsable_source = tmp_path / "garbage.dlm"
        unparsable_source.write_text("not a dlm", encoding="utf-8")
        report = inspect_store(populated_store, source_path=unparsable_source)
        assert report.orphaned is True
154
155
class TestTimelineEdges:
    """``last_trained_at`` behaviour when some runs have no ``ended_at``."""

    def test_running_run_does_not_set_last_trained_at(self, tmp_path: Path) -> None:
        """A single still-running run leaves ``last_trained_at`` unset."""
        store_path = for_dlm(VALID_ID, home=tmp_path)
        store_path.ensure_layout()
        in_progress = TrainingRunSummary(
            run_id=1,
            started_at=datetime(2026, 4, 18),
            ended_at=None,
            status="running",
            adapter_version=1,
            seed=0,
        )
        save_manifest(
            store_path.manifest,
            Manifest(dlm_id=VALID_ID, base_model="x", training_runs=[in_progress]),
        )

        report = inspect_store(store_path)
        assert report.last_trained_at is None

    def test_mixed_runs_pick_latest_ended(self, tmp_path: Path) -> None:
        """With one finished and one running run, the finished run's end wins."""
        store_path = for_dlm(VALID_ID, home=tmp_path)
        store_path.ensure_layout()
        base = datetime(2026, 4, 18, 10, 0)
        finished = TrainingRunSummary(
            run_id=1,
            started_at=base,
            ended_at=base + timedelta(minutes=5),
            adapter_version=1,
            seed=0,
        )
        in_progress = TrainingRunSummary(
            run_id=2,
            started_at=base + timedelta(minutes=10),
            ended_at=None,
            status="running",
            adapter_version=2,
            seed=0,
        )
        save_manifest(
            store_path.manifest,
            Manifest(
                dlm_id=VALID_ID,
                base_model="x",
                training_runs=[finished, in_progress],
            ),
        )

        report = inspect_store(store_path)
        assert report.last_trained_at == base + timedelta(minutes=5)
206
207
class TestInspectCoverageEdges:
    """Edge cases for the private helpers behind ``inspect_store``."""

    def test_discover_named_adapters_on_missing_adapter_dir(self, tmp_path: Path) -> None:
        """A store whose layout was never created yields no named adapters."""
        store = for_dlm(VALID_ID, home=tmp_path)
        assert _discover_named_adapters(store) == []

    def test_directory_size_ignores_stat_errors(self, tmp_path: Path) -> None:
        """Entries whose ``stat()`` raises ``OSError`` are left out of the total."""
        path = tmp_path / "root"
        path.mkdir()
        good = path / "good.bin"
        good.write_bytes(b"1234")

        class _BadPath:
            """Stand-in entry that claims to be a file but cannot be stat'ed."""

            def is_file(self) -> bool:
                return True

            def stat(self):  # type: ignore[no-untyped-def]
                raise OSError("transient")

        # Patch only for the call under test; the context manager restores
        # Path.rglob on exit even if the call raises.
        with pytest.MonkeyPatch.context() as patcher:
            patcher.setattr(Path, "rglob", lambda self, _pattern: iter([good, _BadPath()]))
            size = _directory_size(path)

        # Only the 4 bytes of good.bin are counted.
        assert size == 4

    def test_discover_named_adapters_tolerates_pointer_probe_errors(self, tmp_path: Path) -> None:
        """An ``OSError`` while resolving the current pointer reports ``has_current=False``."""
        store = for_dlm(VALID_ID, home=tmp_path)
        store.ensure_layout()
        named = store.adapter / "knowledge"
        (named / "versions" / "v0002").mkdir(parents=True)

        def _boom(self: StorePath, _name: str) -> None:
            raise OSError("pointer unreadable")

        with pytest.MonkeyPatch.context() as patcher:
            patcher.setattr(StorePath, "resolve_current_adapter_for", _boom)
            states = _discover_named_adapters(store)

        # Check the length first so an empty result fails as a readable
        # assertion instead of an IndexError while building the expected value.
        assert len(states) == 1
        (state,) = states
        assert state == type(state)(name="knowledge", has_current=False, latest_version=2)

    def test_max_version_ignores_non_version_entries(self, tmp_path: Path) -> None:
        """Only the ``v0002`` directory counts towards the maximum version."""
        versions = tmp_path / "versions"
        versions.mkdir()
        (versions / "v0002").mkdir()
        # Directories whose names do not carry a numeric version.
        (versions / "vbad").mkdir()
        (versions / "notes").mkdir()
        # Version-like name, but a regular file rather than a directory.
        (versions / "v0009").write_text("not a dir", encoding="utf-8")
        assert _max_version(versions) == 2