1 #!/usr/bin/env python3
2 """
3 Interactive test runner for shell.
4
5 Runs both YAML-based test specifications and pytest test files.
6 """
7
8 import sys
9 import os
10 import argparse
11 import time
12 from pathlib import Path
13 from datetime import datetime
14 from typing import List, Dict, Any, Tuple, Optional
15
16 import gc
17 import re
18 import yaml
19 import pexpect
20 from colorama import init, Fore, Style
21
22 # Add parent directory to path for imports
23 sys.path.insert(0, str(Path(__file__).parent))
24
25 from shell_pty import ShellPTY, ShellTestSession
26 from utils.keys import KEYS, get_key
27 from utils.matchers import (
28 OutputMatcher, match_exact, match_contains, match_regex,
29 MatchResult
30 )
31
32 # Initialize colorama for cross-platform colors (strip=False to avoid OSC issues on macOS)
33 init(strip=False, convert=False)
34
35
def strip_control_sequences(text: str) -> str:
    """Remove ANSI and OSC control sequences from text."""
    # Patterns are applied in order: OSC first (their payload may itself
    # contain CSI-looking bytes), then CSI, then any other bare escapes.
    patterns = (
        r'\x1b\].*?(?:\x07|\x1b\\)',                   # OSC (e.g. terminal title)
        r'\x1b\[[\x30-\x3f]*[\x20-\x2f]*[\x40-\x7e]',  # CSI
        r'\x1b[^\[\]].?',                              # other escape sequences
    )
    for pattern in patterns:
        text = re.sub(pattern, '', text)
    return text
45
46
class TestResult:
    """Result of a single test.

    Attributes:
        name: Human-readable test name from the spec.
        passed: True if the test passed.
        error: Error description when the test failed (empty on success).
        duration: Wall-clock runtime of the test in seconds.
        test_id: Display identifier filled in by the runner, e.g. "[history] 5".
    """

    def __init__(self, name: str, passed: bool, error: str = "", duration: float = 0.0):
        self.name = name
        self.passed = passed
        self.error = error
        self.duration = duration
        self.test_id = ""  # e.g., "[history] 5"; set by the runner after the fact

    def __repr__(self) -> str:
        # Compact form for debugging/log output; not used by the reports.
        status = "PASS" if self.passed else "FAIL"
        return f"TestResult({self.name!r}, {status}, {self.duration:.3f}s)"
