Python · 12735 bytes Raw Blame History
1 """
2 PTY management for fortsh interactive testing.
3
4 Provides a high-level interface to spawn fortsh in a pseudo-terminal
5 and interact with it programmatically.
6 """
7
8 import os
9 import re
10 import pexpect
11 from pathlib import Path
12 from typing import Optional, Union
13
14 from utils.keys import KEYS, get_key
15
16
class FortshPTY:
    """
    Manages a fortsh process running in a pseudo-terminal.

    This class provides methods to send input, receive output, and verify
    behavior for interactive testing. All interaction goes through a
    ``pexpect.spawn`` child stored in ``self.child``; most methods raise
    ``RuntimeError("fortsh not started")`` when no child exists.
    """

    # Default prompt pattern - matches the user's fortsh prompt
    # No anchors for reliability with pexpect's buffering
    DEFAULT_PROMPT_PATTERN = r'> '

    # Unique marker for reliable command output detection
    END_MARKER = "___FORTSH_CMD_END___"

    def __init__(
        self,
        fortsh_path: str = "./bin/fortsh",
        timeout: float = 5.0,
        prompt_pattern: Optional[str] = None,
        env: Optional[dict] = None,
    ):
        """
        Initialize the PTY wrapper. No process is spawned until start().

        Args:
            fortsh_path: Path to fortsh binary
            timeout: Default timeout for expect operations (seconds)
            prompt_pattern: Regex pattern to match the shell prompt
            env: Additional environment variables
        """
        self.fortsh_path = fortsh_path
        self.timeout = timeout
        self.prompt_pattern = prompt_pattern or self.DEFAULT_PROMPT_PATTERN
        self.custom_env = env or {}
        self.child: Optional[pexpect.spawn] = None
        # Output captured by the most recent wait_for_prompt() call.
        self._output_buffer: str = ""

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _require_child(self):
        """Return the live pexpect child or raise if start() was not called."""
        if self.child is None:
            raise RuntimeError("fortsh not started")
        return self.child

    def _resolve_timeout(self, timeout: Optional[float]) -> float:
        """Map ``None`` to the default timeout; keep explicit values (incl. 0)."""
        # `timeout or self.timeout` would silently replace an explicit 0
        # (poll without waiting) with the default, so test for None instead.
        return self.timeout if timeout is None else timeout

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def start(self, rc_file: Optional[str] = None) -> None:
        """
        Start fortsh in a PTY and wait for its first prompt.

        If a previous child is still attached it is stopped first so that
        calling start() twice does not leak a process.

        Args:
            rc_file: Path to rc file, or None to use default, or "/dev/null" for no rc

        Raises:
            pexpect.TIMEOUT: If the initial prompt does not appear in time.
                The child stays attached so the caller can inspect it and
                must call stop().
            pexpect.EOF: If fortsh exits before printing a prompt.
        """
        if self.child is not None:
            self.stop()

        env = os.environ.copy()
        env["TERM"] = "xterm-256color"
        env["LANG"] = "en_US.UTF-8"
        env["LC_ALL"] = "en_US.UTF-8"

        # Enable test mode for cleaner output (less ANSI redraws)
        # Note: Completion is NOT disabled here - tests can use Tab completion
        env["FORTSH_MINIMAL_ECHO"] = "1"
        env["FORTSH_TEST_MODE"] = "1"
        env["HISTFILE"] = "/dev/null"  # Don't load/save history from file

        # Only override the rc file when the caller asked for one; with None
        # FORTSH_RC_FILE is left as inherited from the parent environment.
        if rc_file is not None:
            env["FORTSH_RC_FILE"] = rc_file

        # Apply custom environment last so it can override everything above
        env.update(self.custom_env)

        self.child = pexpect.spawn(
            self.fortsh_path,
            encoding="utf-8",
            codec_errors="replace",  # Handle raw ANSI/highlight bytes without crashing
            timeout=self.timeout,
            env=env,
            dimensions=(24, 80),  # Standard terminal size
            echo=False,  # Don't echo back input
        )

        # Wait for initial prompt
        self.wait_for_prompt()

    def stop(self) -> int:
        """
        Stop fortsh and return exit code.

        Tries a graceful ``exit`` first, then lets pexpect force-terminate
        and reap the child. All signalling goes through the pexpect child
        object, which only signals while it still owns an unreaped process,
        so a recycled PID can never be hit.

        Returns:
            Exit code of the fortsh process, 0 if it was killed by a signal
            or the status is unknown, or -1 if fortsh was never started.
        """
        if self.child is None:
            return -1

        import signal

        child = self.child
        # Detach first so the wrapper is reusable even if cleanup misbehaves.
        self.child = None

        try:
            # Try graceful exit first
            child.sendline("exit")
            child.expect(pexpect.EOF, timeout=1)
        except (pexpect.TIMEOUT, pexpect.EOF, OSError):
            # Shell did not exit in time (or the PTY is already gone);
            # the forced close below takes care of it.
            pass

        try:
            # Closes the PTY fd exactly once, terminates the child if it is
            # still alive, and refreshes exitstatus/signalstatus.
            child.close(force=True)
        except (pexpect.ExceptionPexpect, OSError):
            # Last resort: SIGKILL via pexpect (no-op if already dead) and
            # block until the zombie is reaped.
            try:
                if child.isalive():
                    child.kill(signal.SIGKILL)
                    child.wait()
            except (pexpect.ExceptionPexpect, OSError):
                pass

        return child.exitstatus or 0

    # ------------------------------------------------------------------
    # Input
    # ------------------------------------------------------------------

    def clear_buffer(self) -> None:
        """
        Drain pending PTY output to avoid accumulation.

        Reads and discards whatever is currently readable from the child.
        NOTE(review): this does not appear to reset text pexpect has already
        buffered from an earlier expect call - confirm if that is required.
        """
        if self.child is None:
            return
        # Read any pending output without blocking
        try:
            while True:
                self.child.read_nonblocking(size=1024, timeout=0.01)
        except (pexpect.TIMEOUT, pexpect.EOF):
            # TIMEOUT: nothing left to read; EOF: child has exited.
            pass

    def send(self, text: str) -> None:
        """
        Send text without newline.

        Args:
            text: Text to send to fortsh

        Raises:
            RuntimeError: If fortsh has not been started
        """
        self._require_child().send(text)

    def send_line(self, text: str) -> None:
        """
        Send text followed by Enter.

        Args:
            text: Text to send to fortsh

        Raises:
            RuntimeError: If fortsh has not been started
        """
        self._require_child().sendline(text)

    def send_key(self, key_name: str) -> None:
        """
        Send a special key by name.

        Args:
            key_name: Name of the key (e.g., "Up", "C-a", "Enter")

        Raises:
            RuntimeError: If fortsh has not been started
        """
        self._require_child().send(get_key(key_name))

    def send_keys(self, *key_names: str) -> None:
        """
        Send multiple keys in sequence.

        Args:
            *key_names: Names of keys to send
        """
        for key in key_names:
            self.send_key(key)

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------

    def wait_for_prompt(self, timeout: Optional[float] = None) -> str:
        """
        Wait for the shell prompt to appear.

        Args:
            timeout: Timeout in seconds (uses default if None)

        Returns:
            Output received before the prompt

        Raises:
            RuntimeError: If fortsh has not been started
            pexpect.TIMEOUT: If the prompt does not appear within timeout
            pexpect.EOF: If process terminates
        """
        child = self._require_child()
        child.expect(self.prompt_pattern, timeout=self._resolve_timeout(timeout))
        output = child.before or ""
        self._output_buffer = output
        return output

    def expect(
        self,
        pattern: Union[str, list],
        timeout: Optional[float] = None
    ) -> int:
        """
        Wait for a pattern in the output.

        Args:
            pattern: Regex pattern or list of patterns
            timeout: Timeout in seconds (uses default if None)

        Returns:
            Index of matched pattern (if list) or 0

        Raises:
            RuntimeError: If fortsh has not been started
            pexpect.TIMEOUT: If pattern not found within timeout
            pexpect.EOF: If process terminates
        """
        child = self._require_child()
        return child.expect(pattern, timeout=self._resolve_timeout(timeout))

    def expect_exact(self, text: str, timeout: Optional[float] = None) -> None:
        """
        Wait for exact text in output.

        Args:
            text: Exact text to find
            timeout: Timeout in seconds (uses default if None)

        Raises:
            RuntimeError: If fortsh has not been started
            pexpect.TIMEOUT: If text not found within timeout
        """
        child = self._require_child()
        child.expect_exact(text, timeout=self._resolve_timeout(timeout))

    def get_output(self) -> str:
        """
        Get output from the last expect operation.

        Returns:
            Output that appeared before the matched pattern, or "" if
            fortsh is not running or nothing was captured
        """
        if self.child is None:
            return ""
        return self.child.before or ""

    def get_clean_output(self) -> str:
        """
        Get cleaned output, filtering out terminal redraw noise.

        Returns:
            Cleaned output with prompts and redraws removed
        """
        if self.child is None:
            return ""

        raw = self.child.before or ""

        clean_lines = []
        for line in raw.split('\n'):
            # Skip lines that are mostly prompt redraws
            if ':: ~' in line and line.count('@') > 1:
                continue
            # Skip short lines that look like partial prompts. The check must
            # look for a bare '>' because strip() has already removed the
            # trailing space (and any '\r') that follows it in the prompt.
            stripped = line.strip()
            if stripped.endswith('>') and len(stripped) < 10:
                continue
            # Keep the line if it has actual content
            clean_lines.append(line)

        return '\n'.join(clean_lines)

    def get_match(self) -> str:
        """
        Get the text that matched the last expect pattern.

        Returns:
            Matched text, or "" if fortsh is not running or nothing matched
        """
        if self.child is None:
            return ""
        return self.child.after or ""

    def run_command(self, command: str, timeout: Optional[float] = None) -> str:
        """
        Run a command and return its output.

        Uses a unique marker for reliable output detection instead of
        prompt matching, which can be unreliable with terminal redraws.

        The captured text is filtered line by line: lines containing the
        marker, lines starting with a prompt character ('>', '$', '#', '%')
        and lines containing the command text are dropped, so genuine output
        that happens to match one of these rules is dropped as well.

        Args:
            command: Shell command to run
            timeout: Timeout for the command (uses default if None)

        Returns:
            Command output (excluding the prompt and marker)

        Raises:
            RuntimeError: If fortsh has not been started
            pexpect.TIMEOUT: If the marker does not appear within timeout
            pexpect.EOF: If process terminates
        """
        # Send command followed by marker echo
        self.send_line(command)
        self.send_line(f"echo {self.END_MARKER}")

        # Wait for the marker to appear
        # NOTE(review): if fortsh echoes typed input, the first match may be
        # the echoed "echo <marker>" line rather than its output - confirm.
        self.expect(self.END_MARKER, timeout=timeout)
        output = self.get_output()

        # Clean up the output
        # Remove the marker echo command and any prompts
        lines = []
        for line in output.split('\n'):
            # Skip lines containing the marker or prompt patterns
            if self.END_MARKER in line:
                continue
            if line.strip().startswith(('>', '$', '#', '%')):
                continue
            # Skip the echoed command. An empty command must not take part:
            # "" is a substring of every line and would discard all output.
            if command and command in line:
                continue
            lines.append(line)

        return '\n'.join(lines).strip()

    # ------------------------------------------------------------------
    # Control keys and terminal state
    # ------------------------------------------------------------------

    def interrupt(self) -> None:
        """Send Ctrl+C to interrupt current operation."""
        self.send_key("C-c")

    def suspend(self) -> None:
        """Send Ctrl+Z to suspend current operation."""
        self.send_key("C-z")

    def eof(self) -> None:
        """Send Ctrl+D (EOF)."""
        self.send_key("C-d")

    @property
    def is_running(self) -> bool:
        """Check if fortsh is still running."""
        if self.child is None:
            return False
        return self.child.isalive()

    def set_terminal_size(self, rows: int, cols: int) -> None:
        """
        Change the terminal dimensions (triggers SIGWINCH).

        Args:
            rows: Number of rows
            cols: Number of columns

        Raises:
            RuntimeError: If fortsh has not been started
        """
        self._require_child().setwinsize(rows, cols)
