Rust · 4659 bytes Raw Blame History
1 use std::io::{BufRead, BufReader, Write};
2 use std::os::unix::net::UnixListener;
3 use std::sync::{Arc, mpsc};
4
5 use super::events::EventBus;
6 use super::protocol::{Request, Response, socket_path};
7
/// A command received from an IPC client, with a channel to send the response back.
///
/// The connection handler in this file builds one `IpcCommand` per parsed
/// request, sends it over the command channel, and then waits on the
/// receiving half of `response_tx` for the reply it writes back to the client.
pub struct IpcCommand {
    /// The request as deserialized from one line of client JSON.
    pub request: Request,
    /// Sender for the reply to this request. The connection handler only
    /// waits a bounded time on the paired receiver, so a reply sent after
    /// that wait has ended is not delivered to the client.
    pub response_tx: mpsc::Sender<Response>,
}
13
14 /// Start the IPC server on a background thread.
15 /// Returns a receiver for incoming commands.
16 pub fn start_server(event_bus: Arc<EventBus>) -> mpsc::Receiver<IpcCommand> {
17 let (cmd_tx, cmd_rx) = mpsc::channel();
18
19 std::thread::spawn(move || {
20 let path = socket_path();
21
22 // Clean up stale socket from previous run
23 let _ = std::fs::remove_file(&path);
24
25 let listener = match UnixListener::bind(&path) {
26 Ok(l) => l,
27 Err(e) => {
28 tracing::error!(err = %e, ?path, "failed to bind IPC socket");
29 return;
30 }
31 };
32
33 // Set permissions 0600
34 #[cfg(unix)]
35 {
36 use std::os::unix::fs::PermissionsExt;
37 let _ = std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o600));
38 }
39
40 tracing::info!(?path, "IPC server listening");
41
42 for stream in listener.incoming() {
43 match stream {
44 Ok(stream) => {
45 let tx = cmd_tx.clone();
46 let bus = event_bus.clone();
47 std::thread::spawn(move || handle_connection(stream, tx, bus));
48 }
49 Err(e) => {
50 tracing::warn!(err = %e, "IPC accept error");
51 }
52 }
53 }
54 });
55
56 cmd_rx
57 }
58
59 fn handle_connection(
60 stream: std::os::unix::net::UnixStream,
61 cmd_tx: mpsc::Sender<IpcCommand>,
62 event_bus: Arc<EventBus>,
63 ) {
64 let reader = BufReader::new(&stream);
65 let mut writer = &stream;
66
67 for line in reader.lines() {
68 let line = match line {
69 Ok(l) => l,
70 Err(_) => break,
71 };
72
73 if line.trim().is_empty() {
74 continue;
75 }
76
77 let request: Request = match serde_json::from_str(&line) {
78 Ok(r) => r,
79 Err(e) => {
80 let resp = Response::err(format!("invalid JSON: {}", e));
81 let _ = write_response(&mut writer, &resp);
82 continue;
83 }
84 };
85
86 // Handle subscribe: keep connection open, stream events
87 if request.command == "subscribe" {
88 handle_subscribe(&stream, &request.args, &event_bus);
89 return; // Connection dedicated to streaming
90 }
91
92 // Send to main thread and wait for response
93 let (resp_tx, resp_rx) = mpsc::channel();
94 let cmd = IpcCommand {
95 request,
96 response_tx: resp_tx,
97 };
98
99 if cmd_tx.send(cmd).is_err() {
100 let resp = Response::err("daemon shutting down");
101 let _ = write_response(&mut writer, &resp);
102 break;
103 }
104
105 // Wait for response from main thread (with timeout)
106 match resp_rx.recv_timeout(std::time::Duration::from_secs(5)) {
107 Ok(resp) => {
108 let _ = write_response(&mut writer, &resp);
109 }
110 Err(_) => {
111 let resp = Response::err("command timed out");
112 let _ = write_response(&mut writer, &resp);
113 }
114 }
115 }
116 }
117
118 fn handle_subscribe(
119 stream: &std::os::unix::net::UnixStream,
120 filters: &[String],
121 event_bus: &EventBus,
122 ) {
123 let mut writer = stream;
124 let rx = event_bus.subscribe();
125 let all = filters.is_empty() || filters.iter().any(|f| f == "*");
126
127 // Acknowledge subscription
128 let _ = write_response(&mut writer, &Response::ok_empty());
129
130 tracing::info!(filters = ?filters, "IPC client subscribed to events");
131
132 // Stream events until client disconnects
133 while let Ok(event) = rx.recv() {
134 if !all && !filters.iter().any(|f| f == event.event_type()) {
135 continue;
136 }
137 let Ok(json) = serde_json::to_string(&event) else {
138 continue;
139 };
140 if writer.write_all(json.as_bytes()).is_err()
141 || writer.write_all(b"\n").is_err()
142 || writer.flush().is_err()
143 {
144 break;
145 }
146 }
147 tracing::debug!("IPC subscriber disconnected");
148 }
149
150 fn write_response(writer: &mut impl Write, response: &Response) -> std::io::Result<()> {
151 let json = serde_json::to_string(response)?;
152 writer.write_all(json.as_bytes())?;
153 writer.write_all(b"\n")?;
154 writer.flush()
155 }
156