| 1 | use std::io::{BufRead, BufReader, Write}; |
| 2 | use std::os::unix::net::UnixListener; |
| 3 | use std::sync::{Arc, mpsc}; |
| 4 | |
| 5 | use super::events::EventBus; |
| 6 | use super::protocol::{Request, Response, socket_path}; |
| 7 | |
| 8 | /// A command received from an IPC client, with a channel to send the response back. |
| 9 | pub struct IpcCommand { |
| 10 | pub request: Request, |
| 11 | pub response_tx: mpsc::Sender<Response>, |
| 12 | } |
| 13 | |
| 14 | /// Start the IPC server on a background thread. |
| 15 | /// Returns a receiver for incoming commands. |
| 16 | pub fn start_server(event_bus: Arc<EventBus>) -> mpsc::Receiver<IpcCommand> { |
| 17 | let (cmd_tx, cmd_rx) = mpsc::channel(); |
| 18 | |
| 19 | std::thread::spawn(move || { |
| 20 | let path = socket_path(); |
| 21 | |
| 22 | // Clean up stale socket from previous run |
| 23 | let _ = std::fs::remove_file(&path); |
| 24 | |
| 25 | let listener = match UnixListener::bind(&path) { |
| 26 | Ok(l) => l, |
| 27 | Err(e) => { |
| 28 | tracing::error!(err = %e, ?path, "failed to bind IPC socket"); |
| 29 | return; |
| 30 | } |
| 31 | }; |
| 32 | |
| 33 | // Set permissions 0600 |
| 34 | #[cfg(unix)] |
| 35 | { |
| 36 | use std::os::unix::fs::PermissionsExt; |
| 37 | let _ = std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o600)); |
| 38 | } |
| 39 | |
| 40 | tracing::info!(?path, "IPC server listening"); |
| 41 | |
| 42 | for stream in listener.incoming() { |
| 43 | match stream { |
| 44 | Ok(stream) => { |
| 45 | let tx = cmd_tx.clone(); |
| 46 | let bus = event_bus.clone(); |
| 47 | std::thread::spawn(move || handle_connection(stream, tx, bus)); |
| 48 | } |
| 49 | Err(e) => { |
| 50 | tracing::warn!(err = %e, "IPC accept error"); |
| 51 | } |
| 52 | } |
| 53 | } |
| 54 | }); |
| 55 | |
| 56 | cmd_rx |
| 57 | } |
| 58 | |
| 59 | fn handle_connection( |
| 60 | stream: std::os::unix::net::UnixStream, |
| 61 | cmd_tx: mpsc::Sender<IpcCommand>, |
| 62 | event_bus: Arc<EventBus>, |
| 63 | ) { |
| 64 | let reader = BufReader::new(&stream); |
| 65 | let mut writer = &stream; |
| 66 | |
| 67 | for line in reader.lines() { |
| 68 | let line = match line { |
| 69 | Ok(l) => l, |
| 70 | Err(_) => break, |
| 71 | }; |
| 72 | |
| 73 | if line.trim().is_empty() { |
| 74 | continue; |
| 75 | } |
| 76 | |
| 77 | let request: Request = match serde_json::from_str(&line) { |
| 78 | Ok(r) => r, |
| 79 | Err(e) => { |
| 80 | let resp = Response::err(format!("invalid JSON: {}", e)); |
| 81 | let _ = write_response(&mut writer, &resp); |
| 82 | continue; |
| 83 | } |
| 84 | }; |
| 85 | |
| 86 | // Handle subscribe: keep connection open, stream events |
| 87 | if request.command == "subscribe" { |
| 88 | handle_subscribe(&stream, &request.args, &event_bus); |
| 89 | return; // Connection dedicated to streaming |
| 90 | } |
| 91 | |
| 92 | // Send to main thread and wait for response |
| 93 | let (resp_tx, resp_rx) = mpsc::channel(); |
| 94 | let cmd = IpcCommand { |
| 95 | request, |
| 96 | response_tx: resp_tx, |
| 97 | }; |
| 98 | |
| 99 | if cmd_tx.send(cmd).is_err() { |
| 100 | let resp = Response::err("daemon shutting down"); |
| 101 | let _ = write_response(&mut writer, &resp); |
| 102 | break; |
| 103 | } |
| 104 | |
| 105 | // Wait for response from main thread (with timeout) |
| 106 | match resp_rx.recv_timeout(std::time::Duration::from_secs(5)) { |
| 107 | Ok(resp) => { |
| 108 | let _ = write_response(&mut writer, &resp); |
| 109 | } |
| 110 | Err(_) => { |
| 111 | let resp = Response::err("command timed out"); |
| 112 | let _ = write_response(&mut writer, &resp); |
| 113 | } |
| 114 | } |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | fn handle_subscribe( |
| 119 | stream: &std::os::unix::net::UnixStream, |
| 120 | filters: &[String], |
| 121 | event_bus: &EventBus, |
| 122 | ) { |
| 123 | let mut writer = stream; |
| 124 | let rx = event_bus.subscribe(); |
| 125 | let all = filters.is_empty() || filters.iter().any(|f| f == "*"); |
| 126 | |
| 127 | // Acknowledge subscription |
| 128 | let _ = write_response(&mut writer, &Response::ok_empty()); |
| 129 | |
| 130 | tracing::info!(filters = ?filters, "IPC client subscribed to events"); |
| 131 | |
| 132 | // Stream events until client disconnects |
| 133 | while let Ok(event) = rx.recv() { |
| 134 | if !all && !filters.iter().any(|f| f == event.event_type()) { |
| 135 | continue; |
| 136 | } |
| 137 | let Ok(json) = serde_json::to_string(&event) else { |
| 138 | continue; |
| 139 | }; |
| 140 | if writer.write_all(json.as_bytes()).is_err() |
| 141 | || writer.write_all(b"\n").is_err() |
| 142 | || writer.flush().is_err() |
| 143 | { |
| 144 | break; |
| 145 | } |
| 146 | } |
| 147 | tracing::debug!("IPC subscriber disconnected"); |
| 148 | } |
| 149 | |
| 150 | fn write_response(writer: &mut impl Write, response: &Response) -> std::io::Result<()> { |
| 151 | let json = serde_json::to_string(response)?; |
| 152 | writer.write_all(json.as_bytes())?; |
| 153 | writer.write_all(b"\n")?; |
| 154 | writer.flush() |
| 155 | } |
| 156 |