Python · 26583 bytes Raw Blame History
1 """File operation tools."""
2
3 import asyncio
4 from pathlib import Path
5 from typing import Any
6
7 from ..runtime.permissions import PermissionMode
8 from ..utils.file_mutations import build_file_mutation_preview_dict
9 from .base import ConfirmationRequired, Tool, ToolResult
10 from .fs_safety import (
11 StructuredPatchHunk,
12 apply_structured_patch,
13 coerce_structured_patch_payload,
14 ensure_safe_to_read,
15 ensure_safe_to_write,
16 make_structured_patch,
17 parse_unified_diff_patch,
18 resolve_workspace_path,
19 )
20
# Characters whose presence in a path segment is treated as glob syntax.
_GLOB_MAGIC_CHARS = "*?["
22
23
def _has_glob_magic(segment: str) -> bool:
    """Tell whether a single path segment uses any glob wildcard character."""

    for magic_char in _GLOB_MAGIC_CHARS:
        if magic_char in segment:
            return True
    return False
28
29
def _resolve_glob_base_and_pattern(
    pattern: str,
    path: str,
) -> tuple[Path, str]:
    """Split glob inputs into a resolved base directory and a relative pattern.

    Relative patterns are searched under ``path`` unchanged. Patterns that
    start with ``~`` or are absolute are split at the first segment that
    contains glob syntax: everything before it becomes the base directory and
    the remainder becomes the pattern.
    """

    expanded = Path(pattern).expanduser()

    # Plain relative pattern: the caller-supplied directory is the base.
    if not (pattern.startswith("~") or expanded.is_absolute()):
        return resolve_workspace_path(path, workspace_root=None), pattern

    segments = expanded.parts
    # Index of the first segment using glob syntax; everything from there on
    # belongs to the pattern, everything before it to the base directory.
    first_glob = next(
        (index for index, segment in enumerate(segments) if _has_glob_magic(segment)),
        len(segments),
    )
    literal_segments = list(segments[:first_glob])
    glob_segments = list(segments[first_glob:])

    if not glob_segments:
        final_name = expanded.name
        if final_name:
            # No wildcard at all: match the final component inside its parent.
            glob_segments = [final_name]
            literal_segments = list(expanded.parent.parts)
        else:
            # Nothing but an anchor (e.g. "/"): list its direct children.
            glob_segments = ["*"]

    if literal_segments:
        raw_base = str(Path(*literal_segments))
    else:
        raw_base = expanded.anchor or "."

    resolved_base = resolve_workspace_path(raw_base, workspace_root=None)
    return resolved_base, "/".join(glob_segments)
