| 1 | //! Subprocess driver for live chat turns. |
| 2 | //! |
//! v1.0 spawns `claude` once per user turn:
//!
//! ```text
//! claude -p <prompt>
//!        --output-format stream-json
//!        --verbose
//!        --include-partial-messages
//!        --permission-mode <mode>
//!        [--resume <id> | --session-id <uuid>]
//! ```
//!
//! `--input-format stream-json` is deliberately NOT passed in this
//! one-shot mode: with stdin set to `Stdio::null()` the CLI would read
//! zero turns from stdin and the `-p <prompt>` text would be ignored.
| 14 | //! |
| 15 | //! The single-turn model keeps the subprocess lifetime trivially |
| 16 | //! scoped and sidesteps partial-write races that a persistent-stdin |
| 17 | //! design would introduce. v1.1+ can upgrade to long-lived stdin |
| 18 | //! piping by: |
| 19 | //! |
//! 1. Dropping `-p <prompt>` in favour of `--input-format stream-json`
//!    plus JSONL written to child stdin:
//!    `{"type":"user","message":{"role":"user","content":"..."}}`.
| 22 | //! 2. Switching `.stdin(Stdio::null())` → `.stdin(Stdio::piped())`. |
| 23 | //! 3. Keeping the `TurnHandle` alive across turns and feeding |
| 24 | //! subsequent prompts via the open stdin handle. |
| 25 | //! |
| 26 | //! `--include-partial-messages` is why we care about |
| 27 | //! [`TurnEvent::Message`] mutation on the frontend: the same |
| 28 | //! assistant message id arrives as multiple events, and the store |
| 29 | //! merges their content blocks in place. |
| 30 | //! |
| 31 | //! # Cancellation |
| 32 | //! |
//! A `oneshot::Sender<()>` is returned in [`TurnHandle::kill_tx`].
//! Sending on it — or dropping it — resolves a `tokio::select!` branch
//! in the wait task, which calls `child.start_kill()`, reaps the
//! child, and emits [`TurnEvent::Cancelled`] once both reader tasks
//! have finished. The commands layer stores the sender in
//! `AppState.active_turns` and drops it when `cancel_turn` fires.
| 39 | //! |
| 40 | //! # Threading |
| 41 | //! |
| 42 | //! Every task spawned in here uses `tauri::async_runtime::spawn`, |
| 43 | //! never raw `tokio::spawn` — raw tokio in a Tauri callback context |
| 44 | //! panics with "there is no reactor running" on app startup. This |
| 45 | //! was a real crash in v0 and the rule is absolute. |
| 46 | //! |
| 47 | //! Tests drive the driver via `tauri::async_runtime::block_on` so |
| 48 | //! they share the same global runtime as the spawned tasks. |
| 49 | |
| 50 | use std::path::PathBuf; |
| 51 | use std::process::Stdio; |
| 52 | |
| 53 | use tokio::io::{AsyncBufReadExt, BufReader}; |
| 54 | use tokio::process::Command; |
| 55 | use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; |
| 56 | use tokio::sync::oneshot; |
| 57 | |
| 58 | use crate::core::error::{CoreError, CoreResult}; |
| 59 | use crate::core::reader::raw_to_message; |
| 60 | use crate::core::schema::{Message, PermissionMode, RawEvent, StreamStatus}; |
| 61 | |
/// Per-turn budget, in bytes, of stderr output that the stderr reader
/// task forwards before it starts discarding lines. Each forwarded
/// line is charged its length plus one byte for the newline. The
/// point is that a runaway subprocess printing megabytes of errors
/// shouldn't be able to OOM the app.
const STDERR_CAP_BYTES: usize = 8 * 1024;
| 65 | |
/// Everything [`spawn_turn`] needs to launch one chat turn. Built by
/// the commands layer and consumed by value.
#[derive(Debug, Clone)]
pub struct TurnRequest {
    /// Uuid generated by the commands layer. `spawn_turn` never reads
    /// it — it exists for logging / telemetry on the caller's side;
    /// event routing is handled by the returned receiver.
    pub turn_id: String,
    /// Absolute working directory — becomes the subprocess's cwd.
    /// Expected to exist on disk; this module does not check, the
    /// caller verifies it before spawning.
    pub cwd: PathBuf,
    /// `Some(id)` resumes an existing session's jsonl (passed via
    /// `--resume`). `None` starts a fresh session — set
    /// [`Self::new_session_id`] too in that case so the uuid is
    /// known ahead of time.
    pub resume_session_id: Option<String>,
    /// Client-generated uuid for a new session, passed via
    /// `--session-id`. Ignored when `resume_session_id` is set.
    pub new_session_id: Option<String>,
    /// The user's prompt text, passed as the argument following `-p`.
    pub prompt: String,
    /// Permission mode forwarded via `--permission-mode`, rendered
    /// with `PermissionMode::as_cli`.
    pub permission_mode: PermissionMode,
    /// Absolute path to the `claude` binary. Resolved once at
    /// startup via a zero-dep PATH walker and cached in `AppState`.
    pub claude_bin: PathBuf,
    /// Extra env vars set on the spawned subprocess in addition to
    /// whatever it inherits from the parent. Empty by default;
    /// tests use this to configure `fake_claude` without racing on
    /// process-wide `std::env::set_var`.
    pub env: Vec<(String, String)>,
}
| 94 | |
/// One event surfaced by the driver. The commands layer maps these
/// onto the Tauri event bus as `chat:event` payloads.
///
/// `Completed`, `Failed` and `Cancelled` are terminal: the wait task
/// sends exactly one of them, after both reader tasks have finished.
#[derive(Debug, Clone)]
pub enum TurnEvent {
    /// First stdout event that carried a `sessionId`. Emitted at most
    /// once per turn (never, if no event carries one). For new
    /// sessions this mirrors what was already passed via
    /// `--session-id`; for resumes it confirms the id.
    SessionBound { session_id: String },
    /// A frontend-ready [`Message`] parsed from one stdout line and
    /// already run through `raw_to_message`. Assistant messages
    /// arrive with `status: Some(Streaming)`; the store is
    /// responsible for merging partials by id.
    Message(Message),
    /// A single line of stderr, forwarded for debugging / error
    /// surfacing. Forwarding stops once the [`STDERR_CAP_BYTES`]
    /// budget has been used up.
    Stderr(String),
    /// Subprocess exited on its own. Exit code 0 is success; non-zero
    /// triggers a `turn_failed` on the event bus. `-1` is reported
    /// when the exit status carries no code.
    Completed { exit_code: i32 },
    /// The turn could not be observed to completion. Within this
    /// module it is emitted when waiting on the child fails; spawn
    /// failures are returned as an `Err` from [`spawn_turn`] instead.
    /// The commands layer forwards `reason` as `turn_failed`.
    Failed { reason: String },
    /// Cancel was requested (the kill sender fired or was dropped)
    /// and the child was killed. Partial messages already emitted
    /// remain valid.
    Cancelled,
}
| 122 | |
/// Handle returned by [`spawn_turn`]. The commands layer keeps the
/// `kill_tx` in `AppState.active_turns`; dropping or sending on it
/// cancels the turn. The receiver streams events until a terminal
/// variant is emitted and then closes.
pub struct TurnHandle {
    /// Stream of [`TurnEvent`]s for this turn; yields `None` once all
    /// driver tasks have dropped their senders.
    pub receiver: UnboundedReceiver<TurnEvent>,
    /// Cancel trigger. The wait task treats both a sent `()` and a
    /// dropped sender as a cancel request, so keep this alive for as
    /// long as the turn should run.
    pub kill_tx: oneshot::Sender<()>,
}
| 131 | |
| 132 | /// Spawn a chat turn. The returned receiver will emit a sequence of |
| 133 | /// [`TurnEvent`]s ending in exactly one of `Completed`, `Failed`, |
| 134 | /// or `Cancelled`. After the terminal event the sender drops and |
| 135 | /// `recv()` returns `None`. |
| 136 | /// |
| 137 | /// # Runtime context |
| 138 | /// |
| 139 | /// **Callers must invoke this from inside a tokio runtime** (either |
| 140 | /// an `async fn` Tauri command, a `tauri::async_runtime::spawn` |
| 141 | /// block, or a `tauri::async_runtime::block_on` in tests). |
| 142 | /// `tokio::process::Command::spawn` internally wraps the child's |
| 143 | /// pipes in `PollEvented`, which calls `Handle::current()` and |
| 144 | /// panics if no tokio runtime is present. The panic message is |
| 145 | /// `"there is no reactor running, must be called from the context |
| 146 | /// of a Tokio 1.x runtime"`. The function itself is synchronous |
| 147 | /// (no `.await`s) — the constraint is purely about the calling |
| 148 | /// context. |
| 149 | pub fn spawn_turn(req: TurnRequest) -> CoreResult<TurnHandle> { |
| 150 | let mut cmd = Command::new(&req.claude_bin); |
| 151 | for (k, v) in &req.env { |
| 152 | cmd.env(k, v); |
| 153 | } |
| 154 | cmd.current_dir(&req.cwd) |
| 155 | .arg("-p") |
| 156 | .arg(&req.prompt) |
| 157 | // NOTE: we intentionally do NOT pass `--input-format stream-json` |
| 158 | // here. That flag tells claude to read JSONL turns from stdin |
| 159 | // instead of using the `-p <text>` positional; combined with |
| 160 | // our `Stdio::null()` the CLI ends up with an empty stdin, |
| 161 | // processes zero turns, and exits cleanly after emitting only |
| 162 | // session-init metadata (pr-link / custom-title / etc.). The |
| 163 | // user's prompt silently disappears. |
| 164 | // |
| 165 | // v1.1's upgrade path to long-lived stdin piping will flip |
| 166 | // both `--input-format stream-json` AND `Stdio::piped()` for |
| 167 | // stdin in lock-step, and pump `{"type":"user","message":…}` |
| 168 | // JSONL lines into the child. Do not re-add this flag in the |
| 169 | // one-shot `-p` path. |
| 170 | .arg("--output-format") |
| 171 | .arg("stream-json") |
| 172 | .arg("--verbose") |
| 173 | .arg("--include-partial-messages") |
| 174 | .arg("--permission-mode") |
| 175 | .arg(req.permission_mode.as_cli()) |
| 176 | .stdin(Stdio::null()) |
| 177 | .stdout(Stdio::piped()) |
| 178 | .stderr(Stdio::piped()) |
| 179 | .kill_on_drop(true); |
| 180 | |
| 181 | // Resume and new-session are mutually exclusive; resume wins if |
| 182 | // both are accidentally set. |
| 183 | if let Some(resume_id) = &req.resume_session_id { |
| 184 | cmd.arg("--resume").arg(resume_id); |
| 185 | } else if let Some(new_id) = &req.new_session_id { |
| 186 | cmd.arg("--session-id").arg(new_id); |
| 187 | } |
| 188 | |
| 189 | let mut child = cmd.spawn().map_err(CoreError::from)?; |
| 190 | |
| 191 | let stdout = child |
| 192 | .stdout |
| 193 | .take() |
| 194 | .ok_or_else(|| CoreError::PlainIo(std::io::Error::other("claude child has no stdout")))?; |
| 195 | let stderr = child |
| 196 | .stderr |
| 197 | .take() |
| 198 | .ok_or_else(|| CoreError::PlainIo(std::io::Error::other("claude child has no stderr")))?; |
| 199 | |
| 200 | let (tx, rx) = unbounded_channel::<TurnEvent>(); |
| 201 | let (kill_tx, kill_rx) = oneshot::channel::<()>(); |
| 202 | |
| 203 | // Each reader task holds a oneshot sender that drops when the |
| 204 | // task returns. The wait task waits on both receivers before |
| 205 | // emitting the terminal event, so buffered stdout bytes that |
| 206 | // arrive after `child.wait()` returns still get through first. |
| 207 | let (stdout_done_tx, stdout_done_rx) = oneshot::channel::<()>(); |
| 208 | let (stderr_done_tx, stderr_done_rx) = oneshot::channel::<()>(); |
| 209 | |
| 210 | spawn_stdout_task(stdout, tx.clone(), stdout_done_tx); |
| 211 | spawn_stderr_task(stderr, tx.clone(), stderr_done_tx); |
| 212 | spawn_wait_task(child, kill_rx, tx, stdout_done_rx, stderr_done_rx); |
| 213 | |
| 214 | Ok(TurnHandle { |
| 215 | receiver: rx, |
| 216 | kill_tx, |
| 217 | }) |
| 218 | } |
| 219 | |
| 220 | fn spawn_stdout_task( |
| 221 | stdout: tokio::process::ChildStdout, |
| 222 | tx: UnboundedSender<TurnEvent>, |
| 223 | done: oneshot::Sender<()>, |
| 224 | ) { |
| 225 | tauri::async_runtime::spawn(async move { |
| 226 | // Move the sender into the task so it drops when the task |
| 227 | // returns, notifying the wait task that we've finished |
| 228 | // draining the pipe. |
| 229 | let _done_guard = done; |
| 230 | let reader = BufReader::new(stdout); |
| 231 | let mut lines = reader.lines(); |
| 232 | let mut session_bound = false; |
| 233 | let mut fallback_counter: u32 = 0; |
| 234 | |
| 235 | loop { |
| 236 | match lines.next_line().await { |
| 237 | Ok(Some(line)) => { |
| 238 | if line.is_empty() { |
| 239 | continue; |
| 240 | } |
| 241 | let ev: RawEvent = match serde_json::from_str(&line) { |
| 242 | Ok(e) => e, |
| 243 | Err(err) => { |
| 244 | tracing::warn!( |
| 245 | error = %err, |
| 246 | line = %line, |
| 247 | "skipping malformed chat stdout line" |
| 248 | ); |
| 249 | continue; |
| 250 | } |
| 251 | }; |
| 252 | |
| 253 | // Emit SessionBound the first time we see a sessionId. |
| 254 | if !session_bound { |
| 255 | if let Some(sid) = &ev.session_id { |
| 256 | session_bound = true; |
| 257 | if tx |
| 258 | .send(TurnEvent::SessionBound { |
| 259 | session_id: sid.clone(), |
| 260 | }) |
| 261 | .is_err() |
| 262 | { |
| 263 | return; |
| 264 | } |
| 265 | } |
| 266 | } |
| 267 | |
| 268 | // Convert to a frontend-ready Message. `raw_to_message` |
| 269 | // filters session-level metadata events (permission-mode, |
| 270 | // custom-title, progress, etc.) and sidechain entries. |
| 271 | if let Some(mut msg) = raw_to_message(ev, &mut fallback_counter) { |
| 272 | // Flag assistant messages as streaming so the store |
| 273 | // knows to merge subsequent partials. |
| 274 | if let Message::Assistant { status, .. } = &mut msg { |
| 275 | *status = Some(StreamStatus::Streaming); |
| 276 | } |
| 277 | if tx.send(TurnEvent::Message(msg)).is_err() { |
| 278 | return; |
| 279 | } |
| 280 | } |
| 281 | } |
| 282 | Ok(None) => return, // clean EOF |
| 283 | Err(err) => { |
| 284 | tracing::warn!(error = %err, "chat stdout read error"); |
| 285 | return; |
| 286 | } |
| 287 | } |
| 288 | } |
| 289 | }); |
| 290 | } |
| 291 | |
| 292 | fn spawn_stderr_task( |
| 293 | stderr: tokio::process::ChildStderr, |
| 294 | tx: UnboundedSender<TurnEvent>, |
| 295 | done: oneshot::Sender<()>, |
| 296 | ) { |
| 297 | tauri::async_runtime::spawn(async move { |
| 298 | let _done_guard = done; |
| 299 | let reader = BufReader::new(stderr); |
| 300 | let mut lines = reader.lines(); |
| 301 | let mut total_bytes: usize = 0; |
| 302 | |
| 303 | loop { |
| 304 | match lines.next_line().await { |
| 305 | Ok(Some(line)) => { |
| 306 | if total_bytes >= STDERR_CAP_BYTES { |
| 307 | // Silently drop beyond the cap; we've already |
| 308 | // surfaced the first 8 KB which is enough for |
| 309 | // any reasonable error trail. |
| 310 | continue; |
| 311 | } |
| 312 | total_bytes += line.len() + 1; |
| 313 | if tx.send(TurnEvent::Stderr(line)).is_err() { |
| 314 | return; |
| 315 | } |
| 316 | } |
| 317 | Ok(None) => return, |
| 318 | Err(err) => { |
| 319 | tracing::warn!(error = %err, "chat stderr read error"); |
| 320 | return; |
| 321 | } |
| 322 | } |
| 323 | } |
| 324 | }); |
| 325 | } |
| 326 | |
| 327 | /// Owns the [`tokio::process::Child`] for the duration of the turn, |
| 328 | /// races a natural `wait()` against the user-cancel signal, **waits |
| 329 | /// for both reader tasks to finish draining**, and emits the single |
| 330 | /// terminal event. |
| 331 | /// |
| 332 | /// The drain step is load-bearing: `child.wait()` returns as soon as |
| 333 | /// the subprocess exits, but there may still be buffered bytes in |
| 334 | /// the stdout pipe. Emitting `Completed` before the reader task has |
| 335 | /// consumed them would truncate the event stream from the caller's |
| 336 | /// point of view. The reader tasks each hold a `oneshot::Sender` |
| 337 | /// that drops on task return, resolving the receivers we await here. |
| 338 | fn spawn_wait_task( |
| 339 | mut child: tokio::process::Child, |
| 340 | kill_rx: oneshot::Receiver<()>, |
| 341 | tx: UnboundedSender<TurnEvent>, |
| 342 | stdout_done: oneshot::Receiver<()>, |
| 343 | stderr_done: oneshot::Receiver<()>, |
| 344 | ) { |
| 345 | tauri::async_runtime::spawn(async move { |
| 346 | let (was_cancelled, terminal) = tokio::select! { |
| 347 | // User asked us to cancel. |
| 348 | _ = kill_rx => { |
| 349 | let _ = child.start_kill(); |
| 350 | // Reap the process so we don't leak a zombie. |
| 351 | let _ = child.wait().await; |
| 352 | (true, TurnEvent::Cancelled) |
| 353 | } |
| 354 | // Process finished on its own. |
| 355 | status = child.wait() => { |
| 356 | let ev = match status { |
| 357 | Ok(s) => TurnEvent::Completed { |
| 358 | exit_code: s.code().unwrap_or(-1), |
| 359 | }, |
| 360 | Err(err) => TurnEvent::Failed { |
| 361 | reason: format!("wait on claude child failed: {err}"), |
| 362 | }, |
| 363 | }; |
| 364 | (false, ev) |
| 365 | } |
| 366 | }; |
| 367 | |
| 368 | // Drain readers. `_ = stdout_done.await` resolves when the |
| 369 | // reader task returns (its Sender drops). On error (which |
| 370 | // shouldn't happen unless the Sender was dropped before |
| 371 | // being stored) we still proceed so we never hang here. |
| 372 | let _ = stdout_done.await; |
| 373 | let _ = stderr_done.await; |
| 374 | |
| 375 | // Only emit the terminal event after readers finished — |
| 376 | // guarantees ordering: every Message event precedes the |
| 377 | // terminal one. |
| 378 | let _ = tx.send(terminal); |
| 379 | // Keep the variable live for readability in the cancel path. |
| 380 | let _ = was_cancelled; |
| 381 | }); |
| 382 | } |
| 383 | |
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the exact strings handed to `--permission-mode`; the CLI
    /// expects these camelCase spellings.
    #[test]
    fn permission_mode_cli_strings() {
        let expected = [
            (PermissionMode::Auto, "auto"),
            (PermissionMode::AcceptEdits, "acceptEdits"),
            (PermissionMode::BypassPermissions, "bypassPermissions"),
            (PermissionMode::Plan, "plan"),
        ];

        for (mode, cli) in expected {
            assert_eq!(mode.as_cli(), cli);
        }
    }
}
| 399 |