@@ -10,6 +10,7 @@ from pathlib import Path |
| 10 | 10 | import pytest |
| 11 | 11 | import zstandard as zstd |
| 12 | 12 | |
| 13 | +import dlm.pack.unpacker as unpacker_mod |
| 13 | 14 | from dlm.pack.errors import ( |
| 14 | 15 | PackFormatVersionError, |
| 15 | 16 | PackIntegrityError, |
@@ -97,6 +98,22 @@ class TestHappyPath: |
| 97 | 98 | assert result.dlm_path == tmp_path / "out" / "mydoc.dlm" |
| 98 | 99 | assert result.dlm_path.read_text().startswith("---") |
| 99 | 100 | |
| 101 | + def test_existing_quarantine_is_removed_before_force_install(self, tmp_path: Path) -> None: |
| 102 | + import os |
| 103 | + |
| 104 | + pack_path = _synth_pack(tmp_path) |
| 105 | + home = tmp_path / "home" |
| 106 | + unpack(pack_path, home=home, out_dir=tmp_path / "out1") |
| 107 | + target = home / "store" / "01TEST" |
| 108 | + quarantine = target.parent / f".{target.name}.old-{os.getpid()}" |
| 109 | + quarantine.mkdir(parents=True) |
| 110 | + (quarantine / "stale.txt").write_text("stale", encoding="utf-8") |
| 111 | + |
| 112 | + unpack(pack_path, home=home, force=True, out_dir=tmp_path / "out2") |
| 113 | + |
| 114 | + assert target.exists() |
| 115 | + assert not quarantine.exists() |
| 116 | + |
| 100 | 117 | |
| 101 | 118 | class TestVersionGate: |
| 102 | 119 | def test_newer_than_current_refused(self, tmp_path: Path) -> None: |
@@ -391,6 +408,92 @@ class TestLayoutGate: |
| 391 | 408 | unpack(out, home=tmp_path / "home") |
| 392 | 409 | |
| 393 | 410 | |
| 411 | +class TestReadPackMemberBytes: |
| 412 | + def test_reads_named_member_bytes(self, tmp_path: Path) -> None: |
| 413 | + pack_path = _synth_pack(tmp_path) |
| 414 | + |
| 415 | + data = unpacker_mod.read_pack_member_bytes(pack_path, "dlm/mydoc.dlm") |
| 416 | + |
| 417 | + assert data is not None |
| 418 | + assert data.startswith(b"---") |
| 419 | + |
| 420 | + def test_missing_member_returns_none(self, tmp_path: Path) -> None: |
| 421 | + pack_path = _synth_pack(tmp_path) |
| 422 | + |
| 423 | + assert unpacker_mod.read_pack_member_bytes(pack_path, "provenance.json") is None |
| 424 | + |
| 425 | + def test_unsafe_member_is_refused(self, tmp_path: Path) -> None: |
| 426 | + payload = tmp_path / "payload.txt" |
| 427 | + payload.write_text("evil", encoding="utf-8") |
| 428 | + out = tmp_path / "unsafe.pack" |
| 429 | + cctx = zstd.ZstdCompressor(level=1) |
| 430 | + with out.open("wb") as fh, cctx.stream_writer(fh) as compressor: |
| 431 | + with tarfile.open(fileobj=compressor, mode="w|") as tar: |
| 432 | + tar.add(payload, arcname="../escape") |
| 433 | + |
| 434 | + with pytest.raises(PackLayoutError, match="unsafe tar entry"): |
| 435 | + unpacker_mod.read_pack_member_bytes(out, "../escape") |
| 436 | + |
| 437 | + def test_oversized_member_is_refused( |
| 438 | + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch |
| 439 | + ) -> None: |
| 440 | + monkeypatch.setattr(unpacker_mod, "_MAX_TAR_MEMBER_BYTES", 10) |
| 441 | + payload = tmp_path / "big.bin" |
| 442 | + payload.write_bytes(b"x" * 100) |
| 443 | + out = tmp_path / "big.pack" |
| 444 | + cctx = zstd.ZstdCompressor(level=1) |
| 445 | + with out.open("wb") as fh, cctx.stream_writer(fh) as compressor: |
| 446 | + with tarfile.open(fileobj=compressor, mode="w|") as tar: |
| 447 | + tar.add(payload, arcname="big.bin") |
| 448 | + |
| 449 | + with pytest.raises(PackLayoutError, match="per-member cap"): |
| 450 | + unpacker_mod.read_pack_member_bytes(out, "big.bin") |
| 451 | + |
| 452 | + def test_directory_member_returns_none(self, tmp_path: Path) -> None: |
| 453 | + out = tmp_path / "dir-only.pack" |
| 454 | + cctx = zstd.ZstdCompressor(level=1) |
| 455 | + with out.open("wb") as fh, cctx.stream_writer(fh) as compressor: |
| 456 | + with tarfile.open(fileobj=compressor, mode="w|") as tar: |
| 457 | + info = tarfile.TarInfo(name="folder") |
| 458 | + info.type = tarfile.DIRTYPE |
| 459 | + tar.addfile(info) |
| 460 | + |
| 461 | + assert unpacker_mod.read_pack_member_bytes(out, "folder") is None |
| 462 | + |
| 463 | + |
| 464 | +class TestUnpackerInternals: |
| 465 | + def test_is_unsafe_member_rejects_absolute_and_parent_paths(self) -> None: |
| 466 | + assert unpacker_mod._is_unsafe_member("/abs/path") is True |
| 467 | + assert unpacker_mod._is_unsafe_member("\\windows") is True |
| 468 | + assert unpacker_mod._is_unsafe_member("../escape") is True |
| 469 | + assert unpacker_mod._is_unsafe_member("safe/path") is False |
| 470 | + |
| 471 | + def test_read_header_malformed_raises(self, tmp_path: Path) -> None: |
| 472 | + (tmp_path / HEADER_FILENAME).write_text("{not json", encoding="utf-8") |
| 473 | + |
| 474 | + with pytest.raises(PackLayoutError, match=f"cannot read {HEADER_FILENAME}"): |
| 475 | + unpacker_mod._read_header(tmp_path) |
| 476 | + |
| 477 | + def test_read_manifest_malformed_raises(self, tmp_path: Path) -> None: |
| 478 | + (tmp_path / MANIFEST_FILENAME).write_text("{not json", encoding="utf-8") |
| 479 | + |
| 480 | + with pytest.raises(PackLayoutError, match=f"cannot read {MANIFEST_FILENAME}"): |
| 481 | + unpacker_mod._read_manifest(tmp_path) |
| 482 | + |
| 483 | + def test_find_dlm_file_requires_exactly_one_file(self, tmp_path: Path) -> None: |
| 484 | + dlm_dir = tmp_path / "dlm" |
| 485 | + dlm_dir.mkdir() |
| 486 | + |
| 487 | + with pytest.raises(PackLayoutError, match="expected exactly one .dlm file"): |
| 488 | + unpacker_mod._find_dlm_file(dlm_dir) |
| 489 | + |
| 490 | + (dlm_dir / "a.dlm").write_text("a", encoding="utf-8") |
| 491 | + (dlm_dir / "b.dlm").write_text("b", encoding="utf-8") |
| 492 | + |
| 493 | + with pytest.raises(PackLayoutError, match="expected exactly one .dlm file"): |
| 494 | + unpacker_mod._find_dlm_file(dlm_dir) |
| 495 | + |
| 496 | + |
| 394 | 497 | class TestForce: |
| 395 | 498 | def test_existing_store_refused_without_force(self, tmp_path: Path) -> None: |
| 396 | 499 | pack_path = _synth_pack(tmp_path) |