63
64
class ReadTool(Tool):
    """Read file contents and return them with line numbers."""

    required_permission = PermissionMode.READ_ONLY

    def __init__(self, workspace_root: Path | str | None = None) -> None:
        """Store the workspace root, expanded and resolved when one is given."""
        self.workspace_root = (
            Path(workspace_root).expanduser().resolve() if workspace_root else None
        )

    @property
    def name(self) -> str:
        return "read"

    def set_workspace_root(self, workspace_root: Path | None) -> None:
        """Replace the stored workspace root with the given value as-is."""
        self.workspace_root = workspace_root

    @property
    def description(self) -> str:
        return "Read the contents of a file. Returns the file content with line numbers."

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "file_path": {
                    "type": "string",
                    "description": "Path to the file to read",
                },
                "offset": {
                    "type": "integer",
                    "description": "Line number to start reading from (1-indexed)",
                    "default": 1,
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of lines to read",
                    "default": 500,
                },
            },
            "required": ["file_path"],
        }

    async def execute(
        self,
        file_path: str,
        offset: int = 1,
        limit: int = 500,
        **kwargs: Any,
    ) -> ToolResult:
        """Return a numbered window of lines from ``file_path``.

        Args:
            file_path: Path of the file to read.
            offset: 1-indexed line to start from; values below 1 are treated
                as 1.
            limit: Maximum number of lines to return; negative values are
                treated as 0.
            **kwargs: Ignored extra arguments.

        Returns:
            A ``ToolResult`` holding the numbered lines (plus a trailer when
            lines remain past the window), or an error result when the path
            cannot be resolved, is missing, is not a file, or cannot be read.
        """
        try:
            # Reads are safe — don't enforce workspace boundary
            path = resolve_workspace_path(
                file_path,
                workspace_root=None,
            )
        except FileNotFoundError:
            return ToolResult(f"File not found: {file_path}", is_error=True)
        except Exception as exc:
            return ToolResult(f"Error resolving file path: {exc}", is_error=True)

        if not path.exists():
            return ToolResult(f"File not found: {file_path}", is_error=True)

        if not path.is_file():
            return ToolResult(f"Not a file: {file_path}", is_error=True)

        try:
            ensure_safe_to_read(path)
            content = await asyncio.to_thread(path.read_text)
            lines = content.splitlines()

            # Clamp both bounds so a non-positive offset or a negative limit
            # cannot produce a negative slice index.
            start_idx = max(0, offset - 1)
            end_idx = start_idx + max(0, limit)
            selected_lines = lines[start_idx:end_idx]

            # Number from the clamped start so the labels always match the
            # real 1-indexed position of each line in the file.
            numbered = [
                f"{line_no:4d}\t{line}"
                for line_no, line in enumerate(selected_lines, start=start_idx + 1)
            ]
            output = "\n".join(numbered)

            if end_idx < len(lines):
                output += f"\n\n... ({len(lines) - end_idx} more lines)"

            return ToolResult(
                output,
                metadata={
                    "file_path": str(path),
                    "line_count": len(lines),
                    "offset": offset,
                    "limit": limit,
                },
            )
        except Exception as e:
            return ToolResult(f"Error reading file: {e}", is_error=True)