402
403
class FortshTestSession:
    """
    Context manager for fortsh test sessions.

    Creates a FortshPTY from the stored keyword arguments on entry, starts
    it, and stops it again on exit. Exceptions raised inside the ``with``
    body are never suppressed.

    Usage:
        with FortshTestSession() as fortsh:
            fortsh.send_line("echo hello")
            output = fortsh.wait_for_prompt()
            assert "hello" in output
    """

    def __init__(self, **kwargs):
        """
        Store constructor arguments for the FortshPTY created on entry.

        Args:
            **kwargs: Passed unchanged to FortshPTY()
        """
        self.kwargs = kwargs
        self.pty: Optional[FortshPTY] = None

    def __enter__(self) -> FortshPTY:
        """
        Create and start the PTY wrapper.

        Returns:
            The started FortshPTY instance
        """
        pty = FortshPTY(**self.kwargs)
        self.pty = pty
        try:
            pty.start()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so a child that
            # was spawned but never reached its prompt must be stopped here
            # or it would be leaked.
            pty.stop()
            raise
        return pty

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the PTY if one was created; never swallow exceptions."""
        if self.pty:
            self.pty.stop()
        return False
428
429
430 # Convenience function for quick testing
def quick_test(command: str, fortsh_path: str = "./bin/fortsh") -> str:
    """
    Execute one command in a throwaway fortsh session.

    A FortshTestSession is opened for the given binary, the command is run
    through run_command(), and the session is stopped again before the
    result is handed back.

    Args:
        command: Command to run
        fortsh_path: Path to fortsh binary

    Returns:
        Command output
    """
    session = FortshTestSession(fortsh_path=fortsh_path)
    with session as shell:
        result = shell.run_command(command)
    return result