//! Peer-to-peer message types and the handler that validates and dispatches them.
1 use anyhow::Result;
2 use std::collections::HashMap;
3 use tokio::sync::mpsc;
4 use tracing::{debug, error, info, warn};
5
6 use crate::config::Config;
7
8 /// Message types that can be exchanged between peers
9 ///
10 /// Transparency: All message types are clearly defined and logged
11 /// Safety: Messages include validation and security checks
12 #[derive(Debug, Clone)]
13 pub enum ZephyrMessage {
14 /// Health check ping
15 Ping { timestamp: u64 },
16
17 /// Health check response
18 Pong { timestamp: u64, node_info: NodeInfo },
19
20 /// Request for peer discovery
21 PeerDiscovery,
22
23 /// Response with known peers
24 PeerList { peers: Vec<PeerAddress> },
25
26 /// File chunk request
27 ChunkRequest { chunk_id: String, expected_hash: String },
28
29 /// File chunk response
30 ChunkResponse {
31 chunk_id: String,
32 data: Vec<u8>,
33 hash: String,
34 success: bool,
35 },
36
37 /// Node status announcement
38 StatusUpdate { node_info: NodeInfo },
39 }
40
/// Node information shared in `Pong` and `StatusUpdate` messages.
///
/// Only operational data is carried: identity, software version, storage
/// figures, uptime and a list of capability tags.
///
/// Derives `PartialEq`/`Eq` so values can be compared directly (for example
/// with `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeInfo {
    /// Identifier of the node.
    pub node_id: String,
    /// Software version string reported by the node.
    pub version: String,
    /// Storage the node reports as available (logged as GB after dividing by 1024^3,
    /// so presumably bytes).
    pub storage_available: u64,
    /// Storage the node reports as in use (same unit as `storage_available` — TODO confirm).
    pub storage_used: u64,
    /// Seconds the node has been running.
    pub uptime_seconds: u64,
    /// Capability tags advertised by the node (e.g. "chunk-storage").
    pub capabilities: Vec<String>,
}
53
/// Address record for a single peer, as carried in `PeerList` messages.
///
/// Derives `PartialEq`/`Eq` so records can be compared directly (for example
/// with `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerAddress {
    /// Identifier of the peer.
    pub peer_id: String,
    /// Addresses at which the peer may be reached (format not established in this file).
    pub addresses: Vec<String>,
    /// When the peer was last seen (presumably a Unix timestamp — TODO confirm unit).
    pub last_seen: u64,
}
61
62 /// Handles P2P message passing with security and validation
63 ///
64 /// Safety: All messages are validated before processing
65 /// Transparency: All message handling is logged
66 /// Privacy: Messages are encrypted in transit via LibP2P transport
67 pub struct MessageHandler {
68 config: Config,
69 message_tx: Option<mpsc::Sender<ZephyrMessage>>,
70 message_rx: Option<mpsc::Receiver<ZephyrMessage>>,
71 pending_requests: HashMap<String, PendingRequest>,
72 }
73
74 #[derive(Debug)]
75 struct PendingRequest {
76 timestamp: std::time::Instant,
77 response_tx: mpsc::Sender<ZephyrMessage>,
78 }
79
80 impl MessageHandler {
81 /// Create a new MessageHandler
82 pub fn new(config: &Config) -> Self {
83 info!("Initializing MessageHandler with security validation");
84
85 let (message_tx, message_rx) = mpsc::channel(1000);
86
87 Self {
88 config: config.clone(),
89 message_tx: Some(message_tx),
90 message_rx: Some(message_rx),
91 pending_requests: HashMap::new(),
92 }
93 }
94
95 /// Process incoming message with validation and security checks
96 ///
97 /// Safety: All messages are validated before processing
98 /// Transparency: Message processing is fully logged
99 pub async fn handle_message(&mut self, message: ZephyrMessage) -> Result<Option<ZephyrMessage>> {
100 debug!("Processing message: {:?}", message);
101
102 // Validate message before processing
103 if !self.validate_message(&message) {
104 warn!("Received invalid message, rejecting");
105 return Ok(None);
106 }
107
108 match message {
109 ZephyrMessage::Ping { timestamp } => {
110 debug!("Received ping with timestamp: {}", timestamp);
111 Ok(Some(ZephyrMessage::Pong {
112 timestamp,
113 node_info: self.get_node_info(),
114 }))
115 }
116
117 ZephyrMessage::Pong { timestamp, node_info } => {
118 debug!("Received pong from node: {}", node_info.node_id);
119 self.handle_pong(timestamp, node_info).await;
120 Ok(None)
121 }
122
123 ZephyrMessage::PeerDiscovery => {
124 debug!("Received peer discovery request");
125 Ok(Some(ZephyrMessage::PeerList {
126 peers: self.get_known_peers().await,
127 }))
128 }
129
130 ZephyrMessage::PeerList { peers } => {
131 debug!("Received peer list with {} peers", peers.len());
132 self.handle_peer_list(peers).await;
133 Ok(None)
134 }
135
136 ZephyrMessage::ChunkRequest { chunk_id, expected_hash } => {
137 debug!("Received chunk request for: {}", chunk_id);
138 self.handle_chunk_request(chunk_id, expected_hash).await
139 }
140
141 ZephyrMessage::ChunkResponse { chunk_id, data, hash, success } => {
142 debug!("Received chunk response for: {} (success: {})", chunk_id, success);
143 self.handle_chunk_response(chunk_id, data, hash, success).await;
144 Ok(None)
145 }
146
147 ZephyrMessage::StatusUpdate { node_info } => {
148 debug!("Received status update from: {}", node_info.node_id);
149 self.handle_status_update(node_info).await;
150 Ok(None)
151 }
152 }
153 }
154
155 /// Validate message for security and correctness
156 ///
157 /// Safety: Comprehensive validation prevents malicious messages
158 fn validate_message(&self, message: &ZephyrMessage) -> bool {
159 match message {
160 ZephyrMessage::Ping { timestamp } => {
161 // Check timestamp is reasonable (within 5 minutes)
162 let now = std::time::SystemTime::now()
163 .duration_since(std::time::UNIX_EPOCH)
164 .unwrap()
165 .as_secs();
166
167 (*timestamp as i64 - now as i64).abs() < 300
168 }
169
170 ZephyrMessage::ChunkRequest { chunk_id, expected_hash } => {
171 // Validate chunk ID format and hash format
172 !chunk_id.is_empty() &&
173 expected_hash.len() == 64 && // SHA-256 hex length
174 expected_hash.chars().all(|c| c.is_ascii_hexdigit())
175 }
176
177 ZephyrMessage::ChunkResponse { data, hash, .. } => {
178 // Validate data size and hash format
179 data.len() <= self.config.storage.chunk_size &&
180 hash.len() == 64 &&
181 hash.chars().all(|c| c.is_ascii_hexdigit())
182 }
183
184 _ => true, // Other messages have basic validation
185 }
186 }
187
188 /// Get current node information
189 ///
190 /// Privacy: Only exposes operational information needed for network function
191 fn get_node_info(&self) -> NodeInfo {
192 NodeInfo {
193 node_id: self.config.node_id.clone().unwrap_or_default(),
194 version: "0.1.0".to_string(),
195 storage_available: self.config.storage.max_storage,
196 storage_used: 0, // TODO: Implement actual usage tracking
197 uptime_seconds: 0, // TODO: Implement uptime tracking
198 capabilities: vec![
199 "chunk-storage".to_string(),
200 "file-metadata".to_string(),
201 "peer-discovery".to_string(),
202 ],
203 }
204 }
205
206 /// Handle pong response
207 async fn handle_pong(&mut self, timestamp: u64, node_info: NodeInfo) {
208 let rtt = std::time::SystemTime::now()
209 .duration_since(std::time::UNIX_EPOCH)
210 .unwrap()
211 .as_millis() as u64 - timestamp;
212
213 info!("Pong received from {}: RTT={}ms, available={}GB",
214 node_info.node_id, rtt, node_info.storage_available / (1024*1024*1024));
215 }
216
217 /// Get list of known peers
218 async fn get_known_peers(&self) -> Vec<PeerAddress> {
219 // TODO: Integrate with PeerManager
220 vec![]
221 }
222
223 /// Handle received peer list
224 async fn handle_peer_list(&mut self, peers: Vec<PeerAddress>) {
225 info!("Received {} peer addresses for discovery", peers.len());
226 // TODO: Integrate with PeerManager to attempt connections
227 }
228
229 /// Handle chunk request
230 async fn handle_chunk_request(
231 &mut self,
232 chunk_id: String,
233 expected_hash: String
234 ) -> Result<Option<ZephyrMessage>> {
235 // TODO: Integrate with storage layer
236 warn!("Chunk storage not yet implemented, rejecting request for: {}", chunk_id);
237
238 Ok(Some(ZephyrMessage::ChunkResponse {
239 chunk_id,
240 data: vec![],
241 hash: expected_hash,
242 success: false,
243 }))
244 }
245
246 /// Handle chunk response
247 async fn handle_chunk_response(
248 &mut self,
249 chunk_id: String,
250 data: Vec<u8>,
251 hash: String,
252 success: bool,
253 ) {
254 if success {
255 info!("Successfully received chunk: {} ({}bytes)", chunk_id, data.len());
256 // TODO: Validate hash and store chunk
257 } else {
258 warn!("Failed to retrieve chunk: {}", chunk_id);
259 }
260 }
261
262 /// Handle status update from peer
263 async fn handle_status_update(&mut self, node_info: NodeInfo) {
264 info!("Status update from {}: {}GB available, {} capabilities",
265 node_info.node_id,
266 node_info.storage_available / (1024*1024*1024),
267 node_info.capabilities.len());
268 }
269
270 /// Send a message (for future integration with LibP2P)
271 pub async fn send_message(&self, message: ZephyrMessage) -> Result<()> {
272 if let Some(tx) = &self.message_tx {
273 tx.send(message).await.map_err(|e| anyhow::anyhow!("Send error: {}", e))?;
274 }
275 Ok(())
276 }
277 }
278
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Default configuration with a fixed node id.
    fn create_test_config() -> Config {
        let mut config = Config::default();
        config.node_id = Some("test-node".to_string());
        config
    }

    /// Current Unix time in whole seconds, the unit ping validation uses.
    fn unix_now_secs() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is before the Unix epoch")
            .as_secs()
    }

    /// A string with the shape of a SHA-256 hex digest (64 hex characters).
    fn sample_hash() -> String {
        "a".repeat(64)
    }

    #[tokio::test]
    async fn test_message_handler_creation() {
        let config = create_test_config();
        let handler = MessageHandler::new(&config);

        // Verify handler is properly initialized
        assert!(handler.message_tx.is_some());
        assert!(handler.pending_requests.is_empty());
    }

    #[tokio::test]
    async fn test_ping_pong_handling() {
        let config = create_test_config();
        let mut handler = MessageHandler::new(&config);

        let timestamp = unix_now_secs();

        let ping = ZephyrMessage::Ping { timestamp };
        let response = handler.handle_message(ping).await.unwrap();

        match response {
            Some(ZephyrMessage::Pong { timestamp: resp_ts, node_info }) => {
                assert_eq!(resp_ts, timestamp);
                assert_eq!(node_info.node_id, "test-node");
            }
            _ => panic!("Expected Pong response"),
        }
    }

    #[tokio::test]
    async fn test_message_validation() {
        let config = create_test_config();
        let handler = MessageHandler::new(&config);

        // Valid ping
        let now = unix_now_secs();
        let valid_ping = ZephyrMessage::Ping { timestamp: now };
        assert!(handler.validate_message(&valid_ping));

        // Invalid ping (too old)
        let old_ping = ZephyrMessage::Ping { timestamp: now - 400 };
        assert!(!handler.validate_message(&old_ping));

        // Invalid ping (too far in the future)
        let future_ping = ZephyrMessage::Ping { timestamp: now + 400 };
        assert!(!handler.validate_message(&future_ping));

        // Valid chunk request
        let valid_chunk_req = ZephyrMessage::ChunkRequest {
            chunk_id: "chunk123".to_string(),
            expected_hash: sample_hash(), // 64 hex chars
        };
        assert!(handler.validate_message(&valid_chunk_req));

        // Invalid chunk request (bad hash)
        let invalid_chunk_req = ZephyrMessage::ChunkRequest {
            chunk_id: "chunk123".to_string(),
            expected_hash: "invalid-hash".to_string(),
        };
        assert!(!handler.validate_message(&invalid_chunk_req));

        // Invalid chunk request (empty chunk id)
        let empty_id_req = ZephyrMessage::ChunkRequest {
            chunk_id: String::new(),
            expected_hash: sample_hash(),
        };
        assert!(!handler.validate_message(&empty_id_req));
    }

    #[tokio::test]
    async fn test_chunk_response_validation() {
        let config = create_test_config();
        let handler = MessageHandler::new(&config);

        // Empty payload can never exceed the configured chunk size.
        let valid_response = ZephyrMessage::ChunkResponse {
            chunk_id: "chunk123".to_string(),
            data: Vec::new(),
            hash: sample_hash(),
            success: true,
        };
        assert!(handler.validate_message(&valid_response));

        // Right length, but not hexadecimal.
        let bad_hash_response = ZephyrMessage::ChunkResponse {
            chunk_id: "chunk123".to_string(),
            data: Vec::new(),
            hash: "z".repeat(64),
            success: true,
        };
        assert!(!handler.validate_message(&bad_hash_response));
    }

    #[tokio::test]
    async fn test_invalid_message_is_dropped() {
        let config = create_test_config();
        let mut handler = MessageHandler::new(&config);

        let stale_ping = ZephyrMessage::Ping { timestamp: unix_now_secs() - 400 };
        let response = handler.handle_message(stale_ping).await.unwrap();

        // Rejected messages produce no reply and no error.
        assert!(response.is_none());
    }

    #[tokio::test]
    async fn test_status_update_yields_no_reply() {
        let config = create_test_config();
        let mut handler = MessageHandler::new(&config);

        let status = ZephyrMessage::StatusUpdate {
            node_info: handler.get_node_info(),
        };
        let response = handler.handle_message(status).await.unwrap();

        assert!(response.is_none());
    }

    #[tokio::test]
    async fn test_peer_discovery() {
        let config = create_test_config();
        let mut handler = MessageHandler::new(&config);

        let discovery_req = ZephyrMessage::PeerDiscovery;
        let response = handler.handle_message(discovery_req).await.unwrap();

        match response {
            Some(ZephyrMessage::PeerList { peers }) => {
                // Should return empty list initially
                assert!(peers.is_empty());
            }
            _ => panic!("Expected PeerList response"),
        }
    }

    #[tokio::test]
    async fn test_chunk_request_handling() {
        let config = create_test_config();
        let mut handler = MessageHandler::new(&config);

        let chunk_req = ZephyrMessage::ChunkRequest {
            chunk_id: "test-chunk".to_string(),
            expected_hash: sample_hash(),
        };

        let response = handler.handle_message(chunk_req).await.unwrap();

        match response {
            Some(ZephyrMessage::ChunkResponse { success, .. }) => {
                // Should fail since storage not implemented yet
                assert!(!success);
            }
            _ => panic!("Expected ChunkResponse"),
        }
    }

    #[test]
    fn test_node_info_generation() {
        let config = create_test_config();
        let handler = MessageHandler::new(&config);

        let node_info = handler.get_node_info();
        assert_eq!(node_info.node_id, "test-node");
        assert_eq!(node_info.version, "0.1.0");
        assert!(!node_info.capabilities.is_empty());
    }
}