56
57
class YAMLTestRunner:
    """
    Runs tests defined in YAML specification files.

    Uses session reuse to avoid PTY exhaustion - reuses the same
    shell session across multiple tests, resetting state between them.
    """

    def __init__(self, shell_path: str, verbose: bool = False, tests_per_session: int = 10,
                 profile: Optional[dict] = None):
        """
        Args:
            shell_path: Path to the shell binary under test.
            verbose: Enable verbose output.
            tests_per_session: How many tests share one PTY session before a
                fresh one is started. 10 means "not explicitly overridden".
            profile: Shell capability/behavior profile dictionary (may be empty).
        """
        self.shell_path = shell_path
        self.verbose = verbose
        self.results: List[TestResult] = []
        self.profile = profile or {}

        # Scale timeouts for slower platforms (ARM64, macOS with flang-new)
        import platform
        machine = platform.machine().lower()
        system = platform.system().lower()
        if machine in ('arm64', 'aarch64'):
            self.pty_timeout = 10.0  # 2x default for ARM64
            self.delay_scale = 1.0
        else:
            self.pty_timeout = 5.0
            self.delay_scale = 1.0
        # macOS: fewer tests per session to reduce state accumulation issues
        # with flang-new I/O buffering and readline mode interactions
        if tests_per_session != 10:
            # Explicit override from caller
            self.tests_per_session = tests_per_session
        elif system == 'darwin':
            # Fresh session per test on macOS: readline cursor tracking
            # gets out of sync across reused sessions with flang-new
            self.tests_per_session = 1
        else:
            self.tests_per_session = tests_per_session
        self._current_session: Optional[ShellPTY] = None
        self._test_count = 0
        self._step_sync_id = 0
        self._use_marker_sync = (system == 'darwin')

    def _get_session(self, env: Optional[dict] = None, rc_file: str = "/dev/null",
                     fresh: bool = False) -> ShellPTY:
        """
        Get a shell session, reusing existing one if possible.

        Args:
            env: Environment variables for the session
            rc_file: RC file path
            fresh: If True, always create a new session

        Returns:
            ShellPTY session
        """
        needs_new = (
            fresh or
            (env is not None and len(env) > 0) or  # Custom env requires fresh session
            self._current_session is None or
            not self._current_session.is_running or
            self._test_count % self.tests_per_session == 0
        )

        if needs_new:
            if self._current_session is not None:
                try:
                    self._current_session.stop()
                except Exception:
                    # Best-effort shutdown: the old session may already be dead.
                    pass
                gc.collect()
                time.sleep(0.2 * self.delay_scale)

            self._current_session = ShellPTY(
                shell_path=self.shell_path,
                timeout=self.pty_timeout,
                env=env or {},
                profile=self.profile,
            )
            self._current_session.start(rc_file=rc_file)
        else:
            # Reset session state for reuse
            self._reset_session()

        return self._current_session

    def _reset_session(self) -> None:
        """Reset session state between tests (best-effort; never raises)."""
        if self._current_session is None or not self._current_session.is_running:
            return

        try:
            # Exit any special mode the shell might be in:
            # - Ctrl+G cancels search mode (Ctrl+R/Ctrl+S)
            # - Escape exits vi insert→command, or is harmless in emacs mode
            # - Ctrl+C interrupts running commands and clears line
            # - Ctrl+U kills the line
            self._current_session.send_key("C-g")
            time.sleep(0.05)
            self._current_session.send(chr(27))  # Escape
            time.sleep(0.05)
            self._current_session.send_key("C-c")
            time.sleep(0.1)
            self._current_session.send_key("C-c")
            time.sleep(0.1)
            self._current_session.send_key("C-u")
            time.sleep(0.1)

            # Clear buffer before reset command
            self._current_session.clear_buffer()
            time.sleep(0.05)

            # Reset PS1 and editing mode, then echo marker
            marker = f"RESET_{self._test_count}"
            # Build reset command from profile
            mode_reset = self.profile.get("mode_reset_command", "")
            prompt_set = self.profile.get("prompt_set_command", "PS1='$ '")
            reset_parts = [p for p in [mode_reset, prompt_set, f"echo {marker}"] if p]
            self._current_session.send_line(" " + "; ".join(reset_parts))  # leading space to exclude from history

            # Wait for the marker to ensure we're at a clean state
            try:
                self._current_session.expect(marker, timeout=self.pty_timeout)
            except Exception:
                # Marker may get swallowed; the settle delay below still helps.
                pass

            # Wait for prompt after marker and clear buffer again
            time.sleep(0.3)
            self._current_session.clear_buffer()
            time.sleep(0.05)
        except Exception:
            # Reset is best-effort; a broken session is detected and replaced
            # by _get_session on the next test.
            pass

    def _cleanup_session(self) -> None:
        """Clean up the current session."""
        if self._current_session is not None:
            try:
                self._current_session.stop()
            except Exception:
                # Ignore shutdown errors; the session object is dropped anyway.
                pass
            self._current_session = None
        gc.collect()

    def should_skip_spec(self, spec_path: Path) -> Optional[str]:
        """Check if a spec file should be skipped based on profile capabilities.

        Returns a human-readable skip reason, or None to run the spec.
        """
        skip_list = self.profile.get("suites", {}).get("skip", [])
        # Check if any skip pattern matches the spec path
        spec_str = str(spec_path)
        for pattern in skip_list:
            if pattern in spec_str:
                return pattern
        # Check capability requirements
        caps = self.profile.get("capabilities", {})
        spec_name = spec_path.stem.lower()
        if "vi_mode" in spec_name and not caps.get("vi_mode", False):
            return "shell lacks vi_mode capability"
        if "completion" in spec_name and not caps.get("command_completion", False):
            return "shell lacks command_completion capability"
        if ("line_editing" in spec_name or "history" in spec_name) and not caps.get("readline", False):
            return "shell lacks readline capability"
        return None

    def run_spec_file(self, spec_path: Path) -> List[TestResult]:
        """
        Run all tests in a YAML spec file.

        Args:
            spec_path: Path to the YAML specification file

        Returns:
            List of TestResult objects
        """
        # Check if this suite should be skipped for this shell
        skip_reason = self.should_skip_spec(spec_path)
        if skip_reason:
            # Separator added so the reason doesn't fuse with the filename.
            print(f"\n{Fore.YELLOW}[SKIP]{Style.RESET_ALL} {spec_path.name}: {skip_reason}")
            return []

        with open(spec_path) as f:
            spec = yaml.safe_load(f)

        category = spec.get('metadata', {}).get('category', spec_path.stem)
        # Use filename stem as prefix: history.yaml -> [history]
        file_prefix = f"[{spec_path.stem}]"
        print(f"\n{Fore.CYAN}=== {category} ==={Style.RESET_ALL}")

        results = []
        test_num = 0
        for test in spec.get('tests', []):
            test_num += 1
            result = self.run_test(test)
            # Store test ID for failed test summary
            result.test_id = f"{file_prefix} {test_num}"
            results.append(result)
            self._test_count += 1

            # Delay between tests for OS cleanup
            time.sleep(0.3 * self.delay_scale)

            if result.passed:
                print(f"  {Fore.GREEN}✓{Style.RESET_ALL} {file_prefix} {test_num}: {result.name}", flush=True)
            else:
                error_msg = strip_control_sequences(result.error)
                print(f"  {Fore.RED}✗{Style.RESET_ALL} {file_prefix} {test_num}: {result.name}: {error_msg}", flush=True)

        # Clean up session at end of spec file
        self._cleanup_session()
        # Reset test count for fresh session at start of next category
        self._test_count = 0

        return results

    def run_test(self, test: Dict[str, Any]) -> TestResult:
        """
        Run a single test from a spec.

        Args:
            test: Test specification dictionary

        Returns:
            TestResult
        """
        name = test.get('name', 'Unnamed test')
        start_time = time.time()

        # Set up environment
        env = test.get('env', {})
        rc_file = test.get('rc_file', '/dev/null')
        fresh_session = test.get('fresh_session', False)

        try:
            # Get session (may be reused or fresh)
            shell = self._get_session(env=env, rc_file=rc_file, fresh=fresh_session)

            try:
                # Execute test steps
                steps = test.get('steps', [])
                for i, step in enumerate(steps):
                    is_last = (i == len(steps) - 1)
                    next_step = steps[i + 1] if not is_last else None
                    self._execute_step(shell, step, is_last=is_last, next_step=next_step)

                # Get command output
                if 'expect_output' in test:
                    expected = test['expect_output']
                    # Wait for the expected output to appear
                    try:
                        shell.expect(expected)
                        # Test passed - we found the expected output
                        duration = time.time() - start_time
                        return TestResult(name, True, "", duration)
                    except pexpect.TIMEOUT:
                        duration = time.time() - start_time
                        # Get cleaned output for error reporting
                        raw_output = shell.get_clean_output()
                        output = strip_control_sequences(raw_output)
                        # Truncate for readability
                        if len(output) > 300:
                            output = output[:300] + "..."
                        return TestResult(
                            name, False,
                            f"Expected '{expected}' not found. Got: '{output}'",
                            duration
                        )
                    except Exception as e:
                        duration = time.time() - start_time
                        return TestResult(
                            name, False,
                            f"Error: {str(e)}",
                            duration
                        )
                elif 'expect_not' in test:
                    # Wait for prompt, then check output doesn't contain unwanted
                    output = shell.wait_for_prompt()
                    output = strip_control_sequences(output)
                    unwanted = test['expect_not']
                    if unwanted in output:
                        duration = time.time() - start_time
                        return TestResult(
                            name, False,
                            f"Found unwanted output: '{unwanted}'",
                            duration
                        )
                    duration = time.time() - start_time
                    return TestResult(name, True, "", duration)
                else:
                    # No expectation, just run the steps
                    duration = time.time() - start_time
                    return TestResult(name, True, "", duration)

            finally:
                # Don't stop session - it will be reused or cleaned up later
                pass

        except pexpect.TIMEOUT as e:
            duration = time.time() - start_time
            return TestResult(name, False, f"Timeout: {e}", duration)
        except pexpect.EOF as e:
            duration = time.time() - start_time
            return TestResult(name, False, f"Unexpected EOF: {e}", duration)
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(name, False, str(e), duration)

    def _execute_step(self, shell: ShellPTY, step: Dict[str, Any], is_last: bool = False,
                      next_step: Optional[Dict[str, Any]] = None) -> None:
        """Execute a single test step (one key of the step dict selects the action)."""
        ds = self.delay_scale
        if 'send' in step:
            shell.send(step['send'])
            time.sleep(0.02 * ds)
        elif 'send_line' in step:
            # Use marker sync only on macOS AND only when the next step is
            # also a send_line. If next step is send_key/send/wait, the
            # command may be long-running or interactive — the marker echo
            # would queue behind it and interfere.
            next_is_send_line = next_step is not None and 'send_line' in next_step
            cmd_text = step['send_line'].strip()
            is_background = cmd_text.endswith('&') and not cmd_text.endswith('&&')
            # Don't use marker sync for job control commands — their output
            # interacts with background processes and can swallow the marker
            first_word = cmd_text.split()[0] if cmd_text else ''
            is_job_control = first_word in ('bg', 'fg', 'kill', 'disown', 'wait', 'jobs')
            use_marker = (not is_last and self._use_marker_sync and next_is_send_line
                          and not is_background and not is_job_control)
            if use_marker:
                self._step_sync_id += 1
                marker = f"__STEP_SYNC_{self._step_sync_id}__"
                shell.send_line(step['send_line'])
                shell.send_line(f" echo {marker}")  # leading space to exclude from history
                try:
                    shell.expect(marker, timeout=self.pty_timeout)
                except pexpect.TIMEOUT:
                    pass
                time.sleep(0.1 * ds)
                shell.clear_buffer()
            else:
                shell.send_line(step['send_line'])
                if self._use_marker_sync and not is_last:
                    next_is_wait = (next_step is not None and 'wait' in next_step)
                    if next_is_wait and not is_background and not is_job_control:
                        # Foreground command followed by explicit wait — likely
                        # blocking (sleep 10). Don't wait_for_prompt or it blocks.
                        time.sleep(0.05 * ds)
                    else:
                        # Quick command — wait for prompt, clear buffer
                        try:
                            shell.wait_for_prompt(timeout=self.pty_timeout)
                        except pexpect.TIMEOUT:
                            pass
                        # NOTE(review): this 0.05 is not scaled by delay_scale
                        # unlike its siblings — presumably intentional; confirm.
                        time.sleep(0.05)
                        shell.clear_buffer()
                else:
                    # Last step or non-macOS: short delay. macOS last-step
                    # needs more time for flang-new I/O to flush.
                    time.sleep(0.3 if (is_last and self._use_marker_sync) else 0.05 * ds)
        elif 'send_key' in step:
            key = step['send_key']
            shell.send_key(key)
            if self._use_marker_sync and key in ('C-c', 'C-z') and not is_last:
                # Signal keys interrupt/suspend commands — shell needs to
                # process the signal, reap children, and return to readline.
                if key == 'C-c':
                    # Ctrl+C always returns to prompt — wait for it, clear buffer
                    try:
                        shell.wait_for_prompt(timeout=self.pty_timeout)
                    except pexpect.TIMEOUT:
                        time.sleep(0.5)
                    shell.clear_buffer()
                else:
                    # Ctrl+Z: only wait if next step needs input, otherwise
                    # let expect_output find the Stopped message
                    next_needs_input = (next_step is not None and
                                        ('send' in next_step or 'send_key' in next_step or
                                         'send_line' in next_step))
                    if next_needs_input:
                        try:
                            shell.wait_for_prompt(timeout=self.pty_timeout)
                        except pexpect.TIMEOUT:
                            time.sleep(0.5)
                    else:
                        time.sleep(0.5)
            else:
                time.sleep(0.02 * ds)
        elif 'send_keys' in step:
            for key in step['send_keys']:
                shell.send_key(key)
                time.sleep(0.02 * ds)
        elif 'wait' in step:
            time.sleep(step['wait'] * ds)
        elif 'wait_for_prompt' in step:
            shell.wait_for_prompt()
        elif 'expect' in step:
            shell.expect(step['expect'])
        elif 'resize' in step:
            rows = step['resize'].get('rows', 24)
            cols = step['resize'].get('cols', 80)
            shell.set_terminal_size(rows, cols)
