# (code-viewer residue) Python · 13790 bytes Raw Blame History
1 """Unit coverage for the share push orchestrator."""
2
3 from __future__ import annotations
4
5 import importlib
6 import io
7 import json
8 import tarfile
9 from pathlib import Path
10 from types import SimpleNamespace
11 from typing import cast
12
13 import pytest
14 import zstandard as zstd
15
16 from dlm.share.errors import ShareError, SinkError
17 from dlm.share.push import (
18 PushResult,
19 _collect_readme_fields,
20 _dispatch_push,
21 _ensure_pack,
22 _noop,
23 _sign_pack,
24 push,
25 )
26 from dlm.share.sinks import SinkKind, SinkSpec
27
28 push_mod = importlib.import_module("dlm.share.push")
29
30
def _write_pack_with_header(tmp_path: Path, header: dict[str, str]) -> Path:
    """Create a zstd-compressed tar whose only member is ``pack/header.json``.

    ``header`` is serialized as UTF-8 JSON. The archive is written to
    ``tmp_path / "bundle.dlm.pack"`` and that path is returned.
    """
    header_bytes = json.dumps(header).encode("utf-8")
    member = tarfile.TarInfo("pack/header.json")
    member.size = len(header_bytes)

    # Build the uncompressed tar fully in memory before compressing it.
    archive = io.BytesIO()
    with tarfile.open(fileobj=archive, mode="w") as bundle:
        bundle.addfile(member, io.BytesIO(header_bytes))

    destination = tmp_path / "bundle.dlm.pack"
    with destination.open("wb") as raw:
        with zstd.ZstdCompressor().stream_writer(raw) as compressed:
            compressed.write(archive.getvalue())
    return destination
