//! Intelligent Replication Strategy
//!
//! Adaptive redundancy system that optimizes data durability based on content importance,
//! network conditions, and cost considerations.
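//!
//! A minimal usage sketch (the chunk id, access patterns, and health metrics
//! below are placeholders for values supplied by the caller):
//!
//! ```ignore
//! let manager = IntelligentReplicationManager::new();
//! let strategy = manager.determine_replication_strategy(
//!     "chunk-123",         // hypothetical chunk id
//!     ContentType::Important,
//!     &access_patterns,    // an AccessPatterns value for the chunk
//!     &network_health,     // current NetworkHealthMetrics
//! )?;
//! println!("replicating to {} nodes", strategy.selected_nodes.len());
//! ```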

use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use chrono::{DateTime, Utc};

use crate::economics::NetworkHealthMetrics;
use crate::economics::earnings_calculator::GeographicRegion;
use super::contribution_node_selector::ContributionNodeSelector;

/// Intelligent replication manager
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IntelligentReplicationManager {
    /// Replication policies for different content types
    pub policies: HashMap<ContentType, ReplicationPolicy>,
    /// Current replication state tracking
    pub replication_state: HashMap<String, ChunkReplicationState>,
    /// Node performance tracking for replication decisions
    pub node_performance: HashMap<String, NodePerformanceProfile>,
    /// Geographic distribution requirements
    pub geo_distribution: GeographicDistributionConfig,
    /// Adaptive redundancy configuration
    pub adaptive_config: AdaptiveRedundancyConfig,
    /// Cost optimization configuration
    pub cost_config: CostOptimizationConfig,
    /// Contribution-based node selector
    pub node_selector: ContributionNodeSelector,
}

#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub enum ContentType {
    Critical,  // System-critical data
    Important, // User-important files
    Standard,  // Regular user files
    Archive,   // Long-term storage
    Temporary, // Short-term cache
    Backup,    // Backup copies
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationPolicy {
    pub content_type: ContentType,
    pub min_replicas: u32,
    pub max_replicas: u32,
    pub target_replicas: u32,
    pub geographic_spread: GeographicSpread,
    pub node_quality_requirements: NodeQualityRequirements,
    pub redundancy_scheme: RedundancyScheme,
    pub replication_priority: ReplicationPriority,
    pub cost_sensitivity: f64, // 0.0 = cost-insensitive, 1.0 = highly cost-sensitive
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GeographicSpread {
    SingleRegion,
    MultiRegion(u32),                      // Minimum number of regions
    GlobalDistribution,                    // Maximum geographic spread
    RegionSpecific(Vec<GeographicRegion>), // Specific regions required
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeQualityRequirements {
    pub min_uptime_percentage: f64,
    pub min_bandwidth_mbps: f64,
    pub max_latency_ms: u64,
    pub min_reliability_score: f64,
    pub required_connection_quality: Option<ConnectionQuality>,
    pub exclude_unstable_nodes: bool,
    pub prefer_premium_nodes: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RedundancyScheme {
    SimpleReplication,                                    // Basic copying
    ReedSolomon { data: u32, parity: u32 },               // data + parity shards; tolerates up to `parity` losses
    HybridErasure { replicas: u32, erasure: (u32, u32) }, // Replication combined with erasure coding
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationPriority {
    Immediate,
    High,
    Normal,
    Low,
    Background,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConnectionQuality {
    Fiber,
    Broadband,
    Mobile,
    Satellite,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkReplicationState {
    pub chunk_id: String,
    pub content_type: ContentType,
    pub current_replicas: Vec<ReplicaLocation>,
    pub target_replicas: u32,
    pub health_score: f64,
    pub last_verification: DateTime<Utc>,
    pub replication_status: ReplicationStatus,
    pub repair_history: Vec<RepairEvent>,
    pub access_patterns: AccessPatterns,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaLocation {
    pub node_id: String,
    pub region: GeographicRegion,
    pub quality_score: f64,
    pub created_at: DateTime<Utc>,
    pub last_verified: DateTime<Utc>,
    pub status: ReplicaStatus,
    pub performance_metrics: ReplicaPerformance,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicaStatus {
    Healthy,
    Degraded,
    Unreachable,
    Corrupted,
    Missing,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaPerformance {
    pub response_time_ms: u64,
    pub transfer_speed_mbps: f64,
    pub success_rate: f64,
    pub last_access: DateTime<Utc>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationStatus {
    Optimal,   // Meets all requirements
    Adequate,  // Meets minimum requirements
    Degraded,  // Below minimum requirements
    Critical,  // Immediate action needed
    Repairing, // Currently being repaired
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepairEvent {
    pub event_id: String,
    pub timestamp: DateTime<Utc>,
    pub event_type: RepairEventType,
    pub affected_replicas: Vec<String>,
    pub repair_strategy: RepairStrategy,
    pub success: bool,
    pub duration_seconds: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RepairEventType {
    NodeFailure,
    NetworkPartition,
    CorruptionDetected,
    PerformanceDegradation,
    ScheduledMaintenance,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RepairStrategy {
    CreateNewReplica,
    RepairExistingReplica,
    MigrateReplica,
    IncreaseRedundancy,
    RebuildFromErasure,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccessPatterns {
    pub access_frequency: AccessFrequency,
    pub geographic_access: HashMap<GeographicRegion, u32>,
    pub time_patterns: HashMap<u8, u32>, // Hour of day -> access count
    pub last_access: DateTime<Utc>,
    pub predicted_next_access: Option<DateTime<Utc>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AccessFrequency {
    VeryHigh, // Multiple times per hour
    High,     // Multiple times per day
    Medium,   // Daily access
    Low,      // Weekly access
    VeryLow,  // Monthly access
    Archive,  // Rarely accessed
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodePerformanceProfile {
    pub node_id: String,
    pub region: GeographicRegion,
    pub uptime_percentage: f64,
    pub bandwidth_mbps: f64,
    pub latency_ms: u64,
    pub reliability_score: f64,
    pub connection_quality: ConnectionQuality,
    pub storage_capacity_gb: u64,
    pub available_capacity_gb: u64,
    pub cost_per_gb_month: f64,
    pub performance_tier: PerformanceTier,
    pub last_updated: DateTime<Utc>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PerformanceTier {
    Premium,    // Top 10% performers
    High,       // Top 25% performers
    Standard,   // Average performers
    Basic,      // Below average
    Unreliable, // Poor performers
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeographicDistributionConfig {
    pub min_regions_per_chunk: u32,
    pub preferred_regions: Vec<GeographicRegion>,
    pub region_weights: HashMap<GeographicRegion, f64>,
    pub latency_requirements: HashMap<GeographicRegion, u64>,
    pub regulatory_constraints: HashMap<GeographicRegion, Vec<String>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveRedundancyConfig {
    pub enable_dynamic_adjustment: bool,
    pub adjustment_frequency_hours: u32,
    pub network_health_threshold: f64,
    pub failure_rate_threshold: f64,
    pub auto_scale_replicas: bool,
    pub max_auto_replicas: u32,
    pub min_auto_replicas: u32,
    pub cost_efficiency_target: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostOptimizationConfig {
    pub enable_cost_optimization: bool,
    pub cost_efficiency_weight: f64,
    pub durability_weight: f64,
    pub performance_weight: f64,
    pub max_cost_per_gb_month: f64,
    pub prefer_cheaper_nodes: bool,
    pub cost_monitoring_enabled: bool,
}

impl Default for ReplicationPolicy {
    fn default() -> Self {
        Self {
            content_type: ContentType::Standard,
            min_replicas: 3,
            max_replicas: 10,
            target_replicas: 5,
            geographic_spread: GeographicSpread::MultiRegion(2),
            node_quality_requirements: NodeQualityRequirements {
                min_uptime_percentage: 95.0,
                min_bandwidth_mbps: 10.0,
                max_latency_ms: 200,
                min_reliability_score: 90.0,
                required_connection_quality: None,
                exclude_unstable_nodes: true,
                prefer_premium_nodes: false,
            },
            redundancy_scheme: RedundancyScheme::SimpleReplication,
            replication_priority: ReplicationPriority::Normal,
            cost_sensitivity: 0.3,
        }
    }
}

impl IntelligentReplicationManager {
    /// Create a new intelligent replication manager
    pub fn new() -> Self {
        let mut manager = Self {
            policies: HashMap::new(),
            replication_state: HashMap::new(),
            node_performance: HashMap::new(),
            geo_distribution: GeographicDistributionConfig {
                min_regions_per_chunk: 2,
                preferred_regions: vec![
                    GeographicRegion::NorthAmerica,
                    GeographicRegion::Europe,
                    GeographicRegion::Asia,
                ],
                region_weights: HashMap::new(),
                latency_requirements: HashMap::new(),
                regulatory_constraints: HashMap::new(),
            },
            adaptive_config: AdaptiveRedundancyConfig {
                enable_dynamic_adjustment: true,
                adjustment_frequency_hours: 6,
                network_health_threshold: 95.0,
                failure_rate_threshold: 0.01,
                auto_scale_replicas: true,
                max_auto_replicas: 15,
                min_auto_replicas: 3,
                cost_efficiency_target: 0.8,
            },
            cost_config: CostOptimizationConfig {
                enable_cost_optimization: true,
                cost_efficiency_weight: 0.3,
                durability_weight: 0.5,
                performance_weight: 0.2,
                max_cost_per_gb_month: 0.05,
                prefer_cheaper_nodes: false,
                cost_monitoring_enabled: true,
            },
            // The struct declares this field but the original constructor never
            // initialized it, which does not compile; this assumes
            // ContributionNodeSelector provides a Default implementation.
            node_selector: ContributionNodeSelector::default(),
        };

        manager.initialize_default_policies();
        manager
    }

    /// Initialize default replication policies
    fn initialize_default_policies(&mut self) {
        // Critical content policy
        self.policies.insert(ContentType::Critical, ReplicationPolicy {
            content_type: ContentType::Critical,
            min_replicas: 5,
            max_replicas: 15,
            target_replicas: 8,
            geographic_spread: GeographicSpread::GlobalDistribution,
            node_quality_requirements: NodeQualityRequirements {
                min_uptime_percentage: 99.0,
                min_bandwidth_mbps: 50.0,
                max_latency_ms: 100,
                min_reliability_score: 95.0,
                required_connection_quality: Some(ConnectionQuality::Fiber),
                exclude_unstable_nodes: true,
                prefer_premium_nodes: true,
            },
            redundancy_scheme: RedundancyScheme::ReedSolomon { data: 6, parity: 3 },
            replication_priority: ReplicationPriority::Immediate,
            cost_sensitivity: 0.1, // Low cost sensitivity for critical data
        });

        // Important content policy
        self.policies.insert(ContentType::Important, ReplicationPolicy {
            content_type: ContentType::Important,
            min_replicas: 4,
            max_replicas: 10,
            target_replicas: 6,
            geographic_spread: GeographicSpread::MultiRegion(3),
            node_quality_requirements: NodeQualityRequirements {
                min_uptime_percentage: 97.0,
                min_bandwidth_mbps: 25.0,
                max_latency_ms: 150,
                min_reliability_score: 92.0,
                required_connection_quality: None,
                exclude_unstable_nodes: true,
                prefer_premium_nodes: true,
            },
            redundancy_scheme: RedundancyScheme::ReedSolomon { data: 4, parity: 2 },
            replication_priority: ReplicationPriority::High,
            cost_sensitivity: 0.2,
        });

        // Standard content policy
        self.policies.insert(ContentType::Standard, ReplicationPolicy::default());

        // Archive content policy
        self.policies.insert(ContentType::Archive, ReplicationPolicy {
            content_type: ContentType::Archive,
            min_replicas: 3,
            max_replicas: 8,
            target_replicas: 4,
            geographic_spread: GeographicSpread::MultiRegion(2),
            node_quality_requirements: NodeQualityRequirements {
                min_uptime_percentage: 90.0,
                min_bandwidth_mbps: 5.0,
                max_latency_ms: 500,
                min_reliability_score: 85.0,
                required_connection_quality: None,
                exclude_unstable_nodes: false,
                prefer_premium_nodes: false,
            },
            redundancy_scheme: RedundancyScheme::ReedSolomon { data: 3, parity: 2 },
            replication_priority: ReplicationPriority::Background,
            cost_sensitivity: 0.8, // High cost sensitivity for archive data
        });

        // Temporary content policy
        self.policies.insert(ContentType::Temporary, ReplicationPolicy {
            content_type: ContentType::Temporary,
            min_replicas: 2,
            max_replicas: 4,
            target_replicas: 3,
            geographic_spread: GeographicSpread::SingleRegion,
            node_quality_requirements: NodeQualityRequirements {
                min_uptime_percentage: 85.0,
                min_bandwidth_mbps: 10.0,
                max_latency_ms: 300,
                min_reliability_score: 80.0,
                required_connection_quality: None,
                exclude_unstable_nodes: false,
                prefer_premium_nodes: false,
            },
            redundancy_scheme: RedundancyScheme::SimpleReplication,
            replication_priority: ReplicationPriority::Low,
            cost_sensitivity: 1.0, // Maximum cost sensitivity
        });

        // Note: ContentType::Backup has no default policy, so
        // determine_replication_strategy returns an error for it unless a
        // policy is registered explicitly.
    }

    /// Determine the optimal replication strategy for a chunk
    pub fn determine_replication_strategy(
        &self,
        chunk_id: &str,
        content_type: ContentType,
        access_patterns: &AccessPatterns,
        network_health: &NetworkHealthMetrics,
    ) -> Result<ReplicationStrategy> {
        let policy = self.policies.get(&content_type)
            .ok_or_else(|| anyhow::anyhow!("No policy found for content type: {:?}", content_type))?;

        // Calculate base replication requirements
        let mut target_replicas = policy.target_replicas;

        // Adjust based on access patterns
        target_replicas = self.adjust_for_access_patterns(target_replicas, access_patterns);

        // Adjust based on network health
        target_replicas = self.adjust_for_network_health(target_replicas, network_health);

        // Ensure the result stays within policy bounds
        target_replicas = target_replicas.clamp(policy.min_replicas, policy.max_replicas);

        // Select optimal nodes
        let selected_nodes = self.select_optimal_nodes(
            target_replicas,
            &policy.node_quality_requirements,
            &policy.geographic_spread,
            policy.cost_sensitivity,
        )?;

        // Compute cost and durability before `selected_nodes` is moved into the
        // strategy; evaluating these inside the struct literal would borrow the
        // vector after the move and fail to compile.
        let estimated_cost = self.calculate_replication_cost(&selected_nodes);
        let durability_score = self.calculate_durability_score(&selected_nodes, &policy.redundancy_scheme);

        Ok(ReplicationStrategy {
            chunk_id: chunk_id.to_string(),
            content_type,
            target_replicas,
            selected_nodes,
            redundancy_scheme: policy.redundancy_scheme.clone(),
            priority: policy.replication_priority.clone(),
            estimated_cost,
            durability_score,
        })
    }

    /// Adjust replica count based on access patterns
    fn adjust_for_access_patterns(&self, base_replicas: u32, access_patterns: &AccessPatterns) -> u32 {
        let frequency_multiplier = match access_patterns.access_frequency {
            AccessFrequency::VeryHigh => 1.5,
            AccessFrequency::High => 1.2,
            AccessFrequency::Medium => 1.0,
            AccessFrequency::Low => 0.9,
            AccessFrequency::VeryLow => 0.8,
            AccessFrequency::Archive => 0.7,
        };

        // Geographic access diversity bonus
        let geo_diversity_bonus = if access_patterns.geographic_access.len() > 2 {
            1.1
        } else {
            1.0
        };

        // Truncates toward zero; the caller clamps the result to policy bounds
        ((base_replicas as f64) * frequency_multiplier * geo_diversity_bonus) as u32
    }

    /// Adjust replica count based on network health
    fn adjust_for_network_health(&self, base_replicas: u32, network_health: &NetworkHealthMetrics) -> u32 {
        let health_factor = if network_health.average_uptime < 90.0 {
            1.3 // Increase replicas for poor network health
        } else if network_health.average_uptime < 95.0 {
            1.1
        } else {
            1.0 // Normal replication for a healthy network
        };

        // Adjust for utilization pressure
        let utilization_factor = if network_health.utilization_rate > 90.0 {
            0.9 // Reduce replicas under high utilization
        } else {
            1.0
        };

        ((base_replicas as f64) * health_factor * utilization_factor) as u32
    }

    /// Select optimal nodes for replication
    fn select_optimal_nodes(
        &self,
        target_replicas: u32,
        quality_requirements: &NodeQualityRequirements,
        geographic_spread: &GeographicSpread,
        cost_sensitivity: f64,
    ) -> Result<Vec<String>> {
        // Filter nodes by quality requirements
        let eligible_nodes: Vec<_> = self.node_performance
            .values()
            .filter(|node| self.meets_quality_requirements(node, quality_requirements))
            .collect();

        if eligible_nodes.is_empty() {
            return Err(anyhow::anyhow!("No nodes meet quality requirements"));
        }

        // Group by region for geographic distribution
        let nodes_by_region = self.group_nodes_by_region(&eligible_nodes);

        // Select nodes based on geographic spread requirements
        let selected_nodes = self.apply_geographic_selection(
            nodes_by_region,
            target_replicas,
            geographic_spread,
            cost_sensitivity,
        )?;

        Ok(selected_nodes)
    }

    /// Check if a node meets the quality requirements
    fn meets_quality_requirements(
        &self,
        node: &NodePerformanceProfile,
        requirements: &NodeQualityRequirements,
    ) -> bool {
        node.uptime_percentage >= requirements.min_uptime_percentage
            && node.bandwidth_mbps >= requirements.min_bandwidth_mbps
            && node.latency_ms <= requirements.max_latency_ms
            && node.reliability_score >= requirements.min_reliability_score
            && node.available_capacity_gb > 0
            && (!requirements.exclude_unstable_nodes || !matches!(node.performance_tier, PerformanceTier::Unreliable))
            && requirements.required_connection_quality.as_ref()
                .map_or(true, |required| self.connection_quality_matches(&node.connection_quality, required))
    }

    /// Check whether the actual connection quality satisfies the requirement.
    /// Qualities form an ordering (Fiber > Broadband > Mobile > Satellite);
    /// a node matches when its actual quality is at least the required one.
    fn connection_quality_matches(&self, actual: &ConnectionQuality, required: &ConnectionQuality) -> bool {
        match (actual, required) {
            (ConnectionQuality::Fiber, _) => true,
            (ConnectionQuality::Broadband, ConnectionQuality::Fiber) => false,
            (ConnectionQuality::Broadband, _) => true,
            (ConnectionQuality::Mobile, ConnectionQuality::Fiber | ConnectionQuality::Broadband) => false,
            (ConnectionQuality::Mobile, _) => true,
            (ConnectionQuality::Satellite, ConnectionQuality::Satellite) => true,
            (ConnectionQuality::Satellite, _) => false,
        }
    }

    /// Group nodes by geographic region
    fn group_nodes_by_region(
        &self,
        nodes: &[&NodePerformanceProfile],
    ) -> HashMap<GeographicRegion, Vec<&NodePerformanceProfile>> {
        let mut grouped = HashMap::new();

        for node in nodes {
            grouped.entry(node.region.clone())
                .or_insert_with(Vec::new)
                .push(*node);
        }

        // Sort nodes within each region by performance score
        for region_nodes in grouped.values_mut() {
            region_nodes.sort_by(|a, b| {
                let score_a = self.calculate_node_score(a, 0.3); // Default cost sensitivity
                let score_b = self.calculate_node_score(b, 0.3);
                score_b.partial_cmp(&score_a).unwrap_or(std::cmp::Ordering::Equal)
            });
        }

        grouped
    }

    /// Apply the geographic selection strategy
    fn apply_geographic_selection(
        &self,
        nodes_by_region: HashMap<GeographicRegion, Vec<&NodePerformanceProfile>>,
        target_replicas: u32,
        geographic_spread: &GeographicSpread,
        cost_sensitivity: f64,
    ) -> Result<Vec<String>> {
        let mut selected_nodes = Vec::new();

        match geographic_spread {
            GeographicSpread::SingleRegion => {
                // Select all replicas from the best region
                let best_region = self.find_best_region(&nodes_by_region, cost_sensitivity)?;
                if let Some(region_nodes) = nodes_by_region.get(&best_region) {
                    for node in region_nodes.iter().take(target_replicas as usize) {
                        selected_nodes.push(node.node_id.clone());
                    }
                }
            },
            GeographicSpread::MultiRegion(min_regions) => {
                selected_nodes = self.select_multi_region_nodes(
                    &nodes_by_region,
                    target_replicas,
                    *min_regions,
                    cost_sensitivity,
                )?;
            },
            GeographicSpread::GlobalDistribution => {
                selected_nodes = self.select_global_distribution_nodes(
                    &nodes_by_region,
                    target_replicas,
                    cost_sensitivity,
                )?;
            },
            GeographicSpread::RegionSpecific(required_regions) => {
                selected_nodes = self.select_region_specific_nodes(
                    &nodes_by_region,
                    target_replicas,
                    required_regions,
                    cost_sensitivity,
                )?;
            },
        }

        if selected_nodes.len() < target_replicas as usize {
            tracing::warn!("Could only select {} nodes out of {} requested",
                selected_nodes.len(), target_replicas);
        }

        Ok(selected_nodes)
    }

    /// Find the best region based on cost and performance
    fn find_best_region(
        &self,
        nodes_by_region: &HashMap<GeographicRegion, Vec<&NodePerformanceProfile>>,
        cost_sensitivity: f64,
    ) -> Result<GeographicRegion> {
        let mut best_region = None;
        let mut best_score = 0.0;

        for (region, nodes) in nodes_by_region {
            if nodes.is_empty() {
                continue;
            }

            let avg_score = nodes.iter()
                .map(|node| self.calculate_node_score(node, cost_sensitivity))
                .sum::<f64>() / nodes.len() as f64;

            if avg_score > best_score {
                best_score = avg_score;
                best_region = Some(region.clone());
            }
        }

        best_region.ok_or_else(|| anyhow::anyhow!("No suitable region found"))
    }

    /// Select nodes across multiple regions
    fn select_multi_region_nodes(
        &self,
        nodes_by_region: &HashMap<GeographicRegion, Vec<&NodePerformanceProfile>>,
        target_replicas: u32,
        min_regions: u32,
        cost_sensitivity: f64,
    ) -> Result<Vec<String>> {
        let mut selected_nodes = Vec::new();
        let available_regions: Vec<_> = nodes_by_region.keys()
            .filter(|region| !nodes_by_region[*region].is_empty())
            .collect();

        if available_regions.len() < min_regions as usize {
            return Err(anyhow::anyhow!("Not enough regions available: {} < {}",
                available_regions.len(), min_regions));
        }

        // Split replicas evenly, giving the remainder to the best-ranked regions
        // (e.g. 7 replicas over 3 regions -> 3 + 2 + 2)
        let replicas_per_region = target_replicas / min_regions;
        let extra_replicas = target_replicas % min_regions;

        // Sort regions by quality
        let mut sorted_regions = available_regions;
        sorted_regions.sort_by(|a, b| {
            let score_a = self.calculate_region_score(nodes_by_region[*a].as_slice(), cost_sensitivity);
            let score_b = self.calculate_region_score(nodes_by_region[*b].as_slice(), cost_sensitivity);
            score_b.partial_cmp(&score_a).unwrap_or(std::cmp::Ordering::Equal)
        });

        // Select nodes from each region
        for (i, region) in sorted_regions.iter().take(min_regions as usize).enumerate() {
            let region_nodes = &nodes_by_region[*region];
            let region_replicas = replicas_per_region + if i < extra_replicas as usize { 1 } else { 0 };

            for node in region_nodes.iter().take(region_replicas as usize) {
                selected_nodes.push(node.node_id.clone());
            }
        }

        Ok(selected_nodes)
    }

    /// Select nodes for global distribution
    fn select_global_distribution_nodes(
        &self,
        nodes_by_region: &HashMap<GeographicRegion, Vec<&NodePerformanceProfile>>,
        target_replicas: u32,
        cost_sensitivity: f64,
    ) -> Result<Vec<String>> {
        // Distribute across every region that has eligible nodes
        let available_regions = nodes_by_region.len() as u32;
        self.select_multi_region_nodes(nodes_by_region, target_replicas, available_regions, cost_sensitivity)
    }

    /// Select nodes from specific regions. Per-region node order was already
    /// fixed by group_nodes_by_region, so the cost sensitivity is unused here.
    fn select_region_specific_nodes(
        &self,
        nodes_by_region: &HashMap<GeographicRegion, Vec<&NodePerformanceProfile>>,
        target_replicas: u32,
        required_regions: &[GeographicRegion],
        _cost_sensitivity: f64,
    ) -> Result<Vec<String>> {
        let mut selected_nodes = Vec::new();
        let replicas_per_region = target_replicas / required_regions.len() as u32;
        let extra_replicas = target_replicas % required_regions.len() as u32;

        for (i, region) in required_regions.iter().enumerate() {
            if let Some(region_nodes) = nodes_by_region.get(region) {
                let region_replicas = replicas_per_region + if i < extra_replicas as usize { 1 } else { 0 };

                for node in region_nodes.iter().take(region_replicas as usize) {
                    selected_nodes.push(node.node_id.clone());
                }
            }
        }

        Ok(selected_nodes)
    }

    /// Calculate a node's performance score
    fn calculate_node_score(&self, node: &NodePerformanceProfile, cost_sensitivity: f64) -> f64 {
        let performance_score = (node.uptime_percentage / 100.0)
            * (node.reliability_score / 100.0)
            * (node.bandwidth_mbps / 100.0).min(1.0)
            * (200.0 / (node.latency_ms as f64 + 50.0));

        let cost_score = if cost_sensitivity > 0.0 {
            1.0 / (node.cost_per_gb_month + 0.01) // Offset avoids division by zero
        } else {
            1.0
        };

        // Weighted combination: cost_sensitivity shifts weight from performance to cost
        performance_score * (1.0 - cost_sensitivity) + cost_score * cost_sensitivity
    }

    /// Calculate a region's quality score
    fn calculate_region_score(&self, nodes: &[&NodePerformanceProfile], cost_sensitivity: f64) -> f64 {
        if nodes.is_empty() {
            return 0.0;
        }

        nodes.iter()
            .map(|node| self.calculate_node_score(node, cost_sensitivity))
            .sum::<f64>() / nodes.len() as f64
    }

    /// Calculate the estimated replication cost (sum of per-GB-month costs
    /// across the selected nodes)
    fn calculate_replication_cost(&self, selected_nodes: &[String]) -> f64 {
        selected_nodes.iter()
            .filter_map(|node_id| self.node_performance.get(node_id))
            .map(|node| node.cost_per_gb_month)
            .sum()
    }

    /// Calculate the expected durability score
    fn calculate_durability_score(&self, selected_nodes: &[String], redundancy_scheme: &RedundancyScheme) -> f64 {
        if selected_nodes.is_empty() {
            return 0.0; // Guard against dividing by zero below
        }

        let avg_reliability = selected_nodes.iter()
            .filter_map(|node_id| self.node_performance.get(node_id))
            .map(|node| node.reliability_score / 100.0)
            .sum::<f64>() / selected_nodes.len() as f64;

        // Calculate durability based on the redundancy scheme
        match redundancy_scheme {
            RedundancyScheme::SimpleReplication => {
                // 1 - (probability that every replica fails)
                1.0 - (1.0 - avg_reliability).powf(selected_nodes.len() as f64)
            },
            RedundancyScheme::ReedSolomon { data, parity } => {
                // The chunk survives as long as at most `parity` of the
                // data + parity shards fail (failures assumed independent)
                let total_shards = data + parity;
                let failure_prob = 1.0 - avg_reliability;

                (0..=*parity)
                    .map(|failures| binomial_probability(total_shards, failures, failure_prob))
                    .sum::<f64>()
            },
            RedundancyScheme::HybridErasure { replicas, erasure } => {
                // Combination of replication and erasure coding
                let replication_durability = 1.0 - (1.0 - avg_reliability).powf(*replicas as f64);
                let erasure_durability = self.calculate_durability_score(
                    selected_nodes,
                    &RedundancyScheme::ReedSolomon { data: erasure.0, parity: erasure.1 },
                );

                // Take the stronger of the two estimates
                replication_durability.max(erasure_durability)
            },
        }
    }

    /// Update a node's performance profile
    pub fn update_node_performance(&mut self, node_id: String, profile: NodePerformanceProfile) {
        self.node_performance.insert(node_id, profile);
    }

    /// Update a chunk's replication state
    pub fn update_chunk_state(&mut self, chunk_id: String, state: ChunkReplicationState) {
        self.replication_state.insert(chunk_id, state);
    }

    /// Get replication recommendations for a chunk
    pub fn get_replication_recommendations(&self, chunk_id: &str) -> Result<Vec<ReplicationRecommendation>> {
        let state = self.replication_state.get(chunk_id)
            .ok_or_else(|| anyhow::anyhow!("Chunk state not found"))?;

        let mut recommendations = Vec::new();

        // Check if we need more replicas
        if state.current_replicas.len() < state.target_replicas as usize {
            recommendations.push(ReplicationRecommendation {
                recommendation_type: RecommendationType::IncreaseReplicas,
                priority: ReplicationPriority::High,
                estimated_cost: 0.02, // Placeholder
                durability_impact: 0.15,
                description: "Increase replica count to meet target".to_string(),
            });
        }

        // Check for unhealthy replicas
        let unhealthy_replicas: Vec<_> = state.current_replicas.iter()
            .filter(|replica| !matches!(replica.status, ReplicaStatus::Healthy))
            .collect();

        if !unhealthy_replicas.is_empty() {
            recommendations.push(ReplicationRecommendation {
                recommendation_type: RecommendationType::RepairReplicas,
                priority: ReplicationPriority::Immediate,
                estimated_cost: 0.05, // Placeholder
                durability_impact: 0.25,
                description: format!("Repair {} unhealthy replicas", unhealthy_replicas.len()),
            });
        }

        // Check geographic distribution
        let regions: std::collections::HashSet<_> = state.current_replicas.iter()
            .map(|replica| &replica.region)
            .collect();

        if regions.len() < self.geo_distribution.min_regions_per_chunk as usize {
            recommendations.push(ReplicationRecommendation {
                recommendation_type: RecommendationType::ImproveGeographicDistribution,
                priority: ReplicationPriority::Normal,
                estimated_cost: 0.03, // Placeholder
                durability_impact: 0.10,
                description: "Improve geographic distribution".to_string(),
            });
        }

        Ok(recommendations)
    }
}
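
// `new()` performs the real initialization, so a Default implementation can
// simply delegate to it; this also satisfies Clippy's `new_without_default` lint.
impl Default for IntelligentReplicationManager {
    fn default() -> Self {
        Self::new()
    }
}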

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationStrategy {
    pub chunk_id: String,
    pub content_type: ContentType,
    pub target_replicas: u32,
    pub selected_nodes: Vec<String>,
    pub redundancy_scheme: RedundancyScheme,
    pub priority: ReplicationPriority,
    pub estimated_cost: f64,
    pub durability_score: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationRecommendation {
    pub recommendation_type: RecommendationType,
    pub priority: ReplicationPriority,
    pub estimated_cost: f64,
    pub durability_impact: f64,
    pub description: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecommendationType {
    IncreaseReplicas,
    DecreaseReplicas,
    RepairReplicas,
    MigrateReplicas,
    ImproveGeographicDistribution,
    OptimizeCost,
    UpgradeRedundancyScheme,
}

/// Probability of exactly `k` failures among `n` independent trials, each
/// failing with probability `p`.
fn binomial_probability(n: u32, k: u32, p: f64) -> f64 {
    if k > n {
        return 0.0;
    }

    // Accumulate C(n, k) in floating point instead of via u64 factorials,
    // which overflow for n > 20. C(n, k) == C(n, n - k), so iterate over
    // the smaller of the two.
    let m = k.min(n - k);
    let combination = (0..m).fold(1.0_f64, |c, i| c * (n - i) as f64 / (i + 1) as f64);
    combination * p.powi(k as i32) * (1.0 - p).powi((n - k) as i32)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_replication_manager_creation() {
        let manager = IntelligentReplicationManager::new();
        assert!(!manager.policies.is_empty());
        assert!(manager.policies.contains_key(&ContentType::Critical));
        assert!(manager.policies.contains_key(&ContentType::Standard));
    }
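
    // connection_quality_matches is effectively an ordering,
    // Fiber > Broadband > Mobile > Satellite: a node matches whenever its
    // actual quality is at least the required one. Spot-check that ordering.
    #[test]
    fn test_connection_quality_ordering() {
        let manager = IntelligentReplicationManager::new();
        assert!(manager.connection_quality_matches(&ConnectionQuality::Fiber, &ConnectionQuality::Satellite));
        assert!(manager.connection_quality_matches(&ConnectionQuality::Broadband, &ConnectionQuality::Mobile));
        assert!(!manager.connection_quality_matches(&ConnectionQuality::Mobile, &ConnectionQuality::Broadband));
        assert!(!manager.connection_quality_matches(&ConnectionQuality::Satellite, &ConnectionQuality::Mobile));
    }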

    #[test]
    fn test_node_quality_requirements() {
        let manager = IntelligentReplicationManager::new();

        let high_quality_node = NodePerformanceProfile {
            node_id: "node1".to_string(),
            region: GeographicRegion::NorthAmerica,
            uptime_percentage: 99.5,
            bandwidth_mbps: 100.0,
            latency_ms: 50,
            reliability_score: 98.0,
            connection_quality: ConnectionQuality::Fiber,
            storage_capacity_gb: 1000,
            available_capacity_gb: 500,
            cost_per_gb_month: 0.02,
            performance_tier: PerformanceTier::Premium,
            last_updated: Utc::now(),
        };

        let requirements = &manager.policies[&ContentType::Critical].node_quality_requirements;
        assert!(manager.meets_quality_requirements(&high_quality_node, requirements));
    }
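
    // With SimpleReplication the durability model reduces to 1 - (1 - r)^n;
    // check it against two synthetic fixture profiles with reliability 0.9.
    #[test]
    fn test_simple_replication_durability() {
        let mut manager = IntelligentReplicationManager::new();
        for id in ["n1", "n2"] {
            manager.update_node_performance(id.to_string(), NodePerformanceProfile {
                node_id: id.to_string(),
                region: GeographicRegion::Europe,
                uptime_percentage: 99.0,
                bandwidth_mbps: 50.0,
                latency_ms: 80,
                reliability_score: 90.0,
                connection_quality: ConnectionQuality::Broadband,
                storage_capacity_gb: 1000,
                available_capacity_gb: 500,
                cost_per_gb_month: 0.02,
                performance_tier: PerformanceTier::Standard,
                last_updated: Utc::now(),
            });
        }

        let nodes = vec!["n1".to_string(), "n2".to_string()];
        let durability = manager.calculate_durability_score(&nodes, &RedundancyScheme::SimpleReplication);
        assert!((durability - 0.99).abs() < 1e-9); // 1 - (1 - 0.9)^2
    }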

    #[test]
    fn test_access_pattern_adjustment() {
        let manager = IntelligentReplicationManager::new();

        let high_access_patterns = AccessPatterns {
            access_frequency: AccessFrequency::VeryHigh,
            geographic_access: HashMap::from([
                (GeographicRegion::NorthAmerica, 100),
                (GeographicRegion::Europe, 50),
                (GeographicRegion::Asia, 25),
            ]),
            time_patterns: HashMap::new(),
            last_access: Utc::now(),
            predicted_next_access: None,
        };

        let adjusted = manager.adjust_for_access_patterns(5, &high_access_patterns);
        assert!(adjusted > 5); // Should increase for high-access content
    }
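
    // The Reed-Solomon durability estimate leans on binomial_probability, so
    // spot-check it against a hand-computed value and confirm the full
    // distribution sums to 1.
    #[test]
    fn test_binomial_probability() {
        // C(4,1) * 0.1 * 0.9^3 = 0.2916
        assert!((binomial_probability(4, 1, 0.1) - 0.2916).abs() < 1e-12);
        // Out-of-range k has zero probability
        assert_eq!(binomial_probability(4, 5, 0.1), 0.0);
        // The full distribution sums to 1
        let total: f64 = (0..=9).map(|k| binomial_probability(9, k, 0.3)).sum();
        assert!((total - 1.0).abs() < 1e-9);
    }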
} |