453
454
def find_shell_binary() -> str:
    """Find the shell binary from the environment.

    Checks the SHELL_BIN and SHELL environment variables (in that order)
    and returns the first path that exists and is executable.

    Returns:
        The binary path, or "" when nothing suitable is found. Returning
        "" (rather than None) keeps the caller's os.path.isfile() check
        from raising TypeError, so main() can print a clean error.
    """
    candidates: List[str] = []

    # Check SHELL_BIN or SHELL environment variables
    env_path = os.environ.get('SHELL_BIN') or os.environ.get('SHELL')
    if env_path:
        candidates.insert(0, env_path)

    for path in candidates:
        if os.path.isfile(path) and os.access(path, os.X_OK):
            return path

    # No hard-coded default — bensch requires an explicit --shell (or env var).
    return ""
470
471
def generate_markdown_report(results: List[TestResult], output_path: Path) -> None:
    """
    Generate a markdown report of test results.

    Args:
        results: List of test results
        output_path: Path to write the report
    """
    pass_count = sum(1 for r in results if r.passed)
    fail_count = len(results) - pass_count
    elapsed = sum(r.duration for r in results)

    # Assemble the whole document in memory, then write it in one go.
    lines = [
        "# bensch Test Results\n\n",
        f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n",
        "## Summary\n\n",
        f"- **Total:** {len(results)}\n",
        f"- **Passed:** {pass_count}\n",
        f"- **Failed:** {fail_count}\n",
        f"- **Duration:** {elapsed:.2f}s\n\n",
    ]

    if fail_count > 0:
        lines.append("## Failed Tests\n\n")
        for r in results:
            if not r.passed:
                lines.append(f"### {r.name}\n\n")
                lines.append(f"**Error:** {r.error}\n\n")

    lines.append("## All Tests\n\n")
    lines.append("| Test | Status | Duration |\n")
    lines.append("|------|--------|----------|\n")
    for r in results:
        status = "✓ Pass" if r.passed else "✗ Fail"
        lines.append(f"| {r.name} | {status} | {r.duration:.3f}s |\n")

    with open(output_path, 'w') as f:
        f.writelines(lines)
