1 #!/usr/bin/env python3
2 """
3 Interactive test runner for fortsh.
4
5 Runs both YAML-based test specifications and pytest test files.
6 """
7
8 import sys
9 import os
10 import argparse
11 import time
12 from pathlib import Path
13 from datetime import datetime
14 from typing import List, Dict, Any, Tuple, Optional
15
16 import gc
17 import re
18 import yaml
19 import pexpect
20 from colorama import init, Fore, Style
21
22 # Add parent directory to path for imports
23 sys.path.insert(0, str(Path(__file__).parent))
24
25 from fortsh_pty import FortshPTY, FortshTestSession
26 from utils.keys import KEYS, get_key
27 from utils.matchers import (
28 OutputMatcher, match_exact, match_contains, match_regex,
29 MatchResult
30 )
31
# Initialize colorama for cross-platform colors (strip=False to avoid OSC issues on macOS)
# convert=False additionally leaves ANSI sequences untouched rather than translating them.
init(strip=False, convert=False)
34
35
def strip_control_sequences(text: str) -> str:
    """Remove ANSI and OSC control sequences from text."""
    # Order matters: OSC sequences (e.g. terminal titles) first, then CSI
    # sequences, then any remaining short escape sequences.
    patterns = (
        r'\x1b\].*?(?:\x07|\x1b\\)',
        r'\x1b\[[\x30-\x3f]*[\x20-\x2f]*[\x40-\x7e]',
        r'\x1b[^\[\]].?',
    )
    cleaned = text
    for pattern in patterns:
        cleaned = re.sub(pattern, '', cleaned)
    return cleaned
45
46
class TestResult:
    """Outcome of a single executed test.

    Attributes:
        name: Human-readable test name.
        passed: True when the test succeeded.
        error: Failure description; empty string on success.
        duration: Wall-clock runtime in seconds.
        test_id: Display identifier such as "[history] 5"; filled in
            later by the spec runner for the failure summary.
    """

    def __init__(self, name: str, passed: bool, error: str = "", duration: float = 0.0):
        self.name = name
        self.passed = passed
        self.error = error
        self.duration = duration
        # Populated by the runner after construction.
        self.test_id = ""
