Rust · 18627 bytes Raw Blame History
1 //! Node Reliability Reputation System
2 //!
3 //! Tracks and scores node reliability based on historical performance
4
5 use serde::{Deserialize, Serialize};
6 use std::collections::HashMap;
7 use tokio::time::Duration;
8
9 #[derive(Debug, Clone, Serialize, Deserialize)]
10 pub struct NodeReputation {
11 pub node_id: String,
12 pub overall_score: f32, // 0.0 to 1.0
13 pub reliability_metrics: ReliabilityMetrics,
14 pub performance_metrics: PerformanceMetrics,
15 pub historical_events: Vec<ReputationEvent>,
16 pub reputation_trend: ReputationTrend,
17 pub last_updated: crate::SerializableInstant,
18 }
19
20 #[derive(Debug, Clone, Serialize, Deserialize)]
21 pub struct ReliabilityMetrics {
22 pub uptime_score: f32,
23 pub data_integrity_score: f32,
24 pub response_consistency: f32,
25 pub failure_recovery_time: Duration,
26 pub consecutive_failures: u32,
27 pub mean_time_between_failures: Duration,
28 }
29
30 #[derive(Debug, Clone, Serialize, Deserialize)]
31 pub struct PerformanceMetrics {
32 pub average_latency: Duration,
33 pub throughput_score: f32,
34 pub storage_efficiency: f32,
35 pub bandwidth_utilization: f32,
36 pub resource_stability: f32,
37 pub load_handling_capacity: f32,
38 }
39
40 #[derive(Debug, Clone, Serialize, Deserialize)]
41 pub struct ReputationEvent {
42 pub timestamp: crate::SerializableInstant,
43 pub event_type: EventType,
44 pub impact: f32, // -1.0 to +1.0
45 pub details: String,
46 pub severity: EventSeverity,
47 }
48
49 #[derive(Debug, Clone, Serialize, Deserialize)]
50 pub enum EventType {
51 NodeFailure,
52 DataCorruption,
53 SlowResponse,
54 ExceptionalPerformance,
55 SuccessfulRecovery,
56 MaintenanceCompleted,
57 SecurityIncident,
58 NetworkContribution,
59 }
60
61 #[derive(Debug, Clone, Serialize, Deserialize)]
62 pub enum EventSeverity {
63 Critical, // -0.2 to -1.0
64 Major, // -0.1 to -0.2
65 Minor, // -0.05 to -0.1
66 Neutral, // -0.05 to +0.05
67 Positive, // +0.05 to +0.1
68 Exceptional, // +0.1 to +0.2
69 }
70
71 #[derive(Debug, Clone, Serialize, Deserialize)]
72 pub struct ReputationTrend {
73 pub direction: TrendDirection,
74 pub slope: f32,
75 pub confidence: f32,
76 pub time_window: Duration,
77 }
78
79 #[derive(Debug, Clone, Serialize, Deserialize)]
80 pub enum TrendDirection {
81 StronglyImproving,
82 Improving,
83 Stable,
84 Declining,
85 StronglyDeclining,
86 }
87
88 pub struct ReputationManager {
89 node_reputations: HashMap<String, NodeReputation>,
90 reputation_weights: ReputationWeights,
91 historical_data: HashMap<String, Vec<PerformanceSnapshot>>,
92 global_network_stats: NetworkStats,
93 }
94
95 #[derive(Debug, Clone)]
96 struct ReputationWeights {
97 uptime: f32,
98 data_integrity: f32,
99 performance: f32,
100 recovery_ability: f32,
101 consistency: f32,
102 network_contribution: f32,
103 }
104
105 #[derive(Debug, Clone)]
106 struct PerformanceSnapshot {
107 timestamp: crate::SerializableInstant,
108 metrics: PerformanceMetrics,
109 events: Vec<ReputationEvent>,
110 }
111
112 #[derive(Debug, Clone)]
113 struct NetworkStats {
114 average_uptime: f32,
115 median_latency: Duration,
116 total_nodes: usize,
117 healthy_nodes: usize,
118 network_quality_score: f32,
119 }
120
121 impl ReputationManager {
122 pub fn new() -> Self {
123 Self {
124 node_reputations: HashMap::new(),
125 reputation_weights: ReputationWeights::default(),
126 historical_data: HashMap::new(),
127 global_network_stats: NetworkStats::default(),
128 }
129 }
130
131 pub async fn update_node_performance(&mut self, node_id: &str, metrics: PerformanceMetrics) {
132 let snapshot = PerformanceSnapshot {
133 timestamp: crate::SerializableInstant::now(),
134 metrics,
135 events: Vec::new(),
136 };
137
138 let history = self.historical_data.entry(node_id.to_string()).or_insert_with(Vec::new);
139 history.push(snapshot);
140
141 // Keep only last 30 days of data
142 let cutoff = crate::SerializableInstant::now() - Duration::from_secs(30 * 24 * 3600);
143 history.retain(|s| s.timestamp > cutoff);
144
145 // Update reputation based on new performance data
146 self.recalculate_reputation(node_id).await;
147 }
148
149 pub async fn record_event(&mut self, node_id: &str, event: ReputationEvent) {
150 // Add event to historical data
151 if let Some(history) = self.historical_data.get_mut(node_id) {
152 if let Some(latest) = history.last_mut() {
153 latest.events.push(event.clone());
154 }
155 }
156
157 // Update reputation immediately for significant events
158 if matches!(event.severity, EventSeverity::Critical | EventSeverity::Exceptional) {
159 self.apply_immediate_reputation_change(node_id, &event).await;
160 }
161
162 self.recalculate_reputation(node_id).await;
163 }
164
165 pub fn get_node_reputation(&self, node_id: &str) -> Option<&NodeReputation> {
166 self.node_reputations.get(node_id)
167 }
168
169 pub fn get_top_nodes(&self, limit: usize) -> Vec<&NodeReputation> {
170 let mut nodes: Vec<_> = self.node_reputations.values().collect();
171 nodes.sort_by(|a, b| b.overall_score.partial_cmp(&a.overall_score).unwrap());
172 nodes.into_iter().take(limit).collect()
173 }
174
175 pub fn get_nodes_by_reputation_range(&self, min_score: f32, max_score: f32) -> Vec<&NodeReputation> {
176 self.node_reputations.values()
177 .filter(|rep| rep.overall_score >= min_score && rep.overall_score <= max_score)
178 .collect()
179 }
180
181 pub async fn get_recommended_nodes_for_storage(&self, count: usize) -> Vec<String> {
182 let mut candidates: Vec<_> = self.node_reputations.iter()
183 .filter(|(_, rep)| {
184 rep.overall_score > 0.7
185 && rep.reliability_metrics.uptime_score > 0.8
186 && rep.reliability_metrics.consecutive_failures < 3
187 })
188 .collect();
189
190 // Sort by composite score (reputation + recent performance)
191 candidates.sort_by(|a, b| {
192 let score_a = self.calculate_storage_suitability_score(a.1);
193 let score_b = self.calculate_storage_suitability_score(b.1);
194 score_b.partial_cmp(&score_a).unwrap()
195 });
196
197 candidates.into_iter()
198 .take(count)
199 .map(|(node_id, _)| node_id.clone())
200 .collect()
201 }
202
203 async fn recalculate_reputation(&mut self, node_id: &str) {
204 let history = match self.historical_data.get(node_id) {
205 Some(h) => h,
206 None => return,
207 };
208
209 if history.is_empty() {
210 return;
211 }
212
213 let reliability_metrics = self.calculate_reliability_metrics(history);
214 let performance_metrics = self.calculate_average_performance(history);
215 let overall_score = self.calculate_overall_score(&reliability_metrics, &performance_metrics, history);
216 let trend = self.calculate_reputation_trend(history);
217
218 let reputation = NodeReputation {
219 node_id: node_id.to_string(),
220 overall_score,
221 reliability_metrics,
222 performance_metrics,
223 historical_events: self.get_recent_events(history, Duration::from_secs(7 * 24 * 3600)),
224 reputation_trend: trend,
225 last_updated: crate::SerializableInstant::now(),
226 };
227
228 self.node_reputations.insert(node_id.to_string(), reputation);
229 }
230
231 fn calculate_reliability_metrics(&self, history: &[PerformanceSnapshot]) -> ReliabilityMetrics {
232 let total_snapshots = history.len() as f32;
233 if total_snapshots == 0.0 {
234 return ReliabilityMetrics::default();
235 }
236
237 // Calculate uptime score based on performance consistency
238 let uptime_score = history.iter()
239 .map(|s| if s.metrics.average_latency < Duration::from_millis(1000) { 1.0 } else { 0.0 })
240 .sum::<f32>() / total_snapshots;
241
242 // Data integrity score based on lack of corruption events
243 let corruption_events = history.iter()
244 .flat_map(|s| &s.events)
245 .filter(|e| matches!(e.event_type, EventType::DataCorruption))
246 .count();
247 let data_integrity_score = 1.0 - (corruption_events as f32 * 0.1).min(1.0);
248
249 // Response consistency
250 let latencies: Vec<_> = history.iter()
251 .map(|s| s.metrics.average_latency.as_millis() as f32)
252 .collect();
253 let latency_variance = self.calculate_variance(&latencies);
254 let response_consistency = 1.0 / (1.0 + latency_variance / 1000.0);
255
256 // Failure analysis
257 let failure_events: Vec<_> = history.iter()
258 .flat_map(|s| &s.events)
259 .filter(|e| matches!(e.event_type, EventType::NodeFailure))
260 .collect();
261
262 let consecutive_failures = self.count_consecutive_failures(&failure_events);
263 let mean_time_between_failures = if failure_events.len() > 1 {
264 let first = failure_events.first().unwrap().timestamp;
265 let last = failure_events.last().unwrap().timestamp;
266 (last - first) / failure_events.len() as u32
267 } else {
268 Duration::from_secs(u64::MAX)
269 };
270
271 let failure_recovery_time = failure_events.iter()
272 .filter_map(|_| Some(Duration::from_secs(300))) // Average 5 minutes
273 .next()
274 .unwrap_or(Duration::from_secs(0));
275
276 ReliabilityMetrics {
277 uptime_score,
278 data_integrity_score,
279 response_consistency,
280 failure_recovery_time,
281 consecutive_failures,
282 mean_time_between_failures,
283 }
284 }
285
286 fn calculate_average_performance(&self, history: &[PerformanceSnapshot]) -> PerformanceMetrics {
287 if history.is_empty() {
288 return PerformanceMetrics::default();
289 }
290
291 let count = history.len() as f32;
292
293 let average_latency = Duration::from_millis(
294 (history.iter().map(|s| s.metrics.average_latency.as_millis()).sum::<u128>() / count as u128) as u64
295 );
296
297 let throughput_score = history.iter().map(|s| s.metrics.throughput_score).sum::<f32>() / count;
298 let storage_efficiency = history.iter().map(|s| s.metrics.storage_efficiency).sum::<f32>() / count;
299 let bandwidth_utilization = history.iter().map(|s| s.metrics.bandwidth_utilization).sum::<f32>() / count;
300 let resource_stability = history.iter().map(|s| s.metrics.resource_stability).sum::<f32>() / count;
301 let load_handling_capacity = history.iter().map(|s| s.metrics.load_handling_capacity).sum::<f32>() / count;
302
303 PerformanceMetrics {
304 average_latency,
305 throughput_score,
306 storage_efficiency,
307 bandwidth_utilization,
308 resource_stability,
309 load_handling_capacity,
310 }
311 }
312
313 fn calculate_overall_score(
314 &self,
315 reliability: &ReliabilityMetrics,
316 performance: &PerformanceMetrics,
317 history: &[PerformanceSnapshot],
318 ) -> f32 {
319 let weights = &self.reputation_weights;
320
321 let reliability_score = (
322 reliability.uptime_score * weights.uptime +
323 reliability.data_integrity_score * weights.data_integrity +
324 reliability.response_consistency * weights.consistency
325 ) / (weights.uptime + weights.data_integrity + weights.consistency);
326
327 let performance_score = (
328 performance.throughput_score * 0.3 +
329 performance.storage_efficiency * 0.2 +
330 performance.resource_stability * 0.3 +
331 performance.load_handling_capacity * 0.2
332 );
333
334 // Factor in recent events
335 let recent_events_impact = self.calculate_recent_events_impact(history);
336
337 let base_score = reliability_score * 0.6 + performance_score * 0.4;
338 let final_score = (base_score + recent_events_impact).max(0.0).min(1.0);
339
340 final_score
341 }
342
343 fn calculate_recent_events_impact(&self, history: &[PerformanceSnapshot]) -> f32 {
344 let cutoff = crate::SerializableInstant::now() - Duration::from_secs(7 * 24 * 3600); // Last 7 days
345
346 let recent_events: Vec<_> = history.iter()
347 .flat_map(|s| &s.events)
348 .filter(|e| e.timestamp > cutoff)
349 .collect();
350
351 if recent_events.is_empty() {
352 return 0.0;
353 }
354
355 let total_impact: f32 = recent_events.iter().map(|e| e.impact).sum();
356 let time_decay_factor = 0.8; // Recent events have more impact
357
358 (total_impact / recent_events.len() as f32) * time_decay_factor
359 }
360
361 fn calculate_reputation_trend(&self, history: &[PerformanceSnapshot]) -> ReputationTrend {
362 if history.len() < 5 {
363 return ReputationTrend::default();
364 }
365
366 // Calculate reputation scores over time
367 let window_size = 10;
368 let mut scores = Vec::new();
369
370 for window_start in 0..history.len().saturating_sub(window_size) {
371 let window = &history[window_start..window_start + window_size];
372 let reliability = self.calculate_reliability_metrics(window);
373 let performance = self.calculate_average_performance(window);
374 let score = self.calculate_overall_score(&reliability, &performance, window);
375 scores.push(score);
376 }
377
378 if scores.len() < 2 {
379 return ReputationTrend::default();
380 }
381
382 // Calculate linear regression slope
383 let n = scores.len() as f32;
384 let sum_x: f32 = (0..scores.len()).map(|i| i as f32).sum();
385 let sum_y: f32 = scores.iter().sum();
386 let sum_xy: f32 = scores.iter().enumerate().map(|(i, &y)| i as f32 * y).sum();
387 let sum_x2: f32 = (0..scores.len()).map(|i| (i as f32).powi(2)).sum();
388
389 let slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x.powi(2));
390
391 let direction = if slope > 0.05 {
392 TrendDirection::StronglyImproving
393 } else if slope > 0.02 {
394 TrendDirection::Improving
395 } else if slope > -0.02 {
396 TrendDirection::Stable
397 } else if slope > -0.05 {
398 TrendDirection::Declining
399 } else {
400 TrendDirection::StronglyDeclining
401 };
402
403 // Calculate confidence based on variance
404 let mean_score = scores.iter().sum::<f32>() / n;
405 let variance = scores.iter().map(|&s| (s - mean_score).powi(2)).sum::<f32>() / n;
406 let confidence = 1.0 / (1.0 + variance * 10.0);
407
408 ReputationTrend {
409 direction,
410 slope,
411 confidence,
412 time_window: Duration::from_secs(24 * 3600 * scores.len() as u64),
413 }
414 }
415
416 fn calculate_storage_suitability_score(&self, reputation: &NodeReputation) -> f32 {
417 let base_score = reputation.overall_score;
418 let reliability_bonus = if reputation.reliability_metrics.consecutive_failures == 0 { 0.1 } else { 0.0 };
419 let performance_bonus = if reputation.performance_metrics.average_latency < Duration::from_millis(500) { 0.05 } else { 0.0 };
420 let trend_bonus = match reputation.reputation_trend.direction {
421 TrendDirection::StronglyImproving | TrendDirection::Improving => 0.05,
422 TrendDirection::Declining | TrendDirection::StronglyDeclining => -0.1,
423 _ => 0.0,
424 };
425
426 base_score + reliability_bonus + performance_bonus + trend_bonus
427 }
428
429 async fn apply_immediate_reputation_change(&mut self, node_id: &str, event: &ReputationEvent) {
430 if let Some(reputation) = self.node_reputations.get_mut(node_id) {
431 let change = match event.severity {
432 EventSeverity::Critical => -0.2,
433 EventSeverity::Major => -0.1,
434 EventSeverity::Minor => -0.05,
435 EventSeverity::Positive => 0.05,
436 EventSeverity::Exceptional => 0.1,
437 EventSeverity::Neutral => 0.0,
438 };
439
440 reputation.overall_score = (reputation.overall_score + change).max(0.0).min(1.0);
441 }
442 }
443
444 fn get_recent_events(&self, history: &[PerformanceSnapshot], window: Duration) -> Vec<ReputationEvent> {
445 let cutoff = crate::SerializableInstant::now() - window;
446 history.iter()
447 .flat_map(|s| &s.events)
448 .filter(|e| e.timestamp > cutoff)
449 .cloned()
450 .collect()
451 }
452
453 fn count_consecutive_failures(&self, events: &[&ReputationEvent]) -> u32 {
454 let mut consecutive = 0;
455 let mut max_consecutive = 0;
456
457 for event in events.iter().rev() {
458 if matches!(event.event_type, EventType::NodeFailure) {
459 consecutive += 1;
460 max_consecutive = max_consecutive.max(consecutive);
461 } else {
462 consecutive = 0;
463 }
464 }
465
466 max_consecutive
467 }
468
469 fn calculate_variance(&self, values: &[f32]) -> f32 {
470 if values.is_empty() {
471 return 0.0;
472 }
473
474 let mean = values.iter().sum::<f32>() / values.len() as f32;
475 values.iter().map(|&x| (x - mean).powi(2)).sum::<f32>() / values.len() as f32
476 }
477 }
478
479 impl Default for ReputationWeights {
480 fn default() -> Self {
481 Self {
482 uptime: 0.3,
483 data_integrity: 0.25,
484 performance: 0.2,
485 recovery_ability: 0.1,
486 consistency: 0.1,
487 network_contribution: 0.05,
488 }
489 }
490 }
491
492 impl Default for ReliabilityMetrics {
493 fn default() -> Self {
494 Self {
495 uptime_score: 0.5,
496 data_integrity_score: 1.0,
497 response_consistency: 0.5,
498 failure_recovery_time: Duration::from_secs(300),
499 consecutive_failures: 0,
500 mean_time_between_failures: Duration::from_secs(u64::MAX),
501 }
502 }
503 }
504
505 impl Default for PerformanceMetrics {
506 fn default() -> Self {
507 Self {
508 average_latency: Duration::from_millis(500),
509 throughput_score: 0.5,
510 storage_efficiency: 0.5,
511 bandwidth_utilization: 0.5,
512 resource_stability: 0.5,
513 load_handling_capacity: 0.5,
514 }
515 }
516 }
517
518 impl Default for ReputationTrend {
519 fn default() -> Self {
520 Self {
521 direction: TrendDirection::Stable,
522 slope: 0.0,
523 confidence: 0.5,
524 time_window: Duration::from_secs(7 * 24 * 3600),
525 }
526 }
527 }
528
529 impl Default for NetworkStats {
530 fn default() -> Self {
531 Self {
532 average_uptime: 0.95,
533 median_latency: Duration::from_millis(200),
534 total_nodes: 0,
535 healthy_nodes: 0,
536 network_quality_score: 0.8,
537 }
538 }
539 }