tenseleyflow/claudex / 68a4c05

Browse files

perf: per-pty event channel + spawn-lock dedupe + chunked replay

Authored by espadonne
SHA
68a4c05db53fa6fa0a2cce1f8886472195c80f48
Parents
b85ff40
Tree
22d6cd6

3 changed files

Status | File | + | -
M src-tauri/src/commands.rs 14 5
M src/components/TerminalPane.tsx 141 59
M src/lib/ipc/client.ts 7 4
src-tauri/src/commands.rs (modified)
@@ -454,13 +454,22 @@ pub async fn spawn_pty(
454454
     );
455455
 
456456
     // Forwarder task: base64-encode each chunk off `data_rx` and emit
457
-    // `pty:data`. When `data_rx` closes (reader task hit EOF), wait
458
-    // on `exit_rx` for the final exit code and emit `pty:exit`. Then
459
-    // remove our entry from active_ptys — if close_pty already
460
-    // removed it, the remove here is a harmless no-op.
457
+    // a per-PTY event named `pty:data:<pty_id>`. Per-PTY events
458
+    // avoid the O(N) fan-out that the previous single `pty:data`
459
+    // channel had — every TerminalPane subscribes only to events
460
+    // for its own ptyId, so one PTY's stdout chunk doesn't wake up
461
+    // every listener in the webview.
462
+    //
463
+    // When `data_rx` closes (reader task hit EOF), wait on
464
+    // `exit_rx` for the final exit code and emit `pty:exit`
465
+    // (globally, since the store subscribes to it to clean up
466
+    // `ptyIds` and doesn't want to listen per-pty). Then remove
467
+    // our entry from active_ptys — if close_pty already removed
468
+    // it, the remove here is a harmless no-op.
461469
     let app_clone = app.clone();
462470
     let active_ptys = state.active_ptys.clone();
463471
     let pid = pty_id.clone();
