gardesk/garwarp / 33bdd6d

Browse files

add restart recovery store

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
33bdd6dcb6dea23d9c71572f401b6eb4a660dac7
Parents
1e77d9b
Tree
b9da39d

7 changed files

StatusFile+-
M garwarp/src/config.rs 12 1
M garwarp/src/daemon.rs 90 3
M garwarp/src/main.rs 1 0
M garwarp/src/request.rs 64 0
A garwarp/src/request_store.rs 179 0
M garwarp/src/runtime.rs 7 0
M garwarp/src/window.rs 9 0
garwarp/src/config.rsmodified
@@ -4,18 +4,29 @@ use std::time::Duration;
44
 #[derive(Debug, Clone)]
55
 pub struct Config {
66
     pub poll_interval: Duration,
7
+    pub request_timeout: Duration,
78
 }
89
 
910
 impl Config {
1011
     #[must_use]
1112
     pub fn from_env() -> Self {
1213
         const DEFAULT_POLL_MS: u64 = 100;
14
+        const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30_000;
1315
         let poll_interval = env::var("GARWARP_POLL_MS")
1416
             .ok()
1517
             .and_then(|value| value.parse::<u64>().ok())
1618
             .filter(|value| *value > 0)
1719
             .map(Duration::from_millis)
1820
             .unwrap_or_else(|| Duration::from_millis(DEFAULT_POLL_MS));
19
-        Self { poll_interval }
21
+        let request_timeout = env::var("GARWARP_REQUEST_TIMEOUT_MS")
22
+            .ok()
23
+            .and_then(|value| value.parse::<u64>().ok())
24
+            .filter(|value| *value > 0)
25
+            .map(Duration::from_millis)
26
+            .unwrap_or_else(|| Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MS));
27
+        Self {
28
+            poll_interval,
29
+            request_timeout,
30
+        }
2031
     }
2132
 }
garwarp/src/daemon.rsmodified
@@ -14,6 +14,7 @@ use crate::error::{PortalError, map_portal_error, map_request_error};
1414
 use crate::lock::SingleInstanceGuard;
1515
 use crate::logging;
1616
 use crate::request::{RequestOwner, RequestRegistry, RequestState};
17
+use crate::request_store;
1718
 use crate::runtime::RuntimePaths;
1819
 use crate::window::parse_optional_parent_window;
1920
 
