Rust · 13860 bytes Raw Blame History
1 //! PTY-backed subprocess driver for codex-style parallel threads.
2 //!
3 //! Where `core::chat` shells out to `claude -p <prompt>` as a
4 //! stream-json subprocess and renders the parsed events as cards,
5 //! this module spawns `claude --resume <id>` (or
6 //! `claude --session-id <uuid>` for new sessions) inside a real
7 //! pseudo-terminal. Output is raw ANSI-encoded TTY bytes, rendered
8 //! by `xterm.js` in the webview. Input is keystrokes from the
9 //! terminal widget, forwarded to the child's stdin.
10 //!
11 //! # Lifecycle
12 //!
13 //! PTYs are **persistent across session switches**. A PTY only dies
14 //! when the user explicitly closes it, the subprocess exits on its
15 //! own (e.g., `/exit` slash command), or the app quits. This is
16 //! what enables the codex-parallel-thread behavior where you can
17 //! bounce between projects and everything keeps running.
18 //!
19 //! # Ring buffer
20 //!
21 //! Each PTY keeps a bounded ring buffer of recent stdout bytes
22 //! ([`RingBuffer`]) so we can replay state into a fresh `xterm.js`
23 //! instance on reattach. The cap is 256 KB — enough for ~3 screens
24 //! of dense ANSI output without unbounded memory per background
25 //! terminal. When the cap is reached, the oldest bytes are dropped.
26 //! This can truncate in the middle of an ANSI escape sequence;
27 //! xterm.js handles that by rendering one garbage character and
28 //! recovering.
29 //!
30 //! # Threading
31 //!
32 //! `portable-pty` is synchronous. Reader and wait tasks run under
33 //! `tauri::async_runtime::spawn_blocking`, never raw `tokio::spawn`.
34 //! This follows the same rule as `core::chat` — see the
35 //! `tauri_tokio_runtime_rule` memory for the full explanation and
36 //! the two real crashes that enforced it.
37
38 use std::collections::VecDeque;
39 use std::io::{Read, Write};
40 use std::path::PathBuf;
41 use std::sync::{Arc, Mutex};
42
43 use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize};
44 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
45 use tokio::sync::oneshot;
46
47 use crate::core::error::{CoreError, CoreResult};
48
49 /// Bounded ring buffer of recent PTY stdout bytes. Used for replay
50 /// when a detached xterm.js reattaches to a running PTY.
51 ///
52 /// New bytes are appended; when the capacity is reached, oldest
53 /// bytes are popped from the front to make room. A single push of
54 /// more than `cap` bytes replaces the whole buffer with the tail.
/// Bounded ring buffer of recent PTY stdout bytes. Used for replay
/// when a detached xterm.js reattaches to a running PTY.
///
/// New bytes are appended; when the capacity is reached, oldest
/// bytes are evicted from the front to make room. A single push of
/// more than `cap` bytes replaces the whole buffer with the tail.
#[derive(Debug)]
pub struct RingBuffer {
    buf: VecDeque<u8>,
    cap: usize,
}

impl RingBuffer {
    /// Create an empty buffer that will never hold more than `cap`
    /// bytes. The up-front allocation is clamped to 16 KB so a large
    /// cap doesn't reserve memory before any output arrives.
    pub fn with_capacity(cap: usize) -> Self {
        let initial = cap.min(16 * 1024);
        Self {
            buf: VecDeque::with_capacity(initial),
            cap,
        }
    }

    /// Append `bytes`, evicting the oldest buffered bytes as needed
    /// so the total never exceeds the capacity cap.
    pub fn push(&mut self, bytes: &[u8]) {
        if bytes.is_empty() {
            return;
        }
        if bytes.len() >= self.cap {
            // The incoming chunk alone fills (or overflows) the cap:
            // everything previously buffered is discarded and only
            // the trailing `cap` bytes of the chunk survive.
            let tail_start = bytes.len() - self.cap;
            self.buf.clear();
            self.buf.extend(&bytes[tail_start..]);
            return;
        }
        // Evict exactly enough old bytes from the front to make room.
        let overflow = (self.buf.len() + bytes.len()).saturating_sub(self.cap);
        self.buf.drain(..overflow);
        self.buf.extend(bytes);
    }

    /// Current number of buffered bytes.
    pub fn len(&self) -> usize {
        self.buf.len()
    }

