Rust · 17500 bytes Raw Blame History
1 use anyhow::Result;
2 use std::collections::HashMap;
3 use std::sync::Arc;
4 use tokio::sync::mpsc;
5 use tracing::{debug, error, info, warn};
6
7 use crate::config::Config;
8 use crate::storage::StorageManager;
9
10 /// Message types that can be exchanged between peers
11 ///
12 /// Transparency: All message types are clearly defined and logged
13 /// Safety: Messages include validation and security checks
14 #[derive(Debug, Clone)]
15 pub enum ZephyrMessage {
16 /// Health check ping
17 Ping { timestamp: u64 },
18
19 /// Health check response
20 Pong { timestamp: u64, node_info: NodeInfo },
21
22 /// Request for peer discovery
23 PeerDiscovery,
24
25 /// Response with known peers
26 PeerList { peers: Vec<PeerAddress> },
27
28 /// File chunk request
29 ChunkRequest { chunk_id: String, expected_hash: String },
30
31 /// File chunk response
32 ChunkResponse {
33 chunk_id: String,
34 data: Vec<u8>,
35 hash: String,
36 success: bool,
37 },
38
39 /// Node status announcement
40 StatusUpdate { node_info: NodeInfo },
41 }
42
43 /// Node information shared in messages
44 ///
45 /// Privacy: Only shares necessary operational information
46 #[derive(Debug, Clone)]
47 pub struct NodeInfo {
48 pub node_id: String,
49 pub version: String,
50 pub storage_available: u64,
51 pub storage_used: u64,
52 pub uptime_seconds: u64,
53 pub capabilities: Vec<String>,
54 }
55
56 /// Peer address information
57 #[derive(Debug, Clone)]
58 pub struct PeerAddress {
59 pub peer_id: String,
60 pub addresses: Vec<String>,
61 pub last_seen: u64,
62 }
63
64 /// Handles P2P message passing with security and validation
65 ///
66 /// Safety: All messages are validated before processing
67 /// Transparency: All message handling is logged
68 /// Privacy: Messages are encrypted in transit via LibP2P transport
69 pub struct MessageHandler {
70 config: Config,
71 message_tx: Option<mpsc::Sender<ZephyrMessage>>,
72 message_rx: Option<mpsc::Receiver<ZephyrMessage>>,
73 pending_requests: HashMap<String, PendingRequest>,
74 storage_manager: Option<Arc<StorageManager>>,
75 }
76
77 #[derive(Debug)]
78 struct PendingRequest {
79 timestamp: crate::SerializableInstant,
80 response_tx: mpsc::Sender<ZephyrMessage>,
81 }
82
83 impl MessageHandler {
84 /// Create a new MessageHandler
85 pub fn new(config: &Config) -> Self {
86 info!("Initializing MessageHandler with security validation");
87
88 let (message_tx, message_rx) = mpsc::channel(1000);
89
90 Self {
91 config: config.clone(),
92 message_tx: Some(message_tx),
93 message_rx: Some(message_rx),
94 pending_requests: HashMap::new(),
95 storage_manager: None,
96 }
97 }
98
99 /// Set the storage manager for chunk operations
100 ///
101 /// Safety: Enables secure chunk storage and retrieval operations
102 pub fn set_storage_manager(&mut self, storage_manager: Arc<StorageManager>) {
103 info!("Integrating MessageHandler with StorageManager");
104 self.storage_manager = Some(storage_manager);
105 }
106
107 /// Process incoming message with validation and security checks
108 ///
109 /// Safety: All messages are validated before processing
110 /// Transparency: Message processing is fully logged
111 pub async fn handle_message(&mut self, message: ZephyrMessage) -> Result<Option<ZephyrMessage>> {
112 debug!("Processing message: {:?}", message);
113
114 // Validate message before processing
115 if !self.validate_message(&message) {
116 warn!("Received invalid message, rejecting");
117 return Ok(None);
118 }
119
120 match message {
121 ZephyrMessage::Ping { timestamp } => {
122 debug!("Received ping with timestamp: {}", timestamp);
123 Ok(Some(ZephyrMessage::Pong {
124 timestamp,
125 node_info: self.get_node_info(),
126 }))
127 }
128
129 ZephyrMessage::Pong { timestamp, node_info } => {
130 debug!("Received pong from node: {}", node_info.node_id);
131 self.handle_pong(timestamp, node_info).await;
132 Ok(None)
133 }
134
135 ZephyrMessage::PeerDiscovery => {
136 debug!("Received peer discovery request");
137 Ok(Some(ZephyrMessage::PeerList {
138 peers: self.get_known_peers().await,
139 }))
140 }
141
142 ZephyrMessage::PeerList { peers } => {
143 debug!("Received peer list with {} peers", peers.len());
144 self.handle_peer_list(peers).await;
145 Ok(None)
146 }
147
148 ZephyrMessage::ChunkRequest { chunk_id, expected_hash } => {
149 debug!("Received chunk request for: {}", chunk_id);
150 self.handle_chunk_request(chunk_id, expected_hash).await
151 }
152
153 ZephyrMessage::ChunkResponse { chunk_id, data, hash, success } => {
154 debug!("Received chunk response for: {} (success: {})", chunk_id, success);
155 self.handle_chunk_response(chunk_id, data, hash, success).await;
156 Ok(None)
157 }
158
159 ZephyrMessage::StatusUpdate { node_info } => {
160 debug!("Received status update from: {}", node_info.node_id);
161 self.handle_status_update(node_info).await;
162 Ok(None)
163 }
164 }
165 }
166
167 /// Validate message for security and correctness
168 ///
169 /// Safety: Comprehensive validation prevents malicious messages
170 fn validate_message(&self, message: &ZephyrMessage) -> bool {
171 match message {
172 ZephyrMessage::Ping { timestamp } => {
173 // Check timestamp is reasonable (within 5 minutes)
174 let now = std::time::SystemTime::now()
175 .duration_since(std::time::UNIX_EPOCH)
176 .unwrap()
177 .as_secs();
178
179 (*timestamp as i64 - now as i64).abs() < 300
180 }
181
182 ZephyrMessage::ChunkRequest { chunk_id, expected_hash } => {
183 // Validate chunk ID format and hash format
184 !chunk_id.is_empty() &&
185 expected_hash.len() == 64 && // SHA-256 hex length
186 expected_hash.chars().all(|c| c.is_ascii_hexdigit())
187 }
188
189 ZephyrMessage::ChunkResponse { data, hash, .. } => {
190 // Validate data size and hash format
191 data.len() <= self.config.storage.chunk_size &&
192 hash.len() == 64 &&
193 hash.chars().all(|c| c.is_ascii_hexdigit())
194 }
195
196 _ => true, // Other messages have basic validation
197 }
198 }
199
200 /// Get current node information
201 ///
202 /// Privacy: Only exposes operational information needed for network function
203 fn get_node_info(&self) -> NodeInfo {
204 NodeInfo {
205 node_id: self.config.node_id.clone().unwrap_or_default(),
206 version: "0.1.0".to_string(),
207 storage_available: self.config.storage.max_storage,
208 storage_used: 0, // TODO: Implement actual usage tracking
209 uptime_seconds: 0, // TODO: Implement uptime tracking
210 capabilities: vec![
211 "chunk-storage".to_string(),
212 "file-metadata".to_string(),
213 "peer-discovery".to_string(),
214 ],
215 }
216 }
217
218 /// Handle pong response
219 async fn handle_pong(&mut self, timestamp: u64, node_info: NodeInfo) {
220 let rtt = std::time::SystemTime::now()
221 .duration_since(std::time::UNIX_EPOCH)
222 .unwrap()
223 .as_millis() as u64 - timestamp;
224
225 info!("Pong received from {}: RTT={}ms, available={}GB",
226 node_info.node_id, rtt, node_info.storage_available / (1024*1024*1024));
227 }
228
229 /// Get list of known peers
230 async fn get_known_peers(&self) -> Vec<PeerAddress> {
231 // TODO: Integrate with PeerManager
232 vec![]
233 }
234
235 /// Handle received peer list
236 async fn handle_peer_list(&mut self, peers: Vec<PeerAddress>) {
237 info!("Received {} peer addresses for discovery", peers.len());
238 // TODO: Integrate with PeerManager to attempt connections
239 }
240
241 /// Handle chunk request
242 async fn handle_chunk_request(
243 &mut self,
244 chunk_id: String,
245 expected_hash: String
246 ) -> Result<Option<ZephyrMessage>> {
247 info!("Processing chunk request: {} (expected hash: {})", chunk_id, expected_hash);
248
249 // Check if storage manager is available
250 let storage_manager = match &self.storage_manager {
251 Some(sm) => sm,
252 None => {
253 warn!("Storage manager not available, rejecting chunk request: {}", chunk_id);
254 return Ok(Some(ZephyrMessage::ChunkResponse {
255 chunk_id,
256 data: vec![],
257 hash: expected_hash,
258 success: false,
259 }));
260 }
261 };
262
263 // Attempt to retrieve chunk from storage
264 match storage_manager.retrieve_chunk(&chunk_id).await {
265 Ok(Some(chunk_data)) => {
266 // Verify the chunk hash matches expected
267 let actual_hash = {
268 use sha2::{Digest, Sha256};
269 let mut hasher = Sha256::new();
270 hasher.update(&chunk_data);
271 hex::encode(hasher.finalize())
272 };
273
274 if actual_hash == expected_hash {
275 info!("Successfully serving chunk: {} ({} bytes)", chunk_id, chunk_data.len());
276 Ok(Some(ZephyrMessage::ChunkResponse {
277 chunk_id,
278 data: chunk_data,
279 hash: actual_hash,
280 success: true,
281 }))
282 } else {
283 warn!("Chunk hash mismatch for {}: expected {}, got {}",
284 chunk_id, expected_hash, actual_hash);
285 Ok(Some(ZephyrMessage::ChunkResponse {
286 chunk_id,
287 data: vec![],
288 hash: expected_hash,
289 success: false,
290 }))
291 }
292 }
293 Ok(None) => {
294 debug!("Chunk not found locally: {}", chunk_id);
295 Ok(Some(ZephyrMessage::ChunkResponse {
296 chunk_id,
297 data: vec![],
298 hash: expected_hash,
299 success: false,
300 }))
301 }
302 Err(e) => {
303 error!("Error retrieving chunk {}: {}", chunk_id, e);
304 Ok(Some(ZephyrMessage::ChunkResponse {
305 chunk_id,
306 data: vec![],
307 hash: expected_hash,
308 success: false,
309 }))
310 }
311 }
312 }
313
314 /// Handle chunk response
315 async fn handle_chunk_response(
316 &mut self,
317 chunk_id: String,
318 data: Vec<u8>,
319 hash: String,
320 success: bool,
321 ) {
322 if success && !data.is_empty() {
323 info!("Successfully received chunk: {} ({} bytes)", chunk_id, data.len());
324
325 // Validate hash
326 let actual_hash = {
327 use sha2::{Digest, Sha256};
328 let mut hasher = Sha256::new();
329 hasher.update(&data);
330 hex::encode(hasher.finalize())
331 };
332
333 if actual_hash != hash {
334 error!("Received chunk {} with invalid hash: expected {}, got {}",
335 chunk_id, hash, actual_hash);
336 return;
337 }
338
339 // Store chunk if storage manager is available
340 if let Some(storage_manager) = &self.storage_manager {
341 match storage_manager.store_chunk(&chunk_id, &data).await {
342 Ok(stored_hash) => {
343 info!("Successfully stored received chunk: {} (hash: {})",
344 chunk_id, stored_hash);
345 }
346 Err(e) => {
347 error!("Failed to store received chunk {}: {}", chunk_id, e);
348 }
349 }
350 } else {
351 warn!("Received chunk {} but storage manager not available", chunk_id);
352 }
353 } else {
354 warn!("Failed to retrieve chunk: {} (success: {})", chunk_id, success);
355 }
356 }
357
358 /// Handle status update from peer
359 async fn handle_status_update(&mut self, node_info: NodeInfo) {
360 info!("Status update from {}: {}GB available, {} capabilities",
361 node_info.node_id,
362 node_info.storage_available / (1024*1024*1024),
363 node_info.capabilities.len());
364 }
365
366 /// Send a message (for future integration with LibP2P)
367 pub async fn send_message(&self, message: ZephyrMessage) -> Result<()> {
368 if let Some(tx) = &self.message_tx {
369 tx.send(message).await.map_err(|e| anyhow::anyhow!("Send error: {}", e))?;
370 }
371 Ok(())
372 }
373 }
374
375 #[cfg(test)]
376 mod tests {
377 use super::*;
378 use std::time::{SystemTime, UNIX_EPOCH};
379
380 fn create_test_config() -> Config {
381 let mut config = Config::default();
382 config.node_id = Some("test-node".to_string());
383 config
384 }
385
386 #[tokio::test]
387 async fn test_message_handler_creation() {
388 let config = create_test_config();
389 let handler = MessageHandler::new(&config);
390
391 // Verify handler is properly initialized
392 assert!(handler.message_tx.is_some());
393 assert!(handler.pending_requests.is_empty());
394 }
395
396 #[tokio::test]
397 async fn test_ping_pong_handling() {
398 let config = create_test_config();
399 let mut handler = MessageHandler::new(&config);
400
401 let timestamp = SystemTime::now()
402 .duration_since(UNIX_EPOCH)
403 .unwrap()
404 .as_secs();
405
406 let ping = ZephyrMessage::Ping { timestamp };
407 let response = handler.handle_message(ping).await.unwrap();
408
409 match response {
410 Some(ZephyrMessage::Pong { timestamp: resp_ts, node_info }) => {
411 assert_eq!(resp_ts, timestamp);
412 assert_eq!(node_info.node_id, "test-node");
413 }
414 _ => panic!("Expected Pong response"),
415 }
416 }
417
418 #[tokio::test]
419 async fn test_message_validation() {
420 let config = create_test_config();
421 let handler = MessageHandler::new(&config);
422
423 // Valid ping
424 let now = SystemTime::now()
425 .duration_since(UNIX_EPOCH)
426 .unwrap()
427 .as_secs();
428 let valid_ping = ZephyrMessage::Ping { timestamp: now };
429 assert!(handler.validate_message(&valid_ping));
430
431 // Invalid ping (too old)
432 let old_ping = ZephyrMessage::Ping { timestamp: now - 400 };
433 assert!(!handler.validate_message(&old_ping));
434
435 // Valid chunk request
436 let valid_chunk_req = ZephyrMessage::ChunkRequest {
437 chunk_id: "chunk123".to_string(),
438 expected_hash: "a".repeat(64), // 64 hex chars
439 };
440 assert!(handler.validate_message(&valid_chunk_req));
441
442 // Invalid chunk request (bad hash)
443 let invalid_chunk_req = ZephyrMessage::ChunkRequest {
444 chunk_id: "chunk123".to_string(),
445 expected_hash: "invalid-hash".to_string(),
446 };
447 assert!(!handler.validate_message(&invalid_chunk_req));
448 }
449
450 #[tokio::test]
451 async fn test_peer_discovery() {
452 let config = create_test_config();
453 let mut handler = MessageHandler::new(&config);
454
455 let discovery_req = ZephyrMessage::PeerDiscovery;
456 let response = handler.handle_message(discovery_req).await.unwrap();
457
458 match response {
459 Some(ZephyrMessage::PeerList { peers }) => {
460 // Should return empty list initially
461 assert!(peers.is_empty());
462 }
463 _ => panic!("Expected PeerList response"),
464 }
465 }
466
467 #[tokio::test]
468 async fn test_chunk_request_handling() {
469 let config = create_test_config();
470 let mut handler = MessageHandler::new(&config);
471
472 let chunk_req = ZephyrMessage::ChunkRequest {
473 chunk_id: "test-chunk".to_string(),
474 expected_hash: "a".repeat(64),
475 };
476
477 let response = handler.handle_message(chunk_req).await.unwrap();
478
479 match response {
480 Some(ZephyrMessage::ChunkResponse { success, .. }) => {
481 // Should fail since storage not implemented yet
482 assert!(!success);
483 }
484 _ => panic!("Expected ChunkResponse"),
485 }
486 }
487
488 #[test]
489 fn test_node_info_generation() {
490 let config = create_test_config();
491 let handler = MessageHandler::new(&config);
492
493 let node_info = handler.get_node_info();
494 assert_eq!(node_info.node_id, "test-node");
495 assert_eq!(node_info.version, "0.1.0");
496 assert!(!node_info.capabilities.is_empty());
497 }
498 }