Python · 6472 bytes Raw Blame History
1 """Audit-03 cross-cutting: `dlm.io.atomic` atomic-write helper.
2
3 Covers the replacement of three near-identical copies (io/text.py,
4 store/manifest.py, store/paths.py) with a single primitive. Every prior
5 caller's behavior already has a test in its own module; these tests pin
6 down the `atomic` module's direct contract.
7 """
8
9 from __future__ import annotations
10
11 import os
12 from pathlib import Path
13 from unittest.mock import patch
14
15 from dlm.io import atomic
16
17
18 class TestWriteBytes:
19 def test_replaces_target_atomically(self, tmp_path: Path) -> None:
20 target = tmp_path / "file.bin"
21 atomic.write_bytes(target, b"hello")
22 assert target.read_bytes() == b"hello"
23 # No tmp siblings left behind.
24 assert list(tmp_path.iterdir()) == [target]
25
26 def test_overwrites_existing(self, tmp_path: Path) -> None:
27 target = tmp_path / "file.bin"
28 target.write_bytes(b"old")
29 atomic.write_bytes(target, b"new")
30 assert target.read_bytes() == b"new"
31 assert list(tmp_path.iterdir()) == [target]
32
33
34 class TestWriteText:
35 def test_utf8_default(self, tmp_path: Path) -> None:
36 target = tmp_path / "doc.txt"
37 atomic.write_text(target, "hello — world")
38 assert target.read_bytes() == "hello — world".encode()
39
40 def test_custom_encoding(self, tmp_path: Path) -> None:
41 target = tmp_path / "doc.txt"
42 atomic.write_text(target, "hello", encoding="ascii")
43 assert target.read_bytes() == b"hello"
44
45
46 class TestNonceSuffix:
47 """Audit-11 M9: tmp files carry a random nonce so PID reuse can't
48 collide a stale tmp with a live peer's scratch file."""
49
50 def test_tmp_path_includes_nonce(self, tmp_path: Path) -> None:
51 target = tmp_path / "file.bin"
52 tmp = atomic._tmp_path(target)
53 # Shape: `file.bin.tmp.<pid>.<8 hex chars>`
54 parts = tmp.name.rsplit(".", maxsplit=2)
55 assert len(parts) == 3
56 assert parts[1].isdigit() # PID
57 assert len(parts[2]) == 8
58 assert all(c in "0123456789abcdef" for c in parts[2])
59
60 def test_two_calls_yield_different_tmp_names(self, tmp_path: Path) -> None:
61 """Same PID, two writers, two distinct tmps — nonce distinguishes."""
62 target = tmp_path / "file.bin"
63 a = atomic._tmp_path(target)
64 b = atomic._tmp_path(target)
65 assert a != b
66
67 def test_cleanup_recognises_nonce_suffixed_tmp(self, tmp_path: Path) -> None:
68 live = tmp_path / "real.txt.tmp.1.0a1b2c3d"
69 dead = tmp_path / "real.txt.tmp.99999999.deadbeef"
70 live.write_bytes(b"live")
71 dead.write_bytes(b"dead")
72
73 def fake_is_alive(pid: int) -> bool:
74 return pid == 1
75
76 with patch("dlm.io.atomic._is_alive", side_effect=fake_is_alive):
77 removed = atomic.cleanup_stale_tmp_files(tmp_path)
78
79 assert removed == [dead]
80 assert live.exists()
81 assert not dead.exists()
82
83
84 class TestCleanupStaleTmp:
85 def test_cleanup_skips_directories(self, tmp_path: Path) -> None:
86 (tmp_path / "nested.tmp.999.deadbeef").mkdir()
87 assert atomic.cleanup_stale_tmp_files(tmp_path) == []
88
89 def test_removes_only_dead_pid_tmp_files(self, tmp_path: Path) -> None:
90 """Legacy nonce-less tmps still get cleaned up — back-compat for
91 sweeps that span a pre-/post-upgrade writer on the same store."""
92 live = tmp_path / "real.txt.tmp.1"
93 dead = tmp_path / "real.txt.tmp.99999999"
94 live.write_bytes(b"live")
95 dead.write_bytes(b"dead")
96
97 def fake_is_alive(pid: int) -> bool:
98 return pid == 1
99
100 with patch("dlm.io.atomic._is_alive", side_effect=fake_is_alive):
101 removed = atomic.cleanup_stale_tmp_files(tmp_path)
102
103 assert removed == [dead]
104 assert live.exists()
105 assert not dead.exists()
106
107 def test_skips_non_tmp_files(self, tmp_path: Path) -> None:
108 real = tmp_path / "manifest.json"
109 real.write_bytes(b"{}")
110 assert atomic.cleanup_stale_tmp_files(tmp_path) == []
111 assert real.exists()
112
113 def test_missing_directory_returns_empty(self, tmp_path: Path) -> None:
114 assert atomic.cleanup_stale_tmp_files(tmp_path / "absent") == []
115
116 def test_malformed_tmp_suffix_ignored(self, tmp_path: Path) -> None:
117 malformed = tmp_path / "name.tmp.notapid"
118 malformed.write_bytes(b"x")
119 assert atomic.cleanup_stale_tmp_files(tmp_path) == []
120 assert malformed.exists()
121
122 def test_cleanup_ignores_tmp_file_removed_between_list_and_unlink(self, tmp_path: Path) -> None:
123 doomed = tmp_path / "name.tmp.99999999.deadbeef"
124 doomed.write_bytes(b"x")
125
126 real_unlink = Path.unlink
127
128 def fake_unlink(self: Path, *args: object, **kwargs: object) -> None:
129 if self == doomed:
130 raise FileNotFoundError
131 real_unlink(self, *args, **kwargs)
132
133 with (
134 patch("dlm.io.atomic._is_alive", return_value=False),
135 patch("pathlib.Path.unlink", autospec=True, side_effect=fake_unlink),
136 ):
137 assert atomic.cleanup_stale_tmp_files(tmp_path) == []
138
139
140 class TestTmpPid:
141 def test_invalid_regex_pid_falls_back_to_none(self, tmp_path: Path) -> None:
142 target = tmp_path / "file.bin.tmp.1234.deadbeef"
143
144 class FakeMatch:
145 @staticmethod
146 def group(name: str) -> str:
147 assert name == "pid"
148 return "not-a-pid"
149
150 fake_pattern = type(
151 "FakePattern", (), {"search": staticmethod(lambda _name: FakeMatch())}
152 )()
153
154 with patch("dlm.io.atomic._TMP_RE", fake_pattern):
155 assert atomic._tmp_pid(target) is None
156
157
158 class TestIsAlive:
159 def test_zero_or_negative_dead(self) -> None:
160 assert atomic._is_alive(0) is False
161 assert atomic._is_alive(-1) is False
162
163 def test_self_is_alive(self) -> None:
164 assert atomic._is_alive(os.getpid()) is True
165
166 def test_dead_pid_is_dead(self) -> None:
167 # 99999999 is almost certainly not live.
168 assert atomic._is_alive(99999999) is False
169
170 def test_permission_error_treated_as_alive(self) -> None:
171 with patch("dlm.io.atomic.os.kill", side_effect=PermissionError):
172 assert atomic._is_alive(12345) is True
173
174 def test_generic_os_error_treated_as_dead(self) -> None:
175 with patch("dlm.io.atomic.os.kill", side_effect=OSError):
176 assert atomic._is_alive(12345) is False