506
507
def main():
    """CLI entry point: run YAML spec tests (or pytest) against a shell binary.

    Returns:
        0 when all tests pass, 1 on any failure or setup error (used as
        the process exit status by the __main__ guard).
    """
    parser = argparse.ArgumentParser(
        description="Run interactive tests for shell"
    )
    parser.add_argument(
        '--shell',
        default=None,
        help='Path to shell binary'
    )
    parser.add_argument(
        '--spec',
        default=None,
        help='Run specific YAML spec file'
    )
    parser.add_argument(
        '--spec-dir',
        default=None,
        help='Directory containing YAML spec subdirectories'
    )
    parser.add_argument(
        '--pytest',
        action='store_true',
        help='Run pytest tests instead of YAML specs'
    )
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Verbose output'
    )
    parser.add_argument(
        '--report', '-r',
        default=None,
        help='Generate markdown report at path'
    )

    args = parser.parse_args()

    # Find shell binary
    shell_path = args.shell or find_shell_binary()

    # Guard against a falsy result from find_shell_binary (None/"") —
    # os.path.isfile(None) would raise TypeError instead of a clean message.
    if not shell_path or not os.path.isfile(shell_path):
        print(f"{Fore.RED}Error: shell binary not found at {shell_path}{Style.RESET_ALL}")
        print("Set the SHELL_BIN (or SHELL) environment variable, or specify a path with --shell")
        return 1

    print(f"{Fore.CYAN}╔══════════════════════════════════════════════════════════════╗{Style.RESET_ALL}")
    print(f"{Fore.CYAN}║              bensch Interactive Test Suite                   ║{Style.RESET_ALL}")
    print(f"{Fore.CYAN}╚══════════════════════════════════════════════════════════════╝{Style.RESET_ALL}")
    print(f"\nshell binary: {shell_path}")

    if args.pytest:
        # Run pytest
        import pytest
        test_dir = Path(__file__).parent
        return pytest.main([str(test_dir), '-v' if args.verbose else '-q'])

    # Load profile if available
    profile = {}
    profile_name = os.environ.get('BENSCH_PROFILE', '')
    if profile_name:
        try:
            # NOTE(review): local module named "profile" shadows the stdlib
            # profiler module — relies on sys.path ordering; confirm intended.
            from profile import load_profile
            profile = load_profile(profile_name)
        except Exception:
            # Profile loading is optional; fall back to empty profile.
            pass

    # Run YAML specs
    runner = YAMLTestRunner(shell_path, verbose=args.verbose, profile=profile)

    # Determine spec directory
    if args.spec_dir:
        test_dir = Path(args.spec_dir)
    else:
        test_dir = Path(__file__).parent / "test_specs"

    if args.spec:
        # Run specific spec: try the literal path first, then relative to test_dir
        spec_path = Path(args.spec)
        if not spec_path.exists():
            spec_path = test_dir / args.spec
        if not spec_path.exists():
            print(f"{Fore.RED}Error: Spec file not found: {args.spec}{Style.RESET_ALL}")
            return 1
        results = runner.run_spec_file(spec_path)
    else:
        # Run all specs (recursively find YAML files)
        results = []
        for spec_file in sorted(test_dir.rglob("*.yaml")):
            results.extend(runner.run_spec_file(spec_file))

    # Print summary
    passed = sum(1 for r in results if r.passed)
    failed = len(results) - passed

    print(f"\n{'='*50}")
    print(f"{Fore.CYAN}Test Summary{Style.RESET_ALL}")
    print(f"{'='*50}\n")
    print(f"Total tests run: {len(results)}")
    print(f"{Fore.GREEN}Passed: {passed}{Style.RESET_ALL}")
    if failed > 0:
        print(f"{Fore.RED}Failed: {failed}{Style.RESET_ALL}")
    else:
        print(f"Failed: {failed}")

    if failed == 0:
        print(f"\n{Fore.GREEN}✓ ALL TESTS PASSED!{Style.RESET_ALL}")
    else:
        print(f"\n{Fore.RED}✗ SOME TESTS FAILED{Style.RESET_ALL}")
        # Print failed test summary
        print(f"\n{Fore.RED}Failed tests:{Style.RESET_ALL}")
        for r in results:
            if not r.passed:
                print(f"  {r.test_id}: {r.name}")

    # Generate report if requested
    if args.report:
        report_path = Path(args.report)
        generate_markdown_report(results, report_path)
        print(f"\nReport written to: {report_path}")

    return 0 if failed == 0 else 1
629
630
# Script entry point: exit status is 0 when all tests pass, 1 otherwise.
if __name__ == "__main__":
    sys.exit(main())