//! Automatic Replication System
//!
//! Handles automatic replication when nodes go offline, ensuring data durability
//! through intelligent recovery and replacement strategies.
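//!
//! A minimal usage sketch (assumptions: a Tokio runtime is available and
//! `chunk_health_data: HashMap<String, ChunkHealth>` is produced by the health monitor):
//!
//! ```ignore
//! let mut manager = AutoReplicationManager::new();
//! manager.update_node_status("node-1".to_string(), NodeState::Online);
//! manager.detect_failures_and_replicate(&chunk_health_data).await?;
//! ```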

use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque, HashSet};
use chrono::{DateTime, Utc, Duration};
use tokio::time::{sleep, Duration as TokioDuration};

use crate::economics::GeographicRegion;
use super::health_monitor::{ChunkHealth, ReplicaHealth, ReplicaStatus, HealthStatus};
use super::intelligent_replication::{ReplicationStrategy, ContentType};

/// Automatic replication manager
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoReplicationManager {
    /// Node failure detection
    pub failure_detection: FailureDetectionConfig,
    /// Replication policies
    pub replication_policies: HashMap<String, AutoReplicationPolicy>,
    /// Active replication tasks
    pub active_tasks: HashMap<String, ReplicationTask>,
    /// Node status tracking
    pub node_status: HashMap<String, NodeStatus>,
    /// Recovery strategies
    pub recovery_strategies: RecoveryStrategyConfig,
    /// Performance metrics
    pub metrics: AutoReplicationMetrics,
    /// Emergency protocols
    pub emergency_config: EmergencyReplicationConfig,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailureDetectionConfig {
    /// Heartbeat interval for node monitoring
    pub heartbeat_interval_seconds: u32,
    /// Timeout before considering a node offline
    pub offline_timeout_seconds: u32,
    /// Number of consecutive failures before triggering replication
    pub failure_threshold: u32,
    /// Grace period for temporary network issues
    pub grace_period_seconds: u32,
    /// Enable predictive failure detection
    pub predictive_detection: bool,
    /// Network partition detection
    pub partition_detection: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoReplicationPolicy {
    pub policy_id: String,
    pub content_types: Vec<ContentType>,
    pub trigger_conditions: Vec<TriggerCondition>,
    pub replication_strategy: ReplicationResponseStrategy,
    pub priority: ReplicationPriority,
    pub max_concurrent_replications: u32,
    pub resource_limits: ResourceLimits,
    pub geographic_constraints: GeographicConstraints,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TriggerCondition {
    NodeOffline { grace_period_seconds: u32 },
    ReplicaCountBelowThreshold { min_replicas: u32 },
    HealthScoreBelowThreshold { min_score: f64 },
    GeographicDistributionLoss { min_regions: u32 },
    PerformanceDegradation { max_response_time_ms: f64 },
    IntegrityViolation,
    NetworkPartition,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationResponseStrategy {
    Immediate,                      // Start replication immediately
    Delayed { delay_seconds: u32 }, // Wait before starting
    Batched { batch_size: u32 },    // Batch multiple chunks
    Adaptive,                       // Adapt based on network conditions
    Conservative,                   // Wait for confirmation of permanent failure
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationPriority {
    Emergency, // Critical data, immediate action
    High,      // Important data, prioritized
    Normal,    // Standard priority
    Low,       // Background processing
    Deferred,  // Wait for better conditions
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceLimits {
    pub max_bandwidth_mbps: f64,
    pub max_concurrent_transfers: u32,
    pub max_storage_usage_gb: u64,
    pub max_cost_per_hour: f64,
    pub cpu_usage_limit: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeographicConstraints {
    pub required_regions: Vec<GeographicRegion>,
    pub forbidden_regions: Vec<GeographicRegion>,
    pub min_distance_km: f64,
    pub regulatory_compliance: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeStatus {
    pub node_id: String,
    pub region: GeographicRegion,
    pub status: NodeState,
    pub last_seen: DateTime<Utc>,
    pub consecutive_failures: u32,
    pub failure_history: VecDeque<FailureEvent>,
    pub predicted_availability: f64,
    pub maintenance_scheduled: Option<DateTime<Utc>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NodeState {
    Online,
    Degraded,
    Offline,
    Maintenance,
    Unknown,
    Suspected, // Suspected of being offline
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailureEvent {
    pub timestamp: DateTime<Utc>,
    pub failure_type: FailureType,
    pub duration_seconds: Option<u64>,
    pub recovery_time_seconds: Option<u64>,
    pub root_cause: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FailureType {
    NetworkTimeout,
    DiskFailure,
    PowerOutage,
    MaintenanceShutdown,
    ProcessCrash,
    NetworkPartition,
    Unknown,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationTask {
    pub task_id: String,
    pub chunk_id: String,
    pub content_type: ContentType,
    pub trigger_reason: TriggerReason,
    pub source_replicas: Vec<String>,
    pub target_nodes: Vec<String>,
    pub status: TaskStatus,
    pub progress: TaskProgress,
    pub created_at: DateTime<Utc>,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    pub estimated_completion: Option<DateTime<Utc>>,
    pub resource_usage: ResourceUsage,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TriggerReason {
    NodeFailure { failed_nodes: Vec<String> },
    HealthDegradation { health_score: f64 },
    PolicyViolation { policy_id: String },
    ManualRequest { requested_by: String },
    PredictiveAction { predicted_failure: String },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TaskStatus {
    Queued,
    Planning,
    Executing,
    Verifying,
    Completed,
    Failed,
    Cancelled,
    Paused,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskProgress {
    pub stage: ReplicationStage,
    pub percentage_complete: f64,
    pub bytes_transferred: u64,
    pub total_bytes: u64,
    pub transfer_rate_mbps: f64,
    pub estimated_time_remaining_seconds: u64,
    pub current_operation: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationStage {
    Initializing,
    SelectingNodes,
    PreparingTransfer,
    Transferring,
    Verifying,
    Finalizing,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
    pub bandwidth_used_mbps: f64,
    pub storage_used_gb: u64,
    pub cpu_usage_percent: f64,
    pub cost_incurred: f64,
    pub network_transfers: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecoveryStrategyConfig {
    pub prefer_local_replicas: bool,
    pub max_recovery_distance_km: f64,
    pub parallel_recovery_streams: u32,
    pub verification_level: VerificationLevel,
    pub fallback_strategies: Vec<FallbackStrategy>,
    pub optimization_goals: OptimizationGoals,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum VerificationLevel {
    Basic,    // Hash verification only
    Standard, // Hash + size + basic integrity
    Thorough, // Full content verification
    Paranoid, // Multiple verification methods
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FallbackStrategy {
    UseRemoteReplicas,
    IncreaseReplicationFactor,
    RelaxGeographicConstraints,
    UseExpensiveNodes,
    WaitForNodeRecovery,
    EmergencyProtocol,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationGoals {
    pub minimize_cost: bool,
    pub minimize_latency: bool,
    pub maximize_durability: bool,
    pub balance_geographic_distribution: bool,
    pub prefer_high_performance_nodes: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoReplicationMetrics {
    pub total_replications_triggered: u64,
    pub successful_replications: u64,
    pub failed_replications: u64,
    pub average_replication_time_seconds: f64,
    pub total_data_recovered_gb: u64,
    pub cost_of_replications: f64,
    pub nodes_replaced: u64,
    pub emergency_recoveries: u64,
    pub last_updated: DateTime<Utc>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmergencyReplicationConfig {
    pub enable_emergency_mode: bool,
    pub emergency_triggers: Vec<EmergencyTrigger>,
    pub emergency_resources: EmergencyResources,
    pub escalation_timeouts: EscalationTimeouts,
    pub emergency_contacts: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EmergencyTrigger {
    DataLossImminent { chunks_at_risk: u32 },
    NetworkPartition { partition_size: f64 },
    MassNodeFailure { failure_rate: f64 },
    StorageCapacityCritical { utilization: f64 },
    ComplianceViolation { severity: String },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmergencyResources {
    pub reserved_bandwidth_mbps: f64,
    pub reserved_storage_gb: u64,
    pub priority_node_access: bool,
    pub cost_override_enabled: bool,
    pub geographic_restriction_override: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EscalationTimeouts {
    pub initial_response_minutes: u32,
    pub escalation_interval_minutes: u32,
    pub max_escalation_levels: u32,
    pub emergency_override_minutes: u32,
}

impl Default for FailureDetectionConfig {
    fn default() -> Self {
        Self {
            heartbeat_interval_seconds: 30,
            offline_timeout_seconds: 180, // 3 minutes
            failure_threshold: 3,
            grace_period_seconds: 60,
            predictive_detection: true,
            partition_detection: true,
        }
    }
}
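
// A minimal tuning sketch (an assumption about how callers might use this config,
// not something shown elsewhere in this file): override selected detection defaults
// with struct-update syntax before starting the manager.
//
//     let mut manager = AutoReplicationManager::new();
//     manager.failure_detection = FailureDetectionConfig {
//         offline_timeout_seconds: 60,
//         ..FailureDetectionConfig::default()
//     };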

impl AutoReplicationManager {
    /// Create new auto replication manager
    pub fn new() -> Self {
        let mut manager = Self {
            failure_detection: FailureDetectionConfig::default(),
            replication_policies: HashMap::new(),
            active_tasks: HashMap::new(),
            node_status: HashMap::new(),
            recovery_strategies: RecoveryStrategyConfig {
                prefer_local_replicas: true,
                max_recovery_distance_km: 5000.0,
                parallel_recovery_streams: 3,
                verification_level: VerificationLevel::Standard,
                fallback_strategies: vec![
                    FallbackStrategy::UseRemoteReplicas,
                    FallbackStrategy::RelaxGeographicConstraints,
                    FallbackStrategy::IncreaseReplicationFactor,
                ],
                optimization_goals: OptimizationGoals {
                    minimize_cost: true,
                    minimize_latency: true,
                    maximize_durability: true,
                    balance_geographic_distribution: true,
                    prefer_high_performance_nodes: false,
                },
            },
            metrics: AutoReplicationMetrics {
                total_replications_triggered: 0,
                successful_replications: 0,
                failed_replications: 0,
                average_replication_time_seconds: 0.0,
                total_data_recovered_gb: 0,
                cost_of_replications: 0.0,
                nodes_replaced: 0,
                emergency_recoveries: 0,
                last_updated: Utc::now(),
            },
            emergency_config: EmergencyReplicationConfig {
                enable_emergency_mode: true,
                emergency_triggers: vec![
                    EmergencyTrigger::DataLossImminent { chunks_at_risk: 10 },
                    EmergencyTrigger::MassNodeFailure { failure_rate: 0.1 },
                ],
                emergency_resources: EmergencyResources {
                    reserved_bandwidth_mbps: 100.0,
                    reserved_storage_gb: 1000,
                    priority_node_access: true,
                    cost_override_enabled: true,
                    geographic_restriction_override: false,
                },
                escalation_timeouts: EscalationTimeouts {
                    initial_response_minutes: 5,
                    escalation_interval_minutes: 15,
                    max_escalation_levels: 3,
                    emergency_override_minutes: 60,
                },
                emergency_contacts: Vec::new(),
            },
        };

        manager.initialize_default_policies();
        manager
    }

    /// Initialize default replication policies
    fn initialize_default_policies(&mut self) {
        // Critical data policy
        self.replication_policies.insert("critical".to_string(), AutoReplicationPolicy {
            policy_id: "critical".to_string(),
            content_types: vec![ContentType::Critical],
            trigger_conditions: vec![
                TriggerCondition::NodeOffline { grace_period_seconds: 30 },
                TriggerCondition::ReplicaCountBelowThreshold { min_replicas: 5 },
                TriggerCondition::HealthScoreBelowThreshold { min_score: 80.0 },
            ],
            replication_strategy: ReplicationResponseStrategy::Immediate,
            priority: ReplicationPriority::Emergency,
            max_concurrent_replications: 10,
            resource_limits: ResourceLimits {
                max_bandwidth_mbps: 1000.0,
                max_concurrent_transfers: 20,
                max_storage_usage_gb: 10000,
                max_cost_per_hour: 100.0,
                cpu_usage_limit: 80.0,
            },
            geographic_constraints: GeographicConstraints {
                required_regions: vec![],
                forbidden_regions: vec![],
                min_distance_km: 1000.0,
                regulatory_compliance: vec!["SOX".to_string(), "HIPAA".to_string()],
            },
        });

        // Standard data policy
        self.replication_policies.insert("standard".to_string(), AutoReplicationPolicy {
            policy_id: "standard".to_string(),
            content_types: vec![ContentType::Standard, ContentType::Important],
            trigger_conditions: vec![
                TriggerCondition::NodeOffline { grace_period_seconds: 120 },
                TriggerCondition::ReplicaCountBelowThreshold { min_replicas: 3 },
                TriggerCondition::HealthScoreBelowThreshold { min_score: 70.0 },
            ],
            replication_strategy: ReplicationResponseStrategy::Delayed { delay_seconds: 300 },
            priority: ReplicationPriority::Normal,
            max_concurrent_replications: 5,
            resource_limits: ResourceLimits {
                max_bandwidth_mbps: 500.0,
                max_concurrent_transfers: 10,
                max_storage_usage_gb: 5000,
                max_cost_per_hour: 50.0,
                cpu_usage_limit: 60.0,
            },
            geographic_constraints: GeographicConstraints {
                required_regions: vec![],
                forbidden_regions: vec![],
                min_distance_km: 500.0,
                regulatory_compliance: vec![],
            },
        });

        // Archive data policy
        self.replication_policies.insert("archive".to_string(), AutoReplicationPolicy {
            policy_id: "archive".to_string(),
            content_types: vec![ContentType::Archive],
            trigger_conditions: vec![
                TriggerCondition::NodeOffline { grace_period_seconds: 3600 }, // 1 hour
                TriggerCondition::ReplicaCountBelowThreshold { min_replicas: 2 },
            ],
            replication_strategy: ReplicationResponseStrategy::Conservative,
            priority: ReplicationPriority::Low,
            max_concurrent_replications: 2,
            resource_limits: ResourceLimits {
                max_bandwidth_mbps: 100.0,
                max_concurrent_transfers: 3,
                max_storage_usage_gb: 1000,
                max_cost_per_hour: 10.0,
                cpu_usage_limit: 30.0,
            },
            geographic_constraints: GeographicConstraints {
                required_regions: vec![],
                forbidden_regions: vec![],
                min_distance_km: 100.0,
                regulatory_compliance: vec![],
            },
        });
    }

    /// Update node status
    pub fn update_node_status(&mut self, node_id: String, status: NodeState) {
        let now = Utc::now();

        if let Some(node_status) = self.node_status.get_mut(&node_id) {
            let previous_status = node_status.status.clone();
            node_status.status = status.clone();
            node_status.last_seen = now;

            // Track state transitions
            if !matches!(previous_status, NodeState::Online) && matches!(status, NodeState::Online) {
                // Node came back online
                node_status.consecutive_failures = 0;
                tracing::info!("Node {} came back online", node_id);
            } else if matches!(previous_status, NodeState::Online) && !matches!(status, NodeState::Online) {
                // Node went offline
                node_status.consecutive_failures += 1;

                node_status.failure_history.push_back(FailureEvent {
                    timestamp: now,
                    failure_type: FailureType::NetworkTimeout, // Default; would be determined by failure detection
                    duration_seconds: None,
                    recovery_time_seconds: None,
                    root_cause: None,
                });

                // Keep only the last 100 failure events
                if node_status.failure_history.len() > 100 {
                    node_status.failure_history.pop_front();
                }

                tracing::warn!("Node {} went offline (failure #{})", node_id, node_status.consecutive_failures);
            }
        } else {
            // New node
            self.node_status.insert(node_id.clone(), NodeStatus {
                node_id: node_id.clone(),
                region: GeographicRegion::NorthAmerica, // Would be determined from node info
                status,
                last_seen: now,
                consecutive_failures: 0,
                failure_history: VecDeque::new(),
                predicted_availability: 1.0,
                maintenance_scheduled: None,
            });
        }
    }

    /// Detect node failures and trigger replication
    pub async fn detect_failures_and_replicate(&mut self, chunk_health_data: &HashMap<String, ChunkHealth>) -> Result<()> {
        let failed_nodes = self.detect_failed_nodes();

        if !failed_nodes.is_empty() {
            tracing::info!("Detected {} failed nodes: {:?}", failed_nodes.len(), failed_nodes);

            // Find affected chunks
            let affected_chunks = self.find_affected_chunks(chunk_health_data, &failed_nodes);

            for chunk_id in affected_chunks {
                if let Some(chunk_health) = chunk_health_data.get(&chunk_id) {
                    // Check if replication is needed
                    if self.should_trigger_replication(chunk_health, &failed_nodes)? {
                        self.trigger_replication(chunk_id, chunk_health, &failed_nodes).await?;
                    }
                }
            }
        }

        // Check for other trigger conditions
        self.check_other_trigger_conditions(chunk_health_data).await?;

        Ok(())
    }

    /// Detect failed nodes
    fn detect_failed_nodes(&self) -> Vec<String> {
        let now = Utc::now();
        let offline_threshold = Duration::seconds(self.failure_detection.offline_timeout_seconds as i64);

        self.node_status
            .iter()
            .filter(|(_, status)| {
                matches!(status.status, NodeState::Offline | NodeState::Unknown) ||
                    (now - status.last_seen) > offline_threshold
            })
            .map(|(node_id, _)| node_id.clone())
            .collect()
    }

    /// Find chunks affected by node failures
    fn find_affected_chunks(&self, chunk_health_data: &HashMap<String, ChunkHealth>, failed_nodes: &[String]) -> Vec<String> {
        let failed_node_set: HashSet<_> = failed_nodes.iter().collect();

        chunk_health_data
            .iter()
            .filter(|(_, chunk_health)| {
                chunk_health.replica_health
                    .iter()
                    .any(|replica| failed_node_set.contains(&replica.node_id))
            })
            .map(|(chunk_id, _)| chunk_id.clone())
            .collect()
    }

    /// Check if replication should be triggered
    fn should_trigger_replication(&self, chunk_health: &ChunkHealth, failed_nodes: &[String]) -> Result<bool> {
        let content_type = ContentType::Standard; // Would be determined from chunk metadata

        // Find applicable policy
        let policy = self.find_applicable_policy(&content_type)?;

        // Check each trigger condition
        for condition in &policy.trigger_conditions {
            match condition {
                TriggerCondition::NodeOffline { grace_period_seconds } => {
                    // Check whether any replica lives on a failed node
                    let affected_replicas = chunk_health.replica_health
                        .iter()
                        .filter(|replica| failed_nodes.contains(&replica.node_id))
                        .count();

                    if affected_replicas > 0 {
                        // Simplified: a real implementation would compare actual failure
                        // timestamps against this grace period before triggering.
                        let _grace_period = Duration::seconds(*grace_period_seconds as i64);
                        return Ok(true);
                    }
                },
                TriggerCondition::ReplicaCountBelowThreshold { min_replicas } => {
                    let healthy_replicas = chunk_health.replica_health
                        .iter()
                        .filter(|replica| matches!(replica.status, ReplicaStatus::Healthy))
                        .count();

                    if healthy_replicas < *min_replicas as usize {
                        return Ok(true);
                    }
                },
                TriggerCondition::HealthScoreBelowThreshold { min_score } => {
                    if chunk_health.availability_score < *min_score {
                        return Ok(true);
                    }
                },
                TriggerCondition::GeographicDistributionLoss { min_regions } => {
                    let regions: HashSet<_> = chunk_health.replica_health
                        .iter()
                        .filter(|replica| matches!(replica.status, ReplicaStatus::Healthy))
                        .map(|replica| &replica.region)
                        .collect();

                    if regions.len() < *min_regions as usize {
                        return Ok(true);
                    }
                },
                _ => {
                    // Other conditions are not evaluated here
                }
            }
        }

        Ok(false)
    }

    /// Find applicable replication policy
    fn find_applicable_policy(&self, content_type: &ContentType) -> Result<&AutoReplicationPolicy> {
        for policy in self.replication_policies.values() {
            if policy.content_types.contains(content_type) {
                return Ok(policy);
            }
        }

        // Default to the standard policy
        self.replication_policies.get("standard")
            .ok_or_else(|| anyhow::anyhow!("No applicable replication policy found"))
    }

    /// Trigger replication for a chunk
    pub async fn trigger_replication(
        &mut self,
        chunk_id: String,
        chunk_health: &ChunkHealth,
        failed_nodes: &[String],
    ) -> Result<String> {
        let content_type = ContentType::Standard; // Would be determined from chunk metadata

        // Copy out the policy fields needed later so the borrow of `self` ends
        // before `self.active_tasks` is mutated below.
        let (max_concurrent_replications, replication_strategy) = {
            let policy = self.find_applicable_policy(&content_type)?;
            (policy.max_concurrent_replications, policy.replication_strategy.clone())
        };

        // Check if we're already replicating this chunk
        if let Some(existing_task) = self.active_tasks.values()
            .find(|task| task.chunk_id == chunk_id && matches!(task.status, TaskStatus::Queued | TaskStatus::Executing))
        {
            tracing::info!("Replication already in progress for chunk {}", chunk_id);
            return Ok(existing_task.task_id.clone());
        }

        // Check resource limits
        if self.active_tasks.len() >= max_concurrent_replications as usize {
            tracing::warn!("Maximum concurrent replications reached, queuing chunk {}", chunk_id);
        }

        // Create replication task
        let task_id = format!("repl_{}_{}", chunk_id, Utc::now().timestamp());
        let task = ReplicationTask {
            task_id: task_id.clone(),
            chunk_id: chunk_id.clone(),
            content_type,
            trigger_reason: TriggerReason::NodeFailure {
                failed_nodes: failed_nodes.to_vec(),
            },
            source_replicas: chunk_health.replica_health
                .iter()
                .filter(|replica| matches!(replica.status, ReplicaStatus::Healthy))
                .map(|replica| replica.replica_id.clone())
                .collect(),
            target_nodes: Vec::new(), // Will be determined during planning
            status: TaskStatus::Queued,
            progress: TaskProgress {
                stage: ReplicationStage::Initializing,
                percentage_complete: 0.0,
                bytes_transferred: 0,
                total_bytes: 0, // Would be determined from chunk size
                transfer_rate_mbps: 0.0,
                estimated_time_remaining_seconds: 0,
                current_operation: "Queued for replication".to_string(),
            },
            created_at: Utc::now(),
            started_at: None,
            completed_at: None,
            estimated_completion: None,
            resource_usage: ResourceUsage {
                bandwidth_used_mbps: 0.0,
                storage_used_gb: 0,
                cpu_usage_percent: 0.0,
                cost_incurred: 0.0,
                network_transfers: 0,
            },
        };

        self.active_tasks.insert(task_id.clone(), task);
        self.metrics.total_replications_triggered += 1;

        tracing::info!("Triggered replication for chunk {} (task: {})", chunk_id, task_id);

        // Schedule task execution based on the policy strategy
        match replication_strategy {
            ReplicationResponseStrategy::Immediate => {
                self.execute_replication_task(task_id.clone()).await?;
            },
            ReplicationResponseStrategy::Delayed { delay_seconds } => {
                // In a real implementation the task itself would be scheduled after the delay
                tokio::spawn(async move {
                    sleep(TokioDuration::from_secs(delay_seconds as u64)).await;
                    // Execute task after delay
                });
            },
            _ => {
                // Handle other strategies
            }
        }

        Ok(task_id)
    }

    /// Execute replication task
    async fn execute_replication_task(&mut self, task_id: String) -> Result<()> {
        // Update the task state in a scoped borrow, copying out what node selection
        // needs, so `self` can be borrowed again below.
        let (chunk_id, content_type) = {
            let task = self.active_tasks.get_mut(&task_id)
                .ok_or_else(|| anyhow::anyhow!("Replication task not found"))?;

            task.status = TaskStatus::Planning;
            task.started_at = Some(Utc::now());
            task.progress.stage = ReplicationStage::SelectingNodes;
            task.progress.current_operation = "Selecting target nodes".to_string();

            (task.chunk_id.clone(), task.content_type.clone())
        };

        tracing::info!("Executing replication task {}", task_id);

        // Select target nodes
        let target_nodes = self.select_target_nodes(&chunk_id, &content_type).await?;

        if target_nodes.is_empty() {
            if let Some(task) = self.active_tasks.get_mut(&task_id) {
                task.status = TaskStatus::Failed;
            }
            self.metrics.failed_replications += 1;
            return Err(anyhow::anyhow!("No suitable target nodes found"));
        }

        if let Some(task) = self.active_tasks.get_mut(&task_id) {
            task.target_nodes = target_nodes;
            task.status = TaskStatus::Executing;
            task.progress.stage = ReplicationStage::Transferring;
        }

        // Execute the actual replication
        self.perform_replication(&task_id).await?;

        Ok(())
    }

    /// Select target nodes for replication
    async fn select_target_nodes(&self, chunk_id: &str, content_type: &ContentType) -> Result<Vec<String>> {
        // Simplified node selection - in a real implementation this would use
        // the intelligent replication manager
        let available_nodes: Vec<String> = self.node_status
            .iter()
            .filter(|(_, status)| matches!(status.status, NodeState::Online))
            .map(|(node_id, _)| node_id.clone())
            .take(2) // Select 2 replacement nodes
            .collect();

        Ok(available_nodes)
    }

    /// Perform the actual replication
    async fn perform_replication(&mut self, task_id: &str) -> Result<()> {
        let task = self.active_tasks.get_mut(task_id)
            .ok_or_else(|| anyhow::anyhow!("Task not found"))?;

        // Simulate replication process
        task.progress.current_operation = "Transferring data".to_string();
        task.progress.total_bytes = 1_000_000_000; // 1 GB example

        // Simulate transfer progress
        for i in 0..=10 {
            task.progress.percentage_complete = i as f64 * 10.0;
            task.progress.bytes_transferred = (task.progress.total_bytes as f64 * (i as f64 / 10.0)) as u64;
            task.progress.current_operation = format!("Transferring chunk data ({:.0}%)", task.progress.percentage_complete);

            // Simulate transfer time
            sleep(TokioDuration::from_millis(100)).await;
        }

        // Verification stage
        task.progress.stage = ReplicationStage::Verifying;
        task.progress.current_operation = "Verifying replica integrity".to_string();
        sleep(TokioDuration::from_millis(200)).await;

        // Finalization
        task.progress.stage = ReplicationStage::Finalizing;
        task.progress.current_operation = "Finalizing replication".to_string();
        task.status = TaskStatus::Completed;
        task.completed_at = Some(Utc::now());
        task.progress.percentage_complete = 100.0;

        // Update metrics
        self.metrics.successful_replications += 1;
        if let Some(started_at) = task.started_at {
            let duration = (Utc::now() - started_at).num_seconds() as f64;
            self.metrics.average_replication_time_seconds =
                (self.metrics.average_replication_time_seconds * (self.metrics.successful_replications - 1) as f64 + duration)
                    / self.metrics.successful_replications as f64;
        }

        tracing::info!("Replication task {} completed successfully", task_id);

        Ok(())
    }

    /// Check other trigger conditions beyond node failures
    async fn check_other_trigger_conditions(&mut self, chunk_health_data: &HashMap<String, ChunkHealth>) -> Result<()> {
        for (chunk_id, chunk_health) in chunk_health_data {
            // Check health score thresholds
            if chunk_health.availability_score < 70.0 {
                match chunk_health.overall_health {
                    HealthStatus::Critical | HealthStatus::Failed => {
                        if !self.active_tasks.values().any(|task| task.chunk_id == *chunk_id) {
                            self.trigger_replication(
                                chunk_id.clone(),
                                chunk_health,
                                &[], // No specific failed nodes
                            ).await?;
                        }
                    },
                    _ => {}
                }
            }

            // Check geographic distribution
            let regions: HashSet<_> = chunk_health.replica_health
                .iter()
                .filter(|replica| matches!(replica.status, ReplicaStatus::Healthy))
                .map(|replica| &replica.region)
                .collect();

            if regions.len() < 2 {
                // Consider triggering replication for better geographic distribution
                tracing::warn!("Chunk {} has poor geographic distribution ({} regions)", chunk_id, regions.len());
            }
        }

        Ok(())
    }

    /// Clean up completed tasks
    pub fn cleanup_completed_tasks(&mut self) {
        let cutoff_time = Utc::now() - Duration::hours(24); // Keep completed tasks for 24 hours

        self.active_tasks.retain(|_, task| {
            if matches!(task.status, TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled) {
                if let Some(completed_at) = task.completed_at {
                    completed_at > cutoff_time
                } else {
                    task.created_at > cutoff_time
                }
            } else {
                true // Keep active tasks
            }
        });
    }

    /// Get replication status summary
    pub fn get_replication_status(&self) -> ReplicationStatus {
        let active_count = self.active_tasks.values()
            .filter(|task| matches!(task.status, TaskStatus::Queued | TaskStatus::Executing))
            .count();

        let completed_count = self.active_tasks.values()
            .filter(|task| matches!(task.status, TaskStatus::Completed))
            .count();

        let failed_count = self.active_tasks.values()
            .filter(|task| matches!(task.status, TaskStatus::Failed))
            .count();

        ReplicationStatus {
            active_replications: active_count as u32,
            completed_replications: completed_count as u32,
            failed_replications: failed_count as u32,
            total_nodes_online: self.node_status.values()
                .filter(|status| matches!(status.status, NodeState::Online))
                .count() as u32,
            total_nodes_offline: self.node_status.values()
                .filter(|status| matches!(status.status, NodeState::Offline))
                .count() as u32,
            average_replication_time: self.metrics.average_replication_time_seconds,
            total_data_replicated: self.metrics.total_data_recovered_gb,
            metrics: self.metrics.clone(),
        }
    }

    /// Run continuous monitoring and auto-replication
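    ///
    /// A hedged usage sketch (assumes the manager is moved onto a dedicated Tokio task;
    /// note the loop runs indefinitely over the snapshot of `chunk_health_data` passed in):
    /// `tokio::spawn(async move { manager.run_auto_replication_loop(health_data).await });`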
    pub async fn run_auto_replication_loop(&mut self, chunk_health_data: HashMap<String, ChunkHealth>) -> Result<()> {
        let mut check_interval = tokio::time::interval(TokioDuration::from_secs(
            self.failure_detection.heartbeat_interval_seconds as u64
        ));

        loop {
            check_interval.tick().await;

            // Detect failures and trigger replication
            if let Err(e) = self.detect_failures_and_replicate(&chunk_health_data).await {
                tracing::error!("Auto-replication check failed: {}", e);
            }

            // Clean up old tasks
            self.cleanup_completed_tasks();

            // Update metrics
            self.metrics.last_updated = Utc::now();

            tracing::debug!("Auto-replication check complete. Active tasks: {}", self.active_tasks.len());
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationStatus {
    pub active_replications: u32,
    pub completed_replications: u32,
    pub failed_replications: u32,
    pub total_nodes_online: u32,
    pub total_nodes_offline: u32,
    pub average_replication_time: f64,
    pub total_data_replicated: u64,
    pub metrics: AutoReplicationMetrics,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_auto_replication_manager_creation() {
        let manager = AutoReplicationManager::new();
        assert!(!manager.replication_policies.is_empty());
        assert!(manager.failure_detection.heartbeat_interval_seconds > 0);
        assert!(manager.emergency_config.enable_emergency_mode);
    }

    #[test]
    fn test_node_status_updates() {
        let mut manager = AutoReplicationManager::new();

        // Add node as online
        manager.update_node_status("node1".to_string(), NodeState::Online);
        assert_eq!(manager.node_status.len(), 1);

        // Update to offline
        manager.update_node_status("node1".to_string(), NodeState::Offline);
        let status = manager.node_status.get("node1").unwrap();
        assert_eq!(status.consecutive_failures, 1);
        assert!(!status.failure_history.is_empty());

        // Back to online
        manager.update_node_status("node1".to_string(), NodeState::Online);
        let status = manager.node_status.get("node1").unwrap();
        assert_eq!(status.consecutive_failures, 0);
    }

    #[test]
    fn test_failure_detection() {
        let mut manager = AutoReplicationManager::new();

        // Add some nodes
        manager.update_node_status("node1".to_string(), NodeState::Online);
        manager.update_node_status("node2".to_string(), NodeState::Offline);
        manager.update_node_status("node3".to_string(), NodeState::Unknown);

        let failed_nodes = manager.detect_failed_nodes();
        assert!(failed_nodes.contains(&"node2".to_string()));
        assert!(failed_nodes.contains(&"node3".to_string()));
        assert!(!failed_nodes.contains(&"node1".to_string()));
    }

    #[tokio::test]
    async fn test_replication_trigger() {
        let mut manager = AutoReplicationManager::new();

        // Create mock chunk health with a failed replica
        let chunk_health = ChunkHealth {
            chunk_id: "test_chunk".to_string(),
            overall_health: HealthStatus::Critical,
            replica_health: vec![
                ReplicaHealth {
                    replica_id: "replica1".to_string(),
                    node_id: "failed_node".to_string(),
                    region: GeographicRegion::NorthAmerica,
                    status: ReplicaStatus::Unreachable,
                    health_score: 0.0,
                    last_accessed: Utc::now(),
                    last_verified: Utc::now(),
                    integrity_hash: "hash1".to_string(),
                    performance_metrics: super::super::health_monitor::ReplicaPerformanceMetrics {
                        response_time_ms: 0.0,
                        transfer_speed_mbps: 0.0,
                        success_rate: 0.0,
                        error_count: 0,
                        last_error: None,
                        uptime_percentage: 0.0,
                    },
                    connectivity_status: super::super::health_monitor::ConnectivityStatus::Offline,
                },
            ],
            integrity_status: super::super::health_monitor::IntegrityStatus::Unknown,
            availability_score: 30.0,
            durability_score: 30.0,
            performance_metrics: super::super::health_monitor::ChunkPerformanceMetrics {
                avg_response_time_ms: 0.0,
                success_rate: 0.0,
                throughput_mbps: 0.0,
                error_rate: 100.0,
                access_frequency: super::super::health_monitor::AccessFrequency::Low,
                bandwidth_utilization: 0.0,
            },
            last_verified: Utc::now(),
            next_check_due: Utc::now(),
            risk_factors: vec![],
            repair_history: vec![],
        };

        let failed_nodes = vec!["failed_node".to_string()];

        let task_id = manager.trigger_replication(
            "test_chunk".to_string(),
            &chunk_health,
            &failed_nodes,
        ).await.unwrap();

        assert!(!task_id.is_empty());
        assert!(manager.active_tasks.contains_key(&task_id));

        let task = manager.active_tasks.get(&task_id).unwrap();
        assert_eq!(task.chunk_id, "test_chunk");
        assert!(matches!(task.trigger_reason, TriggerReason::NodeFailure { .. }));
    }
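
    // An additional sketch test (not from the original file): exercises the status
    // summary and cleanup paths using only the API defined above.
    #[test]
    fn test_replication_status_and_cleanup() {
        let mut manager = AutoReplicationManager::new();
        manager.update_node_status("node1".to_string(), NodeState::Online);
        manager.update_node_status("node2".to_string(), NodeState::Offline);

        let status = manager.get_replication_status();
        assert_eq!(status.total_nodes_online, 1);
        assert_eq!(status.total_nodes_offline, 1);
        assert_eq!(status.active_replications, 0);

        // With no tasks recorded, cleanup should leave the (empty) task map untouched.
        manager.cleanup_completed_tasks();
        assert!(manager.active_tasks.is_empty());
    }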
}