Rust · 36447 bytes Raw Blame History
1 //! Intelligent Replication Strategy
2 //!
3 //! Adaptive redundancy system that optimizes data durability based on content importance,
4 //! network conditions, and cost considerations
5
6 use anyhow::Result;
7 use serde::{Deserialize, Serialize};
8 use std::collections::{HashMap, BTreeMap};
9 use chrono::{DateTime, Utc, Duration};
10
11 use crate::economics::{NetworkHealthMetrics, VolunteerMetrics, ContributionTracker};
12 use crate::economics::earnings_calculator::GeographicRegion;
13 use super::contribution_node_selector::{ContributionNodeSelector, NodeSelectionCriteria, NodeSelectionResult};
14
/// Intelligent replication manager
///
/// Central coordinator that decides how many replicas each chunk should have,
/// where they should live, and which nodes host them, based on
/// per-content-type policies, observed node performance, geographic
/// constraints, and cost settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IntelligentReplicationManager {
    /// Replication policies keyed by content type (populated by `initialize_default_policies`)
    pub policies: HashMap<ContentType, ReplicationPolicy>,
    /// Current replication state per chunk, keyed by chunk id
    pub replication_state: HashMap<String, ChunkReplicationState>,
    /// Node performance profiles keyed by node id, used for placement decisions
    pub node_performance: HashMap<String, NodePerformanceProfile>,
    /// Geographic distribution requirements
    pub geo_distribution: GeographicDistributionConfig,
    /// Adaptive redundancy configuration
    pub adaptive_config: AdaptiveRedundancyConfig,
    /// Cost optimization configuration
    pub cost_config: CostOptimizationConfig,
    /// Contribution-based node selector
    pub node_selector: ContributionNodeSelector,
}
33
/// Coarse content classification; selects which `ReplicationPolicy` applies
/// to a chunk.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub enum ContentType {
    Critical,  // System-critical data
    Important, // User-important files
    Standard,  // Regular user files
    Archive,   // Long-term storage
    Temporary, // Short-term cache
    Backup,    // Backup copies
}
43
/// Replication rules for one `ContentType`: replica-count bounds, placement
/// constraints, the node-quality floor, the redundancy encoding, and how
/// aggressively to trade durability for cost.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationPolicy {
    pub content_type: ContentType,
    /// Hard lower bound on replica count
    pub min_replicas: u32,
    /// Hard upper bound on replica count
    pub max_replicas: u32,
    /// Preferred replica count before adaptive adjustments
    pub target_replicas: u32,
    pub geographic_spread: GeographicSpread,
    pub node_quality_requirements: NodeQualityRequirements,
    pub redundancy_scheme: RedundancyScheme,
    pub replication_priority: ReplicationPriority,
    pub cost_sensitivity: f64, // 0.0 = cost-insensitive, 1.0 = highly cost-sensitive
}
56
/// How replicas of a chunk must be spread across geographic regions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GeographicSpread {
    /// All replicas may live in a single region
    SingleRegion,
    MultiRegion(u32), // Minimum number of regions
    GlobalDistribution, // Maximum geographic spread
    RegionSpecific(Vec<GeographicRegion>), // Specific regions required
}

/// Minimum quality bar a node must clear to host a replica
/// (see `meets_quality_requirements`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeQualityRequirements {
    pub min_uptime_percentage: f64,
    pub min_bandwidth_mbps: f64,
    pub max_latency_ms: u64,
    pub min_reliability_score: f64,
    /// Minimum acceptable link type; `None` accepts any connection
    pub required_connection_quality: Option<ConnectionQuality>,
    /// Reject nodes in the `Unreliable` performance tier
    pub exclude_unstable_nodes: bool,
    // NOTE(review): this flag is stored but not consulted by the visible
    // selection code — confirm intended behavior.
    pub prefer_premium_nodes: bool,
}

/// Redundancy encoding used for a chunk's replicas.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RedundancyScheme {
    SimpleReplication, // Basic copying
    ReedSolomon { data: u32, parity: u32 }, // (n,k) Reed-Solomon
    HybridErasure { replicas: u32, erasure: (u32, u32) }, // Combination
}

/// Urgency with which replication work should be scheduled.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationPriority {
    Immediate,
    High,
    Normal,
    Low,
    Background,
}

/// Node link type, ordered best-to-worst by the selection logic
/// (Fiber > Broadband > Mobile > Satellite).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConnectionQuality {
    Fiber,
    Broadband,
    Mobile,
    Satellite,
}
99
/// Tracked replication state for a single chunk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkReplicationState {
    pub chunk_id: String,
    pub content_type: ContentType,
    /// Replicas currently believed to exist, with per-replica health
    pub current_replicas: Vec<ReplicaLocation>,
    /// Replica count this chunk should converge to
    pub target_replicas: u32,
    pub health_score: f64,
    pub last_verification: DateTime<Utc>,
    pub replication_status: ReplicationStatus,
    /// Past repair events, most useful for diagnosing flapping nodes
    pub repair_history: Vec<RepairEvent>,
    pub access_patterns: AccessPatterns,
}

