gardesk/garnotify / ef08537

Browse files

feat(ipc): add proper request-response handling and DND support

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
ef08537778171ef339a2b39b649555c1b1f10c28
Parents
6d67c7e
Tree
fb0ef16

2 changed files

StatusFile+-
M garnotify/src/daemon.rs 150 36
M garnotify/src/ipc.rs 27 7
garnotify/src/daemon.rsmodified
@@ -12,11 +12,12 @@ use tracing::{debug, error, info, warn};
1212
 
1313
 use crate::config::{self, Config};
1414
 use crate::dbus::NotificationsService;
15
-use crate::ipc::{Command, IpcServer};
15
+use crate::ipc::{Command, IpcRequest, IpcServer, Response};
1616
 use crate::notification::{
1717
     new_shared_store, CloseReason, History, Notification, NotificationEvent,
1818
     SharedNotificationStore, UrgencyTimeouts,
1919
 };
20
+use crate::rules::RuleEngine;
2021
 use crate::ui::{PopupCommand, PopupEvent, PopupManager};
2122
 
2223
 /// Get the path to the PID file
@@ -91,16 +92,43 @@ impl Drop for PidGuard {
9192
     }
9293
 }
9394
 
95
+/// DND pause level
96
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
97
+pub enum PauseLevel {
98
+    /// Show all notifications (not paused)
99
+    ShowAll = 0,
100
+    /// Only show critical notifications
101
+    CriticalOnly = 1,
102
+    /// Show no notifications
103
+    ShowNone = 2,
104
+}
105
+
106
+impl From<u8> for PauseLevel {
107
+    fn from(level: u8) -> Self {
108
+        match level {
109
+            0 => PauseLevel::ShowAll,
110
+            1 => PauseLevel::CriticalOnly,
111
+            _ => PauseLevel::ShowNone,
112
+        }
113
+    }
114
+}
115
+
94116
 /// Daemon state
