Rust · 2596 bytes Raw Blame History
1 //! Client connection handler
2
3 use anyhow::{Context, Result};
4 use gartop_ipc::{Command, Event, Response};
5 use std::collections::HashSet;
6 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7 use tokio::net::UnixStream;
8
9 /// Handle a single client connection.
10 pub struct ClientHandler {
11 reader: BufReader<tokio::net::unix::OwnedReadHalf>,
12 writer: tokio::net::unix::OwnedWriteHalf,
13 subscriptions: HashSet<String>,
14 }
15
16 impl ClientHandler {
17 /// Create a new client handler.
18 pub fn new(stream: UnixStream) -> Self {
19 let (read_half, write_half) = stream.into_split();
20 Self {
21 reader: BufReader::new(read_half),
22 writer: write_half,
23 subscriptions: HashSet::new(),
24 }
25 }
26
27 /// Read a command from the client.
28 pub async fn read_command(&mut self) -> Result<Option<Command>> {
29 let mut line = String::new();
30
31 match self.reader.read_line(&mut line).await {
32 Ok(0) => Ok(None), // EOF
33 Ok(_) => {
34 let cmd: Command =
35 serde_json::from_str(&line).context("Failed to parse command")?;
36 Ok(Some(cmd))
37 }
38 Err(e) => Err(e.into()),
39 }
40 }
41
42 /// Send a response to the client.
43 pub async fn send_response(&mut self, response: &Response) -> Result<()> {
44 let json = serde_json::to_string(response)?;
45 self.writer.write_all(json.as_bytes()).await?;
46 self.writer.write_all(b"\n").await?;
47 self.writer.flush().await?;
48 Ok(())
49 }
50
51 /// Send an event to the client (for subscriptions).
52 pub async fn send_event(&mut self, event: &Event) -> Result<()> {
53 let json = serde_json::to_string(event)?;
54 self.writer.write_all(json.as_bytes()).await?;
55 self.writer.write_all(b"\n").await?;
56 self.writer.flush().await?;
57 Ok(())
58 }
59
60 /// Subscribe to event types.
61 pub fn subscribe(&mut self, events: &[String]) {
62 self.subscriptions.extend(events.iter().cloned());
63 }
64
65 /// Unsubscribe from event types.
66 pub fn unsubscribe(&mut self, events: &[String]) {
67 for event in events {
68 self.subscriptions.remove(event);
69 }
70 }
71
72 /// Check if subscribed to an event type.
73 pub fn is_subscribed(&self, event_type: &str) -> bool {
74 self.subscriptions.contains(event_type) || self.subscriptions.contains("*")
75 }
76
77 /// Check if client has any subscriptions (persistent connection).
78 pub fn has_subscriptions(&self) -> bool {
79 !self.subscriptions.is_empty()
80 }
81 }
82