gardesk/garnotify / 4f3be95

Browse files

feat(ipc): add event subscriptions for bar integration

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
4f3be95ea06bc7b1cf5159fa895b7a4857c13420
Parents
6b55ac2
Tree
6943e61

2 changed files

StatusFile+-
M garnotify/src/daemon.rs 50 1
M garnotify/src/ipc.rs 138 3
garnotify/src/daemon.rsmodified
@@ -12,7 +12,7 @@ use tracing::{debug, error, info, warn};
1212
 
1313
 use crate::config::{self, Config};
1414
 use crate::dbus::NotificationsService;
15
-use crate::ipc::{Command, IpcRequest, IpcServer, Response};
15
+use crate::ipc::{Command, Event, EventBroadcaster, IpcRequest, IpcServer, Response};
1616
 use crate::notification::{
1717
     new_shared_store, CloseReason, History, Notification, NotificationEvent,
1818
     SharedNotificationStore, UrgencyTimeouts,
@@ -118,6 +118,8 @@ pub struct Daemon {
118118
     config: Arc<Config>,
119119
     ipc_server: IpcServer,
120120
     ipc_rx: Receiver<IpcRequest>,
121
+    /// Event broadcaster for subscribers
122
+    broadcaster: EventBroadcaster,
121123
     dbus_service: Option<NotificationsService>,
122124
     notification_store: SharedNotificationStore,
123125
     notification_event_rx: tokio::sync::mpsc::Receiver<NotificationEvent>,
@@ -141,6 +143,7 @@ impl Daemon {
141143
     /// Create a new daemon
142144
     pub fn new(config: Config) -> Result<Self> {
143145
         let (ipc_server, ipc_rx) = IpcServer::new();
146
+        let broadcaster = ipc_server.broadcaster();
144147
 
145148
         // Create notification event channel
146149
         let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
@@ -168,6 +171,7 @@ impl Daemon {
168171
             config,
169172
             ipc_server,
170173
             ipc_rx,
174
+            broadcaster,
171175
             dbus_service: None,
172176
             notification_store,
173177
             notification_event_rx: event_rx,
@@ -405,6 +409,22 @@ impl Daemon {
405409
                     return;
406410
                 }
407411
 
412
+                // Broadcast event to subscribers
413
+                self.broadcaster.broadcast(&Event::NotificationNew {
414
+                    id: notification.id,
415
+                    app_name: notification.app_name.clone(),
416
+                    summary: notification.summary.clone(),
417
+                    body: notification.body.clone(),
418
+                    urgency: format!("{:?}", notification.hints.urgency),
419
+                });
420
+
421
+                // Broadcast count change
422
+                let count = {
423
+                    let store = self.notification_store.lock().await;
424
+                    store.count()
425
+                };
426
+                self.broadcaster.broadcast(&Event::CountChanged { count });
427
+
408428
                 // Check DND mode before showing popup
409429
                 let should_show = if !self.paused {
410430
                     true
@@ -432,6 +452,14 @@ impl Daemon {
432452
                     "Notification updated: id={} summary=\"{}\"",
433453
                     notification.id, notification.summary
434454
                 );
455
+
456
+                // Broadcast event to subscribers
457
+                self.broadcaster.broadcast(&Event::NotificationUpdated {
458
+                    id: notification.id,
459
+                    summary: notification.summary.clone(),
460
+                    body: notification.body.clone(),
461
+                });
462
+
435463
                 // Update popup
436464
                 self.send_ui_command(PopupCommand::Update {
437465
                     id: notification.id,
@@ -441,6 +469,12 @@ impl Daemon {
441469
             NotificationEvent::Closed { id, reason } => {
442470
                 info!("Notification {} closed: reason={:?}", id, reason);
443471
 
472
+                // Broadcast event to subscribers
473
+                self.broadcaster.broadcast(&Event::NotificationClosed {
474
+                    id,
475
+                    reason: format!("{:?}", reason),
476
+                });
477
+
444478
                 // Close popup
445479
                 self.close_notification_popup(id, reason.clone());
446480
 
@@ -452,6 +486,13 @@ impl Daemon {
452486
                     history.push(notification);
453487
                 }
454488
 
489
+                // Broadcast count change
490
+                let count = {
491
+                    let store = self.notification_store.lock().await;
492
+                    store.count()
493
+                };
494
+                self.broadcaster.broadcast(&Event::CountChanged { count });
495
+
455496
                 // Emit D-Bus signal
456497
                 if let Some(ref dbus) = self.dbus_service {
457498
                     if let Err(e) = dbus.emit_closed(id, reason).await {
@@ -573,11 +614,19 @@ impl Daemon {
573614
                     if paused { "enabled" } else { "disabled" },
574615
                     self.pause_level
575616
                 );
617
+
618
+                // Broadcast to subscribers
619
+                self.broadcaster.broadcast(&Event::PausedChanged { paused });
620
+
576621
                 Response::ok_with_message(format!(
577622
                     "DND {}",
578623
                     if paused { "enabled" } else { "disabled" }
579624
                 ))
580625
             }
626
+            Command::Subscribe => {
627
+                // Subscribe is handled in the IPC layer, shouldn't reach here
628
+                Response::ok_with_message("Subscribed")
629
+            }
581630
             Command::IsPaused => {
582631
                 let data = serde_json::json!({
583632
                     "paused": self.paused,
garnotify/src/ipc.rsmodified
@@ -5,7 +5,7 @@
55
 
66
 use anyhow::{Context, Result};
77
 use serde::{Deserialize, Serialize};
8
-use std::io::{BufRead, BufReader, Write};
8
+use std::io::{BufRead, BufReader, Read, Write};
99
 use std::os::unix::net::{UnixListener, UnixStream};
1010
 use std::path::PathBuf;
1111
 use std::sync::mpsc::{self, Receiver, Sender};
@@ -55,10 +55,45 @@ pub enum Command {
5555
     Reload,
5656
     /// Get daemon status
5757
     Status,
58
+    /// Subscribe to notification events (keeps connection open)
59
+    Subscribe,
5860
     /// Quit the daemon
5961
     Quit,
6062
 }
6163
 
64
+/// Notification events sent to subscribers
65
+#[derive(Debug, Clone, Serialize, Deserialize)]
66
+#[serde(tag = "event", rename_all = "snake_case")]
67
+pub enum Event {
68
+    /// New notification created
69
+    NotificationNew {
70
+        id: u32,
71
+        app_name: String,
72
+        summary: String,
73
+        body: String,
74
+        urgency: String,
75
+    },
76
+    /// Notification closed
77
+    NotificationClosed {
78
+        id: u32,
79
+        reason: String,
80
+    },
81
+    /// Notification updated/replaced
82
+    NotificationUpdated {
83
+        id: u32,
84
+        summary: String,
85
+        body: String,
86
+    },
87
+    /// DND state changed
88
+    PausedChanged {
89
+        paused: bool,
90
+    },
91
+    /// Count changed (for bar widgets)
92
+    CountChanged {
93
+        count: usize,
94
+    },
95
+}
96
+
6297
 /// IPC response
6398
 #[derive(Debug, Clone, Serialize, Deserialize)]
6499
 pub struct Response {
@@ -109,11 +144,28 @@ pub struct IpcRequest {
109144
     pub response_tx: std::sync::mpsc::Sender<Response>,
110145
 }
111146
 
147
+/// A subscriber connection for streaming events
148
+pub struct Subscriber {
149
+    stream: UnixStream,
150
+}
151
+
152
+impl Subscriber {
153
+    /// Send an event to this subscriber
154
+    pub fn send(&mut self, event: &Event) -> Result<()> {
155
+        let json = serde_json::to_string(event)?;
156
+        writeln!(self.stream, "{}", json)?;
157
+        self.stream.flush()?;
158
+        Ok(())
159
+    }
160
+}
161
+
112162
 /// IPC server for the daemon
113163
 pub struct IpcServer {
114164
     socket_path: PathBuf,
115165
     listener: Option<UnixListener>,
116166
     tx: Sender<IpcRequest>,
167
+    /// Active event subscribers
168
+    subscribers: std::sync::Arc<std::sync::Mutex<Vec<Subscriber>>>,
117169
 }
118170
 
119171
 impl IpcServer {
@@ -124,10 +176,18 @@ impl IpcServer {
124176
             socket_path: socket_path(),
125177
             listener: None,
126178
             tx,
179
+            subscribers: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
127180
         };
128181
         (server, rx)
129182
     }
130183
 
184
+    /// Get a handle for broadcasting events to subscribers
185
+    pub fn broadcaster(&self) -> EventBroadcaster {
186
+        EventBroadcaster {
187
+            subscribers: self.subscribers.clone(),
188
+        }
189
+    }
190
+
131191
     /// Start listening for connections
132192
     pub fn start(&mut self) -> Result<()> {
133193
         // Remove stale socket
@@ -141,6 +201,7 @@ impl IpcServer {
141201
         info!("IPC server listening on {}", self.socket_path.display());
142202
 
143203
         let tx = self.tx.clone();
204
+        let subscribers = self.subscribers.clone();
144205
         self.listener = Some(listener.try_clone()?);
145206
 
146207
         // Spawn listener thread
@@ -149,8 +210,9 @@ impl IpcServer {
149210
                 match stream {
150211
                     Ok(stream) => {
151212
                         let tx = tx.clone();
213
+                        let subscribers = subscribers.clone();
152214
                         thread::spawn(move || {
153
-                            if let Err(e) = handle_client(stream, tx) {
215
+                            if let Err(e) = handle_client(stream, tx, subscribers) {
154216
                                 error!("Client error: {}", e);
155217
                             }
156218
                         });
@@ -180,8 +242,47 @@ impl Drop for IpcServer {
180242
     }
181243
 }
182244
 
245
+/// Handle for broadcasting events to all subscribers
246
+#[derive(Clone)]
247
+pub struct EventBroadcaster {
248
+    subscribers: std::sync::Arc<std::sync::Mutex<Vec<Subscriber>>>,
249
+}
250
+
251
+impl EventBroadcaster {
252
+    /// Broadcast an event to all subscribers, removing dead connections
253
+    pub fn broadcast(&self, event: &Event) {
254
+        let mut subs = match self.subscribers.lock() {
255
+            Ok(s) => s,
256
+            Err(_) => return,
257
+        };
258
+
259
+        // Send to all, track failures
260
+        let mut failed = Vec::new();
261
+        for (i, sub) in subs.iter_mut().enumerate() {
262
+            if sub.send(event).is_err() {
263
+                failed.push(i);
264
+            }
265
+        }
266
+
267
+        // Remove failed subscribers (in reverse to preserve indices)
268
+        for i in failed.into_iter().rev() {
269
+            subs.remove(i);
270
+            debug!("Removed dead subscriber");
271
+        }
272
+    }
273
+
274
+    /// Get current subscriber count
275
+    pub fn subscriber_count(&self) -> usize {
276
+        self.subscribers.lock().map(|s| s.len()).unwrap_or(0)
277
+    }
278
+}
279
+
183280
 /// Handle a client connection
184
-fn handle_client(mut stream: UnixStream, tx: Sender<IpcRequest>) -> Result<()> {
281
+fn handle_client(
282
+    mut stream: UnixStream,
283
+    tx: Sender<IpcRequest>,
284
+    subscribers: std::sync::Arc<std::sync::Mutex<Vec<Subscriber>>>,
285
+) -> Result<()> {
185286
     let reader = BufReader::new(stream.try_clone()?);
186287
 
187288
     for line in reader.lines() {
@@ -189,6 +290,40 @@ fn handle_client(mut stream: UnixStream, tx: Sender<IpcRequest>) -> Result<()> {
189290
         debug!("Received: {}", line);
190291
 
191292
         let response = match serde_json::from_str::<Command>(&line) {
293
+            Ok(Command::Subscribe) => {
294
+                // Send acknowledgment then add to subscribers
295
+                let response = Response::ok_with_message("Subscribed to notification events");
296
+                let response_json = serde_json::to_string(&response)?;
297
+                writeln!(stream, "{}", response_json)?;
298
+                stream.flush()?;
299
+
300
+                // Add to subscribers list
301
+                if let Ok(mut subs) = subscribers.lock() {
302
+                    let sub_stream = stream.try_clone()?;
303
+                    subs.push(Subscriber { stream: sub_stream });
304
+                    info!("New subscriber connected, total: {}", subs.len());
305
+                }
306
+
307
+                // Keep connection open - block on read until client disconnects
308
+                let mut buf = [0u8; 1];
309
+                loop {
310
+                    match stream.read(&mut buf) {
311
+                        Ok(0) => {
312
+                            debug!("Subscriber disconnected");
313
+                            break;
314
+                        }
315
+                        Err(_) => {
316
+                            debug!("Subscriber connection error");
317
+                            break;
318
+                        }
319
+                        Ok(_) => {
320
+                            // Ignore any data from subscriber (they should only read)
321
+                        }
322
+                    }
323
+                }
324
+
325
+                return Ok(());
326
+            }
192327
             Ok(cmd) => {
193328
                 // Create response channel
194329
                 let (response_tx, response_rx) = mpsc::channel();