| 1 | //! Client connection handler |
| 2 | |
| 3 | use anyhow::{Context, Result}; |
| 4 | use gartop_ipc::{Command, Event, Response}; |
| 5 | use std::collections::HashSet; |
| 6 | use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; |
| 7 | use tokio::net::UnixStream; |
| 8 | |
| 9 | /// Handle a single client connection. |
| 10 | pub struct ClientHandler { |
| 11 | reader: BufReader<tokio::net::unix::OwnedReadHalf>, |
| 12 | writer: tokio::net::unix::OwnedWriteHalf, |
| 13 | subscriptions: HashSet<String>, |
| 14 | } |
| 15 | |
| 16 | impl ClientHandler { |
| 17 | /// Create a new client handler. |
| 18 | pub fn new(stream: UnixStream) -> Self { |
| 19 | let (read_half, write_half) = stream.into_split(); |
| 20 | Self { |
| 21 | reader: BufReader::new(read_half), |
| 22 | writer: write_half, |
| 23 | subscriptions: HashSet::new(), |
| 24 | } |
| 25 | } |
| 26 | |
| 27 | /// Read a command from the client. |
| 28 | pub async fn read_command(&mut self) -> Result<Option<Command>> { |
| 29 | let mut line = String::new(); |
| 30 | |
| 31 | match self.reader.read_line(&mut line).await { |
| 32 | Ok(0) => Ok(None), // EOF |
| 33 | Ok(_) => { |
| 34 | let cmd: Command = |
| 35 | serde_json::from_str(&line).context("Failed to parse command")?; |
| 36 | Ok(Some(cmd)) |
| 37 | } |
| 38 | Err(e) => Err(e.into()), |
| 39 | } |
| 40 | } |
| 41 | |
| 42 | /// Send a response to the client. |
| 43 | pub async fn send_response(&mut self, response: &Response) -> Result<()> { |
| 44 | let json = serde_json::to_string(response)?; |
| 45 | self.writer.write_all(json.as_bytes()).await?; |
| 46 | self.writer.write_all(b"\n").await?; |
| 47 | self.writer.flush().await?; |
| 48 | Ok(()) |
| 49 | } |
| 50 | |
| 51 | /// Send an event to the client (for subscriptions). |
| 52 | pub async fn send_event(&mut self, event: &Event) -> Result<()> { |
| 53 | let json = serde_json::to_string(event)?; |
| 54 | self.writer.write_all(json.as_bytes()).await?; |
| 55 | self.writer.write_all(b"\n").await?; |
| 56 | self.writer.flush().await?; |
| 57 | Ok(()) |
| 58 | } |
| 59 | |
| 60 | /// Subscribe to event types. |
| 61 | pub fn subscribe(&mut self, events: &[String]) { |
| 62 | self.subscriptions.extend(events.iter().cloned()); |
| 63 | } |
| 64 | |
| 65 | /// Unsubscribe from event types. |
| 66 | pub fn unsubscribe(&mut self, events: &[String]) { |
| 67 | for event in events { |
| 68 | self.subscriptions.remove(event); |
| 69 | } |
| 70 | } |
| 71 | |
| 72 | /// Check if subscribed to an event type. |
| 73 | pub fn is_subscribed(&self, event_type: &str) -> bool { |
| 74 | self.subscriptions.contains(event_type) || self.subscriptions.contains("*") |
| 75 | } |
| 76 | |
| 77 | /// Check if client has any subscriptions (persistent connection). |
| 78 | pub fn has_subscriptions(&self) -> bool { |
| 79 | !self.subscriptions.is_empty() |
| 80 | } |
| 81 | } |
| 82 |