164
165
class WriteTool(Tool):
    """Write content to a file."""

    required_permission = PermissionMode.WORKSPACE_WRITE

    def __init__(self, workspace_root: Path | str | None = None) -> None:
        """Store the resolved workspace root and an empty approval set."""
        self.workspace_root = (
            Path(workspace_root).expanduser().resolve() if workspace_root else None
        )
        # Resolved targets for which an outside-workspace confirmation has
        # already been raised once; the next call for the same target proceeds.
        self._pending_escape_approvals: set[str] = set()

    @property
    def name(self) -> str:
        return "write"

    def set_workspace_root(self, workspace_root: Path | None) -> None:
        """Replace the stored workspace root with the given value as-is."""
        self.workspace_root = workspace_root

    @property
    def description(self) -> str:
        return "Write content to a file. Creates the file if it doesn't exist."

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "file_path": {
                    "type": "string",
                    "description": "Path to the file to write",
                },
                "content": {
                    "type": "string",
                    "description": "Content to write to the file",
                },
            },
            "required": ["file_path", "content"],
        }

    @property
    def is_destructive(self) -> bool:
        return True

    @staticmethod
    def _utf8_size(content: Any) -> int:
        """Return the UTF-8 byte length of ``content`` (plain ``len`` for non-strings)."""
        if isinstance(content, str):
            return len(content.encode("utf-8"))
        return len(content)

    def check_confirmation(self, skip_confirmation: bool = False, **kwargs: Any) -> None:
        """Raise ``ConfirmationRequired`` describing the write unless skipped."""
        if skip_confirmation:
            return
        file_path = kwargs.get("file_path", "")
        content = kwargs.get("content", "")
        raise ConfirmationRequired(
            tool_name=self.name,
            message=f"Write to file: {file_path}",
            # Report real bytes so this agrees with ``bytes_written`` metadata.
            details=f"{self._utf8_size(content)} bytes",
            preview=build_file_mutation_preview_dict(self.name, tool_args=kwargs),
        )

    async def execute(
        self,
        file_path: str,
        content: str,
        **kwargs: Any,
    ) -> ToolResult:
        """Write ``content`` to ``file_path``, creating parent directories.

        Args:
            file_path: Destination path.
            content: Full text to store in the file.
            **kwargs: Extra arguments; ``_skip_confirmation`` is discarded.

        Returns:
            A ``ToolResult`` whose metadata carries the previous content, a
            structured patch, and the UTF-8 byte count, or an error result.

        Raises:
            ConfirmationRequired: On the first attempt to write to a target
                for which path resolution raised ``PermissionError``.
        """
        kwargs.pop("_skip_confirmation", None)
        try:
            ensure_safe_to_write(content)
            path = resolve_workspace_path(
                file_path,
                workspace_root=self.workspace_root,
                allow_missing=True,
            )
        except PermissionError:
            # NOTE(review): a PermissionError from ensure_safe_to_write would
            # also land here — confirm that helper never raises it.
            resolved = Path(file_path).expanduser().resolve()
            key = str(resolved)
            if key in self._pending_escape_approvals:
                self._pending_escape_approvals.discard(key)
                path = resolved
            else:
                self._pending_escape_approvals.add(key)
                raise ConfirmationRequired(
                    tool_name=self.name,
                    message=f"Write outside workspace: {file_path}",
                    details=f"Target is outside the workspace root ({self.workspace_root})",
                    preview=build_file_mutation_preview_dict(
                        self.name,
                        tool_args={"file_path": file_path, "content": content},
                    ),
                )
        except Exception as exc:
            return ToolResult(f"Error writing file: {exc}", is_error=True)

        try:
            # Track existence explicitly: an existing but empty file is still
            # an update, which content truthiness alone cannot tell apart.
            existed = path.exists()
            original_content = ""
            if existed:
                ensure_safe_to_read(path)
                original_content = await asyncio.to_thread(path.read_text)

            # Create parent directories if needed
            path.parent.mkdir(parents=True, exist_ok=True)

            await asyncio.to_thread(path.write_text, content)

            structured_patch = [
                hunk.to_dict()
                for hunk in make_structured_patch(original_content, content)
            ]
            bytes_written = len(content.encode("utf-8"))
            metadata = {
                "kind": "update" if existed else "create",
                "file_path": str(path),
                "content": content,
                "original_file": original_content if existed else None,
                "structured_patch": structured_patch,
                "bytes_written": bytes_written,
            }
            return ToolResult(
                f"Successfully wrote {bytes_written} bytes to {path}",
                metadata=metadata,
            )
        except Exception as e:
            return ToolResult(f"Error writing file: {e}", is_error=True)