95117
 pub struct Daemon {
96118
     config: Arc<Config>,
97119
     ipc_server: IpcServer,
98
-    ipc_rx: Receiver<Command>,
120
+    ipc_rx: Receiver<IpcRequest>,
99121
     dbus_service: Option<NotificationsService>,
100122
     notification_store: SharedNotificationStore,
101123
     notification_event_rx: tokio::sync::mpsc::Receiver<NotificationEvent>,
102124
     history: Arc<Mutex<History>>,
103125
     running: bool,
126
+    /// Do Not Disturb mode
127
+    paused: bool,
128
+    /// Pause level (0=show all, 1=critical only, 2=show none)
129
+    pause_level: PauseLevel,
130
+    /// Rule engine for filtering/modifying notifications
131
+    rule_engine: RuleEngine,
104132
     /// Channel to send commands to the UI thread
105133
     ui_cmd_tx: Option<std::sync::mpsc::Sender<PopupCommand>>,
106134
     /// Channel to receive events from the UI thread
@@ -131,6 +159,9 @@ impl Daemon {
131159
         // Create history
132160
         let history = Arc::new(Mutex::new(History::new(config.history.max_length)));
133161
 
162
+        // Create rule engine from config
163
+        let rule_engine = RuleEngine::with_rules(config.rules.clone());
164
+
134165
         let config = Arc::new(config);
135166
 
136167
         Ok(Self {
@@ -142,6 +173,9 @@ impl Daemon {
142173
             notification_event_rx: event_rx,
143174
             history,
144175
             running: true,
176
+            paused: false,
177
+            pause_level: PauseLevel::ShowAll,
178
+            rule_engine,
145179
             ui_cmd_tx: None,
146180
             ui_event_rx: None,
147181
             ui_thread: None,
@@ -315,21 +349,52 @@ impl Daemon {
315349
 
316350
     /// Poll for IPC commands
317351
     async fn poll_ipc_commands(&mut self) {
318
-        while let Ok(cmd) = self.ipc_rx.try_recv() {
319
-            self.handle_ipc_command(cmd).await;
352
+        while let Ok(request) = self.ipc_rx.try_recv() {
353
+            let response = self.handle_ipc_command(request.command).await;
354
+            // Send response back (ignore error if receiver dropped)
355
+            let _ = request.response_tx.send(response);
320356
         }
321357
     }
322358
 
323359
     /// Handle a notification event (created, updated, closed)
324360
     async fn handle_notification_event(&mut self, event: NotificationEvent) {
325361
         match event {
326
-            NotificationEvent::Created(notification) => {
362
+            NotificationEvent::Created(mut notification) => {
327363
                 info!(
328364
                     "Notification created: id={} summary=\"{}\"",
329365
                     notification.id, notification.summary
330366
                 );
331
-                // Show popup
332
-                self.show_notification_popup(notification);
367
+
368
+                // Process through rule engine (may modify or suppress)
369
+                if self.rule_engine.process(&mut notification).is_none() {
370
+                    debug!(
371
+                        "Notification {} suppressed by rules",
372
+                        notification.id
373
+                    );
374
+                    return;
375
+                }
376
+
377
+                // Check DND mode before showing popup
378
+                let should_show = if !self.paused {
379
+                    true
380
+                } else {
381
+                    match self.pause_level {
382
+                        PauseLevel::ShowAll => true,
383
+                        PauseLevel::CriticalOnly => {
384
+                            notification.hints.urgency == crate::notification::Urgency::Critical
385
+                        }
386
+                        PauseLevel::ShowNone => false,
387
+                    }
388
+                };
389
+
390
+                if should_show {
391
+                    self.show_notification_popup(notification);
392
+                } else {
393
+                    debug!(
394
+                        "Notification {} suppressed by DND (level={:?})",
395
+                        notification.id, self.pause_level
396
+                    );
397
+                }
333398
             }
334399
             NotificationEvent::Updated(notification) => {
335400
                 info!(
@@ -366,28 +431,33 @@ impl Daemon {
366431
         }
367432
     }
368433
 
369
-    /// Handle an IPC command
370
-    async fn handle_ipc_command(&mut self, cmd: Command) {
434
+    /// Handle an IPC command and return a response
435
+    async fn handle_ipc_command(&mut self, cmd: Command) -> Response {
371436
         debug!("Handling IPC command: {:?}", cmd);
372437
         match cmd {
373438
             Command::Status => {
374439
                 let store = self.notification_store.lock().await;
375440
                 let history = self.history.lock().await;
376
-                info!(
377
-                    "Status: running, {} active notifications, {} in history",
378
-                    store.count(),
379
-                    history.len()
380
-                );
441
+                let status = serde_json::json!({
442
+                    "running": true,
443
+                    "active_count": store.count(),
444
+                    "history_count": history.len(),
445
+                    "paused": self.paused,
446
+                    "pause_level": self.pause_level as u8,
447
+                });
448
+                Response::ok_with_data(status)
381449
             }
382450
             Command::Reload => {
383451
                 info!("Reloading config via IPC");
384
-                if let Err(e) = self.handle_reload() {
385
-                    error!("Failed to reload config: {}", e);
452
+                match self.handle_reload() {
453
+                    Ok(_) => Response::ok_with_message("Configuration reloaded"),
454
+                    Err(e) => Response::error(format!("Failed to reload: {}", e)),
386455
                 }
387456
             }
388457
             Command::Quit => {
389458
                 info!("Quit requested via IPC");
390459
                 self.running = false;
460
+                Response::ok_with_message("Shutting down")
391461
             }
392462
             Command::Close { id } => {
393463
                 let target_id = if let Some(id) = id {
@@ -400,6 +470,9 @@ impl Daemon {
400470
 
401471
                 if let Some(id) = target_id {
402472
                     info!("Close notification: {}", id);
473
+                    // Send close command to UI
474
+                    self.close_notification_popup(id, CloseReason::Closed);
475
+
403476
                     let mut store = self.notification_store.lock().await;
404477
                     if let Some(notification) = store.remove(id) {
405478
                         let mut history = self.history.lock().await;
@@ -412,12 +485,19 @@ impl Daemon {
412485
                             }
413486
                         }
414487
                     }
488
+                    Response::ok_with_message(format!("Closed notification {}", id))
489
+                } else {
490
+                    Response::ok_with_message("No notifications to close")
415491
                 }
416492
             }
417493
             Command::CloseAll => {
418494
                 info!("Close all notifications");
495
+                // Send close-all to UI
496
+                self.send_ui_command(PopupCommand::CloseAll);
497
+
419498
                 let mut store = self.notification_store.lock().await;
420499
                 let notifications = store.clear();
500
+                let count = notifications.len();
421501
                 let mut history = self.history.lock().await;
422502
 
423503
                 for notification in notifications {
@@ -431,54 +511,86 @@ impl Daemon {
431511
                         }
432512
                     }
433513
                 }
514
+                Response::ok_with_message(format!("Closed {} notifications", count))
434515
             }
435516
             Command::HistoryPop => {
436
-                info!("History pop requested");
437517
                 let mut history = self.history.lock().await;
438518
                 if let Some(notification) = history.pop() {
439519
                     info!(
440520
                         "Popped from history: id={} summary=\"{}\"",
441521
                         notification.id, notification.summary
442522
                     );
443
-                    // TODO: In sprint 3, re-display the notification
523
+                    // Re-display the notification
524
+                    drop(history); // Release lock before showing popup
525
+                    self.show_notification_popup(notification);
526
+                    Response::ok_with_message("Restored notification from history")
444527
                 } else {
445
-                    info!("History is empty");
528
+                    Response::ok_with_message("History is empty")
446529
                 }
447530
             }
448531
             Command::HistoryClear => {
449
-                info!("History clear requested");
450532
                 let mut history = self.history.lock().await;
533
+                let count = history.len();
451534
                 history.clear();
535
+                Response::ok_with_message(format!("Cleared {} items from history", count))
452536
             }
453537
             Command::SetPaused { paused, level } => {
454
-                info!("Set paused: {} (level {})", paused, level);
455
-                // TODO: Implement in sprint 4
538
+                self.paused = paused;
539
+                self.pause_level = PauseLevel::from(level);
540
+                info!(
541
+                    "DND mode: {} (level {:?})",
542
+                    if paused { "enabled" } else { "disabled" },
543
+                    self.pause_level
544
+                );
545
+                Response::ok_with_message(format!(
546
+                    "DND {}",
547
+                    if paused { "enabled" } else { "disabled" }
548
+                ))
456549
             }
457550
             Command::IsPaused => {
458
-                info!("Is paused query");
459
-                // TODO: Implement in sprint 4
551
+                let data = serde_json::json!({
552
+                    "paused": self.paused,
553
+                    "level": self.pause_level as u8,
554
+                });
555
+                Response::ok_with_data(data)
460556
             }
461557
             Command::Count => {
462558
                 let store = self.notification_store.lock().await;
463
-                info!("Notification count: {}", store.count());
559
+                let data = serde_json::json!({
560
+                    "count": store.count(),
561
+                });
562
+                Response::ok_with_data(data)
464563
             }
465564
             Command::List => {
466565
                 let store = self.notification_store.lock().await;
467
-                info!("Active notifications:");
468
-                for notification in store.list() {
469
-                    info!(
470
-                        "  id={} app=\"{}\" summary=\"{}\"",
471
-                        notification.id, notification.app_name, notification.summary
472
-                    );
473
-                }
566
+                let list: Vec<_> = store
567
+                    .list()
568
+                    .iter()
569
+                    .map(|n| {
570
+                        serde_json::json!({
571
+                            "id": n.id,
572
+                            "app_name": n.app_name,
573
+                            "summary": n.summary,
574
+                            "body": n.body,
575
+                            "urgency": format!("{:?}", n.hints.urgency),
576
+                        })
577
+                    })
578
+                    .collect();
579
+                Response::ok_with_data(serde_json::json!(list))
474580
             }
475581
             Command::RuleEnable { name } => {
476
-                info!("Rule enable: {}", name);
477
-                // TODO: Implement in sprint 4
582
+                if self.rule_engine.enable_rule(&name) {
583
+                    Response::ok_with_message(format!("Enabled rule '{}'", name))
584
+                } else {
585
+                    Response::error(format!("Rule '{}' not found", name))
586
+                }
478587
             }
479588
             Command::RuleDisable { name } => {
480
-                info!("Rule disable: {}", name);
481
-                // TODO: Implement in sprint 4
589
+                if self.rule_engine.disable_rule(&name) {
590
+                    Response::ok_with_message(format!("Disabled rule '{}'", name))
591
+                } else {
592
+                    Response::error(format!("Rule '{}' not found", name))
593
+                }
482594
             }
483595
         }
484596
     }
@@ -488,6 +600,8 @@ impl Daemon {
488600
         match config::load(None) {
489601
             Ok(new_config) => {
490602
                 info!("Reloaded configuration");
603
+                // Reload rules
604
+                self.rule_engine = RuleEngine::with_rules(new_config.rules.clone());
491605
                 self.config = Arc::new(new_config);
492606
             }
493607
             Err(e) => {
garnotify/src/ipc.rsmodified
@@ -1,6 +1,7 @@
11
 //! IPC for garnotify daemon <-> garnotifyctl communication
22
 //!
3
-//! Uses Unix domain sockets with JSON protocol.
3
+//! Uses Unix domain sockets with JSON protocol and oneshot channels
4
+//! for proper request-response handling.
45
 
56
 use anyhow::{Context, Result};
67
 use serde::{Deserialize, Serialize};
@@ -9,6 +10,7 @@ use std::os::unix::net::{UnixListener, UnixStream};
910
 use std::path::PathBuf;
1011
 use std::sync::mpsc::{self, Receiver, Sender};
1112
 use std::thread;
13
+use std::time::Duration;
1214
 use tracing::{debug, error, info};
1315
 
1416
 /// Get the path to the IPC socket
@@ -101,16 +103,22 @@ impl Response {
101103
     }
102104
 }
103105
 
106
+/// An IPC request wrapping a command with a response channel
107
+pub struct IpcRequest {
108
+    pub command: Command,
109
+    pub response_tx: std::sync::mpsc::Sender<Response>,
110
+}
111
+
104112
 /// IPC server for the daemon
105113
 pub struct IpcServer {
106114
     socket_path: PathBuf,
107115
     listener: Option<UnixListener>,
108
-    tx: Sender<Command>,
116
+    tx: Sender<IpcRequest>,
109117
 }
110118
 
111119
 impl IpcServer {
112120
     /// Create a new IPC server
113
-    pub fn new() -> (Self, Receiver<Command>) {
121
+    pub fn new() -> (Self, Receiver<IpcRequest>) {
114122
         let (tx, rx) = mpsc::channel();
115123
         let server = Self {
116124
             socket_path: socket_path(),
@@ -173,7 +181,7 @@ impl Drop for IpcServer {
173181
 }
174182
 
175183
 /// Handle a client connection
176
-fn handle_client(mut stream: UnixStream, tx: Sender<Command>) -> Result<()> {
184
+fn handle_client(mut stream: UnixStream, tx: Sender<IpcRequest>) -> Result<()> {
177185
     let reader = BufReader::new(stream.try_clone()?);
178186
 
179187
     for line in reader.lines() {
@@ -182,11 +190,23 @@ fn handle_client(mut stream: UnixStream, tx: Sender<Command>) -> Result<()> {
182190
 
183191
         let response = match serde_json::from_str::<Command>(&line) {
184192
             Ok(cmd) => {
185
-                // Forward command to daemon
186
-                if tx.send(cmd).is_err() {
193
+                // Create response channel
194
+                let (response_tx, response_rx) = mpsc::channel();
195
+
196
+                // Forward command with response channel to daemon
197
+                let request = IpcRequest {
198
+                    command: cmd,
199
+                    response_tx,
200
+                };
201
+
202
+                if tx.send(request).is_err() {
187203
                     Response::error("Daemon not responding")
188204
                 } else {
189
-                    Response::ok()
205
+                    // Wait for response from daemon (with timeout)
206
+                    match response_rx.recv_timeout(Duration::from_secs(5)) {
207
+                        Ok(response) => response,
208
+                        Err(_) => Response::error("Timeout waiting for daemon response"),
209
+                    }
190210
                 }
191211
             }
192212
             Err(e) => Response::error(format!("Invalid command: {}", e)),