Rust · 15905 bytes Raw Blame History
1 //! Subprocess driver for live chat turns.
2 //!
//! v1.0 spawns `claude` once per user turn:
//!
//! ```text
//! claude -p <prompt>
//!     --output-format stream-json
//!     --verbose
//!     --include-partial-messages
//!     --permission-mode <mode>
//!     [--resume <id> | --session-id <uuid>]
//! ```
//!
//! `--input-format stream-json` is deliberately absent: with stdin set
//! to `Stdio::null()` the CLI would read zero turns from stdin and the
//! user's prompt would be silently dropped.
//!
//! The single-turn model keeps the subprocess lifetime trivially
//! scoped and sidesteps partial-write races that a persistent-stdin
//! design would introduce. v1.1+ can upgrade to long-lived stdin
//! piping by:
//!
//! 1. Dropping `-p <prompt>`, adding `--input-format stream-json`, and
//!    writing JSONL to child stdin:
//!    `{"type":"user","message":{"role":"user","content":"..."}}`.
//! 2. Switching `.stdin(Stdio::null())` → `.stdin(Stdio::piped())`.
//! 3. Keeping the `TurnHandle` alive across turns and feeding
//!    subsequent prompts via the open stdin handle.
25 //!
26 //! `--include-partial-messages` is why we care about
27 //! [`TurnEvent::Message`] mutation on the frontend: the same
28 //! assistant message id arrives as multiple events, and the store
29 //! merges their content blocks in place.
30 //!
31 //! # Cancellation
32 //!
//! A `oneshot::Sender<()>` is returned in [`TurnHandle::kill_tx`].
34 //! Sending on it triggers a `tokio::select!` branch in the wait
35 //! task that calls `child.start_kill()` and emits
36 //! [`TurnEvent::Cancelled`]. The commands layer stores the sender
37 //! in `AppState.active_turns` and drops it when `cancel_turn`
38 //! fires.
39 //!
40 //! # Threading
41 //!
42 //! Every task spawned in here uses `tauri::async_runtime::spawn`,
43 //! never raw `tokio::spawn` — raw tokio in a Tauri callback context
44 //! panics with "there is no reactor running" on app startup. This
45 //! was a real crash in v0 and the rule is absolute.
46 //!
47 //! Tests drive the driver via `tauri::async_runtime::block_on` so
48 //! they share the same global runtime as the spawned tasks.
49
50 use std::path::PathBuf;
51 use std::process::Stdio;
52
53 use tokio::io::{AsyncBufReadExt, BufReader};
54 use tokio::process::Command;
55 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
56 use tokio::sync::oneshot;
57
58 use crate::core::error::{CoreError, CoreResult};
59 use crate::core::reader::raw_to_message;
60 use crate::core::schema::{Message, PermissionMode, RawEvent, StreamStatus};
61
/// Per-turn byte budget (8 KiB) for stderr output forwarded to the
/// caller. Once the budget is spent, further stderr lines are dropped
/// so a subprocess flooding stderr cannot grow the app's memory
/// without limit.
const STDERR_CAP_BYTES: usize = 8 << 10;
65
/// Everything the driver needs to launch one chat turn. Built by the
/// commands layer and consumed by [`spawn_turn`].
#[derive(Debug, Clone)]
pub struct TurnRequest {
    /// Uuid generated by the commands layer. The driver itself never
    /// reads it; it is carried for logging / telemetry, while event
    /// routing is handled by the returned receiver.
    pub turn_id: String,
    /// Absolute working directory — becomes the subprocess's cwd.
    /// The caller is expected to have verified that it exists; the
    /// driver passes it straight to the command without checking.
    pub cwd: PathBuf,
    /// `Some(id)` resumes an existing session (`--resume <id>`).
    /// `None` starts a fresh session — set [`Self::new_session_id`]
    /// too in that case so the uuid is known ahead of time.
    pub resume_session_id: Option<String>,
    /// Client-generated uuid for a new session, passed via
    /// `--session-id`. Ignored when `resume_session_id` is set.
    pub new_session_id: Option<String>,
    /// The user's prompt text, handed to the CLI as the argument
    /// following `-p`.
    pub prompt: String,
    /// Permission mode for the turn; its `as_cli()` string is
    /// forwarded as the value of `--permission-mode`.
    pub permission_mode: PermissionMode,
    /// Absolute path to the `claude` binary. Resolved once at
    /// startup via a zero-dep PATH walker and cached in `AppState`.
    pub claude_bin: PathBuf,
    /// Extra env vars set on the spawned subprocess in addition to
    /// whatever it inherits from the parent. Empty by default;
    /// tests use this to configure `fake_claude` without racing on
    /// process-wide `std::env::set_var`.
    pub env: Vec<(String, String)>,
}
94
/// One event surfaced by the driver. The commands layer maps these
/// onto the Tauri event bus as `chat:event` payloads.
///
/// `Completed`, `Failed` and `Cancelled` are terminal: exactly one of
/// them is sent per turn, after both reader tasks have finished.
#[derive(Debug, Clone)]
pub enum TurnEvent {
    /// Sent for the first stdout event that carries a `sessionId`,
    /// so at most once per turn (never, if no event carries one).
    /// For new sessions this mirrors what was already passed via
    /// `--session-id`; for resumes it confirms the id.
    SessionBound { session_id: String },
    /// A frontend-ready [`Message`] parsed from one stdout line and
    /// already run through `raw_to_message`. Assistant messages
    /// arrive with `status: Some(Streaming)`; the store is
    /// responsible for merging partials by id.
    Message(Message),
    /// A single line of stderr, forwarded for debugging / error
    /// surfacing. Forwarding stops once the per-turn budget of
    /// [`STDERR_CAP_BYTES`] has been used up.
    Stderr(String),
    /// Subprocess exited on its own. Exit code 0 is success; non-zero
    /// triggers a `turn_failed` on the event bus. `-1` is reported
    /// when the exit status carries no code (e.g. the process was
    /// terminated by a signal).
    Completed { exit_code: i32 },
    /// Waiting on the child failed; `reason` is a human-readable
    /// message the commands layer forwards as `turn_failed`. Note
    /// that a failure to spawn is reported as an `Err` from
    /// `spawn_turn`, not through this variant.
    Failed { reason: String },
    /// The turn was cancelled — the kill sender was used or dropped —
    /// and the child was killed. Partial messages already emitted
    /// remain valid.
    Cancelled,
}
122
123 /// Handle returned by [`spawn_turn`]. The commands layer keeps the
124 /// `kill_tx` in `AppState.active_turns`; dropping or sending on it
125 /// cancels the turn. The receiver streams events until a terminal
126 /// variant is emitted and then closes.
127 pub struct TurnHandle {
128 pub receiver: UnboundedReceiver<TurnEvent>,
129 pub kill_tx: oneshot::Sender<()>,
130 }
131
132 /// Spawn a chat turn. The returned receiver will emit a sequence of
133 /// [`TurnEvent`]s ending in exactly one of `Completed`, `Failed`,
134 /// or `Cancelled`. After the terminal event the sender drops and
135 /// `recv()` returns `None`.
136 ///
137 /// # Runtime context
138 ///
139 /// **Callers must invoke this from inside a tokio runtime** (either
140 /// an `async fn` Tauri command, a `tauri::async_runtime::spawn`
141 /// block, or a `tauri::async_runtime::block_on` in tests).
142 /// `tokio::process::Command::spawn` internally wraps the child's
143 /// pipes in `PollEvented`, which calls `Handle::current()` and
144 /// panics if no tokio runtime is present. The panic message is
145 /// `"there is no reactor running, must be called from the context
146 /// of a Tokio 1.x runtime"`. The function itself is synchronous
147 /// (no `.await`s) — the constraint is purely about the calling
148 /// context.
149 pub fn spawn_turn(req: TurnRequest) -> CoreResult<TurnHandle> {
150 let mut cmd = Command::new(&req.claude_bin);
151 for (k, v) in &req.env {
152 cmd.env(k, v);
153 }
154 cmd.current_dir(&req.cwd)
155 .arg("-p")
156 .arg(&req.prompt)
157 // NOTE: we intentionally do NOT pass `--input-format stream-json`
158 // here. That flag tells claude to read JSONL turns from stdin
159 // instead of using the `-p <text>` positional; combined with
160 // our `Stdio::null()` the CLI ends up with an empty stdin,
161 // processes zero turns, and exits cleanly after emitting only
162 // session-init metadata (pr-link / custom-title / etc.). The
163 // user's prompt silently disappears.
164 //
165 // v1.1's upgrade path to long-lived stdin piping will flip
166 // both `--input-format stream-json` AND `Stdio::piped()` for
167 // stdin in lock-step, and pump `{"type":"user","message":…}`
168 // JSONL lines into the child. Do not re-add this flag in the
169 // one-shot `-p` path.
170 .arg("--output-format")
171 .arg("stream-json")
172 .arg("--verbose")
173 .arg("--include-partial-messages")
174 .arg("--permission-mode")
175 .arg(req.permission_mode.as_cli())
176 .stdin(Stdio::null())
177 .stdout(Stdio::piped())
178 .stderr(Stdio::piped())
179 .kill_on_drop(true);
180
181 // Resume and new-session are mutually exclusive; resume wins if
182 // both are accidentally set.
183 if let Some(resume_id) = &req.resume_session_id {
184 cmd.arg("--resume").arg(resume_id);
185 } else if let Some(new_id) = &req.new_session_id {
186 cmd.arg("--session-id").arg(new_id);
187 }
188
189 let mut child = cmd.spawn().map_err(CoreError::from)?;
190
191 let stdout = child
192 .stdout
193 .take()
194 .ok_or_else(|| CoreError::PlainIo(std::io::Error::other("claude child has no stdout")))?;
195 let stderr = child
196 .stderr
197 .take()
198 .ok_or_else(|| CoreError::PlainIo(std::io::Error::other("claude child has no stderr")))?;
199
200 let (tx, rx) = unbounded_channel::<TurnEvent>();
201 let (kill_tx, kill_rx) = oneshot::channel::<()>();
202
203 // Each reader task holds a oneshot sender that drops when the
204 // task returns. The wait task waits on both receivers before
205 // emitting the terminal event, so buffered stdout bytes that
206 // arrive after `child.wait()` returns still get through first.
207 let (stdout_done_tx, stdout_done_rx) = oneshot::channel::<()>();
208 let (stderr_done_tx, stderr_done_rx) = oneshot::channel::<()>();
209
210 spawn_stdout_task(stdout, tx.clone(), stdout_done_tx);
211 spawn_stderr_task(stderr, tx.clone(), stderr_done_tx);
212 spawn_wait_task(child, kill_rx, tx, stdout_done_rx, stderr_done_rx);
213
214 Ok(TurnHandle {
215 receiver: rx,
216 kill_tx,
217 })
218 }
219
220 fn spawn_stdout_task(
221 stdout: tokio::process::ChildStdout,
222 tx: UnboundedSender<TurnEvent>,
223 done: oneshot::Sender<()>,
224 ) {
225 tauri::async_runtime::spawn(async move {
226 // Move the sender into the task so it drops when the task
227 // returns, notifying the wait task that we've finished
228 // draining the pipe.
229 let _done_guard = done;
230 let reader = BufReader::new(stdout);
231 let mut lines = reader.lines();
232 let mut session_bound = false;
233 let mut fallback_counter: u32 = 0;
234
235 loop {
236 match lines.next_line().await {
237 Ok(Some(line)) => {
238 if line.is_empty() {
239 continue;
240 }
241 let ev: RawEvent = match serde_json::from_str(&line) {
242 Ok(e) => e,
243 Err(err) => {
244 tracing::warn!(
245 error = %err,
246 line = %line,
247 "skipping malformed chat stdout line"
248 );
249 continue;
250 }
251 };
252
253 // Emit SessionBound the first time we see a sessionId.
254 if !session_bound {
255 if let Some(sid) = &ev.session_id {
256 session_bound = true;
257 if tx
258 .send(TurnEvent::SessionBound {
259 session_id: sid.clone(),
260 })
261 .is_err()
262 {
263 return;
264 }
265 }
266 }
267
268 // Convert to a frontend-ready Message. `raw_to_message`
269 // filters session-level metadata events (permission-mode,
270 // custom-title, progress, etc.) and sidechain entries.
271 if let Some(mut msg) = raw_to_message(ev, &mut fallback_counter) {
272 // Flag assistant messages as streaming so the store
273 // knows to merge subsequent partials.
274 if let Message::Assistant { status, .. } = &mut msg {
275 *status = Some(StreamStatus::Streaming);
276 }
277 if tx.send(TurnEvent::Message(msg)).is_err() {
278 return;
279 }
280 }
281 }
282 Ok(None) => return, // clean EOF
283 Err(err) => {
284 tracing::warn!(error = %err, "chat stdout read error");
285 return;
286 }
287 }
288 }
289 });
290 }
291
292 fn spawn_stderr_task(
293 stderr: tokio::process::ChildStderr,
294 tx: UnboundedSender<TurnEvent>,
295 done: oneshot::Sender<()>,
296 ) {
297 tauri::async_runtime::spawn(async move {
298 let _done_guard = done;
299 let reader = BufReader::new(stderr);
300 let mut lines = reader.lines();
301 let mut total_bytes: usize = 0;
302
303 loop {
304 match lines.next_line().await {
305 Ok(Some(line)) => {
306 if total_bytes >= STDERR_CAP_BYTES {
307 // Silently drop beyond the cap; we've already
308 // surfaced the first 8 KB which is enough for
309 // any reasonable error trail.
310 continue;
311 }
312 total_bytes += line.len() + 1;
313 if tx.send(TurnEvent::Stderr(line)).is_err() {
314 return;
315 }
316 }
317 Ok(None) => return,
318 Err(err) => {
319 tracing::warn!(error = %err, "chat stderr read error");
320 return;
321 }
322 }
323 }
324 });
325 }
326
327 /// Owns the [`tokio::process::Child`] for the duration of the turn,
328 /// races a natural `wait()` against the user-cancel signal, **waits
329 /// for both reader tasks to finish draining**, and emits the single
330 /// terminal event.
331 ///
332 /// The drain step is load-bearing: `child.wait()` returns as soon as
333 /// the subprocess exits, but there may still be buffered bytes in
334 /// the stdout pipe. Emitting `Completed` before the reader task has
335 /// consumed them would truncate the event stream from the caller's
336 /// point of view. The reader tasks each hold a `oneshot::Sender`
337 /// that drops on task return, resolving the receivers we await here.
338 fn spawn_wait_task(
339 mut child: tokio::process::Child,
340 kill_rx: oneshot::Receiver<()>,
341 tx: UnboundedSender<TurnEvent>,
342 stdout_done: oneshot::Receiver<()>,
343 stderr_done: oneshot::Receiver<()>,
344 ) {
345 tauri::async_runtime::spawn(async move {
346 let (was_cancelled, terminal) = tokio::select! {
347 // User asked us to cancel.
348 _ = kill_rx => {
349 let _ = child.start_kill();
350 // Reap the process so we don't leak a zombie.
351 let _ = child.wait().await;
352 (true, TurnEvent::Cancelled)
353 }
354 // Process finished on its own.
355 status = child.wait() => {
356 let ev = match status {
357 Ok(s) => TurnEvent::Completed {
358 exit_code: s.code().unwrap_or(-1),
359 },
360 Err(err) => TurnEvent::Failed {
361 reason: format!("wait on claude child failed: {err}"),
362 },
363 };
364 (false, ev)
365 }
366 };
367
368 // Drain readers. `_ = stdout_done.await` resolves when the
369 // reader task returns (its Sender drops). On error (which
370 // shouldn't happen unless the Sender was dropped before
371 // being stored) we still proceed so we never hang here.
372 let _ = stdout_done.await;
373 let _ = stderr_done.await;
374
375 // Only emit the terminal event after readers finished —
376 // guarantees ordering: every Message event precedes the
377 // terminal one.
378 let _ = tx.send(terminal);
379 // Keep the variable live for readability in the cancel path.
380 let _ = was_cancelled;
381 });
382 }
383
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the exact strings handed to `--permission-mode` for each
    /// permission mode.
    #[test]
    fn permission_mode_cli_strings() {
        let expected = [
            (PermissionMode::Auto, "auto"),
            (PermissionMode::AcceptEdits, "acceptEdits"),
            (PermissionMode::BypassPermissions, "bypassPermissions"),
            (PermissionMode::Plan, "plan"),
        ];
        for (mode, cli) in expected {
            assert_eq!(mode.as_cli(), cli);
        }
    }
}
399