56
57
class YAMLTestRunner:
    """
    Runs tests defined in YAML specification files.

    Uses session reuse to avoid PTY exhaustion - reuses the same
    fortsh session across multiple tests, resetting state between them.
    """

    def __init__(self, fortsh_path: str, verbose: bool = False, tests_per_session: int = 10):
        """
        Args:
            fortsh_path: Path to the fortsh binary under test.
            verbose: Enable verbose output.
            tests_per_session: How many tests share one PTY session before
                it is recycled. Forced to 1 on macOS unless the caller
                passes an explicit non-default value.
        """
        self.fortsh_path = fortsh_path
        self.verbose = verbose
        self.results: List[TestResult] = []

        # Scale timeouts for slower platforms (ARM64, macOS with flang-new)
        import platform
        machine = platform.machine().lower()
        system = platform.system().lower()
        if machine in ('arm64', 'aarch64'):
            self.pty_timeout = 10.0  # 2x default for ARM64
            self.delay_scale = 1.0
        else:
            self.pty_timeout = 5.0
            self.delay_scale = 1.0
        # macOS: fewer tests per session to reduce state accumulation issues
        # with flang-new I/O buffering and readline mode interactions
        if tests_per_session != 10:
            # Explicit override from caller
            self.tests_per_session = tests_per_session
        elif system == 'darwin':
            # Fresh session per test on macOS: readline cursor tracking
            # gets out of sync across reused sessions with flang-new
            self.tests_per_session = 1
        else:
            self.tests_per_session = tests_per_session
        self._current_session: Optional[FortshPTY] = None
        self._test_count = 0
        self._step_sync_id = 0
        # Marker-based output synchronization is only needed on macOS,
        # where flang-new I/O buffering makes prompt detection unreliable.
        self._use_marker_sync = (system == 'darwin')

    def _get_session(self, env: Optional[dict] = None, rc_file: str = "/dev/null",
                     fresh: bool = False) -> FortshPTY:
        """
        Get a fortsh session, reusing existing one if possible.

        Args:
            env: Environment variables for the session
            rc_file: RC file path
            fresh: If True, always create a new session

        Returns:
            FortshPTY session
        """
        needs_new = (
            fresh or
            (env is not None and len(env) > 0) or  # Custom env requires fresh session
            self._current_session is None or
            not self._current_session.is_running or
            self._test_count % self.tests_per_session == 0
        )

        if needs_new:
            if self._current_session is not None:
                try:
                    self._current_session.stop()
                except Exception:
                    # Best-effort teardown; the session may already be dead.
                    # (Narrowed from a bare except so Ctrl+C still propagates.)
                    pass
                gc.collect()
                time.sleep(0.2 * self.delay_scale)

            self._current_session = FortshPTY(
                fortsh_path=self.fortsh_path,
                timeout=self.pty_timeout,
                env=env or {}
            )
            self._current_session.start(rc_file=rc_file)
        else:
            # Reset session state for reuse
            self._reset_session()

        return self._current_session

    def _reset_session(self) -> None:
        """Reset session state between tests (best-effort, never raises)."""
        if self._current_session is None or not self._current_session.is_running:
            return

        try:
            # Exit any special mode the shell might be in:
            # - Ctrl+G cancels search mode (Ctrl+R/Ctrl+S)
            # - Escape exits vi insert→command, or is harmless in emacs mode
            # - Ctrl+C interrupts running commands and clears line
            # - Ctrl+U kills the line
            self._current_session.send_key("C-g")
            time.sleep(0.05)
            self._current_session.send(chr(27))  # Escape
            time.sleep(0.05)
            self._current_session.send_key("C-c")
            time.sleep(0.1)
            self._current_session.send_key("C-c")
            time.sleep(0.1)
            self._current_session.send_key("C-u")
            time.sleep(0.1)

            # Clear buffer before reset command
            self._current_session.clear_buffer()
            time.sleep(0.05)

            # Reset PS1 and editing mode, then echo marker
            marker = f"RESET_{self._test_count}"
            self._current_session.send_line(f" set -o emacs; PS1='> '; echo {marker}")  # leading space to exclude from history

            # Wait for the marker to ensure we're at a clean state
            try:
                self._current_session.expect(marker, timeout=self.pty_timeout)
            except Exception:
                # Marker may never echo (e.g. shell wedged); fall through and
                # rely on the buffer clear below.
                pass

            # Wait for prompt after marker and clear buffer again
            time.sleep(0.3)
            self._current_session.clear_buffer()
            time.sleep(0.05)
        except Exception:
            # Reset is best-effort: a failed reset just means the next test
            # triggers a fresh session via is_running/needs_new checks.
            pass

    def _cleanup_session(self) -> None:
        """Clean up the current session."""
        if self._current_session is not None:
            try:
                self._current_session.stop()
            except Exception:
                # Session may already have exited; nothing more to do.
                pass
            self._current_session = None
            gc.collect()

    def run_spec_file(self, spec_path: Path) -> List[TestResult]:
        """
        Run all tests in a YAML spec file.

        Args:
            spec_path: Path to the YAML specification file

        Returns:
            List of TestResult objects
        """
        with open(spec_path) as f:
            spec = yaml.safe_load(f)

        category = spec.get('metadata', {}).get('category', spec_path.stem)
        # Use filename stem as prefix: history.yaml -> [history]
        file_prefix = f"[{spec_path.stem}]"
        print(f"\n{Fore.CYAN}=== {category} ==={Style.RESET_ALL}")

        results = []
        test_num = 0
        for test in spec.get('tests', []):
            test_num += 1
            result = self.run_test(test)
            # Store test ID for failed test summary
            result.test_id = f"{file_prefix} {test_num}"
            results.append(result)
            self._test_count += 1

            # Delay between tests for OS cleanup
            time.sleep(0.3 * self.delay_scale)

            if result.passed:
                # ✓/✗ glyph restored: printing only the color codes renders
                # an invisible pass/fail marker.
                print(f"  {Fore.GREEN}✓{Style.RESET_ALL} {file_prefix} {test_num}: {result.name}", flush=True)
            else:
                error_msg = strip_control_sequences(result.error)
                print(f"  {Fore.RED}✗{Style.RESET_ALL} {file_prefix} {test_num}: {result.name}: {error_msg}", flush=True)

        # Clean up session at end of spec file
        self._cleanup_session()
        # Reset test count for fresh session at start of next category
        self._test_count = 0

        return results

    def run_test(self, test: Dict[str, Any]) -> TestResult:
        """
        Run a single test from a spec.

        Args:
            test: Test specification dictionary

        Returns:
            TestResult
        """
        name = test.get('name', 'Unnamed test')
        start_time = time.time()

        # Set up environment
        env = test.get('env', {})
        rc_file = test.get('rc_file', '/dev/null')
        fresh_session = test.get('fresh_session', False)

        try:
            # Get session (may be reused or fresh)
            fortsh = self._get_session(env=env, rc_file=rc_file, fresh=fresh_session)

            try:
                # Execute test steps
                steps = test.get('steps', [])
                for i, step in enumerate(steps):
                    is_last = (i == len(steps) - 1)
                    next_step = steps[i + 1] if not is_last else None
                    self._execute_step(fortsh, step, is_last=is_last, next_step=next_step)

                # Get command output
                if 'expect_output' in test:
                    expected = test['expect_output']
                    # Wait for the expected output to appear
                    try:
                        fortsh.expect(expected)
                        # Test passed - we found the expected output
                        duration = time.time() - start_time
                        return TestResult(name, True, "", duration)
                    except pexpect.TIMEOUT:
                        duration = time.time() - start_time
                        # Get cleaned output for error reporting
                        raw_output = fortsh.get_clean_output()
                        output = strip_control_sequences(raw_output)
                        # Truncate for readability
                        if len(output) > 300:
                            output = output[:300] + "..."
                        return TestResult(
                            name, False,
                            f"Expected '{expected}' not found. Got: '{output}'",
                            duration
                        )
                    except Exception as e:
                        duration = time.time() - start_time
                        return TestResult(
                            name, False,
                            f"Error: {str(e)}",
                            duration
                        )
                elif 'expect_not' in test:
                    # Wait for prompt, then check output doesn't contain unwanted
                    output = fortsh.wait_for_prompt()
                    output = strip_control_sequences(output)
                    unwanted = test['expect_not']
                    if unwanted in output:
                        duration = time.time() - start_time
                        return TestResult(
                            name, False,
                            f"Found unwanted output: '{unwanted}'",
                            duration
                        )
                    duration = time.time() - start_time
                    return TestResult(name, True, "", duration)
                else:
                    # No expectation, just run the steps
                    duration = time.time() - start_time
                    return TestResult(name, True, "", duration)

            finally:
                # Don't stop session - it will be reused or cleaned up later
                pass

        except pexpect.TIMEOUT as e:
            duration = time.time() - start_time
            return TestResult(name, False, f"Timeout: {e}", duration)
        except pexpect.EOF as e:
            duration = time.time() - start_time
            return TestResult(name, False, f"Unexpected EOF: {e}", duration)
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(name, False, str(e), duration)

    def _execute_step(self, fortsh: FortshPTY, step: Dict[str, Any], is_last: bool = False,
                      next_step: Optional[Dict[str, Any]] = None) -> None:
        """Execute a single test step.

        Exactly one step key is honored per step dict, checked in order:
        send, send_line, send_key, send_keys, wait, wait_for_prompt,
        expect, resize.
        """
        ds = self.delay_scale
        if 'send' in step:
            fortsh.send(step['send'])
            time.sleep(0.02 * ds)
        elif 'send_line' in step:
            # Use marker sync only on macOS AND only when the next step is
            # also a send_line. If next step is send_key/send/wait, the
            # command may be long-running or interactive — the marker echo
            # would queue behind it and interfere.
            next_is_send_line = next_step is not None and 'send_line' in next_step
            cmd_text = step['send_line'].strip()
            is_background = cmd_text.endswith('&') and not cmd_text.endswith('&&')
            # Don't use marker sync for job control commands — their output
            # interacts with background processes and can swallow the marker
            first_word = cmd_text.split()[0] if cmd_text else ''
            is_job_control = first_word in ('bg', 'fg', 'kill', 'disown', 'wait', 'jobs')
            use_marker = (not is_last and self._use_marker_sync and next_is_send_line
                          and not is_background and not is_job_control)
            if use_marker:
                self._step_sync_id += 1
                marker = f"__STEP_SYNC_{self._step_sync_id}__"
                fortsh.send_line(step['send_line'])
                fortsh.send_line(f" echo {marker}")  # leading space to exclude from history
                try:
                    fortsh.expect(marker, timeout=self.pty_timeout)
                except pexpect.TIMEOUT:
                    pass
                time.sleep(0.1 * ds)
                fortsh.clear_buffer()
            else:
                fortsh.send_line(step['send_line'])
                if self._use_marker_sync and not is_last:
                    next_is_wait = (next_step is not None and 'wait' in next_step)
                    if next_is_wait and not is_background and not is_job_control:
                        # Foreground command followed by explicit wait — likely
                        # blocking (sleep 10). Don't wait_for_prompt or it blocks.
                        time.sleep(0.05 * ds)
                    else:
                        # Quick command — wait for prompt, clear buffer
                        try:
                            fortsh.wait_for_prompt(timeout=self.pty_timeout)
                        except pexpect.TIMEOUT:
                            pass
                        time.sleep(0.05)
                        fortsh.clear_buffer()
                else:
                    # Last step or non-macOS: short delay. macOS last-step
                    # needs more time for flang-new I/O to flush.
                    time.sleep(0.3 if (is_last and self._use_marker_sync) else 0.05 * ds)
        elif 'send_key' in step:
            key = step['send_key']
            fortsh.send_key(key)
            if self._use_marker_sync and key in ('C-c', 'C-z') and not is_last:
                # Signal keys interrupt/suspend commands — shell needs to
                # process the signal, reap children, and return to readline.
                if key == 'C-c':
                    # Ctrl+C always returns to prompt — wait for it, clear buffer
                    try:
                        fortsh.wait_for_prompt(timeout=self.pty_timeout)
                    except pexpect.TIMEOUT:
                        time.sleep(0.5)
                    fortsh.clear_buffer()
                else:
                    # Ctrl+Z: only wait if next step needs input, otherwise
                    # let expect_output find the Stopped message
                    next_needs_input = (next_step is not None and
                                        ('send' in next_step or 'send_key' in next_step or
                                         'send_line' in next_step))
                    if next_needs_input:
                        try:
                            fortsh.wait_for_prompt(timeout=self.pty_timeout)
                        except pexpect.TIMEOUT:
                            time.sleep(0.5)
                    else:
                        time.sleep(0.5)
            else:
                time.sleep(0.02 * ds)
        elif 'send_keys' in step:
            for key in step['send_keys']:
                fortsh.send_key(key)
                time.sleep(0.02 * ds)
        elif 'wait' in step:
            time.sleep(step['wait'] * ds)
        elif 'wait_for_prompt' in step:
            fortsh.wait_for_prompt()
        elif 'expect' in step:
            fortsh.expect(step['expect'])
        elif 'resize' in step:
            rows = step['resize'].get('rows', 24)
            cols = step['resize'].get('cols', 80)
            fortsh.set_terminal_size(rows, cols)