284
285
class EditTool(Tool):
    """Edit a file by replacing text."""

    required_permission = PermissionMode.WORKSPACE_WRITE

    def __init__(self, workspace_root: Path | str | None = None) -> None:
        """Store the resolved workspace root and an empty approval set."""
        self.workspace_root = (
            Path(workspace_root).expanduser().resolve() if workspace_root else None
        )
        # Resolved targets for which an outside-workspace confirmation has
        # already been raised once; the next call for the same target proceeds.
        self._pending_escape_approvals: set[str] = set()

    @property
    def name(self) -> str:
        return "edit"

    def set_workspace_root(self, workspace_root: Path | None) -> None:
        """Replace the stored workspace root with the given value as-is."""
        self.workspace_root = workspace_root

    @property
    def description(self) -> str:
        return (
            "Edit a file by replacing old_string with new_string. "
            "The old_string must match exactly."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "file_path": {
                    "type": "string",
                    "description": "Path to the file to edit",
                },
                "old_string": {
                    "type": "string",
                    "description": "The exact string to replace",
                },
                "new_string": {
                    "type": "string",
                    "description": "The replacement string",
                },
            },
            "required": ["file_path", "old_string", "new_string"],
        }

    @property
    def is_destructive(self) -> bool:
        return True

    def check_confirmation(self, skip_confirmation: bool = False, **kwargs: Any) -> None:
        """Raise ``ConfirmationRequired`` describing the edit unless skipped."""
        if skip_confirmation:
            return
        file_path = kwargs.get("file_path", "")
        raise ConfirmationRequired(
            tool_name=self.name,
            message=f"Edit file: {file_path}",
            details="replace text",
            preview=build_file_mutation_preview_dict(self.name, tool_args=kwargs),
        )

    async def execute(
        self,
        file_path: str | None = None,
        old_string: str | None = None,
        new_string: str | None = None,
        **kwargs: Any,
    ) -> ToolResult:
        """Replace the single occurrence of ``old_string`` with ``new_string``.

        Args:
            file_path: File to edit; the ``path`` keyword is accepted as a
                fallback when this is ``None``.
            old_string: Exact text to replace; must occur exactly once.
            new_string: Replacement text.
            **kwargs: Extra arguments. ``_skip_confirmation`` is discarded;
                ``content`` triggers a whole-file replacement when
                ``old_string`` or ``new_string`` is missing.

        Returns:
            A ``ToolResult`` with the original content and a structured patch
            in its metadata, or an error result.

        Raises:
            ConfirmationRequired: On the first attempt to edit a target for
                which path resolution raised ``PermissionError``.
        """
        kwargs.pop("_skip_confirmation", None)
        if file_path is None:
            file_path = str(kwargs.pop("path", "") or "")
        if not file_path:
            # Fail early with the usage message instead of resolving an empty
            # path and surfacing an unrelated downstream error.
            return ToolResult(
                "edit requires file_path, old_string, and new_string. "
                "For whole-file replacement, provide path/file_path with content.",
                is_error=True,
            )
        replacement_content = kwargs.pop("content", None)
        if replacement_content is not None and (
            old_string is None or new_string is None
        ):
            return await self._replace_file_content(
                file_path,
                str(replacement_content),
            )
        if old_string is None or new_string is None:
            return ToolResult(
                "edit requires file_path, old_string, and new_string. "
                "For whole-file replacement, provide path/file_path with content.",
                is_error=True,
            )
        try:
            path = resolve_workspace_path(
                file_path,
                workspace_root=self.workspace_root,
            )
        except FileNotFoundError:
            return ToolResult(f"File not found: {file_path}", is_error=True)
        except PermissionError:
            resolved = Path(file_path).expanduser().resolve()
            key = str(resolved)
            if key in self._pending_escape_approvals:
                self._pending_escape_approvals.discard(key)
                path = resolved
            else:
                self._pending_escape_approvals.add(key)
                raise ConfirmationRequired(
                    tool_name=self.name,
                    message=f"Edit outside workspace: {file_path}",
                    details=f"Target is outside the workspace root ({self.workspace_root})",
                    preview=build_file_mutation_preview_dict(
                        self.name,
                        tool_args={
                            "file_path": file_path,
                            "old_string": old_string,
                            "new_string": new_string,
                        },
                    ),
                )
        except Exception as exc:
            return ToolResult(f"Error resolving file path: {exc}", is_error=True)

        if not path.exists():
            return ToolResult(f"File not found: {file_path}", is_error=True)

        try:
            ensure_safe_to_read(path)
            content = await asyncio.to_thread(path.read_text)

            # One scan answers both "is it present?" and "is it unique?".
            count = content.count(old_string)
            if count == 0:
                return ToolResult(
                    "old_string not found in file. Make sure it matches exactly.",
                    is_error=True,
                )
            if count > 1:
                return ToolResult(
                    "old_string appears "
                    f"{count} times. Please provide more context to make it unique.",
                    is_error=True,
                )

            new_content = content.replace(old_string, new_string, 1)
            ensure_safe_to_write(new_content)
            await asyncio.to_thread(path.write_text, new_content)

            structured_patch = [
                hunk.to_dict()
                for hunk in make_structured_patch(content, new_content)
            ]
            return ToolResult(
                f"Successfully edited {path}",
                metadata={
                    "file_path": str(path),
                    "old_string": old_string,
                    "new_string": new_string,
                    "original_file": content,
                    "structured_patch": structured_patch,
                },
            )
        except Exception as e:
            return ToolResult(f"Error editing file: {e}", is_error=True)

    async def _replace_file_content(
        self,
        file_path: str,
        new_content: str,
    ) -> ToolResult:
        """Overwrite an existing file with ``new_content``.

        Unlike ``execute``, a ``PermissionError`` from path resolution is
        returned as an error result rather than raising a confirmation.
        """
        try:
            path = resolve_workspace_path(
                file_path,
                workspace_root=self.workspace_root,
            )
        except FileNotFoundError:
            return ToolResult(f"File not found: {file_path}", is_error=True)
        except PermissionError:
            return ToolResult(
                "Whole-file edit target is outside the workspace root.",
                is_error=True,
            )
        except Exception as exc:
            return ToolResult(f"Error resolving file path: {exc}", is_error=True)

        if not path.exists():
            return ToolResult(f"File not found: {file_path}", is_error=True)

        try:
            ensure_safe_to_read(path)
            original_content = await asyncio.to_thread(path.read_text)
            ensure_safe_to_write(new_content)
            await asyncio.to_thread(path.write_text, new_content)
            structured_patch = [
                hunk.to_dict()
                for hunk in make_structured_patch(original_content, new_content)
            ]
            return ToolResult(
                f"Successfully edited {path}",
                metadata={
                    "file_path": str(path),
                    "old_string": original_content,
                    "new_string": new_content,
                    "original_file": original_content,
                    "structured_patch": structured_patch,
                },
            )
        except Exception as exc:
            return ToolResult(f"Error editing file: {exc}", is_error=True)
