| 1 | //! PTY-backed subprocess driver for codex-style parallel threads. |
| 2 | //! |
| 3 | //! Where `core::chat` shells out to `claude -p <prompt>` as a |
| 4 | //! stream-json subprocess and renders the parsed events as cards, |
| 5 | //! this module spawns `claude --resume <id>` (or |
| 6 | //! `claude --session-id <uuid>` for new sessions) inside a real |
| 7 | //! pseudo-terminal. Output is raw ANSI-encoded TTY bytes, rendered |
| 8 | //! by `xterm.js` in the webview. Input is keystrokes from the |
| 9 | //! terminal widget, forwarded to the child's stdin. |
| 10 | //! |
| 11 | //! # Lifecycle |
| 12 | //! |
| 13 | //! PTYs are **persistent across session switches**. A PTY only dies |
| 14 | //! when the user explicitly closes it, the subprocess exits on its |
| 15 | //! own (e.g., `/exit` slash command), or the app quits. This is |
| 16 | //! what enables the codex-parallel-thread behavior where you can |
| 17 | //! bounce between projects and everything keeps running. |
| 18 | //! |
| 19 | //! # Ring buffer |
| 20 | //! |
| 21 | //! Each PTY keeps a bounded ring buffer of recent stdout bytes |
| 22 | //! ([`RingBuffer`]) so we can replay state into a fresh `xterm.js` |
| 23 | //! instance on reattach. The cap is 256 KB — enough for ~3 screens |
| 24 | //! of dense ANSI output without unbounded memory per background |
| 25 | //! terminal. When the cap is reached, the oldest bytes are dropped. |
| 26 | //! This can truncate in the middle of an ANSI escape sequence; |
| 27 | //! xterm.js handles that by rendering one garbage character and |
| 28 | //! recovering. |
| 29 | //! |
| 30 | //! # Threading |
| 31 | //! |
| 32 | //! `portable-pty` is synchronous. Reader and wait tasks run under |
| 33 | //! `tauri::async_runtime::spawn_blocking`, never raw `tokio::spawn`. |
| 34 | //! This follows the same rule as `core::chat` — see the |
| 35 | //! `tauri_tokio_runtime_rule` memory for the full explanation and |
| 36 | //! the two real crashes that enforced it. |
| 37 | |
| 38 | use std::collections::VecDeque; |
| 39 | use std::io::{Read, Write}; |
| 40 | use std::path::PathBuf; |
| 41 | use std::sync::{Arc, Mutex}; |
| 42 | |
| 43 | use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize}; |
| 44 | use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; |
| 45 | use tokio::sync::oneshot; |
| 46 | |
| 47 | use crate::core::error::{CoreError, CoreResult}; |
| 48 | |
/// Bounded ring buffer of recent PTY stdout bytes. Used for replay
/// when a detached xterm.js reattaches to a running PTY.
///
/// New bytes are appended; when the capacity is reached, oldest
/// bytes are dropped from the front to make room. A single push of
/// more than `cap` bytes replaces the whole buffer with the tail.
#[derive(Debug)]
pub struct RingBuffer {
    buf: VecDeque<u8>,
    cap: usize,
}

