Rust · 13759 bytes Raw Blame History
1 use anyhow::Result;
2 use libp2p::{Multiaddr, PeerId};
3 use std::collections::HashMap;
4 use std::time::{Duration, Instant};
5 use tokio::sync::RwLock;
6 use tracing::{debug, info, warn};
7
8 use crate::config::Config;
9
10 /// Manages peer connections with reputation and security controls
11 ///
12 /// Safety: Implements peer reputation system to prevent malicious connections
13 /// Privacy: Tracks minimal necessary peer information
14 /// Transparency: All peer management decisions are logged
15 pub struct PeerManager {
16 peers: RwLock<HashMap<PeerId, PeerInfo>>,
17 config: Config,
18 }
19
20 /// Information tracked for each peer
21 ///
22 /// Privacy: Only stores network-relevant information, no personal data
23 #[derive(Debug, Clone)]
24 pub struct PeerInfo {
25 /// Peer's multiaddresses
26 pub addresses: Vec<Multiaddr>,
27
28 /// Connection timestamp
29 pub connected_at: Instant,
30
31 /// Last successful ping timestamp
32 pub last_ping: Option<Instant>,
33
34 /// Round-trip time measurements
35 pub rtt_measurements: Vec<Duration>,
36
37 /// Reputation score (0.0 to 1.0)
38 pub reputation: f64,
39
40 /// Number of failed interactions
41 pub failure_count: u32,
42
43 /// Peer capabilities from identify protocol
44 pub agent_version: Option<String>,
45 pub protocol_version: Option<String>,
46 }
47
48 impl PeerInfo {
49 fn new(address: Multiaddr) -> Self {
50 Self {
51 addresses: vec![address],
52 connected_at: Instant::now(),
53 last_ping: None,
54 rtt_measurements: Vec::with_capacity(10), // Keep last 10 measurements
55 reputation: 0.5, // Start with neutral reputation
56 failure_count: 0,
57 agent_version: None,
58 protocol_version: None,
59 }
60 }
61
62 /// Calculate average RTT from recent measurements
63 pub fn average_rtt(&self) -> Option<Duration> {
64 if self.rtt_measurements.is_empty() {
65 None
66 } else {
67 let sum: Duration = self.rtt_measurements.iter().sum();
68 Some(sum / self.rtt_measurements.len() as u32)
69 }
70 }
71
72 /// Check if peer is considered healthy
73 pub fn is_healthy(&self) -> bool {
74 self.reputation >= 0.3 && self.failure_count < 5
75 }
76 }
77
78 impl PeerManager {
79 /// Create a new PeerManager
80 pub fn new(config: &Config) -> Self {
81 info!("Initializing PeerManager with security policies");
82
83 Self {
84 peers: RwLock::new(HashMap::new()),
85 config: config.clone(),
86 }
87 }
88
89 /// Add a new peer to management
90 ///
91 /// Transparency: Log all peer additions for audit trail
92 pub async fn add_peer(&self, peer_id: PeerId, address: Multiaddr) {
93 let mut peers = self.peers.write().await;
94
95 match peers.get_mut(&peer_id) {
96 Some(peer_info) => {
97 // Update existing peer info
98 if !peer_info.addresses.contains(&address) {
99 peer_info.addresses.push(address.clone());
100 debug!("Added new address {} for existing peer {}", address, peer_id);
101 }
102 }
103 None => {
104 // Add new peer
105 let peer_info = PeerInfo::new(address.clone());
106 peers.insert(peer_id, peer_info);
107 info!("Added new peer {} with address {}", peer_id, address);
108 }
109 }
110 }
111
112 /// Remove a peer from management
113 ///
114 /// Transparency: Log all peer removals
115 pub async fn remove_peer(&self, peer_id: &PeerId) {
116 let mut peers = self.peers.write().await;
117 if peers.remove(peer_id).is_some() {
118 info!("Removed peer {}", peer_id);
119 }
120 }
121
122 /// Check if we should connect to a specific peer
123 ///
124 /// Safety: Apply security policies before allowing connections
125 pub async fn should_connect_to_peer(&self, peer_id: &PeerId) -> bool {
126 let peers = self.peers.read().await;
127
128 // Check if we've reached max peers
129 if peers.len() >= self.config.network.max_peers {
130 debug!("Max peers reached, rejecting connection to {}", peer_id);
131 return false;
132 }
133
134 // Check peer reputation if known
135 if let Some(peer_info) = peers.get(peer_id) {
136 if peer_info.reputation < self.config.security.min_peer_reputation {
137 debug!("Peer {} has low reputation: {}, rejecting connection",
138 peer_id, peer_info.reputation);
139 return false;
140 }
141
142 if !peer_info.is_healthy() {
143 debug!("Peer {} is not healthy, rejecting connection", peer_id);
144 return false;
145 }
146 }
147
148 true
149 }
150
151 /// Update peer statistics after successful ping
152 ///
153 /// Transparency: All RTT measurements are logged for network health monitoring
154 pub async fn update_peer_stats(&self, peer_id: &PeerId, rtt: Duration) {
155 let mut peers = self.peers.write().await;
156
157 if let Some(peer_info) = peers.get_mut(peer_id) {
158 peer_info.last_ping = Some(Instant::now());
159
160 // Add RTT measurement, keeping only last 10
161 peer_info.rtt_measurements.push(rtt);
162 if peer_info.rtt_measurements.len() > 10 {
163 peer_info.rtt_measurements.remove(0);
164 }
165
166 // Improve reputation for successful pings
167 peer_info.reputation = (peer_info.reputation + 0.01).min(1.0);
168
169 debug!("Updated stats for peer {}: RTT={:?}, reputation={:.3}",
170 peer_id, rtt, peer_info.reputation);
171 }
172 }
173
174 /// Handle peer failure (failed ping, connection error, etc.)
175 ///
176 /// Safety: Degrade reputation and potentially disconnect problematic peers
177 pub async fn handle_peer_failure(&self, peer_id: &PeerId) {
178 let mut peers = self.peers.write().await;
179
180 if let Some(peer_info) = peers.get_mut(peer_id) {
181 peer_info.failure_count += 1;
182 peer_info.reputation = (peer_info.reputation - 0.1).max(0.0);
183
184 warn!("Peer {} failed interaction: failures={}, reputation={:.3}",
185 peer_id, peer_info.failure_count, peer_info.reputation);
186
187 // Remove peer if too many failures
188 if peer_info.failure_count >= 10 || peer_info.reputation <= 0.0 {
189 warn!("Removing peer {} due to excessive failures", peer_id);
190 peers.remove(peer_id);
191 }
192 }
193 }
194
195 /// Validate peer identity from identify protocol
196 ///
197 /// Safety: Ensure peer is running compatible protocol version
198 pub async fn validate_peer_identity(
199 &self,
200 peer_id: &PeerId,
201 info: &libp2p::identify::Info
202 ) -> Result<()> {
203 let mut peers = self.peers.write().await;
204
205 if let Some(peer_info) = peers.get_mut(peer_id) {
206 peer_info.agent_version = Some(info.agent_version.clone());
207 peer_info.protocol_version = Some(info.protocol_version.clone());
208
209 // Validate protocol compatibility
210 if !info.protocol_version.starts_with("zephyrfs/") {
211 warn!("Peer {} running incompatible protocol: {}",
212 peer_id, info.protocol_version);
213 peer_info.reputation = (peer_info.reputation - 0.2).max(0.0);
214 } else {
215 // Boost reputation for compatible peers
216 peer_info.reputation = (peer_info.reputation + 0.05).min(1.0);
217 info!("Validated compatible peer {}: {}", peer_id, info.agent_version);
218 }
219 }
220
221 Ok(())
222 }
223
224 /// Get statistics about managed peers
225 ///
226 /// Transparency: Provide comprehensive peer statistics for monitoring
227 pub async fn get_peer_stats(&self) -> PeerStats {
228 let peers = self.peers.read().await;
229
230 let mut healthy_peers = 0;
231 let mut total_rtt = Duration::ZERO;
232 let mut rtt_count = 0;
233 let mut avg_reputation = 0.0;
234
235 for peer_info in peers.values() {
236 if peer_info.is_healthy() {
237 healthy_peers += 1;
238 }
239
240 if let Some(rtt) = peer_info.average_rtt() {
241 total_rtt += rtt;
242 rtt_count += 1;
243 }
244
245 avg_reputation += peer_info.reputation;
246 }
247
248 PeerStats {
249 total_peers: peers.len(),
250 healthy_peers,
251 average_rtt: if rtt_count > 0 {
252 Some(total_rtt / rtt_count)
253 } else {
254 None
255 },
256 average_reputation: if peers.is_empty() {
257 0.0
258 } else {
259 avg_reputation / peers.len() as f64
260 },
261 }
262 }
263 }
264
/// Aggregate snapshot of the tracked peer set.
#[derive(Debug, Clone)]
pub struct PeerStats {
    /// Number of peers currently tracked.
    pub total_peers: usize,
    /// Number of tracked peers that count as healthy.
    pub healthy_peers: usize,
    /// Average round-trip time across peers, or `None` when no RTT data exists.
    pub average_rtt: Option<Duration>,
    /// Average reputation score across tracked peers.
    pub average_reputation: f64,
}
273
#[cfg(test)]
mod tests {
    use super::*;
    use libp2p::identity;