489
490
class PatchTool(Tool):
    """Edit a file by applying structured patch hunks."""

    required_permission = PermissionMode.WORKSPACE_WRITE

    def __init__(self, workspace_root: Path | str | None = None) -> None:
        """Store the resolved workspace root and an empty approval set."""
        self.workspace_root = (
            Path(workspace_root).expanduser().resolve() if workspace_root else None
        )
        # Resolved targets for which an outside-workspace confirmation has
        # already been raised once; the next call for the same target proceeds.
        self._pending_escape_approvals: set[str] = set()

    @property
    def name(self) -> str:
        return "patch"

    def set_workspace_root(self, workspace_root: Path | None) -> None:
        """Replace the stored workspace root with the given value as-is."""
        self.workspace_root = workspace_root

    @property
    def description(self) -> str:
        return (
            "Apply structured patch hunks to a file. Prefer this for larger "
            "or multi-line edits where exact old/new string replacement is brittle. "
            "A raw unified diff string is also accepted via `patch`."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "file_path": {
                    "type": "string",
                    "description": "Path to the file to patch",
                },
                "hunks": {
                    "type": "array",
                    "description": "Structured patch hunks to apply in order.",
                    "items": {
                        "type": "object",
                        "properties": {
                            "old_start": {"type": "integer"},
                            "old_lines": {"type": "integer"},
                            "new_start": {"type": "integer"},
                            "new_lines": {"type": "integer"},
                            "lines": {
                                "type": "array",
                                "items": {"type": "string"},
                            },
                        },
                        "required": [
                            "old_start",
                            "old_lines",
                            "new_start",
                            "new_lines",
                            "lines",
                        ],
                    },
                },
                "patch": {
                    "type": "string",
                    "description": (
                        "Optional unified diff patch string. Loader will parse this "
                        "into structured hunks when possible."
                    ),
                },
            },
            "required": ["file_path"],
        }

    @property
    def is_destructive(self) -> bool:
        return True

    def check_confirmation(self, skip_confirmation: bool = False, **kwargs: Any) -> None:
        """Raise ``ConfirmationRequired`` describing the patch unless skipped."""
        if skip_confirmation:
            return
        file_path = kwargs.get("file_path", "")
        raise ConfirmationRequired(
            tool_name=self.name,
            message=f"Patch file: {file_path}",
            details="apply structured patch hunks",
            preview=build_file_mutation_preview_dict(self.name, tool_args=kwargs),
        )

    async def execute(
        self,
        file_path: str,
        hunks: list[dict[str, Any]] | dict[str, Any] | str | None = None,
        patch: str | None = None,
        **kwargs: Any,
    ) -> ToolResult:
        """Apply ``hunks`` (or a unified diff) to ``file_path``.

        Args:
            file_path: File to patch; it must already exist.
            hunks: Structured hunk payload; used in preference to a raw diff
                whenever it coerces to a non-empty value.
            patch: Unified diff text, used when no structured hunks are given.
            **kwargs: Extra arguments. ``_skip_confirmation`` is discarded;
                ``diff`` and ``patch_text`` are accepted as fallbacks for
                ``patch``.

        Returns:
            A ``ToolResult`` with original and updated content plus the
            applied hunks in its metadata, or an error result.

        Raises:
            ConfirmationRequired: On the first attempt to patch a target for
                which path resolution raised ``PermissionError``.
        """
        kwargs.pop("_skip_confirmation", None)
        try:
            path = resolve_workspace_path(
                file_path,
                workspace_root=self.workspace_root,
            )
        except FileNotFoundError:
            return ToolResult(f"File not found: {file_path}", is_error=True)
        except PermissionError:
            resolved = Path(file_path).expanduser().resolve()
            key = str(resolved)
            if key in self._pending_escape_approvals:
                self._pending_escape_approvals.discard(key)
                path = resolved
            else:
                self._pending_escape_approvals.add(key)
                # Carry the raw diff too, so a diff-only call still previews
                # the change being approved.
                preview_args: dict[str, Any] = {
                    "file_path": file_path,
                    "hunks": hunks,
                }
                if patch is not None:
                    preview_args["patch"] = patch
                raise ConfirmationRequired(
                    tool_name=self.name,
                    message=f"Patch outside workspace: {file_path}",
                    details=f"Target is outside the workspace root ({self.workspace_root})",
                    preview=build_file_mutation_preview_dict(
                        self.name,
                        tool_args=preview_args,
                    ),
                )
        except Exception as exc:
            return ToolResult(f"Error resolving file path: {exc}", is_error=True)

        if not path.exists():
            return ToolResult(f"File not found: {file_path}", is_error=True)

        try:
            ensure_safe_to_read(path)
            original_content = await asyncio.to_thread(path.read_text)
            original_lines = original_content.splitlines()
            raw_patch = patch or kwargs.get("diff") or kwargs.get("patch_text")
            structured_hunks = coerce_structured_patch_payload(hunks)
            parsed_hunks: list[StructuredPatchHunk]
            if structured_hunks:
                parsed_hunks = [
                    hunk
                    if isinstance(hunk, StructuredPatchHunk)
                    else StructuredPatchHunk.from_dict_with_original(
                        hunk,
                        original_lines=original_lines,
                    )
                    for hunk in structured_hunks
                ]
            elif isinstance(raw_patch, str) and raw_patch.strip():
                parsed_hunks = parse_unified_diff_patch(raw_patch)
            else:
                parsed_hunks = []
            if not parsed_hunks:
                # Same message as before, returned directly rather than by
                # raising an exception only to catch it below.
                return ToolResult(
                    "Error patching file: hunks must not be empty",
                    is_error=True,
                )
            updated_content = apply_structured_patch(original_content, parsed_hunks)
            ensure_safe_to_write(updated_content)
            await asyncio.to_thread(path.write_text, updated_content)
            structured_patch = [hunk.to_dict() for hunk in parsed_hunks]
            return ToolResult(
                f"Successfully patched {path}",
                metadata={
                    "file_path": str(path),
                    "original_file": original_content,
                    "content": updated_content,
                    "structured_patch": structured_patch,
                    "bytes_written": len(updated_content.encode("utf-8")),
                },
            )
        except Exception as exc:
            return ToolResult(f"Error patching file: {exc}", is_error=True)