42
43
class TestPush:
    """Orchestration tests for ``push`` with its collaborators stubbed out."""

    def test_push_rejects_peer_destinations(
        self,
        tmp_path: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """A destination parsed as a PEER sink is refused with ``ShareError``."""
        monkeypatch.setattr(
            push_mod,
            "parse_source",
            lambda destination: SinkSpec(kind=SinkKind.PEER, target=destination),
        )

        with pytest.raises(ShareError, match="push to peer:// is not supported"):
            push(tmp_path / "doc.dlm", "peer://host:7337/doc?token=abc")

    def test_push_signs_dispatches_and_cleans_up(
        self,
        tmp_path: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """With ``sign=True`` signing precedes dispatch and cleanup has run on return.

        Also checks that the packing options given to ``push`` reach
        ``_ensure_pack`` unchanged and that the dispatcher's result is returned.
        """
        source = tmp_path / "doc.dlm"
        source.write_text("body", encoding="utf-8")
        pack_path = tmp_path / "doc.dlm.pack"
        cleanup_called = False
        order: list[str] = []
        progress = object()
        expected = PushResult(
            destination="https://example.test/upload",
            sink_kind=SinkKind.URL,
            bytes_sent=11,
        )

        monkeypatch.setattr(
            push_mod,
            "parse_source",
            lambda destination: SinkSpec(kind=SinkKind.URL, target=destination),
        )

        def _fake_ensure_pack(
            actual_source: Path,
            *,
            include_exports: bool,
            include_base: bool,
            include_logs: bool,
            licensee_acceptance_url: str | None,
        ) -> tuple[Path, object]:
            assert actual_source == source
            assert include_exports is True
            assert include_base is True
            assert include_logs is True
            assert licensee_acceptance_url == "https://license.example/accept"
            pack_path.write_bytes(b"packed-bytes")

            def _cleanup() -> None:
                # Only this closure rebinds the flag, so only it needs ``nonlocal``.
                nonlocal cleanup_called
                cleanup_called = True

            return pack_path, _cleanup

        def _fake_sign_pack(actual_pack: Path) -> None:
            order.append("sign")
            assert actual_pack == pack_path

        def _fake_dispatch(
            actual_pack: Path,
            spec: SinkSpec,
            *,
            progress: object | None,
        ) -> PushResult:
            order.append("dispatch")
            assert actual_pack == pack_path
            assert spec == SinkSpec(
                kind=SinkKind.URL,
                target="https://example.test/upload",
            )
            assert progress is not None
            return expected

        monkeypatch.setattr(push_mod, "_ensure_pack", _fake_ensure_pack)
        monkeypatch.setattr(push_mod, "_sign_pack", _fake_sign_pack)
        monkeypatch.setattr(push_mod, "_dispatch_push", _fake_dispatch)

        result = push(
            source,
            "https://example.test/upload",
            sign=True,
            include_exports=True,
            include_base=True,
            include_logs=True,
            licensee_acceptance_url="https://license.example/accept",
            progress=cast("object", progress),
        )

        assert result == expected
        assert order == ["sign", "dispatch"]
        assert cleanup_called is True

    def test_push_cleans_up_when_dispatch_raises(
        self,
        tmp_path: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """The cleanup callback still runs when ``_dispatch_push`` raises ``SinkError``."""
        source = tmp_path / "doc.dlm"
        source.write_text("body", encoding="utf-8")
        pack_path = tmp_path / "doc.dlm.pack"
        cleanup_called = False

        def _cleanup() -> None:
            nonlocal cleanup_called
            cleanup_called = True

        def _failing_dispatch(*args: object, **kwargs: object) -> PushResult:
            raise SinkError("boom")

        monkeypatch.setattr(
            push_mod,
            "parse_source",
            lambda destination: SinkSpec(kind=SinkKind.URL, target=destination),
        )
        # Patch ``_ensure_pack`` exactly once, with the cleanup we observe below.
        monkeypatch.setattr(push_mod, "_ensure_pack", lambda *args, **kwargs: (pack_path, _cleanup))
        monkeypatch.setattr(push_mod, "_dispatch_push", _failing_dispatch)

        with pytest.raises(SinkError, match="boom"):
            push(source, "https://example.test/upload")

        assert cleanup_called is True
181
182
class TestEnsurePack:
    """Covers ``_ensure_pack`` for an existing pack file and for a ``.dlm`` source."""

    def test_ensure_pack_keeps_existing_pack(self, tmp_path: Path) -> None:
        """An existing ``doc.dlm.pack`` is returned unchanged, paired with ``_noop``."""
        existing = tmp_path / "doc.dlm.pack"
        existing.write_bytes(b"already-packed")

        returned_path, cleanup = _ensure_pack(
            existing,
            include_exports=False,
            include_base=False,
            include_logs=False,
            licensee_acceptance_url=None,
        )

        assert cleanup is _noop
        assert returned_path == existing

    def test_ensure_pack_packs_dlm_and_cleans_up(
        self,
        tmp_path: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """A ``.dlm`` source is handed to ``pack``; cleanup removes the output's directory."""
        license_url = "https://license.example/accept"
        source = tmp_path / "doc.dlm"
        source.write_text("body", encoding="utf-8")
        seen: dict[str, object] = {}

        def _fake_pack(
            actual_source: Path,
            *,
            out: Path,
            include_exports: bool,
            include_base: bool,
            include_logs: bool,
            licensee_acceptance_url: str | None,
        ) -> SimpleNamespace:
            # Record every argument so the test can compare them in one shot.
            seen.update(
                source=actual_source,
                out=out,
                include_exports=include_exports,
                include_base=include_base,
                include_logs=include_logs,
                license=licensee_acceptance_url,
            )
            out.write_bytes(b"packed")
            return SimpleNamespace(path=out)

        monkeypatch.setattr(push_mod, "pack", _fake_pack)

        packed_path, cleanup = _ensure_pack(
            source,
            include_exports=True,
            include_base=True,
            include_logs=True,
            licensee_acceptance_url=license_url,
        )

        assert packed_path.read_bytes() == b"packed"
        expected_call: dict[str, object] = {
            "source": source,
            "out": packed_path,
            "include_exports": True,
            "include_base": True,
            "include_logs": True,
            "license": license_url,
        }
        assert seen == expected_call

        # Capture the parent before cleanup so its removal can be verified.
        workdir = packed_path.parent
        cleanup()
        assert not workdir.exists()
249
250
class TestSignPack:
    """Covers ``_sign_pack`` delegating to ``dlm.share.signing.sign_file``."""

    def test_sign_pack_calls_sign_file(
        self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """``sign_file`` receives the pack path and a ``dlm push <name>`` comment."""
        import dlm.share.signing as signing

        target_pack = tmp_path / "bundle.dlm.pack"
        target_pack.write_bytes(b"packed")
        # Same path as appending ".minisig" to the existing ".pack" suffix.
        signature = target_pack.with_name(target_pack.name + ".minisig")
        seen: dict[str, object] = {}

        def _fake_sign_file(target: Path, *, comment: str | None = None) -> Path:
            seen.update(target=target, comment=comment)
            signature.write_text("signature", encoding="utf-8")
            return signature

        monkeypatch.setattr(signing, "sign_file", _fake_sign_file)

        _sign_pack(target_pack)

        expected_comment = f"dlm push {target_pack.name}"
        assert seen == {"target": target_pack, "comment": expected_comment}

    def test_sign_pack_propagates_missing_minisign(
        self,
        tmp_path: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """``MinisignNotAvailableError`` raised by ``sign_file`` reaches the caller."""
        import dlm.share.signing as signing

        target_pack = tmp_path / "bundle.dlm.pack"
        target_pack.write_bytes(b"packed")

        def _fake_sign_file(target: Path, *, comment: str | None = None) -> Path:
            raise signing.MinisignNotAvailableError("missing")

        monkeypatch.setattr(signing, "sign_file", _fake_sign_file)

        with pytest.raises(signing.MinisignNotAvailableError, match="missing"):
            _sign_pack(target_pack)
294
295
class TestDispatchPush:
    """Covers ``_dispatch_push`` for HF, URL, LOCAL, and unrecognized sink kinds."""

    def test_dispatch_push_hf_uploads_pack(
        self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """HF specs call ``hf_sink.push_hf``; the result's detail carries its pack URL."""
        import dlm.share.hf_sink as hf_sink

        payload = b"packed"
        uploaded_url = "https://huggingface.co/org/repo/blob/main/adapter.dlm.pack"
        readme = {"dlm_id": "01HZPUSH", "base_model": "qwen3-4b"}
        pack_path = tmp_path / "bundle.dlm.pack"
        pack_path.write_bytes(payload)
        progress = object()
        seen: dict[str, object] = {}

        def _fake_push_hf(
            actual_pack: Path,
            repo_id: str,
            *,
            private: bool = False,
            readme_fields: dict[str, str] | None = None,
            progress: object | None = None,
        ) -> SimpleNamespace:
            seen.update(
                pack=actual_pack,
                repo_id=repo_id,
                private=private,
                readme_fields=readme_fields,
                progress=progress,
            )
            return SimpleNamespace(pack_url=uploaded_url)

        monkeypatch.setattr(hf_sink, "push_hf", _fake_push_hf)
        # Hand out a fresh copy per call so the expectation below never aliases it.
        monkeypatch.setattr(push_mod, "_collect_readme_fields", lambda path: dict(readme))

        result = _dispatch_push(
            pack_path,
            SinkSpec(kind=SinkKind.HF, target="org/repo"),
            progress=cast("object", progress),
        )

        expected_result = PushResult(
            destination="hf:org/repo",
            sink_kind=SinkKind.HF,
            bytes_sent=len(payload),
            detail=f"pack: {uploaded_url}",
        )
        assert result == expected_result
        assert seen == {
            "pack": pack_path,
            "repo_id": "org/repo",
            "private": False,
            "readme_fields": readme,
            "progress": progress,
        }

    def test_dispatch_push_url_uploads_pack(
        self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """URL specs call ``url_sink.push_url`` with the pack, the URL, and progress."""
        import dlm.share.url_sink as url_sink

        payload = b"packed"
        upload_url = "https://example.test/upload"
        pack_path = tmp_path / "bundle.dlm.pack"
        pack_path.write_bytes(payload)
        seen: dict[str, object] = {}

        def _fake_push_url(actual_pack: Path, url: str, *, progress: object | None = None) -> None:
            seen.update(pack=actual_pack, url=url, progress=progress)

        monkeypatch.setattr(url_sink, "push_url", _fake_push_url)

        result = _dispatch_push(
            pack_path,
            SinkSpec(kind=SinkKind.URL, target=upload_url),
            progress=None,
        )

        expected_result = PushResult(
            destination=upload_url,
            sink_kind=SinkKind.URL,
            bytes_sent=len(payload),
        )
        assert result == expected_result
        assert seen == {"pack": pack_path, "url": upload_url, "progress": None}

    def test_dispatch_push_local_copies_pack(self, tmp_path: Path) -> None:
        """LOCAL specs copy the pack to the target, here under a not-yet-created directory."""
        payload = b"packed"
        pack_path = tmp_path / "bundle.dlm.pack"
        pack_path.write_bytes(payload)
        target = tmp_path / "nested" / "copy.dlm.pack"

        result = _dispatch_push(
            pack_path,
            SinkSpec(kind=SinkKind.LOCAL, target=str(target)),
            progress=None,
        )

        expected_result = PushResult(
            destination=str(target),
            sink_kind=SinkKind.LOCAL,
            bytes_sent=len(payload),
        )
        assert result == expected_result
        assert target.read_bytes() == payload

    def test_dispatch_push_rejects_unsupported_kind(self, tmp_path: Path) -> None:
        """A spec whose kind is not a known ``SinkKind`` raises ``SinkError``."""
        pack_path = tmp_path / "bundle.dlm.pack"
        pack_path.write_bytes(b"packed")
        # The cast smuggles a plain string past the type checker on purpose.
        bogus_spec = SinkSpec(kind=cast("SinkKind", "weird"), target="x")

        with pytest.raises(SinkError, match="unsupported sink kind"):
            _dispatch_push(pack_path, bogus_spec, progress=None)
412
413
class TestReadmeFields:
    """Covers ``_collect_readme_fields`` and the ``_noop`` cleanup placeholder."""

    def test_collect_readme_fields_from_pack(self, tmp_path: Path) -> None:
        """Fields stored in the pack's ``header.json`` are returned as a dict."""
        header = {
            "dlm_id": "01HZHEADER",
            "base_model": "qwen3-8b",
            "adapter_version": "v0007",
        }
        pack_path = _write_pack_with_header(tmp_path, header)

        collected = _collect_readme_fields(pack_path)

        assert collected == {
            "dlm_id": "01HZHEADER",
            "base_model": "qwen3-8b",
            "adapter_version": "v0007",
        }

    def test_collect_readme_fields_returns_empty_on_bad_pack(self, tmp_path: Path) -> None:
        """A path that does not exist yields an empty dict rather than raising."""
        missing = tmp_path / "missing.dlm.pack"
        assert _collect_readme_fields(missing) == {}

    def test_noop_is_noop(self) -> None:
        """``_noop`` can be called with no arguments and returns ``None``."""
        assert _noop() is None