@@ -1,8 +1,9 @@ |
| 1 | 1 | use std::fs; |
| 2 | 2 | use std::io::{self, BufRead, BufReader, Read, Write}; |
| 3 | 3 | use std::os::unix::net::{UnixListener, UnixStream}; |
| 4 | +use std::path::Path; |
| 4 | 5 | use std::thread; |
| 5 | | -use std::time::{Duration, Instant}; |
| 6 | +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; |
| 6 | 7 | |
| 7 | 8 | use garwarp_ipc::{ |
| 8 | 9 | ControlRequest, ControlResponse, HealthStatus, RequestTransitionTarget, StatusResponse, |
@@ -35,8 +36,8 @@ pub fn run() -> io::Result<()> { |
| 35 | 36 | |
| 36 | 37 | logging::info("daemon_starting"); |
| 37 | 38 | |
| 38 | | - let (requests, recovered_ids) = |
| 39 | | - load_registry_with_recovery(&paths.request_store, config.request_timeout)?; |
| 39 | + let (requests, recovered_ids, startup_degraded) = |
| 40 | + load_registry_with_fallback(&paths.request_store, config.request_timeout); |
| 40 | 41 | if !recovered_ids.is_empty() { |
| 41 | 42 | logging::warn(&format!( |
| 42 | 43 | "request_recovery_expired count={}", |
@@ -45,7 +46,11 @@ pub fn run() -> io::Result<()> { |
| 45 | 46 | } |
| 46 | 47 | |
| 47 | 48 | let mut state = DaemonState { |
| 48 | | - health: HealthStatus::Healthy, |
| 49 | + health: if startup_degraded { |
| 50 | + HealthStatus::Degraded |
| 51 | + } else { |
| 52 | + HealthStatus::Healthy |
| 53 | + }, |
| 49 | 54 | requests, |
| 50 | 55 | running: true, |
| 51 | 56 | }; |
@@ -337,6 +342,48 @@ fn load_registry_with_recovery( |
| 337 | 342 | Ok((registry, expired)) |
| 338 | 343 | } |
| 339 | 344 | |
| 345 | +fn load_registry_with_fallback( |
| 346 | + request_store_path: &Path, |
| 347 | + timeout: Duration, |
| 348 | +) -> (RequestRegistry, Vec<String>, bool) { |
| 349 | + match load_registry_with_recovery(request_store_path, timeout) { |
| 350 | + Ok((registry, recovered_ids)) => (registry, recovered_ids, false), |
| 351 | + Err(error) => { |
| 352 | + logging::warn(&format!("request_store_load_failed error={error}")); |
| 353 | + match quarantine_request_store(request_store_path) { |
| 354 | + Ok(Some(path)) => logging::warn(&format!( |
| 355 | + "request_store_quarantined path={}", |
| 356 | + path.display() |
| 357 | + )), |
| 358 | + Ok(None) => {} |
| 359 | + Err(error) => { |
| 360 | + logging::warn(&format!("request_store_quarantine_failed error={error}")) |
| 361 | + } |
| 362 | + } |
| 363 | + (RequestRegistry::new(timeout), Vec::new(), true) |
| 364 | + } |
| 365 | + } |
| 366 | +} |
| 367 | + |
/// Moves the request store at `path` aside so a fresh store can later be written in its place.
///
/// The file is renamed inside its own directory to `<file name>.corrupt-<nanos>`, where
/// `<nanos>` is the number of nanoseconds since the Unix epoch, or `0` when the system clock
/// reports a time before the epoch. If `path` has no file name component, `requests.state`
/// is used as the base name.
///
/// Returns `Ok(Some(new_path))` when the store was moved and `Ok(None)` when there was
/// nothing at `path` to move.
///
/// # Errors
///
/// Returns every [`fs::rename`] failure other than [`io::ErrorKind::NotFound`], for example
/// a permission error on the containing directory.
fn quarantine_request_store(path: &Path) -> io::Result<Option<std::path::PathBuf>> {
    let parent = path.parent().unwrap_or_else(|| Path::new("."));
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());

    // Build the new name as an `OsString` so a store whose name is not valid UTF-8 keeps
    // its own name instead of silently falling back to the default.
    let mut quarantined_name = path.file_name().map_or_else(
        || std::ffi::OsString::from("requests.state"),
        std::ffi::OsStr::to_os_string,
    );
    quarantined_name.push(format!(".corrupt-{nanos}"));
    let quarantined = parent.join(quarantined_name);

    // Rename directly instead of probing with `Path::exists` first. The probe is racy and
    // reports `false` for any metadata error, which would hide real failures as "nothing
    // to quarantine". Only a genuinely missing store maps to `Ok(None)`.
    match fs::rename(path, &quarantined) {
        Ok(()) => Ok(Some(quarantined)),
        Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(error) => Err(error),
    }
}
| 386 | + |
| 340 | 387 | fn persist_registry_state(path: &std::path::Path, registry: &RequestRegistry) { |
| 341 | 388 | if let Err(error) = request_store::persist_registry(path, registry) { |
| 342 | 389 | logging::warn(&format!("request_store_write_failed error={error}")); |
@@ -346,7 +393,8 @@ fn persist_registry_state(path: &std::path::Path, registry: &RequestRegistry) { |
| 346 | 393 | #[cfg(test)] |
| 347 | 394 | mod tests { |
| 348 | 395 | use super::{ |
| 349 | | - DaemonState, MAX_CONTROL_LINE_BYTES, handle_connection, load_registry_with_recovery, |
| 396 | + DaemonState, MAX_CONTROL_LINE_BYTES, handle_connection, load_registry_with_fallback, |
| 397 | + load_registry_with_recovery, |
| 350 | 398 | }; |
| 351 | 399 | use garwarp_ipc::{ControlResponse, HealthStatus}; |
| 352 | 400 | use std::fs; |
@@ -1100,4 +1148,39 @@ mod tests { |
| 1100 | 1148 | |
| 1101 | 1149 | let _ = fs::remove_file(path); |
| 1102 | 1150 | } |
| 1151 | + |
/// A store that fails to load must yield an empty, degraded registry and leave a
/// `<name>.corrupt-*` sibling behind in place of the original file.
#[test]
fn invalid_store_load_uses_empty_registry_and_quarantines_file() {
    // Seed the store with a record the loader is expected to reject.
    let path = unique_temp_file();
    fs::write(&path, "id=req-1\tsender=:1.2\tstate=bogus\n")
        .expect("invalid store should be written");

    // Capture the directory and name up front; the file itself is gone after the load.
    let parent = path
        .parent()
        .expect("temp file should have parent")
        .to_path_buf();
    let file_name = path
        .file_name()
        .expect("temp file should have name")
        .to_string_lossy()
        .to_string();
    let quarantine_prefix = format!("{file_name}.corrupt-");

    let (registry, recovered_ids, degraded) =
        load_registry_with_fallback(&path, Duration::from_secs(5));
    assert!(degraded);
    assert!(recovered_ids.is_empty());
    assert_eq!(registry.total_count(), 0);
    assert!(!path.exists());

    // Locate the quarantined copy by its prefix and clean it up.
    let quarantined = fs::read_dir(&parent)
        .expect("parent dir should be readable")
        .filter_map(Result::ok)
        .find_map(|entry| {
            let name = entry.file_name().to_string_lossy().to_string();
            name.starts_with(&quarantine_prefix).then(|| entry.path())
        })
        .expect("quarantined store should exist");
    fs::remove_file(quarantined).expect("quarantined store should be removed");
}
| 1103 | 1186 | } |