| 1 |
use anyhow::{Context, Result}; |
| 2 |
use std::path::PathBuf; |
| 3 |
use std::sync::Arc; |
| 4 |
use tokio::sync::{mpsc, RwLock}; |
| 5 |
use tracing::{debug, info, warn, error}; |
| 6 |
|
| 7 |
use crate::config::Config; |
| 8 |
use crate::network::{NetworkManager, message_handler::{ZephyrMessage, NodeInfo}}; |
| 9 |
use crate::storage::{StorageManager, StorageConfig as StorageManagerConfig}; |
| 10 |
|
| 11 |
/// Integrated node manager coordinating networking and storage |
| 12 |
/// |
| 13 |
/// Safety: Coordinates secure operations between network and storage layers |
| 14 |
/// Transparency: Provides comprehensive node status and metrics |
| 15 |
/// Privacy: Handles secure chunk distribution and encrypted metadata |
| 16 |
pub struct NodeManager { |
| 17 |
/// Network layer manager |
| 18 |
network_manager: NetworkManager, |
| 19 |
|
| 20 |
/// Storage layer manager |
| 21 |
storage_manager: Arc<StorageManager>, |
| 22 |
|
| 23 |
/// Configuration |
| 24 |
config: Config, |
| 25 |
|
| 26 |
/// Message channel from network to node manager |
| 27 |
message_rx: mpsc::Receiver<ZephyrMessage>, |
| 28 |
|
| 29 |
/// Message channel from node manager to network |
| 30 |
message_tx: mpsc::Sender<ZephyrMessage>, |
| 31 |
|
| 32 |
/// Node statistics |
| 33 |
node_stats: Arc<RwLock<NodeStats>>, |
| 34 |
|
| 35 |
/// Base storage path |
| 36 |
storage_path: PathBuf, |
| 37 |
} |
| 38 |
|
| 39 |
/// Comprehensive node statistics |
| 40 |
/// |
| 41 |
/// Transparency: Detailed metrics for monitoring and audit |
| 42 |
#[derive(Debug, Clone)] |
| 43 |
pub struct NodeStats { |
| 44 |
/// Number of chunks served to other peers |
| 45 |
pub chunks_served: u64, |
| 46 |
|
| 47 |
/// Number of chunks retrieved from peers |
| 48 |
pub chunks_retrieved: u64, |
| 49 |
|
| 50 |
/// Total bytes sent to peers |
| 51 |
pub bytes_sent: u64, |
| 52 |
|
| 53 |
/// Total bytes received from peers |
| 54 |
pub bytes_received: u64, |
| 55 |
|
| 56 |
/// Number of active peer connections |
| 57 |
pub peer_connections: u32, |
| 58 |
|
| 59 |
/// Number of failed chunk requests |
| 60 |
pub failed_requests: u64, |
| 61 |
|
| 62 |
/// Uptime in seconds |
| 63 |
pub uptime_seconds: u64, |
| 64 |
|
| 65 |
/// Node start time |
| 66 |
pub start_time: std::time::Instant, |
| 67 |
} |
| 68 |
|
| 69 |
/// File distribution strategy for P2P sharing |
| 70 |
#[derive(Debug, Clone)] |
| 71 |
pub enum DistributionStrategy { |
| 72 |
/// Store locally only |
| 73 |
LocalOnly, |
| 74 |
|
| 75 |
/// Replicate to N closest peers |
| 76 |
Replicate { redundancy: u32 }, |
| 77 |
|
| 78 |
/// Distribute chunks across network |
| 79 |
Distribute { min_peers: u32 }, |
| 80 |
} |
| 81 |
|
| 82 |
impl NodeManager { |
| 83 |
/// Create a new integrated node manager |
| 84 |
/// |
| 85 |
/// Safety: Initializes both network and storage with secure configurations |
| 86 |
pub async fn new(config: Config, storage_path: PathBuf) -> Result<Self> { |
| 87 |
info!("Initializing NodeManager with integrated network and storage"); |
| 88 |
|
| 89 |
// Create message channel for network-storage communication |
| 90 |
let (message_tx, message_rx) = mpsc::channel::<ZephyrMessage>(1000); |
| 91 |
|
| 92 |
// Initialize storage manager |
| 93 |
let storage_config = StorageManagerConfig { |
| 94 |
max_capacity: config.storage.max_storage, |
| 95 |
warning_threshold: 0.8, |
| 96 |
critical_threshold: 0.95, |
| 97 |
default_chunk_size: config.storage.chunk_size, |
| 98 |
max_file_size: 1024 * 1024 * 1024, // 1GB max file |
| 99 |
enable_gc: true, |
| 100 |
gc_interval: 3600, // 1 hour |
| 101 |
}; |
| 102 |
|
| 103 |
let storage_manager = Arc::new( |
| 104 |
StorageManager::new(&storage_path, storage_config).await |
| 105 |
.context("Failed to initialize storage manager")? |
| 106 |
); |
| 107 |
|
| 108 |
// Initialize network manager with message channel |
| 109 |
let network_manager = NetworkManager::new(config.clone()).await |
| 110 |
.context("Failed to initialize network manager")?; |
| 111 |
|
| 112 |
let node_stats = Arc::new(RwLock::new(NodeStats { |
| 113 |
chunks_served: 0, |
| 114 |
chunks_retrieved: 0, |
| 115 |
bytes_sent: 0, |
| 116 |
bytes_received: 0, |
| 117 |
peer_connections: 0, |
| 118 |
failed_requests: 0, |
| 119 |
uptime_seconds: 0, |
| 120 |
start_time: std::time::Instant::now(), |
| 121 |
})); |
| 122 |
|
| 123 |
Ok(Self { |
| 124 |
network_manager, |
| 125 |
storage_manager, |
| 126 |
config, |
| 127 |
message_rx, |
| 128 |
message_tx, |
| 129 |
node_stats, |
| 130 |
storage_path, |
| 131 |
}) |
| 132 |
} |
| 133 |
|
| 134 |
/// Start the integrated node |
| 135 |
/// |
| 136 |
/// Safety: Starts both network and storage services with proper error handling |
| 137 |
pub async fn start(&mut self) -> Result<()> { |
| 138 |
info!("Starting integrated ZephyrFS node"); |
| 139 |
|
| 140 |
// Start storage manager background tasks (if any) |
| 141 |
self.start_storage_tasks().await?; |
| 142 |
|
| 143 |
// Start network manager |
| 144 |
self.network_manager.start().await |
| 145 |
.context("Failed to start network manager")?; |
| 146 |
|
| 147 |
// Start message processing loop |
| 148 |
self.start_message_processing().await; |
| 149 |
|
| 150 |
info!("ZephyrFS node started successfully"); |
| 151 |
Ok(()) |
| 152 |
} |
| 153 |
|
| 154 |
/// Store a file and optionally distribute to peers |
| 155 |
/// |
| 156 |
/// Safety: Validates file integrity and enforces capacity limits |
| 157 |
/// Privacy: Supports encrypted storage and secure distribution |
| 158 |
pub async fn store_file( |
| 159 |
&self, |
| 160 |
file_id: &str, |
| 161 |
data: &[u8], |
| 162 |
filename: &str, |
| 163 |
strategy: DistributionStrategy, |
| 164 |
) -> Result<String> { |
| 165 |
info!("Storing file: {} ({} bytes) with strategy: {:?}", filename, data.len(), strategy); |
| 166 |
|
| 167 |
// Store file locally first |
| 168 |
let file_hash = self.storage_manager.store_file(file_id, data, filename).await |
| 169 |
.context("Failed to store file locally")?; |
| 170 |
|
| 171 |
// Update statistics |
| 172 |
{ |
| 173 |
let mut stats = self.node_stats.write().await; |
| 174 |
stats.bytes_sent += data.len() as u64; // Will be sent to peers |
| 175 |
} |
| 176 |
|
| 177 |
// Handle distribution strategy |
| 178 |
match strategy { |
| 179 |
DistributionStrategy::LocalOnly => { |
| 180 |
debug!("File stored locally only: {}", file_id); |
| 181 |
} |
| 182 |
DistributionStrategy::Replicate { redundancy } => { |
| 183 |
self.replicate_file_to_peers(file_id, redundancy).await?; |
| 184 |
} |
| 185 |
DistributionStrategy::Distribute { min_peers } => { |
| 186 |
self.distribute_chunks_to_peers(file_id, min_peers).await?; |
| 187 |
} |
| 188 |
} |
| 189 |
|
| 190 |
// Announce file availability to peers |
| 191 |
if let Err(e) = self.announce_file_to_peers(file_id, &file_hash).await { |
| 192 |
warn!("Failed to announce file to peers: {}", e); |
| 193 |
} |
| 194 |
|
| 195 |
info!("Successfully stored and distributed file: {} with hash: {}", file_id, file_hash); |
| 196 |
Ok(file_hash) |
| 197 |
} |
| 198 |
|
| 199 |
/// Retrieve a file, attempting local storage first, then peers |
| 200 |
/// |
| 201 |
/// Safety: Verifies chunk integrity from all sources |
| 202 |
/// Transparency: Logs all retrieval attempts and sources |
| 203 |
pub async fn retrieve_file(&self, file_id: &str) -> Result<Option<Vec<u8>>> { |
| 204 |
info!("Retrieving file: {}", file_id); |
| 205 |
|
| 206 |
// Try local storage first |
| 207 |
match self.storage_manager.retrieve_file(file_id).await? { |
| 208 |
Some(data) => { |
| 209 |
debug!("File retrieved from local storage: {}", file_id); |
| 210 |
return Ok(Some(data)); |
| 211 |
} |
| 212 |
None => { |
| 213 |
debug!("File not found locally, attempting peer retrieval: {}", file_id); |
| 214 |
} |
| 215 |
} |
| 216 |
|
| 217 |
// Attempt to retrieve from peers |
| 218 |
match self.retrieve_file_from_peers(file_id).await? { |
| 219 |
Some(data) => { |
| 220 |
info!("Successfully retrieved file from peers: {}", file_id); |
| 221 |
|
| 222 |
// Store locally for future access |
| 223 |
if let Ok(_) = self.storage_manager.store_file(file_id, &data, "retrieved_file").await { |
| 224 |
debug!("Cached retrieved file locally: {}", file_id); |
| 225 |
} |
| 226 |
|
| 227 |
// Update statistics |
| 228 |
{ |
| 229 |
let mut stats = self.node_stats.write().await; |
| 230 |
stats.chunks_retrieved += 1; |
| 231 |
stats.bytes_received += data.len() as u64; |
| 232 |
} |
| 233 |
|
| 234 |
Ok(Some(data)) |
| 235 |
} |
| 236 |
None => { |
| 237 |
warn!("File not found locally or on peers: {}", file_id); |
| 238 |
Ok(None) |
| 239 |
} |
| 240 |
} |
| 241 |
} |
| 242 |
|
| 243 |
/// Delete a file locally and notify peers |
| 244 |
/// |
| 245 |
/// Safety: Ensures secure deletion and updates peer information |
| 246 |
pub async fn delete_file(&self, file_id: &str) -> Result<bool> { |
| 247 |
info!("Deleting file: {}", file_id); |
| 248 |
|
| 249 |
// Delete locally |
| 250 |
let deleted = self.storage_manager.delete_file(file_id).await?; |
| 251 |
|
| 252 |
if deleted { |
| 253 |
// Notify peers about deletion (future implementation) |
| 254 |
debug!("File deleted locally, peer notification not yet implemented: {}", file_id); |
| 255 |
} |
| 256 |
|
| 257 |
Ok(deleted) |
| 258 |
} |
| 259 |
|
| 260 |
/// Get comprehensive node status |
| 261 |
/// |
| 262 |
/// Transparency: Provides detailed node metrics for monitoring |
| 263 |
pub async fn get_node_status(&self) -> NodeStatus { |
| 264 |
let stats = self.node_stats.read().await; |
| 265 |
let capacity_info = self.storage_manager.get_capacity_info().await; |
| 266 |
let storage_stats = self.storage_manager.get_storage_stats().await.unwrap_or_default(); |
| 267 |
|
| 268 |
// Calculate uptime |
| 269 |
let uptime_seconds = stats.start_time.elapsed().as_secs(); |
| 270 |
|
| 271 |
NodeStatus { |
| 272 |
node_id: "local_node".to_string(), // TODO: Generate proper node ID |
| 273 |
version: env!("CARGO_PKG_VERSION").to_string(), |
| 274 |
uptime_seconds, |
| 275 |
peer_connections: stats.peer_connections, |
| 276 |
chunks_served: stats.chunks_served, |
| 277 |
chunks_retrieved: stats.chunks_retrieved, |
| 278 |
bytes_sent: stats.bytes_sent, |
| 279 |
bytes_received: stats.bytes_received, |
| 280 |
failed_requests: stats.failed_requests, |
| 281 |
storage_capacity: capacity_info.total_capacity, |
| 282 |
storage_used: capacity_info.used_space, |
| 283 |
storage_available: capacity_info.available_space, |
| 284 |
file_count: capacity_info.file_count, |
| 285 |
chunk_count: storage_stats.total_chunks, |
| 286 |
} |
| 287 |
} |
| 288 |
|
| 289 |
/// Shutdown the node gracefully |
| 290 |
/// |
| 291 |
/// Safety: Ensures clean shutdown of both network and storage |
| 292 |
pub async fn shutdown(&mut self) -> Result<()> { |
| 293 |
info!("Shutting down ZephyrFS node"); |
| 294 |
|
| 295 |
// Shutdown network manager |
| 296 |
self.network_manager.shutdown().await |
| 297 |
.context("Failed to shutdown network manager")?; |
| 298 |
|
| 299 |
// Storage manager cleanup (if needed) |
| 300 |
// Currently storage manager doesn't need explicit cleanup |
| 301 |
|
| 302 |
info!("ZephyrFS node shutdown complete"); |
| 303 |
Ok(()) |
| 304 |
} |
| 305 |
|
| 306 |
/// Start storage-related background tasks |
| 307 |
async fn start_storage_tasks(&self) -> Result<()> { |
| 308 |
// Future: garbage collection, capacity monitoring, etc. |
| 309 |
debug!("Storage background tasks initialized"); |
| 310 |
Ok(()) |
| 311 |
} |
| 312 |
|
| 313 |
/// Start message processing loop |
| 314 |
async fn start_message_processing(&self) { |
| 315 |
let storage_manager = Arc::clone(&self.storage_manager); |
| 316 |
let node_stats = Arc::clone(&self.node_stats); |
| 317 |
|
| 318 |
// Spawn message processing task |
| 319 |
tokio::spawn(async move { |
| 320 |
debug!("Starting message processing loop"); |
| 321 |
// Future: Process messages from message_rx |
| 322 |
// This will handle chunk requests/responses from peers |
| 323 |
}); |
| 324 |
} |
| 325 |
|
| 326 |
/// Replicate file to specified number of peers |
| 327 |
async fn replicate_file_to_peers(&self, _file_id: &str, _redundancy: u32) -> Result<()> { |
| 328 |
// TODO: Implement peer replication |
| 329 |
debug!("File replication not yet implemented"); |
| 330 |
Ok(()) |
| 331 |
} |
| 332 |
|
| 333 |
/// Distribute file chunks across peers |
| 334 |
async fn distribute_chunks_to_peers(&self, _file_id: &str, _min_peers: u32) -> Result<()> { |
| 335 |
// TODO: Implement chunk distribution |
| 336 |
debug!("Chunk distribution not yet implemented"); |
| 337 |
Ok(()) |
| 338 |
} |
| 339 |
|
| 340 |
/// Retrieve file from peers |
| 341 |
async fn retrieve_file_from_peers(&self, file_id: &str) -> Result<Option<Vec<u8>>> { |
| 342 |
info!("Attempting to retrieve file from peers: {}", file_id); |
| 343 |
|
| 344 |
// First, check if we have metadata about this file from other sources |
| 345 |
// This is a simplified implementation - in a real system, we'd have |
| 346 |
// a distributed hash table or peer discovery mechanism |
| 347 |
|
| 348 |
// For now, we'll simulate trying to get chunks by ID |
| 349 |
// In a real implementation, this would: |
| 350 |
// 1. Query the DHT for file metadata |
| 351 |
// 2. Get the list of chunk IDs that comprise the file |
| 352 |
// 3. Request each chunk from peers |
| 353 |
// 4. Reconstruct the file |
| 354 |
|
| 355 |
debug!("Peer file retrieval requires DHT implementation - returning None for now"); |
| 356 |
Ok(None) |
| 357 |
} |
| 358 |
|
| 359 |
/// Request a specific chunk from peers |
| 360 |
/// |
| 361 |
/// Safety: Validates chunk integrity from all peer sources |
| 362 |
/// Transparency: Logs all peer chunk requests and responses |
| 363 |
pub async fn request_chunk_from_peers(&self, chunk_id: &str, expected_hash: &str) -> Result<Option<Vec<u8>>> { |
| 364 |
info!("Requesting chunk from peers: {} (expected hash: {})", chunk_id, expected_hash); |
| 365 |
|
| 366 |
// Create chunk request message |
| 367 |
let request = ZephyrMessage::ChunkRequest { |
| 368 |
chunk_id: chunk_id.to_string(), |
| 369 |
expected_hash: expected_hash.to_string(), |
| 370 |
}; |
| 371 |
|
| 372 |
// In a real implementation, this would: |
| 373 |
// 1. Send the request to multiple peers |
| 374 |
// 2. Wait for responses with timeout |
| 375 |
// 3. Validate responses and return the first valid one |
| 376 |
// 4. Update peer reputation based on response quality |
| 377 |
|
| 378 |
debug!("Peer chunk requests require network broadcast - not yet implemented"); |
| 379 |
|
| 380 |
// Update statistics for attempted request |
| 381 |
{ |
| 382 |
let mut stats = self.node_stats.write().await; |
| 383 |
stats.failed_requests += 1; |
| 384 |
} |
| 385 |
|
| 386 |
Ok(None) |
| 387 |
} |
| 388 |
|
| 389 |
/// Announce file availability to peers |
| 390 |
/// |
| 391 |
/// Transparency: Announces stored files to help peers discover content |
| 392 |
pub async fn announce_file_to_peers(&self, file_id: &str, file_hash: &str) -> Result<()> { |
| 393 |
info!("Announcing file availability to peers: {} (hash: {})", file_id, file_hash); |
| 394 |
|
| 395 |
// Get our node info for the announcement |
| 396 |
let node_status = self.get_node_status().await; |
| 397 |
|
| 398 |
let announcement = ZephyrMessage::StatusUpdate { |
| 399 |
node_info: NodeInfo { |
| 400 |
node_id: node_status.node_id, |
| 401 |
version: node_status.version, |
| 402 |
storage_available: node_status.storage_available, |
| 403 |
storage_used: node_status.storage_used, |
| 404 |
uptime_seconds: node_status.uptime_seconds, |
| 405 |
capabilities: vec!["file_storage".to_string(), "chunk_serving".to_string()], |
| 406 |
} |
| 407 |
}; |
| 408 |
|
| 409 |
// In a real implementation, this would broadcast to all connected peers |
| 410 |
debug!("File announcement broadcast not yet implemented"); |
| 411 |
|
| 412 |
Ok(()) |
| 413 |
} |
| 414 |
} |
| 415 |
|
| 416 |
/// Comprehensive node status information |
| 417 |
/// |
| 418 |
/// Transparency: Complete node metrics for monitoring and audit |
| 419 |
#[derive(Debug, Clone)] |
| 420 |
pub struct NodeStatus { |
| 421 |
pub node_id: String, |
| 422 |
pub version: String, |
| 423 |
pub uptime_seconds: u64, |
| 424 |
pub peer_connections: u32, |
| 425 |
pub chunks_served: u64, |
| 426 |
pub chunks_retrieved: u64, |
| 427 |
pub bytes_sent: u64, |
| 428 |
pub bytes_received: u64, |
| 429 |
pub failed_requests: u64, |
| 430 |
pub storage_capacity: u64, |
| 431 |
pub storage_used: u64, |
| 432 |
pub storage_available: u64, |
| 433 |
pub file_count: u64, |
| 434 |
pub chunk_count: u64, |
| 435 |
} |
| 436 |
|
| 437 |
// Import for the storage stats |
| 438 |
use crate::storage::StorageStats; |