654
655
class GlobTool(Tool):
    """Find files matching a glob pattern."""

    required_permission = PermissionMode.READ_ONLY

    def __init__(self, workspace_root: Path | str | None = None) -> None:
        """Store the workspace root, expanded and resolved when one is given."""
        self.workspace_root = (
            Path(workspace_root).expanduser().resolve() if workspace_root else None
        )

    @property
    def name(self) -> str:
        return "glob"

    def set_workspace_root(self, workspace_root: Path | None) -> None:
        """Replace the stored workspace root with the given value as-is."""
        self.workspace_root = workspace_root

    @property
    def description(self) -> str:
        return (
            "Find files matching a glob pattern (e.g., '**/*.py', 'src/*.ts'). "
            "For external directories, prefer path='~/Loader/animals' with "
            "pattern='*.html'; absolute or '~'-prefixed patterns are also accepted."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "pattern": {
                    "type": "string",
                    "description": "Glob pattern to match files",
                },
                "path": {
                    "type": "string",
                    "description": "Base directory to search in (default: current directory)",
                    "default": ".",
                },
            },
            "required": ["pattern"],
        }

    @staticmethod
    def _safe_mtime(candidate: Path) -> float:
        """Return the modification time, or 0.0 when the entry cannot be stat'ed."""
        try:
            return candidate.stat().st_mtime
        except OSError:
            # Dangling symlinks and files removed mid-search sort last instead
            # of failing the whole search.
            return 0.0

    @staticmethod
    def _collect_matches(base_path: Path, effective_pattern: str) -> list[Path]:
        """Glob under ``base_path`` and return matches ordered newest first."""
        found = list(base_path.glob(effective_pattern))
        found.sort(key=GlobTool._safe_mtime, reverse=True)
        return found

    async def execute(
        self,
        pattern: str,
        path: str = ".",
        **kwargs: Any,
    ) -> ToolResult:
        """List up to 100 paths matching ``pattern``, newest first.

        Args:
            pattern: Glob pattern; may be relative, absolute, or start with ``~``.
            path: Base directory used for relative patterns.
            **kwargs: Ignored extra arguments.

        Returns:
            A ``ToolResult`` with one path per line (and a trailer when the
            list was cut to 100 entries), a "no files" message when nothing
            matched, or an error result.
        """
        try:
            # Glob is read-only — don't enforce workspace boundary
            base_path, effective_pattern = _resolve_glob_base_and_pattern(pattern, path)
        except FileNotFoundError:
            return ToolResult(f"Directory not found: {path}", is_error=True)
        except Exception as exc:
            return ToolResult(f"Error resolving directory: {exc}", is_error=True)

        if not base_path.exists():
            missing_target = path if path != "." else str(base_path)
            return ToolResult(f"Directory not found: {missing_target}", is_error=True)

        try:
            # Walk and stat in a worker thread so a large tree does not block
            # the event loop, as the file tools here already do for reads.
            matches = await asyncio.to_thread(
                self._collect_matches,
                base_path,
                effective_pattern,
            )

            # Limit results
            total_matches = len(matches)
            truncated = total_matches > 100
            if truncated:
                matches = matches[:100]

            if not matches:
                output = f"No files matching pattern: {pattern}"
            else:
                output = "\n".join(str(p) for p in matches)
                if truncated:
                    output += f"\n\n... (showing first 100 of {total_matches} matches)"

            return ToolResult(
                output,
                metadata={
                    "base_path": str(base_path),
                    "effective_pattern": effective_pattern,
                    "requested_pattern": pattern,
                    "num_files": len(matches),
                    "truncated": truncated,
                },
            )
        except Exception as e:
            return ToolResult(f"Error searching files: {e}", is_error=True)
746 return ToolResult(f"Error searching files: {e}", is_error=True)