/// One placed replica of a chunk and its observed condition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaLocation {
    pub node_id: String,
    pub region: GeographicRegion,
    pub quality_score: f64,
    pub created_at: DateTime<Utc>,
    pub last_verified: DateTime<Utc>,
    pub status: ReplicaStatus,
    pub performance_metrics: ReplicaPerformance,
}

/// Health of an individual replica; anything other than `Healthy` triggers
/// a repair recommendation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicaStatus {
    Healthy,
    Degraded,
    Unreachable,
    Corrupted,
    Missing,
}

/// Observed transfer/latency metrics for one replica.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaPerformance {
    pub response_time_ms: u64,
    pub transfer_speed_mbps: f64,
    pub success_rate: f64,
    pub last_access: DateTime<Utc>,
}

/// Aggregate replication condition of a chunk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationStatus {
    Optimal, // Meets all requirements
    Adequate, // Meets minimum requirements
    Degraded, // Below minimum requirements
    Critical, // Immediate action needed
    Repairing, // Currently being repaired
}
149
/// Record of one repair attempt on a chunk's replica set.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepairEvent {
    pub event_id: String,
    pub timestamp: DateTime<Utc>,
    pub event_type: RepairEventType,
    /// Node ids of the replicas the repair touched
    pub affected_replicas: Vec<String>,
    pub repair_strategy: RepairStrategy,
    pub success: bool,
    pub duration_seconds: u64,
}

/// What triggered a repair.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RepairEventType {
    NodeFailure,
    NetworkPartition,
    CorruptionDetected,
    PerformanceDegradation,
    ScheduledMaintenance,
}

/// How a repair was (or should be) carried out.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RepairStrategy {
    CreateNewReplica,
    RepairExistingReplica,
    MigrateReplica,
    IncreaseRedundancy,
    RebuildFromErasure,
}

/// Observed access behavior for a chunk; feeds replica-count adjustment in
/// `adjust_for_access_patterns`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccessPatterns {
    pub access_frequency: AccessFrequency,
    /// Access counts per region; >2 regions earns a replica bonus
    pub geographic_access: HashMap<GeographicRegion, u32>,
    pub time_patterns: HashMap<u8, u32>, // Hour of day -> access count
    pub last_access: DateTime<Utc>,
    pub predicted_next_access: Option<DateTime<Utc>>,
}

/// Bucketed access rate for a chunk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AccessFrequency {
    VeryHigh, // Multiple times per hour
    High, // Multiple times per day
    Medium, // Daily access
    Low, // Weekly access
    VeryLow, // Monthly access
    Archive, // Rarely accessed
}
197
/// Performance and cost profile of a storage node, used to score and filter
/// candidates during replica placement.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodePerformanceProfile {
    pub node_id: String,
    pub region: GeographicRegion,
    pub uptime_percentage: f64,
    pub bandwidth_mbps: f64,
    pub latency_ms: u64,
    pub reliability_score: f64,
    pub connection_quality: ConnectionQuality,
    pub storage_capacity_gb: u64,
    /// Remaining free capacity; nodes with 0 are never selected
    pub available_capacity_gb: u64,
    pub cost_per_gb_month: f64,
    pub performance_tier: PerformanceTier,
    pub last_updated: DateTime<Utc>,
}

/// Relative performance ranking of a node within the network.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PerformanceTier {
    Premium, // Top 10% performers
    High, // Top 25% performers
    Standard, // Average performers
    Basic, // Below average
    Unreliable, // Poor performers
}

/// Network-wide geographic placement configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeographicDistributionConfig {
    /// Chunks spread over fewer regions than this get a redistribution recommendation
    pub min_regions_per_chunk: u32,
    pub preferred_regions: Vec<GeographicRegion>,
    pub region_weights: HashMap<GeographicRegion, f64>,
    pub latency_requirements: HashMap<GeographicRegion, u64>,
    pub regulatory_constraints: HashMap<GeographicRegion, Vec<String>>,
}

/// Settings controlling automatic replica-count adaptation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveRedundancyConfig {
    pub enable_dynamic_adjustment: bool,
    pub adjustment_frequency_hours: u32,
    pub network_health_threshold: f64,
    pub failure_rate_threshold: f64,
    pub auto_scale_replicas: bool,
    pub max_auto_replicas: u32,
    pub min_auto_replicas: u32,
    pub cost_efficiency_target: f64,
}

