Rust · 42403 bytes Raw Blame History
1 //! Real-Time Chunk Health Monitoring
2 //!
3 //! Comprehensive monitoring system that tracks chunk health, replica status,
4 //! and data integrity across the distributed network
5
6 use anyhow::Result;
7 use serde::{Deserialize, Serialize};
8 use std::collections::{HashMap, VecDeque, BTreeMap};
9 use chrono::{DateTime, Utc, Duration};
10 use tokio::time::{sleep, Duration as TokioDuration};
11
12 use crate::economics::earnings_calculator::GeographicRegion;
13
14 /// Real-time chunk health monitoring system
15 #[derive(Debug, Clone, Serialize, Deserialize)]
16 pub struct ChunkHealthMonitor {
17 /// Health status for all monitored chunks
18 pub chunk_health: HashMap<String, ChunkHealth>,
19 /// Node health tracking
20 pub node_health: HashMap<String, NodeHealth>,
21 /// Real-time metrics
22 pub monitoring_metrics: MonitoringMetrics,
23 /// Alert configuration
24 pub alert_config: AlertConfiguration,
25 /// Health check scheduler
26 pub check_scheduler: HealthCheckScheduler,
27 /// Historical health data
28 pub health_history: HashMap<String, VecDeque<HealthSnapshot>>,
29 /// Performance analytics
30 pub analytics: HealthAnalytics,
31 }
32
33 #[derive(Debug, Clone, Serialize, Deserialize)]
34 pub struct ChunkHealth {
35 pub chunk_id: String,
36 pub overall_health: HealthStatus,
37 pub replica_health: Vec<ReplicaHealth>,
38 pub integrity_status: IntegrityStatus,
39 pub availability_score: f64,
40 pub durability_score: f64,
41 pub performance_metrics: ChunkPerformanceMetrics,
42 pub last_verified: DateTime<Utc>,
43 pub next_check_due: DateTime<Utc>,
44 pub risk_factors: Vec<RiskFactor>,
45 pub repair_history: Vec<RepairRecord>,
46 }
47
48 #[derive(Debug, Clone, Serialize, Deserialize)]
49 pub struct ReplicaHealth {
50 pub replica_id: String,
51 pub node_id: String,
52 pub region: GeographicRegion,
53 pub status: ReplicaStatus,
54 pub health_score: f64,
55 pub last_accessed: DateTime<Utc>,
56 pub last_verified: DateTime<Utc>,
57 pub integrity_hash: String,
58 pub performance_metrics: ReplicaPerformanceMetrics,
59 pub connectivity_status: ConnectivityStatus,
60 }
61
62 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
63 pub enum HealthStatus {
64 Excellent, // All replicas healthy, high durability
65 Good, // Most replicas healthy, adequate durability
66 Warning, // Some replicas degraded, durability at risk
67 Critical, // Many replicas unhealthy, immediate action needed
68 Failed, // Cannot guarantee data availability
69 }
70
71 #[derive(Debug, Clone, Serialize, Deserialize)]
72 pub enum ReplicaStatus {
73 Healthy,
74 Degraded,
75 Slow,
76 Unreachable,
77 Corrupted,
78 Missing,
79 Verifying,
80 Repairing,
81 }
82
83 #[derive(Debug, Clone, Serialize, Deserialize)]
84 pub enum IntegrityStatus {
85 Verified,
86 Pending,
87 Suspicious,
88 Corrupted,
89 Unknown,
90 }
91
92 #[derive(Debug, Clone, Serialize, Deserialize)]
93 pub enum ConnectivityStatus {
94 Online,
95 Intermittent,
96 Offline,
97 Unknown,
98 }
99
100 #[derive(Debug, Clone, Serialize, Deserialize)]
101 pub struct ChunkPerformanceMetrics {
102 pub avg_response_time_ms: f64,
103 pub success_rate: f64,
104 pub throughput_mbps: f64,
105 pub error_rate: f64,
106 pub access_frequency: AccessFrequency,
107 pub bandwidth_utilization: f64,
108 }
109
110 #[derive(Debug, Clone, Serialize, Deserialize)]
111 pub struct ReplicaPerformanceMetrics {
112 pub response_time_ms: f64,
113 pub transfer_speed_mbps: f64,
114 pub success_rate: f64,
115 pub error_count: u32,
116 pub last_error: Option<String>,
117 pub uptime_percentage: f64,
118 }
119
120 #[derive(Debug, Clone, Serialize, Deserialize)]
121 pub enum AccessFrequency {
122 VeryHigh, // > 1000 accesses/day
123 High, // 100-1000 accesses/day
124 Medium, // 10-100 accesses/day
125 Low, // 1-10 accesses/day
126 VeryLow, // < 1 access/day
127 Archived, // Not accessed recently
128 }
129
130 #[derive(Debug, Clone, Serialize, Deserialize)]
131 pub struct RiskFactor {
132 pub risk_type: RiskType,
133 pub severity: RiskSeverity,
134 pub probability: f64, // 0.0 to 1.0
135 pub impact: f64, // 0.0 to 1.0
136 pub description: String,
137 pub mitigation_actions: Vec<String>,
138 }
139
140 #[derive(Debug, Clone, Serialize, Deserialize)]
141 pub enum RiskType {
142 NodeFailure,
143 NetworkPartition,
144 GeographicRisk,
145 PerformanceDegradation,
146 CapacityLimits,
147 ComplianceViolation,
148 SecurityThreat,
149 }
150
151 #[derive(Debug, Clone, Serialize, Deserialize)]
152 pub enum RiskSeverity {
153 Low,
154 Medium,
155 High,
156 Critical,
157 }
158
159 #[derive(Debug, Clone, Serialize, Deserialize)]
160 pub struct RepairRecord {
161 pub repair_id: String,
162 pub timestamp: DateTime<Utc>,
163 pub repair_type: RepairType,
164 pub affected_replicas: Vec<String>,
165 pub repair_strategy: String,
166 pub success: bool,
167 pub duration_seconds: u64,
168 pub cost: f64,
169 }
170
171 #[derive(Debug, Clone, Serialize, Deserialize)]
172 pub enum RepairType {
173 ReplicationIncrease,
174 ReplicaReplacement,
175 IntegrityRepair,
176 PerformanceOptimization,
177 GeographicRebalancing,
178 EmergencyRecovery,
179 }
180
181 #[derive(Debug, Clone, Serialize, Deserialize)]
182 pub struct NodeHealth {
183 pub node_id: String,
184 pub region: GeographicRegion,
185 pub overall_health: HealthStatus,
186 pub uptime_percentage: f64,
187 pub response_time_ms: f64,
188 pub bandwidth_mbps: f64,
189 pub storage_health: StorageHealth,
190 pub connectivity_quality: ConnectivityQuality,
191 pub load_metrics: LoadMetrics,
192 pub last_seen: DateTime<Utc>,
193 pub consecutive_failures: u32,
194 }
195
196 #[derive(Debug, Clone, Serialize, Deserialize)]
197 pub struct StorageHealth {
198 pub total_capacity_gb: u64,
199 pub used_capacity_gb: u64,
200 pub available_capacity_gb: u64,
201 pub disk_health_score: f64,
202 pub io_performance: f64,
203 pub error_rate: f64,
204 }
205
206 #[derive(Debug, Clone, Serialize, Deserialize)]
207 pub struct ConnectivityQuality {
208 pub latency_ms: f64,
209 pub jitter_ms: f64,
210 pub packet_loss: f64,
211 pub bandwidth_stability: f64,
212 pub connection_type: String,
213 }
214
215 #[derive(Debug, Clone, Serialize, Deserialize)]
216 pub struct LoadMetrics {
217 pub cpu_usage: f64,
218 pub memory_usage: f64,
219 pub network_utilization: f64,
220 pub disk_utilization: f64,
221 pub active_connections: u32,
222 }
223
224 #[derive(Debug, Clone, Serialize, Deserialize)]
225 pub struct MonitoringMetrics {
226 pub total_chunks_monitored: u64,
227 pub healthy_chunks: u64,
228 pub degraded_chunks: u64,
229 pub critical_chunks: u64,
230 pub failed_chunks: u64,
231 pub total_replicas: u64,
232 pub healthy_replicas: u64,
233 pub degraded_replicas: u64,
234 pub average_health_score: f64,
235 pub monitoring_efficiency: f64,
236 pub last_updated: DateTime<Utc>,
237 }
238
239 #[derive(Debug, Clone, Serialize, Deserialize)]
240 pub struct AlertConfiguration {
241 pub enable_alerts: bool,
242 pub health_thresholds: HealthThresholds,
243 pub notification_channels: Vec<NotificationChannel>,
244 pub alert_cooldown_minutes: u32,
245 pub escalation_rules: Vec<EscalationRule>,
246 }
247
248 #[derive(Debug, Clone, Serialize, Deserialize)]
249 pub struct HealthThresholds {
250 pub critical_health_score: f64,
251 pub warning_health_score: f64,
252 pub max_response_time_ms: f64,
253 pub min_success_rate: f64,
254 pub max_error_rate: f64,
255 pub min_replica_count: u32,
256 pub max_consecutive_failures: u32,
257 }
258
259 #[derive(Debug, Clone, Serialize, Deserialize)]
260 pub struct NotificationChannel {
261 pub channel_type: NotificationType,
262 pub endpoint: String,
263 pub severity_filter: Vec<RiskSeverity>,
264 pub enabled: bool,
265 }
266
267 #[derive(Debug, Clone, Serialize, Deserialize)]
268 pub enum NotificationType {
269 Email,
270 Slack,
271 Webhook,
272 SMS,
273 PagerDuty,
274 }
275
276 #[derive(Debug, Clone, Serialize, Deserialize)]
277 pub struct EscalationRule {
278 pub condition: String,
279 pub delay_minutes: u32,
280 pub action: EscalationAction,
281 pub repeat_count: u32,
282 }
283
284 #[derive(Debug, Clone, Serialize, Deserialize)]
285 pub enum EscalationAction {
286 NotifyManager,
287 AutoRepair,
288 IncreaseReplication,
289 EmergencyProtocol,
290 }
291
292 #[derive(Debug, Clone, Serialize, Deserialize)]
293 pub struct HealthCheckScheduler {
294 pub check_intervals: HashMap<HealthStatus, Duration>,
295 pub priority_queue: BTreeMap<DateTime<Utc>, Vec<String>>, // chunk_ids
296 pub concurrent_checks: u32,
297 pub batch_size: u32,
298 pub adaptive_scheduling: bool,
299 }
300
301 #[derive(Debug, Clone, Serialize, Deserialize)]
302 pub struct HealthSnapshot {
303 pub timestamp: DateTime<Utc>,
304 pub health_status: HealthStatus,
305 pub health_score: f64,
306 pub replica_count: u32,
307 pub healthy_replicas: u32,
308 pub performance_metrics: ChunkPerformanceMetrics,
309 }
310
311 #[derive(Debug, Clone, Serialize, Deserialize)]
312 pub struct HealthAnalytics {
313 pub health_trends: HashMap<String, HealthTrend>,
314 pub failure_patterns: Vec<FailurePattern>,
315 pub performance_baselines: HashMap<String, PerformanceBaseline>,
316 pub prediction_models: HashMap<String, PredictionModel>,
317 }
318
319 #[derive(Debug, Clone, Serialize, Deserialize)]
320 pub struct HealthTrend {
321 pub chunk_id: String,
322 pub trend_direction: TrendDirection,
323 pub trend_strength: f64,
324 pub prediction_confidence: f64,
325 pub time_to_critical: Option<Duration>,
326 }
327
328 #[derive(Debug, Clone, Serialize, Deserialize)]
329 pub enum TrendDirection {
330 Improving,
331 Stable,
332 Degrading,
333 Volatile,
334 }
335
336 #[derive(Debug, Clone, Serialize, Deserialize)]
337 pub struct FailurePattern {
338 pub pattern_id: String,
339 pub pattern_type: String,
340 pub frequency: f64,
341 pub affected_chunks: Vec<String>,
342 pub common_factors: Vec<String>,
343 pub prevention_strategies: Vec<String>,
344 }
345
346 #[derive(Debug, Clone, Serialize, Deserialize)]
347 pub struct PerformanceBaseline {
348 pub metric_name: String,
349 pub baseline_value: f64,
350 pub acceptable_variance: f64,
351 pub seasonal_adjustments: HashMap<u8, f64>, // Month -> adjustment factor
352 }
353
354 #[derive(Debug, Clone, Serialize, Deserialize)]
355 pub struct PredictionModel {
356 pub model_type: String,
357 pub accuracy: f64,
358 pub last_trained: DateTime<Utc>,
359 pub parameters: HashMap<String, f64>,
360 }
361
362 impl Default for HealthThresholds {
363 fn default() -> Self {
364 Self {
365 critical_health_score: 50.0,
366 warning_health_score: 75.0,
367 max_response_time_ms: 1000.0,
368 min_success_rate: 95.0,
369 max_error_rate: 5.0,
370 min_replica_count: 3,
371 max_consecutive_failures: 3,
372 }
373 }
374 }
375
376 impl ChunkHealthMonitor {
377 /// Create new chunk health monitor
378 pub fn new() -> Self {
379 Self {
380 chunk_health: HashMap::new(),
381 node_health: HashMap::new(),
382 monitoring_metrics: MonitoringMetrics {
383 total_chunks_monitored: 0,
384 healthy_chunks: 0,
385 degraded_chunks: 0,
386 critical_chunks: 0,
387 failed_chunks: 0,
388 total_replicas: 0,
389 healthy_replicas: 0,
390 degraded_replicas: 0,
391 average_health_score: 100.0,
392 monitoring_efficiency: 100.0,
393 last_updated: Utc::now(),
394 },
395 alert_config: AlertConfiguration {
396 enable_alerts: true,
397 health_thresholds: HealthThresholds::default(),
398 notification_channels: Vec::new(),
399 alert_cooldown_minutes: 15,
400 escalation_rules: Vec::new(),
401 },
402 check_scheduler: HealthCheckScheduler {
403 check_intervals: HashMap::from([
404 (HealthStatus::Excellent, Duration::hours(24)),
405 (HealthStatus::Good, Duration::hours(6)),
406 (HealthStatus::Warning, Duration::hours(1)),
407 (HealthStatus::Critical, Duration::minutes(15)),
408 (HealthStatus::Failed, Duration::minutes(5)),
409 ]),
410 priority_queue: BTreeMap::new(),
411 concurrent_checks: 10,
412 batch_size: 100,
413 adaptive_scheduling: true,
414 },
415 health_history: HashMap::new(),
416 analytics: HealthAnalytics {
417 health_trends: HashMap::new(),
418 failure_patterns: Vec::new(),
419 performance_baselines: HashMap::new(),
420 prediction_models: HashMap::new(),
421 },
422 }
423 }
424
425 /// Add chunk for monitoring
426 pub fn add_chunk_monitoring(&mut self, chunk_id: String, initial_replicas: Vec<ReplicaHealth>) {
427 let health_score = self.calculate_chunk_health_score(&initial_replicas);
428 let health_status = self.determine_health_status(health_score, &initial_replicas);
429
430 let chunk_health = ChunkHealth {
431 chunk_id: chunk_id.clone(),
432 overall_health: health_status.clone(),
433 replica_health: initial_replicas,
434 integrity_status: IntegrityStatus::Pending,
435 availability_score: health_score,
436 durability_score: health_score,
437 performance_metrics: ChunkPerformanceMetrics {
438 avg_response_time_ms: 0.0,
439 success_rate: 100.0,
440 throughput_mbps: 0.0,
441 error_rate: 0.0,
442 access_frequency: AccessFrequency::Low,
443 bandwidth_utilization: 0.0,
444 },
445 last_verified: Utc::now(),
446 next_check_due: Utc::now() + self.get_check_interval(&health_status),
447 risk_factors: Vec::new(),
448 repair_history: Vec::new(),
449 };
450
451 self.chunk_health.insert(chunk_id.clone(), chunk_health);
452
453 // Schedule health check
454 self.schedule_health_check(chunk_id.clone(), Utc::now() + self.get_check_interval(&health_status));
455
456 // Initialize health history
457 self.health_history.insert(chunk_id, VecDeque::with_capacity(1000));
458
459 self.update_monitoring_metrics();
460 }
461
462 /// Perform health check on a chunk
463 pub async fn perform_health_check(&mut self, chunk_id: &str) -> Result<HealthCheckResult> {
464 // First, collect replica information without holding a mutable reference
465 let mut replicas_to_check = {
466 let chunk_health = self.chunk_health.get(chunk_id)
467 .ok_or_else(|| anyhow::anyhow!("Chunk not found in monitoring"))?;
468 chunk_health.replica_health.clone()
469 };
470
471 let mut check_results = Vec::new();
472 let mut healthy_replicas = 0;
473 let mut total_response_time = 0.0;
474
475 // Check each replica
476 for replica in replicas_to_check.iter_mut() {
477 let replica_result = self.check_replica_health(replica).await?;
478
479 if matches!(replica_result.status, ReplicaStatus::Healthy) {
480 healthy_replicas += 1;
481 }
482
483 total_response_time += replica_result.response_time_ms;
484 check_results.push(replica_result);
485 }
486
487 // Compute new values outside of mutable borrow scope
488 let new_health_score = self.calculate_chunk_health_score(&replicas_to_check);
489 let new_health_status = self.determine_health_status(new_health_score, &replicas_to_check);
490 let avg_response_time = total_response_time / check_results.len() as f64;
491 let success_rate = (healthy_replicas as f64 / check_results.len() as f64) * 100.0;
492 let now = Utc::now();
493 let next_check_due = now + self.get_check_interval(&new_health_status);
494
495 // Now update chunk health with computed values
496 let risk_factors = {
497 let mut chunk_health = self.chunk_health.get_mut(chunk_id)
498 .ok_or_else(|| anyhow::anyhow!("Chunk not found in monitoring"))?;
499
500 chunk_health.overall_health = new_health_status.clone();
501 chunk_health.availability_score = new_health_score;
502 chunk_health.performance_metrics.avg_response_time_ms = avg_response_time;
503 chunk_health.performance_metrics.success_rate = success_rate;
504 chunk_health.last_verified = now;
505 chunk_health.next_check_due = next_check_due;
506
507 // Assess risk factors and clone them before the borrow ends
508 let risk_factors = self.assess_risk_factors(&*chunk_health);
509 chunk_health.risk_factors = risk_factors.clone();
510
511 self.record_health_snapshot(chunk_id, &*chunk_health);
512
513 risk_factors
514 };
515
516 // Schedule next check
517 self.schedule_health_check(chunk_id.to_string(), next_check_due);
518
519 // Check for alerts
520 if self.alert_config.enable_alerts {
521 let chunk_health = self.chunk_health.get(chunk_id).unwrap();
522 self.check_alert_conditions(chunk_id, chunk_health).await?;
523 }
524
525 self.update_monitoring_metrics();
526
527 Ok(HealthCheckResult {
528 chunk_id: chunk_id.to_string(),
529 health_status: new_health_status,
530 health_score: new_health_score,
531 replica_results: check_results,
532 issues_detected: risk_factors,
533 recommendations: self.generate_recommendations(&self.chunk_health.get(chunk_id).unwrap()),
534 })
535 }
536
537 /// Check individual replica health
538 async fn check_replica_health(&mut self, replica: &mut ReplicaHealth) -> Result<ReplicaCheckResult> {
539 let start_time = crate::SerializableInstant::now();
540
541 // Simulate health check (in real implementation, this would be actual network calls)
542 let connectivity_check = self.check_replica_connectivity(&replica.node_id).await?;
543 let integrity_check = self.verify_replica_integrity(replica).await?;
544 let performance_check = self.measure_replica_performance(replica).await?;
545
546 let check_duration = start_time.elapsed();
547
548 // Update replica status based on checks
549 replica.status = if connectivity_check && integrity_check && performance_check.response_time_ms < 1000.0 {
550 ReplicaStatus::Healthy
551 } else if connectivity_check && integrity_check {
552 ReplicaStatus::Slow
553 } else if connectivity_check {
554 ReplicaStatus::Degraded
555 } else {
556 ReplicaStatus::Unreachable
557 };
558
559 replica.last_verified = Utc::now();
560 replica.performance_metrics = performance_check.clone();
561
562 Ok(ReplicaCheckResult {
563 replica_id: replica.replica_id.clone(),
564 node_id: replica.node_id.clone(),
565 status: replica.status.clone(),
566 response_time_ms: performance_check.response_time_ms,
567 connectivity_ok: connectivity_check,
568 integrity_ok: integrity_check,
569 performance_metrics: performance_check,
570 })
571 }
572
573 /// Check replica connectivity
574 async fn check_replica_connectivity(&self, node_id: &str) -> Result<bool> {
575 // Simulate connectivity check
576 tokio::time::sleep(TokioDuration::from_millis(10)).await;
577
578 // Check if node is in our health records and recently seen
579 if let Some(node_health) = self.node_health.get(node_id) {
580 let time_since_last_seen = Utc::now() - node_health.last_seen;
581 Ok(time_since_last_seen < Duration::minutes(5))
582 } else {
583 Ok(false)
584 }
585 }
586
587 /// Verify replica integrity
588 async fn verify_replica_integrity(&self, replica: &ReplicaHealth) -> Result<bool> {
589 // Simulate integrity verification
590 tokio::time::sleep(TokioDuration::from_millis(50)).await;
591
592 // In real implementation, this would verify checksums, etc.
593 Ok(!replica.integrity_hash.is_empty())
594 }
595
596 /// Measure replica performance
597 async fn measure_replica_performance(&self, replica: &ReplicaHealth) -> Result<ReplicaPerformanceMetrics> {
598 // Simulate performance measurement
599 let base_latency = 100.0;
600 let jitter = (rand::random::<f64>() - 0.5) * 50.0;
601 let response_time = base_latency + jitter;
602
603 tokio::time::sleep(TokioDuration::from_millis(response_time as u64)).await;
604
605 Ok(ReplicaPerformanceMetrics {
606 response_time_ms: response_time.max(0.0),
607 transfer_speed_mbps: 50.0 + (rand::random::<f64>() * 50.0),
608 success_rate: 95.0 + (rand::random::<f64>() * 5.0),
609 error_count: 0,
610 last_error: None,
611 uptime_percentage: 99.0 + (rand::random::<f64>() * 1.0),
612 })
613 }
614
615 /// Calculate overall chunk health score
616 fn calculate_chunk_health_score(&self, replicas: &[ReplicaHealth]) -> f64 {
617 if replicas.is_empty() {
618 return 0.0;
619 }
620
621 let healthy_count = replicas.iter()
622 .filter(|r| matches!(r.status, ReplicaStatus::Healthy))
623 .count();
624
625 let degraded_count = replicas.iter()
626 .filter(|r| matches!(r.status, ReplicaStatus::Degraded | ReplicaStatus::Slow))
627 .count();
628
629 let unhealthy_count = replicas.len() - healthy_count - degraded_count;
630
631 // Weight factors
632 let healthy_weight = 1.0;
633 let degraded_weight = 0.5;
634 let unhealthy_weight = 0.0;
635
636 let weighted_score = (healthy_count as f64 * healthy_weight
637 + degraded_count as f64 * degraded_weight
638 + unhealthy_count as f64 * unhealthy_weight) / replicas.len() as f64;
639
640 weighted_score * 100.0
641 }
642
643 /// Determine health status from score and replica states
644 fn determine_health_status(&self, health_score: f64, replicas: &[ReplicaHealth]) -> HealthStatus {
645 let healthy_count = replicas.iter()
646 .filter(|r| matches!(r.status, ReplicaStatus::Healthy))
647 .count();
648
649 let total_count = replicas.len();
650
651 if health_score >= 90.0 && healthy_count >= (total_count * 3) / 4 {
652 HealthStatus::Excellent
653 } else if health_score >= 75.0 && healthy_count >= total_count / 2 {
654 HealthStatus::Good
655 } else if health_score >= 50.0 && healthy_count >= total_count / 3 {
656 HealthStatus::Warning
657 } else if healthy_count > 0 {
658 HealthStatus::Critical
659 } else {
660 HealthStatus::Failed
661 }
662 }
663
664 /// Get check interval for health status
665 fn get_check_interval(&self, status: &HealthStatus) -> Duration {
666 self.check_scheduler.check_intervals
667 .get(status)
668 .copied()
669 .unwrap_or(Duration::hours(6))
670 }
671
672 /// Schedule health check
673 fn schedule_health_check(&mut self, chunk_id: String, check_time: DateTime<Utc>) {
674 self.check_scheduler.priority_queue
675 .entry(check_time)
676 .or_insert_with(Vec::new)
677 .push(chunk_id);
678 }
679
680 /// Assess risk factors for a chunk
681 fn assess_risk_factors(&self, chunk_health: &ChunkHealth) -> Vec<RiskFactor> {
682 let mut risk_factors = Vec::new();
683
684 // Check replica count
685 let healthy_replicas = chunk_health.replica_health.iter()
686 .filter(|r| matches!(r.status, ReplicaStatus::Healthy))
687 .count();
688
689 if healthy_replicas < 3 {
690 risk_factors.push(RiskFactor {
691 risk_type: RiskType::NodeFailure,
692 severity: if healthy_replicas < 2 { RiskSeverity::Critical } else { RiskSeverity::High },
693 probability: 0.8,
694 impact: 0.9,
695 description: format!("Only {} healthy replicas remaining", healthy_replicas),
696 mitigation_actions: vec!["Increase replication".to_string(), "Replace unhealthy replicas".to_string()],
697 });
698 }
699
700 // Check geographic distribution
701 let regions: std::collections::HashSet<_> = chunk_health.replica_health.iter()
702 .map(|r| &r.region)
703 .collect();
704
705 if regions.len() < 2 {
706 risk_factors.push(RiskFactor {
707 risk_type: RiskType::GeographicRisk,
708 severity: RiskSeverity::Medium,
709 probability: 0.3,
710 impact: 0.7,
711 description: "Poor geographic distribution".to_string(),
712 mitigation_actions: vec!["Add replicas in different regions".to_string()],
713 });
714 }
715
716 // Check performance degradation
717 if chunk_health.performance_metrics.avg_response_time_ms > 1000.0 {
718 risk_factors.push(RiskFactor {
719 risk_type: RiskType::PerformanceDegradation,
720 severity: RiskSeverity::Medium,
721 probability: 0.6,
722 impact: 0.4,
723 description: "High response times detected".to_string(),
724 mitigation_actions: vec!["Optimize replica placement".to_string(), "Check network conditions".to_string()],
725 });
726 }
727
728 risk_factors
729 }
730
731 /// Generate recommendations for chunk health improvement
732 fn generate_recommendations(&self, chunk_health: &ChunkHealth) -> Vec<String> {
733 let mut recommendations = Vec::new();
734
735 match chunk_health.overall_health {
736 HealthStatus::Failed => {
737 recommendations.push("URGENT: Immediate recovery required - chunk data may be lost".to_string());
738 recommendations.push("Attempt recovery from any available replicas".to_string());
739 recommendations.push("Check backup systems".to_string());
740 },
741 HealthStatus::Critical => {
742 recommendations.push("Create additional replicas immediately".to_string());
743 recommendations.push("Repair or replace unhealthy replicas".to_string());
744 recommendations.push("Monitor closely for further degradation".to_string());
745 },
746 HealthStatus::Warning => {
747 recommendations.push("Consider increasing replication factor".to_string());
748 recommendations.push("Investigate cause of replica degradation".to_string());
749 recommendations.push("Improve geographic distribution".to_string());
750 },
751 HealthStatus::Good => {
752 recommendations.push("Monitor performance trends".to_string());
753 recommendations.push("Consider optimizing replica placement for better performance".to_string());
754 },
755 HealthStatus::Excellent => {
756 recommendations.push("Maintain current configuration".to_string());
757 recommendations.push("Consider this as a model for other chunks".to_string());
758 },
759 }
760
761 recommendations
762 }
763
764 /// Record health snapshot in history
765 fn record_health_snapshot(&mut self, chunk_id: &str, chunk_health: &ChunkHealth) {
766 let snapshot = HealthSnapshot {
767 timestamp: Utc::now(),
768 health_status: chunk_health.overall_health.clone(),
769 health_score: chunk_health.availability_score,
770 replica_count: chunk_health.replica_health.len() as u32,
771 healthy_replicas: chunk_health.replica_health.iter()
772 .filter(|r| matches!(r.status, ReplicaStatus::Healthy))
773 .count() as u32,
774 performance_metrics: chunk_health.performance_metrics.clone(),
775 };
776
777 if let Some(history) = self.health_history.get_mut(chunk_id) {
778 history.push_back(snapshot);
779
780 // Keep only last 1000 snapshots
781 if history.len() > 1000 {
782 history.pop_front();
783 }
784 }
785 }
786
787 /// Check alert conditions and send notifications
788 async fn check_alert_conditions(&mut self, chunk_id: &str, chunk_health: &ChunkHealth) -> Result<()> {
789 let thresholds = &self.alert_config.health_thresholds;
790
791 let should_alert = match chunk_health.overall_health {
792 HealthStatus::Failed | HealthStatus::Critical => true,
793 HealthStatus::Warning => chunk_health.availability_score < thresholds.warning_health_score,
794 _ => false,
795 };
796
797 if should_alert {
798 let alert = HealthAlert {
799 alert_id: format!("alert_{}_{}", chunk_id, Utc::now().timestamp()),
800 chunk_id: chunk_id.to_string(),
801 severity: match chunk_health.overall_health {
802 HealthStatus::Failed => RiskSeverity::Critical,
803 HealthStatus::Critical => RiskSeverity::High,
804 HealthStatus::Warning => RiskSeverity::Medium,
805 _ => RiskSeverity::Low,
806 },
807 message: format!("Chunk {} health degraded to {:?}", chunk_id, chunk_health.overall_health),
808 timestamp: Utc::now(),
809 health_score: chunk_health.availability_score,
810 recommendations: self.generate_recommendations(chunk_health),
811 };
812
813 self.send_alert(alert).await?;
814 }
815
816 Ok(())
817 }
818
819 /// Send health alert
820 async fn send_alert(&self, alert: HealthAlert) -> Result<()> {
821 // In real implementation, this would send to configured notification channels
822 tracing::warn!("Health Alert: {} - {} (Score: {:.1})",
823 alert.alert_id, alert.message, alert.health_score);
824
825 for recommendation in &alert.recommendations {
826 tracing::info!("Recommendation: {}", recommendation);
827 }
828
829 Ok(())
830 }
831
832 /// Update monitoring metrics
833 fn update_monitoring_metrics(&mut self) {
834 let total_chunks = self.chunk_health.len() as u64;
835 let mut healthy = 0;
836 let mut degraded = 0;
837 let mut critical = 0;
838 let mut failed = 0;
839 let mut total_score = 0.0;
840 let mut total_replicas = 0;
841 let mut healthy_replicas = 0;
842
843 for chunk_health in self.chunk_health.values() {
844 match chunk_health.overall_health {
845 HealthStatus::Excellent | HealthStatus::Good => healthy += 1,
846 HealthStatus::Warning => degraded += 1,
847 HealthStatus::Critical => critical += 1,
848 HealthStatus::Failed => failed += 1,
849 }
850
851 total_score += chunk_health.availability_score;
852 total_replicas += chunk_health.replica_health.len();
853 healthy_replicas += chunk_health.replica_health.iter()
854 .filter(|r| matches!(r.status, ReplicaStatus::Healthy))
855 .count();
856 }
857
858 self.monitoring_metrics = MonitoringMetrics {
859 total_chunks_monitored: total_chunks,
860 healthy_chunks: healthy,
861 degraded_chunks: degraded,
862 critical_chunks: critical,
863 failed_chunks: failed,
864 total_replicas: total_replicas as u64,
865 healthy_replicas: healthy_replicas as u64,
866 degraded_replicas: (total_replicas - healthy_replicas) as u64,
867 average_health_score: if total_chunks > 0 { total_score / total_chunks as f64 } else { 100.0 },
868 monitoring_efficiency: 100.0, // Would be calculated based on check success rate
869 last_updated: Utc::now(),
870 };
871 }
872
873 /// Run continuous health monitoring
874 pub async fn run_continuous_monitoring(&mut self) -> Result<()> {
875 let mut check_interval = tokio::time::interval(TokioDuration::from_secs(60)); // Check every minute
876
877 loop {
878 check_interval.tick().await;
879
880 // Process due health checks
881 let now = Utc::now();
882 let due_chunks: Vec<String> = self.check_scheduler.priority_queue
883 .range(..=now)
884 .flat_map(|(_, chunks)| chunks.iter().cloned())
885 .collect();
886
887 // Remove processed items from queue
888 self.check_scheduler.priority_queue.retain(|time, _| *time > now);
889
890 // Process health checks in batches
891 for chunk_batch in due_chunks.chunks(self.check_scheduler.batch_size as usize) {
892 let mut check_tasks = Vec::new();
893
894 for chunk_id in chunk_batch {
895 if check_tasks.len() >= self.check_scheduler.concurrent_checks as usize {
896 // Wait for some tasks to complete
897 let _ = futures::future::join_all(check_tasks.drain(..)).await;
898 }
899
900 let chunk_id_clone = chunk_id.clone();
901 let task = async move {
902 // Would perform actual health check here
903 tokio::time::sleep(TokioDuration::from_millis(100)).await;
904 Ok(chunk_id_clone)
905 };
906
907 check_tasks.push(task);
908 }
909
910 // Wait for remaining tasks
911 let results = futures::future::join_all(check_tasks).await;
912
913 // Process results
914 for result in results {
915 match result {
916 Ok(chunk_id) => {
917 if let Err(e) = self.perform_health_check(&chunk_id).await {
918 tracing::error!("Health check failed for chunk {}: {}", chunk_id, e);
919 }
920 },
921 Err(e) => {
922 tracing::error!("Health check task failed: {}", e);
923 }
924 }
925 }
926 }
927
928 // Update analytics
929 self.update_health_analytics();
930 }
931 }
932
933 /// Update health analytics and trends
934 fn update_health_analytics(&mut self) {
935 // Analyze health trends for each chunk
936 for (chunk_id, history) in &self.health_history {
937 if history.len() >= 10 {
938 let trend = self.analyze_health_trend(history);
939 self.analytics.health_trends.insert(chunk_id.clone(), trend);
940 }
941 }
942
943 // Detect failure patterns
944 self.detect_failure_patterns();
945
946 // Update performance baselines
947 self.update_performance_baselines();
948 }
949
950 /// Analyze health trend for a chunk
951 fn analyze_health_trend(&self, history: &VecDeque<HealthSnapshot>) -> HealthTrend {
952 let recent_scores: Vec<f64> = history.iter()
953 .rev()
954 .take(10)
955 .map(|snapshot| snapshot.health_score)
956 .collect();
957
958 let trend_direction = if recent_scores.len() >= 2 {
959 let first_half_avg = recent_scores.iter().take(recent_scores.len() / 2).sum::<f64>() / (recent_scores.len() / 2) as f64;
960 let second_half_avg = recent_scores.iter().skip(recent_scores.len() / 2).sum::<f64>() / (recent_scores.len() - recent_scores.len() / 2) as f64;
961
962 let diff = second_half_avg - first_half_avg;
963 if diff > 5.0 {
964 TrendDirection::Improving
965 } else if diff < -5.0 {
966 TrendDirection::Degrading
967 } else {
968 TrendDirection::Stable
969 }
970 } else {
971 TrendDirection::Stable
972 };
973
974 HealthTrend {
975 chunk_id: String::new(), // Would be filled by caller
976 trend_direction,
977 trend_strength: 0.5, // Simplified calculation
978 prediction_confidence: 0.7,
979 time_to_critical: None, // Would calculate based on trend
980 }
981 }
982
983 /// Detect common failure patterns
984 fn detect_failure_patterns(&mut self) {
985 // Simplified pattern detection
986 // In real implementation, this would use more sophisticated ML algorithms
987 }
988
989 /// Update performance baselines
990 fn update_performance_baselines(&mut self) {
991 // Update baselines based on recent performance data
992 // In real implementation, this would calculate statistical baselines
993 }
994
995 /// Get health summary
996 pub fn get_health_summary(&self) -> HealthSummary {
997 HealthSummary {
998 overall_health: if self.monitoring_metrics.average_health_score >= 90.0 {
999 HealthStatus::Excellent
1000 } else if self.monitoring_metrics.average_health_score >= 75.0 {
1001 HealthStatus::Good
1002 } else if self.monitoring_metrics.average_health_score >= 50.0 {
1003 HealthStatus::Warning
1004 } else {
1005 HealthStatus::Critical
1006 },
1007 metrics: self.monitoring_metrics.clone(),
1008 top_risks: self.get_top_risk_factors(),
1009 recommendations: self.get_global_recommendations(),
1010 }
1011 }
1012
1013 /// Get top risk factors across all chunks
1014 fn get_top_risk_factors(&self) -> Vec<RiskFactor> {
1015 let mut all_risks: Vec<RiskFactor> = self.chunk_health
1016 .values()
1017 .flat_map(|chunk| chunk.risk_factors.iter().cloned())
1018 .collect();
1019
1020 all_risks.sort_by(|a, b| {
1021 let score_a = a.probability * a.impact;
1022 let score_b = b.probability * b.impact;
1023 score_b.partial_cmp(&score_a).unwrap_or(std::cmp::Ordering::Equal)
1024 });
1025
1026 all_risks.into_iter().take(10).collect()
1027 }
1028
1029 /// Get global recommendations
1030 fn get_global_recommendations(&self) -> Vec<String> {
1031 let mut recommendations = Vec::new();
1032
1033 if self.monitoring_metrics.critical_chunks > 0 {
1034 recommendations.push("URGENT: Address critical chunks immediately".to_string());
1035 }
1036
1037 if self.monitoring_metrics.degraded_chunks > self.monitoring_metrics.total_chunks_monitored / 4 {
1038 recommendations.push("High number of degraded chunks - investigate infrastructure issues".to_string());
1039 }
1040
1041 if self.monitoring_metrics.average_health_score < 80.0 {
1042 recommendations.push("Overall system health is below optimal - consider increasing redundancy".to_string());
1043 }
1044
1045 recommendations
1046 }
1047 }
1048
1049 #[derive(Debug, Clone, Serialize, Deserialize)]
1050 pub struct HealthCheckResult {
1051 pub chunk_id: String,
1052 pub health_status: HealthStatus,
1053 pub health_score: f64,
1054 pub replica_results: Vec<ReplicaCheckResult>,
1055 pub issues_detected: Vec<RiskFactor>,
1056 pub recommendations: Vec<String>,
1057 }
1058
1059 #[derive(Debug, Clone, Serialize, Deserialize)]
1060 pub struct ReplicaCheckResult {
1061 pub replica_id: String,
1062 pub node_id: String,
1063 pub status: ReplicaStatus,
1064 pub response_time_ms: f64,
1065 pub connectivity_ok: bool,
1066 pub integrity_ok: bool,
1067 pub performance_metrics: ReplicaPerformanceMetrics,
1068 }
1069
1070 #[derive(Debug, Clone, Serialize, Deserialize)]
1071 pub struct HealthAlert {
1072 pub alert_id: String,
1073 pub chunk_id: String,
1074 pub severity: RiskSeverity,
1075 pub message: String,
1076 pub timestamp: DateTime<Utc>,
1077 pub health_score: f64,
1078 pub recommendations: Vec<String>,
1079 }
1080
1081 #[derive(Debug, Clone, Serialize, Deserialize)]
1082 pub struct HealthSummary {
1083 pub overall_health: HealthStatus,
1084 pub metrics: MonitoringMetrics,
1085 pub top_risks: Vec<RiskFactor>,
1086 pub recommendations: Vec<String>,
1087 }
1088
1089 #[cfg(test)]
1090 mod tests {
1091 use super::*;
1092
1093 #[test]
1094 fn test_health_monitor_creation() {
1095 let monitor = ChunkHealthMonitor::new();
1096 assert!(monitor.chunk_health.is_empty());
1097 assert!(monitor.alert_config.enable_alerts);
1098 assert_eq!(monitor.monitoring_metrics.total_chunks_monitored, 0);
1099 }
1100
1101 #[test]
1102 fn test_health_score_calculation() {
1103 let monitor = ChunkHealthMonitor::new();
1104
1105 let replicas = vec![
1106 ReplicaHealth {
1107 replica_id: "replica1".to_string(),
1108 node_id: "node1".to_string(),
1109 region: GeographicRegion::NorthAmerica,
1110 status: ReplicaStatus::Healthy,
1111 health_score: 100.0,
1112 last_accessed: Utc::now(),
1113 last_verified: Utc::now(),
1114 integrity_hash: "hash1".to_string(),
1115 performance_metrics: ReplicaPerformanceMetrics {
1116 response_time_ms: 100.0,
1117 transfer_speed_mbps: 50.0,
1118 success_rate: 99.0,
1119 error_count: 0,
1120 last_error: None,
1121 uptime_percentage: 99.9,
1122 },
1123 connectivity_status: ConnectivityStatus::Online,
1124 },
1125 ReplicaHealth {
1126 replica_id: "replica2".to_string(),
1127 node_id: "node2".to_string(),
1128 region: GeographicRegion::Europe,
1129 status: ReplicaStatus::Degraded,
1130 health_score: 70.0,
1131 last_accessed: Utc::now(),
1132 last_verified: Utc::now(),
1133 integrity_hash: "hash2".to_string(),
1134 performance_metrics: ReplicaPerformanceMetrics {
1135 response_time_ms: 300.0,
1136 transfer_speed_mbps: 20.0,
1137 success_rate: 95.0,
1138 error_count: 2,
1139 last_error: Some("Network timeout".to_string()),
1140 uptime_percentage: 98.0,
1141 },
1142 connectivity_status: ConnectivityStatus::Intermittent,
1143 },
1144 ];
1145
1146 let score = monitor.calculate_chunk_health_score(&replicas);
1147 assert!(score > 50.0 && score < 100.0); // Should be between 50-100 for mixed health
1148 }
1149
1150 #[tokio::test]
1151 async fn test_health_check_workflow() {
1152 let mut monitor = ChunkHealthMonitor::new();
1153
1154 let replicas = vec![
1155 ReplicaHealth {
1156 replica_id: "replica1".to_string(),
1157 node_id: "node1".to_string(),
1158 region: GeographicRegion::NorthAmerica,
1159 status: ReplicaStatus::Healthy,
1160 health_score: 100.0,
1161 last_accessed: Utc::now(),
1162 last_verified: Utc::now(),
1163 integrity_hash: "hash1".to_string(),
1164 performance_metrics: ReplicaPerformanceMetrics {
1165 response_time_ms: 100.0,
1166 transfer_speed_mbps: 50.0,
1167 success_rate: 99.0,
1168 error_count: 0,
1169 last_error: None,
1170 uptime_percentage: 99.9,
1171 },
1172 connectivity_status: ConnectivityStatus::Online,
1173 },
1174 ];
1175
1176 monitor.add_chunk_monitoring("test_chunk".to_string(), replicas);
1177 assert_eq!(monitor.chunk_health.len(), 1);
1178
1179 // Perform health check
1180 let result = monitor.perform_health_check("test_chunk").await.unwrap();
1181 assert_eq!(result.chunk_id, "test_chunk");
1182 assert!(!result.replica_results.is_empty());
1183 }
1184 }