    /// True when nothing is buffered (never pushed, or fully evicted).
    pub fn is_empty(&self) -> bool {
        self.buf.is_empty()
    }

    /// Copy the current contents into a fresh `Vec<u8>`. Used on
    /// reattach to base64-encode the snapshot and replay it into the
    /// freshly-mounted xterm.js instance.
    pub fn snapshot(&self) -> Vec<u8> {
        self.buf.iter().copied().collect()
    }
}
104
/// Default ring-buffer capacity per PTY. 256 KB ≈ three screens of
/// dense ANSI output. Larger means unbounded memory for long-lived
/// background terminals; smaller means reattach shows less history.
pub const DEFAULT_RING_CAPACITY: usize = 256 * 1024;
109
/// Caller-supplied parameters for spawning a PTY-backed claude
/// subprocess. Mirrors [`crate::core::chat::TurnRequest`] but
/// terminal-flavored.
#[derive(Debug, Clone)]
pub struct PtyRequest {
    /// Caller-chosen identifier for this PTY. Not read by
    /// [`spawn_pty`] itself — presumably the key the commands layer
    /// uses in `AppState.active_ptys`; confirm against the caller.
    pub pty_id: String,
    /// Working directory for the spawned subprocess.
    pub cwd: PathBuf,
    /// Path to the `claude` binary to exec.
    pub claude_bin: PathBuf,
    /// Arguments passed verbatim to the binary
    /// (e.g. `--resume <id>` or `--session-id <uuid>`).
    pub args: Vec<String>,
    /// Initial terminal width, in character cells.
    pub cols: u16,
    /// Initial terminal height, in character cells.
    pub rows: u16,
    /// Extra environment variables set on the child process.
    pub env: Vec<(String, String)>,
}
123
/// Handle returned by [`spawn_pty`]. The commands layer stores this
/// in `AppState.active_ptys`; the xterm-facing frontend reads from
/// `data_rx` via the Tauri event bus forwarder, writes via `writer`,
/// and resizes via `master`. Sending `()` on `kill_tx` terminates
/// the subprocess; merely *dropping* `kill_tx` without sending does
/// NOT kill it — the kill watcher ignores a closed channel, so the
/// child keeps running until it exits on its own. `exit_rx` fires
/// once with the final exit code.
pub struct PtyHandle {
    /// Stream of stdout chunks from the subprocess. Base64-encoded
    /// and forwarded to the webview as `pty:data` events.
    pub data_rx: UnboundedReceiver<Vec<u8>>,
    /// Fires exactly once when the subprocess exits (naturally or
    /// via kill). `None` means `wait()` itself failed and no exit
    /// code could be extracted.
    pub exit_rx: oneshot::Receiver<Option<i32>>,
    /// Writer side of the PTY master. The commands layer locks and
    /// writes keystrokes into this.
    pub writer: Arc<Mutex<Box<dyn Write + Send>>>,
    /// Master handle — used for `resize` when the xterm pane's
    /// container dimensions change.
    pub master: Arc<Mutex<Box<dyn MasterPty + Send>>>,
    /// Send `()` to request cancellation. The kill watcher task
    /// picks it up and calls `ChildKiller::kill`, after which the
    /// wait task reaps the subprocess and emits `exit_rx`.
    pub kill_tx: oneshot::Sender<()>,
    /// Bounded ring buffer of recent stdout bytes, used to replay
    /// state into a freshly-mounted xterm on reattach.
    pub ring_buffer: Arc<Mutex<RingBuffer>>,
}
152
153 /// Spawn a claude subprocess inside a real pseudo-terminal and
154 /// start background tasks to pump its I/O.
155 ///
156 /// Concurrency layout:
157 /// - **Reader task** (`spawn_blocking`): blocks on
158 /// `master.try_clone_reader().read(&mut buf)`, copies each chunk
159 /// into the ring buffer *and* sends a clone to `data_rx`. Exits
160 /// on EOF or error.
161 /// - **Kill watcher** (async): awaits `kill_rx`. On signal, calls
162 /// `ChildKiller::kill` on the cloned killer (valid from any
163 /// thread) to start termination. The wait task then reaps.
164 /// - **Wait task** (`spawn_blocking`): owns the `Child`, calls
165 /// `wait()`, and sends the exit code on `exit_tx`.
166 ///
167 /// # Runtime context
168 ///
169 /// Must be called from within a tokio runtime context (async Tauri
170 /// command or `tauri::async_runtime::block_on`). `spawn_blocking`
171 /// and `spawn` both require a runtime handle. See the
172 /// `tauri_tokio_runtime_rule` memory for the full story.
173 pub fn spawn_pty(req: PtyRequest) -> CoreResult<PtyHandle> {
174 let pty_system = native_pty_system();
175 let pair = pty_system
176 .openpty(PtySize {
177 rows: req.rows,
178 cols: req.cols,
179 pixel_width: 0,
180 pixel_height: 0,
181 })
182 .map_err(|e| {
183 CoreError::PlainIo(std::io::Error::other(format!("openpty: {e}")))
184 })?;
185
186 // Build the command and spawn it through the slave. Drop slave
187 // after spawn — we don't need it further; the subprocess's
188 // stdin/stdout/stderr are wired through the slave's FDs already.
189 let mut cmd = CommandBuilder::new(&req.claude_bin);
190 cmd.cwd(&req.cwd);
191 for arg in &req.args {
192 cmd.arg(arg);
193 }
194 for (k, v) in &req.env {
195 cmd.env(k, v);
196 }
197
198 let child = pair
199 .slave
200 .spawn_command(cmd)
201 .map_err(|e| CoreError::PlainIo(std::io::Error::other(format!("spawn: {e}"))))?;
202 drop(pair.slave);
203
204 // Capture reader + writer from the master BEFORE wrapping the
205 // master in an Arc/Mutex. Both calls are non-consuming on the
206 // master, so `resize()` still works afterward.
207 let reader = pair.master.try_clone_reader().map_err(|e| {
208 CoreError::PlainIo(std::io::Error::other(format!("clone_reader: {e}")))
209 })?;
210 let writer = pair.master.take_writer().map_err(|e| {
211 CoreError::PlainIo(std::io::Error::other(format!("take_writer: {e}")))
212 })?;
213
214 let master: Arc<Mutex<Box<dyn MasterPty + Send>>> = Arc::new(Mutex::new(pair.master));
215 let writer: Arc<Mutex<Box<dyn Write + Send>>> = Arc::new(Mutex::new(writer));
216 let ring_buffer = Arc::new(Mutex::new(RingBuffer::with_capacity(DEFAULT_RING_CAPACITY)));
217
218 let (data_tx, data_rx) = unbounded_channel::<Vec<u8>>();
219 let (exit_tx, exit_rx) = oneshot::channel::<Option<i32>>();
220 let (kill_tx, kill_rx) = oneshot::channel::<()>();
221
222 // Reader task — `portable-pty` reads are synchronous, so this
223 // runs on the blocking pool.
224 {
225 let ring_buffer = ring_buffer.clone();
226 let mut reader = reader;
227 tauri::async_runtime::spawn_blocking(move || {
228 let mut buf = vec![0u8; 4096];
229 loop {
230 match reader.read(&mut buf) {
231 Ok(0) => return, // EOF
232 Ok(n) => {
233 let chunk = buf[..n].to_vec();
234 if let Ok(mut rb) = ring_buffer.lock() {
235 rb.push(&chunk);
236 }
237 if data_tx.send(chunk).is_err() {
238 // Receiver gone — no point continuing.
239 return;
240 }
241 }
242 Err(err) => {
243 tracing::debug!(error = %err, "pty reader exiting");
244 return;
245 }
246 }
247 }
248 });
249 }
250
251 // Clone the child killer BEFORE moving the child into the wait
252 // task. The killer is a separate handle and can live in the
253 // kill-watcher task.
254 let mut child = child;
255 let killer = child.clone_killer();
256
257 // Kill watcher — async, no blocking work. Just awaits the
258 // signal and calls kill on the cloned killer.
259 tauri::async_runtime::spawn(async move {
260 if kill_rx.await.is_ok() {
261 let mut killer = killer;
262 if let Err(err) = killer.kill() {
263 tracing::warn!(error = %err, "pty kill failed");
264 }
265 }
266 });
267
268 // Wait task — blocking `child.wait()`. When the subprocess
269 // exits (naturally or because we killed it), this fires the
270 // exit_tx once.
271 tauri::async_runtime::spawn_blocking(move || {
272 let status = child.wait();
273 let code = match status {
274 Ok(s) => {
275 // portable-pty's ExitStatus::exit_code returns u32.
276 // Map to i32 for frontend consumption; saturate on
277 // overflow (shouldn't happen in practice).
278 Some(s.exit_code() as i32)
279 }
280 Err(err) => {
281 tracing::warn!(error = %err, "pty wait failed");
282 None
283 }
284 };
285 let _ = exit_tx.send(code);
286 });
287
288 Ok(PtyHandle {
289 data_rx,
290 exit_rx,
291 writer,
292 master,
293 kill_tx,
294 ring_buffer,
295 })
296 }
297
298 /// Write bytes into the PTY master. Convenience wrapper that the
299 /// commands layer calls from `write_pty`.
300 pub fn write_pty(writer: &Arc<Mutex<Box<dyn Write + Send>>>, data: &[u8]) -> CoreResult<()> {
301 let mut w = writer
302 .lock()
303 .map_err(|_| CoreError::PlainIo(std::io::Error::other("writer poisoned")))?;
304 w.write_all(data)
305 .map_err(|e| CoreError::PlainIo(std::io::Error::other(format!("write: {e}"))))?;
306 w.flush()
307 .map_err(|e| CoreError::PlainIo(std::io::Error::other(format!("flush: {e}"))))?;
308 Ok(())
309 }
310
311 /// Resize the PTY master. Propagates to the subprocess via
312 /// SIGWINCH on unix; claude redraws automatically.
313 pub fn resize_pty(
314 master: &Arc<Mutex<Box<dyn MasterPty + Send>>>,
315 cols: u16,
316 rows: u16,
317 ) -> CoreResult<()> {
318 let m = master
319 .lock()
320 .map_err(|_| CoreError::PlainIo(std::io::Error::other("master poisoned")))?;
321 m.resize(PtySize {
322 rows,
323 cols,
324 pixel_width: 0,
325 pixel_height: 0,
326 })
327 .map_err(|e| CoreError::PlainIo(std::io::Error::other(format!("resize: {e}"))))?;
328 Ok(())
329 }
330
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `RingBuffer` of capacity `cap` and push each chunk
    /// in order — shared setup for the eviction tests below.
    fn filled(cap: usize, chunks: &[&[u8]]) -> RingBuffer {
        let mut rb = RingBuffer::with_capacity(cap);
        for chunk in chunks {
            rb.push(chunk);
        }
        rb
    }

    #[test]
    fn push_under_capacity() {
        let rb = filled(1024, &[&b"hello"[..], &b" world"[..]]);
        assert_eq!(rb.len(), 11);
        assert_eq!(rb.snapshot(), b"hello world");
    }

    #[test]
    fn push_drops_oldest_when_full() {
        // Two pushes land exactly at capacity.
        let mut rb = filled(8, &[&b"abcd"[..], &b"efgh"[..]]);
        assert_eq!(rb.snapshot(), b"abcdefgh");
        rb.push(b"ij"); // evicts "ab"
        assert_eq!(rb.snapshot(), b"cdefghij");
    }

    #[test]
    fn single_oversized_push_keeps_tail() {
        // 10 bytes into a 4-byte buffer — only the tail survives.
        let rb = filled(4, &[&b"abcdefghij"[..]]);
        assert_eq!(rb.snapshot(), b"ghij");
        assert_eq!(rb.len(), 4);
    }

    #[test]
    fn cumulative_push_respects_cap() {
        let mut rb = RingBuffer::with_capacity(256 * 1024);
        // Push 512 KB total via 4 KB chunks.
        let chunk = vec![b'x'; 4096];
        (0..128).for_each(|_| rb.push(&chunk));
        assert_eq!(rb.len(), 256 * 1024);
        // Every byte should be 'x'.
        assert!(rb.snapshot().iter().all(|&b| b == b'x'));
    }

    #[test]
    fn empty_push_is_noop() {
        let mut rb = filled(16, &[&b"foo"[..]]);
        rb.push(b"");
        assert_eq!(rb.snapshot(), b"foo");
    }

    #[test]
    fn is_empty_before_any_push() {
        assert!(RingBuffer::with_capacity(16).is_empty());
    }
}
389