/// Settings controlling cost/durability/performance trade-offs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostOptimizationConfig {
    pub enable_cost_optimization: bool,
    pub cost_efficiency_weight: f64,
    pub durability_weight: f64,
    pub performance_weight: f64,
    pub max_cost_per_gb_month: f64,
    pub prefer_cheaper_nodes: bool,
    pub cost_monitoring_enabled: bool,
}
254
255 impl Default for ReplicationPolicy {
256 fn default() -> Self {
257 Self {
258 content_type: ContentType::Standard,
259 min_replicas: 3,
260 max_replicas: 10,
261 target_replicas: 5,
262 geographic_spread: GeographicSpread::MultiRegion(2),
263 node_quality_requirements: NodeQualityRequirements {
264 min_uptime_percentage: 95.0,
265 min_bandwidth_mbps: 10.0,
266 max_latency_ms: 200,
267 min_reliability_score: 90.0,
268 required_connection_quality: None,
269 exclude_unstable_nodes: true,
270 prefer_premium_nodes: false,
271 },
272 redundancy_scheme: RedundancyScheme::SimpleReplication,
273 replication_priority: ReplicationPriority::Normal,
274 cost_sensitivity: 0.3,
275 }
276 }
277 }
278
279 impl IntelligentReplicationManager {
280 /// Create new intelligent replication manager
281 pub fn new() -> Self {
282 let mut manager = Self {
283 policies: HashMap::new(),
284 replication_state: HashMap::new(),
285 node_performance: HashMap::new(),
286 geo_distribution: GeographicDistributionConfig {
287 min_regions_per_chunk: 2,
288 preferred_regions: vec![
289 GeographicRegion::NorthAmerica,
290 GeographicRegion::Europe,
291 GeographicRegion::Asia,
292 ],
293 region_weights: HashMap::new(),
294 latency_requirements: HashMap::new(),
295 regulatory_constraints: HashMap::new(),
296 },
297 adaptive_config: AdaptiveRedundancyConfig {
298 enable_dynamic_adjustment: true,
299 adjustment_frequency_hours: 6,
300 network_health_threshold: 95.0,
301 failure_rate_threshold: 0.01,
302 auto_scale_replicas: true,
303 max_auto_replicas: 15,
304 min_auto_replicas: 3,
305 cost_efficiency_target: 0.8,
306 },
307 cost_config: CostOptimizationConfig {
308 enable_cost_optimization: true,
309 cost_efficiency_weight: 0.3,
310 durability_weight: 0.5,
311 performance_weight: 0.2,
312 max_cost_per_gb_month: 0.05,
313 prefer_cheaper_nodes: false,
314 cost_monitoring_enabled: true,
315 },
316 };
317
318 manager.initialize_default_policies();
319 manager
320 }
321
322 /// Initialize default replication policies
323 fn initialize_default_policies(&mut self) {
324 // Critical content policy
325 self.policies.insert(ContentType::Critical, ReplicationPolicy {
326 content_type: ContentType::Critical,
327 min_replicas: 5,
328 max_replicas: 15,
329 target_replicas: 8,
330 geographic_spread: GeographicSpread::GlobalDistribution,
331 node_quality_requirements: NodeQualityRequirements {
332 min_uptime_percentage: 99.0,
333 min_bandwidth_mbps: 50.0,
334 max_latency_ms: 100,
335 min_reliability_score: 95.0,
336 required_connection_quality: Some(ConnectionQuality::Fiber),
337 exclude_unstable_nodes: true,
338 prefer_premium_nodes: true,
339 },
340 redundancy_scheme: RedundancyScheme::ReedSolomon { data: 6, parity: 3 },
341 replication_priority: ReplicationPriority::Immediate,
342 cost_sensitivity: 0.1, // Low cost sensitivity for critical data
343 });
344
345 // Important content policy
346 self.policies.insert(ContentType::Important, ReplicationPolicy {
347 content_type: ContentType::Important,
348 min_replicas: 4,
349 max_replicas: 10,
350 target_replicas: 6,
351 geographic_spread: GeographicSpread::MultiRegion(3),
352 node_quality_requirements: NodeQualityRequirements {
353 min_uptime_percentage: 97.0,
354 min_bandwidth_mbps: 25.0,
355 max_latency_ms: 150,
356 min_reliability_score: 92.0,
357 required_connection_quality: None,
358 exclude_unstable_nodes: true,
359 prefer_premium_nodes: true,
360 },
361 redundancy_scheme: RedundancyScheme::ReedSolomon { data: 4, parity: 2 },
362 replication_priority: ReplicationPriority::High,
363 cost_sensitivity: 0.2,
364 });
365
366 // Standard content policy
367 self.policies.insert(ContentType::Standard, ReplicationPolicy::default());
368
369 // Archive content policy
370 self.policies.insert(ContentType::Archive, ReplicationPolicy {
371 content_type: ContentType::Archive,
372 min_replicas: 3,
373 max_replicas: 8,
374 target_replicas: 4,
375 geographic_spread: GeographicSpread::MultiRegion(2),
376 node_quality_requirements: NodeQualityRequirements {
377 min_uptime_percentage: 90.0,
378 min_bandwidth_mbps: 5.0,
379 max_latency_ms: 500,
380 min_reliability_score: 85.0,
381 required_connection_quality: None,
382 exclude_unstable_nodes: false,
383 prefer_premium_nodes: false,
384 },
385 redundancy_scheme: RedundancyScheme::ReedSolomon { data: 3, parity: 2 },
386 replication_priority: ReplicationPriority::Background,
387 cost_sensitivity: 0.8, // High cost sensitivity for archive data
388 });
389
390 // Temporary content policy
391 self.policies.insert(ContentType::Temporary, ReplicationPolicy {
392 content_type: ContentType::Temporary,
393 min_replicas: 2,
394 max_replicas: 4,
395 target_replicas: 3,
396 geographic_spread: GeographicSpread::SingleRegion,
397 node_quality_requirements: NodeQualityRequirements {
398 min_uptime_percentage: 85.0,
399 min_bandwidth_mbps: 10.0,
400 max_latency_ms: 300,
401 min_reliability_score: 80.0,
402 required_connection_quality: None,
403 exclude_unstable_nodes: false,
404 prefer_premium_nodes: false,
405 },
406 redundancy_scheme: RedundancyScheme::SimpleReplication,
407 replication_priority: ReplicationPriority::Low,
408 cost_sensitivity: 1.0, // Maximum cost sensitivity
409 });
410 }
411
412 /// Determine optimal replication strategy for a chunk
413 pub fn determine_replication_strategy(
414 &self,
415 chunk_id: &str,
416 content_type: ContentType,
417 access_patterns: &AccessPatterns,
418 network_health: &NetworkHealthMetrics,
419 ) -> Result<ReplicationStrategy> {
420 let policy = self.policies.get(&content_type)
421 .ok_or_else(|| anyhow::anyhow!("No policy found for content type: {:?}", content_type))?;
422
423 // Calculate base replication requirements
424 let mut target_replicas = policy.target_replicas;
425
426 // Adjust based on access patterns
427 target_replicas = self.adjust_for_access_patterns(target_replicas, access_patterns);
428
429 // Adjust based on network health
430 target_replicas = self.adjust_for_network_health(target_replicas, network_health);
431
432 // Ensure within policy bounds
433 target_replicas = target_replicas.max(policy.min_replicas).min(policy.max_replicas);
434
435 // Select optimal nodes
436 let selected_nodes = self.select_optimal_nodes(
437 target_replicas,
438 &policy.node_quality_requirements,
439 &policy.geographic_spread,
440 policy.cost_sensitivity,
441 )?;
442
443 Ok(ReplicationStrategy {
444 chunk_id: chunk_id.to_string(),
445 content_type,
446 target_replicas,
447 selected_nodes,
448 redundancy_scheme: policy.redundancy_scheme.clone(),
449 priority: policy.replication_priority.clone(),
450 estimated_cost: self.calculate_replication_cost(&selected_nodes),
451 durability_score: self.calculate_durability_score(&selected_nodes, &policy.redundancy_scheme),
452 })
453 }
454
455 /// Adjust replica count based on access patterns
456 fn adjust_for_access_patterns(&self, base_replicas: u32, access_patterns: &AccessPatterns) -> u32 {
457 let frequency_multiplier = match access_patterns.access_frequency {
458 AccessFrequency::VeryHigh => 1.5,
459 AccessFrequency::High => 1.2,
460 AccessFrequency::Medium => 1.0,
461 AccessFrequency::Low => 0.9,
462 AccessFrequency::VeryLow => 0.8,
463 AccessFrequency::Archive => 0.7,
464 };
465
466 // Geographic access diversity bonus
467 let geo_diversity_bonus = if access_patterns.geographic_access.len() > 2 {
468 1.1
469 } else {
470 1.0
471 };
472
473 ((base_replicas as f64) * frequency_multiplier * geo_diversity_bonus) as u32
474 }
475
476 /// Adjust replica count based on network health
477 fn adjust_for_network_health(&self, base_replicas: u32, network_health: &NetworkHealthMetrics) -> u32 {
478 let health_factor = if network_health.average_uptime < 90.0 {
479 1.3 // Increase replicas for poor network health
480 } else if network_health.average_uptime < 95.0 {
481 1.1
482 } else {
483 1.0 // Normal replication for healthy network
484 };
485
486 // Adjust for utilization pressure
487 let utilization_factor = if network_health.utilization_rate > 90.0 {
488 0.9 // Reduce replicas under high utilization
489 } else {
490 1.0
491 };
492
493 ((base_replicas as f64) * health_factor * utilization_factor) as u32
494 }
495
496 /// Select optimal nodes for replication
497 fn select_optimal_nodes(
498 &self,
499 target_replicas: u32,
500 quality_requirements: &NodeQualityRequirements,
501 geographic_spread: &GeographicSpread,
502 cost_sensitivity: f64,
503 ) -> Result<Vec<String>> {
504 // Filter nodes by quality requirements
505 let eligible_nodes: Vec<_> = self.node_performance
506 .values()
507 .filter(|node| self.meets_quality_requirements(node, quality_requirements))
508 .collect();
509
510 if eligible_nodes.is_empty() {
511 return Err(anyhow::anyhow!("No nodes meet quality requirements"));
512 }
513
514 // Group by region for geographic distribution
515 let nodes_by_region = self.group_nodes_by_region(&eligible_nodes);
516
517 // Select nodes based on geographic spread requirements
518 let selected_nodes = self.apply_geographic_selection(
519 nodes_by_region,
520 target_replicas,
521 geographic_spread,
522 cost_sensitivity,
523 )?;
524
525 Ok(selected_nodes)
526 }
527
528 /// Check if node meets quality requirements
529 fn meets_quality_requirements(
530 &self,
531 node: &NodePerformanceProfile,
532 requirements: &NodeQualityRequirements,
533 ) -> bool {
534 node.uptime_percentage >= requirements.min_uptime_percentage
535 && node.bandwidth_mbps >= requirements.min_bandwidth_mbps
536 && node.latency_ms <= requirements.max_latency_ms
537 && node.reliability_score >= requirements.min_reliability_score
538 && node.available_capacity_gb > 0
539 && (!requirements.exclude_unstable_nodes || !matches!(node.performance_tier, PerformanceTier::Unreliable))
540 && requirements.required_connection_quality.as_ref()
541 .map_or(true, |required| self.connection_quality_matches(&node.connection_quality, required))
542 }
543
544 /// Check if connection quality matches requirement
545 fn connection_quality_matches(&self, actual: &ConnectionQuality, required: &ConnectionQuality) -> bool {
546 match (actual, required) {
547 (ConnectionQuality::Fiber, _) => true,
548 (ConnectionQuality::Broadband, ConnectionQuality::Fiber) => false,
549 (ConnectionQuality::Broadband, _) => true,
550 (ConnectionQuality::Mobile, ConnectionQuality::Fiber | ConnectionQuality::Broadband) => false,
551 (ConnectionQuality::Mobile, _) => true,
552 (ConnectionQuality::Satellite, ConnectionQuality::Satellite) => true,
553 (ConnectionQuality::Satellite, _) => false,
554 }
555 }
556
557 /// Group nodes by geographic region
558 fn group_nodes_by_region(
559 &self,
560 nodes: &[&NodePerformanceProfile],
561 ) -> HashMap<GeographicRegion, Vec<&NodePerformanceProfile>> {
562 let mut grouped = HashMap::new();
563
564 for node in nodes {
565 grouped.entry(node.region.clone())
566 .or_insert_with(Vec::new)
567 .push(*node);
568 }
569
570 // Sort nodes within each region by performance score
571 for region_nodes in grouped.values_mut() {
572 region_nodes.sort_by(|a, b| {
573 let score_a = self.calculate_node_score(a, 0.3); // Default cost sensitivity
574 let score_b = self.calculate_node_score(b, 0.3);
575 score_b.partial_cmp(&score_a).unwrap_or(std::cmp::Ordering::Equal)
576 });
577 }
578
579 grouped
580 }
581
582 /// Apply geographic selection strategy
583 fn apply_geographic_selection(
584 &self,
585 nodes_by_region: HashMap<GeographicRegion, Vec<&NodePerformanceProfile>>,
586 target_replicas: u32,
587 geographic_spread: &GeographicSpread,
588 cost_sensitivity: f64,
589 ) -> Result<Vec<String>> {
590 let mut selected_nodes = Vec::new();
591
592 match geographic_spread {
593 GeographicSpread::SingleRegion => {
594 // Select all from the best region
595 let best_region = self.find_best_region(&nodes_by_region, cost_sensitivity)?;
596 if let Some(region_nodes) = nodes_by_region.get(&best_region) {
597 for node in region_nodes.iter().take(target_replicas as usize) {
598 selected_nodes.push(node.node_id.clone());
599 }
600 }
601 },
602 GeographicSpread::MultiRegion(min_regions) => {
603 selected_nodes = self.select_multi_region_nodes(
604 &nodes_by_region,
605 target_replicas,
606 *min_regions,
607 cost_sensitivity,
608 )?;
609 },
610 GeographicSpread::GlobalDistribution => {
611 selected_nodes = self.select_global_distribution_nodes(
612 &nodes_by_region,
613 target_replicas,
614 cost_sensitivity,
615 )?;
616 },
617 GeographicSpread::RegionSpecific(required_regions) => {
618 selected_nodes = self.select_region_specific_nodes(
619 &nodes_by_region,
620 target_replicas,
621 required_regions,
622 cost_sensitivity,
623 )?;
624 },
625 }
626
627 if selected_nodes.len() < target_replicas as usize {
628 tracing::warn!("Could only select {} nodes out of {} requested",
629 selected_nodes.len(), target_replicas);
630 }
631
632 Ok(selected_nodes)
633 }
634
635 /// Find the best region based on cost and performance
636 fn find_best_region(
637 &self,
638 nodes_by_region: &HashMap<GeographicRegion, Vec<&NodePerformanceProfile>>,
639 cost_sensitivity: f64,
640 ) -> Result<GeographicRegion> {
641 let mut best_region = None;
642 let mut best_score = 0.0;
643
644 for (region, nodes) in nodes_by_region {
645 if nodes.is_empty() {
646 continue;
647 }
648
649 let avg_score = nodes.iter()
650 .map(|node| self.calculate_node_score(node, cost_sensitivity))
651 .sum::<f64>() / nodes.len() as f64;
652
653 if avg_score > best_score {
654 best_score = avg_score;
655 best_region = Some(region.clone());
656 }
657 }
658
659 best_region.ok_or_else(|| anyhow::anyhow!("No suitable region found"))
660 }
661
662 /// Select nodes across multiple regions
663 fn select_multi_region_nodes(
664 &self,
665 nodes_by_region: &HashMap<GeographicRegion, Vec<&NodePerformanceProfile>>,
666 target_replicas: u32,
667 min_regions: u32,
668 cost_sensitivity: f64,
669 ) -> Result<Vec<String>> {
670 let mut selected_nodes = Vec::new();
671 let available_regions: Vec<_> = nodes_by_region.keys()
672 .filter(|region| !nodes_by_region[*region].is_empty())
673 .collect();
674
675 if available_regions.len() < min_regions as usize {
676 return Err(anyhow::anyhow!("Not enough regions available: {} < {}",
677 available_regions.len(), min_regions));
678 }
679
680 // Calculate replicas per region
681 let replicas_per_region = target_replicas / min_regions;
682 let extra_replicas = target_replicas % min_regions;
683
684 // Sort regions by quality
685 let mut sorted_regions = available_regions;
686 sorted_regions.sort_by(|a, b| {
687 let score_a = self.calculate_region_score(nodes_by_region[*a].as_slice(), cost_sensitivity);
688 let score_b = self.calculate_region_score(nodes_by_region[*b].as_slice(), cost_sensitivity);
689 score_b.partial_cmp(&score_a).unwrap_or(std::cmp::Ordering::Equal)
690 });
691
692 // Select nodes from each region
693 for (i, region) in sorted_regions.iter().take(min_regions as usize).enumerate() {
694 let region_nodes = &nodes_by_region[*region];
695 let region_replicas = replicas_per_region + if i < extra_replicas as usize { 1 } else { 0 };
696
697 for node in region_nodes.iter().take(region_replicas as usize) {
698 selected_nodes.push(node.node_id.clone());
699 }
700 }
701
702 Ok(selected_nodes)
703 }
704
705 /// Select nodes for global distribution
706 fn select_global_distribution_nodes(
707 &self,
708 nodes_by_region: &HashMap<GeographicRegion, Vec<&NodePerformanceProfile>>,
709 target_replicas: u32,
710 cost_sensitivity: f64,
711 ) -> Result<Vec<String>> {
712 // Try to distribute across all available regions
713 let available_regions = nodes_by_region.len() as u32;
714 self.select_multi_region_nodes(nodes_by_region, target_replicas, available_regions, cost_sensitivity)
715 }
716
717 /// Select nodes from specific regions
718 fn select_region_specific_nodes(
719 &self,
720 nodes_by_region: &HashMap<GeographicRegion, Vec<&NodePerformanceProfile>>,
721 target_replicas: u32,
722 required_regions: &[GeographicRegion],
723 cost_sensitivity: f64,
724 ) -> Result<Vec<String>> {
725 let mut selected_nodes = Vec::new();
726 let replicas_per_region = target_replicas / required_regions.len() as u32;
727 let extra_replicas = target_replicas % required_regions.len() as u32;
728
729 for (i, region) in required_regions.iter().enumerate() {
730 if let Some(region_nodes) = nodes_by_region.get(region) {
731 let region_replicas = replicas_per_region + if i < extra_replicas as usize { 1 } else { 0 };
732
733 for node in region_nodes.iter().take(region_replicas as usize) {
734 selected_nodes.push(node.node_id.clone());
735 }
736 }
737 }
738
739 Ok(selected_nodes)
740 }
741
742 /// Calculate node performance score
743 fn calculate_node_score(&self, node: &NodePerformanceProfile, cost_sensitivity: f64) -> f64 {
744 let performance_score = (node.uptime_percentage / 100.0)
745 * (node.reliability_score / 100.0)
746 * (node.bandwidth_mbps / 100.0).min(1.0)
747 * (200.0 / (node.latency_ms as f64 + 50.0));
748
749 let cost_score = if cost_sensitivity > 0.0 {
750 1.0 / (node.cost_per_gb_month + 0.01) // Avoid division by zero
751 } else {
752 1.0
753 };
754
755 // Weighted combination
756 performance_score * (1.0 - cost_sensitivity) + cost_score * cost_sensitivity
757 }
758
759 /// Calculate region quality score
760 fn calculate_region_score(&self, nodes: &[&NodePerformanceProfile], cost_sensitivity: f64) -> f64 {
761 if nodes.is_empty() {
762 return 0.0;
763 }
764
765 nodes.iter()
766 .map(|node| self.calculate_node_score(node, cost_sensitivity))
767 .sum::<f64>() / nodes.len() as f64
768 }
769
770 /// Calculate estimated replication cost
771 fn calculate_replication_cost(&self, selected_nodes: &[String]) -> f64 {
772 selected_nodes.iter()
773 .filter_map(|node_id| self.node_performance.get(node_id))
774 .map(|node| node.cost_per_gb_month)
775 .sum()
776 }
777
778 /// Calculate expected durability score
779 fn calculate_durability_score(&self, selected_nodes: &[String], redundancy_scheme: &RedundancyScheme) -> f64 {
780 let avg_reliability = selected_nodes.iter()
781 .filter_map(|node_id| self.node_performance.get(node_id))
782 .map(|node| node.reliability_score / 100.0)
783 .sum::<f64>() / selected_nodes.len() as f64;
784
785 // Calculate durability based on redundancy scheme
786 match redundancy_scheme {
787 RedundancyScheme::SimpleReplication => {
788 // Simple calculation: 1 - (1 - reliability)^replicas
789 1.0 - (1.0 - avg_reliability).powf(selected_nodes.len() as f64)
790 },
791 RedundancyScheme::ReedSolomon { data, parity } => {
792 // Can survive up to 'parity' failures
793 let total_shards = data + parity;
794 let failure_tolerance = *parity as f64;
795
796 // Simplified calculation
797 let failure_prob = 1.0 - avg_reliability;
798 let survive_prob = (0..=failure_tolerance as u32)
799 .map(|failures| {
800 binomial_probability(total_shards, failures, failure_prob)
801 })
802 .sum::<f64>();
803
804 survive_prob
805 },
806 RedundancyScheme::HybridErasure { replicas, erasure } => {
807 // Combination of replication and erasure coding
808 let replication_durability = 1.0 - (1.0 - avg_reliability).powf(*replicas as f64);
809 let erasure_durability = self.calculate_durability_score(
810 selected_nodes,
811 &RedundancyScheme::ReedSolomon { data: erasure.0, parity: erasure.1 }
812 );
813
814 // Best of both
815 replication_durability.max(erasure_durability)
816 },
817 }
818 }
819
    /// Insert or replace the performance profile stored for `node_id`.
    /// Note: the id is also embedded in the profile; callers are trusted to
    /// keep the two consistent.
    pub fn update_node_performance(&mut self, node_id: String, profile: NodePerformanceProfile) {
        self.node_performance.insert(node_id, profile);
    }
824
    /// Insert or replace the tracked replication state for `chunk_id`.
    pub fn update_chunk_state(&mut self, chunk_id: String, state: ChunkReplicationState) {
        self.replication_state.insert(chunk_id, state);
    }
829
830 /// Get replication recommendations for a chunk
831 pub fn get_replication_recommendations(&self, chunk_id: &str) -> Result<Vec<ReplicationRecommendation>> {
832 let state = self.replication_state.get(chunk_id)
833 .ok_or_else(|| anyhow::anyhow!("Chunk state not found"))?;
834
835 let mut recommendations = Vec::new();
836
837 // Check if we need more replicas
838 if state.current_replicas.len() < state.target_replicas as usize {
839 recommendations.push(ReplicationRecommendation {
840 recommendation_type: RecommendationType::IncreaseReplicas,
841 priority: ReplicationPriority::High,
842 estimated_cost: 0.02, // Placeholder
843 durability_impact: 0.15,
844 description: "Increase replica count to meet target".to_string(),
845 });
846 }
847
848 // Check for unhealthy replicas
849 let unhealthy_replicas: Vec<_> = state.current_replicas.iter()
850 .filter(|replica| !matches!(replica.status, ReplicaStatus::Healthy))
851 .collect();
852
853 if !unhealthy_replicas.is_empty() {
854 recommendations.push(ReplicationRecommendation {
855 recommendation_type: RecommendationType::RepairReplicas,
856 priority: ReplicationPriority::Immediate,
857 estimated_cost: 0.05,
858 durability_impact: 0.25,
859 description: format!("Repair {} unhealthy replicas", unhealthy_replicas.len()),
860 });
861 }
862
863 // Check geographic distribution
864 let regions: std::collections::HashSet<_> = state.current_replicas.iter()
865 .map(|replica| &replica.region)
866 .collect();
867
868 if regions.len() < self.geo_distribution.min_regions_per_chunk as usize {
869 recommendations.push(ReplicationRecommendation {
870 recommendation_type: RecommendationType::ImproveGeographicDistribution,
871 priority: ReplicationPriority::Normal,
872 estimated_cost: 0.03,
873 durability_impact: 0.10,
874 description: "Improve geographic distribution".to_string(),
875 });
876 }
877
878 Ok(recommendations)
879 }
880 }
881
/// Concrete placement plan produced by `determine_replication_strategy`:
/// the final replica count, the chosen host nodes, and the projected cost
/// and durability of the plan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationStrategy {
    pub chunk_id: String,
    pub content_type: ContentType,
    pub target_replicas: u32,
    /// Node ids chosen to host the replicas (may be fewer than the target)
    pub selected_nodes: Vec<String>,
    pub redundancy_scheme: RedundancyScheme,
    pub priority: ReplicationPriority,
    /// Sum of the selected nodes' per-GB monthly costs
    pub estimated_cost: f64,
    /// Estimated survival probability in [0, 1]
    pub durability_score: f64,
}
893
/// One suggested maintenance action for a chunk, with its estimated cost
/// and expected durability benefit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationRecommendation {
    pub recommendation_type: RecommendationType,
    pub priority: ReplicationPriority,
    pub estimated_cost: f64,
    pub durability_impact: f64,
    pub description: String,
}
902
/// Kinds of maintenance actions the manager can recommend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecommendationType {
    IncreaseReplicas,
    DecreaseReplicas,
    RepairReplicas,
    MigrateReplicas,
    ImproveGeographicDistribution,
    OptimizeCost,
    UpgradeRedundancyScheme,
}
913
/// Probability of exactly `k` successes in `n` independent Bernoulli trials
/// with per-trial success probability `p` (the binomial PMF).
///
/// Returns 0.0 when `k > n`. The binomial coefficient is accumulated
/// multiplicatively in floating point, so — unlike the previous
/// `factorial(n)`-based version — this does not overflow `u64` for n > 20.
fn binomial_probability(n: u32, k: u32, p: f64) -> f64 {
    if k > n {
        return 0.0;
    }

    // C(n, k) = prod_{i=1..k} (n - k + i) / i, evaluated in f64 so large n
    // cannot overflow an integer factorial.
    let mut combination = 1.0_f64;
    for i in 1..=k {
        combination *= (n - k + i) as f64 / i as f64;
    }

    combination * p.powi(k as i32) * (1.0 - p).powi((n - k) as i32)
}
923
/// Factorial of `n` as a `u64` (iterative).
///
/// NOTE(review): overflows `u64` for n > 20 (panics in debug builds, wraps
/// in release) — callers keep shard counts far below that.
fn factorial(n: u32) -> u64 {
    let mut acc: u64 = 1;
    for k in 2..=n as u64 {
        acc *= k;
    }
    acc
}
928
#[cfg(test)]
mod tests {
    use super::*;

