@@ -7,31 +7,35 @@ use tracing::{debug, info, warn, error};
 use crate::config::Config;
 use crate::network::{NetworkManager, message_handler::{ZephyrMessage, NodeInfo}};
 use crate::storage::{StorageManager, StorageConfig as StorageManagerConfig};
+use crate::coordinator::{CoordinatorManager, RegistrationStatus};
 
 /// Integrated node manager coordinating networking and storage
-/// 
+///
 /// Safety: Coordinates secure operations between network and storage layers
 /// Transparency: Provides comprehensive node status and metrics
 /// Privacy: Handles secure chunk distribution and encrypted metadata
 pub struct NodeManager {
     /// Network layer manager
     network_manager: NetworkManager,
-    
+
     /// Storage layer manager
     pub storage_manager: Arc<StorageManager>,
-    
+
+    /// Coordinator manager for network coordination
+    coordinator_manager: Option<CoordinatorManager>,
+
     /// Configuration
     config: Config,
-    
+
     /// Message channel from network to node manager
     message_rx: mpsc::Receiver<ZephyrMessage>,
-    
+
     /// Message channel from node manager to network
     message_tx: mpsc::Sender<ZephyrMessage>,
-    
+
     /// Node statistics
     node_stats: Arc<RwLock<NodeStats>>,
-    
+
     /// Base storage path
     storage_path: PathBuf,
 }
@@ -81,14 +85,14 @@ pub enum DistributionStrategy {
 
 impl NodeManager {
     /// Create a new integrated node manager
-    /// 
+    ///
     /// Safety: Initializes both network and storage with secure configurations
     pub async fn new(config: Config, storage_path: PathBuf) -> Result<Self> {
         info!("Initializing NodeManager with integrated network and storage");
-        
+
         // Create message channel for network-storage communication
         let (message_tx, message_rx) = mpsc::channel::<ZephyrMessage>(1000);
-        
+
         // Initialize storage manager
         let storage_config = StorageManagerConfig {
             max_capacity: config.storage.max_storage,
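A note on the channel above: capacity 1000 makes it a bounded queue, so a flooded node manager exerts backpressure on the network layer instead of growing memory without bound. A self-contained illustration of that behavior, with plain `String` payloads standing in for `ZephyrMessage`:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Same construction as in `new`: a bounded channel of capacity 1000.
    let (tx, mut rx) = mpsc::channel::<String>(1000);

    // `send` completes immediately while the buffer has room and awaits
    // (applying backpressure to the producer) once 1000 messages are queued.
    tx.send("file_announcement".to_string()).await.unwrap();
    drop(tx); // close the sender so `recv` returns None when drained

    while let Some(msg) = rx.recv().await {
        println!("node manager received: {msg}");
    }
}
```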
@@ -99,16 +103,33 @@ impl NodeManager {
             enable_gc: true,
             gc_interval: 3600, // 1 hour
         };
-        
+
         let storage_manager = Arc::new(
             StorageManager::new(&storage_path, storage_config).await
                 .context("Failed to initialize storage manager")?
         );
-        
+
         // Initialize network manager with message channel
         let network_manager = NetworkManager::new(config.clone()).await
             .context("Failed to initialize network manager")?;
-        
+
+        // Initialize the coordinator manager if a URL is configured;
+        // otherwise, or if the connection fails, run standalone.
+        let coordinator_manager = if !config.coordinator.url.is_empty() {
+            match CoordinatorManager::new(config.coordinator.url.clone()).await {
+                Ok(manager) => {
+                    info!("Successfully connected to coordinator at {}", config.coordinator.url);
+                    Some(manager)
+                }
+                Err(e) => {
+                    warn!("Failed to connect to coordinator at {}: {}. Running in standalone mode.", config.coordinator.url, e);
+                    None
+                }
+            }
+        } else {
+            info!("No coordinator URL configured. Running in standalone mode.");
+            None
+        };
+
         let node_stats = Arc::new(RwLock::new(NodeStats {
             chunks_served: 0,
             chunks_retrieved: 0,
@@ -119,10 +140,11 @@ impl NodeManager {
             uptime_seconds: 0,
             start_time: std::time::Instant::now(),
         }));
-        
+
         Ok(Self {
             network_manager,
             storage_manager,
+            coordinator_manager,
             config,
             message_rx,
             message_tx,
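Taken together, `new` always yields a usable node: coordination is strictly additive. A hypothetical construction sketch, assuming the crate's `Config` and `NodeManager` are in scope, `Config` implements `Default`, and the field paths shown above are publicly writable (none of which this patch confirms):

```rust
use std::path::PathBuf;

async fn build_nodes() -> anyhow::Result<()> {
    // Standalone: an empty coordinator URL means no CoordinatorManager is built.
    let mut standalone = Config::default();
    standalone.coordinator.url = String::new();
    let _node_a = NodeManager::new(standalone, PathBuf::from("/tmp/zephyr-a")).await?;

    // Coordinated: a non-empty URL triggers a connection attempt; on failure
    // the constructor logs a warning and still returns a standalone node.
    let mut coordinated = Config::default();
    coordinated.coordinator.url = "http://127.0.0.1:8080".to_string();
    let _node_b = NodeManager::new(coordinated, PathBuf::from("/tmp/zephyr-b")).await?;

    Ok(())
}
```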
@@ -132,21 +154,55 @@ impl NodeManager {
     }
 
     /// Start the integrated node
-    /// 
+    ///
     /// Safety: Starts both network and storage services with proper error handling
     pub async fn start(&mut self) -> Result<()> {
         info!("Starting integrated ZephyrFS node");
-        
+
         // Start storage manager background tasks (if any)
         self.start_storage_tasks().await?;
-        
+
         // Start network manager
         self.network_manager.start().await
             .context("Failed to start network manager")?;
-        
+
+        // Register with coordinator if available. The manager is taken out of
+        // `self` for the duration of the call so `register_with_coordinator`
+        // (defined below) can borrow `&self` while mutating the coordinator.
+        if let Some(mut coordinator) = self.coordinator_manager.take() {
+            let result = self.register_with_coordinator(&mut coordinator).await;
+            self.coordinator_manager = Some(coordinator);
+            result.context("Failed to register with coordinator")?;
+
+            self.start_coordinator_heartbeat().await;
+        }
+
         // Start message processing loop
         self.start_message_processing().await;
-        
+
         info!("ZephyrFS node started successfully");
         Ok(())
     }
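A minimal lifecycle sketch under the same assumptions as the previous example. `#[tokio::main]` defaults to the multi-threaded runtime, which matters here: the heartbeat closure added later in this patch uses `tokio::task::block_in_place`, which panics on a current-thread runtime.

```rust
#[tokio::main] // multi-threaded by default; required by the heartbeat's block_in_place
async fn main() -> anyhow::Result<()> {
    let config = Config::default(); // assumed constructor, not shown in this patch
    let mut node = NodeManager::new(config, std::path::PathBuf::from("/var/lib/zephyrfs")).await?;

    node.start().await?;            // registers with the coordinator, if one is configured
    tokio::signal::ctrl_c().await?; // run until interrupted
    node.shutdown().await?;         // unregisters before tearing down the network

    Ok(())
}
```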
@@ -187,11 +243,16 @@ impl NodeManager {
             }
         }
 
+        // Register file with coordinator if available
+        if let Err(e) = self.register_file_with_coordinator(file_id, &file_hash, data.len() as u64, filename).await {
+            warn!("Failed to register file with coordinator: {}", e);
+        }
+
         // Announce file availability to peers
         if let Err(e) = self.announce_file_to_peers(file_id, &file_hash).await {
             warn!("Failed to announce file to peers: {}", e);
         }
-        
+
         info!("Successfully stored and distributed file: {} with hash: {}", file_id, file_hash);
         Ok(file_hash)
     }
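Coordinator registration and the peer announcement are both deliberately best-effort: failures are logged and the store still succeeds, so a node keeps serving local writes while the coordinator is unreachable. The shape of that pattern, extracted into a self-contained helper (a sketch, not part of this patch):

```rust
use std::future::Future;

/// Run a fallible side effect, logging a warning instead of propagating failure.
async fn best_effort<F, Fut>(label: &str, op: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = anyhow::Result<()>>,
{
    if let Err(e) = op().await {
        tracing::warn!("{} failed (continuing): {}", label, e);
    }
}
```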
@@ -268,8 +329,14 @@ impl NodeManager {
         // Calculate uptime
         let uptime_seconds = stats.start_time.elapsed().as_secs();
 
+        // Prefer the coordinator-assigned ID, fall back to the configured ID,
+        // then to a fixed local default
+        let node_id = if let Some(coordinator) = &self.coordinator_manager {
+            coordinator.get_node_id().to_string()
+        } else {
+            self.config.node_id.clone().unwrap_or_else(|| "local_node".to_string())
+        };
+
         NodeStatus {
-            node_id: "local_node".to_string(), // TODO: Generate proper node ID
+            node_id,
             version: env!("CARGO_PKG_VERSION").to_string(),
             uptime_seconds,
             peer_connections: stats.peer_connections,
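The same ID precedence (coordinator-assigned, then configured, then a fixed fallback) can be phrased with `Option` combinators; a behavior-equivalent standalone sketch, with `coordinator_id` and `configured_id` standing in for the fields read above:

```rust
fn resolve_node_id(coordinator_id: Option<&str>, configured_id: Option<String>) -> String {
    coordinator_id
        .map(str::to_string)                         // coordinator-assigned ID wins
        .or(configured_id)                           // then the configured node_id
        .unwrap_or_else(|| "local_node".to_string()) // fixed local fallback
}
```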
@@ -287,18 +354,25 @@ impl NodeManager {
     }
 
     /// Shutdown the node gracefully
-    /// 
+    ///
     /// Safety: Ensures clean shutdown of both network and storage
     pub async fn shutdown(&mut self) -> Result<()> {
         info!("Shutting down ZephyrFS node");
-        
+
+        // Unregister from coordinator if connected
+        if let Some(coordinator) = &mut self.coordinator_manager {
+            if let Err(e) = coordinator.unregister_node(Some("Normal shutdown".to_string())).await {
+                warn!("Failed to unregister from coordinator: {}", e);
+            }
+        }
+
         // Shutdown network manager
         self.network_manager.shutdown().await
             .context("Failed to shutdown network manager")?;
-        
+
         // Storage manager cleanup (if needed)
         // Currently storage manager doesn't need explicit cleanup
-        
+
         info!("ZephyrFS node shutdown complete");
         Ok(())
     }
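The next hunk adds the coordinator helpers. One of them, `start_coordinator_heartbeat`, bridges the synchronous callback expected by `start_heartbeat` to async stats collection via `block_in_place` + `block_on`; a self-contained demonstration of that bridge is below. It only works on Tokio's multi-threaded runtime (on a current-thread runtime, `block_in_place` panics), so a channel- or async-callback-based API would be a more forgiving design if `start_heartbeat` could be changed.

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

// Synchronously sample async-guarded state, as the heartbeat closure does.
fn sample_sync(counter: Arc<RwLock<u64>>) -> u64 {
    tokio::task::block_in_place(|| {
        tokio::runtime::Handle::current().block_on(async { *counter.read().await })
    })
}

#[tokio::main] // multi-threaded by default, which block_in_place requires
async fn main() {
    let counter = Arc::new(RwLock::new(42));
    assert_eq!(sample_sync(Arc::clone(&counter)), 42);
}
```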
@@ -411,6 +485,134 @@ impl NodeManager {
 
         Ok(())
     }
+
+    /// Register this node with the coordinator
+    async fn register_with_coordinator(&self, coordinator: &mut CoordinatorManager) -> Result<()> {
+        info!("Registering node with coordinator");
+
+        let node_status = self.get_node_status().await;
+        // Loopback placeholders for the advertised P2P and API endpoints
+        let addresses = vec![
+            format!("127.0.0.1:{}", self.config.network.p2p_port),
+            format!("127.0.0.1:{}", self.config.network.api_port),
+        ];
+
+        let mut capabilities = std::collections::HashMap::new();
+        capabilities.insert("version".to_string(), node_status.version);
+        capabilities.insert("storage".to_string(), "true".to_string());
+        capabilities.insert("encryption".to_string(), "true".to_string());
+
+        let response = coordinator.register_node(
+            addresses,
+            node_status.storage_capacity,
+            capabilities,
+        ).await?;
+
+        if response.success {
+            info!("Successfully registered with coordinator. Node ID: {}", coordinator.get_node_id());
+            if !response.bootstrap_peers.is_empty() {
+                info!("Received {} bootstrap peers from coordinator", response.bootstrap_peers.len());
+                // TODO: Connect to bootstrap peers
+            }
+        } else {
+            warn!("Failed to register with coordinator: {}", response.message);
+        }
+
+        Ok(())
+    }
+
+    /// Start the coordinator heartbeat loop
+    async fn start_coordinator_heartbeat(&self) {
+        if let Some(coordinator) = &self.coordinator_manager {
+            let node_stats = Arc::clone(&self.node_stats);
+            let storage_manager = Arc::clone(&self.storage_manager);
+
+            // The heartbeat callback is synchronous, so async reads are bridged
+            // with block_in_place + block_on. NOTE: block_in_place panics on a
+            // current-thread runtime; the multi-threaded Tokio runtime is required.
+            coordinator.start_heartbeat(move || {
+                tokio::task::block_in_place(|| {
+                    tokio::runtime::Handle::current().block_on(async {
+                        let stats = node_stats.read().await;
+                        let capacity_info = storage_manager.get_capacity_info().await;
+                        let uptime = stats.start_time.elapsed().as_secs() as i64;
+
+                        crate::coordinator::types::NodeStats {
+                            storage_used: capacity_info.used_space as i64,
+                            storage_available: capacity_info.available_space as i64,
+                            chunks_stored: capacity_info.file_count as i64, // Approximation
+                            bandwidth_up: stats.bytes_sent as i64,
+                            bandwidth_down: stats.bytes_received as i64,
+                            cpu_usage: 0.0, // TODO: Implement CPU monitoring
+                            memory_usage: 0.0, // TODO: Implement memory monitoring
+                            uptime_seconds: uptime,
+                        }
+                    })
+                })
+            }).await;
+
+            info!("Started coordinator heartbeat");
+        }
+    }
+
+    /// Register a file with the coordinator if available
+    async fn register_file_with_coordinator(&self, file_id: &str, file_hash: &str, file_size: u64, filename: &str) -> Result<()> {
+        if let Some(coordinator) = &self.coordinator_manager {
+            // Get file chunks from storage manager
+            let chunks = match self.storage_manager.get_file_chunks(file_id).await {
+                Ok(Some(chunk_list)) => {
+                    chunk_list.into_iter().enumerate().map(|(index, chunk_id)| {
+                        crate::coordinator::types::ChunkMetadata {
+                            chunk_id: chunk_id.clone(),
+                            hash: chunk_id, // For now, use the chunk ID as its hash
+                            size: self.config.storage.chunk_size as i64, // Assumes uniform chunk size; the final chunk may be smaller
+                            index: index as i32,
+                        }
+                    }).collect()
+                }
+                _ => Vec::new(),
+            };
+
+            let response = coordinator.register_file(
+                file_id.to_string(),
+                filename.to_string(),
+                file_size,
+                file_hash.to_string(),
+                chunks,
+            ).await?;
+
+            if response.success {
+                debug!("Successfully registered file {} with coordinator", file_id);
+                if !response.chunk_placements.is_empty() {
+                    debug!("Coordinator provided {} chunk placement recommendations", response.chunk_placements.len());
+                    // TODO: Handle chunk placement recommendations
+                }
+            } else {
+                warn!("Failed to register file with coordinator: {}", response.message);
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Find chunk locations using the coordinator
+    async fn find_chunk_locations_via_coordinator(&self, chunk_id: &str) -> Result<Vec<String>> {
+        if let Some(coordinator) = &self.coordinator_manager {
+            // The second argument (3) is assumed to cap how many locations are returned
+            let response = coordinator.find_chunk_locations(chunk_id.to_string(), 3).await?;
+
+            if response.success {
+                debug!("Found {} locations for chunk {} via coordinator", response.node_addresses.len(), chunk_id);
+                Ok(response.node_addresses)
+            } else {
+                debug!("Coordinator couldn't find locations for chunk {}: {}", chunk_id, response.message);
+                Ok(Vec::new())
+            }
+        } else {
+            Ok(Vec::new())
+        }
+    }
+
+    /// Get coordinator registration status
+    pub fn get_coordinator_status(&self) -> Option<&RegistrationStatus> {
+        self.coordinator_manager.as_ref().map(|c| c.get_registration_status())
+    }
 }
 
 /// Comprehensive node status information