tenseleyflow/loader / 4472f8b

Browse files

Normalize safe shell probes

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
4472f8b05b8449ace32dec2e172932f31bab7e03
Parents
fd483bf
Tree
a9ef64d

2 changed files

StatusFile+-
M src/loader/runtime/executor.py 95 0
M tests/test_permissions.py 33 0
src/loader/runtime/executor.pymodified
@@ -3,9 +3,11 @@
33
 from __future__ import annotations
44
 
55
 import inspect
6
+import shlex
67
 from collections.abc import Awaitable, Callable
78
 from dataclasses import dataclass
89
 from enum import StrEnum
10
+from pathlib import Path
911
 from typing import Any
1012
 
1113
 from ..llm.base import Message, ToolCall
@@ -542,6 +544,10 @@ class ToolExecutor:
542544
 def _normalize_tool_call_arguments(tool_call: ToolCall) -> ToolCall:
543545
     """Accept narrow, unambiguous argument aliases from local models."""
544546
 
547
+    shell_alias = _normalize_shell_probe_tool_call(tool_call)
548
+    if shell_alias is not None:
549
+        return shell_alias
550
+
545551
     if tool_call.name != "edit":
546552
         return tool_call
547553
 
@@ -554,3 +560,92 @@ def _normalize_tool_call_arguments(tool_call: ToolCall) -> ToolCall:
554560
             arguments=arguments,
555561
         )
556562
     return tool_call
563
+
564
+
565
+_SHELL_PROBE_TOOL_ALIASES = frozenset(
566
+    {"cat", "find", "head", "ls", "pwd", "stat", "tail"}
567
+)
568
+_SHELL_PROBE_TARGET_KEYS = (
569
+    "path",
570
+    "directory",
571
+    "dir",
572
+    "folder",
573
+    "file_path",
574
+    "file",
575
+    "target",
576
+)
577
+_SHELL_PROBE_OPTION_KEYS = ("flags", "options", "args")
578
+_SHELL_PROBE_PASSTHROUGH_KEYS = ("cwd", "timeout", "background")
579
+
580
+
581
+def _normalize_shell_probe_tool_call(tool_call: ToolCall) -> ToolCall | None:
582
+    """Map hallucinated native shell-probe tools to the real bash tool safely."""
583
+
584
+    command_name = tool_call.name.strip().lower()
585
+    if command_name not in _SHELL_PROBE_TOOL_ALIASES:
586
+        return None
587
+
588
+    arguments = dict(tool_call.arguments)
589
+    parts = [command_name]
590
+    parts.extend(_quote_option_tokens(arguments))
591
+    parts.extend(_quote_target_tokens(arguments))
592
+
593
+    bash_arguments: dict[str, Any] = {"command": " ".join(parts)}
594
+    for key in _SHELL_PROBE_PASSTHROUGH_KEYS:
595
+        if key in arguments:
596
+            bash_arguments[key] = arguments[key]
597
+
598
+    return ToolCall(
599
+        id=tool_call.id,
600
+        name="bash",
601
+        arguments=bash_arguments,
602
+    )
603
+
604
+
605
+def _quote_option_tokens(arguments: dict[str, Any]) -> list[str]:
606
+    quoted: list[str] = []
607
+    for key in _SHELL_PROBE_OPTION_KEYS:
608
+        value = arguments.get(key)
609
+        if value is None:
610
+            continue
611
+        quoted.extend(shlex.quote(token) for token in _split_shell_probe_options(value))
612
+    return quoted
613
+
614
+
615
+def _quote_target_tokens(arguments: dict[str, Any]) -> list[str]:
616
+    quoted: list[str] = []
617
+    for key in _SHELL_PROBE_TARGET_KEYS:
618
+        value = arguments.get(key)
619
+        if value is None:
620
+            continue
621
+        quoted.extend(
622
+            shlex.quote(_expand_shell_probe_target(token))
623
+            for token in _as_tokens(value)
624
+        )
625
+        break
626
+    return quoted
627
+
628
+
629
+def _split_shell_probe_options(value: Any) -> list[str]:
630
+    if isinstance(value, (list, tuple)):
631
+        return [str(item).strip() for item in value if str(item).strip()]
632
+    text = str(value).strip()
633
+    if not text:
634
+        return []
635
+    try:
636
+        return [token for token in shlex.split(text) if token.strip()]
637
+    except ValueError:
638
+        return [text]
639
+
640
+
641
+def _as_tokens(value: Any) -> list[str]:
642
+    if isinstance(value, (list, tuple)):
643
+        return [str(item).strip() for item in value if str(item).strip()]
644
+    text = str(value).strip()
645
+    return [text] if text else []
646
+
647
+
648
+def _expand_shell_probe_target(value: str) -> str:
649
+    if value == "~" or value.startswith("~/"):
650
+        return str(Path(value).expanduser())
651
+    return value
tests/test_permissions.pymodified
@@ -239,6 +239,39 @@ async def test_executor_accepts_edit_content_alias_for_new_string(
239239
     assert outcome.tool_call.arguments["new_string"] == "<h1>New</h1>"
240240
 
241241
 
242
+@pytest.mark.asyncio
243
+async def test_executor_maps_native_ls_alias_to_read_only_bash(
244
+    temp_dir: Path,
245
+    monkeypatch: pytest.MonkeyPatch,
246
+) -> None:
247
+    monkeypatch.setenv("HOME", str(temp_dir))
248
+    target_dir = temp_dir / "Loader"
249
+    target_dir.mkdir()
250
+    (target_dir / "notes.txt").write_text("details\n")
251
+    registry = create_default_registry(temp_dir)
252
+    policy = build_permission_policy(
253
+        active_mode=PermissionMode.WORKSPACE_WRITE,
254
+        workspace_root=temp_dir,
255
+        tool_requirements=registry.get_tool_requirements(),
256
+    )
257
+    executor = ToolExecutor(registry, RuntimeTracer(), policy)
258
+
259
+    outcome = await executor.execute_tool_call(
260
+        ToolCall(
261
+            id="ls-1",
262
+            name="ls",
263
+            arguments={"path": "~/Loader"},
264
+        ),
265
+        source="native",
266
+    )
267
+
268
+    assert outcome.state == ToolExecutionState.EXECUTED
269
+    assert outcome.tool_call.name == "bash"
270
+    assert outcome.tool_call.arguments["command"] == f"ls {target_dir}"
271
+    assert outcome.required_permission == PermissionMode.READ_ONLY
272
+    assert "notes.txt" in outcome.result_output
273
+
274
+
242275
 @pytest.mark.asyncio
243276
 async def test_ask_rule_prompts_even_when_allow_mode(temp_dir: Path) -> None:
244277
     prompts: list[str] = []