| 1 | """File operation tools.""" |
| 2 | |
| 3 | import asyncio |
| 4 | from pathlib import Path |
| 5 | from typing import Any |
| 6 | |
| 7 | from ..runtime.permissions import PermissionMode |
| 8 | from ..utils.file_mutations import build_file_mutation_preview_dict |
| 9 | from .base import ConfirmationRequired, Tool, ToolResult |
| 10 | from .fs_safety import ( |
| 11 | StructuredPatchHunk, |
| 12 | apply_structured_patch, |
| 13 | coerce_structured_patch_payload, |
| 14 | ensure_safe_to_read, |
| 15 | ensure_safe_to_write, |
| 16 | make_structured_patch, |
| 17 | parse_unified_diff_patch, |
| 18 | resolve_workspace_path, |
| 19 | ) |
| 20 | |
| 21 | _GLOB_MAGIC_CHARS = "*?[" |
| 22 | |
| 23 | |
| 24 | def _has_glob_magic(segment: str) -> bool: |
| 25 | """Return whether one path segment contains glob syntax.""" |
| 26 | |
| 27 | return any(char in segment for char in _GLOB_MAGIC_CHARS) |
| 28 | |
| 29 | |
| 30 | def _resolve_glob_base_and_pattern( |
| 31 | pattern: str, |
| 32 | path: str, |
| 33 | ) -> tuple[Path, str]: |
| 34 | """Resolve glob inputs, including `~`/absolute patterns outside the cwd.""" |
| 35 | |
| 36 | expanded_pattern = Path(pattern).expanduser() |
| 37 | pattern_is_explicit_path = pattern.startswith("~") or expanded_pattern.is_absolute() |
| 38 | |
| 39 | if not pattern_is_explicit_path: |
| 40 | base_path = resolve_workspace_path(path, workspace_root=None) |
| 41 | return base_path, pattern |
| 42 | |
| 43 | base_parts: list[str] = [] |
| 44 | pattern_parts: list[str] = [] |
| 45 | saw_glob = False |
| 46 | for part in expanded_pattern.parts: |
| 47 | if saw_glob or _has_glob_magic(part): |
| 48 | saw_glob = True |
| 49 | pattern_parts.append(part) |
| 50 | else: |
| 51 | base_parts.append(part) |
| 52 | |
| 53 | if not pattern_parts: |
| 54 | if expanded_pattern.name: |
| 55 | pattern_parts = [expanded_pattern.name] |
| 56 | base_parts = list(expanded_pattern.parent.parts) |
| 57 | else: |
| 58 | pattern_parts = ["*"] |
| 59 | |
| 60 | raw_base = str(Path(*base_parts)) if base_parts else expanded_pattern.anchor or "." |
| 61 | base_path = resolve_workspace_path(raw_base, workspace_root=None) |
| 62 | return base_path, "/".join(pattern_parts) |
| 63 | |
| 64 | |
| 65 | class ReadTool(Tool): |
| 66 | """Read file contents.""" |
| 67 | |
| 68 | required_permission = PermissionMode.READ_ONLY |
| 69 | |
| 70 | def __init__(self, workspace_root: Path | str | None = None) -> None: |
| 71 | self.workspace_root = ( |
| 72 | Path(workspace_root).expanduser().resolve() if workspace_root else None |
| 73 | ) |
| 74 | |
| 75 | @property |
| 76 | def name(self) -> str: |
| 77 | return "read" |
| 78 | |
| 79 | def set_workspace_root(self, workspace_root: Path | None) -> None: |
| 80 | self.workspace_root = workspace_root |
| 81 | |
| 82 | @property |
| 83 | def description(self) -> str: |
| 84 | return "Read the contents of a file. Returns the file content with line numbers." |
| 85 | |
| 86 | @property |
| 87 | def parameters(self) -> dict[str, Any]: |
| 88 | return { |
| 89 | "type": "object", |
| 90 | "properties": { |
| 91 | "file_path": { |
| 92 | "type": "string", |
| 93 | "description": "Path to the file to read", |
| 94 | }, |
| 95 | "offset": { |
| 96 | "type": "integer", |
| 97 | "description": "Line number to start reading from (1-indexed)", |
| 98 | "default": 1, |
| 99 | }, |
| 100 | "limit": { |
| 101 | "type": "integer", |
| 102 | "description": "Maximum number of lines to read", |
| 103 | "default": 500, |
| 104 | }, |
| 105 | }, |
| 106 | "required": ["file_path"], |
| 107 | } |
| 108 | |
| 109 | async def execute( |
| 110 | self, |
| 111 | file_path: str, |
| 112 | offset: int = 1, |
| 113 | limit: int = 500, |
| 114 | **kwargs: Any, |
| 115 | ) -> ToolResult: |
| 116 | try: |
| 117 | # Reads are safe — don't enforce workspace boundary |
| 118 | path = resolve_workspace_path( |
| 119 | file_path, |
| 120 | workspace_root=None, |
| 121 | ) |
| 122 | except FileNotFoundError: |
| 123 | return ToolResult(f"File not found: {file_path}", is_error=True) |
| 124 | except Exception as exc: |
| 125 | return ToolResult(f"Error resolving file path: {exc}", is_error=True) |
| 126 | |
| 127 | if not path.exists(): |
| 128 | return ToolResult(f"File not found: {file_path}", is_error=True) |
| 129 | |
| 130 | if not path.is_file(): |
| 131 | return ToolResult(f"Not a file: {file_path}", is_error=True) |
| 132 | |
| 133 | try: |
| 134 | ensure_safe_to_read(path) |
| 135 | content = await asyncio.to_thread(path.read_text) |
| 136 | lines = content.splitlines() |
| 137 | |
| 138 | # Apply offset and limit |
| 139 | start_idx = max(0, offset - 1) |
| 140 | end_idx = start_idx + limit |
| 141 | selected_lines = lines[start_idx:end_idx] |
| 142 | |
| 143 | # Add line numbers |
| 144 | numbered = [] |
| 145 | for i, line in enumerate(selected_lines, start=offset): |
| 146 | numbered.append(f"{i:4d}\t{line}") |
| 147 | |
| 148 | output = "\n".join(numbered) |
| 149 | |
| 150 | if end_idx < len(lines): |
| 151 | output += f"\n\n... ({len(lines) - end_idx} more lines)" |
| 152 | |
| 153 | return ToolResult( |
| 154 | output, |
| 155 | metadata={ |
| 156 | "file_path": str(path), |
| 157 | "line_count": len(lines), |
| 158 | "offset": offset, |
| 159 | "limit": limit, |
| 160 | }, |
| 161 | ) |
| 162 | except Exception as e: |
| 163 | return ToolResult(f"Error reading file: {e}", is_error=True) |
| 164 | |
| 165 | |
| 166 | class WriteTool(Tool): |
| 167 | """Write content to a file.""" |
| 168 | |
| 169 | required_permission = PermissionMode.WORKSPACE_WRITE |
| 170 | |
| 171 | def __init__(self, workspace_root: Path | str | None = None) -> None: |
| 172 | self.workspace_root = ( |
| 173 | Path(workspace_root).expanduser().resolve() if workspace_root else None |
| 174 | ) |
| 175 | self._pending_escape_approvals: set[str] = set() |
| 176 | |
| 177 | @property |
| 178 | def name(self) -> str: |
| 179 | return "write" |
| 180 | |
| 181 | def set_workspace_root(self, workspace_root: Path | None) -> None: |
| 182 | self.workspace_root = workspace_root |
| 183 | |
| 184 | @property |
| 185 | def description(self) -> str: |
| 186 | return "Write content to a file. Creates the file if it doesn't exist." |
| 187 | |
| 188 | @property |
| 189 | def parameters(self) -> dict[str, Any]: |
| 190 | return { |
| 191 | "type": "object", |
| 192 | "properties": { |
| 193 | "file_path": { |
| 194 | "type": "string", |
| 195 | "description": "Path to the file to write", |
| 196 | }, |
| 197 | "content": { |
| 198 | "type": "string", |
| 199 | "description": "Content to write to the file", |
| 200 | }, |
| 201 | }, |
| 202 | "required": ["file_path", "content"], |
| 203 | } |
| 204 | |
| 205 | @property |
| 206 | def is_destructive(self) -> bool: |
| 207 | return True |
| 208 | |
| 209 | def check_confirmation(self, skip_confirmation: bool = False, **kwargs: Any) -> None: |
| 210 | if skip_confirmation: |
| 211 | return |
| 212 | file_path = kwargs.get("file_path", "") |
| 213 | content = kwargs.get("content", "") |
| 214 | raise ConfirmationRequired( |
| 215 | tool_name=self.name, |
| 216 | message=f"Write to file: {file_path}", |
| 217 | details=f"{len(content)} bytes", |
| 218 | preview=build_file_mutation_preview_dict(self.name, tool_args=kwargs), |
| 219 | ) |
| 220 | |
| 221 | async def execute( |
| 222 | self, |
| 223 | file_path: str, |
| 224 | content: str, |
| 225 | **kwargs: Any, |
| 226 | ) -> ToolResult: |
| 227 | kwargs.pop("_skip_confirmation", None) |
| 228 | try: |
| 229 | ensure_safe_to_write(content) |
| 230 | path = resolve_workspace_path( |
| 231 | file_path, |
| 232 | workspace_root=self.workspace_root, |
| 233 | allow_missing=True, |
| 234 | ) |
| 235 | except PermissionError: |
| 236 | resolved = Path(file_path).expanduser().resolve() |
| 237 | key = str(resolved) |
| 238 | if key in self._pending_escape_approvals: |
| 239 | self._pending_escape_approvals.discard(key) |
| 240 | path = resolved |
| 241 | else: |
| 242 | self._pending_escape_approvals.add(key) |
| 243 | raise ConfirmationRequired( |
| 244 | tool_name=self.name, |
| 245 | message=f"Write outside workspace: {file_path}", |
| 246 | details=f"Target is outside the workspace root ({self.workspace_root})", |
| 247 | preview=build_file_mutation_preview_dict( |
| 248 | self.name, |
| 249 | tool_args={"file_path": file_path, "content": content}, |
| 250 | ), |
| 251 | ) |
| 252 | except Exception as exc: |
| 253 | return ToolResult(f"Error writing file: {exc}", is_error=True) |
| 254 | |
| 255 | try: |
| 256 | original_content = "" |
| 257 | if path.exists(): |
| 258 | ensure_safe_to_read(path) |
| 259 | original_content = await asyncio.to_thread(path.read_text) |
| 260 | |
| 261 | # Create parent directories if needed |
| 262 | path.parent.mkdir(parents=True, exist_ok=True) |
| 263 | |
| 264 | await asyncio.to_thread(path.write_text, content) |
| 265 | |
| 266 | structured_patch = [ |
| 267 | hunk.to_dict() |
| 268 | for hunk in make_structured_patch(original_content, content) |
| 269 | ] |
| 270 | metadata = { |
| 271 | "kind": "update" if original_content else "create", |
| 272 | "file_path": str(path), |
| 273 | "content": content, |
| 274 | "original_file": original_content or None, |
| 275 | "structured_patch": structured_patch, |
| 276 | "bytes_written": len(content.encode("utf-8")), |
| 277 | } |
| 278 | return ToolResult( |
| 279 | f"Successfully wrote {len(content)} bytes to {path}", |
| 280 | metadata=metadata, |
| 281 | ) |
| 282 | except Exception as e: |
| 283 | return ToolResult(f"Error writing file: {e}", is_error=True) |
| 284 | |
| 285 | |
| 286 | class EditTool(Tool): |
| 287 | """Edit a file by replacing text.""" |
| 288 | |
| 289 | required_permission = PermissionMode.WORKSPACE_WRITE |
| 290 | |
| 291 | def __init__(self, workspace_root: Path | str | None = None) -> None: |
| 292 | self.workspace_root = ( |
| 293 | Path(workspace_root).expanduser().resolve() if workspace_root else None |
| 294 | ) |
| 295 | self._pending_escape_approvals: set[str] = set() |
| 296 | |
| 297 | @property |
| 298 | def name(self) -> str: |
| 299 | return "edit" |
| 300 | |
| 301 | def set_workspace_root(self, workspace_root: Path | None) -> None: |
| 302 | self.workspace_root = workspace_root |
| 303 | |
| 304 | @property |
| 305 | def description(self) -> str: |
| 306 | return ( |
| 307 | "Edit a file by replacing old_string with new_string. " |
| 308 | "The old_string must match exactly." |
| 309 | ) |
| 310 | |
| 311 | @property |
| 312 | def parameters(self) -> dict[str, Any]: |
| 313 | return { |
| 314 | "type": "object", |
| 315 | "properties": { |
| 316 | "file_path": { |
| 317 | "type": "string", |
| 318 | "description": "Path to the file to edit", |
| 319 | }, |
| 320 | "old_string": { |
| 321 | "type": "string", |
| 322 | "description": "The exact string to replace", |
| 323 | }, |
| 324 | "new_string": { |
| 325 | "type": "string", |
| 326 | "description": "The replacement string", |
| 327 | }, |
| 328 | }, |
| 329 | "required": ["file_path", "old_string", "new_string"], |
| 330 | } |
| 331 | |
| 332 | @property |
| 333 | def is_destructive(self) -> bool: |
| 334 | return True |
| 335 | |
| 336 | def check_confirmation(self, skip_confirmation: bool = False, **kwargs: Any) -> None: |
| 337 | if skip_confirmation: |
| 338 | return |
| 339 | file_path = kwargs.get("file_path", "") |
| 340 | raise ConfirmationRequired( |
| 341 | tool_name=self.name, |
| 342 | message=f"Edit file: {file_path}", |
| 343 | details="replace text", |
| 344 | preview=build_file_mutation_preview_dict(self.name, tool_args=kwargs), |
| 345 | ) |
| 346 | |
| 347 | async def execute( |
| 348 | self, |
| 349 | file_path: str | None = None, |
| 350 | old_string: str | None = None, |
| 351 | new_string: str | None = None, |
| 352 | **kwargs: Any, |
| 353 | ) -> ToolResult: |
| 354 | kwargs.pop("_skip_confirmation", None) |
| 355 | if file_path is None: |
| 356 | file_path = str(kwargs.pop("path", "") or "") |
| 357 | replacement_content = kwargs.pop("content", None) |
| 358 | if replacement_content is not None and ( |
| 359 | old_string is None or new_string is None |
| 360 | ): |
| 361 | return await self._replace_file_content( |
| 362 | file_path, |
| 363 | str(replacement_content), |
| 364 | ) |
| 365 | if old_string is None or new_string is None: |
| 366 | return ToolResult( |
| 367 | "edit requires file_path, old_string, and new_string. " |
| 368 | "For whole-file replacement, provide path/file_path with content.", |
| 369 | is_error=True, |
| 370 | ) |
| 371 | try: |
| 372 | path = resolve_workspace_path( |
| 373 | file_path, |
| 374 | workspace_root=self.workspace_root, |
| 375 | ) |
| 376 | except FileNotFoundError: |
| 377 | return ToolResult(f"File not found: {file_path}", is_error=True) |
| 378 | except PermissionError: |
| 379 | resolved = Path(file_path).expanduser().resolve() |
| 380 | key = str(resolved) |
| 381 | if key in self._pending_escape_approvals: |
| 382 | self._pending_escape_approvals.discard(key) |
| 383 | path = resolved |
| 384 | else: |
| 385 | self._pending_escape_approvals.add(key) |
| 386 | raise ConfirmationRequired( |
| 387 | tool_name=self.name, |
| 388 | message=f"Edit outside workspace: {file_path}", |
| 389 | details=f"Target is outside the workspace root ({self.workspace_root})", |
| 390 | preview=build_file_mutation_preview_dict( |
| 391 | self.name, |
| 392 | tool_args={ |
| 393 | "file_path": file_path, |
| 394 | "old_string": old_string, |
| 395 | "new_string": new_string, |
| 396 | }, |
| 397 | ), |
| 398 | ) |
| 399 | except Exception as exc: |
| 400 | return ToolResult(f"Error resolving file path: {exc}", is_error=True) |
| 401 | |
| 402 | if not path.exists(): |
| 403 | return ToolResult(f"File not found: {file_path}", is_error=True) |
| 404 | |
| 405 | try: |
| 406 | ensure_safe_to_read(path) |
| 407 | content = await asyncio.to_thread(path.read_text) |
| 408 | |
| 409 | if old_string not in content: |
| 410 | return ToolResult( |
| 411 | "old_string not found in file. Make sure it matches exactly.", |
| 412 | is_error=True, |
| 413 | ) |
| 414 | |
| 415 | # Count occurrences |
| 416 | count = content.count(old_string) |
| 417 | if count > 1: |
| 418 | return ToolResult( |
| 419 | "old_string appears " |
| 420 | f"{count} times. Please provide more context to make it unique.", |
| 421 | is_error=True, |
| 422 | ) |
| 423 | |
| 424 | new_content = content.replace(old_string, new_string, 1) |
| 425 | ensure_safe_to_write(new_content) |
| 426 | await asyncio.to_thread(path.write_text, new_content) |
| 427 | |
| 428 | structured_patch = [ |
| 429 | hunk.to_dict() |
| 430 | for hunk in make_structured_patch(content, new_content) |
| 431 | ] |
| 432 | return ToolResult( |
| 433 | f"Successfully edited {path}", |
| 434 | metadata={ |
| 435 | "file_path": str(path), |
| 436 | "old_string": old_string, |
| 437 | "new_string": new_string, |
| 438 | "original_file": content, |
| 439 | "structured_patch": structured_patch, |
| 440 | }, |
| 441 | ) |
| 442 | except Exception as e: |
| 443 | return ToolResult(f"Error editing file: {e}", is_error=True) |
| 444 | |
| 445 | async def _replace_file_content( |
| 446 | self, |
| 447 | file_path: str, |
| 448 | new_content: str, |
| 449 | ) -> ToolResult: |
| 450 | try: |
| 451 | path = resolve_workspace_path( |
| 452 | file_path, |
| 453 | workspace_root=self.workspace_root, |
| 454 | ) |
| 455 | except FileNotFoundError: |
| 456 | return ToolResult(f"File not found: {file_path}", is_error=True) |
| 457 | except PermissionError: |
| 458 | return ToolResult( |
| 459 | "Whole-file edit target is outside the workspace root.", |
| 460 | is_error=True, |
| 461 | ) |
| 462 | except Exception as exc: |
| 463 | return ToolResult(f"Error resolving file path: {exc}", is_error=True) |
| 464 | |
| 465 | if not path.exists(): |
| 466 | return ToolResult(f"File not found: {file_path}", is_error=True) |
| 467 | |
| 468 | try: |
| 469 | ensure_safe_to_read(path) |
| 470 | original_content = await asyncio.to_thread(path.read_text) |
| 471 | ensure_safe_to_write(new_content) |
| 472 | await asyncio.to_thread(path.write_text, new_content) |
| 473 | structured_patch = [ |
| 474 | hunk.to_dict() |
| 475 | for hunk in make_structured_patch(original_content, new_content) |
| 476 | ] |
| 477 | return ToolResult( |
| 478 | f"Successfully edited {path}", |
| 479 | metadata={ |
| 480 | "file_path": str(path), |
| 481 | "old_string": original_content, |
| 482 | "new_string": new_content, |
| 483 | "original_file": original_content, |
| 484 | "structured_patch": structured_patch, |
| 485 | }, |
| 486 | ) |
| 487 | except Exception as exc: |
| 488 | return ToolResult(f"Error editing file: {exc}", is_error=True) |
| 489 | |
| 490 | |
| 491 | class PatchTool(Tool): |
| 492 | """Edit a file by applying structured patch hunks.""" |
| 493 | |
| 494 | required_permission = PermissionMode.WORKSPACE_WRITE |
| 495 | |
| 496 | def __init__(self, workspace_root: Path | str | None = None) -> None: |
| 497 | self.workspace_root = ( |
| 498 | Path(workspace_root).expanduser().resolve() if workspace_root else None |
| 499 | ) |
| 500 | self._pending_escape_approvals: set[str] = set() |
| 501 | |
| 502 | @property |
| 503 | def name(self) -> str: |
| 504 | return "patch" |
| 505 | |
| 506 | def set_workspace_root(self, workspace_root: Path | None) -> None: |
| 507 | self.workspace_root = workspace_root |
| 508 | |
| 509 | @property |
| 510 | def description(self) -> str: |
| 511 | return ( |
| 512 | "Apply structured patch hunks to a file. Prefer this for larger " |
| 513 | "or multi-line edits where exact old/new string replacement is brittle. " |
| 514 | "A raw unified diff string is also accepted via `patch`." |
| 515 | ) |
| 516 | |
| 517 | @property |
| 518 | def parameters(self) -> dict[str, Any]: |
| 519 | return { |
| 520 | "type": "object", |
| 521 | "properties": { |
| 522 | "file_path": { |
| 523 | "type": "string", |
| 524 | "description": "Path to the file to patch", |
| 525 | }, |
| 526 | "hunks": { |
| 527 | "type": "array", |
| 528 | "description": "Structured patch hunks to apply in order.", |
| 529 | "items": { |
| 530 | "type": "object", |
| 531 | "properties": { |
| 532 | "old_start": {"type": "integer"}, |
| 533 | "old_lines": {"type": "integer"}, |
| 534 | "new_start": {"type": "integer"}, |
| 535 | "new_lines": {"type": "integer"}, |
| 536 | "lines": { |
| 537 | "type": "array", |
| 538 | "items": {"type": "string"}, |
| 539 | }, |
| 540 | }, |
| 541 | "required": [ |
| 542 | "old_start", |
| 543 | "old_lines", |
| 544 | "new_start", |
| 545 | "new_lines", |
| 546 | "lines", |
| 547 | ], |
| 548 | }, |
| 549 | }, |
| 550 | "patch": { |
| 551 | "type": "string", |
| 552 | "description": ( |
| 553 | "Optional unified diff patch string. Loader will parse this " |
| 554 | "into structured hunks when possible." |
| 555 | ), |
| 556 | }, |
| 557 | }, |
| 558 | "required": ["file_path"], |
| 559 | } |
| 560 | |
| 561 | @property |
| 562 | def is_destructive(self) -> bool: |
| 563 | return True |
| 564 | |
| 565 | def check_confirmation(self, skip_confirmation: bool = False, **kwargs: Any) -> None: |
| 566 | if skip_confirmation: |
| 567 | return |
| 568 | file_path = kwargs.get("file_path", "") |
| 569 | raise ConfirmationRequired( |
| 570 | tool_name=self.name, |
| 571 | message=f"Patch file: {file_path}", |
| 572 | details="apply structured patch hunks", |
| 573 | preview=build_file_mutation_preview_dict(self.name, tool_args=kwargs), |
| 574 | ) |
| 575 | |
| 576 | async def execute( |
| 577 | self, |
| 578 | file_path: str, |
| 579 | hunks: list[dict[str, Any]] | dict[str, Any] | str | None = None, |
| 580 | patch: str | None = None, |
| 581 | **kwargs: Any, |
| 582 | ) -> ToolResult: |
| 583 | kwargs.pop("_skip_confirmation", None) |
| 584 | try: |
| 585 | path = resolve_workspace_path( |
| 586 | file_path, |
| 587 | workspace_root=self.workspace_root, |
| 588 | ) |
| 589 | except FileNotFoundError: |
| 590 | return ToolResult(f"File not found: {file_path}", is_error=True) |
| 591 | except PermissionError: |
| 592 | resolved = Path(file_path).expanduser().resolve() |
| 593 | key = str(resolved) |
| 594 | if key in self._pending_escape_approvals: |
| 595 | self._pending_escape_approvals.discard(key) |
| 596 | path = resolved |
| 597 | else: |
| 598 | self._pending_escape_approvals.add(key) |
| 599 | raise ConfirmationRequired( |
| 600 | tool_name=self.name, |
| 601 | message=f"Patch outside workspace: {file_path}", |
| 602 | details=f"Target is outside the workspace root ({self.workspace_root})", |
| 603 | preview=build_file_mutation_preview_dict( |
| 604 | self.name, |
| 605 | tool_args={"file_path": file_path, "hunks": hunks}, |
| 606 | ), |
| 607 | ) |
| 608 | |
| 609 | except Exception as exc: |
| 610 | return ToolResult(f"Error resolving file path: {exc}", is_error=True) |
| 611 | |
| 612 | if not path.exists(): |
| 613 | return ToolResult(f"File not found: {file_path}", is_error=True) |
| 614 | |
| 615 | try: |
| 616 | ensure_safe_to_read(path) |
| 617 | original_content = await asyncio.to_thread(path.read_text) |
| 618 | original_lines = original_content.splitlines() |
| 619 | raw_patch = patch or kwargs.get("diff") or kwargs.get("patch_text") |
| 620 | structured_hunks = coerce_structured_patch_payload(hunks) |
| 621 | parsed_hunks: list[StructuredPatchHunk] |
| 622 | if structured_hunks: |
| 623 | parsed_hunks = [ |
| 624 | hunk |
| 625 | if isinstance(hunk, StructuredPatchHunk) |
| 626 | else StructuredPatchHunk.from_dict_with_original( |
| 627 | hunk, |
| 628 | original_lines=original_lines, |
| 629 | ) |
| 630 | for hunk in structured_hunks |
| 631 | ] |
| 632 | elif isinstance(raw_patch, str) and raw_patch.strip(): |
| 633 | parsed_hunks = parse_unified_diff_patch(raw_patch) |
| 634 | else: |
| 635 | parsed_hunks = [] |
| 636 | if not parsed_hunks: |
| 637 | raise ValueError("hunks must not be empty") |
| 638 | updated_content = apply_structured_patch(original_content, parsed_hunks) |
| 639 | ensure_safe_to_write(updated_content) |
| 640 | await asyncio.to_thread(path.write_text, updated_content) |
| 641 | structured_patch = [hunk.to_dict() for hunk in parsed_hunks] |
| 642 | return ToolResult( |
| 643 | f"Successfully patched {path}", |
| 644 | metadata={ |
| 645 | "file_path": str(path), |
| 646 | "original_file": original_content, |
| 647 | "content": updated_content, |
| 648 | "structured_patch": structured_patch, |
| 649 | "bytes_written": len(updated_content.encode("utf-8")), |
| 650 | }, |
| 651 | ) |
| 652 | except Exception as exc: |
| 653 | return ToolResult(f"Error patching file: {exc}", is_error=True) |
| 654 | |
| 655 | |
| 656 | class GlobTool(Tool): |
| 657 | """Find files matching a glob pattern.""" |
| 658 | |
| 659 | required_permission = PermissionMode.READ_ONLY |
| 660 | |
| 661 | def __init__(self, workspace_root: Path | str | None = None) -> None: |
| 662 | self.workspace_root = ( |
| 663 | Path(workspace_root).expanduser().resolve() if workspace_root else None |
| 664 | ) |
| 665 | |
| 666 | @property |
| 667 | def name(self) -> str: |
| 668 | return "glob" |
| 669 | |
| 670 | def set_workspace_root(self, workspace_root: Path | None) -> None: |
| 671 | self.workspace_root = workspace_root |
| 672 | |
| 673 | @property |
| 674 | def description(self) -> str: |
| 675 | return ( |
| 676 | "Find files matching a glob pattern (e.g., '**/*.py', 'src/*.ts'). " |
| 677 | "For external directories, prefer path='~/Loader/animals' with " |
| 678 | "pattern='*.html'; absolute or '~'-prefixed patterns are also accepted." |
| 679 | ) |
| 680 | |
| 681 | @property |
| 682 | def parameters(self) -> dict[str, Any]: |
| 683 | return { |
| 684 | "type": "object", |
| 685 | "properties": { |
| 686 | "pattern": { |
| 687 | "type": "string", |
| 688 | "description": "Glob pattern to match files", |
| 689 | }, |
| 690 | "path": { |
| 691 | "type": "string", |
| 692 | "description": "Base directory to search in (default: current directory)", |
| 693 | "default": ".", |
| 694 | }, |
| 695 | }, |
| 696 | "required": ["pattern"], |
| 697 | } |
| 698 | |
| 699 | async def execute( |
| 700 | self, |
| 701 | pattern: str, |
| 702 | path: str = ".", |
| 703 | **kwargs: Any, |
| 704 | ) -> ToolResult: |
| 705 | try: |
| 706 | # Glob is read-only — don't enforce workspace boundary |
| 707 | base_path, effective_pattern = _resolve_glob_base_and_pattern(pattern, path) |
| 708 | except FileNotFoundError: |
| 709 | return ToolResult(f"Directory not found: {path}", is_error=True) |
| 710 | except Exception as exc: |
| 711 | return ToolResult(f"Error resolving directory: {exc}", is_error=True) |
| 712 | |
| 713 | if not base_path.exists(): |
| 714 | missing_target = path if path != "." else str(base_path) |
| 715 | return ToolResult(f"Directory not found: {missing_target}", is_error=True) |
| 716 | |
| 717 | try: |
| 718 | matches = list(base_path.glob(effective_pattern)) |
| 719 | # Sort by modification time (newest first) |
| 720 | matches.sort(key=lambda p: p.stat().st_mtime, reverse=True) |
| 721 | |
| 722 | # Limit results |
| 723 | total_matches = len(matches) |
| 724 | truncated = total_matches > 100 |
| 725 | if truncated: |
| 726 | matches = matches[:100] |
| 727 | output = "\n".join(str(p) for p in matches) |
| 728 | output += f"\n\n... (showing first 100 of {total_matches} matches)" |
| 729 | else: |
| 730 | output = "\n".join(str(p) for p in matches) |
| 731 | |
| 732 | if not matches: |
| 733 | output = f"No files matching pattern: {pattern}" |
| 734 | |
| 735 | return ToolResult( |
| 736 | output, |
| 737 | metadata={ |
| 738 | "base_path": str(base_path), |
| 739 | "effective_pattern": effective_pattern, |
| 740 | "requested_pattern": pattern, |
| 741 | "num_files": len(matches), |
| 742 | "truncated": truncated if "truncated" in locals() else False, |
| 743 | }, |
| 744 | ) |
| 745 | except Exception as e: |
| 746 | return ToolResult(f"Error searching files: {e}", is_error=True) |