Rust · 15810 bytes Raw Blame History
1 //! Contribution-Based Replication Manager
2 //!
3 //! High-level manager that integrates contribution tracking with smart redundancy
4
5 use anyhow::Result;
6 use serde::{Deserialize, Serialize};
7 use std::collections::HashMap;
8 use chrono::{DateTime, Utc};
9
10 use crate::economics::{ContributionTracker, UserContribution};
11 use super::contribution_node_selector::{
12 ContributionNodeSelector, NodeSelectionCriteria, NodeSelectionResult, NodeAvailability
13 };
14 use super::reputation_system::{NodeReputation, ReputationManager};
15 use super::intelligent_replication::{ContentType, ReplicationPolicy};
16
/// Main replication manager using contribution-based node selection.
///
/// Ties together per-tier replication policies, the contribution-based node
/// selector, active job bookkeeping, and aggregate performance statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContributionReplicationManager {
    /// Node selector for finding optimal nodes.
    pub node_selector: ContributionNodeSelector,
    /// Replication policies, one per content tier.
    pub replication_policies: HashMap<ContentType, ContributionReplicationPolicy>,
    /// Replication jobs keyed by job id.
    // NOTE(review): finished jobs are never evicted from this map — confirm
    // whether a cleanup pass exists elsewhere.
    pub active_replications: HashMap<String, ReplicationJob>,
    /// Aggregate statistics across all jobs handled by this manager.
    pub performance_stats: ReplicationPerformanceStats,
}
29
/// Per-content-tier replication requirements, translated into node-selection
/// criteria when a chunk of that tier is replicated.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContributionReplicationPolicy {
    /// Content tier this policy applies to.
    pub content_type: ContentType,
    /// Lower bound on replica count.
    // NOTE(review): not enforced anywhere in this file — confirm the consumer.
    pub min_replicas: u32,
    /// Upper bound on replica count.
    // NOTE(review): likewise not enforced in this file.
    pub max_replicas: u32,
    /// Replica count actually requested from the node selector.
    pub target_replicas: u32,
    /// Minimum contribution score a candidate node must have.
    pub min_contribution_score: f64,
    /// Minimum reliability score; the allowed failure rate is derived as
    /// `1.0 - min_reliability_score`.
    pub min_reliability_score: f64,
    /// Minimum node uptime, in percent (0–100).
    pub min_uptime_percentage: f64,
    /// Whether selection should favour high contributors.
    // NOTE(review): not read in this file — presumably consumed by the
    // selector; verify.
    pub prefer_high_contributors: bool,
    /// When true, selection requests spreading replicas across regions
    /// (at least 2 regions are requested).
    pub geographic_distribution: bool,
}
42
/// A single chunk-replication job and its measured outcome.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationJob {
    /// Unique identifier, formatted as `job_<uuid>`.
    pub job_id: String,
    /// Chunk being replicated.
    pub chunk_id: String,
    /// Content tier that determined the policy for this job.
    pub content_type: ContentType,
    /// Ids of the nodes chosen to hold replicas.
    pub selected_nodes: Vec<String>,
    /// Current lifecycle state of the job.
    pub replication_status: ReplicationJobStatus,
    /// When the job (and node selection) started.
    pub started_at: DateTime<Utc>,
    /// Set when the job finishes (success or failure); `None` while running.
    pub completed_at: Option<DateTime<Utc>>,
    /// Timing and replica-count metrics gathered for this job.
    pub performance_metrics: JobPerformanceMetrics,
}
54
/// Lifecycle states of a [`ReplicationJob`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ReplicationJobStatus {
    /// Created but not yet started.
    Pending,
    /// Choosing target nodes via the contribution-based selector.
    NodeSelection,
    /// Data transfer to the selected nodes is in progress.
    Replicating,
    /// Finished; partial success (some replicas failed, at least one
    /// succeeded) also counts as completed.
    Completed,
    /// Finished with replica failures and no successful replicas.
    Failed,
    /// Cancelled before completion.
    Cancelled,
}
64
/// Per-job timing and outcome measurements.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobPerformanceMetrics {
    /// Wall-clock time spent choosing target nodes.
    pub node_selection_time_ms: u32,
    /// Wall-clock time from job start to completion; 0 until the job finishes.
    pub replication_time_ms: u32,
    /// Total bytes copied to replica nodes.
    pub bytes_replicated: u64,
    /// Number of replicas written successfully.
    pub successful_replicas: u32,
    /// Number of replica attempts that failed.
    pub failed_replicas: u32,
    /// Mean node response time.
    // NOTE(review): never populated in this file — confirm the transfer
    // layer fills it in.
    pub average_node_response_time_ms: u32,
}
74
/// Aggregate statistics across all replication jobs handled by the manager.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationPerformanceStats {
    /// Jobs that finished as `Completed` (includes partial successes).
    pub total_jobs_completed: u64,
    /// Jobs that finished as `Failed`.
    pub total_jobs_failed: u64,
    /// Incremental mean of job replication time over all finished jobs.
    pub average_job_time_ms: u32,
    /// Incremental mean number of selected nodes per job.
    pub average_nodes_per_job: f32,
    /// Histogram: contribution score range (e.g. "1.0-1.5") -> times a node in
    /// that range was selected.
    pub contribution_score_distribution: HashMap<String, u32>,
    /// Histogram: reliability score range -> selection count.
    pub reliability_score_distribution: HashMap<String, u32>,
    /// Success rate keyed by contributor level.
    // NOTE(review): never written in this file — confirm the producer.
    pub success_rate_by_contributor_level: HashMap<String, f32>,
    /// Timestamp of the last statistics update.
    pub last_updated: DateTime<Utc>,
}
86
87 impl ContributionReplicationManager {
88 pub fn new() -> Self {
89 let mut policies = HashMap::new();
90
91 // Critical data - highest requirements
92 policies.insert(ContentType::Critical, ContributionReplicationPolicy {
93 content_type: ContentType::Critical,
94 min_replicas: 5,
95 max_replicas: 9,
96 target_replicas: 7,
97 min_contribution_score: 1.5, // Require surplus contributors
98 min_reliability_score: 0.9,
99 min_uptime_percentage: 99.0,
100 prefer_high_contributors: true,
101 geographic_distribution: true,
102 });
103
104 // Important data - high requirements
105 policies.insert(ContentType::Important, ContributionReplicationPolicy {
106 content_type: ContentType::Important,
107 min_replicas: 3,
108 max_replicas: 7,
109 target_replicas: 5,
110 min_contribution_score: 1.0, // Require balanced contributors
111 min_reliability_score: 0.8,
112 min_uptime_percentage: 95.0,
113 prefer_high_contributors: true,
114 geographic_distribution: true,
115 });
116
117 // Standard data - moderate requirements
118 policies.insert(ContentType::Standard, ContributionReplicationPolicy {
119 content_type: ContentType::Standard,
120 min_replicas: 2,
121 max_replicas: 5,
122 target_replicas: 3,
123 min_contribution_score: 0.8, // Accept lower contributors
124 min_reliability_score: 0.7,
125 min_uptime_percentage: 90.0,
126 prefer_high_contributors: false,
127 geographic_distribution: false,
128 });
129
130 // Archive data - basic requirements
131 policies.insert(ContentType::Archive, ContributionReplicationPolicy {
132 content_type: ContentType::Archive,
133 min_replicas: 2,
134 max_replicas: 4,
135 target_replicas: 3,
136 min_contribution_score: 0.5, // Accept deficit contributors
137 min_reliability_score: 0.6,
138 min_uptime_percentage: 85.0,
139 prefer_high_contributors: false,
140 geographic_distribution: false,
141 });
142
143 Self {
144 node_selector: ContributionNodeSelector::new(),
145 replication_policies: policies,
146 active_replications: HashMap::new(),
147 performance_stats: ReplicationPerformanceStats {
148 total_jobs_completed: 0,
149 total_jobs_failed: 0,
150 average_job_time_ms: 0,
151 average_nodes_per_job: 0.0,
152 contribution_score_distribution: HashMap::new(),
153 reliability_score_distribution: HashMap::new(),
154 success_rate_by_contributor_level: HashMap::new(),
155 last_updated: Utc::now(),
156 },
157 }
158 }
159
160 /// Update node data from contribution tracker and reputation system
161 pub async fn update_node_data(
162 &mut self,
163 contribution_tracker: &ContributionTracker,
164 reputation_manager: &ReputationManager,
165 ) -> Result<()> {
166
167 // Update contribution data for all nodes
168 for (node_id, user_contribution) in contribution_tracker.user_contributions.iter() {
169 self.node_selector.update_node_contribution(node_id.clone(), user_contribution).await?;
170 }
171
172 // Update reliability data for all nodes (simplified - would iterate through actual reputation data)
173 // For now, create mock reliability data based on contribution
174 for node_id in contribution_tracker.user_contributions.keys() {
175 // In a real system, this would get actual reputation data
176 let mock_reliability = self.create_mock_reliability(node_id);
177 self.node_selector.update_node_reliability(node_id.clone(), &mock_reliability).await?;
178
179 // Set node availability (simplified)
180 let user_contrib = contribution_tracker.user_contributions.get(node_id).unwrap();
181 let availability = NodeAvailability::Available {
182 available_storage_gb: user_contrib.storage_offered_gb - user_contrib.storage_used_gb,
183 available_bandwidth_mbps: user_contrib.bandwidth_offered_mbps - user_contrib.bandwidth_used_mbps,
184 current_load_percent: (user_contrib.storage_used_gb as f64 / user_contrib.storage_offered_gb.max(1) as f64) * 100.0,
185 };
186 self.node_selector.update_node_availability(node_id.clone(), availability);
187 }
188
189 Ok(())
190 }
191
192 /// Create mock reliability data (in real system, this would come from reputation manager)
193 fn create_mock_reliability(&self, node_id: &str) -> NodeReputation {
194 use super::reputation_system::{ReliabilityMetrics, PerformanceMetrics};
195 use tokio::time::{Duration, Instant};
196
197 NodeReputation {
198 node_id: node_id.to_string(),
199 overall_score: 0.85, // Mock score
200 reliability_metrics: ReliabilityMetrics {
201 uptime_score: 0.95,
202 data_integrity_score: 0.99,
203 response_consistency: 0.9,
204 failure_recovery_time: Duration::from_secs(300),
205 consecutive_failures: 0,
206 mean_time_between_failures: Duration::from_secs(86400 * 30), // 30 days
207 },
208 performance_metrics: PerformanceMetrics {
209 average_latency: Duration::from_millis(50),
210 throughput_score: 0.8,
211 storage_efficiency: 0.85,
212 bandwidth_utilization: 0.75,
213 resource_stability: 0.9,
214 load_handling_capacity: 0.8,
215 },
216 historical_events: vec![],
217 reputation_trend: super::reputation_system::ReputationTrend::Stable,
218 last_updated: Instant::now(),
219 }
220 }
221
222 /// Replicate a chunk using contribution-based node selection
223 pub async fn replicate_chunk(
224 &mut self,
225 chunk_id: String,
226 content_type: ContentType,
227 chunk_size_bytes: u64,
228 ) -> Result<ReplicationJob> {
229
230 let job_id = format!("job_{}", uuid::Uuid::new_v4());
231 let start_time = Utc::now();
232
233 // Get replication policy for this content type
234 let policy = self.replication_policies.get(&content_type)
235 .ok_or_else(|| anyhow::anyhow!("No policy found for content type: {:?}", content_type))?;
236
237 // Create selection criteria based on policy
238 let criteria = NodeSelectionCriteria {
239 min_contribution_score: policy.min_contribution_score,
240 min_reliability_score: policy.min_reliability_score,
241 min_uptime_percentage: policy.min_uptime_percentage,
242 max_failure_rate: 1.0 - policy.min_reliability_score,
243 min_available_storage_gb: (chunk_size_bytes / 1_000_000_000) + 1, // Convert to GB with buffer
244 min_available_bandwidth_mbps: 10.0, // Minimum 10 Mbps
245 geographic_requirements: if policy.geographic_distribution {
246 Some(super::contribution_node_selector::GeographicRequirements {
247 preferred_regions: vec![],
248 excluded_regions: vec![],
249 min_regions: Some(2),
250 max_distance_km: None,
251 })
252 } else {
253 None
254 },
255 exclude_recent_failures: true,
256 };
257
258 // Select nodes using contribution-based selector
259 let selection_result = self.node_selector.select_nodes(
260 chunk_id.clone(),
261 policy.target_replicas,
262 criteria,
263 ).await?;
264
265 let node_selection_time_ms = (Utc::now() - start_time).num_milliseconds() as u32;
266
267 // Create replication job
268 let job = ReplicationJob {
269 job_id: job_id.clone(),
270 chunk_id: chunk_id.clone(),
271 content_type,
272 selected_nodes: selection_result.selected_nodes.iter().map(|n| n.node_id.clone()).collect(),
273 replication_status: ReplicationJobStatus::NodeSelection,
274 started_at: start_time,
275 completed_at: None,
276 performance_metrics: JobPerformanceMetrics {
277 node_selection_time_ms,
278 replication_time_ms: 0,
279 bytes_replicated: 0,
280 successful_replicas: 0,
281 failed_replicas: 0,
282 average_node_response_time_ms: 0,
283 },
284 };
285
286 // Store active job
287 self.active_replications.insert(job_id.clone(), job.clone());
288
289 // Update performance statistics
290 self.update_performance_stats(&selection_result);
291
292 Ok(job)
293 }
294
295 /// Update performance statistics based on node selection results
296 fn update_performance_stats(&mut self, selection_result: &NodeSelectionResult) {
297 // Update contribution score distribution
298 for node in &selection_result.selected_nodes {
299 let score_range = self.get_score_range(node.contribution_score);
300 *self.performance_stats.contribution_score_distribution.entry(score_range).or_insert(0) += 1;
301
302 let reliability_range = self.get_score_range(node.reliability_score);
303 *self.performance_stats.reliability_score_distribution.entry(reliability_range).or_insert(0) += 1;
304 }
305
306 self.performance_stats.last_updated = Utc::now();
307 }
308
309 /// Get score range for statistics
310 fn get_score_range(&self, score: f64) -> String {
311 if score >= 2.0 {
312 "2.0+".to_string()
313 } else if score >= 1.5 {
314 "1.5-2.0".to_string()
315 } else if score >= 1.0 {
316 "1.0-1.5".to_string()
317 } else if score >= 0.5 {
318 "0.5-1.0".to_string()
319 } else {
320 "0.0-0.5".to_string()
321 }
322 }
323
324 /// Complete a replication job
325 pub async fn complete_replication_job(
326 &mut self,
327 job_id: String,
328 successful_replicas: u32,
329 failed_replicas: u32,
330 bytes_replicated: u64,
331 ) -> Result<()> {
332
333 if let Some(job) = self.active_replications.get_mut(&job_id) {
334 job.replication_status = if failed_replicas == 0 {
335 ReplicationJobStatus::Completed
336 } else if successful_replicas > 0 {
337 ReplicationJobStatus::Completed // Partial success still counts as completed
338 } else {
339 ReplicationJobStatus::Failed
340 };
341
342 job.completed_at = Some(Utc::now());
343 job.performance_metrics.successful_replicas = successful_replicas;
344 job.performance_metrics.failed_replicas = failed_replicas;
345 job.performance_metrics.bytes_replicated = bytes_replicated;
346
347 if let Some(completed_at) = job.completed_at {
348 job.performance_metrics.replication_time_ms = (completed_at - job.started_at).num_milliseconds() as u32;
349 }
350
351 // Update global stats
352 if job.replication_status == ReplicationJobStatus::Completed {
353 self.performance_stats.total_jobs_completed += 1;
354 } else {
355 self.performance_stats.total_jobs_failed += 1;
356 }
357
358 // Update average job time
359 let total_jobs = self.performance_stats.total_jobs_completed + self.performance_stats.total_jobs_failed;
360 if total_jobs > 0 {
361 let total_time = (self.performance_stats.average_job_time_ms as u64 * (total_jobs - 1)) + job.performance_metrics.replication_time_ms as u64;
362 self.performance_stats.average_job_time_ms = (total_time / total_jobs) as u32;
363 }
364
365 // Update average nodes per job
366 let total_completed = self.performance_stats.total_jobs_completed as f32;
367 if total_completed > 0.0 {
368 let current_avg = self.performance_stats.average_nodes_per_job;
369 let new_count = job.selected_nodes.len() as f32;
370 self.performance_stats.average_nodes_per_job = (current_avg * (total_completed - 1.0) + new_count) / total_completed;
371 }
372 } else {
373 return Err(anyhow::anyhow!("Job not found: {}", job_id));
374 }
375
376 Ok(())
377 }
378
379 /// Get replication statistics
380 pub fn get_performance_stats(&self) -> &ReplicationPerformanceStats {
381 &self.performance_stats
382 }
383
384 /// Get active replication jobs
385 pub fn get_active_jobs(&self) -> Vec<&ReplicationJob> {
386 self.active_replications.values().collect()
387 }
388
389 /// Get node selection statistics
390 pub fn get_node_selection_stats(&self, node_id: &str) -> super::contribution_node_selector::NodeSelectionStats {
391 self.node_selector.get_node_selection_stats(node_id)
392 }
393 }
394
395 impl Default for ContributionReplicationManager {
396 fn default() -> Self {
397 Self::new()
398 }
399 }