@@ -31,22 +32,37 @@ pub fn run() -> io::Result<()> {
3132
 
3233
     logging::info("daemon_starting");
3334
 
35
+    let (requests, recovered_ids) =
36
+        load_registry_with_recovery(&paths.request_store, config.request_timeout)?;
37
+    if !recovered_ids.is_empty() {
38
+        logging::warn(&format!(
39
+            "request_recovery_expired count={}",
40
+            recovered_ids.len()
41
+        ));
42
+    }
43
+
3444
     let mut state = DaemonState {
3545
         health: HealthStatus::Healthy,
36
-        requests: RequestRegistry::new(Duration::from_secs(30)),
46
+        requests,
3747
         running: true,
3848
     };
49
+    persist_registry_state(&paths.request_store, &state.requests);
3950
 
4051
     while state.running {
4152
         let expired = state.requests.expire_stale(Instant::now());
42
-        for id in expired {
53
+        for id in &expired {
4354
             logging::warn(&format!("request_expired id={id}"));
4455
         }
56
+        if !expired.is_empty() {
57
+            persist_registry_state(&paths.request_store, &state.requests);
58
+        }
4559
 
4660
         match listener.accept() {
4761
             Ok((stream, _address)) => {
4862
                 if let Err(error) = handle_connection(stream, &mut state) {
4963
                     logging::warn(&format!("request_error={error}"));
64
+                } else {
65
+                    persist_registry_state(&paths.request_store, &state.requests);
5066
                 }
5167
             }
5268
             Err(error) if error.kind() == io::ErrorKind::WouldBlock => {
@@ -188,17 +204,42 @@ fn map_transition_target(target: RequestTransitionTarget) -> RequestState {
188204
     }
189205
 }
190206
 
207
+fn load_registry_with_recovery(
208
+    request_store_path: &std::path::Path,
209
+    timeout: Duration,
210
+) -> io::Result<(RequestRegistry, Vec<String>)> {
211
+    let mut registry = request_store::load_registry(request_store_path, timeout)?;
212
+    let expired = registry.recover_after_restart(Instant::now());
213
+    Ok((registry, expired))
214
+}
215
+
216
+fn persist_registry_state(path: &std::path::Path, registry: &RequestRegistry) {
217
+    if let Err(error) = request_store::persist_registry(path, registry) {
218
+        logging::warn(&format!("request_store_write_failed error={error}"));
219
+    }
220
+}
221
+
191222
 #[cfg(test)]
192223
 mod tests {
193
-    use super::{DaemonState, handle_connection};
224
+    use super::{DaemonState, handle_connection, load_registry_with_recovery};
194225
     use garwarp_ipc::{ControlResponse, HealthStatus};
226
+    use std::fs;
195227
     use std::io::{BufRead, BufReader, Write};
196228
     use std::os::unix::net::UnixStream;
229
+    use std::path::PathBuf;
197230
     use std::time::{Duration, Instant};
198231
 
199232
     use crate::request::{RequestOwner, RequestRegistry, RequestState};
233
+    use crate::request_store;
200234
     use crate::window::ParentWindowContext;
201235
 
236
+    fn unique_temp_file() -> PathBuf {
237
+        let nanos = std::time::SystemTime::now()
238
+            .duration_since(std::time::UNIX_EPOCH)
239
+            .map_or(0, |duration| duration.as_nanos());
240
+        std::env::temp_dir().join(format!("garwarp-daemon-recovery-{nanos}.state"))
241
+    }
242
+
202243
     #[test]
203244
     fn status_request_returns_status_response() {
204245
         let (mut client, server) = UnixStream::pair().expect("pair should be created");
@@ -400,4 +441,50 @@ mod tests {
400441
         );
401442
         assert_eq!(state.requests.state("req-1"), Some(RequestState::Pending));
402443
     }
444
+
445
+    #[test]
446
+    fn startup_recovery_expires_non_terminal_requests() {
447
+        let path = unique_temp_file();
448
+
449
+        let mut persisted = RequestRegistry::new(Duration::from_secs(5));
450
+        persisted
451
+            .begin_at(
452
+                "req-pending",
453
+                RequestOwner::new(":1.2", None),
454
+                None,
455
+                Instant::now(),
456
+            )
457
+            .expect("request should be created");
458
+        persisted
459
+            .begin_at(
460
+                "req-done",
461
+                RequestOwner::new(":1.3", None),
462
+                None,
463
+                Instant::now(),
464
+            )
465
+            .expect("request should be created");
466
+        persisted
467
+            .transition(
468
+                "req-done",
469
+                &RequestOwner::new(":1.3", None),
470
+                RequestState::AwaitingUser,
471
+            )
472
+            .expect("request should transition");
473
+        persisted
474
+            .transition(
475
+                "req-done",
476
+                &RequestOwner::new(":1.3", None),
477
+                RequestState::Fulfilled,
478
+            )
479
+            .expect("request should transition");
480
+        request_store::persist_registry(&path, &persisted).expect("request store should persist");
481
+
482
+        let (loaded, recovered) = load_registry_with_recovery(&path, Duration::from_secs(5))
483
+            .expect("registry should load");
484
+        assert_eq!(recovered, vec!["req-pending".to_string()]);
485
+        assert_eq!(loaded.state("req-pending"), Some(RequestState::Expired));
486
+        assert_eq!(loaded.state("req-done"), Some(RequestState::Fulfilled));
487
+
488
+        let _ = fs::remove_file(path);
489
+    }
403490
 }
garwarp/src/main.rsmodified
@@ -5,6 +5,7 @@ mod error;
55
 mod lock;
66
 mod logging;
77
 mod request;
8
+mod request_store;
89
 mod runtime;
910
 mod window;
1011
 
garwarp/src/request.rsmodified
@@ -52,6 +52,19 @@ impl RequestState {
5252
             Self::Expired => "expired",
5353
         }
5454
     }
55
+
56
+    #[must_use]
57
+    pub fn parse(input: &str) -> Option<Self> {
58
+        match input {
59
+            "pending" => Some(Self::Pending),
60
+            "awaiting_user" => Some(Self::AwaitingUser),
61
+            "fulfilled" => Some(Self::Fulfilled),
62
+            "cancelled" => Some(Self::Cancelled),
63
+            "failed" => Some(Self::Failed),
64
+            "expired" => Some(Self::Expired),
65
+            _ => None,
66
+        }
67
+    }
5568
 }
5669
 
5770
 #[derive(Debug, Clone)]
@@ -83,6 +96,14 @@ impl RequestEntry {
8396
     }
8497
 }
8598
 
99
+#[derive(Debug, Clone, PartialEq, Eq)]
100
+pub struct RequestRecord {
101
+    pub id: String,
102
+    pub owner: RequestOwner,
103
+    pub parent_window: Option<ParentWindowContext>,
104
+    pub state: RequestState,
105
+}
106
+
86107
 #[derive(Debug)]
87108
 pub struct RequestRegistry {
88109
     entries: HashMap<String, RequestEntry>,
@@ -123,6 +144,28 @@ impl RequestRegistry {
123144
         Ok(())
124145
     }
125146
 
147
+    pub fn restore_record(
148
+        &mut self,
149
+        record: RequestRecord,
150
+        now: Instant,
151
+    ) -> Result<(), RequestError> {
152
+        if self.entries.contains_key(&record.id) {
153
+            return Err(RequestError::AlreadyExists(record.id));
154
+        }
155
+        self.entries.insert(
156
+            record.id.clone(),
157
+            RequestEntry {
158
+                id: record.id,
159
+                owner: record.owner,
160
+                parent_window: record.parent_window,
161
+                state: record.state,
162
+                started_at: now,
163
+                last_updated_at: now,
164
+            },
165
+        );
166
+        Ok(())
167
+    }
168
+
126169
     pub fn transition(
127170
         &mut self,
128171
         id: &str,
@@ -211,6 +254,27 @@ impl RequestRegistry {
211254
     pub fn parent_window(&self, id: &str) -> Option<Option<ParentWindowContext>> {
212255
         self.entries.get(id).map(|entry| entry.parent_window)
213256
     }
257
+
258
+    #[must_use]
259
+    pub fn owner(&self, id: &str) -> Option<RequestOwner> {
260
+        self.entries.get(id).map(|entry| entry.owner.clone())
261
+    }
262
+
263
+    #[must_use]
264
+    pub fn records(&self) -> Vec<RequestRecord> {
265
+        let mut records = self
266
+            .entries
267
+            .values()
268
+            .map(|entry| RequestRecord {
269
+                id: entry.id.clone(),
270
+                owner: entry.owner.clone(),
271
+                parent_window: entry.parent_window,
272
+                state: entry.state,
273
+            })
274
+            .collect::<Vec<_>>();
275
+        records.sort_by(|left, right| left.id.cmp(&right.id));
276
+        records
277
+    }
214278
 }
215279
 
216280
 fn is_valid_transition(from: RequestState, to: RequestState) -> bool {
garwarp/src/request_store.rsadded
@@ -0,0 +1,179 @@
1
+#![allow(dead_code)]
2
+
3
+use std::collections::HashMap;
4
+use std::fs;
5
+use std::io;
6
+use std::path::Path;
7
+use std::time::{Duration, Instant};
8
+
9
+use crate::request::{RequestOwner, RequestRecord, RequestRegistry, RequestState};
10
+use crate::window::parse_optional_parent_window;
11
+
12
+pub fn load_registry(path: &Path, timeout: Duration) -> io::Result<RequestRegistry> {
13
+    let mut registry = RequestRegistry::new(timeout);
14
+    if !path.exists() {
15
+        return Ok(registry);
16
+    }
17
+
18
+    let body = fs::read_to_string(path)?;
19
+    let now = Instant::now();
20
+
21
+    for (index, line) in body.lines().enumerate() {
22
+        if line.trim().is_empty() {
23
+            continue;
24
+        }
25
+        let record = parse_record_line(line).map_err(|error| {
26
+            io::Error::new(
27
+                io::ErrorKind::InvalidData,
28
+                format!("invalid request store line {}: {error}", index + 1),
29
+            )
30
+        })?;
31
+        registry.restore_record(record, now).map_err(|error| {
32
+            io::Error::new(
33
+                io::ErrorKind::InvalidData,
34
+                format!("invalid request store line {}: {error}", index + 1),
35
+            )
36
+        })?;
37
+    }
38
+
39
+    Ok(registry)
40
+}
41
+
42
+pub fn persist_registry(path: &Path, registry: &RequestRegistry) -> io::Result<()> {
43
+    let mut output = String::new();
44
+    for record in registry.records() {
45
+        output.push_str(&format_record_line(&record));
46
+        output.push('\n');
47
+    }
48
+    fs::write(path, output)
49
+}
50
+
51
+fn format_record_line(record: &RequestRecord) -> String {
52
+    let app_id = record.owner.app_id.as_deref().unwrap_or("-");
53
+    let parent_window = match record.parent_window {
54
+        Some(parent_window) => parent_window.as_str(),
55
+        None => "-".to_string(),
56
+    };
57
+
58
+    format!(
59
+        "id={}\tsender={}\tapp_id={}\tparent={}\tstate={}",
60
+        record.id,
61
+        record.owner.sender,
62
+        app_id,
63
+        parent_window,
64
+        record.state.as_str()
65
+    )
66
+}
67
+
68
+fn parse_record_line(line: &str) -> Result<RequestRecord, String> {
69
+    let fields = parse_fields(line)?;
70
+
71
+    let id = required_field(&fields, "id")?.to_string();
72
+    let sender = required_field(&fields, "sender")?.to_string();
73
+    let app_id = optional_field(&fields, "app_id");
74
+    let parent_window = parse_optional_parent_window(optional_field_ref(&fields, "parent"))
75
+        .map_err(|error| error.to_string())?;
76
+    let state = RequestState::parse(required_field(&fields, "state")?)
77
+        .ok_or_else(|| "invalid request state".to_string())?;
78
+
79
+    Ok(RequestRecord {
80
+        id,
81
+        owner: RequestOwner::new(sender, app_id),
82
+        parent_window,
83
+        state,
84
+    })
85
+}
86
+
87
/// Splits a line on tabs and each token on its first `=` into a key/value map.
///
/// A token without `=` aborts parsing with `invalid token: <token>`. When a
/// key occurs more than once, the last occurrence wins.
fn parse_fields(line: &str) -> Result<HashMap<&str, &str>, String> {
    line.split('\t')
        .map(|token| {
            token
                .split_once('=')
                .ok_or_else(|| format!("invalid token: {token}"))
        })
        .collect()
}
97
+
98
/// Returns the value stored under `key`, or `missing field: <key>` when the
/// key is absent.
fn required_field<'a>(fields: &'a HashMap<&str, &'a str>, key: &str) -> Result<&'a str, String> {
    match fields.get(key) {
        Some(value) => Ok(*value),
        None => Err(format!("missing field: {key}")),
    }
}
104
+
105
+fn optional_field(fields: &HashMap<&str, &str>, key: &str) -> Option<String> {
106
+    optional_field_ref(fields, key).map(ToOwned::to_owned)
107
+}
108
+
109
/// Returns the value stored under `key`, treating both a missing key and the
/// placeholder `-` as `None`.
fn optional_field_ref<'a>(fields: &'a HashMap<&str, &'a str>, key: &str) -> Option<&'a str> {
    fields.get(key).copied().filter(|value| *value != "-")
}
115
+
116
#[cfg(test)]
mod tests {
    use super::{load_registry, persist_registry};
    use crate::request::{RequestOwner, RequestRegistry, RequestState};
    use crate::window::ParentWindowContext;
    use std::fs;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

    /// Returns a fresh path in the system temp directory for a test store file.
    ///
    /// The name combines the process id, a per-process sequence number and a
    /// timestamp. The timestamp alone is not unique: the tests below may run
    /// in parallel and the clock read falls back to 0 on error.
    fn unique_temp_file() -> PathBuf {
        static NEXT_SEQUENCE: AtomicUsize = AtomicUsize::new(0);

        let sequence = NEXT_SEQUENCE.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |duration| duration.as_nanos());
        std::env::temp_dir().join(format!(
            "garwarp-request-store-{pid}-{sequence}-{nanos}.state"
        ))
    }

    /// State, parent window and owner must survive a persist/load cycle.
    #[test]
    fn persist_and_load_roundtrip() {
        let path = unique_temp_file();
        let mut registry = RequestRegistry::new(Duration::from_secs(5));
        registry
            .begin_at(
                "req-1",
                RequestOwner::new(":1.2", Some("org.test.App".to_string())),
                Some(ParentWindowContext::X11 { window_id: 42 }),
                Instant::now(),
            )
            .expect("request should be created");
        registry
            .transition(
                "req-1",
                &RequestOwner::new(":1.2", Some("org.test.App".to_string())),
                RequestState::AwaitingUser,
            )
            .expect("request should transition");

        persist_registry(&path, &registry).expect("registry should persist");

        let loaded = load_registry(&path, Duration::from_secs(5)).expect("registry should load");
        assert_eq!(loaded.state("req-1"), Some(RequestState::AwaitingUser));
        assert_eq!(
            loaded.parent_window("req-1"),
            Some(Some(ParentWindowContext::X11 { window_id: 42 }))
        );
        assert_eq!(
            loaded.owner("req-1"),
            Some(RequestOwner::new(":1.2", Some("org.test.App".to_string())))
        );

        let _ = fs::remove_file(path);
    }

    /// A line carrying an unknown state name must make the whole load fail.
    #[test]
    fn invalid_lines_fail_to_load() {
        let path = unique_temp_file();
        fs::write(&path, "id=req-1\tsender=:1.2\tstate=bogus\n")
            .expect("test file should be written");

        let loaded = load_registry(&path, Duration::from_secs(5));
        assert!(loaded.is_err());

        let _ = fs::remove_file(path);
    }
}
garwarp/src/runtime.rsmodified
@@ -10,6 +10,7 @@ pub struct RuntimePaths {
1010
     pub root: PathBuf,
1111
     pub control_socket: PathBuf,
1212
     pub lock_file: PathBuf,
13
+    pub request_store: PathBuf,
1314
 }
1415
 
1516
 impl RuntimePaths {
@@ -26,10 +27,12 @@ impl RuntimePaths {
2627
         let root = base.join(DEFAULT_RUNTIME_SUBDIR);
2728
         let control_socket = root.join(DEFAULT_CONTROL_SOCKET);
2829
         let lock_file = root.join("garwarp.lock");
30
+        let request_store = root.join("requests.state");
2931
         Self {
3032
             root,
3133
             control_socket,
3234
             lock_file,
35
+            request_store,
3336
         }
3437
     }
3538
 
@@ -56,5 +59,9 @@ mod tests {
5659
             paths.lock_file,
5760
             PathBuf::from("/tmp/runtime/garwarp/garwarp.lock")
5861
         );
62
+        assert_eq!(
63
+            paths.request_store,
64
+            PathBuf::from("/tmp/runtime/garwarp/requests.state")
65
+        );
5966
     }
6067
 }
garwarp/src/window.rsmodified
@@ -7,6 +7,15 @@ pub enum ParentWindowContext {
77
     X11 { window_id: u64 },
88
 }
99
 
10
+impl ParentWindowContext {
11
+    #[must_use]
12
+    pub fn as_str(self) -> String {
13
+        match self {
14
+            Self::X11 { window_id } => format!("x11:0x{window_id:x}"),
15
+        }
16
+    }
17
+}
18
+
1019
 #[derive(Debug, Clone, PartialEq, Eq)]
1120
 pub enum ParentWindowError {
1221
     Empty,