// Integrated node manager: networking + storage coordination for a ZephyrFS node.
1 use anyhow::{Context, Result};
2 use std::path::PathBuf;
3 use std::sync::Arc;
4 use tokio::sync::{mpsc, RwLock};
5 use tracing::{debug, info, warn, error};
6
7 use crate::config::Config;
8 use crate::network::{NetworkManager, message_handler::{ZephyrMessage, NodeInfo}};
9 use crate::storage::{StorageManager, StorageConfig as StorageManagerConfig};
10 use crate::coordinator::{CoordinatorManager, RegistrationStatus};
11
/// Integrated node manager coordinating networking and storage
///
/// Safety: Coordinates secure operations between network and storage layers
/// Transparency: Provides comprehensive node status and metrics
/// Privacy: Handles secure chunk distribution and encrypted metadata
pub struct NodeManager {
    /// Network layer manager (peer connections and message transport)
    network_manager: NetworkManager,

    /// Storage layer manager; `Arc` so background tasks can share it
    pub storage_manager: Arc<StorageManager>,

    /// Coordinator manager for network coordination; `None` when running standalone
    coordinator_manager: Option<CoordinatorManager>,

    /// Configuration
    config: Config,

    /// Message channel from network to node manager
    /// NOTE(review): never drained in the visible code — the message
    /// processing loop is still a stub; confirm before relying on it.
    message_rx: mpsc::Receiver<ZephyrMessage>,

    /// Message channel from node manager to network
    /// NOTE(review): never written to in the visible code — outbound path
    /// not yet wired up.
    message_tx: mpsc::Sender<ZephyrMessage>,

    /// Node statistics, shared with background tasks behind an async RwLock
    node_stats: Arc<RwLock<NodeStats>>,

    /// Base storage path
    storage_path: PathBuf,
}
42
/// Comprehensive node statistics
///
/// Transparency: Detailed metrics for monitoring and audit
///
/// Shared as `Arc<RwLock<NodeStats>>` between the node manager and its
/// background tasks; all counters are cumulative since `start_time`.
#[derive(Debug, Clone)]
pub struct NodeStats {
    /// Number of chunks served to other peers
    pub chunks_served: u64,

    /// Number of chunks retrieved from peers
    pub chunks_retrieved: u64,

    /// Total bytes sent to peers
    pub bytes_sent: u64,

    /// Total bytes received from peers
    pub bytes_received: u64,

    /// Number of active peer connections
    pub peer_connections: u32,

    /// Number of failed chunk requests
    pub failed_requests: u64,

    /// Uptime in seconds
    /// NOTE(review): never updated in the visible code — uptime is recomputed
    /// from `start_time` on demand (see `get_node_status`); possibly removable.
    pub uptime_seconds: u64,

    /// Node start time, used to derive uptime on demand
    pub start_time: std::time::Instant,
}
72
/// File distribution strategy for P2P sharing
///
/// NOTE(review): only `LocalOnly` currently has a working implementation —
/// the replication and distribution paths delegate to stub methods
/// (`replicate_file_to_peers` / `distribute_chunks_to_peers`) that do nothing yet.
#[derive(Debug, Clone)]
pub enum DistributionStrategy {
    /// Store locally only
    LocalOnly,

    /// Replicate to N closest peers
    Replicate { redundancy: u32 },

