gardesk/garnotify / 8c47bbb

Browse files

daemon: add event loop with IPC, D-Bus, and UI integration

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
8c47bbb8f9bd572b4f845b615ad905f4280727ee
Parents
fd3c96b
Tree
99fb22f

1 changed file

StatusFile+-
A garnotify/src/daemon.rs 559 0
garnotify/src/daemon.rsadded
@@ -0,0 +1,559 @@
1
+//! Daemon state machine and main event loop for garnotify
2
+
3
+use anyhow::{Context, Result};
4
+use std::fs;
5
+use std::io::Write;
6
+use std::path::PathBuf;
7
+use std::sync::mpsc::Receiver;
8
+use std::sync::Arc;
9
+use tokio::signal::unix::{signal, SignalKind};
10
+use tokio::sync::Mutex;
11
+use tracing::{debug, error, info, warn};
12
+
13
+use crate::config::{self, Config};
14
+use crate::dbus::NotificationsService;
15
+use crate::ipc::{Command, IpcServer};
16
+use crate::notification::{
17
+    new_shared_store, CloseReason, History, Notification, NotificationEvent,
18
+    SharedNotificationStore, UrgencyTimeouts,
19
+};
20
+use crate::ui::{PopupCommand, PopupEvent, PopupManager};
21
+
22
+/// Get the path to the PID file
23
+fn pid_file_path() -> PathBuf {
24
+    dirs::runtime_dir()
25
+        .unwrap_or_else(|| PathBuf::from("/tmp"))
26
+        .join("garnotify.pid")
27
+}
28
+
29
+/// Check if an existing daemon is running
30
+fn check_existing_daemon() -> Result<()> {
31
+    let pid_path = pid_file_path();
32
+    let socket_path = crate::ipc::socket_path();
33
+
34
+    if pid_path.exists() {
35
+        let pid_str = fs::read_to_string(&pid_path)?;
36
+        let pid: i32 = pid_str.trim().parse()?;
37
+
38
+        let proc_path = format!("/proc/{}", pid);
39
+        if std::path::Path::new(&proc_path).exists() {
40
+            anyhow::bail!(
41
+                "garnotify daemon already running (PID {}). Remove {} if incorrect.",
42
+                pid,
43
+                pid_path.display()
44
+            );
45
+        } else {
46
+            warn!("Removing stale PID file for PID {}", pid);
47
+            fs::remove_file(&pid_path)?;
48
+            // Also clean up stale socket
49
+            if socket_path.exists() {
50
+                warn!("Removing stale socket file");
51
+                let _ = fs::remove_file(&socket_path);
52
+            }
53
+        }
54
+    } else if socket_path.exists() {
55
+        // Socket exists but no PID file - orphaned socket
56
+        warn!("Removing orphaned socket file (no PID file)");
57
+        let _ = fs::remove_file(&socket_path);
58
+    }
59
+
60
+    Ok(())
61
+}
62
+
63
+/// Write the current process PID to the PID file
64
+fn write_pid_file() -> Result<()> {
65
+    let pid_path = pid_file_path();
66
+    let pid = std::process::id();
67
+
68
+    let mut file = fs::File::create(&pid_path)?;
69
+    writeln!(file, "{}", pid)?;
70
+
71
+    debug!("Wrote PID {} to {}", pid, pid_path.display());
72
+    Ok(())
73
+}
74
+
75
+/// Remove the PID file
76
+fn remove_pid_file() {
77
+    let pid_path = pid_file_path();
78
+    if let Err(e) = fs::remove_file(&pid_path) {
79
+        warn!("Failed to remove PID file: {}", e);
80
+    } else {
81
+        debug!("Removed PID file");
82
+    }
83
+}
84
+
85
+/// PID file guard - removes on drop
86
+struct PidGuard;
87
+
88
+impl Drop for PidGuard {
89
+    fn drop(&mut self) {
90
+        remove_pid_file();
91
+    }
92
+}
93
+
94
+/// Daemon state
95
+pub struct Daemon {
96
+    config: Arc<Config>,
97
+    ipc_server: IpcServer,
98
+    ipc_rx: Receiver<Command>,
99
+    dbus_service: Option<NotificationsService>,
100
+    notification_store: SharedNotificationStore,
101
+    notification_event_rx: tokio::sync::mpsc::Receiver<NotificationEvent>,
102
+    history: Arc<Mutex<History>>,
103
+    running: bool,
104
+    /// Channel to send commands to the UI thread
105
+    ui_cmd_tx: Option<std::sync::mpsc::Sender<PopupCommand>>,
106
+    /// Channel to receive events from the UI thread
107
+    ui_event_rx: Option<tokio::sync::mpsc::Receiver<PopupEvent>>,
108
+    /// Handle to the UI thread
109
+    ui_thread: Option<std::thread::JoinHandle<()>>,
110
+}
111
+
112
+impl Daemon {
113
+    /// Create a new daemon
114
+    pub fn new(config: Config) -> Result<Self> {
115
+        let (ipc_server, ipc_rx) = IpcServer::new();
116
+
117
+        // Create notification event channel
118
+        let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
119
+
120
+        // Create urgency timeouts from config
121
+        let urgency_timeouts = UrgencyTimeouts {
122
+            low: config.timeouts.low,
123
+            normal: config.timeouts.normal,
124
+            critical: config.timeouts.critical,
125
+        };
126
+
127
+        // Create notification store with tokio handle for spawning timeout tasks
128
+        let tokio_handle = tokio::runtime::Handle::current();
129
+        let notification_store = new_shared_store(event_tx, urgency_timeouts, tokio_handle);
130
+
131
+        // Create history
132
+        let history = Arc::new(Mutex::new(History::new(config.history.max_length)));
133
+
134
+        let config = Arc::new(config);
135
+
136
+        Ok(Self {
137
+            config,
138
+            ipc_server,
139
+            ipc_rx,
140
+            dbus_service: None,
141
+            notification_store,
142
+            notification_event_rx: event_rx,
143
+            history,
144
+            running: true,
145
+            ui_cmd_tx: None,
146
+            ui_event_rx: None,
147
+            ui_thread: None,
148
+        })
149
+    }
150
+
151
+    /// Initialize IPC server
152
+    pub fn init_ipc(&mut self) -> Result<()> {
153
+        self.ipc_server
154
+            .start()
155
+            .context("Failed to start IPC server")?;
156
+        info!("IPC server started");
157
+        Ok(())
158
+    }
159
+
160
+    /// Initialize D-Bus service
161
+    pub async fn init_dbus(&mut self) -> Result<()> {
162
+        let service = NotificationsService::new(
163
+            self.config.clone(),
164
+            self.notification_store.clone(),
165
+        )
166
+        .await?;
167
+        self.dbus_service = Some(service);
168
+        info!("D-Bus service registered");
169
+        Ok(())
170
+    }
171
+
172
+    /// Initialize UI (popup manager) in a separate thread
173
+    pub fn init_ui(&mut self) -> Result<()> {
174
+        let config = self.config.clone();
175
+
176
+        // Create channels for communication
177
+        let (cmd_tx, cmd_rx) = std::sync::mpsc::channel::<PopupCommand>();
178
+        let (event_tx, event_rx) = tokio::sync::mpsc::channel::<PopupEvent>(100);
179
+
180
+        // Spawn UI thread
181
+        let ui_thread = std::thread::Builder::new()
182
+            .name("garnotify-ui".into())
183
+            .spawn(move || {
184
+                if let Err(e) = run_ui_thread(config, cmd_rx, event_tx) {
185
+                    error!("UI thread error: {}", e);
186
+                }
187
+            })
188
+            .context("Failed to spawn UI thread")?;
189
+
190
+        self.ui_cmd_tx = Some(cmd_tx);
191
+        self.ui_event_rx = Some(event_rx);
192
+        self.ui_thread = Some(ui_thread);
193
+
194
+        info!("UI thread started");
195
+        Ok(())
196
+    }
197
+
198
+    /// Send a command to the UI thread
199
+    fn send_ui_command(&self, cmd: PopupCommand) {
200
+        if let Some(ref tx) = self.ui_cmd_tx {
201
+            if let Err(e) = tx.send(cmd) {
202
+                warn!("Failed to send UI command: {}", e);
203
+            }
204
+        }
205
+    }
206
+
207
+    /// Show a notification popup
208
+    fn show_notification_popup(&self, notification: Notification) {
209
+        self.send_ui_command(PopupCommand::Show(notification));
210
+    }
211
+
212
+    /// Close a notification popup
213
+    fn close_notification_popup(&self, id: u32, reason: CloseReason) {
214
+        self.send_ui_command(PopupCommand::Close { id, reason });
215
+    }
216
+
217
+    /// Run the main event loop
218
+    pub async fn run(&mut self) -> Result<()> {
219
+        info!("Entering main event loop");
220
+
221
+        let mut sigterm = signal(SignalKind::terminate())?;
222
+        let mut sighup = signal(SignalKind::hangup())?;
223
+
224
+        while self.running {
225
+            // Check for IPC commands (non-blocking)
226
+            self.poll_ipc_commands().await;
227
+
228
+            // Check for UI events (non-blocking)
229
+            self.poll_ui_events().await;
230
+
231
+            tokio::select! {
232
+                _ = sigterm.recv() => {
233
+                    info!("Received SIGTERM, shutting down");
234
+                    self.running = false;
235
+                }
236
+                _ = sighup.recv() => {
237
+                    info!("Received SIGHUP, reloading config");
238
+                    self.handle_reload()?;
239
+                }
240
+                _ = tokio::signal::ctrl_c() => {
241
+                    info!("Received Ctrl+C, shutting down");
242
+                    self.running = false;
243
+                }
244
+                Some(event) = self.notification_event_rx.recv() => {
245
+                    self.handle_notification_event(event).await;
246
+                }
247
+                _ = tokio::time::sleep(tokio::time::Duration::from_millis(50)) => {
248
+                    // Poll interval for IPC and UI events
249
+                }
250
+            }
251
+        }
252
+
253
+        // Clean up UI thread
254
+        if let Some(tx) = self.ui_cmd_tx.take() {
255
+            drop(tx); // Close the channel to signal UI thread to exit
256
+        }
257
+        if let Some(handle) = self.ui_thread.take() {
258
+            let _ = handle.join();
259
+        }
260
+
261
+        info!("Daemon shutdown complete");
262
+        Ok(())
263
+    }
264
+
265
+    /// Poll for UI events
266
+    async fn poll_ui_events(&mut self) {
267
+        // Collect events first to avoid borrow issues
268
+        let events: Vec<PopupEvent> = if let Some(ref mut rx) = self.ui_event_rx {
269
+            let mut events = Vec::new();
270
+            while let Ok(event) = rx.try_recv() {
271
+                events.push(event);
272
+            }
273
+            events
274
+        } else {
275
+            Vec::new()
276
+        };
277
+
278
+        // Handle collected events
279
+        for event in events {
280
+            self.handle_ui_event(event).await;
281
+        }
282
+    }
283
+
284
+    /// Handle a UI event
285
+    async fn handle_ui_event(&mut self, event: PopupEvent) {
286
+        match event {
287
+            PopupEvent::Dismissed(id) => {
288
+                info!("Notification {} dismissed by user", id);
289
+                // Remove from store and emit D-Bus signal
290
+                let mut store = self.notification_store.lock().await;
291
+                if let Some(notification) = store.remove(id) {
292
+                    let mut history = self.history.lock().await;
293
+                    history.push(notification);
294
+                }
295
+                if let Some(ref dbus) = self.dbus_service {
296
+                    if let Err(e) = dbus.emit_closed(id, CloseReason::Dismissed).await {
297
+                        error!("Failed to emit NotificationClosed signal: {}", e);
298
+                    }
299
+                }
300
+            }
301
+            PopupEvent::ActionInvoked { id, action_key } => {
302
+                info!("Action '{}' invoked on notification {}", action_key, id);
303
+                if let Some(ref dbus) = self.dbus_service {
304
+                    if let Err(e) = dbus.emit_action_invoked(id, &action_key).await {
305
+                        error!("Failed to emit ActionInvoked signal: {}", e);
306
+                    }
307
+                }
308
+            }
309
+            PopupEvent::Closed { id, reason } => {
310
+                debug!("Popup closed for notification {}: {:?}", id, reason);
311
+                // Already handled by the notification event
312
+            }
313
+        }
314
+    }
315
+
316
+    /// Poll for IPC commands
317
+    async fn poll_ipc_commands(&mut self) {
318
+        while let Ok(cmd) = self.ipc_rx.try_recv() {
319
+            self.handle_ipc_command(cmd).await;
320
+        }
321
+    }
322
+
323
+    /// Handle a notification event (created, updated, closed)
324
+    async fn handle_notification_event(&mut self, event: NotificationEvent) {
325
+        match event {
326
+            NotificationEvent::Created(notification) => {
327
+                info!(
328
+                    "Notification created: id={} summary=\"{}\"",
329
+                    notification.id, notification.summary
330
+                );
331
+                // Show popup
332
+                self.show_notification_popup(notification);
333
+            }
334
+            NotificationEvent::Updated(notification) => {
335
+                info!(
336
+                    "Notification updated: id={} summary=\"{}\"",
337
+                    notification.id, notification.summary
338
+                );
339
+                // Update popup
340
+                self.send_ui_command(PopupCommand::Update {
341
+                    id: notification.id,
342
+                    notification,
343
+                });
344
+            }
345
+            NotificationEvent::Closed { id, reason } => {
346
+                info!("Notification {} closed: reason={:?}", id, reason);
347
+
348
+                // Close popup
349
+                self.close_notification_popup(id, reason.clone());
350
+
351
+                // Remove from store and add to history
352
+                let mut store = self.notification_store.lock().await;
353
+                if let Some(notification) = store.remove(id) {
354
+                    // Add to history (unless transient)
355
+                    let mut history = self.history.lock().await;
356
+                    history.push(notification);
357
+                }
358
+
359
+                // Emit D-Bus signal
360
+                if let Some(ref dbus) = self.dbus_service {
361
+                    if let Err(e) = dbus.emit_closed(id, reason).await {
362
+                        error!("Failed to emit NotificationClosed signal: {}", e);
363
+                    }
364
+                }
365
+            }
366
+        }
367
+    }
368
+
369
+    /// Handle an IPC command
370
+    async fn handle_ipc_command(&mut self, cmd: Command) {
371
+        debug!("Handling IPC command: {:?}", cmd);
372
+        match cmd {
373
+            Command::Status => {
374
+                let store = self.notification_store.lock().await;
375
+                let history = self.history.lock().await;
376
+                info!(
377
+                    "Status: running, {} active notifications, {} in history",
378
+                    store.count(),
379
+                    history.len()
380
+                );
381
+            }
382
+            Command::Reload => {
383
+                info!("Reloading config via IPC");
384
+                if let Err(e) = self.handle_reload() {
385
+                    error!("Failed to reload config: {}", e);
386
+                }
387
+            }
388
+            Command::Quit => {
389
+                info!("Quit requested via IPC");
390
+                self.running = false;
391
+            }
392
+            Command::Close { id } => {
393
+                let target_id = if let Some(id) = id {
394
+                    Some(id)
395
+                } else {
396
+                    // Close most recent
397
+                    let store = self.notification_store.lock().await;
398
+                    store.list().last().map(|n| n.id)
399
+                };
400
+
401
+                if let Some(id) = target_id {
402
+                    info!("Close notification: {}", id);
403
+                    let mut store = self.notification_store.lock().await;
404
+                    if let Some(notification) = store.remove(id) {
405
+                        let mut history = self.history.lock().await;
406
+                        history.push(notification);
407
+
408
+                        // Emit D-Bus signal
409
+                        if let Some(ref dbus) = self.dbus_service {
410
+                            if let Err(e) = dbus.emit_closed(id, CloseReason::Closed).await {
411
+                                error!("Failed to emit NotificationClosed signal: {}", e);
412
+                            }
413
+                        }
414
+                    }
415
+                }
416
+            }
417
+            Command::CloseAll => {
418
+                info!("Close all notifications");
419
+                let mut store = self.notification_store.lock().await;
420
+                let notifications = store.clear();
421
+                let mut history = self.history.lock().await;
422
+
423
+                for notification in notifications {
424
+                    let id = notification.id;
425
+                    history.push(notification);
426
+
427
+                    // Emit D-Bus signal for each
428
+                    if let Some(ref dbus) = self.dbus_service {
429
+                        if let Err(e) = dbus.emit_closed(id, CloseReason::Closed).await {
430
+                            error!("Failed to emit NotificationClosed signal: {}", e);
431
+                        }
432
+                    }
433
+                }
434
+            }
435
+            Command::HistoryPop => {
436
+                info!("History pop requested");
437
+                let mut history = self.history.lock().await;
438
+                if let Some(notification) = history.pop() {
439
+                    info!(
440
+                        "Popped from history: id={} summary=\"{}\"",
441
+                        notification.id, notification.summary
442
+                    );
443
+                    // TODO: In sprint 3, re-display the notification
444
+                } else {
445
+                    info!("History is empty");
446
+                }
447
+            }
448
+            Command::HistoryClear => {
449
+                info!("History clear requested");
450
+                let mut history = self.history.lock().await;
451
+                history.clear();
452
+            }
453
+            Command::SetPaused { paused, level } => {
454
+                info!("Set paused: {} (level {})", paused, level);
455
+                // TODO: Implement in sprint 4
456
+            }
457
+            Command::IsPaused => {
458
+                info!("Is paused query");
459
+                // TODO: Implement in sprint 4
460
+            }
461
+            Command::Count => {
462
+                let store = self.notification_store.lock().await;
463
+                info!("Notification count: {}", store.count());
464
+            }
465
+            Command::List => {
466
+                let store = self.notification_store.lock().await;
467
+                info!("Active notifications:");
468
+                for notification in store.list() {
469
+                    info!(
470
+                        "  id={} app=\"{}\" summary=\"{}\"",
471
+                        notification.id, notification.app_name, notification.summary
472
+                    );
473
+                }
474
+            }
475
+            Command::RuleEnable { name } => {
476
+                info!("Rule enable: {}", name);
477
+                // TODO: Implement in sprint 4
478
+            }
479
+            Command::RuleDisable { name } => {
480
+                info!("Rule disable: {}", name);
481
+                // TODO: Implement in sprint 4
482
+            }
483
+        }
484
+    }
485
+
486
+    /// Handle config reload
487
+    fn handle_reload(&mut self) -> Result<()> {
488
+        match config::load(None) {
489
+            Ok(new_config) => {
490
+                info!("Reloaded configuration");
491
+                self.config = Arc::new(new_config);
492
+            }
493
+            Err(e) => {
494
+                warn!("Failed to reload config: {}", e);
495
+            }
496
+        }
497
+        Ok(())
498
+    }
499
+}
500
+
501
+/// Run the notification daemon
502
+pub async fn run(config_path: Option<String>, _foreground: bool) -> Result<()> {
503
+    check_existing_daemon()?;
504
+    write_pid_file()?;
505
+    let _pid_guard = PidGuard;
506
+
507
+    let config = config::load(config_path.as_deref())?;
508
+    info!("Loaded configuration");
509
+    info!("Position: {}", config.geometry.position);
510
+    info!("Width: {}px", config.geometry.width);
511
+    info!(
512
+        "Timeouts: low={}ms, normal={}ms, critical={}ms",
513
+        config.timeouts.low, config.timeouts.normal, config.timeouts.critical
514
+    );
515
+
516
+    let mut daemon = Daemon::new(config)?;
517
+    daemon.init_ipc().context("Failed to initialize IPC")?;
518
+    daemon
519
+        .init_dbus()
520
+        .await
521
+        .context("Failed to initialize D-Bus")?;
522
+    daemon.init_ui().context("Failed to initialize UI")?;
523
+    daemon.run().await
524
+}
525
+
526
+/// Run the UI thread - handles X11 popup windows
527
+fn run_ui_thread(
528
+    config: Arc<Config>,
529
+    cmd_rx: std::sync::mpsc::Receiver<PopupCommand>,
530
+    event_tx: tokio::sync::mpsc::Sender<PopupEvent>,
531
+) -> Result<()> {
532
+    let mut manager = PopupManager::new(config, event_tx)?;
533
+
534
+    loop {
535
+        // Check for commands (with timeout to allow X11 event polling)
536
+        match cmd_rx.recv_timeout(std::time::Duration::from_millis(16)) {
537
+            Ok(cmd) => {
538
+                if let Err(e) = manager.handle_command(cmd) {
539
+                    error!("Failed to handle UI command: {}", e);
540
+                }
541
+            }
542
+            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
543
+                // Normal timeout, continue to poll X11 events
544
+            }
545
+            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
546
+                // Channel closed, daemon shutting down
547
+                info!("UI thread: command channel closed, exiting");
548
+                break;
549
+            }
550
+        }
551
+
552
+        // Poll X11 events
553
+        if let Err(e) = manager.poll_events() {
554
+            error!("Failed to poll X11 events: {}", e);
555
+        }
556
+    }
557
+
558
+    Ok(())
559
+}