    /// Peer id derived from a freshly generated ed25519 keypair.
    fn fresh_peer_id() -> PeerId {
        let keypair = identity::Keypair::generate_ed25519();
        PeerId::from(keypair.public())
    }

    /// Loopback TCP multiaddress shared by all tests.
    fn loopback_address() -> Multiaddr {
        "/ip4/127.0.0.1/tcp/4001".parse().unwrap()
    }

    #[tokio::test]
    async fn test_peer_manager_creation() {
        let peer_manager = PeerManager::new(&Config::default());

        // A new manager tracks nobody.
        let stats = peer_manager.get_peer_stats().await;
        assert_eq!(stats.total_peers, 0);
        assert_eq!(stats.healthy_peers, 0);
    }

    #[tokio::test]
    async fn test_add_and_remove_peer() {
        let peer_manager = PeerManager::new(&Config::default());
        let peer_id = fresh_peer_id();

        // Adding a peer makes it visible in the stats; new peers start healthy.
        peer_manager.add_peer(peer_id, loopback_address()).await;
        let after_add = peer_manager.get_peer_stats().await;
        assert_eq!(after_add.total_peers, 1);
        assert_eq!(after_add.healthy_peers, 1);

        // Removing it empties the manager again.
        peer_manager.remove_peer(&peer_id).await;
        let after_remove = peer_manager.get_peer_stats().await;
        assert_eq!(after_remove.total_peers, 0);
    }

