Python · 13846 bytes Raw Blame History
"""
PTY management for interactive shell testing.

Provides a high-level interface to spawn a shell in a pseudo-terminal
and interact with it programmatically.
"""
7
8 import os
9 import re
10 import pexpect
11 from pathlib import Path
12 from typing import Optional, Union
13
14 from utils.keys import KEYS, get_key
15
16
class ShellPTY:
    """
    Manages a shell process running in a pseudo-terminal.

    This class provides methods to send input, receive output, and verify
    behavior for interactive testing. All I/O goes through a ``pexpect.spawn``
    child stored in ``self.child`` (``None`` while no shell is running).
    """

    # Default prompt pattern - matches the shell prompt
    # No anchors for reliability with pexpect's buffering
    DEFAULT_PROMPT_PATTERN = r'> '

    # Unique marker for reliable command output detection
    END_MARKER = "___BENSCH_CMD_END___"

    def __init__(
        self,
        shell_path: Optional[str] = None,
        timeout: float = 5.0,
        prompt_pattern: Optional[str] = None,
        env: Optional[dict] = None,
        profile: Optional[dict] = None,
    ):
        """
        Initialize the PTY wrapper.

        Args:
            shell_path: Path to shell binary (required before ``start()``)
            timeout: Default timeout for expect operations (seconds)
            prompt_pattern: Regex pattern to match the shell prompt; falls
                back to ``profile["prompt_pattern"]`` and then to
                ``DEFAULT_PROMPT_PATTERN``
            env: Additional environment variables (applied last, so they
                override profile-provided values)
            profile: Shell profile dict (from profile.py). Keys read by this
                class: ``prompt_pattern``, ``test_mode_env``,
                ``history_disable`` (``env``), ``rc_disable`` (``env``,
                ``flags``) and ``prompt_set_command``
        """
        self.shell_path = shell_path
        self.timeout = timeout
        self.profile = profile or {}
        self.prompt_pattern = (
            prompt_pattern
            or self.profile.get("prompt_pattern")
            or self.DEFAULT_PROMPT_PATTERN
        )
        self.custom_env = env or {}
        self.child: Optional[pexpect.spawn] = None
        self._output_buffer: str = ""

    def _require_child(self):
        """Return the running pexpect child, or raise if the shell is not started."""
        if self.child is None:
            raise RuntimeError("shell not started")
        return self.child

    def _resolve_timeout(self, timeout: Optional[float]) -> float:
        """Return ``timeout``, using the instance default only when it is None.

        An explicit ``0`` is honoured (it is not replaced by the default).
        """
        return self.timeout if timeout is None else timeout

    def start(self, rc_file: Optional[str] = None) -> None:
        """
        Start shell in a PTY.

        Args:
            rc_file: When not None and a profile is set, the profile's
                ``rc_disable`` environment variables and flags are applied.
                NOTE(review): the path value itself is never passed to the
                shell here; only "None vs. not None" is significant.

        Raises:
            ValueError: If no ``shell_path`` was configured.
        """
        # Fail early with a clear message instead of creating an unspawned
        # pexpect object whose errors would be swallowed below.
        if not self.shell_path:
            raise ValueError("shell_path must be set before calling start()")

        env = os.environ.copy()
        env["TERM"] = "xterm-256color"
        env["LANG"] = "en_US.UTF-8"
        env["LC_ALL"] = "en_US.UTF-8"

        # Apply profile-driven environment (test mode, history disable, rc disable)
        if self.profile:
            for k, v in self.profile.get("test_mode_env", {}).items():
                env[k] = str(v)
            for k, v in self.profile.get("history_disable", {}).get("env", {}).items():
                env[k] = str(v)
            if rc_file is not None:
                for k, v in self.profile.get("rc_disable", {}).get("env", {}).items():
                    env[k] = str(v)
        else:
            # Fallback: disable history
            env["HISTFILE"] = "/dev/null"

        # Apply custom environment
        env.update(self.custom_env)

        # Build spawn command with rc-disable flags from profile.
        # Copy the list so the profile dict is never shared with the child.
        spawn_args = []
        if self.profile and rc_file is not None:
            spawn_args = list(self.profile.get("rc_disable", {}).get("flags", []))

        self.child = pexpect.spawn(
            self.shell_path,
            args=spawn_args,
            encoding="utf-8",
            codec_errors="replace",  # Handle raw ANSI/highlight bytes without crashing
            timeout=self.timeout,
            env=env,
            dimensions=(24, 80),  # Standard terminal size
            echo=False,  # Don't echo back input
        )

        # Wait for initial prompt — use a generous timeout for slow shells
        # that print banners or load rc files
        try:
            self.wait_for_prompt(timeout=self.timeout * 2)
        except Exception:
            # Best effort: if prompt detection fails, the prompt may still be
            # fixed up by the profile's prompt_set_command below.
            pass

        # Set prompt from profile to ensure consistent matching
        if self.profile and self.profile.get("prompt_set_command"):
            self.send_line(self.profile["prompt_set_command"])
            try:
                self.wait_for_prompt(timeout=self.timeout)
            except Exception:
                # Best effort, as above.
                pass

    def stop(self) -> int:
        """
        Stop shell and return exit code.

        Tries a graceful ``exit`` first, then SIGTERM/SIGKILL, then lets
        pexpect close the PTY and finally makes sure the child is reaped.

        Returns:
            Exit code of the shell process; ``-1`` if the shell was never
            started, ``0`` when no exit status could be determined (for
            example when the shell was killed by a signal).
        """
        if self.child is None:
            return -1

        import time
        import signal

        child = self.child
        pid = child.pid
        exit_code = 0

        try:
            try:
                # Try graceful exit first
                child.sendline("exit")
                child.expect(pexpect.EOF, timeout=1)
            except (pexpect.TIMEOUT, pexpect.EOF, OSError):
                # Send SIGTERM then SIGKILL
                try:
                    child.kill(signal.SIGTERM)
                    time.sleep(0.1)
                    if child.isalive():
                        child.kill(signal.SIGKILL)
                        time.sleep(0.1)
                except Exception:
                    pass

            # Let pexpect close the PTY file descriptor and terminate the
            # child. The descriptor is deliberately NOT closed by hand first:
            # doing so makes pexpect's own close fail on an already-closed fd
            # before it can terminate the child and record its exit status.
            try:
                child.close(force=True)
            except Exception:
                pass

            # close() refreshes the exit status once the child is reaped.
            if child.exitstatus is not None:
                exit_code = child.exitstatus

            # Wait for process to fully terminate with retries
            reaped = False
            for _ in range(5):
                try:
                    if pid:
                        reaped_pid, status = os.waitpid(pid, os.WNOHANG)
                        if reaped_pid != 0:
                            reaped = True
                            # Only trust our own wait status when pexpect
                            # did not already report one.
                            if child.exitstatus is None and os.WIFEXITED(status):
                                exit_code = os.WEXITSTATUS(status)
                            break
                    time.sleep(0.05)
                except ChildProcessError:
                    # Already reaped (normally by pexpect during close()).
                    reaped = True
                    break
                except Exception:
                    break

            # Ensure any zombie is reaped. Skipped once the child is known to
            # be reaped so a recycled pid can never be signalled.
            if pid and not reaped:
                try:
                    os.kill(pid, 0)  # Check if still exists
                    os.kill(pid, signal.SIGKILL)
                    os.waitpid(pid, 0)
                except OSError:
                    # Covers ProcessLookupError and ChildProcessError.
                    pass
        finally:
            self.child = None

        return exit_code

    def clear_buffer(self) -> None:
        """Clear the pexpect buffer to avoid accumulation."""
        if self.child is None:
            return
        # Read any pending output in 1024-character chunks until a short
        # timeout (or EOF) signals that nothing more is pending.
        try:
            while True:
                self.child.read_nonblocking(size=1024, timeout=0.01)
        except (pexpect.TIMEOUT, pexpect.EOF):
            pass

    def send(self, text: str) -> None:
        """
        Send text without newline.

        Args:
            text: Text to send to the shell

        Raises:
            RuntimeError: If the shell has not been started
        """
        self._require_child().send(text)

    def send_line(self, text: str) -> None:
        """
        Send text followed by Enter.

        Args:
            text: Text to send to the shell

        Raises:
            RuntimeError: If the shell has not been started
        """
        self._require_child().sendline(text)

    def send_key(self, key_name: str) -> None:
        """
        Send a special key by name.

        Args:
            key_name: Name of the key (e.g., "Up", "C-a", "Enter")

        Raises:
            RuntimeError: If the shell has not been started
        """
        self._require_child().send(get_key(key_name))

    def send_keys(self, *key_names: str) -> None:
        """
        Send multiple keys in sequence.

        Args:
            *key_names: Names of keys to send
        """
        for key in key_names:
            self.send_key(key)

    def wait_for_prompt(self, timeout: Optional[float] = None) -> str:
        """
        Wait for the shell prompt to appear.

        Args:
            timeout: Timeout in seconds (uses default if None)

        Returns:
            Output received before the prompt

        Raises:
            RuntimeError: If the shell has not been started
        """
        child = self._require_child()
        child.expect(self.prompt_pattern, timeout=self._resolve_timeout(timeout))
        output = child.before or ""
        self._output_buffer = output
        return output

    def expect(
        self,
        pattern: Union[str, list],
        timeout: Optional[float] = None
    ) -> int:
        """
        Wait for a pattern in the output.

        Args:
            pattern: Regex pattern or list of patterns
            timeout: Timeout in seconds (uses default if None)

        Returns:
            Index of matched pattern (if list) or 0

        Raises:
            RuntimeError: If the shell has not been started
            pexpect.TIMEOUT: If pattern not found within timeout
            pexpect.EOF: If process terminates
        """
        return self._require_child().expect(
            pattern, timeout=self._resolve_timeout(timeout)
        )

    def expect_exact(self, text: str, timeout: Optional[float] = None) -> None:
        """
        Wait for exact text in output.

        Args:
            text: Exact text to find
            timeout: Timeout in seconds (uses default if None)

        Raises:
            RuntimeError: If the shell has not been started
            pexpect.TIMEOUT: If text not found within timeout
        """
        self._require_child().expect_exact(
            text, timeout=self._resolve_timeout(timeout)
        )

    def get_output(self) -> str:
        """
        Get output from the last expect operation.

        Returns:
            Output that appeared before the matched pattern, or an empty
            string when the shell is not running
        """
        if self.child is None:
            return ""
        return self.child.before or ""

    def get_clean_output(self) -> str:
        """
        Get cleaned output, filtering out terminal redraw noise.

        Two kinds of lines are dropped: lines containing ``':: ~'`` together
        with more than one ``'@'`` (treated as prompt redraws), and short
        lines (fewer than 10 non-blank-edge characters) that end in ``'> '``
        (treated as partial prompts).

        Returns:
            Cleaned output with prompts and redraws removed
        """
        if self.child is None:
            return ""

        raw = self.child.before or ""

        clean_lines = []
        for line in raw.split('\n'):
            # Skip lines that are mostly prompt redraws
            if ':: ~' in line and line.count('@') > 1:
                continue
            # Skip lines that look like partial prompts. Only the line
            # terminator is removed before the check: stripping all
            # whitespace would also remove the trailing space of '> ' and
            # the test could never match.
            bare = line.rstrip('\r\n')
            if bare.endswith('> ') and len(bare.strip()) < 10:
                continue
            # Keep the line if it has actual content
            clean_lines.append(line)

        return '\n'.join(clean_lines)

    def get_match(self) -> str:
        """
        Get the text that matched the last expect pattern.

        Returns:
            Matched text, or an empty string when the shell is not running
        """
        if self.child is None:
            return ""
        return self.child.after or ""

    def run_command(self, command: str, timeout: Optional[float] = None) -> str:
        """
        Run a command and return its output.

        Uses a unique marker for reliable output detection instead of
        prompt matching, which can be unreliable with terminal redraws.

        Args:
            command: Shell command to run
            timeout: Timeout for the command (uses default if None)

        Returns:
            Command output (excluding the prompt and marker). Lines that
            start with a prompt-like character (``>``, ``$``, ``#``, ``%``)
            or that contain the command text are filtered out.
        """
        # Send command followed by marker echo
        self.send_line(command)
        self.send_line(f"echo {self.END_MARKER}")

        # Wait for the marker to appear
        self.expect(self.END_MARKER, timeout=timeout)
        output = self.get_output()

        # An empty/blank command must not be used as an echo filter:
        # "" is a substring of every line and would discard all output.
        filter_echo = bool(command.strip())

        # Clean up the output
        # Remove the marker echo command and any prompts
        lines = []
        for line in output.split('\n'):
            # Skip lines containing the marker or prompt patterns
            if self.END_MARKER in line:
                continue
            if line.strip().startswith(('>', '$', '#', '%')):
                continue
            # Skip the echoed command
            if filter_echo and command in line:
                continue
            lines.append(line)

        return '\n'.join(lines).strip()

    def interrupt(self) -> None:
        """Send Ctrl+C to interrupt current operation."""
        self.send_key("C-c")

    def suspend(self) -> None:
        """Send Ctrl+Z to suspend current operation."""
        self.send_key("C-z")

    def eof(self) -> None:
        """Send Ctrl+D (EOF)."""
        self.send_key("C-d")

    @property
    def is_running(self) -> bool:
        """Check if shell is still running."""
        if self.child is None:
            return False
        return self.child.isalive()

    def set_terminal_size(self, rows: int, cols: int) -> None:
        """
        Change the terminal dimensions (triggers SIGWINCH).

        Args:
            rows: Number of rows
            cols: Number of columns

        Raises:
            RuntimeError: If the shell has not been started
        """
        self._require_child().setwinsize(rows, cols)
