Rust · 15691 bytes Raw Blame History
1 use anyhow::{Context, Result};
2 use std::path::PathBuf;
3 use std::sync::Arc;
4 use tokio::sync::{mpsc, RwLock};
5 use tracing::{debug, info, warn, error};
6
7 use crate::config::Config;
8 use crate::network::{NetworkManager, message_handler::{ZephyrMessage, NodeInfo}};
9 use crate::storage::{StorageManager, StorageConfig as StorageManagerConfig};
10
11 /// Integrated node manager coordinating networking and storage
12 ///
13 /// Safety: Coordinates secure operations between network and storage layers
14 /// Transparency: Provides comprehensive node status and metrics
15 /// Privacy: Handles secure chunk distribution and encrypted metadata
16 pub struct NodeManager {
17 /// Network layer manager
18 network_manager: NetworkManager,
19
20 /// Storage layer manager
21 storage_manager: Arc<StorageManager>,
22
23 /// Configuration
24 config: Config,
25
26 /// Message channel from network to node manager
27 message_rx: mpsc::Receiver<ZephyrMessage>,
28
29 /// Message channel from node manager to network
30 message_tx: mpsc::Sender<ZephyrMessage>,
31
32 /// Node statistics
33 node_stats: Arc<RwLock<NodeStats>>,
34
35 /// Base storage path
36 storage_path: PathBuf,
37 }
38
39 /// Comprehensive node statistics
40 ///
41 /// Transparency: Detailed metrics for monitoring and audit
42 #[derive(Debug, Clone)]
43 pub struct NodeStats {
44 /// Number of chunks served to other peers
45 pub chunks_served: u64,
46
47 /// Number of chunks retrieved from peers
48 pub chunks_retrieved: u64,
49
50 /// Total bytes sent to peers
51 pub bytes_sent: u64,
52
53 /// Total bytes received from peers
54 pub bytes_received: u64,
55
56 /// Number of active peer connections
57 pub peer_connections: u32,
58
59 /// Number of failed chunk requests
60 pub failed_requests: u64,
61
62 /// Uptime in seconds
63 pub uptime_seconds: u64,
64
65 /// Node start time
66 pub start_time: std::time::Instant,
67 }
68
69 /// File distribution strategy for P2P sharing
70 #[derive(Debug, Clone)]
71 pub enum DistributionStrategy {
72 /// Store locally only
73 LocalOnly,
74
75 /// Replicate to N closest peers
76 Replicate { redundancy: u32 },
77
78 /// Distribute chunks across network
79 Distribute { min_peers: u32 },
80 }
81
82 impl NodeManager {
83 /// Create a new integrated node manager
84 ///
85 /// Safety: Initializes both network and storage with secure configurations
86 pub async fn new(config: Config, storage_path: PathBuf) -> Result<Self> {
87 info!("Initializing NodeManager with integrated network and storage");
88
89 // Create message channel for network-storage communication
90 let (message_tx, message_rx) = mpsc::channel::<ZephyrMessage>(1000);
91
92 // Initialize storage manager
93 let storage_config = StorageManagerConfig {
94 max_capacity: config.storage.max_storage,
95 warning_threshold: 0.8,
96 critical_threshold: 0.95,
97 default_chunk_size: config.storage.chunk_size,
98 max_file_size: 1024 * 1024 * 1024, // 1GB max file
99 enable_gc: true,
100 gc_interval: 3600, // 1 hour
101 };
102
103 let storage_manager = Arc::new(
104 StorageManager::new(&storage_path, storage_config).await
105 .context("Failed to initialize storage manager")?
106 );
107
108 // Initialize network manager with message channel
109 let network_manager = NetworkManager::new(config.clone()).await
110 .context("Failed to initialize network manager")?;
111
112 let node_stats = Arc::new(RwLock::new(NodeStats {
113 chunks_served: 0,
114 chunks_retrieved: 0,
115 bytes_sent: 0,
116 bytes_received: 0,
117 peer_connections: 0,
118 failed_requests: 0,
119 uptime_seconds: 0,
120 start_time: std::time::Instant::now(),
121 }));
122
123 Ok(Self {
124 network_manager,
125 storage_manager,
126 config,
127 message_rx,
128 message_tx,
129 node_stats,
130 storage_path,
131 })
132 }
133
134 /// Start the integrated node
135 ///
136 /// Safety: Starts both network and storage services with proper error handling
137 pub async fn start(&mut self) -> Result<()> {
138 info!("Starting integrated ZephyrFS node");
139
140 // Start storage manager background tasks (if any)
141 self.start_storage_tasks().await?;
142
143 // Start network manager
144 self.network_manager.start().await
145 .context("Failed to start network manager")?;
146
147 // Start message processing loop
148 self.start_message_processing().await;
149
150 info!("ZephyrFS node started successfully");
151 Ok(())
152 }
153
154 /// Store a file and optionally distribute to peers
155 ///
156 /// Safety: Validates file integrity and enforces capacity limits
157 /// Privacy: Supports encrypted storage and secure distribution
158 pub async fn store_file(
159 &self,
160 file_id: &str,
161 data: &[u8],
162 filename: &str,
163 strategy: DistributionStrategy,
164 ) -> Result<String> {
165 info!("Storing file: {} ({} bytes) with strategy: {:?}", filename, data.len(), strategy);
166
167 // Store file locally first
168 let file_hash = self.storage_manager.store_file(file_id, data, filename).await
169 .context("Failed to store file locally")?;
170
171 // Update statistics
172 {
173 let mut stats = self.node_stats.write().await;
174 stats.bytes_sent += data.len() as u64; // Will be sent to peers
175 }
176
177 // Handle distribution strategy
178 match strategy {
179 DistributionStrategy::LocalOnly => {
180 debug!("File stored locally only: {}", file_id);
181 }
182 DistributionStrategy::Replicate { redundancy } => {
183 self.replicate_file_to_peers(file_id, redundancy).await?;
184 }
185 DistributionStrategy::Distribute { min_peers } => {
186 self.distribute_chunks_to_peers(file_id, min_peers).await?;
187 }
188 }
189
190 // Announce file availability to peers
191 if let Err(e) = self.announce_file_to_peers(file_id, &file_hash).await {
192 warn!("Failed to announce file to peers: {}", e);
193 }
194
195 info!("Successfully stored and distributed file: {} with hash: {}", file_id, file_hash);
196 Ok(file_hash)
197 }
198
199 /// Retrieve a file, attempting local storage first, then peers
200 ///
201 /// Safety: Verifies chunk integrity from all sources
202 /// Transparency: Logs all retrieval attempts and sources
203 pub async fn retrieve_file(&self, file_id: &str) -> Result<Option<Vec<u8>>> {
204 info!("Retrieving file: {}", file_id);
205
206 // Try local storage first
207 match self.storage_manager.retrieve_file(file_id).await? {
208 Some(data) => {
209 debug!("File retrieved from local storage: {}", file_id);
210 return Ok(Some(data));
211 }
212 None => {
213 debug!("File not found locally, attempting peer retrieval: {}", file_id);
214 }
215 }
216
217 // Attempt to retrieve from peers
218 match self.retrieve_file_from_peers(file_id).await? {
219 Some(data) => {
220 info!("Successfully retrieved file from peers: {}", file_id);
221
222 // Store locally for future access
223 if let Ok(_) = self.storage_manager.store_file(file_id, &data, "retrieved_file").await {
224 debug!("Cached retrieved file locally: {}", file_id);
225 }
226
227 // Update statistics
228 {
229 let mut stats = self.node_stats.write().await;
230 stats.chunks_retrieved += 1;
231 stats.bytes_received += data.len() as u64;
232 }
233
234 Ok(Some(data))
235 }
236 None => {
237 warn!("File not found locally or on peers: {}", file_id);
238 Ok(None)
239 }
240 }
241 }
242
243 /// Delete a file locally and notify peers
244 ///
245 /// Safety: Ensures secure deletion and updates peer information
246 pub async fn delete_file(&self, file_id: &str) -> Result<bool> {
247 info!("Deleting file: {}", file_id);
248
249 // Delete locally
250 let deleted = self.storage_manager.delete_file(file_id).await?;
251
252 if deleted {
253 // Notify peers about deletion (future implementation)
254 debug!("File deleted locally, peer notification not yet implemented: {}", file_id);
255 }
256
257 Ok(deleted)
258 }
259
260 /// Get comprehensive node status
261 ///
262 /// Transparency: Provides detailed node metrics for monitoring
263 pub async fn get_node_status(&self) -> NodeStatus {
264 let stats = self.node_stats.read().await;
265 let capacity_info = self.storage_manager.get_capacity_info().await;
266 let storage_stats = self.storage_manager.get_storage_stats().await.unwrap_or_default();
267
268 // Calculate uptime
269 let uptime_seconds = stats.start_time.elapsed().as_secs();
270
271 NodeStatus {
272 node_id: "local_node".to_string(), // TODO: Generate proper node ID
273 version: env!("CARGO_PKG_VERSION").to_string(),
274 uptime_seconds,
275 peer_connections: stats.peer_connections,
276 chunks_served: stats.chunks_served,
277 chunks_retrieved: stats.chunks_retrieved,
278 bytes_sent: stats.bytes_sent,
279 bytes_received: stats.bytes_received,
280 failed_requests: stats.failed_requests,
281 storage_capacity: capacity_info.total_capacity,
282 storage_used: capacity_info.used_space,
283 storage_available: capacity_info.available_space,
284 file_count: capacity_info.file_count,
285 chunk_count: storage_stats.total_chunks,
286 }
287 }
288
289 /// Shutdown the node gracefully
290 ///
291 /// Safety: Ensures clean shutdown of both network and storage
292 pub async fn shutdown(&mut self) -> Result<()> {
293 info!("Shutting down ZephyrFS node");
294
295 // Shutdown network manager
296 self.network_manager.shutdown().await
297 .context("Failed to shutdown network manager")?;
298
299 // Storage manager cleanup (if needed)
300 // Currently storage manager doesn't need explicit cleanup
301
302 info!("ZephyrFS node shutdown complete");
303 Ok(())
304 }
305
306 /// Start storage-related background tasks
307 async fn start_storage_tasks(&self) -> Result<()> {
308 // Future: garbage collection, capacity monitoring, etc.
309 debug!("Storage background tasks initialized");
310 Ok(())
311 }
312
313 /// Start message processing loop
314 async fn start_message_processing(&self) {
315 let storage_manager = Arc::clone(&self.storage_manager);
316 let node_stats = Arc::clone(&self.node_stats);
317
318 // Spawn message processing task
319 tokio::spawn(async move {
320 debug!("Starting message processing loop");
321 // Future: Process messages from message_rx
322 // This will handle chunk requests/responses from peers
323 });
324 }
325
326 /// Replicate file to specified number of peers
327 async fn replicate_file_to_peers(&self, _file_id: &str, _redundancy: u32) -> Result<()> {
328 // TODO: Implement peer replication
329 debug!("File replication not yet implemented");
330 Ok(())
331 }
332
333 /// Distribute file chunks across peers
334 async fn distribute_chunks_to_peers(&self, _file_id: &str, _min_peers: u32) -> Result<()> {
335 // TODO: Implement chunk distribution
336 debug!("Chunk distribution not yet implemented");
337 Ok(())
338 }
339
340 /// Retrieve file from peers
341 async fn retrieve_file_from_peers(&self, file_id: &str) -> Result<Option<Vec<u8>>> {
342 info!("Attempting to retrieve file from peers: {}", file_id);
343
344 // First, check if we have metadata about this file from other sources
345 // This is a simplified implementation - in a real system, we'd have
346 // a distributed hash table or peer discovery mechanism
347
348 // For now, we'll simulate trying to get chunks by ID
349 // In a real implementation, this would:
350 // 1. Query the DHT for file metadata
351 // 2. Get the list of chunk IDs that comprise the file
352 // 3. Request each chunk from peers
353 // 4. Reconstruct the file
354
355 debug!("Peer file retrieval requires DHT implementation - returning None for now");
356 Ok(None)
357 }
358
359 /// Request a specific chunk from peers
360 ///
361 /// Safety: Validates chunk integrity from all peer sources
362 /// Transparency: Logs all peer chunk requests and responses
363 pub async fn request_chunk_from_peers(&self, chunk_id: &str, expected_hash: &str) -> Result<Option<Vec<u8>>> {
364 info!("Requesting chunk from peers: {} (expected hash: {})", chunk_id, expected_hash);
365
366 // Create chunk request message
367 let request = ZephyrMessage::ChunkRequest {
368 chunk_id: chunk_id.to_string(),
369 expected_hash: expected_hash.to_string(),
370 };
371
372 // In a real implementation, this would:
373 // 1. Send the request to multiple peers
374 // 2. Wait for responses with timeout
375 // 3. Validate responses and return the first valid one
376 // 4. Update peer reputation based on response quality
377
378 debug!("Peer chunk requests require network broadcast - not yet implemented");
379
380 // Update statistics for attempted request
381 {
382 let mut stats = self.node_stats.write().await;
383 stats.failed_requests += 1;
384 }
385
386 Ok(None)
387 }
388
389 /// Announce file availability to peers
390 ///
391 /// Transparency: Announces stored files to help peers discover content
392 pub async fn announce_file_to_peers(&self, file_id: &str, file_hash: &str) -> Result<()> {
393 info!("Announcing file availability to peers: {} (hash: {})", file_id, file_hash);
394
395 // Get our node info for the announcement
396 let node_status = self.get_node_status().await;
397
398 let announcement = ZephyrMessage::StatusUpdate {
399 node_info: NodeInfo {
400 node_id: node_status.node_id,
401 version: node_status.version,
402 storage_available: node_status.storage_available,
403 storage_used: node_status.storage_used,
404 uptime_seconds: node_status.uptime_seconds,
405 capabilities: vec!["file_storage".to_string(), "chunk_serving".to_string()],
406 }
407 };
408
409 // In a real implementation, this would broadcast to all connected peers
410 debug!("File announcement broadcast not yet implemented");
411
412 Ok(())
413 }
414 }
415
416 /// Comprehensive node status information
417 ///
418 /// Transparency: Complete node metrics for monitoring and audit
419 #[derive(Debug, Clone)]
420 pub struct NodeStatus {
421 pub node_id: String,
422 pub version: String,
423 pub uptime_seconds: u64,
424 pub peer_connections: u32,
425 pub chunks_served: u64,
426 pub chunks_retrieved: u64,
427 pub bytes_sent: u64,
428 pub bytes_received: u64,
429 pub failed_requests: u64,
430 pub storage_capacity: u64,
431 pub storage_used: u64,
432 pub storage_available: u64,
433 pub file_count: u64,
434 pub chunk_count: u64,
435 }
436
437 // Import for the storage stats
438 use crate::storage::StorageStats;