Python · 13155 bytes Raw Blame History
1 """Tests for permission policy and tool lifecycle hooks."""
2
3 from __future__ import annotations
4
5 from pathlib import Path
6
7 import pytest
8
9 from loader.llm.base import ToolCall
10 from loader.runtime.executor import ToolExecutionState, ToolExecutor
11 from loader.runtime.hooks import (
12 BaseToolHook,
13 FilePathAliasHook,
14 HookDecision,
15 HookContext,
16 HookManager,
17 HookResult,
18 SearchPathAliasHook,
19 )
20 from loader.runtime.permissions import (
21 PermissionMode,
22 PermissionOverride,
23 PermissionRuleDisposition,
24 PermissionRuleSet,
25 build_permission_policy,
26 )
27 from loader.runtime.tracing import RuntimeTracer
28 from loader.tools.base import create_default_registry
29
30
31 class RecordingHook(BaseToolHook):
32 """Hook that records lifecycle events."""
33
34 def __init__(self, events: list[str]) -> None:
35 self.events = events
36
37 async def pre_tool_use(self, context) -> HookResult:
38 self.events.append("pre_tool_use")
39 return HookResult()
40
41 async def post_tool_use(self, context) -> HookResult:
42 self.events.append("post_tool_use")
43 return HookResult()
44
45 async def post_tool_use_failure(self, context) -> HookResult:
46 self.events.append("post_tool_use_failure")
47 return HookResult()
48
49
50 class DenyInPreHook(BaseToolHook):
51 """Hook that denies execution before the tool runs."""
52
53 def __init__(self, events: list[str]) -> None:
54 self.events = events
55
56 async def pre_tool_use(self, context) -> HookResult:
57 self.events.append("pre_tool_use")
58 return HookResult(
59 decision=HookDecision.DENY,
60 message="[Blocked - denied by test hook]",
61 terminal_state="blocked",
62 )
63
64 async def post_tool_use_failure(self, context) -> HookResult:
65 self.events.append("post_tool_use_failure")
66 return HookResult()
67
68
69 @pytest.mark.asyncio
70 async def test_permission_policy_honors_overrides(temp_dir: Path) -> None:
71 policy = build_permission_policy(
72 active_mode=PermissionMode.READ_ONLY,
73 workspace_root=temp_dir,
74 tool_requirements={"write": PermissionMode.WORKSPACE_WRITE},
75 )
76
77 denied = policy.authorize("write")
78 allowed = policy.authorize("write", override=PermissionOverride.ALLOW)
79 asked = policy.authorize("write", override=PermissionOverride.ASK)
80
81 assert denied.decision.value == "deny"
82 assert allowed.allowed
83 assert asked.decision.value == "ask"
84
85
86 def test_permission_mode_parsing_supports_prompt_and_allow() -> None:
87 assert PermissionMode.from_str("prompt") == PermissionMode.PROMPT
88 assert PermissionMode.from_str("allow") == PermissionMode.ALLOW
89
90
91 def test_permission_policy_honors_rule_precedence(temp_dir: Path) -> None:
92 policy = build_permission_policy(
93 active_mode=PermissionMode.ALLOW,
94 workspace_root=temp_dir,
95 tool_requirements={"write": PermissionMode.WORKSPACE_WRITE},
96 rules=PermissionRuleSet.from_dict(
97 {
98 "allow": [{"tool": "write", "contains": "safe change"}],
99 "deny": [{"tool": "write", "path_contains": "secrets"}],
100 "ask": [{"tool": "write", "path_contains": "README"}],
101 }
102 ),
103 )
104
105 denied = policy.authorize(
106 "write",
107 arguments={
108 "file_path": str(temp_dir / "secrets.txt"),
109 "content": "safe change\n",
110 },
111 )
112 asked = policy.authorize(
113 "write",
114 arguments={
115 "file_path": str(temp_dir / "README.md"),
116 "content": "safe change\n",
117 },
118 )
119 allowed = policy.authorize(
120 "write",
121 arguments={
122 "file_path": str(temp_dir / "notes.txt"),
123 "content": "safe change\n",
124 },
125 )
126
127 assert denied.decision.value == "deny"
128 assert denied.matched_disposition == PermissionRuleDisposition.DENY
129 assert asked.decision.value == "ask"
130 assert asked.matched_disposition == PermissionRuleDisposition.ASK
131 assert allowed.decision.value == "allow"
132 assert allowed.matched_disposition == PermissionRuleDisposition.ALLOW
133
134
135 @pytest.mark.asyncio
136 async def test_prompt_mode_executor_prompts_once_and_respects_denial(
137 temp_dir: Path,
138 ) -> None:
139 prompts: list[tuple[str, str, str]] = []
140 registry = create_default_registry(temp_dir)
141 policy = build_permission_policy(
142 active_mode=PermissionMode.PROMPT,
143 workspace_root=temp_dir,
144 tool_requirements=registry.get_tool_requirements(),
145 )
146 executor = ToolExecutor(registry, RuntimeTracer(), policy)
147 target = temp_dir / "prompted.txt"
148
149 async def deny(tool_name: str, message: str, details: str) -> bool:
150 prompts.append((tool_name, message, details))
151 return False
152
153 outcome = await executor.execute_tool_call(
154 ToolCall(
155 id="write-1",
156 name="write",
157 arguments={"file_path": str(target), "content": "prompted\n"},
158 ),
159 source="native",
160 on_confirmation=deny,
161 )
162
163 assert outcome.state == ToolExecutionState.DECLINED
164 assert not target.exists()
165 assert len(prompts) == 1
166 assert "active_mode=prompt" in prompts[0][2]
167 assert "required_mode=workspace-write" in prompts[0][2]
168
169
170 @pytest.mark.asyncio
171 async def test_allow_mode_executor_skips_prompt_for_destructive_write(
172 temp_dir: Path,
173 ) -> None:
174 prompts: list[str] = []
175 registry = create_default_registry(temp_dir)
176 policy = build_permission_policy(
177 active_mode=PermissionMode.ALLOW,
178 workspace_root=temp_dir,
179 tool_requirements=registry.get_tool_requirements(),
180 )
181 executor = ToolExecutor(registry, RuntimeTracer(), policy)
182 target = temp_dir / "allowed.txt"
183
184 async def unexpected(tool_name: str, message: str, details: str) -> bool:
185 prompts.append(tool_name)
186 return False
187
188 outcome = await executor.execute_tool_call(
189 ToolCall(
190 id="write-1",
191 name="write",
192 arguments={"file_path": str(target), "content": "allowed\n"},
193 ),
194 source="native",
195 on_confirmation=unexpected,
196 )
197
198 assert outcome.state == ToolExecutionState.EXECUTED
199 assert target.read_text() == "allowed\n"
200 assert prompts == []
201
202
203 @pytest.mark.asyncio
204 async def test_ask_rule_prompts_even_when_allow_mode(temp_dir: Path) -> None:
205 prompts: list[str] = []
206 registry = create_default_registry(temp_dir)
207 policy = build_permission_policy(
208 active_mode=PermissionMode.ALLOW,
209 workspace_root=temp_dir,
210 tool_requirements=registry.get_tool_requirements(),
211 rules=PermissionRuleSet.from_dict(
212 {"ask": [{"tool": "write", "path_contains": "README"}]}
213 ),
214 )
215 executor = ToolExecutor(registry, RuntimeTracer(), policy)
216 target = temp_dir / "README.md"
217
218 async def deny(tool_name: str, message: str, details: str) -> bool:
219 prompts.append(details)
220 return False
221
222 outcome = await executor.execute_tool_call(
223 ToolCall(
224 id="write-1",
225 name="write",
226 arguments={"file_path": str(target), "content": "no thanks\n"},
227 ),
228 source="native",
229 on_confirmation=deny,
230 )
231
232 assert outcome.state == ToolExecutionState.DECLINED
233 assert not target.exists()
234 assert len(prompts) == 1
235 assert "matched_ask_rule=tool=write, path_contains=README" in prompts[0]
236
237
238 @pytest.mark.asyncio
239 async def test_hook_lifecycle_runs_in_order_for_success(temp_dir: Path) -> None:
240 events: list[str] = []
241 registry = create_default_registry(temp_dir)
242 policy = build_permission_policy(
243 active_mode=PermissionMode.WORKSPACE_WRITE,
244 workspace_root=temp_dir,
245 tool_requirements=registry.get_tool_requirements(),
246 )
247 executor = ToolExecutor(
248 registry,
249 RuntimeTracer(),
250 policy,
251 hooks=HookManager([RecordingHook(events)]),
252 )
253 target = temp_dir / "hook-success.txt"
254
255 outcome = await executor.execute_tool_call(
256 ToolCall(
257 id="write-1",
258 name="write",
259 arguments={"file_path": str(target), "content": "hook success\n"},
260 ),
261 source="native",
262 skip_confirmation=True,
263 )
264
265 assert outcome.state == ToolExecutionState.EXECUTED
266 assert events == ["pre_tool_use", "post_tool_use"]
267 assert target.read_text() == "hook success\n"
268
269
270 @pytest.mark.asyncio
271 async def test_pre_hook_deny_still_runs_failure_hook_once(temp_dir: Path) -> None:
272 events: list[str] = []
273 registry = create_default_registry(temp_dir)
274 policy = build_permission_policy(
275 active_mode=PermissionMode.WORKSPACE_WRITE,
276 workspace_root=temp_dir,
277 tool_requirements=registry.get_tool_requirements(),
278 )
279 executor = ToolExecutor(
280 registry,
281 RuntimeTracer(),
282 policy,
283 hooks=HookManager([DenyInPreHook(events)]),
284 )
285 target = temp_dir / "hook-denied.txt"
286
287 outcome = await executor.execute_tool_call(
288 ToolCall(
289 id="write-1",
290 name="write",
291 arguments={"file_path": str(target), "content": "should not exist\n"},
292 ),
293 source="native",
294 skip_confirmation=True,
295 )
296
297 assert outcome.state == ToolExecutionState.BLOCKED
298 assert events == ["pre_tool_use", "post_tool_use_failure"]
299 assert not target.exists()
300 assert len(outcome.message.tool_results) == 1
301 assert "denied by test hook" in outcome.event_content
302
303
304 @pytest.mark.asyncio
305 @pytest.mark.parametrize(
306 ("tool_name", "arguments", "expected_path"),
307 [
308 ("read", {"file": "notes.txt"}, "notes.txt"),
309 ("write", {"filepath": "notes.txt", "content": "hello\n"}, "notes.txt"),
310 (
311 "edit",
312 {"filePath": "notes.txt", "old_string": "before", "new_string": "after"},
313 "notes.txt",
314 ),
315 ("patch", {"path": "notes.txt", "hunks": []}, "notes.txt"),
316 ],
317 )
318 async def test_file_path_alias_hook_canonicalizes_common_aliases(
319 temp_dir: Path,
320 tool_name: str,
321 arguments: dict[str, object],
322 expected_path: str,
323 ) -> None:
324 registry = create_default_registry(temp_dir)
325 policy = build_permission_policy(
326 active_mode=PermissionMode.WORKSPACE_WRITE,
327 workspace_root=temp_dir,
328 tool_requirements=registry.get_tool_requirements(),
329 )
330 hook = FilePathAliasHook()
331
332 result = await hook.pre_tool_use(
333 HookContext(
334 tool_call=ToolCall(id=f"{tool_name}-1", name=tool_name, arguments=arguments),
335 tool=registry.get(tool_name),
336 registry=registry,
337 permission_policy=policy,
338 source="native",
339 )
340 )
341
342 assert result.updated_arguments is not None
343 assert result.updated_arguments["file_path"] == expected_path
344 for alias in ("file", "filepath", "filePath", "filename", "path"):
345 assert alias not in result.updated_arguments
346
347
348 @pytest.mark.asyncio
349 @pytest.mark.parametrize(
350 ("tool_name", "arguments", "expected_path"),
351 [
352 ("glob", {"pattern": "*.html", "directory": "chapters"}, "chapters"),
353 ("grep", {"pattern": "alpha", "dir": "src"}, "src"),
354 ],
355 )
356 async def test_search_path_alias_hook_canonicalizes_common_aliases(
357 temp_dir: Path,
358 tool_name: str,
359 arguments: dict[str, object],
360 expected_path: str,
361 ) -> None:
362 registry = create_default_registry(temp_dir)
363 policy = build_permission_policy(
364 active_mode=PermissionMode.WORKSPACE_WRITE,
365 workspace_root=temp_dir,
366 tool_requirements=registry.get_tool_requirements(),
367 )
368 hook = SearchPathAliasHook()
369
370 result = await hook.pre_tool_use(
371 HookContext(
372 tool_call=ToolCall(id=f"{tool_name}-1", name=tool_name, arguments=arguments),
373 tool=registry.get(tool_name),
374 registry=registry,
375 permission_policy=policy,
376 source="native",
377 )
378 )
379
380 assert result.updated_arguments is not None
381 assert result.updated_arguments["path"] == expected_path
382 for alias in ("directory", "dir", "folder"):
383 assert alias not in result.updated_arguments
384
385
386 @pytest.mark.asyncio
387 async def test_search_path_alias_hook_splits_full_glob_pattern(
388 temp_dir: Path,
389 ) -> None:
390 registry = create_default_registry(temp_dir)
391 policy = build_permission_policy(
392 active_mode=PermissionMode.WORKSPACE_WRITE,
393 workspace_root=temp_dir,
394 tool_requirements=registry.get_tool_requirements(),
395 )
396 hook = SearchPathAliasHook()
397 chapters = temp_dir / "chapters"
398
399 result = await hook.pre_tool_use(
400 HookContext(
401 tool_call=ToolCall(
402 id="glob-1",
403 name="glob",
404 arguments={"pattern": f"{chapters}/*.html"},
405 ),
406 tool=registry.get("glob"),
407 registry=registry,
408 permission_policy=policy,
409 source="native",
410 )
411 )
412
413 assert result.updated_arguments is not None
414 assert result.updated_arguments["path"] == str(chapters)
415 assert result.updated_arguments["pattern"] == "*.html"