431
432
class ShellTestSession:
    """
    Context manager for shell test sessions.

    Creates a ShellPTY from the stored keyword arguments on entry, starts
    it, and stops it on exit. Exceptions raised inside the ``with`` body are
    never suppressed.

    Usage:
        with ShellTestSession(shell_path="/path/to/shell") as sh:
            sh.send_line("echo hello")
            output = sh.wait_for_prompt()
            assert "hello" in output
    """

    def __init__(self, **kwargs):
        """
        Store the ShellPTY constructor arguments for later use.

        Args:
            **kwargs: Forwarded unchanged to ``ShellPTY(...)`` on entry
        """
        self.kwargs = kwargs
        self.pty: Optional[ShellPTY] = None

    def __enter__(self) -> ShellPTY:
        """
        Create and start the shell.

        Returns:
            The started ShellPTY instance

        Raises:
            Whatever ``ShellPTY.start()`` raises; the PTY is stopped first.
        """
        self.pty = ShellPTY(**self.kwargs)
        try:
            self.pty.start()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so a shell that
            # was spawned before the failure must be cleaned up here or it
            # would be leaked.
            self.pty.stop()
            raise
        return self.pty

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the shell (if one was created) and propagate any exception."""
        if self.pty:
            self.pty.stop()
        return False
457
458
459 # Convenience function for quick testing
def quick_test(command: str, shell_path: str = None) -> str:
    """
    Execute one command in a throwaway shell session and return its output.

    A ShellTestSession is opened for the given shell binary, the command is
    run through ``run_command``, and the session is closed again before the
    result is handed back.

    Args:
        command: Command to run
        shell_path: Path to shell binary

    Returns:
        Command output
    """
    session = ShellTestSession(shell_path=shell_path)
    with session as shell:
        output = shell.run_command(command)
    return output
472 return sh.run_command(command)