421
422
def find_fortsh_binary() -> str:
    """Locate the fortsh binary to test.

    Search order: the FORTSH environment variable (when set), then a few
    conventional relative locations. Returns "./bin/fortsh" as the default
    when no executable candidate is found.
    """
    search_paths = []

    # An explicit FORTSH override takes precedence over the defaults.
    override = os.environ.get('FORTSH')
    if override:
        search_paths.append(override)

    search_paths.extend([
        "./bin/fortsh",
        "../bin/fortsh",
        "../../bin/fortsh",
        "../fortsh/bin/fortsh",
    ])

    for candidate in search_paths:
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate

    # Nothing usable found: fall back to the default location.
    return "./bin/fortsh"
444
445
def generate_markdown_report(results: List[TestResult], output_path: Path) -> None:
    """
    Generate a markdown report of test results.

    Args:
        results: List of test results
        output_path: Path to write the report
    """
    failures = [r for r in results if not r.passed]
    passed = len(results) - len(failures)
    failed = len(failures)
    total_time = sum(r.duration for r in results)

    with open(output_path, 'w') as f:
        # Header and summary counters.
        f.write("# Interactive Test Results\n\n")
        f.write(f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        f.write("## Summary\n\n")
        f.write(f"- **Total:** {len(results)}\n")
        f.write(f"- **Passed:** {passed}\n")
        f.write(f"- **Failed:** {failed}\n")
        f.write(f"- **Duration:** {total_time:.2f}s\n\n")

        # One section per failure, in original order.
        if failed > 0:
            f.write("## Failed Tests\n\n")
            for failure in failures:
                f.write(f"### {failure.name}\n\n")
                f.write(f"**Error:** {failure.error}\n\n")

        # Full results table.
        f.write("## All Tests\n\n")
        f.write("| Test | Status | Duration |\n")
        f.write("|------|--------|----------|\n")
        for r in results:
            status = "✓ Pass" if r.passed else "✗ Fail"
            f.write(f"| {r.name} | {status} | {r.duration:.3f}s |\n")
480
481
def main():
    """CLI entry point: parse arguments, run tests, print a summary.

    Returns the process exit status: 0 when every test passed, 1 otherwise
    (or pytest's own status when --pytest is given).
    """
    parser = argparse.ArgumentParser(
        description="Run interactive tests for fortsh"
    )
    parser.add_argument('--fortsh', '-f', default=None, help='Path to fortsh binary')
    parser.add_argument('--spec', '-s', default=None, help='Run specific YAML spec file')
    parser.add_argument('--pytest', action='store_true', help='Run pytest tests instead of YAML specs')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    parser.add_argument('--report', '-r', default=None, help='Generate markdown report at path')
    args = parser.parse_args()

    # Resolve the binary under test; bail out early if it does not exist.
    fortsh_path = args.fortsh or find_fortsh_binary()
    if not os.path.isfile(fortsh_path):
        print(f"{Fore.RED}Error: fortsh binary not found at {fortsh_path}{Style.RESET_ALL}")
        print("Build fortsh first or specify path with --fortsh")
        return 1

    print(f"{Fore.CYAN}╔══════════════════════════════════════════════════════════════╗{Style.RESET_ALL}")
    print(f"{Fore.CYAN}║ fortsh Interactive Test Suite ║{Style.RESET_ALL}")
    print(f"{Fore.CYAN}╚══════════════════════════════════════════════════════════════╝{Style.RESET_ALL}")
    print(f"\nfortsh binary: {fortsh_path}")

    # Delegate to pytest entirely when requested.
    if args.pytest:
        import pytest
        test_dir = Path(__file__).parent
        return pytest.main([str(test_dir), '-v' if args.verbose else '-q'])

    # Run YAML specs: either one named file or everything in test_specs/.
    runner = YAMLTestRunner(fortsh_path, verbose=args.verbose)
    spec_dir = Path(__file__).parent / "test_specs"

    if args.spec:
        spec_path = Path(args.spec)
        if not spec_path.exists():
            # Bare names are resolved relative to the spec directory.
            spec_path = spec_dir / args.spec
        if not spec_path.exists():
            print(f"{Fore.RED}Error: Spec file not found: {args.spec}{Style.RESET_ALL}")
            return 1
        results = runner.run_spec_file(spec_path)
    else:
        results = []
        for spec_file in sorted(spec_dir.glob("*.yaml")):
            results.extend(runner.run_spec_file(spec_file))

    # Summarize.
    failed_results = [r for r in results if not r.passed]
    passed = len(results) - len(failed_results)
    failed = len(failed_results)

    print(f"\n{'='*50}")
    print(f"{Fore.CYAN}Test Summary{Style.RESET_ALL}")
    print(f"{'='*50}\n")
    print(f"Total tests run: {len(results)}")
    print(f"{Fore.GREEN}Passed: {passed}{Style.RESET_ALL}")
    if failed > 0:
        print(f"{Fore.RED}Failed: {failed}{Style.RESET_ALL}")
    else:
        print(f"Failed: {failed}")

    if failed == 0:
        print(f"\n{Fore.GREEN}✓ ALL TESTS PASSED!{Style.RESET_ALL}")
    else:
        print(f"\n{Fore.RED}✗ SOME TESTS FAILED{Style.RESET_ALL}")
        # Print failed test summary
        print(f"\n{Fore.RED}Failed tests:{Style.RESET_ALL}")
        for r in failed_results:
            print(f"  {r.test_id}: {r.name}")

    # Generate report if requested
    if args.report:
        report_path = Path(args.report)
        generate_markdown_report(results, report_path)
        print(f"\nReport written to: {report_path}")

    return 0 if failed == 0 else 1
583
584
# Script entry point: process exit status is 0 when all tests pass, 1 otherwise.
if __name__ == "__main__":
    sys.exit(main())