    /// Distribute chunks across network
    Distribute { min_peers: u32 },
}
85
86 impl NodeManager {
87 /// Create a new integrated node manager
88 ///
89 /// Safety: Initializes both network and storage with secure configurations
90 pub async fn new(config: Config, storage_path: PathBuf) -> Result<Self> {
91 info!("Initializing NodeManager with integrated network and storage");
92
93 // Create message channel for network-storage communication
94 let (message_tx, message_rx) = mpsc::channel::<ZephyrMessage>(1000);
95
96 // Initialize storage manager
97 let storage_config = StorageManagerConfig {
98 max_capacity: config.storage.max_storage,
99 warning_threshold: 0.8,
100 critical_threshold: 0.95,
101 default_chunk_size: config.storage.chunk_size,
102 max_file_size: 1024 * 1024 * 1024, // 1GB max file
103 enable_gc: true,
104 gc_interval: 3600, // 1 hour
105 };
106
107 let storage_manager = Arc::new(
108 StorageManager::new(&storage_path, storage_config).await
109 .context("Failed to initialize storage manager")?
110 );
111
112 // Initialize network manager with message channel
113 let network_manager = NetworkManager::new(config.clone()).await
114 .context("Failed to initialize network manager")?;
115
116 // Initialize coordinator manager if URL is provided
117 let coordinator_manager = if !config.coordinator.url.is_empty() {
118 match CoordinatorManager::new(config.coordinator.url.clone()).await {
119 Ok(manager) => {
120 info!("Successfully connected to coordinator at {}", config.coordinator.url);
121 Some(manager)
122 }
123 Err(e) => {
124 warn!("Failed to connect to coordinator at {}: {}. Running in standalone mode.", config.coordinator.url, e);
125 None
126 }
127 }
128 } else {
129 info!("No coordinator URL configured. Running in standalone mode.");
130 None
131 };
132
133 let node_stats = Arc::new(RwLock::new(NodeStats {
134 chunks_served: 0,
135 chunks_retrieved: 0,
136 bytes_sent: 0,
137 bytes_received: 0,
138 peer_connections: 0,
139 failed_requests: 0,
140 uptime_seconds: 0,
141 start_time: std::time::Instant::now(),
142 }));
143
144 Ok(Self {
145 network_manager,
146 storage_manager,
147 coordinator_manager,
148 config,
149 message_rx,
150 message_tx,
151 node_stats,
152 storage_path,
153 })
154 }
155
    /// Start the integrated node
    ///
    /// Safety: Starts both network and storage services with proper error handling
    ///
    /// Startup order: storage background tasks, then the network layer, then
    /// (if configured) coordinator registration and heartbeat, then the
    /// message-processing loop.
    pub async fn start(&mut self) -> Result<()> {
        info!("Starting integrated ZephyrFS node");

        // Start storage manager background tasks (if any)
        self.start_storage_tasks().await?;

        // Start network manager
        self.network_manager.start().await
            .context("Failed to start network manager")?;

        // Register with coordinator if available.
        // NOTE(review): this inlines essentially the same logic as the private
        // `register_with_coordinator` helper (which appears unused) — likely
        // because that helper's `&mut CoordinatorManager` parameter conflicts
        // with borrowing `self`; consider deduplicating.
        if self.coordinator_manager.is_some() {
            let node_status = self.get_node_status().await;
            // NOTE(review): addresses are hard-coded to loopback — remote peers
            // cannot reach the node via these; confirm intended for local testing.
            let addresses = vec![
                format!("{}:{}", "127.0.0.1", self.config.network.p2p_port),
                format!("{}:{}", "127.0.0.1", self.config.network.api_port),
            ];

            // Advertise capabilities as simple string key/value pairs.
            let mut capabilities = std::collections::HashMap::new();
            capabilities.insert("version".to_string(), node_status.version);
            capabilities.insert("storage".to_string(), "true".to_string());
            capabilities.insert("encryption".to_string(), "true".to_string());

            if let Some(coordinator) = self.coordinator_manager.as_mut() {
                let response = coordinator.register_node(
                    addresses,
                    node_status.storage_capacity,
                    capabilities,
                ).await?;

                if response.success {
                    info!("Successfully registered with coordinator. Node ID: {}", coordinator.get_node_id());
                    if !response.bootstrap_peers.is_empty() {
                        info!("Received {} bootstrap peers from coordinator", response.bootstrap_peers.len());
                        // TODO: Connect to bootstrap peers
                    }
                } else {
                    warn!("Failed to register with coordinator: {}", response.message);
                }
            }

            // NOTE(review): the heartbeat starts even when registration reported
            // failure (response.success == false) — confirm this is intentional.
            self.start_coordinator_heartbeat().await;
        }

        // Start message processing loop
        self.start_message_processing().await;

        info!("ZephyrFS node started successfully");
        Ok(())
    }
209
210 /// Store a file and optionally distribute to peers
211 ///
212 /// Safety: Validates file integrity and enforces capacity limits
213 /// Privacy: Supports encrypted storage and secure distribution
214 pub async fn store_file(
215 &self,
216 file_id: &str,
217 data: &[u8],
218 filename: &str,
219 strategy: DistributionStrategy,
220 ) -> Result<String> {
221 info!("Storing file: {} ({} bytes) with strategy: {:?}", filename, data.len(), strategy);
222
223 // Store file locally first
224 let file_hash = self.storage_manager.store_file(file_id, data, filename).await
225 .context("Failed to store file locally")?;
226
227 // Update statistics
228 {
229 let mut stats = self.node_stats.write().await;
230 stats.bytes_sent += data.len() as u64; // Will be sent to peers
231 }
232
233 // Handle distribution strategy
234 match strategy {
235 DistributionStrategy::LocalOnly => {
236 debug!("File stored locally only: {}", file_id);
237 }
238 DistributionStrategy::Replicate { redundancy } => {
239 self.replicate_file_to_peers(file_id, redundancy).await?;
240 }
241 DistributionStrategy::Distribute { min_peers } => {
242 self.distribute_chunks_to_peers(file_id, min_peers).await?;
243 }
244 }
245
246 // Register file with coordinator if available
247 if let Err(e) = self.register_file_with_coordinator(file_id, &file_hash, data.len() as u64, filename).await {
248 warn!("Failed to register file with coordinator: {}", e);
249 }
250
251 // Announce file availability to peers
252 if let Err(e) = self.announce_file_to_peers(file_id, &file_hash).await {
253 warn!("Failed to announce file to peers: {}", e);
254 }
255
256 info!("Successfully stored and distributed file: {} with hash: {}", file_id, file_hash);
257 Ok(file_hash)
258 }
259
260 /// Retrieve a file, attempting local storage first, then peers
261 ///
262 /// Safety: Verifies chunk integrity from all sources
263 /// Transparency: Logs all retrieval attempts and sources
264 pub async fn retrieve_file(&self, file_id: &str) -> Result<Option<Vec<u8>>> {
265 info!("Retrieving file: {}", file_id);
266
267 // Try local storage first
268 match self.storage_manager.retrieve_file(file_id).await? {
269 Some(data) => {
270 debug!("File retrieved from local storage: {}", file_id);
271 return Ok(Some(data));
272 }
273 None => {
274 debug!("File not found locally, attempting peer retrieval: {}", file_id);
275 }
276 }
277
278 // Attempt to retrieve from peers
279 match self.retrieve_file_from_peers(file_id).await? {
280 Some(data) => {
281 info!("Successfully retrieved file from peers: {}", file_id);
282
283 // Store locally for future access
284 if let Ok(_) = self.storage_manager.store_file(file_id, &data, "retrieved_file").await {
285 debug!("Cached retrieved file locally: {}", file_id);
286 }
287
288 // Update statistics
289 {
290 let mut stats = self.node_stats.write().await;
291 stats.chunks_retrieved += 1;
292 stats.bytes_received += data.len() as u64;
293 }
294
295 Ok(Some(data))
296 }
297 None => {
298 warn!("File not found locally or on peers: {}", file_id);
299 Ok(None)
300 }
301 }
302 }
303
304 /// Delete a file locally and notify peers
305 ///
306 /// Safety: Ensures secure deletion and updates peer information
307 pub async fn delete_file(&self, file_id: &str) -> Result<bool> {
308 info!("Deleting file: {}", file_id);
309
310 // Delete locally
311 let deleted = self.storage_manager.delete_file(file_id).await?;
312
313 if deleted {
314 // Notify peers about deletion (future implementation)
315 debug!("File deleted locally, peer notification not yet implemented: {}", file_id);
316 }
317
318 Ok(deleted)
319 }
320
321 /// Get comprehensive node status
322 ///
323 /// Transparency: Provides detailed node metrics for monitoring
324 pub async fn get_node_status(&self) -> NodeStatus {
325 let stats = self.node_stats.read().await;
326 let capacity_info = self.storage_manager.get_capacity_info().await;
327 let storage_stats = self.storage_manager.get_storage_stats().await.unwrap_or_default();
328
329 // Calculate uptime
330 let uptime_seconds = stats.start_time.elapsed().as_secs();
331
332 let node_id = if let Some(coordinator) = &self.coordinator_manager {
333 coordinator.get_node_id().to_string()
334 } else {
335 self.config.node_id.clone().unwrap_or_else(|| "local_node".to_string())
336 };
337
338 NodeStatus {
339 node_id,
340 version: env!("CARGO_PKG_VERSION").to_string(),
341 uptime_seconds,
342 peer_connections: stats.peer_connections,
343 chunks_served: stats.chunks_served,
344 chunks_retrieved: stats.chunks_retrieved,
345 bytes_sent: stats.bytes_sent,
346 bytes_received: stats.bytes_received,
347 failed_requests: stats.failed_requests,
348 storage_capacity: capacity_info.total_capacity,
349 storage_used: capacity_info.used_space,
350 storage_available: capacity_info.available_space,
351 file_count: capacity_info.file_count,
352 chunk_count: storage_stats.total_chunks,
353 }
354 }
355
356 /// Shutdown the node gracefully
357 ///
358 /// Safety: Ensures clean shutdown of both network and storage
359 pub async fn shutdown(&mut self) -> Result<()> {
360 info!("Shutting down ZephyrFS node");
361
362 // Unregister from coordinator if connected
363 if let Some(coordinator) = &mut self.coordinator_manager {
364 if let Err(e) = coordinator.unregister_node(Some("Normal shutdown".to_string())).await {
365 warn!("Failed to unregister from coordinator: {}", e);
366 }
367 }
368
369 // Shutdown network manager
370 self.network_manager.shutdown().await
371 .context("Failed to shutdown network manager")?;
372
373 // Storage manager cleanup (if needed)
374 // Currently storage manager doesn't need explicit cleanup
375
376 info!("ZephyrFS node shutdown complete");
377 Ok(())
378 }
379
    /// Start storage-related background tasks
    ///
    /// Placeholder: no background tasks exist yet; garbage collection and
    /// capacity monitoring are expected to hook in here later.
    async fn start_storage_tasks(&self) -> Result<()> {
        // Future: garbage collection, capacity monitoring, etc.
        debug!("Storage background tasks initialized");
        Ok(())
    }
386
387 /// Start message processing loop
388 async fn start_message_processing(&self) {
389 let storage_manager = Arc::clone(&self.storage_manager);
390 let node_stats = Arc::clone(&self.node_stats);
391
392 // Spawn message processing task
393 tokio::spawn(async move {
394 debug!("Starting message processing loop");
395 // Future: Process messages from message_rx
396 // This will handle chunk requests/responses from peers
397 });
398 }
399
    /// Replicate file to specified number of peers
    ///
    /// Stub: peer replication is not implemented; succeeds without doing work.
    async fn replicate_file_to_peers(&self, _file_id: &str, _redundancy: u32) -> Result<()> {
        // TODO: Implement peer replication
        debug!("File replication not yet implemented");
        Ok(())
    }
406
    /// Distribute file chunks across peers
    ///
    /// Stub: chunk distribution is not implemented; succeeds without doing work.
    async fn distribute_chunks_to_peers(&self, _file_id: &str, _min_peers: u32) -> Result<()> {
        // TODO: Implement chunk distribution
        debug!("Chunk distribution not yet implemented");
        Ok(())
    }
413
414 /// Retrieve file from peers
415 async fn retrieve_file_from_peers(&self, file_id: &str) -> Result<Option<Vec<u8>>> {
416 info!("Attempting to retrieve file from peers: {}", file_id);
417
418 // First, check if we have metadata about this file from other sources
419 // This is a simplified implementation - in a real system, we'd have
420 // a distributed hash table or peer discovery mechanism
421
422 // For now, we'll simulate trying to get chunks by ID
423 // In a real implementation, this would:
424 // 1. Query the DHT for file metadata
425 // 2. Get the list of chunk IDs that comprise the file
426 // 3. Request each chunk from peers
427 // 4. Reconstruct the file
428
429 debug!("Peer file retrieval requires DHT implementation - returning None for now");
430 Ok(None)
431 }
432
433 /// Request a specific chunk from peers
434 ///
435 /// Safety: Validates chunk integrity from all peer sources
436 /// Transparency: Logs all peer chunk requests and responses
437 pub async fn request_chunk_from_peers(&self, chunk_id: &str, expected_hash: &str) -> Result<Option<Vec<u8>>> {
438 info!("Requesting chunk from peers: {} (expected hash: {})", chunk_id, expected_hash);
439
440 // Create chunk request message
441 let request = ZephyrMessage::ChunkRequest {
442 chunk_id: chunk_id.to_string(),
443 expected_hash: expected_hash.to_string(),
444 };
445
446 // In a real implementation, this would:
447 // 1. Send the request to multiple peers
448 // 2. Wait for responses with timeout
449 // 3. Validate responses and return the first valid one
450 // 4. Update peer reputation based on response quality
451
452 debug!("Peer chunk requests require network broadcast - not yet implemented");
453
454 // Update statistics for attempted request
455 {
456 let mut stats = self.node_stats.write().await;
457 stats.failed_requests += 1;
458 }
459
460 Ok(None)
461 }
462
463 /// Announce file availability to peers
464 ///
465 /// Transparency: Announces stored files to help peers discover content
466 pub async fn announce_file_to_peers(&self, file_id: &str, file_hash: &str) -> Result<()> {
467 info!("Announcing file availability to peers: {} (hash: {})", file_id, file_hash);
468
469 // Get our node info for the announcement
470 let node_status = self.get_node_status().await;
471
472 let announcement = ZephyrMessage::StatusUpdate {
473 node_info: NodeInfo {
474 node_id: node_status.node_id,
475 version: node_status.version,
476 storage_available: node_status.storage_available,
477 storage_used: node_status.storage_used,
478 uptime_seconds: node_status.uptime_seconds,
479 capabilities: vec!["file_storage".to_string(), "chunk_serving".to_string()],
480 }
481 };
482
483 // In a real implementation, this would broadcast to all connected peers
484 debug!("File announcement broadcast not yet implemented");
485
486 Ok(())
487 }
488
    /// Register this node with the coordinator
    ///
    /// NOTE(review): this helper appears to be dead code — `start()` inlines
    /// equivalent registration logic instead of calling it (likely because the
    /// `&mut CoordinatorManager` parameter conflicts with borrowing `self`).
    /// Consider deduplicating or removing.
    async fn register_with_coordinator(&self, coordinator: &mut CoordinatorManager) -> Result<()> {
        info!("Registering node with coordinator");

        let node_status = self.get_node_status().await;
        // NOTE(review): addresses are hard-coded to loopback — confirm this is
        // intended for local testing only.
        let addresses = vec![
            format!("{}:{}", "127.0.0.1", self.config.network.p2p_port),
            format!("{}:{}", "127.0.0.1", self.config.network.api_port),
        ];

        // Advertise capabilities as simple string key/value pairs.
        let mut capabilities = std::collections::HashMap::new();
        capabilities.insert("version".to_string(), node_status.version);
        capabilities.insert("storage".to_string(), "true".to_string());
        capabilities.insert("encryption".to_string(), "true".to_string());

        let response = coordinator.register_node(
            addresses,
            node_status.storage_capacity,
            capabilities,
        ).await?;

        if response.success {
            info!("Successfully registered with coordinator. Node ID: {}", coordinator.get_node_id());
            if !response.bootstrap_peers.is_empty() {
                info!("Received {} bootstrap peers from coordinator", response.bootstrap_peers.len());
                // TODO: Connect to bootstrap peers
            }
        } else {
            warn!("Failed to register with coordinator: {}", response.message);
        }

        Ok(())
    }
522
    /// Start coordinator heartbeat loop
    ///
    /// Hands the coordinator a callback that snapshots current node/storage
    /// statistics on every heartbeat tick. No-op in standalone mode.
    async fn start_coordinator_heartbeat(&self) {
        if let Some(coordinator) = &self.coordinator_manager {
            // Clone the shared handles so the callback owns its own references.
            let node_stats = Arc::clone(&self.node_stats);
            let storage_manager = Arc::clone(&self.storage_manager);

            coordinator.start_heartbeat(move || {
                // The heartbeat callback is synchronous, so the async stats reads
                // are bridged with block_in_place + block_on.
                // NOTE(review): `tokio::task::block_in_place` panics when running
                // on a current-thread runtime — confirm the node always runs on
                // the multi-thread flavor.
                let stats = tokio::task::block_in_place(|| {
                    tokio::runtime::Handle::current().block_on(async {
                        let node_stats = node_stats.read().await;
                        let capacity_info = storage_manager.get_capacity_info().await;
                        let uptime = node_stats.start_time.elapsed().as_secs() as i64;

                        crate::coordinator::types::NodeStats {
                            storage_used: capacity_info.used_space as i64,
                            storage_available: capacity_info.available_space as i64,
                            chunks_stored: capacity_info.file_count as i64, // Approximation
                            bandwidth_up: node_stats.bytes_sent as i64,
                            bandwidth_down: node_stats.bytes_received as i64,
                            cpu_usage: 0.0, // TODO: Implement CPU monitoring
                            memory_usage: 0.0, // TODO: Implement memory monitoring
                            uptime_seconds: uptime,
                        }
                    })
                });
                stats
            }).await;

            info!("Started coordinator heartbeat");
        }
    }
554
555 /// Register a file with the coordinator if available
556 async fn register_file_with_coordinator(&self, file_id: &str, file_hash: &str, file_size: u64, filename: &str) -> Result<()> {
557 if let Some(coordinator) = &self.coordinator_manager {
558 // Get file chunks from storage manager
559 let chunks = match self.storage_manager.get_file_chunks(file_id).await {
560 Ok(Some(chunk_list)) => {
561 chunk_list.into_iter().enumerate().map(|(index, chunk_id)| {
562 crate::coordinator::types::ChunkMetadata {
563 chunk_id: chunk_id.clone(),
564 hash: chunk_id, // For now, use chunk_id as hash
565 size: self.config.storage.chunk_size as i64, // Default chunk size
566 index: index as i32,
567 }
568 }).collect()
569 }
570 _ => Vec::new(),
571 };
572
573 let response = coordinator.register_file(
574 file_id.to_string(),
575 filename.to_string(),
576 file_size,
577 file_hash.to_string(),
578 chunks,
579 ).await?;
580
581 if response.success {
582 debug!("Successfully registered file {} with coordinator", file_id);
583 if !response.chunk_placements.is_empty() {
584 debug!("Coordinator provided {} chunk placement recommendations", response.chunk_placements.len());
585 // TODO: Handle chunk placement recommendations
586 }
587 } else {
588 warn!("Failed to register file with coordinator: {}", response.message);
589 }
590 }
591
592 Ok(())
593 }
594
595 /// Find chunk locations using coordinator
596 async fn find_chunk_locations_via_coordinator(&self, chunk_id: &str) -> Result<Vec<String>> {
597 if let Some(coordinator) = &self.coordinator_manager {
598 let response = coordinator.find_chunk_locations(chunk_id.to_string(), 3).await?;
599
600 if response.success {
601 debug!("Found {} locations for chunk {} via coordinator", response.node_addresses.len(), chunk_id);
602 Ok(response.node_addresses)
603 } else {
604 debug!("Coordinator couldn't find locations for chunk {}: {}", chunk_id, response.message);
605 Ok(Vec::new())
606 }
607 } else {
608 Ok(Vec::new())
609 }
610 }
611
612 /// Get coordinator registration status
613 pub fn get_coordinator_status(&self) -> Option<&RegistrationStatus> {
614 self.coordinator_manager.as_ref().map(|c| c.get_registration_status())
615 }
616 }
617
/// Comprehensive node status information
///
/// Transparency: Complete node metrics for monitoring and audit
///
/// Point-in-time snapshot assembled by `NodeManager::get_node_status` from
/// live statistics and storage capacity data.
#[derive(Debug, Clone)]
pub struct NodeStatus {
    /// Node identifier: coordinator-assigned when registered, otherwise the
    /// configured id or the `"local_node"` default
    pub node_id: String,
    /// Crate version embedded at compile time (`CARGO_PKG_VERSION`)
    pub version: String,
    /// Seconds elapsed since the node started
    pub uptime_seconds: u64,
    /// Number of active peer connections
    pub peer_connections: u32,
    /// Chunks served to other peers
    pub chunks_served: u64,
    /// Chunks retrieved from peers
    pub chunks_retrieved: u64,
    /// Total bytes sent to peers
    pub bytes_sent: u64,
    /// Total bytes received from peers
    pub bytes_received: u64,
    /// Failed chunk requests
    pub failed_requests: u64,
    /// Total local storage capacity in bytes
    pub storage_capacity: u64,
    /// Storage bytes currently in use
    pub storage_used: u64,
    /// Storage bytes still available
    pub storage_available: u64,
    /// Number of files stored locally
    pub file_count: u64,
    /// Number of chunks stored locally
    pub chunk_count: u64,
}
638
639 // Import for the storage stats
640 use crate::storage::StorageStats;