    // Construction must pre-populate the default policy table.
    #[test]
    fn test_replication_manager_creation() {
        let manager = IntelligentReplicationManager::new();
        assert!(!manager.policies.is_empty());
        assert!(manager.policies.contains_key(&ContentType::Critical));
        assert!(manager.policies.contains_key(&ContentType::Standard));
    }

    // A premium fiber node should clear even the Critical-tier quality bar.
    #[test]
    fn test_node_quality_requirements() {
        let manager = IntelligentReplicationManager::new();

        let high_quality_node = NodePerformanceProfile {
            node_id: "node1".to_string(),
            region: GeographicRegion::NorthAmerica,
            uptime_percentage: 99.5,
            bandwidth_mbps: 100.0,
            latency_ms: 50,
            reliability_score: 98.0,
            connection_quality: ConnectionQuality::Fiber,
            storage_capacity_gb: 1000,
            available_capacity_gb: 500,
            cost_per_gb_month: 0.02,
            performance_tier: PerformanceTier::Premium,
            last_updated: Utc::now(),
        };

        let requirements = &manager.policies[&ContentType::Critical].node_quality_requirements;
        assert!(manager.meets_quality_requirements(&high_quality_node, requirements));
    }

    // Very frequent, geographically diverse access should raise the target
    // above the base count (1.5 frequency factor × 1.1 geo bonus).
    #[test]
    fn test_access_pattern_adjustment() {
        let manager = IntelligentReplicationManager::new();

        let high_access_patterns = AccessPatterns {
            access_frequency: AccessFrequency::VeryHigh,
            geographic_access: HashMap::from([
                (GeographicRegion::NorthAmerica, 100),
                (GeographicRegion::Europe, 50),
                (GeographicRegion::Asia, 25),
            ]),
            time_patterns: HashMap::new(),
            last_access: Utc::now(),
            predicted_next_access: None,
        };

        let adjusted = manager.adjust_for_access_patterns(5, &high_access_patterns);
        assert!(adjusted > 5); // Should increase for high-access content
    }
}
983 }