//! Real-Time Chunk Health Monitoring
//!
//! Comprehensive monitoring system that tracks chunk health, replica status,
//! and data integrity across the distributed network
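//!
//! A minimal usage sketch (illustrative only; the chunk id and the `replicas`
//! value below are placeholders, not part of this module):
//!
//! ```ignore
//! let mut monitor = ChunkHealthMonitor::new();
//!
//! // Register a chunk together with its known replicas, then run one check.
//! monitor.add_chunk_monitoring("chunk-001".to_string(), replicas);
//! let result = monitor.perform_health_check("chunk-001").await?;
//! println!("{:?} (score {:.1})", result.health_status, result.health_score);
//! ```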

use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, VecDeque};
use chrono::{DateTime, Duration, Utc};
use tokio::time::Duration as TokioDuration;

use crate::economics::GeographicRegion;

/// Real-time chunk health monitoring system
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkHealthMonitor {
    /// Health status for all monitored chunks
    pub chunk_health: HashMap<String, ChunkHealth>,
    /// Node health tracking
    pub node_health: HashMap<String, NodeHealth>,
    /// Real-time metrics
    pub monitoring_metrics: MonitoringMetrics,
    /// Alert configuration
    pub alert_config: AlertConfiguration,
    /// Health check scheduler
    pub check_scheduler: HealthCheckScheduler,
    /// Historical health data
    pub health_history: HashMap<String, VecDeque<HealthSnapshot>>,
    /// Performance analytics
    pub analytics: HealthAnalytics,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkHealth {
    pub chunk_id: String,
    pub overall_health: HealthStatus,
    pub replica_health: Vec<ReplicaHealth>,
    pub integrity_status: IntegrityStatus,
    pub availability_score: f64,
    pub durability_score: f64,
    pub performance_metrics: ChunkPerformanceMetrics,
    pub last_verified: DateTime<Utc>,
    pub next_check_due: DateTime<Utc>,
    pub risk_factors: Vec<RiskFactor>,
    pub repair_history: Vec<RepairRecord>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaHealth {
    pub replica_id: String,
    pub node_id: String,
    pub region: GeographicRegion,
    pub status: ReplicaStatus,
    pub health_score: f64,
    pub last_accessed: DateTime<Utc>,
    pub last_verified: DateTime<Utc>,
    pub integrity_hash: String,
    pub performance_metrics: ReplicaPerformanceMetrics,
    pub connectivity_status: ConnectivityStatus,
}

// `HealthStatus` is used as a `HashMap` key by the scheduler, so it also needs
// `PartialEq`, `Eq`, and `Hash`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum HealthStatus {
    Excellent, // All replicas healthy, high durability
    Good,      // Most replicas healthy, adequate durability
    Warning,   // Some replicas degraded, durability at risk
    Critical,  // Many replicas unhealthy, immediate action needed
    Failed,    // Cannot guarantee data availability
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicaStatus {
    Healthy,
    Degraded,
    Slow,
    Unreachable,
    Corrupted,
    Missing,
    Verifying,
    Repairing,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IntegrityStatus {
    Verified,
    Pending,
    Suspicious,
    Corrupted,
    Unknown,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConnectivityStatus {
    Online,
    Intermittent,
    Offline,
    Unknown,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkPerformanceMetrics {
    pub avg_response_time_ms: f64,
    pub success_rate: f64,
    pub throughput_mbps: f64,
    pub error_rate: f64,
    pub access_frequency: AccessFrequency,
    pub bandwidth_utilization: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaPerformanceMetrics {
    pub response_time_ms: f64,
    pub transfer_speed_mbps: f64,
    pub success_rate: f64,
    pub error_count: u32,
    pub last_error: Option<String>,
    pub uptime_percentage: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AccessFrequency {
    VeryHigh, // > 1000 accesses/day
    High,     // 100-1000 accesses/day
    Medium,   // 10-100 accesses/day
    Low,      // 1-10 accesses/day
    VeryLow,  // < 1 access/day
    Archived, // Not accessed recently
}
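
// Illustrative sketch (not part of the original API): a helper that maps a
// measured access rate onto the tiers documented on `AccessFrequency` above.
// `Archived` is intentionally left out because it depends on recency rather
// than rate.
impl AccessFrequency {
    pub fn from_daily_accesses(accesses_per_day: f64) -> Self {
        if accesses_per_day > 1000.0 {
            AccessFrequency::VeryHigh
        } else if accesses_per_day >= 100.0 {
            AccessFrequency::High
        } else if accesses_per_day >= 10.0 {
            AccessFrequency::Medium
        } else if accesses_per_day >= 1.0 {
            AccessFrequency::Low
        } else {
            AccessFrequency::VeryLow
        }
    }
}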

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RiskFactor {
    pub risk_type: RiskType,
    pub severity: RiskSeverity,
    pub probability: f64, // 0.0 to 1.0
    pub impact: f64,      // 0.0 to 1.0
    pub description: String,
    pub mitigation_actions: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RiskType {
    NodeFailure,
    NetworkPartition,
    GeographicRisk,
    PerformanceDegradation,
    CapacityLimits,
    ComplianceViolation,
    SecurityThreat,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RiskSeverity {
    Low,
    Medium,
    High,
    Critical,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepairRecord {
    pub repair_id: String,
    pub timestamp: DateTime<Utc>,
    pub repair_type: RepairType,
    pub affected_replicas: Vec<String>,
    pub repair_strategy: String,
    pub success: bool,
    pub duration_seconds: u64,
    pub cost: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RepairType {
    ReplicationIncrease,
    ReplicaReplacement,
    IntegrityRepair,
    PerformanceOptimization,
    GeographicRebalancing,
    EmergencyRecovery,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeHealth {
    pub node_id: String,
    pub region: GeographicRegion,
    pub overall_health: HealthStatus,
    pub uptime_percentage: f64,
    pub response_time_ms: f64,
    pub bandwidth_mbps: f64,
    pub storage_health: StorageHealth,
    pub connectivity_quality: ConnectivityQuality,
    pub load_metrics: LoadMetrics,
    pub last_seen: DateTime<Utc>,
    pub consecutive_failures: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageHealth {
    pub total_capacity_gb: u64,
    pub used_capacity_gb: u64,
    pub available_capacity_gb: u64,
    pub disk_health_score: f64,
    pub io_performance: f64,
    pub error_rate: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectivityQuality {
    pub latency_ms: f64,
    pub jitter_ms: f64,
    pub packet_loss: f64,
    pub bandwidth_stability: f64,
    pub connection_type: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoadMetrics {
    pub cpu_usage: f64,
    pub memory_usage: f64,
    pub network_utilization: f64,
    pub disk_utilization: f64,
    pub active_connections: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringMetrics {
    pub total_chunks_monitored: u64,
    pub healthy_chunks: u64,
    pub degraded_chunks: u64,
    pub critical_chunks: u64,
    pub failed_chunks: u64,
    pub total_replicas: u64,
    pub healthy_replicas: u64,
    pub degraded_replicas: u64,
    pub average_health_score: f64,
    pub monitoring_efficiency: f64,
    pub last_updated: DateTime<Utc>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfiguration {
    pub enable_alerts: bool,
    pub health_thresholds: HealthThresholds,
    pub notification_channels: Vec<NotificationChannel>,
    pub alert_cooldown_minutes: u32,
    pub escalation_rules: Vec<EscalationRule>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthThresholds {
    pub critical_health_score: f64,
    pub warning_health_score: f64,
    pub max_response_time_ms: f64,
    pub min_success_rate: f64,
    pub max_error_rate: f64,
    pub min_replica_count: u32,
    pub max_consecutive_failures: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotificationChannel {
    pub channel_type: NotificationType,
    pub endpoint: String,
    pub severity_filter: Vec<RiskSeverity>,
    pub enabled: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NotificationType {
    Email,
    Slack,
    Webhook,
    SMS,
    PagerDuty,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EscalationRule {
    pub condition: String,
    pub delay_minutes: u32,
    pub action: EscalationAction,
    pub repeat_count: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EscalationAction {
    NotifyManager,
    AutoRepair,
    IncreaseReplication,
    EmergencyProtocol,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckScheduler {
    /// Re-check interval per health status. `chrono::Duration` does not
    /// implement the serde traits, so this table is skipped during
    /// (de)serialization; `get_check_interval` falls back to its 6-hour
    /// default when an entry is missing.
    #[serde(skip)]
    pub check_intervals: HashMap<HealthStatus, Duration>,
    pub priority_queue: BTreeMap<DateTime<Utc>, Vec<String>>, // chunk_ids
    pub concurrent_checks: u32,
    pub batch_size: u32,
    pub adaptive_scheduling: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthSnapshot {
    pub timestamp: DateTime<Utc>,
    pub health_status: HealthStatus,
    pub health_score: f64,
    pub replica_count: u32,
    pub healthy_replicas: u32,
    pub performance_metrics: ChunkPerformanceMetrics,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthAnalytics {
    pub health_trends: HashMap<String, HealthTrend>,
    pub failure_patterns: Vec<FailurePattern>,
    pub performance_baselines: HashMap<String, PerformanceBaseline>,
    pub prediction_models: HashMap<String, PredictionModel>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthTrend {
    pub chunk_id: String,
    pub trend_direction: TrendDirection,
    pub trend_strength: f64,
    pub prediction_confidence: f64,
    /// Skipped by serde because `chrono::Duration` has no serde support.
    #[serde(skip)]
    pub time_to_critical: Option<Duration>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TrendDirection {
    Improving,
    Stable,
    Degrading,
    Volatile,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailurePattern {
    pub pattern_id: String,
    pub pattern_type: String,
    pub frequency: f64,
    pub affected_chunks: Vec<String>,
    pub common_factors: Vec<String>,
    pub prevention_strategies: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceBaseline {
    pub metric_name: String,
    pub baseline_value: f64,
    pub acceptable_variance: f64,
    pub seasonal_adjustments: HashMap<u8, f64>, // Month -> adjustment factor
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PredictionModel {
    pub model_type: String,
    pub accuracy: f64,
    pub last_trained: DateTime<Utc>,
    pub parameters: HashMap<String, f64>,
}

impl Default for HealthThresholds {
    fn default() -> Self {
        Self {
            critical_health_score: 50.0,
            warning_health_score: 75.0,
            max_response_time_ms: 1000.0,
            min_success_rate: 95.0,
            max_error_rate: 5.0,
            min_replica_count: 3,
            max_consecutive_failures: 3,
        }
    }
}

impl ChunkHealthMonitor {
    /// Create new chunk health monitor
    pub fn new() -> Self {
        Self {
            chunk_health: HashMap::new(),
            node_health: HashMap::new(),
            monitoring_metrics: MonitoringMetrics {
                total_chunks_monitored: 0,
                healthy_chunks: 0,
                degraded_chunks: 0,
                critical_chunks: 0,
                failed_chunks: 0,
                total_replicas: 0,
                healthy_replicas: 0,
                degraded_replicas: 0,
                average_health_score: 100.0,
                monitoring_efficiency: 100.0,
                last_updated: Utc::now(),
            },
            alert_config: AlertConfiguration {
                enable_alerts: true,
                health_thresholds: HealthThresholds::default(),
                notification_channels: Vec::new(),
                alert_cooldown_minutes: 15,
                escalation_rules: Vec::new(),
            },
            check_scheduler: HealthCheckScheduler {
                check_intervals: HashMap::from([
                    (HealthStatus::Excellent, Duration::hours(24)),
                    (HealthStatus::Good, Duration::hours(6)),
                    (HealthStatus::Warning, Duration::hours(1)),
                    (HealthStatus::Critical, Duration::minutes(15)),
                    (HealthStatus::Failed, Duration::minutes(5)),
                ]),
                priority_queue: BTreeMap::new(),
                concurrent_checks: 10,
                batch_size: 100,
                adaptive_scheduling: true,
            },
            health_history: HashMap::new(),
            analytics: HealthAnalytics {
                health_trends: HashMap::new(),
                failure_patterns: Vec::new(),
                performance_baselines: HashMap::new(),
                prediction_models: HashMap::new(),
            },
        }
    }

    /// Add chunk for monitoring
    pub fn add_chunk_monitoring(&mut self, chunk_id: String, initial_replicas: Vec<ReplicaHealth>) {
        let health_score = self.calculate_chunk_health_score(&initial_replicas);
        let health_status = self.determine_health_status(health_score, &initial_replicas);
        let next_check_due = Utc::now() + self.get_check_interval(&health_status);

        let chunk_health = ChunkHealth {
            chunk_id: chunk_id.clone(),
            overall_health: health_status,
            replica_health: initial_replicas,
            integrity_status: IntegrityStatus::Pending,
            availability_score: health_score,
            durability_score: health_score,
            performance_metrics: ChunkPerformanceMetrics {
                avg_response_time_ms: 0.0,
                success_rate: 100.0,
                throughput_mbps: 0.0,
                error_rate: 0.0,
                access_frequency: AccessFrequency::Low,
                bandwidth_utilization: 0.0,
            },
            last_verified: Utc::now(),
            next_check_due,
            risk_factors: Vec::new(),
            repair_history: Vec::new(),
        };

        self.chunk_health.insert(chunk_id.clone(), chunk_health);

        // Schedule the first health check
        self.schedule_health_check(chunk_id.clone(), next_check_due);

        // Initialize health history
        self.health_history.insert(chunk_id, VecDeque::with_capacity(1000));

        self.update_monitoring_metrics();
    }

    /// Perform health check on a chunk
    pub async fn perform_health_check(&mut self, chunk_id: &str) -> Result<HealthCheckResult> {
        // Take the chunk out of the map so its replicas can be mutated while the
        // `&self` helper methods are still callable.
        let mut chunk_health = self.chunk_health.remove(chunk_id)
            .ok_or_else(|| anyhow::anyhow!("Chunk not found in monitoring"))?;

        let mut check_results = Vec::new();
        let mut healthy_replicas = 0;
        let mut total_response_time = 0.0;

        // Check each replica
        for replica in &mut chunk_health.replica_health {
            let replica_result = self.check_replica_health(replica).await?;

            if matches!(replica_result.status, ReplicaStatus::Healthy) {
                healthy_replicas += 1;
            }

            total_response_time += replica_result.response_time_ms;
            check_results.push(replica_result);
        }

        // Update chunk health based on check results
        let new_health_score = self.calculate_chunk_health_score(&chunk_health.replica_health);
        let new_health_status = self.determine_health_status(new_health_score, &chunk_health.replica_health);

        chunk_health.overall_health = new_health_status.clone();
        chunk_health.availability_score = new_health_score;
        if !check_results.is_empty() {
            chunk_health.performance_metrics.avg_response_time_ms = total_response_time / check_results.len() as f64;
            chunk_health.performance_metrics.success_rate = (healthy_replicas as f64 / check_results.len() as f64) * 100.0;
        }
        chunk_health.last_verified = Utc::now();
        chunk_health.next_check_due = Utc::now() + self.get_check_interval(&new_health_status);

        // Update risk factors
        chunk_health.risk_factors = self.assess_risk_factors(&chunk_health);

        // Record health snapshot
        self.record_health_snapshot(chunk_id, &chunk_health);

        // Schedule next check
        self.schedule_health_check(chunk_id.to_string(), chunk_health.next_check_due);

        // Check for alerts
        if self.alert_config.enable_alerts {
            self.check_alert_conditions(chunk_id, &chunk_health).await?;
        }

        let result = HealthCheckResult {
            chunk_id: chunk_id.to_string(),
            health_status: new_health_status,
            health_score: new_health_score,
            replica_results: check_results,
            issues_detected: chunk_health.risk_factors.clone(),
            recommendations: self.generate_recommendations(&chunk_health),
        };

        // Put the updated chunk back before refreshing aggregate metrics.
        self.chunk_health.insert(chunk_id.to_string(), chunk_health);
        self.update_monitoring_metrics();

        Ok(result)
    }

    /// Check individual replica health
    async fn check_replica_health(&mut self, replica: &mut ReplicaHealth) -> Result<ReplicaCheckResult> {
        let start_time = std::time::Instant::now();

        // Simulate health check (in real implementation, this would be actual network calls)
        let connectivity_check = self.check_replica_connectivity(&replica.node_id).await?;
        let integrity_check = self.verify_replica_integrity(replica).await?;
        let performance_check = self.measure_replica_performance(replica).await?;

        let _check_duration = start_time.elapsed();

        // Update replica status based on checks
        replica.status = if connectivity_check && integrity_check && performance_check.response_time_ms < 1000.0 {
            ReplicaStatus::Healthy
        } else if connectivity_check && integrity_check {
            ReplicaStatus::Slow
        } else if connectivity_check {
            ReplicaStatus::Degraded
        } else {
            ReplicaStatus::Unreachable
        };

        replica.last_verified = Utc::now();
        replica.performance_metrics = performance_check.clone();

        Ok(ReplicaCheckResult {
            replica_id: replica.replica_id.clone(),
            node_id: replica.node_id.clone(),
            status: replica.status.clone(),
            response_time_ms: performance_check.response_time_ms,
            connectivity_ok: connectivity_check,
            integrity_ok: integrity_check,
            performance_metrics: performance_check,
        })
    }

    /// Check replica connectivity
    async fn check_replica_connectivity(&self, node_id: &str) -> Result<bool> {
        // Simulate connectivity check
        tokio::time::sleep(TokioDuration::from_millis(10)).await;

        // Check if node is in our health records and recently seen
        if let Some(node_health) = self.node_health.get(node_id) {
            let time_since_last_seen = Utc::now() - node_health.last_seen;
            Ok(time_since_last_seen < Duration::minutes(5))
        } else {
            Ok(false)
        }
    }

    /// Verify replica integrity
    async fn verify_replica_integrity(&self, replica: &ReplicaHealth) -> Result<bool> {
        // Simulate integrity verification
        tokio::time::sleep(TokioDuration::from_millis(50)).await;

        // In real implementation, this would verify checksums, etc.
        Ok(!replica.integrity_hash.is_empty())
    }

    /// Measure replica performance
    async fn measure_replica_performance(&self, _replica: &ReplicaHealth) -> Result<ReplicaPerformanceMetrics> {
        // Simulate performance measurement
        let base_latency = 100.0;
        let jitter = (rand::random::<f64>() - 0.5) * 50.0;
        let response_time = base_latency + jitter;

        tokio::time::sleep(TokioDuration::from_millis(response_time as u64)).await;

        Ok(ReplicaPerformanceMetrics {
            response_time_ms: response_time.max(0.0),
            transfer_speed_mbps: 50.0 + (rand::random::<f64>() * 50.0),
            success_rate: 95.0 + (rand::random::<f64>() * 5.0),
            error_count: 0,
            last_error: None,
            uptime_percentage: 99.0 + (rand::random::<f64>() * 1.0),
        })
    }

    /// Calculate overall chunk health score
    fn calculate_chunk_health_score(&self, replicas: &[ReplicaHealth]) -> f64 {
        if replicas.is_empty() {
            return 0.0;
        }

        let healthy_count = replicas.iter()
            .filter(|r| matches!(r.status, ReplicaStatus::Healthy))
            .count();

        let degraded_count = replicas.iter()
            .filter(|r| matches!(r.status, ReplicaStatus::Degraded | ReplicaStatus::Slow))
            .count();

        let unhealthy_count = replicas.len() - healthy_count - degraded_count;

        // Weight factors
        let healthy_weight = 1.0;
        let degraded_weight = 0.5;
        let unhealthy_weight = 0.0;

        let weighted_score = (healthy_count as f64 * healthy_weight
            + degraded_count as f64 * degraded_weight
            + unhealthy_count as f64 * unhealthy_weight) / replicas.len() as f64;

        weighted_score * 100.0
    }

    /// Determine health status from score and replica states
    fn determine_health_status(&self, health_score: f64, replicas: &[ReplicaHealth]) -> HealthStatus {
        let healthy_count = replicas.iter()
            .filter(|r| matches!(r.status, ReplicaStatus::Healthy))
            .count();

        let total_count = replicas.len();

        if health_score >= 90.0 && healthy_count >= (total_count * 3) / 4 {
            HealthStatus::Excellent
        } else if health_score >= 75.0 && healthy_count >= total_count / 2 {
            HealthStatus::Good
        } else if health_score >= 50.0 && healthy_count >= total_count / 3 {
            HealthStatus::Warning
        } else if healthy_count > 0 {
            HealthStatus::Critical
        } else {
            HealthStatus::Failed
        }
    }

    /// Get check interval for health status
    fn get_check_interval(&self, status: &HealthStatus) -> Duration {
        self.check_scheduler.check_intervals
            .get(status)
            .copied()
            .unwrap_or(Duration::hours(6))
    }

    /// Schedule health check
    fn schedule_health_check(&mut self, chunk_id: String, check_time: DateTime<Utc>) {
        self.check_scheduler.priority_queue
            .entry(check_time)
            .or_insert_with(Vec::new)
            .push(chunk_id);
    }

    /// Assess risk factors for a chunk
    fn assess_risk_factors(&self, chunk_health: &ChunkHealth) -> Vec<RiskFactor> {
        let mut risk_factors = Vec::new();

        // Check replica count
        let healthy_replicas = chunk_health.replica_health.iter()
            .filter(|r| matches!(r.status, ReplicaStatus::Healthy))
            .count();

        if healthy_replicas < 3 {
            risk_factors.push(RiskFactor {
                risk_type: RiskType::NodeFailure,
                severity: if healthy_replicas < 2 { RiskSeverity::Critical } else { RiskSeverity::High },
                probability: 0.8,
                impact: 0.9,
                description: format!("Only {} healthy replicas remaining", healthy_replicas),
                mitigation_actions: vec!["Increase replication".to_string(), "Replace unhealthy replicas".to_string()],
            });
        }

        // Check geographic distribution
        let regions: std::collections::HashSet<_> = chunk_health.replica_health.iter()
            .map(|r| &r.region)
            .collect();

        if regions.len() < 2 {
            risk_factors.push(RiskFactor {
                risk_type: RiskType::GeographicRisk,
                severity: RiskSeverity::Medium,
                probability: 0.3,
                impact: 0.7,
                description: "Poor geographic distribution".to_string(),
                mitigation_actions: vec!["Add replicas in different regions".to_string()],
            });
        }

        // Check performance degradation
        if chunk_health.performance_metrics.avg_response_time_ms > 1000.0 {
            risk_factors.push(RiskFactor {
                risk_type: RiskType::PerformanceDegradation,
                severity: RiskSeverity::Medium,
                probability: 0.6,
                impact: 0.4,
                description: "High response times detected".to_string(),
                mitigation_actions: vec!["Optimize replica placement".to_string(), "Check network conditions".to_string()],
            });
        }

        risk_factors
    }

    /// Generate recommendations for chunk health improvement
    fn generate_recommendations(&self, chunk_health: &ChunkHealth) -> Vec<String> {
        let mut recommendations = Vec::new();

        match chunk_health.overall_health {
            HealthStatus::Failed => {
                recommendations.push("URGENT: Immediate recovery required - chunk data may be lost".to_string());
                recommendations.push("Attempt recovery from any available replicas".to_string());
                recommendations.push("Check backup systems".to_string());
            },
            HealthStatus::Critical => {
                recommendations.push("Create additional replicas immediately".to_string());
                recommendations.push("Repair or replace unhealthy replicas".to_string());
                recommendations.push("Monitor closely for further degradation".to_string());
            },
            HealthStatus::Warning => {
                recommendations.push("Consider increasing replication factor".to_string());
                recommendations.push("Investigate cause of replica degradation".to_string());
                recommendations.push("Improve geographic distribution".to_string());
            },
            HealthStatus::Good => {
                recommendations.push("Monitor performance trends".to_string());
                recommendations.push("Consider optimizing replica placement for better performance".to_string());
            },
            HealthStatus::Excellent => {
                recommendations.push("Maintain current configuration".to_string());
                recommendations.push("Consider this as a model for other chunks".to_string());
            },
        }

        recommendations
    }

    /// Record health snapshot in history
    fn record_health_snapshot(&mut self, chunk_id: &str, chunk_health: &ChunkHealth) {
        let snapshot = HealthSnapshot {
            timestamp: Utc::now(),
            health_status: chunk_health.overall_health.clone(),
            health_score: chunk_health.availability_score,
            replica_count: chunk_health.replica_health.len() as u32,
            healthy_replicas: chunk_health.replica_health.iter()
                .filter(|r| matches!(r.status, ReplicaStatus::Healthy))
                .count() as u32,
            performance_metrics: chunk_health.performance_metrics.clone(),
        };

        if let Some(history) = self.health_history.get_mut(chunk_id) {
            history.push_back(snapshot);

            // Keep only last 1000 snapshots
            if history.len() > 1000 {
                history.pop_front();
            }
        }
    }

    /// Check alert conditions and send notifications
    async fn check_alert_conditions(&mut self, chunk_id: &str, chunk_health: &ChunkHealth) -> Result<()> {
        let thresholds = &self.alert_config.health_thresholds;

        let should_alert = match chunk_health.overall_health {
            HealthStatus::Failed | HealthStatus::Critical => true,
            HealthStatus::Warning => chunk_health.availability_score < thresholds.warning_health_score,
            _ => false,
        };

        if should_alert {
            let alert = HealthAlert {
                alert_id: format!("alert_{}_{}", chunk_id, Utc::now().timestamp()),
                chunk_id: chunk_id.to_string(),
                severity: match chunk_health.overall_health {
                    HealthStatus::Failed => RiskSeverity::Critical,
                    HealthStatus::Critical => RiskSeverity::High,
                    HealthStatus::Warning => RiskSeverity::Medium,
                    _ => RiskSeverity::Low,
                },
                message: format!("Chunk {} health degraded to {:?}", chunk_id, chunk_health.overall_health),
                timestamp: Utc::now(),
                health_score: chunk_health.availability_score,
                recommendations: self.generate_recommendations(chunk_health),
            };

            self.send_alert(alert).await?;
        }

        Ok(())
    }

    /// Send health alert
    async fn send_alert(&self, alert: HealthAlert) -> Result<()> {
        // In real implementation, this would send to configured notification channels
        tracing::warn!("Health Alert: {} - {} (Score: {:.1})",
            alert.alert_id, alert.message, alert.health_score);

        for recommendation in &alert.recommendations {
            tracing::info!("Recommendation: {}", recommendation);
        }

        Ok(())
    }

    /// Update monitoring metrics
    fn update_monitoring_metrics(&mut self) {
        let total_chunks = self.chunk_health.len() as u64;
        let mut healthy = 0;
        let mut degraded = 0;
        let mut critical = 0;
        let mut failed = 0;
        let mut total_score = 0.0;
        let mut total_replicas = 0;
        let mut healthy_replicas = 0;

        for chunk_health in self.chunk_health.values() {
            match chunk_health.overall_health {
                HealthStatus::Excellent | HealthStatus::Good => healthy += 1,
                HealthStatus::Warning => degraded += 1,
                HealthStatus::Critical => critical += 1,
                HealthStatus::Failed => failed += 1,
            }

            total_score += chunk_health.availability_score;
            total_replicas += chunk_health.replica_health.len();
            healthy_replicas += chunk_health.replica_health.iter()
                .filter(|r| matches!(r.status, ReplicaStatus::Healthy))
                .count();
        }

        self.monitoring_metrics = MonitoringMetrics {
            total_chunks_monitored: total_chunks,
            healthy_chunks: healthy,
            degraded_chunks: degraded,
            critical_chunks: critical,
            failed_chunks: failed,
            total_replicas: total_replicas as u64,
            healthy_replicas: healthy_replicas as u64,
            degraded_replicas: (total_replicas - healthy_replicas) as u64,
            average_health_score: if total_chunks > 0 { total_score / total_chunks as f64 } else { 100.0 },
            monitoring_efficiency: 100.0, // Would be calculated based on check success rate
            last_updated: Utc::now(),
        };
    }

    /// Run continuous health monitoring
    pub async fn run_continuous_monitoring(&mut self) -> Result<()> {
        let mut check_interval = tokio::time::interval(TokioDuration::from_secs(60)); // Check every minute

        loop {
            check_interval.tick().await;

            // Process due health checks
            let now = Utc::now();
            let due_chunks: Vec<String> = self.check_scheduler.priority_queue
                .range(..=now)
                .flat_map(|(_, chunks)| chunks.iter().cloned())
                .collect();

            // Remove processed items from queue
            self.check_scheduler.priority_queue.retain(|time, _| *time > now);

            // Process health checks in batches
            for chunk_batch in due_chunks.chunks(self.check_scheduler.batch_size as usize) {
                let mut check_tasks = Vec::new();

                for chunk_id in chunk_batch {
                    if check_tasks.len() >= self.check_scheduler.concurrent_checks as usize {
                        // Wait for some tasks to complete
                        let _ = futures::future::join_all(check_tasks.drain(..)).await;
                    }

                    let chunk_id_clone = chunk_id.clone();
                    let task = async move {
                        // Would perform actual health check here
                        tokio::time::sleep(TokioDuration::from_millis(100)).await;
                        Ok::<String, anyhow::Error>(chunk_id_clone)
                    };

                    check_tasks.push(task);
                }

                // Wait for remaining tasks
                let results = futures::future::join_all(check_tasks).await;

                // Process results
                for result in results {
                    match result {
                        Ok(chunk_id) => {
                            if let Err(e) = self.perform_health_check(&chunk_id).await {
                                tracing::error!("Health check failed for chunk {}: {}", chunk_id, e);
                            }
                        },
                        Err(e) => {
                            tracing::error!("Health check task failed: {}", e);
                        }
                    }
                }
            }

            // Update analytics
            self.update_health_analytics();
        }
    }

    /// Update health analytics and trends
    fn update_health_analytics(&mut self) {
        // Analyze health trends for each chunk. Collect first so the immutable
        // borrow of `health_history` ends before `analytics` is mutated.
        let mut updated_trends = Vec::new();
        for (chunk_id, history) in &self.health_history {
            if history.len() >= 10 {
                let mut trend = self.analyze_health_trend(history);
                trend.chunk_id = chunk_id.clone();
                updated_trends.push((chunk_id.clone(), trend));
            }
        }
        for (chunk_id, trend) in updated_trends {
            self.analytics.health_trends.insert(chunk_id, trend);
        }

        // Detect failure patterns
        self.detect_failure_patterns();

        // Update performance baselines
        self.update_performance_baselines();
    }

    /// Analyze health trend for a chunk
    fn analyze_health_trend(&self, history: &VecDeque<HealthSnapshot>) -> HealthTrend {
        let recent_scores: Vec<f64> = history.iter()
            .rev()
            .take(10)
            .map(|snapshot| snapshot.health_score)
            .collect();

        let trend_direction = if recent_scores.len() >= 2 {
            // `recent_scores` is ordered newest-first, so the first half is the
            // more recent one.
            let first_half_avg = recent_scores.iter().take(recent_scores.len() / 2).sum::<f64>() / (recent_scores.len() / 2) as f64;
            let second_half_avg = recent_scores.iter().skip(recent_scores.len() / 2).sum::<f64>() / (recent_scores.len() - recent_scores.len() / 2) as f64;

            let diff = first_half_avg - second_half_avg;
            if diff > 5.0 {
                TrendDirection::Improving
            } else if diff < -5.0 {
                TrendDirection::Degrading
            } else {
                TrendDirection::Stable
            }
        } else {
            TrendDirection::Stable
        };

        HealthTrend {
            chunk_id: String::new(), // Filled in by the caller
            trend_direction,
            trend_strength: 0.5, // Simplified calculation
            prediction_confidence: 0.7,
            time_to_critical: None, // Would calculate based on trend
        }
    }

    /// Detect common failure patterns
    fn detect_failure_patterns(&mut self) {
        // Simplified pattern detection
        // In real implementation, this would use more sophisticated ML algorithms
    }

    /// Update performance baselines
    fn update_performance_baselines(&mut self) {
        // Update baselines based on recent performance data
        // In real implementation, this would calculate statistical baselines
    }

    /// Get health summary
    pub fn get_health_summary(&self) -> HealthSummary {
        HealthSummary {
            overall_health: if self.monitoring_metrics.average_health_score >= 90.0 {
                HealthStatus::Excellent
            } else if self.monitoring_metrics.average_health_score >= 75.0 {
                HealthStatus::Good
            } else if self.monitoring_metrics.average_health_score >= 50.0 {
                HealthStatus::Warning
            } else {
                HealthStatus::Critical
            },
            metrics: self.monitoring_metrics.clone(),
            top_risks: self.get_top_risk_factors(),
            recommendations: self.get_global_recommendations(),
        }
    }

    /// Get top risk factors across all chunks
    fn get_top_risk_factors(&self) -> Vec<RiskFactor> {
        let mut all_risks: Vec<RiskFactor> = self.chunk_health
            .values()
            .flat_map(|chunk| chunk.risk_factors.iter().cloned())
            .collect();

        all_risks.sort_by(|a, b| {
            let score_a = a.probability * a.impact;
            let score_b = b.probability * b.impact;
            score_b.partial_cmp(&score_a).unwrap_or(std::cmp::Ordering::Equal)
        });

        all_risks.into_iter().take(10).collect()
    }

    /// Get global recommendations
    fn get_global_recommendations(&self) -> Vec<String> {
        let mut recommendations = Vec::new();

        if self.monitoring_metrics.critical_chunks > 0 {
            recommendations.push("URGENT: Address critical chunks immediately".to_string());
        }

        if self.monitoring_metrics.degraded_chunks > self.monitoring_metrics.total_chunks_monitored / 4 {
            recommendations.push("High number of degraded chunks - investigate infrastructure issues".to_string());
        }

        if self.monitoring_metrics.average_health_score < 80.0 {
            recommendations.push("Overall system health is below optimal - consider increasing redundancy".to_string());
        }

        recommendations
    }
}
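
// Illustrative convenience (not part of the original API): delegate `Default`
// to `new()`, which is also what clippy's `new_without_default` lint suggests.
impl Default for ChunkHealthMonitor {
    fn default() -> Self {
        Self::new()
    }
}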

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckResult {
    pub chunk_id: String,
    pub health_status: HealthStatus,
    pub health_score: f64,
    pub replica_results: Vec<ReplicaCheckResult>,
    pub issues_detected: Vec<RiskFactor>,
    pub recommendations: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaCheckResult {
    pub replica_id: String,
    pub node_id: String,
    pub status: ReplicaStatus,
    pub response_time_ms: f64,
    pub connectivity_ok: bool,
    pub integrity_ok: bool,
    pub performance_metrics: ReplicaPerformanceMetrics,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthAlert {
    pub alert_id: String,
    pub chunk_id: String,
    pub severity: RiskSeverity,
    pub message: String,
    pub timestamp: DateTime<Utc>,
    pub health_score: f64,
    pub recommendations: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthSummary {
    pub overall_health: HealthStatus,
    pub metrics: MonitoringMetrics,
    pub top_risks: Vec<RiskFactor>,
    pub recommendations: Vec<String>,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_health_monitor_creation() {
        let monitor = ChunkHealthMonitor::new();
        assert!(monitor.chunk_health.is_empty());
        assert!(monitor.alert_config.enable_alerts);
        assert_eq!(monitor.monitoring_metrics.total_chunks_monitored, 0);
    }

    #[test]
    fn test_health_score_calculation() {
        let monitor = ChunkHealthMonitor::new();

        let replicas = vec![
            ReplicaHealth {
                replica_id: "replica1".to_string(),
                node_id: "node1".to_string(),
                region: GeographicRegion::NorthAmerica,
                status: ReplicaStatus::Healthy,
                health_score: 100.0,
                last_accessed: Utc::now(),
                last_verified: Utc::now(),
                integrity_hash: "hash1".to_string(),
                performance_metrics: ReplicaPerformanceMetrics {
                    response_time_ms: 100.0,
                    transfer_speed_mbps: 50.0,
                    success_rate: 99.0,
                    error_count: 0,
                    last_error: None,
                    uptime_percentage: 99.9,
                },
                connectivity_status: ConnectivityStatus::Online,
            },
            ReplicaHealth {
                replica_id: "replica2".to_string(),
                node_id: "node2".to_string(),
                region: GeographicRegion::Europe,
                status: ReplicaStatus::Degraded,
                health_score: 70.0,
                last_accessed: Utc::now(),
                last_verified: Utc::now(),
                integrity_hash: "hash2".to_string(),
                performance_metrics: ReplicaPerformanceMetrics {
                    response_time_ms: 300.0,
                    transfer_speed_mbps: 20.0,
                    success_rate: 95.0,
                    error_count: 2,
                    last_error: Some("Network timeout".to_string()),
                    uptime_percentage: 98.0,
                },
                connectivity_status: ConnectivityStatus::Intermittent,
            },
        ];

        let score = monitor.calculate_chunk_health_score(&replicas);
        assert!(score > 50.0 && score < 100.0); // Should be between 50-100 for mixed health
    }

    #[tokio::test]
    async fn test_health_check_workflow() {
        let mut monitor = ChunkHealthMonitor::new();

        let replicas = vec![
            ReplicaHealth {
                replica_id: "replica1".to_string(),
                node_id: "node1".to_string(),
                region: GeographicRegion::NorthAmerica,
                status: ReplicaStatus::Healthy,
                health_score: 100.0,
                last_accessed: Utc::now(),
                last_verified: Utc::now(),
                integrity_hash: "hash1".to_string(),
                performance_metrics: ReplicaPerformanceMetrics {
                    response_time_ms: 100.0,
                    transfer_speed_mbps: 50.0,
                    success_rate: 99.0,
                    error_count: 0,
                    last_error: None,
                    uptime_percentage: 99.9,
                },
                connectivity_status: ConnectivityStatus::Online,
            },
        ];

        monitor.add_chunk_monitoring("test_chunk".to_string(), replicas);
        assert_eq!(monitor.chunk_health.len(), 1);

        // Perform health check
        let result = monitor.perform_health_check("test_chunk").await.unwrap();
        assert_eq!(result.chunk_id, "test_chunk");
        assert!(!result.replica_results.is_empty());
    }
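
    // Extra coverage added as an illustrative sketch: a test-only helper plus a
    // check of the score weighting used by `calculate_chunk_health_score`
    // (healthy = 1.0, degraded/slow = 0.5, everything else = 0.0).
    fn test_replica(id: &str, status: ReplicaStatus) -> ReplicaHealth {
        ReplicaHealth {
            replica_id: id.to_string(),
            node_id: format!("node_{}", id),
            region: GeographicRegion::NorthAmerica,
            status,
            health_score: 100.0,
            last_accessed: Utc::now(),
            last_verified: Utc::now(),
            integrity_hash: "hash".to_string(),
            performance_metrics: ReplicaPerformanceMetrics {
                response_time_ms: 100.0,
                transfer_speed_mbps: 50.0,
                success_rate: 99.0,
                error_count: 0,
                last_error: None,
                uptime_percentage: 99.9,
            },
            connectivity_status: ConnectivityStatus::Online,
        }
    }

    #[test]
    fn test_health_score_weighting() {
        let monitor = ChunkHealthMonitor::new();

        // 2 healthy + 1 degraded + 1 unreachable
        // => (2 * 1.0 + 1 * 0.5 + 1 * 0.0) / 4 * 100 = 62.5
        let replicas = vec![
            test_replica("r1", ReplicaStatus::Healthy),
            test_replica("r2", ReplicaStatus::Healthy),
            test_replica("r3", ReplicaStatus::Degraded),
            test_replica("r4", ReplicaStatus::Unreachable),
        ];

        let score = monitor.calculate_chunk_health_score(&replicas);
        assert!((score - 62.5).abs() < 1e-9);
    }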
} |