    #[tokio::test]
    async fn test_peer_reputation_system() {
        let peer_manager = PeerManager::new(&Config::default());
        let peer_id = fresh_peer_id();
        peer_manager.add_peer(peer_id, loopback_address()).await;

        // A successful ping lifts the reputation above the neutral 0.5.
        peer_manager
            .update_peer_stats(&peer_id, Duration::from_millis(50))
            .await;
        let after_ping = peer_manager.get_peer_stats().await;
        assert!(
            after_ping.average_reputation > 0.5,
            "Reputation should improve after successful ping"
        );

        // A single failure costs more than a single ping earns.
        peer_manager.handle_peer_failure(&peer_id).await;
        let after_failure = peer_manager.get_peer_stats().await;
        assert!(
            after_failure.average_reputation < 0.5,
            "Reputation should degrade after failure"
        );
    }

    #[tokio::test]
    async fn test_should_connect_to_peer() {
        let mut config = Config::default();
        config.network.max_peers = 2;
        config.security.min_peer_reputation = 0.3;
        let peer_manager = PeerManager::new(&config);

        let peer_id1 = fresh_peer_id();

        // Unknown peers are accepted while there is capacity.
        assert!(peer_manager.should_connect_to_peer(&peer_id1).await);

        // Track the peer, then report five failures in a row.
        peer_manager.add_peer(peer_id1, loopback_address()).await;
        for _ in 0..5 {
            peer_manager.handle_peer_failure(&peer_id1).await;
        }

        // The degraded peer must now be rejected.
        // NOTE(review): this only holds while the peer is still tracked; a
        // peer evicted by `handle_peer_failure` is unknown again and would be
        // accepted by the check below.
        let should_connect = peer_manager.should_connect_to_peer(&peer_id1).await;
        assert!(
            !should_connect,
            "Should not connect to peer with degraded reputation"
        );
    }

    #[test]
    fn test_peer_info_health_check() {
        let mut peer_info = PeerInfo::new(loopback_address());

        // A new record is healthy.
        assert!(peer_info.is_healthy());

        // Low reputation alone makes it unhealthy.
        peer_info.reputation = 0.1;
        assert!(!peer_info.is_healthy());

        // So does a high failure count, even with a good reputation.
        peer_info.reputation = 0.5;
        peer_info.failure_count = 10;
        assert!(!peer_info.is_healthy());
    }

    #[test]
    fn test_peer_info_rtt_calculation() {
        let mut peer_info = PeerInfo::new(loopback_address());

        // No samples means no average.
        assert!(peer_info.average_rtt().is_none());

        // Two samples average to their midpoint.
        peer_info.rtt_measurements.push(Duration::from_millis(50));
        peer_info.rtt_measurements.push(Duration::from_millis(100));

        let avg_rtt = peer_info.average_rtt().unwrap();
        assert_eq!(avg_rtt, Duration::from_millis(75));
    }
}