//! Automatic Replication System
//!
//! Handles automatic replication when nodes go offline, ensuring data
//! durability through intelligent recovery and replacement strategies.
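//!
//! A minimal usage sketch (illustrative; the chunk-health map would normally
//! come from the health monitor rather than being empty):
//!
//! ```ignore
//! let mut manager = AutoReplicationManager::new();
//! manager.update_node_status("node-1".to_string(), NodeState::Online);
//!
//! // Drive failure detection and auto-replication on the heartbeat interval.
//! let chunk_health = std::collections::HashMap::new();
//! manager.run_auto_replication_loop(chunk_health).await?;
//! ```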

use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use chrono::{DateTime, Duration, Utc};
use tokio::time::{sleep, Duration as TokioDuration};

use crate::economics::earnings_calculator::GeographicRegion;
use super::health_monitor::{ChunkHealth, HealthStatus, ReplicaHealth, ReplicaStatus};
use super::intelligent_replication::ContentType;

/// Automatic replication manager
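///
/// `new()` seeds default "critical", "standard", and "archive" policies via
/// `initialize_default_policies`. A quick sketch (illustrative):
///
/// ```ignore
/// let manager = AutoReplicationManager::new();
/// assert!(manager.replication_policies.contains_key("standard"));
/// ```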
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoReplicationManager {
    /// Node failure detection
    pub failure_detection: FailureDetectionConfig,
    /// Replication policies
    pub replication_policies: HashMap<String, AutoReplicationPolicy>,
    /// Active replication tasks
    pub active_tasks: HashMap<String, ReplicationTask>,
    /// Node status tracking
    pub node_status: HashMap<String, NodeStatus>,
    /// Recovery strategies
    pub recovery_strategies: RecoveryStrategyConfig,
    /// Performance metrics
    pub metrics: AutoReplicationMetrics,
    /// Emergency protocols
    pub emergency_config: EmergencyReplicationConfig,
}

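/// Tunables for heartbeat-based node failure detection.
///
/// Defaults are provided via `Default` (30 s heartbeats, 180 s offline
/// timeout). A deployment on flakier links might override fields with struct
/// update syntax (values illustrative, not recommendations):
///
/// ```ignore
/// let config = FailureDetectionConfig {
///     offline_timeout_seconds: 600,
///     grace_period_seconds: 300,
///     ..FailureDetectionConfig::default()
/// };
/// ```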
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailureDetectionConfig {
    /// Heartbeat interval for node monitoring
    pub heartbeat_interval_seconds: u32,
    /// Timeout before considering a node offline
    pub offline_timeout_seconds: u32,
    /// Number of consecutive failures before triggering replication
    pub failure_threshold: u32,
    /// Grace period for temporary network issues
    pub grace_period_seconds: u32,
    /// Enable predictive failure detection
    pub predictive_detection: bool,
    /// Enable network partition detection
    pub partition_detection: bool,
}

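/// Policy describing when and how a class of content is re-replicated.
///
/// A hypothetical custom policy wired from the surrounding types (all values
/// illustrative; `new()` already installs sensible defaults):
///
/// ```ignore
/// let policy = AutoReplicationPolicy {
///     policy_id: "media".to_string(),
///     content_types: vec![ContentType::Standard],
///     trigger_conditions: vec![
///         TriggerCondition::ReplicaCountBelowThreshold { min_replicas: 3 },
///     ],
///     replication_strategy: ReplicationResponseStrategy::Batched { batch_size: 8 },
///     priority: ReplicationPriority::Normal,
///     max_concurrent_replications: 4,
///     resource_limits: ResourceLimits {
///         max_bandwidth_mbps: 200.0,
///         max_concurrent_transfers: 5,
///         max_storage_usage_gb: 2_000,
///         max_cost_per_hour: 20.0,
///         cpu_usage_limit: 50.0,
///     },
///     geographic_constraints: GeographicConstraints {
///         required_regions: vec![],
///         forbidden_regions: vec![],
///         min_distance_km: 250.0,
///         regulatory_compliance: vec![],
///     },
/// };
/// ```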
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoReplicationPolicy {
    pub policy_id: String,
    pub content_types: Vec<ContentType>,
    pub trigger_conditions: Vec<TriggerCondition>,
    pub replication_strategy: ReplicationResponseStrategy,
    pub priority: ReplicationPriority,
    pub max_concurrent_replications: u32,
    pub resource_limits: ResourceLimits,
    pub geographic_constraints: GeographicConstraints,
}

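/// Conditions that can trigger automatic re-replication of a chunk.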
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TriggerCondition {
    NodeOffline { grace_period_seconds: u32 },
    ReplicaCountBelowThreshold { min_replicas: u32 },
    HealthScoreBelowThreshold { min_score: f64 },
    GeographicDistributionLoss { min_regions: u32 },
    PerformanceDegradation { max_response_time_ms: f64 },
    IntegrityViolation,
    NetworkPartition,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationResponseStrategy {
    /// Start replication immediately
    Immediate,
    /// Wait before starting
    Delayed { delay_seconds: u32 },
    /// Batch multiple chunks
    Batched { batch_size: u32 },
    /// Adapt based on network conditions
    Adaptive,
    /// Wait for confirmation of permanent failure
    Conservative,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationPriority {
    /// Critical data, immediate action
    Emergency,
    /// Important data, prioritized
    High,
    /// Standard priority
    Normal,
    /// Background processing
    Low,
    /// Wait for better conditions
    Deferred,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceLimits {
    pub max_bandwidth_mbps: f64,
    pub max_concurrent_transfers: u32,
    pub max_storage_usage_gb: u64,
    pub max_cost_per_hour: f64,
    pub cpu_usage_limit: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeographicConstraints {
    pub required_regions: Vec<GeographicRegion>,
    pub forbidden_regions: Vec<GeographicRegion>,
    pub min_distance_km: f64,
    pub regulatory_compliance: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeStatus {
    pub node_id: String,
    pub region: GeographicRegion,
    pub status: NodeState,
    pub last_seen: DateTime<Utc>,
    pub consecutive_failures: u32,
    pub failure_history: VecDeque<FailureEvent>,
    pub predicted_availability: f64,
    pub maintenance_scheduled: Option<DateTime<Utc>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NodeState {
    Online,
    Degraded,
    Offline,
    Maintenance,
    Unknown,
    /// Suspected of being offline
    Suspected,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailureEvent {
    pub timestamp: DateTime<Utc>,
    pub failure_type: FailureType,
    pub duration_seconds: Option<u64>,
    pub recovery_time_seconds: Option<u64>,
    pub root_cause: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FailureType {
    NetworkTimeout,
    DiskFailure,
    PowerOutage,
    MaintenanceShutdown,
    ProcessCrash,
    NetworkPartition,
    Unknown,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationTask {
    pub task_id: String,
    pub chunk_id: String,
    pub content_type: ContentType,
    pub trigger_reason: TriggerReason,
    pub source_replicas: Vec<String>,
    pub target_nodes: Vec<String>,
    pub status: TaskStatus,
    pub progress: TaskProgress,
    pub created_at: DateTime<Utc>,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    pub estimated_completion: Option<DateTime<Utc>>,
    pub resource_usage: ResourceUsage,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TriggerReason {
    NodeFailure { failed_nodes: Vec<String> },
    HealthDegradation { health_score: f64 },
    PolicyViolation { policy_id: String },
    ManualRequest { requested_by: String },
    PredictiveAction { predicted_failure: String },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TaskStatus {
    Queued,
    Planning,
    Executing,
    Verifying,
    Completed,
    Failed,
    Cancelled,
    Paused,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskProgress {
    pub stage: ReplicationStage,
    pub percentage_complete: f64,
    pub bytes_transferred: u64,
    pub total_bytes: u64,
    pub transfer_rate_mbps: f64,
    pub estimated_time_remaining_seconds: u64,
    pub current_operation: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationStage {
    Initializing,
    SelectingNodes,
    PreparingTransfer,
    Transferring,
    Verifying,
    Finalizing,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
    pub bandwidth_used_mbps: f64,
    pub storage_used_gb: u64,
    pub cpu_usage_percent: f64,
    pub cost_incurred: f64,
    pub network_transfers: u32,
}

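/// How recovery sources, targets, and verification are chosen when
/// replication is triggered.
///
/// `new()` installs a default that prefers local replicas, uses three
/// parallel streams, and verifies at `Standard` level. Overriding a field is
/// direct (illustrative):
///
/// ```ignore
/// manager.recovery_strategies.verification_level = VerificationLevel::Thorough;
/// ```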
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecoveryStrategyConfig {
    pub prefer_local_replicas: bool,
    pub max_recovery_distance_km: f64,
    pub parallel_recovery_streams: u32,
    pub verification_level: VerificationLevel,
    pub fallback_strategies: Vec<FallbackStrategy>,
    pub optimization_goals: OptimizationGoals,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum VerificationLevel {
    /// Hash verification only
    Basic,
    /// Hash + size + basic integrity
    Standard,
    /// Full content verification
    Thorough,
    /// Multiple verification methods
    Paranoid,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FallbackStrategy {
    UseRemoteReplicas,
    IncreaseReplicationFactor,
    RelaxGeographicConstraints,
    UseExpensiveNodes,
    WaitForNodeRecovery,
    EmergencyProtocol,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationGoals {
    pub minimize_cost: bool,
    pub minimize_latency: bool,
    pub maximize_durability: bool,
    pub balance_geographic_distribution: bool,
    pub prefer_high_performance_nodes: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoReplicationMetrics {
    pub total_replications_triggered: u64,
    pub successful_replications: u64,
    pub failed_replications: u64,
    pub average_replication_time_seconds: f64,
    pub total_data_recovered_gb: u64,
    pub cost_of_replications: f64,
    pub nodes_replaced: u64,
    pub emergency_recoveries: u64,
    pub last_updated: DateTime<Utc>,
}

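/// Emergency protocols for mass failures and imminent data loss.
///
/// `new()` enables emergency mode with `DataLossImminent` (10 chunks at risk)
/// and `MassNodeFailure` (10% failure rate) triggers. Registering an extra
/// trigger is a one-liner (illustrative):
///
/// ```ignore
/// manager.emergency_config.emergency_triggers
///     .push(EmergencyTrigger::StorageCapacityCritical { utilization: 0.95 });
/// ```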
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmergencyReplicationConfig {
    pub enable_emergency_mode: bool,
    pub emergency_triggers: Vec<EmergencyTrigger>,
    pub emergency_resources: EmergencyResources,
    pub escalation_timeouts: EscalationTimeouts,
    pub emergency_contacts: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EmergencyTrigger {
    DataLossImminent { chunks_at_risk: u32 },
    NetworkPartition { partition_size: f64 },
    MassNodeFailure { failure_rate: f64 },
    StorageCapacityCritical { utilization: f64 },
    ComplianceViolation { severity: String },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmergencyResources {
    pub reserved_bandwidth_mbps: f64,
    pub reserved_storage_gb: u64,
    pub priority_node_access: bool,
    pub cost_override_enabled: bool,
    pub geographic_restriction_override: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EscalationTimeouts {
    pub initial_response_minutes: u32,
    pub escalation_interval_minutes: u32,
    pub max_escalation_levels: u32,
    pub emergency_override_minutes: u32,
}

impl Default for FailureDetectionConfig {
    fn default() -> Self {
        Self {
            heartbeat_interval_seconds: 30,
            offline_timeout_seconds: 180, // 3 minutes
            failure_threshold: 3,
            grace_period_seconds: 60,
            predictive_detection: true,
            partition_detection: true,
        }
    }
}

impl AutoReplicationManager {
    /// Create a new auto-replication manager with default policies
    pub fn new() -> Self {
        let mut manager = Self {
            failure_detection: FailureDetectionConfig::default(),
            replication_policies: HashMap::new(),
            active_tasks: HashMap::new(),
            node_status: HashMap::new(),
            recovery_strategies: RecoveryStrategyConfig {
                prefer_local_replicas: true,
                max_recovery_distance_km: 5000.0,
                parallel_recovery_streams: 3,
                verification_level: VerificationLevel::Standard,
                fallback_strategies: vec![
                    FallbackStrategy::UseRemoteReplicas,
                    FallbackStrategy::RelaxGeographicConstraints,
                    FallbackStrategy::IncreaseReplicationFactor,
                ],
                optimization_goals: OptimizationGoals {
                    minimize_cost: true,
                    minimize_latency: true,
                    maximize_durability: true,
                    balance_geographic_distribution: true,
                    prefer_high_performance_nodes: false,
                },
            },
            metrics: AutoReplicationMetrics {
                total_replications_triggered: 0,
                successful_replications: 0,
                failed_replications: 0,
                average_replication_time_seconds: 0.0,
                total_data_recovered_gb: 0,
                cost_of_replications: 0.0,
                nodes_replaced: 0,
                emergency_recoveries: 0,
                last_updated: Utc::now(),
            },
            emergency_config: EmergencyReplicationConfig {
                enable_emergency_mode: true,
                emergency_triggers: vec![
                    EmergencyTrigger::DataLossImminent { chunks_at_risk: 10 },
                    EmergencyTrigger::MassNodeFailure { failure_rate: 0.1 },
                ],
                emergency_resources: EmergencyResources {
                    reserved_bandwidth_mbps: 100.0,
                    reserved_storage_gb: 1000,
                    priority_node_access: true,
                    cost_override_enabled: true,
                    geographic_restriction_override: false,
                },
                escalation_timeouts: EscalationTimeouts {
                    initial_response_minutes: 5,
                    escalation_interval_minutes: 15,
                    max_escalation_levels: 3,
                    emergency_override_minutes: 60,
                },
                emergency_contacts: Vec::new(),
            },
        };

        manager.initialize_default_policies();
        manager
    }

    /// Initialize default replication policies
    fn initialize_default_policies(&mut self) {
        // Critical data policy
        self.replication_policies.insert("critical".to_string(), AutoReplicationPolicy {
            policy_id: "critical".to_string(),
            content_types: vec![ContentType::Critical],
            trigger_conditions: vec![
                TriggerCondition::NodeOffline { grace_period_seconds: 30 },
                TriggerCondition::ReplicaCountBelowThreshold { min_replicas: 5 },
                TriggerCondition::HealthScoreBelowThreshold { min_score: 80.0 },
            ],
            replication_strategy: ReplicationResponseStrategy::Immediate,
            priority: ReplicationPriority::Emergency,
            max_concurrent_replications: 10,
            resource_limits: ResourceLimits {
                max_bandwidth_mbps: 1000.0,
                max_concurrent_transfers: 20,
                max_storage_usage_gb: 10000,
                max_cost_per_hour: 100.0,
                cpu_usage_limit: 80.0,
            },
            geographic_constraints: GeographicConstraints {
                required_regions: vec![],
                forbidden_regions: vec![],
                min_distance_km: 1000.0,
                regulatory_compliance: vec!["SOX".to_string(), "HIPAA".to_string()],
            },
        });

        // Standard data policy
        self.replication_policies.insert("standard".to_string(), AutoReplicationPolicy {
            policy_id: "standard".to_string(),
            content_types: vec![ContentType::Standard, ContentType::Important],
            trigger_conditions: vec![
                TriggerCondition::NodeOffline { grace_period_seconds: 120 },
                TriggerCondition::ReplicaCountBelowThreshold { min_replicas: 3 },
                TriggerCondition::HealthScoreBelowThreshold { min_score: 70.0 },
            ],
            replication_strategy: ReplicationResponseStrategy::Delayed { delay_seconds: 300 },
            priority: ReplicationPriority::Normal,
            max_concurrent_replications: 5,
            resource_limits: ResourceLimits {
                max_bandwidth_mbps: 500.0,
                max_concurrent_transfers: 10,
                max_storage_usage_gb: 5000,
                max_cost_per_hour: 50.0,
                cpu_usage_limit: 60.0,
            },
            geographic_constraints: GeographicConstraints {
                required_regions: vec![],
                forbidden_regions: vec![],
                min_distance_km: 500.0,
                regulatory_compliance: vec![],
            },
        });

        // Archive data policy
        self.replication_policies.insert("archive".to_string(), AutoReplicationPolicy {
            policy_id: "archive".to_string(),
            content_types: vec![ContentType::Archive],
            trigger_conditions: vec![
                TriggerCondition::NodeOffline { grace_period_seconds: 3600 }, // 1 hour
                TriggerCondition::ReplicaCountBelowThreshold { min_replicas: 2 },
            ],
            replication_strategy: ReplicationResponseStrategy::Conservative,
            priority: ReplicationPriority::Low,
            max_concurrent_replications: 2,
            resource_limits: ResourceLimits {
                max_bandwidth_mbps: 100.0,
                max_concurrent_transfers: 3,
                max_storage_usage_gb: 1000,
                max_cost_per_hour: 10.0,
                cpu_usage_limit: 30.0,
            },
            geographic_constraints: GeographicConstraints {
                required_regions: vec![],
                forbidden_regions: vec![],
                min_distance_km: 100.0,
                regulatory_compliance: vec![],
            },
        });
    }

    /// Update node status
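    ///
    /// Transitions are tracked: Online -> non-Online appends a `FailureEvent`
    /// and increments `consecutive_failures`; the reverse transition resets
    /// the counter. A sketch (illustrative):
    ///
    /// ```ignore
    /// manager.update_node_status("node-7".to_string(), NodeState::Degraded);
    /// ```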
    pub fn update_node_status(&mut self, node_id: String, status: NodeState) {
        let now = Utc::now();

        if let Some(node_status) = self.node_status.get_mut(&node_id) {
            let previous_status = node_status.status.clone();
            node_status.status = status.clone();
            node_status.last_seen = now;

            // Track state transitions
            if !matches!(previous_status, NodeState::Online) && matches!(status, NodeState::Online) {
                // Node came back online
                node_status.consecutive_failures = 0;
                tracing::info!("Node {} came back online", node_id);
            } else if matches!(previous_status, NodeState::Online) && !matches!(status, NodeState::Online) {
                // Node went offline
                node_status.consecutive_failures += 1;

                node_status.failure_history.push_back(FailureEvent {
                    timestamp: now,
                    failure_type: FailureType::NetworkTimeout, // Default; would be determined by failure detection
                    duration_seconds: None,
                    recovery_time_seconds: None,
                    root_cause: None,
                });

                // Keep only the last 100 failure events
                if node_status.failure_history.len() > 100 {
                    node_status.failure_history.pop_front();
                }

                tracing::warn!("Node {} went offline (failure #{})", node_id, node_status.consecutive_failures);
            }
        } else {
            // New node
            self.node_status.insert(node_id.clone(), NodeStatus {
                node_id: node_id.clone(),
                region: GeographicRegion::NorthAmerica, // Would be determined from node info
                status,
                last_seen: now,
                consecutive_failures: 0,
                failure_history: VecDeque::new(),
                predicted_availability: 1.0,
                maintenance_scheduled: None,
            });
        }
    }

    /// Detect node failures and trigger replication
    pub async fn detect_failures_and_replicate(&mut self, chunk_health_data: &HashMap<String, ChunkHealth>) -> Result<()> {
        let failed_nodes = self.detect_failed_nodes();

        if !failed_nodes.is_empty() {
            tracing::info!("Detected {} failed nodes: {:?}", failed_nodes.len(), failed_nodes);

            // Find affected chunks
            let affected_chunks = self.find_affected_chunks(chunk_health_data, &failed_nodes);

            for chunk_id in affected_chunks {
                if let Some(chunk_health) = chunk_health_data.get(&chunk_id) {
                    // Check if replication is needed
                    if self.should_trigger_replication(chunk_health, &failed_nodes)? {
                        self.trigger_replication(chunk_id, chunk_health, &failed_nodes).await?;
                    }
                }
            }
        }

        // Check for other trigger conditions
        self.check_other_trigger_conditions(chunk_health_data).await?;

        Ok(())
    }

    /// Detect failed nodes
    fn detect_failed_nodes(&self) -> Vec<String> {
        let now = Utc::now();
        let offline_threshold = Duration::seconds(self.failure_detection.offline_timeout_seconds as i64);

        self.node_status
            .iter()
            .filter(|(_, status)| {
                matches!(status.status, NodeState::Offline | NodeState::Unknown) ||
                    (now - status.last_seen) > offline_threshold
            })
            .map(|(node_id, _)| node_id.clone())
            .collect()
    }

    /// Find chunks affected by node failures
    fn find_affected_chunks(&self, chunk_health_data: &HashMap<String, ChunkHealth>, failed_nodes: &[String]) -> Vec<String> {
        let failed_node_set: HashSet<_> = failed_nodes.iter().collect();

        chunk_health_data
            .iter()
            .filter(|(_, chunk_health)| {
                chunk_health.replica_health
                    .iter()
                    .any(|replica| failed_node_set.contains(&replica.node_id))
            })
            .map(|(chunk_id, _)| chunk_id.clone())
            .collect()
    }

    /// Check if replication should be triggered
    fn should_trigger_replication(&self, chunk_health: &ChunkHealth, failed_nodes: &[String]) -> Result<bool> {
        let content_type = ContentType::Standard; // Would be determined from chunk metadata

        // Find applicable policy
        let policy = self.find_applicable_policy(&content_type)?;

        // Check each trigger condition
        for condition in &policy.trigger_conditions {
            match condition {
                TriggerCondition::NodeOffline { grace_period_seconds } => {
                    // Check if any replica is on a failed node; a full
                    // implementation would also confirm the grace period has
                    // elapsed since the actual failure time.
                    let affected_replicas = chunk_health.replica_health
                        .iter()
                        .filter(|replica| failed_nodes.contains(&replica.node_id))
                        .count();

                    if affected_replicas > 0 {
                        let _grace_period = Duration::seconds(*grace_period_seconds as i64);
                        return Ok(true);
                    }
                },
                TriggerCondition::ReplicaCountBelowThreshold { min_replicas } => {
                    let healthy_replicas = chunk_health.replica_health
                        .iter()
                        .filter(|replica| matches!(replica.status, ReplicaStatus::Healthy))
                        .count();

                    if healthy_replicas < *min_replicas as usize {
                        return Ok(true);
                    }
                },
                TriggerCondition::HealthScoreBelowThreshold { min_score } => {
                    if chunk_health.availability_score < *min_score {
                        return Ok(true);
                    }
                },
                TriggerCondition::GeographicDistributionLoss { min_regions } => {
                    let regions: HashSet<_> = chunk_health.replica_health
                        .iter()
                        .filter(|replica| matches!(replica.status, ReplicaStatus::Healthy))
                        .map(|replica| &replica.region)
                        .collect();

                    if regions.len() < *min_regions as usize {
                        return Ok(true);
                    }
                },
                _ => {
                    // Handle other conditions (not yet implemented)
                }
            }
        }

        Ok(false)
    }

    /// Find the applicable replication policy for a content type
    fn find_applicable_policy(&self, content_type: &ContentType) -> Result<&AutoReplicationPolicy> {
        for policy in self.replication_policies.values() {
            if policy.content_types.contains(content_type) {
                return Ok(policy);
            }
        }

        // Fall back to the standard policy
        self.replication_policies.get("standard")
            .ok_or_else(|| anyhow::anyhow!("No applicable replication policy found"))
    }

    /// Trigger replication for a chunk
    pub async fn trigger_replication(
        &mut self,
        chunk_id: String,
        chunk_health: &ChunkHealth,
        failed_nodes: &[String],
    ) -> Result<String> {
        let content_type = ContentType::Standard; // Would be determined from chunk metadata
        // Clone the policy so the borrow of `self` ends before we mutate state below.
        let policy = self.find_applicable_policy(&content_type)?.clone();

        // Check if we're already replicating this chunk
        if let Some(existing_task) = self.active_tasks.values()
            .find(|task| task.chunk_id == chunk_id && matches!(task.status, TaskStatus::Queued | TaskStatus::Executing))
        {
            tracing::info!("Replication already in progress for chunk {}", chunk_id);
            return Ok(existing_task.task_id.clone());
        }

        // Check resource limits (the task is still created; it just stays queued)
        if self.active_tasks.len() >= policy.max_concurrent_replications as usize {
            tracing::warn!("Maximum concurrent replications reached, queuing chunk {}", chunk_id);
        }

        // Create replication task
        let task_id = format!("repl_{}_{}", chunk_id, Utc::now().timestamp());
        let task = ReplicationTask {
            task_id: task_id.clone(),
            chunk_id: chunk_id.clone(),
            content_type,
            trigger_reason: TriggerReason::NodeFailure {
                failed_nodes: failed_nodes.to_vec(),
            },
            source_replicas: chunk_health.replica_health
                .iter()
                .filter(|replica| matches!(replica.status, ReplicaStatus::Healthy))
                .map(|replica| replica.replica_id.clone())
                .collect(),
            target_nodes: Vec::new(), // Determined during planning
            status: TaskStatus::Queued,
            progress: TaskProgress {
                stage: ReplicationStage::Initializing,
                percentage_complete: 0.0,
                bytes_transferred: 0,
                total_bytes: 0, // Would be determined from chunk size
                transfer_rate_mbps: 0.0,
                estimated_time_remaining_seconds: 0,
                current_operation: "Queued for replication".to_string(),
            },
            created_at: Utc::now(),
            started_at: None,
            completed_at: None,
            estimated_completion: None,
            resource_usage: ResourceUsage {
                bandwidth_used_mbps: 0.0,
                storage_used_gb: 0,
                cpu_usage_percent: 0.0,
                cost_incurred: 0.0,
                network_transfers: 0,
            },
        };

        self.active_tasks.insert(task_id.clone(), task);
        self.metrics.total_replications_triggered += 1;

        tracing::info!("Triggered replication for chunk {} (task: {})", chunk_id, task_id);

        // Schedule task execution based on the policy strategy
        match &policy.replication_strategy {
            ReplicationResponseStrategy::Immediate => {
                self.execute_replication_task(task_id.clone()).await?;
            },
            ReplicationResponseStrategy::Delayed { delay_seconds } => {
                // In a real implementation, the task would be re-queued for
                // execution after the delay; copy the delay so the spawned
                // future is 'static.
                let delay = *delay_seconds as u64;
                tokio::spawn(async move {
                    sleep(TokioDuration::from_secs(delay)).await;
                    // Execute task after delay
                });
            },
            _ => {
                // Handle other strategies (not yet implemented)
            }
        }

        Ok(task_id)
    }

    /// Execute a replication task
    async fn execute_replication_task(&mut self, task_id: String) -> Result<()> {
        // Copy out the fields we need so the mutable borrow of `active_tasks`
        // is not held across the node-selection call below.
        let (chunk_id, content_type) = {
            let task = self.active_tasks.get_mut(&task_id)
                .ok_or_else(|| anyhow::anyhow!("Replication task not found"))?;

            task.status = TaskStatus::Planning;
            task.started_at = Some(Utc::now());
            task.progress.stage = ReplicationStage::SelectingNodes;
            task.progress.current_operation = "Selecting target nodes".to_string();

            (task.chunk_id.clone(), task.content_type.clone())
        };

        tracing::info!("Executing replication task {}", task_id);

        // Select target nodes
        let target_nodes = self.select_target_nodes(&chunk_id, &content_type).await?;

        if target_nodes.is_empty() {
            if let Some(task) = self.active_tasks.get_mut(&task_id) {
                task.status = TaskStatus::Failed;
            }
            self.metrics.failed_replications += 1;
            return Err(anyhow::anyhow!("No suitable target nodes found"));
        }

        if let Some(task) = self.active_tasks.get_mut(&task_id) {
            task.target_nodes = target_nodes;
            task.status = TaskStatus::Executing;
            task.progress.stage = ReplicationStage::Transferring;
        }

        // Execute the actual replication
        self.perform_replication(&task_id).await?;

        Ok(())
    }

    /// Select target nodes for replication
    async fn select_target_nodes(&self, _chunk_id: &str, _content_type: &ContentType) -> Result<Vec<String>> {
        // Simplified node selection; a real implementation would delegate to
        // the intelligent replication manager.
        let available_nodes: Vec<String> = self.node_status
            .iter()
            .filter(|(_, status)| matches!(status.status, NodeState::Online))
            .map(|(node_id, _)| node_id.clone())
            .take(2) // Select two replacement nodes
            .collect();

        Ok(available_nodes)
    }

    /// Perform the actual replication
    async fn perform_replication(&mut self, task_id: &str) -> Result<()> {
        let task = self.active_tasks.get_mut(task_id)
            .ok_or_else(|| anyhow::anyhow!("Task not found"))?;

        // Simulate the replication process
        task.progress.current_operation = "Transferring data".to_string();
        task.progress.total_bytes = 1_000_000_000; // 1 GB example

        // Simulate transfer progress
        for i in 0..=10 {
            task.progress.percentage_complete = i as f64 * 10.0;
            task.progress.bytes_transferred = (task.progress.total_bytes as f64 * (i as f64 / 10.0)) as u64;
            task.progress.current_operation = format!("Transferring chunk data ({:.0}%)", task.progress.percentage_complete);

            // Simulate transfer time
            sleep(TokioDuration::from_millis(100)).await;
        }

        // Verification stage
        task.progress.stage = ReplicationStage::Verifying;
        task.progress.current_operation = "Verifying replica integrity".to_string();
        sleep(TokioDuration::from_millis(200)).await;

        // Finalization
        task.progress.stage = ReplicationStage::Finalizing;
        task.progress.current_operation = "Finalizing replication".to_string();
        task.status = TaskStatus::Completed;
        task.completed_at = Some(Utc::now());
        task.progress.percentage_complete = 100.0;

        // Update metrics (running average over successful replications)
        self.metrics.successful_replications += 1;
        if let Some(started_at) = task.started_at {
            let duration = (Utc::now() - started_at).num_seconds() as f64;
            self.metrics.average_replication_time_seconds =
                (self.metrics.average_replication_time_seconds * (self.metrics.successful_replications - 1) as f64 + duration)
                    / self.metrics.successful_replications as f64;
        }

        tracing::info!("Replication task {} completed successfully", task_id);

        Ok(())
    }

    /// Check other trigger conditions beyond node failures
    async fn check_other_trigger_conditions(&mut self, chunk_health_data: &HashMap<String, ChunkHealth>) -> Result<()> {
        for (chunk_id, chunk_health) in chunk_health_data {
            // Check health score thresholds
            if chunk_health.availability_score < 70.0 {
                match chunk_health.overall_health {
                    HealthStatus::Critical | HealthStatus::Failed => {
                        if !self.active_tasks.values().any(|task| task.chunk_id == *chunk_id) {
                            self.trigger_replication(
                                chunk_id.clone(),
                                chunk_health,
                                &[], // No specific failed nodes
                            ).await?;
                        }
                    },
                    _ => {}
                }
            }

            // Check geographic distribution
            let regions: HashSet<_> = chunk_health.replica_health
                .iter()
                .filter(|replica| matches!(replica.status, ReplicaStatus::Healthy))
                .map(|replica| &replica.region)
                .collect();

            if regions.len() < 2 {
                // Consider triggering replication for better geographic distribution
                tracing::warn!("Chunk {} has poor geographic distribution ({} regions)", chunk_id, regions.len());
            }
        }

        Ok(())
    }

    /// Clean up completed tasks
    pub fn cleanup_completed_tasks(&mut self) {
        let cutoff_time = Utc::now() - Duration::hours(24); // Keep completed tasks for 24 hours

        self.active_tasks.retain(|_, task| {
            if matches!(task.status, TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled) {
                if let Some(completed_at) = task.completed_at {
                    completed_at > cutoff_time
                } else {
                    task.created_at > cutoff_time
                }
            } else {
                true // Keep active tasks
            }
        });
    }

    /// Get a replication status summary
    pub fn get_replication_status(&self) -> ReplicationStatus {
        let active_count = self.active_tasks.values()
            .filter(|task| matches!(task.status, TaskStatus::Queued | TaskStatus::Executing))
            .count();

        let completed_count = self.active_tasks.values()
            .filter(|task| matches!(task.status, TaskStatus::Completed))
            .count();

        let failed_count = self.active_tasks.values()
            .filter(|task| matches!(task.status, TaskStatus::Failed))
            .count();

        ReplicationStatus {
            active_replications: active_count as u32,
            completed_replications: completed_count as u32,
            failed_replications: failed_count as u32,
            total_nodes_online: self.node_status.values()
                .filter(|status| matches!(status.status, NodeState::Online))
                .count() as u32,
            total_nodes_offline: self.node_status.values()
                .filter(|status| matches!(status.status, NodeState::Offline))
                .count() as u32,
            average_replication_time: self.metrics.average_replication_time_seconds,
            total_data_replicated: self.metrics.total_data_recovered_gb,
            metrics: self.metrics.clone(),
        }
    }

    /// Run continuous monitoring and auto-replication
    pub async fn run_auto_replication_loop(&mut self, chunk_health_data: HashMap<String, ChunkHealth>) -> Result<()> {
        let mut check_interval = tokio::time::interval(TokioDuration::from_secs(
            self.failure_detection.heartbeat_interval_seconds as u64,
        ));

        loop {
            check_interval.tick().await;

            // Detect failures and trigger replication
            if let Err(e) = self.detect_failures_and_replicate(&chunk_health_data).await {
                tracing::error!("Auto-replication check failed: {}", e);
            }

            // Clean up old tasks
            self.cleanup_completed_tasks();

            // Update metrics
            self.metrics.last_updated = Utc::now();

            tracing::debug!("Auto-replication check complete. Active tasks: {}", self.active_tasks.len());
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationStatus {
    pub active_replications: u32,
    pub completed_replications: u32,
    pub failed_replications: u32,
    pub total_nodes_online: u32,
    pub total_nodes_offline: u32,
    pub average_replication_time: f64,
    pub total_data_replicated: u64,
    pub metrics: AutoReplicationMetrics,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_auto_replication_manager_creation() {
        let manager = AutoReplicationManager::new();
        assert!(!manager.replication_policies.is_empty());
        assert!(manager.failure_detection.heartbeat_interval_seconds > 0);
        assert!(manager.emergency_config.enable_emergency_mode);
    }

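    // A sketch of the policy-lookup fallback: ContentType::Standard is listed
    // by the pre-registered "standard" policy, so the lookup resolves to it.
    #[test]
    fn test_policy_lookup_resolves_standard() {
        let manager = AutoReplicationManager::new();
        let policy = manager.find_applicable_policy(&ContentType::Standard).unwrap();
        assert_eq!(policy.policy_id, "standard");
    }
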
    #[test]
    fn test_node_status_updates() {
        let mut manager = AutoReplicationManager::new();

        // Add node as online
        manager.update_node_status("node1".to_string(), NodeState::Online);
        assert_eq!(manager.node_status.len(), 1);

        // Update to offline
        manager.update_node_status("node1".to_string(), NodeState::Offline);
        let status = manager.node_status.get("node1").unwrap();
        assert_eq!(status.consecutive_failures, 1);
        assert!(!status.failure_history.is_empty());

        // Back to online
        manager.update_node_status("node1".to_string(), NodeState::Online);
        let status = manager.node_status.get("node1").unwrap();
        assert_eq!(status.consecutive_failures, 0);
    }

    #[test]
    fn test_failure_detection() {
        let mut manager = AutoReplicationManager::new();

        // Add some nodes
        manager.update_node_status("node1".to_string(), NodeState::Online);
        manager.update_node_status("node2".to_string(), NodeState::Offline);
        manager.update_node_status("node3".to_string(), NodeState::Unknown);

        let failed_nodes = manager.detect_failed_nodes();
        assert!(failed_nodes.contains(&"node2".to_string()));
        assert!(failed_nodes.contains(&"node3".to_string()));
        assert!(!failed_nodes.contains(&"node1".to_string()));
    }

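    // A sketch checking the status summary over a fresh manager with one
    // online and one offline node and no tasks yet.
    #[test]
    fn test_replication_status_summary() {
        let mut manager = AutoReplicationManager::new();
        manager.update_node_status("node1".to_string(), NodeState::Online);
        manager.update_node_status("node2".to_string(), NodeState::Offline);

        let status = manager.get_replication_status();
        assert_eq!(status.active_replications, 0);
        assert_eq!(status.total_nodes_online, 1);
        assert_eq!(status.total_nodes_offline, 1);
    }
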
    #[tokio::test]
    async fn test_replication_trigger() {
        let mut manager = AutoReplicationManager::new();

        // Create mock chunk health with a failed replica
        let chunk_health = ChunkHealth {
            chunk_id: "test_chunk".to_string(),
            overall_health: HealthStatus::Critical,
            replica_health: vec![
                ReplicaHealth {
                    replica_id: "replica1".to_string(),
                    node_id: "failed_node".to_string(),
                    region: GeographicRegion::NorthAmerica,
                    status: ReplicaStatus::Unreachable,
                    health_score: 0.0,
                    last_accessed: Utc::now(),
                    last_verified: Utc::now(),
                    integrity_hash: "hash1".to_string(),
                    performance_metrics: super::super::health_monitor::ReplicaPerformanceMetrics {
                        response_time_ms: 0.0,
                        transfer_speed_mbps: 0.0,
                        success_rate: 0.0,
                        error_count: 0,
                        last_error: None,
                        uptime_percentage: 0.0,
                    },
                    connectivity_status: super::super::health_monitor::ConnectivityStatus::Offline,
                },
            ],
            integrity_status: super::super::health_monitor::IntegrityStatus::Unknown,
            availability_score: 30.0,
            durability_score: 30.0,
            performance_metrics: super::super::health_monitor::ChunkPerformanceMetrics {
                avg_response_time_ms: 0.0,
                success_rate: 0.0,
                throughput_mbps: 0.0,
                error_rate: 100.0,
                access_frequency: super::super::health_monitor::AccessFrequency::Low,
                bandwidth_utilization: 0.0,
            },
            last_verified: Utc::now(),
            next_check_due: Utc::now(),
            risk_factors: vec![],
            repair_history: vec![],
        };

        let failed_nodes = vec!["failed_node".to_string()];

        let task_id = manager.trigger_replication(
            "test_chunk".to_string(),
            &chunk_health,
            &failed_nodes,
        ).await.unwrap();

        assert!(!task_id.is_empty());
        assert!(manager.active_tasks.contains_key(&task_id));

        let task = manager.active_tasks.get(&task_id).unwrap();
        assert_eq!(task.chunk_id, "test_chunk");
        assert!(matches!(task.trigger_reason, TriggerReason::NodeFailure { .. }));
    }
}