impl RingBuffer {
    /// Create a buffer that retains at most `cap` bytes.
    ///
    /// The initial allocation is clamped to 16 KB so that many idle
    /// PTYs don't each pre-reserve the full cap; the deque grows on
    /// demand up to `cap`.
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            buf: VecDeque::with_capacity(cap.min(16 * 1024)),
            cap,
        }
    }

    /// Append `bytes` to the buffer, dropping oldest bytes as
    /// needed to stay within the capacity cap.
    pub fn push(&mut self, bytes: &[u8]) {
        if bytes.is_empty() {
            return;
        }
        if bytes.len() >= self.cap {
            // Single push bigger than the whole buffer — keep only
            // the last `cap` bytes.
            self.buf.clear();
            self.buf.extend(&bytes[bytes.len() - self.cap..]);
            return;
        }
        // `bytes.len() < cap` here, so overflow < buf.len() and the
        // drain range is always in bounds. Draining once is O(k) in
        // the number of evicted bytes, computed up front instead of
        // re-checking the length on every single-byte pop.
        let overflow = (self.buf.len() + bytes.len()).saturating_sub(self.cap);
        self.buf.drain(..overflow);
        self.buf.extend(bytes);
    }

    /// Current number of buffered bytes.
    pub fn len(&self) -> usize {
        self.buf.len()
    }

    /// True when no bytes have been buffered (or all were evicted).
    pub fn is_empty(&self) -> bool {
        self.buf.is_empty()
    }

    /// Copy the current buffer contents into a fresh `Vec<u8>`.
    /// Used on reattach to base64-encode the snapshot and replay
    /// it into the freshly-mounted xterm.js instance.
    pub fn snapshot(&self) -> Vec<u8> {
        self.buf.iter().copied().collect()
    }
}
| 104 | |
/// Default ring-buffer capacity per PTY, in bytes. 256 KB ≈ three
/// screens of dense ANSI output. A larger cap keeps more replay
/// history on reattach at the cost of more resident memory per
/// long-lived background terminal; a smaller cap shows less history.
pub const DEFAULT_RING_CAPACITY: usize = 256 * 1024;
| 109 | |
/// Caller-supplied parameters for spawning a PTY-backed claude
/// subprocess. Mirrors [`crate::core::chat::TurnRequest`] but
/// terminal-flavored.
#[derive(Debug, Clone)]
pub struct PtyRequest {
    /// Caller-chosen identifier for this PTY — presumably the key
    /// under which the commands layer stores the handle in
    /// `AppState.active_ptys` (not read by [`spawn_pty`] itself).
    pub pty_id: String,
    /// Working directory for the subprocess.
    pub cwd: PathBuf,
    /// Path to the `claude` executable to spawn.
    pub claude_bin: PathBuf,
    /// Command-line arguments appended after the binary, e.g.
    /// `--resume <id>` or `--session-id <uuid>`.
    pub args: Vec<String>,
    /// Initial terminal width in columns (passed to `openpty`).
    pub cols: u16,
    /// Initial terminal height in rows (passed to `openpty`).
    pub rows: u16,
    /// Extra environment variables set on the spawned command via
    /// `CommandBuilder::env`.
    pub env: Vec<(String, String)>,
}
| 123 | |
/// Handle returned by [`spawn_pty`]. The commands layer stores this
/// in `AppState.active_ptys`; the xterm-facing frontend reads from
/// `data_rx` via the Tauri event bus forwarder, writes via `writer`,
/// and resizes via `master`. Sending on `kill_tx` terminates the
/// subprocess; note that merely *dropping* `kill_tx` does NOT kill
/// the child — the kill watcher only acts on a successful send, so
/// a dropped handle leaves the subprocess running until it exits on
/// its own. `exit_rx` fires once with the final exit code.
pub struct PtyHandle {
    /// Stream of stdout chunks from the subprocess. Base64-encoded
    /// and forwarded to the webview as `pty:data` events.
    pub data_rx: UnboundedReceiver<Vec<u8>>,
    /// Fires exactly once when the subprocess exits (naturally or
    /// via kill). `None` means we couldn't extract an exit code
    /// (e.g., killed by signal).
    pub exit_rx: oneshot::Receiver<Option<i32>>,
    /// Writer side of the PTY master. The commands layer locks and
    /// writes keystrokes into this.
    pub writer: Arc<Mutex<Box<dyn Write + Send>>>,
    /// Master handle — used for `resize` when the xterm pane's
    /// container dimensions change.
    pub master: Arc<Mutex<Box<dyn MasterPty + Send>>>,
    /// Send `()` to request cancellation. The kill watcher task
    /// picks it up and calls `ChildKiller::kill`, after which the
    /// wait task reaps the subprocess and emits `exit_rx`. Dropping
    /// this sender without sending is a no-op for the subprocess.
    pub kill_tx: oneshot::Sender<()>,
    /// Bounded ring buffer of recent stdout bytes, used to replay
    /// state into a freshly-mounted xterm on reattach.
    pub ring_buffer: Arc<Mutex<RingBuffer>>,
}
| 152 | |
| 153 | /// Spawn a claude subprocess inside a real pseudo-terminal and |
| 154 | /// start background tasks to pump its I/O. |
| 155 | /// |
| 156 | /// Concurrency layout: |
| 157 | /// - **Reader task** (`spawn_blocking`): blocks on |
| 158 | /// `master.try_clone_reader().read(&mut buf)`, copies each chunk |
| 159 | /// into the ring buffer *and* sends a clone to `data_rx`. Exits |
| 160 | /// on EOF or error. |
| 161 | /// - **Kill watcher** (async): awaits `kill_rx`. On signal, calls |
| 162 | /// `ChildKiller::kill` on the cloned killer (valid from any |
| 163 | /// thread) to start termination. The wait task then reaps. |
| 164 | /// - **Wait task** (`spawn_blocking`): owns the `Child`, calls |
| 165 | /// `wait()`, and sends the exit code on `exit_tx`. |
| 166 | /// |
| 167 | /// # Runtime context |
| 168 | /// |
| 169 | /// Must be called from within a tokio runtime context (async Tauri |
| 170 | /// command or `tauri::async_runtime::block_on`). `spawn_blocking` |
| 171 | /// and `spawn` both require a runtime handle. See the |
| 172 | /// `tauri_tokio_runtime_rule` memory for the full story. |
| 173 | pub fn spawn_pty(req: PtyRequest) -> CoreResult<PtyHandle> { |
| 174 | let pty_system = native_pty_system(); |
| 175 | let pair = pty_system |
| 176 | .openpty(PtySize { |
| 177 | rows: req.rows, |
| 178 | cols: req.cols, |
| 179 | pixel_width: 0, |
| 180 | pixel_height: 0, |
| 181 | }) |
| 182 | .map_err(|e| { |
| 183 | CoreError::PlainIo(std::io::Error::other(format!("openpty: {e}"))) |
| 184 | })?; |
| 185 | |
| 186 | // Build the command and spawn it through the slave. Drop slave |
| 187 | // after spawn — we don't need it further; the subprocess's |
| 188 | // stdin/stdout/stderr are wired through the slave's FDs already. |
| 189 | let mut cmd = CommandBuilder::new(&req.claude_bin); |
| 190 | cmd.cwd(&req.cwd); |
| 191 | for arg in &req.args { |
| 192 | cmd.arg(arg); |
| 193 | } |
| 194 | for (k, v) in &req.env { |
| 195 | cmd.env(k, v); |
| 196 | } |
| 197 | |
| 198 | let child = pair |
| 199 | .slave |
| 200 | .spawn_command(cmd) |
| 201 | .map_err(|e| CoreError::PlainIo(std::io::Error::other(format!("spawn: {e}"))))?; |
| 202 | drop(pair.slave); |
| 203 | |
| 204 | // Capture reader + writer from the master BEFORE wrapping the |
| 205 | // master in an Arc/Mutex. Both calls are non-consuming on the |
| 206 | // master, so `resize()` still works afterward. |
| 207 | let reader = pair.master.try_clone_reader().map_err(|e| { |
| 208 | CoreError::PlainIo(std::io::Error::other(format!("clone_reader: {e}"))) |
| 209 | })?; |
| 210 | let writer = pair.master.take_writer().map_err(|e| { |
| 211 | CoreError::PlainIo(std::io::Error::other(format!("take_writer: {e}"))) |
| 212 | })?; |
| 213 | |
| 214 | let master: Arc<Mutex<Box<dyn MasterPty + Send>>> = Arc::new(Mutex::new(pair.master)); |
| 215 | let writer: Arc<Mutex<Box<dyn Write + Send>>> = Arc::new(Mutex::new(writer)); |
| 216 | let ring_buffer = Arc::new(Mutex::new(RingBuffer::with_capacity(DEFAULT_RING_CAPACITY))); |
| 217 | |
| 218 | let (data_tx, data_rx) = unbounded_channel::<Vec<u8>>(); |
| 219 | let (exit_tx, exit_rx) = oneshot::channel::<Option<i32>>(); |
| 220 | let (kill_tx, kill_rx) = oneshot::channel::<()>(); |
| 221 | |
| 222 | // Reader task — `portable-pty` reads are synchronous, so this |
| 223 | // runs on the blocking pool. |
| 224 | { |
| 225 | let ring_buffer = ring_buffer.clone(); |
| 226 | let mut reader = reader; |
| 227 | tauri::async_runtime::spawn_blocking(move || { |
| 228 | let mut buf = vec![0u8; 4096]; |
| 229 | loop { |
| 230 | match reader.read(&mut buf) { |
| 231 | Ok(0) => return, // EOF |
| 232 | Ok(n) => { |
| 233 | let chunk = buf[..n].to_vec(); |
| 234 | if let Ok(mut rb) = ring_buffer.lock() { |
| 235 | rb.push(&chunk); |
| 236 | } |
| 237 | if data_tx.send(chunk).is_err() { |
| 238 | // Receiver gone — no point continuing. |
| 239 | return; |
| 240 | } |
| 241 | } |
| 242 | Err(err) => { |
| 243 | tracing::debug!(error = %err, "pty reader exiting"); |
| 244 | return; |
| 245 | } |
| 246 | } |
| 247 | } |
| 248 | }); |
| 249 | } |
| 250 | |
| 251 | // Clone the child killer BEFORE moving the child into the wait |
| 252 | // task. The killer is a separate handle and can live in the |
| 253 | // kill-watcher task. |
| 254 | let mut child = child; |
| 255 | let killer = child.clone_killer(); |
| 256 | |
| 257 | // Kill watcher — async, no blocking work. Just awaits the |
| 258 | // signal and calls kill on the cloned killer. |
| 259 | tauri::async_runtime::spawn(async move { |
| 260 | if kill_rx.await.is_ok() { |
| 261 | let mut killer = killer; |
| 262 | if let Err(err) = killer.kill() { |
| 263 | tracing::warn!(error = %err, "pty kill failed"); |
| 264 | } |
| 265 | } |
| 266 | }); |
| 267 | |
| 268 | // Wait task — blocking `child.wait()`. When the subprocess |
| 269 | // exits (naturally or because we killed it), this fires the |
| 270 | // exit_tx once. |
| 271 | tauri::async_runtime::spawn_blocking(move || { |
| 272 | let status = child.wait(); |
| 273 | let code = match status { |
| 274 | Ok(s) => { |
| 275 | // portable-pty's ExitStatus::exit_code returns u32. |
| 276 | // Map to i32 for frontend consumption; saturate on |
| 277 | // overflow (shouldn't happen in practice). |
| 278 | Some(s.exit_code() as i32) |
| 279 | } |
| 280 | Err(err) => { |
| 281 | tracing::warn!(error = %err, "pty wait failed"); |
| 282 | None |
| 283 | } |
| 284 | }; |
| 285 | let _ = exit_tx.send(code); |
| 286 | }); |
| 287 | |
| 288 | Ok(PtyHandle { |
| 289 | data_rx, |
| 290 | exit_rx, |
| 291 | writer, |
| 292 | master, |
| 293 | kill_tx, |
| 294 | ring_buffer, |
| 295 | }) |
| 296 | } |
| 297 | |
| 298 | /// Write bytes into the PTY master. Convenience wrapper that the |
| 299 | /// commands layer calls from `write_pty`. |
| 300 | pub fn write_pty(writer: &Arc<Mutex<Box<dyn Write + Send>>>, data: &[u8]) -> CoreResult<()> { |
| 301 | let mut w = writer |
| 302 | .lock() |
| 303 | .map_err(|_| CoreError::PlainIo(std::io::Error::other("writer poisoned")))?; |
| 304 | w.write_all(data) |
| 305 | .map_err(|e| CoreError::PlainIo(std::io::Error::other(format!("write: {e}"))))?; |
| 306 | w.flush() |
| 307 | .map_err(|e| CoreError::PlainIo(std::io::Error::other(format!("flush: {e}"))))?; |
| 308 | Ok(()) |
| 309 | } |
| 310 | |
| 311 | /// Resize the PTY master. Propagates to the subprocess via |
| 312 | /// SIGWINCH on unix; claude redraws automatically. |
| 313 | pub fn resize_pty( |
| 314 | master: &Arc<Mutex<Box<dyn MasterPty + Send>>>, |
| 315 | cols: u16, |
| 316 | rows: u16, |
| 317 | ) -> CoreResult<()> { |
| 318 | let m = master |
| 319 | .lock() |
| 320 | .map_err(|_| CoreError::PlainIo(std::io::Error::other("master poisoned")))?; |
| 321 | m.resize(PtySize { |
| 322 | rows, |
| 323 | cols, |
| 324 | pixel_width: 0, |
| 325 | pixel_height: 0, |
| 326 | }) |
| 327 | .map_err(|e| CoreError::PlainIo(std::io::Error::other(format!("resize: {e}"))))?; |
| 328 | Ok(()) |
| 329 | } |
| 330 | |
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn push_under_capacity() {
        let mut ring = RingBuffer::with_capacity(1024);
        for piece in [&b"hello"[..], b" world"] {
            ring.push(piece);
        }
        assert_eq!(ring.len(), 11);
        assert_eq!(ring.snapshot(), b"hello world");
    }

    #[test]
    fn push_drops_oldest_when_full() {
        let mut ring = RingBuffer::with_capacity(8);
        ring.push(b"abcd");
        ring.push(b"efgh");
        assert_eq!(ring.snapshot(), b"abcdefgh", "filled exactly to cap");
        ring.push(b"ij");
        assert_eq!(ring.snapshot(), b"cdefghij", "oldest two bytes evicted");
    }

    #[test]
    fn single_oversized_push_keeps_tail() {
        // 10 bytes pushed into a 4-byte buffer: only the tail survives.
        let mut ring = RingBuffer::with_capacity(4);
        ring.push(b"abcdefghij");
        assert_eq!(ring.snapshot(), b"ghij");
        assert_eq!(ring.len(), 4);
    }

    #[test]
    fn cumulative_push_respects_cap() {
        const CAP: usize = 256 * 1024;
        // 512 KB total, delivered as 4 KB chunks.
        let mut ring = RingBuffer::with_capacity(CAP);
        let chunk = vec![b'x'; 4096];
        (0..128).for_each(|_| ring.push(&chunk));
        assert_eq!(ring.len(), CAP);
        assert!(ring.snapshot().iter().all(|&b| b == b'x'));
    }

    #[test]
    fn empty_push_is_noop() {
        let mut ring = RingBuffer::with_capacity(16);
        ring.push(b"foo");
        ring.push(b"");
        assert_eq!(ring.snapshot(), b"foo");
    }

    #[test]
    fn is_empty_before_any_push() {
        assert!(RingBuffer::with_capacity(16).is_empty());
    }
}
| 389 |