Rust · 20049 bytes Raw Blame History
1 //! Contribution-Based Node Selection
2 //!
3 //! Selects optimal nodes for replication based on contribution ratios and reliability
4
5 use anyhow::Result;
6 use serde::{Deserialize, Serialize};
7 use std::collections::HashMap;
8 use chrono::{DateTime, Utc};
9
10 use crate::economics::{UserContribution, PriorityLevel, ContributionTracker};
11 use super::reputation_system::{NodeReputation, ReliabilityMetrics, PerformanceMetrics};
12
/// Node selector that prioritizes based on contribution and reliability
///
/// Aggregates per-node contribution, reliability, and availability data,
/// plus a bounded recent-selection history used for fairness/load balancing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContributionNodeSelector {
    /// Node contribution data, keyed by node id
    pub node_contributions: HashMap<String, NodeContribution>,
    /// Node reliability scores, keyed by node id
    pub node_reliability: HashMap<String, NodeReliability>,
    /// Selection criteria weights (see `new` for the defaults)
    pub selection_weights: SelectionWeights,
    /// Node availability status, keyed by node id
    pub node_availability: HashMap<String, NodeAvailability>,
    /// Recent selection history for fairness; trimmed once it exceeds 1000 entries
    pub selection_history: Vec<SelectionRecord>,
}
27
/// A single node's contribution profile, derived from its owner's
/// `UserContribution` record (see `update_node_contribution`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeContribution {
    pub node_id: String,
    /// Owner of the node
    pub user_id: String,
    /// Contribution score copied from the owner's record
    pub contribution_score: f64,
    pub priority_level: PriorityLevel,
    /// Storage offered to the network
    pub storage_offered_gb: u64,
    /// Current storage utilization
    pub storage_used_gb: u64,
    /// Bandwidth offered to the network
    pub bandwidth_offered_mbps: f64,
    /// Current bandwidth utilization
    pub bandwidth_used_mbps: f64,
    /// How long node has been active (days since the owner joined)
    pub tenure_days: u32,
    /// Recent contribution trend
    pub contribution_trend: ContributionTrend,
    /// When this record was last refreshed
    pub last_updated: DateTime<Utc>,
}
48
/// Coarse classification of a node's recent contribution trajectory.
///
/// Assigned heuristically in `update_node_contribution` from the current
/// contribution score.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ContributionTrend {
    /// Score above 1.2 in the current heuristic
    Improving,
    /// Score above 0.8
    Stable,
    /// Score at or below 0.8
    Declining,
    /// Node active for less than 7 days — no trend yet
    New,
}
56
/// Reliability snapshot for a node, derived from the reputation system
/// (see `update_node_reliability`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeReliability {
    pub node_id: String,
    pub overall_reliability_score: f64, // 0.0 to 1.0
    /// Uptime as a percentage (0.0–100.0)
    pub uptime_percentage: f64,
    /// Average response latency in milliseconds
    pub response_time_ms: u32,
    pub data_integrity_score: f64,
    /// Derived as the complement of the uptime score
    pub failure_rate: f64,
    pub recovery_time_minutes: u32,
    pub consistency_score: f64,
    pub performance_stability: f64,
    /// Timestamp of the most recent failure, if known
    pub last_failure: Option<DateTime<Utc>>,
    pub consecutive_successful_operations: u64,
}
71
/// Relative weights for the components of a node's selection score.
///
/// The weighted components are summed in `calculate_selection_scores`;
/// the defaults in `new` sum to 1.0.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelectionWeights {
    /// Weight for contribution score (0.0-1.0)
    pub contribution_weight: f64,
    /// Weight for reliability score (0.0-1.0)
    pub reliability_weight: f64,
    /// Weight for performance metrics (0.0-1.0)
    pub performance_weight: f64,
    /// Weight for geographic distribution (0.0-1.0)
    pub geographic_weight: f64,
    /// Weight for fairness/load balancing (0.0-1.0)
    pub fairness_weight: f64,
}
85
/// Current availability state of a node.
///
/// Only `Available` nodes qualify during candidate filtering; any other
/// known state disqualifies the node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NodeAvailability {
    /// Node is online and accepting work
    Available {
        available_storage_gb: u64,
        available_bandwidth_mbps: f64,
        /// Current load; nodes above 90% are skipped during selection
        current_load_percent: f64,
    },
    /// Node is temporarily at capacity
    Busy {
        estimated_available_in_minutes: u32,
    },
    /// Node is undergoing planned maintenance
    Maintenance {
        expected_return: DateTime<Utc>,
    },
    /// Node is unreachable
    Offline,
}
101
/// One entry in the selection history, used for fairness scoring and stats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelectionRecord {
    /// When the selection was made
    pub selection_time: DateTime<Utc>,
    /// Chunk the selection was made for
    pub chunk_id: String,
    /// Ids of the nodes that were chosen
    pub selected_nodes: Vec<String>,
    /// Human-readable reason recorded at selection time
    pub selection_reason: String,
    /// Number of nodes that passed the candidate filter
    pub total_candidates: u32,
}
110
/// Minimum requirements a node must meet to be considered a candidate.
///
/// Applied in `get_candidate_nodes`; nodes failing any threshold are
/// filtered out before scoring.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeSelectionCriteria {
    /// Minimum contribution score required
    pub min_contribution_score: f64,
    /// Minimum reliability score required
    pub min_reliability_score: f64,
    /// Minimum uptime percentage required
    pub min_uptime_percentage: f64,
    /// Maximum acceptable failure rate
    pub max_failure_rate: f64,
    /// Minimum available storage required
    pub min_available_storage_gb: u64,
    /// Minimum available bandwidth required
    pub min_available_bandwidth_mbps: f64,
    /// Geographic constraints (currently not enforced during filtering)
    pub geographic_requirements: Option<GeographicRequirements>,
    /// Exclude nodes that failed within the last 24 hours
    pub exclude_recent_failures: bool,
}
130
/// Geographic placement constraints for node selection.
///
/// NOTE(review): defined but not yet consulted by the filtering/scoring
/// code in this module — geographic score is currently a constant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeographicRequirements {
    /// Regions to prefer when placing replicas
    pub preferred_regions: Vec<String>,
    /// Regions that must not be used
    pub excluded_regions: Vec<String>,
    /// Minimum number of distinct regions, if any
    pub min_regions: Option<u32>,
    /// Maximum allowed distance between replicas, if any
    pub max_distance_km: Option<u32>,
}
138
/// Outcome of a `select_nodes` call.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeSelectionResult {
    /// Chosen nodes, highest selection score first
    pub selected_nodes: Vec<SelectedNode>,
    /// Number of nodes that passed the candidate filter
    pub total_candidates: u32,
    /// Strategy used for this selection
    pub selection_method: SelectionMethod,
    /// Quality assessment of the chosen set
    pub selection_quality: SelectionQuality,
    /// True when the selection had to fall back (e.g. no candidates)
    pub fallback_used: bool,
    /// Human-readable explanation of the selection
    pub selection_rationale: String,
}
148
/// A node chosen by the selector, with the scores that justified it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelectedNode {
    pub node_id: String,
    /// Weighted total score used for ranking
    pub selection_score: f64,
    /// Raw (un-normalized) contribution score
    pub contribution_score: f64,
    /// Overall reliability score (0.0–1.0)
    pub reliability_score: f64,
    /// Human-readable summary of the scoring
    pub selection_reason: String,
    /// Performance expected from this node
    pub expected_performance: ExpectedPerformance,
}
158
/// Performance estimate attached to a selected node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExpectedPerformance {
    /// Expected uptime percentage (0.0–100.0)
    pub uptime_percentage: f64,
    /// Expected response latency in milliseconds
    pub response_time_ms: u32,
    /// Estimated throughput (80% of offered bandwidth)
    pub throughput_mbps: f64,
    /// Confidence in the estimate, taken from the consistency score
    pub reliability_confidence: f64,
}
166
/// Strategy used to rank candidate nodes.
///
/// NOTE(review): `select_nodes` currently always reports `Balanced`;
/// the other variants are reserved for future strategies.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SelectionMethod {
    ContributionPrimary, // Prioritize highest contributors
    ReliabilityPrimary,  // Prioritize most reliable nodes
    Balanced,            // Balance contribution and reliability
    Geographic,          // Optimize for geographic distribution
    LoadBalanced,        // Ensure fair distribution of work
}
175
/// Quality tier of a completed selection, assigned by
/// `assess_selection_quality` from the averaged contribution and
/// reliability of the chosen nodes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SelectionQuality {
    Excellent,   // All criteria met with high-quality nodes
    Good,        // Most criteria met with good nodes
    Acceptable,  // Minimum criteria met
    Compromised, // Had to lower standards to find nodes
}
183
184 impl ContributionNodeSelector {
185 pub fn new() -> Self {
186 Self {
187 node_contributions: HashMap::new(),
188 node_reliability: HashMap::new(),
189 selection_weights: SelectionWeights {
190 contribution_weight: 0.4, // 40% based on contribution
191 reliability_weight: 0.3, // 30% based on reliability
192 performance_weight: 0.2, // 20% based on performance
193 geographic_weight: 0.05, // 5% for geographic distribution
194 fairness_weight: 0.05, // 5% for load balancing fairness
195 },
196 node_availability: HashMap::new(),
197 selection_history: Vec::new(),
198 }
199 }
200
201 /// Update node contribution data from contribution tracker
202 pub async fn update_node_contribution(&mut self, node_id: String, user_contribution: &UserContribution) -> Result<()> {
203 let tenure_days = (Utc::now() - user_contribution.joined_at).num_days() as u32;
204
205 let contribution_trend = if tenure_days < 7 {
206 ContributionTrend::New
207 } else {
208 // In a real system, this would analyze historical data
209 if user_contribution.contribution_score > 1.2 {
210 ContributionTrend::Improving
211 } else if user_contribution.contribution_score > 0.8 {
212 ContributionTrend::Stable
213 } else {
214 ContributionTrend::Declining
215 }
216 };
217
218 let node_contribution = NodeContribution {
219 node_id: node_id.clone(),
220 user_id: user_contribution.user_id.clone(),
221 contribution_score: user_contribution.contribution_score,
222 priority_level: user_contribution.priority_level.clone(),
223 storage_offered_gb: user_contribution.storage_offered_gb,
224 storage_used_gb: user_contribution.storage_used_gb,
225 bandwidth_offered_mbps: user_contribution.bandwidth_offered_mbps,
226 bandwidth_used_mbps: user_contribution.bandwidth_used_mbps,
227 tenure_days,
228 contribution_trend,
229 last_updated: Utc::now(),
230 };
231
232 self.node_contributions.insert(node_id, node_contribution);
233 Ok(())
234 }
235
236 /// Update node reliability data from reputation system
237 pub async fn update_node_reliability(&mut self, node_id: String, reputation: &NodeReputation) -> Result<()> {
238 let reliability = NodeReliability {
239 node_id: node_id.clone(),
240 overall_reliability_score: reputation.overall_score as f64,
241 uptime_percentage: (reputation.reliability_metrics.uptime_score as f64) * 100.0,
242 response_time_ms: reputation.performance_metrics.average_latency.as_millis() as u32,
243 data_integrity_score: reputation.reliability_metrics.data_integrity_score as f64,
244 failure_rate: 1.0 - (reputation.reliability_metrics.uptime_score as f64),
245 recovery_time_minutes: (reputation.reliability_metrics.failure_recovery_time.as_secs() / 60) as u32,
246 consistency_score: reputation.reliability_metrics.response_consistency as f64,
247 performance_stability: reputation.performance_metrics.resource_stability as f64,
248 last_failure: None, // Would be extracted from reputation events in real system
249 consecutive_successful_operations: 100 - reputation.reliability_metrics.consecutive_failures as u64,
250 };
251
252 self.node_reliability.insert(node_id, reliability);
253 Ok(())
254 }
255
256 /// Select optimal nodes for chunk replication
257 pub async fn select_nodes(
258 &mut self,
259 chunk_id: String,
260 num_nodes_needed: u32,
261 criteria: NodeSelectionCriteria,
262 ) -> Result<NodeSelectionResult> {
263
264 // Get all candidate nodes that meet minimum criteria
265 let candidates = self.get_candidate_nodes(&criteria)?;
266
267 if candidates.is_empty() {
268 return Ok(NodeSelectionResult {
269 selected_nodes: Vec::new(),
270 total_candidates: 0,
271 selection_method: SelectionMethod::Balanced,
272 selection_quality: SelectionQuality::Compromised,
273 fallback_used: true,
274 selection_rationale: "No nodes meet minimum criteria".to_string(),
275 });
276 }
277
278 // Calculate selection scores for each candidate
279 let scored_candidates = self.calculate_selection_scores(&candidates, &criteria)?;
280
281 // Select top nodes based on scores
282 let mut selected_nodes = scored_candidates;
283 selected_nodes.sort_by(|a, b| b.selection_score.partial_cmp(&a.selection_score).unwrap());
284 selected_nodes.truncate(num_nodes_needed as usize);
285
286 // Determine selection quality
287 let selection_quality = self.assess_selection_quality(&selected_nodes, &criteria);
288
289 // Record selection for fairness tracking
290 let selection_record = SelectionRecord {
291 selection_time: Utc::now(),
292 chunk_id: chunk_id.clone(),
293 selected_nodes: selected_nodes.iter().map(|n| n.node_id.clone()).collect(),
294 selection_reason: "Contribution and reliability based selection".to_string(),
295 total_candidates: candidates.len() as u32,
296 };
297 self.selection_history.push(selection_record);
298
299 // Keep only recent history
300 if self.selection_history.len() > 1000 {
301 self.selection_history.drain(0..500);
302 }
303
304 Ok(NodeSelectionResult {
305 selected_nodes,
306 total_candidates: candidates.len() as u32,
307 selection_method: SelectionMethod::Balanced,
308 selection_quality,
309 fallback_used: false,
310 selection_rationale: "Selected based on contribution score and reliability metrics".to_string(),
311 })
312 }
313
314 /// Get nodes that meet minimum criteria
315 fn get_candidate_nodes(&self, criteria: &NodeSelectionCriteria) -> Result<Vec<String>> {
316 let mut candidates = Vec::new();
317
318 for (node_id, contribution) in &self.node_contributions {
319 // Check contribution requirements
320 if contribution.contribution_score < criteria.min_contribution_score {
321 continue;
322 }
323
324 // Check reliability requirements
325 if let Some(reliability) = self.node_reliability.get(node_id) {
326 if reliability.overall_reliability_score < criteria.min_reliability_score {
327 continue;
328 }
329 if reliability.uptime_percentage < criteria.min_uptime_percentage {
330 continue;
331 }
332 if reliability.failure_rate > criteria.max_failure_rate {
333 continue;
334 }
335
336 // Check for recent failures if required
337 if criteria.exclude_recent_failures {
338 if let Some(last_failure) = reliability.last_failure {
339 if (Utc::now() - last_failure).num_hours() < 24 {
340 continue;
341 }
342 }
343 }
344 } else {
345 // Skip nodes without reliability data
346 continue;
347 }
348
349 // Check availability requirements
350 if let Some(availability) = self.node_availability.get(node_id) {
351 match availability {
352 NodeAvailability::Available { available_storage_gb, available_bandwidth_mbps, current_load_percent } => {
353 if *available_storage_gb < criteria.min_available_storage_gb {
354 continue;
355 }
356 if *available_bandwidth_mbps < criteria.min_available_bandwidth_mbps {
357 continue;
358 }
359 if *current_load_percent > 90.0 {
360 continue; // Skip overloaded nodes
361 }
362 },
363 _ => continue, // Skip non-available nodes
364 }
365 }
366
367 candidates.push(node_id.clone());
368 }
369
370 Ok(candidates)
371 }
372
373 /// Calculate selection scores for candidate nodes
374 fn calculate_selection_scores(&self, candidates: &[String], criteria: &NodeSelectionCriteria) -> Result<Vec<SelectedNode>> {
375 let mut scored_nodes = Vec::new();
376
377 for node_id in candidates {
378 let contribution = self.node_contributions.get(node_id).unwrap();
379 let reliability = self.node_reliability.get(node_id).unwrap();
380
381 // Normalize scores (0.0 to 1.0)
382 let contribution_score = (contribution.contribution_score / 3.0).min(1.0); // Cap at 3.0 for normalization
383 let reliability_score = reliability.overall_reliability_score;
384 let performance_score = self.calculate_performance_score(reliability);
385 let fairness_score = self.calculate_fairness_score(node_id);
386 let geographic_score = 1.0; // Simplified for now
387
388 // Calculate weighted total score
389 let total_score =
390 (contribution_score * self.selection_weights.contribution_weight) +
391 (reliability_score * self.selection_weights.reliability_weight) +
392 (performance_score * self.selection_weights.performance_weight) +
393 (geographic_score * self.selection_weights.geographic_weight) +
394 (fairness_score * self.selection_weights.fairness_weight);
395
396 let selected_node = SelectedNode {
397 node_id: node_id.clone(),
398 selection_score: total_score,
399 contribution_score: contribution.contribution_score,
400 reliability_score,
401 selection_reason: format!("Score: {:.3} (Contrib: {:.2}, Reliab: {:.2})",
402 total_score, contribution_score, reliability_score),
403 expected_performance: ExpectedPerformance {
404 uptime_percentage: reliability.uptime_percentage,
405 response_time_ms: reliability.response_time_ms,
406 throughput_mbps: contribution.bandwidth_offered_mbps * 0.8, // Estimate 80% utilization
407 reliability_confidence: reliability.consistency_score,
408 },
409 };
410
411 scored_nodes.push(selected_node);
412 }
413
414 Ok(scored_nodes)
415 }
416
417 /// Calculate performance score based on reliability metrics
418 fn calculate_performance_score(&self, reliability: &NodeReliability) -> f64 {
419 let response_score = (1000.0 - reliability.response_time_ms as f64).max(0.0) / 1000.0;
420 let stability_score = reliability.performance_stability;
421 let consistency_score = reliability.consistency_score;
422
423 (response_score + stability_score + consistency_score) / 3.0
424 }
425
426 /// Calculate fairness score to promote load balancing
427 fn calculate_fairness_score(&self, node_id: &str) -> f64 {
428 // Count recent selections for this node
429 let recent_selections = self.selection_history.iter()
430 .rev()
431 .take(100) // Look at last 100 selections
432 .filter(|record| record.selected_nodes.contains(&node_id.to_string()))
433 .count();
434
435 // Nodes with fewer recent selections get higher fairness scores
436 (10.0 - recent_selections as f64).max(0.0) / 10.0
437 }
438
439 /// Assess the quality of the selection
440 fn assess_selection_quality(&self, selected_nodes: &[SelectedNode], criteria: &NodeSelectionCriteria) -> SelectionQuality {
441 if selected_nodes.is_empty() {
442 return SelectionQuality::Compromised;
443 }
444
445 let avg_contribution = selected_nodes.iter()
446 .map(|n| n.contribution_score)
447 .sum::<f64>() / selected_nodes.len() as f64;
448
449 let avg_reliability = selected_nodes.iter()
450 .map(|n| n.reliability_score)
451 .sum::<f64>() / selected_nodes.len() as f64;
452
453 if avg_contribution >= 1.5 && avg_reliability >= 0.9 {
454 SelectionQuality::Excellent
455 } else if avg_contribution >= 1.0 && avg_reliability >= 0.8 {
456 SelectionQuality::Good
457 } else if avg_contribution >= 0.8 && avg_reliability >= 0.7 {
458 SelectionQuality::Acceptable
459 } else {
460 SelectionQuality::Compromised
461 }
462 }
463
    /// Update node availability status
    ///
    /// Upserts the availability record for `node_id`, replacing any
    /// previous state.
    pub fn update_node_availability(&mut self, node_id: String, availability: NodeAvailability) {
        self.node_availability.insert(node_id, availability);
    }
468
469 /// Get selection statistics for a node
470 pub fn get_node_selection_stats(&self, node_id: &str) -> NodeSelectionStats {
471 let total_selections = self.selection_history.iter()
472 .filter(|record| record.selected_nodes.contains(&node_id.to_string()))
473 .count();
474
475 let recent_selections = self.selection_history.iter()
476 .rev()
477 .take(100)
478 .filter(|record| record.selected_nodes.contains(&node_id.to_string()))
479 .count();
480
481 NodeSelectionStats {
482 node_id: node_id.to_string(),
483 total_selections: total_selections as u32,
484 recent_selections: recent_selections as u32,
485 last_selected: self.selection_history.iter()
486 .rev()
487 .find(|record| record.selected_nodes.contains(&node_id.to_string()))
488 .map(|record| record.selection_time),
489 selection_rate: if self.selection_history.len() > 0 {
490 (total_selections as f64 / self.selection_history.len() as f64) * 100.0
491 } else {
492 0.0
493 },
494 }
495 }
496 }
497
/// Per-node selection statistics, produced by `get_node_selection_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeSelectionStats {
    pub node_id: String,
    /// Times this node appears anywhere in the recorded history
    pub total_selections: u32,
    /// Times this node appears in the last 100 selections
    pub recent_selections: u32,
    /// Timestamp of the most recent selection, if any
    pub last_selected: Option<DateTime<Utc>>,
    pub selection_rate: f64, // Percentage of total selections
}
506
impl Default for ContributionNodeSelector {
    /// Equivalent to [`ContributionNodeSelector::new`].
    fn default() -> Self {
        Self::new()
    }
}