472
+    let data_event = format!("pty:data:{pid}");
464473
     tauri::async_runtime::spawn(async move {
465474
         while let Some(chunk) = data_rx.recv().await {
466475
             let b64 = BASE64_STANDARD.encode(&chunk);
@@ -468,7 +477,7 @@ pub async fn spawn_pty(
468477
                 pty_id: pid.clone(),
469478
                 base64: b64,
470479
             };
471
-            if let Err(err) = app_clone.emit("pty:data", &payload) {
480
+            if let Err(err) = app_clone.emit(&data_event, &payload) {
472481
                 tracing::warn!(error = %err, "failed to emit pty:data");
473482
             }
474483
         }
src/components/TerminalPane.tsx (modified)
@@ -44,6 +44,103 @@ const trace = (msg: string, extra?: Record<string, unknown>) => {
4444
   void logFrontend("debug", "TerminalPane", payload);
4545
 };
4646
 
47
+/** Module-level dedup lock for concurrent spawn attempts on the
48
+ *  same `sessionId`. React 19 StrictMode double-invokes effects
49
+ *  (mount → cleanup → mount), and both invocations used to race
50
+ *  into `spawn_pty`, producing TWO claude subprocesses per click.
51
+ *  The first one was then orphaned — its ptyId never landed in
52
+ *  the store so the frontend lost track of it, but the backend
53
+ *  kept the subprocess alive until window-destroy.
54
+ *
55
+ *  With this map, the second mount awaits the first mount's
56
+ *  promise and reuses the same ptyId. Exactly one subprocess per
57
+ *  session, StrictMode-safe. */
58
+const spawnLocks = new Map<string, Promise<string>>();
59
+
60
+async function getOrSpawnPty(
61
+  sessionId: string,
62
+  cwd: string,
63
+  claudeArgs: string[],
64
+  cols: number,
65
+  rows: number,
66
+): Promise<string> {
67
+  const existing = useSessionStore.getState().ptyIds.get(sessionId);
68
+  if (existing) return existing;
69
+
70
+  const inflight = spawnLocks.get(sessionId);
71
+  if (inflight) return inflight;
72
+
73
+  const promise = (async () => {
74
+    const newId = crypto.randomUUID();
75
+    trace("spawn path", { ptyId: newId, cols, rows });
76
+    await spawnPty({
77
+      ptyId: newId,
78
+      sessionId,
79
+      cwd,
80
+      args: claudeArgs,
81
+      cols,
82
+      rows,
83
+    });
84
+    // Register in the store before returning so any second mount
85
+    // awaiting the same promise can look up the ptyId via the
86
+    // `existing` check above on its next call.
87
+    useSessionStore.getState().registerPty(sessionId, {
88
+      ptyId: newId,
89
+      sessionId,
90
+      cwd,
91
+      startedAt: new Date().toISOString(),
92
+    });
93
+    trace("spawn_pty ok", { ptyId: newId });
94
+    return newId;
95
+  })();
96
+
97
+  spawnLocks.set(sessionId, promise);
98
+  try {
99
+    return await promise;
100
+  } finally {
101
+    spawnLocks.delete(sessionId);
102
+  }
103
+}
104
+
105
+/** Write a large base64 payload into xterm in paced chunks so we
106
+ *  don't block the UI thread. xterm.write accepts a callback that
107
+ *  fires once the write has been parsed and rendered, which we use
108
+ *  to schedule the next chunk. For a fresh mount with a 200 KB
109
+ *  ring buffer replay this keeps the main thread responsive
110
+ *  (keystrokes still register) instead of dropping a multi-second
111
+ *  stall while xterm parses the whole blob. */
112
+const REPLAY_CHUNK_BYTES = 8 * 1024;
113
+
114
+function writeBase64Chunked(term: Terminal, b64: string): void {
115
+  const binary = atob(b64);
116
+  const total = binary.length;
117
+  if (total === 0) return;
118
+  if (total <= REPLAY_CHUNK_BYTES) {
119
+    term.write(decodeBinary(binary));
120
+    return;
121
+  }
122
+  let offset = 0;
123
+  const step = () => {
124
+    const end = Math.min(offset + REPLAY_CHUNK_BYTES, total);
125
+    const slice = decodeBinary(binary.slice(offset, end));
126
+    offset = end;
127
+    if (offset >= total) {
128
+      term.write(slice);
129
+    } else {
130
+      term.write(slice, step);
131
+    }
132
+  };
133
+  step();
134
+}
135
+
136
+function decodeBinary(binary: string): Uint8Array {
137
+  const bytes = new Uint8Array(binary.length);
138
+  for (let i = 0; i < binary.length; i++) {
139
+    bytes[i] = binary.charCodeAt(i);
140
+  }
141
+  return bytes;
142
+}
143
+
47144
 /** Theme matching the claudex design tokens in `src/index.css`. */
48145
 const TERMINAL_THEME = {
49146
   foreground: "#d4d4d8",
@@ -85,7 +182,6 @@ export interface TerminalPaneProps {
85182
 
86183
 export function TerminalPane({ sessionId, cwd, claudeArgs }: TerminalPaneProps) {
87184
   const containerRef = useRef<HTMLDivElement | null>(null);
88
-  const registerPty = useSessionStore((s) => s.registerPty);
89185
 
90186
   useEffect(() => {
91187
     const container = containerRef.current;
@@ -166,72 +262,57 @@ export function TerminalPane({ sessionId, cwd, claudeArgs }: TerminalPaneProps)
166262
     let cancelled = false;
167263
     let disposeListeners: Array<() => void> = [];
168264
 
169
-    const writeBase64 = (b64: string) => {
170
-      const binary = atob(b64);
171
-      const bytes = new Uint8Array(binary.length);
172
-      for (let i = 0; i < binary.length; i++) {
173
-        bytes[i] = binary.charCodeAt(i);
174
-      }
175
-      term.write(bytes);
176
-    };
177
-
178265
     const attach = async () => {
266
+      // Resolve a ptyId — either reattach to an already-running
267
+      // subprocess or dedupe-safely spawn a new one. The spawn
268
+      // lock guarantees at most one spawn per sessionId even under
269
+      // React 19 StrictMode's double-mount.
179270
       const existing = useSessionStore.getState().ptyIds.get(sessionId);
180
-      if (existing) {
181
-        trace("reattach path", { ptyId: existing });
182
-        ptyId = existing;
271
+      const reattach = !!existing;
272
+      let resolved: string;
273
+      try {
274
+        resolved = await getOrSpawnPty(
275
+          sessionId,
276
+          cwd,
277
+          claudeArgs,
278
+          term.cols,
279
+          term.rows,
280
+        );
281
+      } catch (err) {
282
+        trace("spawn_pty failed", { error: String(err) });
283
+        if (!cancelled) {
284
+          term.write(
285
+            `\r\n\x1b[31m[claudex] spawn_pty failed: ${formatErr(err)}\x1b[0m\r\n`,
286
+          );
287
+        }
288
+        return;
289
+      }
290
+      if (cancelled) return;
291
+      ptyId = resolved;
292
+
293
+      // On reattach, replay the ring buffer into xterm in paced
294
+      // chunks so a 200 KB scrollback doesn't stall the main
295
+      // thread for multiple seconds.
296
+      if (reattach) {
297
+        trace("reattach path", { ptyId: resolved });
183298
         try {
184
-          const snapshot = await getPtyBuffer(existing);
299
+          const snapshot = await getPtyBuffer(resolved);
185300
           if (cancelled) return;
186
-          if (snapshot.length > 0) writeBase64(snapshot);
301
+          if (snapshot.length > 0) writeBase64Chunked(term, snapshot);
187302
           trace("ring buffer replayed", { bytes: snapshot.length });
188303
         } catch (err) {
189304
           trace("get_pty_buffer failed during reattach", {
190305
             error: String(err),
191306
           });
192307
         }
193
-      } else {
194
-        const newId = crypto.randomUUID();
195
-        trace("spawn path", {
196
-          ptyId: newId,
197
-          cols: term.cols,
198
-          rows: term.rows,
199
-        });
200
-        try {
201
-          await spawnPty({
202
-            ptyId: newId,
203
-            sessionId,
204
-            cwd,
205
-            args: claudeArgs,
206
-            cols: term.cols,
207
-            rows: term.rows,
208
-          });
209
-          trace("spawn_pty ok", { ptyId: newId });
210
-        } catch (err) {
211
-          trace("spawn_pty failed", { error: String(err) });
212
-          if (!cancelled) {
213
-            term.write(
214
-              `\r\n\x1b[31m[claudex] spawn_pty failed: ${formatErr(err)}\x1b[0m\r\n`,
215
-            );
216
-          }
217
-          return;
218
-        }
219
-        if (cancelled) return;
220
-        ptyId = newId;
221
-        registerPty(sessionId, {
222
-          ptyId: newId,
223
-          sessionId,
224
-          cwd,
225
-          startedAt: new Date().toISOString(),
226
-        });
227308
       }
228309
 
229
-      // Listen for stdout. `pty:data` is a single shared channel, so
230
-      // we filter by our own pty id inside the callback.
310
+      // Listen for stdout — per-pty event name, so this listener
311
+      // only wakes up for our own terminal's bytes.
231312
       try {
232
-        const un = await onPtyData((ev: PtyDataEvent) => {
233
-          if (ev.ptyId !== ptyId) return;
234
-          writeBase64(ev.base64);
313
+        const pid = resolved;
314
+        const un = await onPtyData(pid, (ev: PtyDataEvent) => {
315
+          writeBase64Chunked(term, ev.base64);
235316
         });
236317
         if (cancelled) {
237318
           un();
@@ -239,24 +320,25 @@ export function TerminalPane({ sessionId, cwd, claudeArgs }: TerminalPaneProps)
239320
           unlistenData = un;
240321
         }
241322
       } catch (err) {
242
-        console.warn("[pty] onPtyData listen failed", err);
323
+        trace("onPtyData listen failed", { error: String(err) });
243324
       }
244325
 
245326
       // Wire keystrokes → PTY writer.
246327
       const dataDispose = term.onData((data) => {
247328
         if (!ptyId) return;
248329
         void writePty(ptyId, data).catch((err) => {
249
-          console.warn("[pty] write_pty failed", err);
330
+          trace("write_pty failed", { error: String(err) });
250331
         });
251332
       });
252333
       disposeListeners.push(() => dataDispose.dispose());
253334
 
254
-      // Propagate xterm resize events (triggered by fit.fit() below)
255
-      // down to the PTY master so claude's TUI redraws properly.
335
+      // Propagate xterm resize events (triggered by fit.fit() or
336
+      // container ResizeObserver) down to the PTY master so
337
+      // claude's TUI redraws properly.
256338
       const resizeDispose = term.onResize(({ cols, rows }) => {
257339
         if (!ptyId) return;
258340
         void resizePty(ptyId, cols, rows).catch((err) => {
259
-          console.warn("[pty] resize_pty failed", err);
341
+          trace("resize_pty failed", { error: String(err) });
260342
         });
261343
       });
262344
       disposeListeners.push(() => resizeDispose.dispose());
src/lib/ipc/client.ts (modified)
@@ -223,13 +223,16 @@ export async function listPtys(): Promise<PtyInfo[]> {
223223
   return invoke<PtyInfo[]>("list_ptys");
224224
 }
225225
 
226
-/** Subscribe to `pty:data` payloads. The frontend filters by its
227
- *  own `ptyId` inside the callback — one event channel is shared
228
- *  across every live PTY. Returns an unlisten function. */
226
+/** Subscribe to stdout chunks for one specific live PTY. The Rust
227
+ *  backend emits per-PTY events named `pty:data:<ptyId>` so each
228
+ *  `TerminalPane` listener only wakes up for its own stdout — no
229
+ *  fan-out cost as the number of live PTYs grows. Returns an
230
+ *  unlisten function. */
229231
 export async function onPtyData(
232
+  ptyId: string,
230233
   cb: (ev: PtyDataEvent) => void,
231234
 ): Promise<UnlistenFn> {
232
-  return listen<PtyDataEvent>("pty:data", (e) => cb(e.payload));
235
+  return listen<PtyDataEvent>(`pty:data:${ptyId}`, (e) => cb(e.payload));
233236
 }
234237
 
235238
 /** Subscribe to `pty:exit` payloads. Fires once per PTY when its