| 1 |
use anyhow::{Context, Result}; |
| 2 |
use std::path::PathBuf; |
| 3 |
use std::sync::Arc; |
| 4 |
use tokio::sync::{mpsc, RwLock}; |
| 5 |
use tracing::{debug, info, warn, error}; |
| 6 |
|
| 7 |
use crate::config::Config; |
| 8 |
use crate::network::{NetworkManager, message_handler::{ZephyrMessage, NodeInfo}}; |
| 9 |
use crate::storage::{StorageManager, StorageConfig as StorageManagerConfig}; |
| 10 |
use crate::coordinator::{CoordinatorManager, RegistrationStatus}; |
| 11 |
|
| 12 |
/// Integrated node manager coordinating networking and storage
///
/// Safety: Coordinates secure operations between network and storage layers
/// Transparency: Provides comprehensive node status and metrics
/// Privacy: Handles secure chunk distribution and encrypted metadata
pub struct NodeManager {
    /// Network layer manager
    network_manager: NetworkManager,

    /// Storage layer manager (shared via `Arc` with background tasks such as
    /// the heartbeat and message-processing loops)
    pub storage_manager: Arc<StorageManager>,

    /// Coordinator manager for network coordination;
    /// `None` when no coordinator URL is configured (standalone mode)
    coordinator_manager: Option<CoordinatorManager>,

    /// Configuration
    config: Config,

    /// Message channel from network to node manager
    // NOTE(review): currently never read — the message processing loop is a
    // stub and `NetworkManager::new` is not handed the sender. Confirm wiring.
    message_rx: mpsc::Receiver<ZephyrMessage>,

    /// Message channel from node manager to network
    message_tx: mpsc::Sender<ZephyrMessage>,

    /// Node statistics, behind an async RwLock so many readers can snapshot
    /// metrics while writers update counters
    node_stats: Arc<RwLock<NodeStats>>,

    /// Base storage path
    storage_path: PathBuf,
}
| 42 |
|
| 43 |
/// Comprehensive node statistics
///
/// Transparency: Detailed metrics for monitoring and audit
#[derive(Debug, Clone)]
pub struct NodeStats {
    /// Number of chunks served to other peers
    pub chunks_served: u64,

    /// Number of chunks retrieved from peers
    pub chunks_retrieved: u64,

    /// Total bytes sent to peers
    pub bytes_sent: u64,

    /// Total bytes received from peers
    pub bytes_received: u64,

    /// Number of active peer connections
    pub peer_connections: u32,

    /// Number of failed chunk requests
    pub failed_requests: u64,

    /// Uptime in seconds
    // NOTE(review): initialized to 0 and never updated in this file;
    // `get_node_status` derives uptime from `start_time` instead — confirm
    // whether this field is still needed.
    pub uptime_seconds: u64,

    /// Node start time; anchor for uptime calculations
    pub start_time: crate::SerializableInstant,
}
| 72 |
|
| 73 |
/// File distribution strategy for P2P sharing
#[derive(Debug, Clone)]
pub enum DistributionStrategy {
    /// Store locally only — no peer traffic
    LocalOnly,

    /// Replicate the whole file to N closest peers
    Replicate { redundancy: u32 },

    /// Distribute chunks across the network, reaching at least `min_peers`
    Distribute { min_peers: u32 },
}
| 85 |
|
| 86 |
impl NodeManager { |
| 87 |
/// Create a new integrated node manager
///
/// Safety: Initializes both network and storage with secure configurations
///
/// Initialization order: message channel → storage manager → network manager
/// → optional coordinator connection → zeroed statistics. A coordinator
/// connection failure is tolerated: the node falls back to standalone mode.
pub async fn new(config: Config, storage_path: PathBuf) -> Result<Self> {
    info!("Initializing NodeManager with integrated network and storage");

    // Create message channel for network-storage communication
    // (bounded at 1000 in-flight messages for backpressure)
    let (message_tx, message_rx) = mpsc::channel::<ZephyrMessage>(1000);

    // Initialize storage manager
    let storage_config = StorageManagerConfig {
        max_capacity: config.storage.max_storage,
        warning_threshold: 0.8,   // warn at 80% of capacity
        critical_threshold: 0.95, // critical at 95% of capacity
        default_chunk_size: config.storage.chunk_size,
        max_file_size: 1024 * 1024 * 1024, // 1GB max file
        enable_gc: true,
        gc_interval: 3600, // 1 hour
    };

    let storage_manager = Arc::new(
        StorageManager::new(&storage_path, storage_config).await
            .context("Failed to initialize storage manager")?
    );

    // Initialize network manager with message channel
    // NOTE(review): the channel endpoints are stored on `Self` but are not
    // actually passed to the network manager here — confirm intended wiring.
    let network_manager = NetworkManager::new(config.clone()).await
        .context("Failed to initialize network manager")?;

    // Initialize coordinator manager if URL is provided
    let coordinator_manager = if !config.coordinator.url.is_empty() {
        match CoordinatorManager::new(config.coordinator.url.clone()).await {
            Ok(manager) => {
                info!("Successfully connected to coordinator at {}", config.coordinator.url);
                Some(manager)
            }
            Err(e) => {
                // Degrade gracefully: a missing coordinator is not fatal.
                warn!("Failed to connect to coordinator at {}: {}. Running in standalone mode.", config.coordinator.url, e);
                None
            }
        }
    } else {
        info!("No coordinator URL configured. Running in standalone mode.");
        None
    };

    // All counters start at zero; `start_time` anchors uptime calculations.
    let node_stats = Arc::new(RwLock::new(NodeStats {
        chunks_served: 0,
        chunks_retrieved: 0,
        bytes_sent: 0,
        bytes_received: 0,
        peer_connections: 0,
        failed_requests: 0,
        uptime_seconds: 0,
        start_time: crate::SerializableInstant::now(),
    }));

    Ok(Self {
        network_manager,
        storage_manager,
        coordinator_manager,
        config,
        message_rx,
        message_tx,
        node_stats,
        storage_path,
    })
}
| 155 |
|
| 156 |
/// Start the integrated node |
| 157 |
/// |
| 158 |
/// Safety: Starts both network and storage services with proper error handling |
| 159 |
pub async fn start(&mut self) -> Result<()> { |
| 160 |
info!("Starting integrated ZephyrFS node"); |
| 161 |
|
| 162 |
// Start storage manager background tasks (if any) |
| 163 |
self.start_storage_tasks().await?; |
| 164 |
|
| 165 |
// Start network manager |
| 166 |
self.network_manager.start().await |
| 167 |
.context("Failed to start network manager")?; |
| 168 |
|
| 169 |
// Register with coordinator if available |
| 170 |
if self.coordinator_manager.is_some() { |
| 171 |
let node_status = self.get_node_status().await; |
| 172 |
let addresses = vec![ |
| 173 |
format!("{}:{}", "127.0.0.1", self.config.network.p2p_port), |
| 174 |
format!("{}:{}", "127.0.0.1", self.config.network.api_port), |
| 175 |
]; |
| 176 |
|
| 177 |
let mut capabilities = std::collections::HashMap::new(); |
| 178 |
capabilities.insert("version".to_string(), node_status.version); |
| 179 |
capabilities.insert("storage".to_string(), "true".to_string()); |
| 180 |
capabilities.insert("encryption".to_string(), "true".to_string()); |
| 181 |
|
| 182 |
if let Some(coordinator) = self.coordinator_manager.as_mut() { |
| 183 |
let response = coordinator.register_node( |
| 184 |
addresses, |
| 185 |
node_status.storage_capacity, |
| 186 |
capabilities, |
| 187 |
).await?; |
| 188 |
|
| 189 |
if response.success { |
| 190 |
info!("Successfully registered with coordinator. Node ID: {}", coordinator.get_node_id()); |
| 191 |
if !response.bootstrap_peers.is_empty() { |
| 192 |
info!("Received {} bootstrap peers from coordinator", response.bootstrap_peers.len()); |
| 193 |
// TODO: Connect to bootstrap peers |
| 194 |
} |
| 195 |
} else { |
| 196 |
warn!("Failed to register with coordinator: {}", response.message); |
| 197 |
} |
| 198 |
} |
| 199 |
|
| 200 |
self.start_coordinator_heartbeat().await; |
| 201 |
} |
| 202 |
|
| 203 |
// Start message processing loop |
| 204 |
self.start_message_processing().await; |
| 205 |
|
| 206 |
info!("ZephyrFS node started successfully"); |
| 207 |
Ok(()) |
| 208 |
} |
| 209 |
|
| 210 |
/// Store a file and optionally distribute to peers |
| 211 |
/// |
| 212 |
/// Safety: Validates file integrity and enforces capacity limits |
| 213 |
/// Privacy: Supports encrypted storage and secure distribution |
| 214 |
pub async fn store_file( |
| 215 |
&self, |
| 216 |
file_id: &str, |
| 217 |
data: &[u8], |
| 218 |
filename: &str, |
| 219 |
strategy: DistributionStrategy, |
| 220 |
) -> Result<String> { |
| 221 |
info!("Storing file: {} ({} bytes) with strategy: {:?}", filename, data.len(), strategy); |
| 222 |
|
| 223 |
// Store file locally first |
| 224 |
let file_hash = self.storage_manager.store_file(file_id, data, filename).await |
| 225 |
.context("Failed to store file locally")?; |
| 226 |
|
| 227 |
// Update statistics |
| 228 |
{ |
| 229 |
let mut stats = self.node_stats.write().await; |
| 230 |
stats.bytes_sent += data.len() as u64; // Will be sent to peers |
| 231 |
} |
| 232 |
|
| 233 |
// Handle distribution strategy |
| 234 |
match strategy { |
| 235 |
DistributionStrategy::LocalOnly => { |
| 236 |
debug!("File stored locally only: {}", file_id); |
| 237 |
} |
| 238 |
DistributionStrategy::Replicate { redundancy } => { |
| 239 |
self.replicate_file_to_peers(file_id, redundancy).await?; |
| 240 |
} |
| 241 |
DistributionStrategy::Distribute { min_peers } => { |
| 242 |
self.distribute_chunks_to_peers(file_id, min_peers).await?; |
| 243 |
} |
| 244 |
} |
| 245 |
|
| 246 |
// Register file with coordinator if available |
| 247 |
if let Err(e) = self.register_file_with_coordinator(file_id, &file_hash, data.len() as u64, filename).await { |
| 248 |
warn!("Failed to register file with coordinator: {}", e); |
| 249 |
} |
| 250 |
|
| 251 |
// Announce file availability to peers |
| 252 |
if let Err(e) = self.announce_file_to_peers(file_id, &file_hash).await { |
| 253 |
warn!("Failed to announce file to peers: {}", e); |
| 254 |
} |
| 255 |
|
| 256 |
info!("Successfully stored and distributed file: {} with hash: {}", file_id, file_hash); |
| 257 |
Ok(file_hash) |
| 258 |
} |
| 259 |
|
| 260 |
/// Retrieve a file, attempting local storage first, then peers |
| 261 |
/// |
| 262 |
/// Safety: Verifies chunk integrity from all sources |
| 263 |
/// Transparency: Logs all retrieval attempts and sources |
| 264 |
pub async fn retrieve_file(&self, file_id: &str) -> Result<Option<Vec<u8>>> { |
| 265 |
info!("Retrieving file: {}", file_id); |
| 266 |
|
| 267 |
// Try local storage first |
| 268 |
match self.storage_manager.retrieve_file(file_id).await? { |
| 269 |
Some(data) => { |
| 270 |
debug!("File retrieved from local storage: {}", file_id); |
| 271 |
return Ok(Some(data)); |
| 272 |
} |
| 273 |
None => { |
| 274 |
debug!("File not found locally, attempting peer retrieval: {}", file_id); |
| 275 |
} |
| 276 |
} |
| 277 |
|
| 278 |
// Attempt to retrieve from peers |
| 279 |
match self.retrieve_file_from_peers(file_id).await? { |
| 280 |
Some(data) => { |
| 281 |
info!("Successfully retrieved file from peers: {}", file_id); |
| 282 |
|
| 283 |
// Store locally for future access |
| 284 |
if let Ok(_) = self.storage_manager.store_file(file_id, &data, "retrieved_file").await { |
| 285 |
debug!("Cached retrieved file locally: {}", file_id); |
| 286 |
} |
| 287 |
|
| 288 |
// Update statistics |
| 289 |
{ |
| 290 |
let mut stats = self.node_stats.write().await; |
| 291 |
stats.chunks_retrieved += 1; |
| 292 |
stats.bytes_received += data.len() as u64; |
| 293 |
} |
| 294 |
|
| 295 |
Ok(Some(data)) |
| 296 |
} |
| 297 |
None => { |
| 298 |
warn!("File not found locally or on peers: {}", file_id); |
| 299 |
Ok(None) |
| 300 |
} |
| 301 |
} |
| 302 |
} |
| 303 |
|
| 304 |
/// Delete a file locally and notify peers |
| 305 |
/// |
| 306 |
/// Safety: Ensures secure deletion and updates peer information |
| 307 |
pub async fn delete_file(&self, file_id: &str) -> Result<bool> { |
| 308 |
info!("Deleting file: {}", file_id); |
| 309 |
|
| 310 |
// Delete locally |
| 311 |
let deleted = self.storage_manager.delete_file(file_id).await?; |
| 312 |
|
| 313 |
if deleted { |
| 314 |
// Notify peers about deletion (future implementation) |
| 315 |
debug!("File deleted locally, peer notification not yet implemented: {}", file_id); |
| 316 |
} |
| 317 |
|
| 318 |
Ok(deleted) |
| 319 |
} |
| 320 |
|
| 321 |
/// Get comprehensive node status |
| 322 |
/// |
| 323 |
/// Transparency: Provides detailed node metrics for monitoring |
| 324 |
pub async fn get_node_status(&self) -> NodeStatus { |
| 325 |
let stats = self.node_stats.read().await; |
| 326 |
let capacity_info = self.storage_manager.get_capacity_info().await; |
| 327 |
let storage_stats = self.storage_manager.get_storage_stats().await.unwrap_or_default(); |
| 328 |
|
| 329 |
// Calculate uptime |
| 330 |
let uptime_seconds = stats.start_time.elapsed().as_secs(); |
| 331 |
|
| 332 |
let node_id = if let Some(coordinator) = &self.coordinator_manager { |
| 333 |
coordinator.get_node_id().to_string() |
| 334 |
} else { |
| 335 |
self.config.node_id.clone().unwrap_or_else(|| "local_node".to_string()) |
| 336 |
}; |
| 337 |
|
| 338 |
NodeStatus { |
| 339 |
node_id, |
| 340 |
version: env!("CARGO_PKG_VERSION").to_string(), |
| 341 |
uptime_seconds, |
| 342 |
peer_connections: stats.peer_connections, |
| 343 |
chunks_served: stats.chunks_served, |
| 344 |
chunks_retrieved: stats.chunks_retrieved, |
| 345 |
bytes_sent: stats.bytes_sent, |
| 346 |
bytes_received: stats.bytes_received, |
| 347 |
failed_requests: stats.failed_requests, |
| 348 |
storage_capacity: capacity_info.total_capacity, |
| 349 |
storage_used: capacity_info.used_space, |
| 350 |
storage_available: capacity_info.available_space, |
| 351 |
file_count: capacity_info.file_count, |
| 352 |
chunk_count: storage_stats.total_chunks, |
| 353 |
} |
| 354 |
} |
| 355 |
|
| 356 |
/// Shutdown the node gracefully |
| 357 |
/// |
| 358 |
/// Safety: Ensures clean shutdown of both network and storage |
| 359 |
pub async fn shutdown(&mut self) -> Result<()> { |
| 360 |
info!("Shutting down ZephyrFS node"); |
| 361 |
|
| 362 |
// Unregister from coordinator if connected |
| 363 |
if let Some(coordinator) = &mut self.coordinator_manager { |
| 364 |
if let Err(e) = coordinator.unregister_node(Some("Normal shutdown".to_string())).await { |
| 365 |
warn!("Failed to unregister from coordinator: {}", e); |
| 366 |
} |
| 367 |
} |
| 368 |
|
| 369 |
// Shutdown network manager |
| 370 |
self.network_manager.shutdown().await |
| 371 |
.context("Failed to shutdown network manager")?; |
| 372 |
|
| 373 |
// Storage manager cleanup (if needed) |
| 374 |
// Currently storage manager doesn't need explicit cleanup |
| 375 |
|
| 376 |
info!("ZephyrFS node shutdown complete"); |
| 377 |
Ok(()) |
| 378 |
} |
| 379 |
|
| 380 |
/// Start storage-related background tasks |
| 381 |
async fn start_storage_tasks(&self) -> Result<()> { |
| 382 |
// Future: garbage collection, capacity monitoring, etc. |
| 383 |
debug!("Storage background tasks initialized"); |
| 384 |
Ok(()) |
| 385 |
} |
| 386 |
|
| 387 |
/// Start message processing loop |
| 388 |
async fn start_message_processing(&self) { |
| 389 |
let storage_manager = Arc::clone(&self.storage_manager); |
| 390 |
let node_stats = Arc::clone(&self.node_stats); |
| 391 |
|
| 392 |
// Spawn message processing task |
| 393 |
tokio::spawn(async move { |
| 394 |
debug!("Starting message processing loop"); |
| 395 |
// Future: Process messages from message_rx |
| 396 |
// This will handle chunk requests/responses from peers |
| 397 |
}); |
| 398 |
} |
| 399 |
|
| 400 |
/// Replicate file to specified number of peers |
| 401 |
async fn replicate_file_to_peers(&self, _file_id: &str, _redundancy: u32) -> Result<()> { |
| 402 |
// TODO: Implement peer replication |
| 403 |
debug!("File replication not yet implemented"); |
| 404 |
Ok(()) |
| 405 |
} |
| 406 |
|
| 407 |
/// Distribute file chunks across peers |
| 408 |
async fn distribute_chunks_to_peers(&self, _file_id: &str, _min_peers: u32) -> Result<()> { |
| 409 |
// TODO: Implement chunk distribution |
| 410 |
debug!("Chunk distribution not yet implemented"); |
| 411 |
Ok(()) |
| 412 |
} |
| 413 |
|
| 414 |
/// Retrieve file from peers |
| 415 |
async fn retrieve_file_from_peers(&self, file_id: &str) -> Result<Option<Vec<u8>>> { |
| 416 |
info!("Attempting to retrieve file from peers: {}", file_id); |
| 417 |
|
| 418 |
// First, check if we have metadata about this file from other sources |
| 419 |
// This is a simplified implementation - in a real system, we'd have |
| 420 |
// a distributed hash table or peer discovery mechanism |
| 421 |
|
| 422 |
// For now, we'll simulate trying to get chunks by ID |
| 423 |
// In a real implementation, this would: |
| 424 |
// 1. Query the DHT for file metadata |
| 425 |
// 2. Get the list of chunk IDs that comprise the file |
| 426 |
// 3. Request each chunk from peers |
| 427 |
// 4. Reconstruct the file |
| 428 |
|
| 429 |
debug!("Peer file retrieval requires DHT implementation - returning None for now"); |
| 430 |
Ok(None) |
| 431 |
} |
| 432 |
|
| 433 |
/// Request a specific chunk from peers |
| 434 |
/// |
| 435 |
/// Safety: Validates chunk integrity from all peer sources |
| 436 |
/// Transparency: Logs all peer chunk requests and responses |
| 437 |
pub async fn request_chunk_from_peers(&self, chunk_id: &str, expected_hash: &str) -> Result<Option<Vec<u8>>> { |
| 438 |
info!("Requesting chunk from peers: {} (expected hash: {})", chunk_id, expected_hash); |
| 439 |
|
| 440 |
// Create chunk request message |
| 441 |
let request = ZephyrMessage::ChunkRequest { |
| 442 |
chunk_id: chunk_id.to_string(), |
| 443 |
expected_hash: expected_hash.to_string(), |
| 444 |
}; |
| 445 |
|
| 446 |
// In a real implementation, this would: |
| 447 |
// 1. Send the request to multiple peers |
| 448 |
// 2. Wait for responses with timeout |
| 449 |
// 3. Validate responses and return the first valid one |
| 450 |
// 4. Update peer reputation based on response quality |
| 451 |
|
| 452 |
debug!("Peer chunk requests require network broadcast - not yet implemented"); |
| 453 |
|
| 454 |
// Update statistics for attempted request |
| 455 |
{ |
| 456 |
let mut stats = self.node_stats.write().await; |
| 457 |
stats.failed_requests += 1; |
| 458 |
} |
| 459 |
|
| 460 |
Ok(None) |
| 461 |
} |
| 462 |
|
| 463 |
/// Announce file availability to peers |
| 464 |
/// |
| 465 |
/// Transparency: Announces stored files to help peers discover content |
| 466 |
pub async fn announce_file_to_peers(&self, file_id: &str, file_hash: &str) -> Result<()> { |
| 467 |
info!("Announcing file availability to peers: {} (hash: {})", file_id, file_hash); |
| 468 |
|
| 469 |
// Get our node info for the announcement |
| 470 |
let node_status = self.get_node_status().await; |
| 471 |
|
| 472 |
let announcement = ZephyrMessage::StatusUpdate { |
| 473 |
node_info: NodeInfo { |
| 474 |
node_id: node_status.node_id, |
| 475 |
version: node_status.version, |
| 476 |
storage_available: node_status.storage_available, |
| 477 |
storage_used: node_status.storage_used, |
| 478 |
uptime_seconds: node_status.uptime_seconds, |
| 479 |
capabilities: vec!["file_storage".to_string(), "chunk_serving".to_string()], |
| 480 |
} |
| 481 |
}; |
| 482 |
|
| 483 |
// In a real implementation, this would broadcast to all connected peers |
| 484 |
debug!("File announcement broadcast not yet implemented"); |
| 485 |
|
| 486 |
Ok(()) |
| 487 |
} |
| 488 |
|
| 489 |
/// Register this node with the coordinator |
| 490 |
async fn register_with_coordinator(&self, coordinator: &mut CoordinatorManager) -> Result<()> { |
| 491 |
info!("Registering node with coordinator"); |
| 492 |
|
| 493 |
let node_status = self.get_node_status().await; |
| 494 |
let addresses = vec![ |
| 495 |
format!("{}:{}", "127.0.0.1", self.config.network.p2p_port), |
| 496 |
format!("{}:{}", "127.0.0.1", self.config.network.api_port), |
| 497 |
]; |
| 498 |
|
| 499 |
let mut capabilities = std::collections::HashMap::new(); |
| 500 |
capabilities.insert("version".to_string(), node_status.version); |
| 501 |
capabilities.insert("storage".to_string(), "true".to_string()); |
| 502 |
capabilities.insert("encryption".to_string(), "true".to_string()); |
| 503 |
|
| 504 |
let response = coordinator.register_node( |
| 505 |
addresses, |
| 506 |
node_status.storage_capacity, |
| 507 |
capabilities, |
| 508 |
).await?; |
| 509 |
|
| 510 |
if response.success { |
| 511 |
info!("Successfully registered with coordinator. Node ID: {}", coordinator.get_node_id()); |
| 512 |
if !response.bootstrap_peers.is_empty() { |
| 513 |
info!("Received {} bootstrap peers from coordinator", response.bootstrap_peers.len()); |
| 514 |
// TODO: Connect to bootstrap peers |
| 515 |
} |
| 516 |
} else { |
| 517 |
warn!("Failed to register with coordinator: {}", response.message); |
| 518 |
} |
| 519 |
|
| 520 |
Ok(()) |
| 521 |
} |
| 522 |
|
| 523 |
/// Start coordinator heartbeat loop
///
/// No-op in standalone mode (no coordinator configured). The closure handed
/// to `start_heartbeat` snapshots current node/storage statistics each tick
/// and returns them as coordinator-facing `NodeStats`.
async fn start_coordinator_heartbeat(&self) {
    if let Some(coordinator) = &self.coordinator_manager {
        // Clone shared handles so the heartbeat closure owns them.
        let node_stats = Arc::clone(&self.node_stats);
        let storage_manager = Arc::clone(&self.storage_manager);

        coordinator.start_heartbeat(move || {
            // The heartbeat callback is synchronous, so bridge back into the
            // async world via block_in_place + block_on.
            // NOTE(review): block_in_place panics on a current-thread tokio
            // runtime — confirm the node always runs on a multi-thread runtime.
            let stats = tokio::task::block_in_place(|| {
                tokio::runtime::Handle::current().block_on(async {
                    let node_stats = node_stats.read().await;
                    let capacity_info = storage_manager.get_capacity_info().await;
                    let uptime = node_stats.start_time.elapsed().as_secs() as i64;

                    crate::coordinator::types::NodeStats {
                        storage_used: capacity_info.used_space as i64,
                        storage_available: capacity_info.available_space as i64,
                        chunks_stored: capacity_info.file_count as i64, // Approximation
                        bandwidth_up: node_stats.bytes_sent as i64,
                        bandwidth_down: node_stats.bytes_received as i64,
                        cpu_usage: 0.0, // TODO: Implement CPU monitoring
                        memory_usage: 0.0, // TODO: Implement memory monitoring
                        uptime_seconds: uptime,
                    }
                })
            });
            stats
        }).await;

        info!("Started coordinator heartbeat");
    }
}
| 554 |
|
| 555 |
/// Register a file with the coordinator if available |
| 556 |
async fn register_file_with_coordinator(&self, file_id: &str, file_hash: &str, file_size: u64, filename: &str) -> Result<()> { |
| 557 |
if let Some(coordinator) = &self.coordinator_manager { |
| 558 |
// Get file chunks from storage manager |
| 559 |
let chunks = match self.storage_manager.get_file_chunks(file_id).await { |
| 560 |
Ok(Some(chunk_list)) => { |
| 561 |
chunk_list.into_iter().enumerate().map(|(index, chunk_id)| { |
| 562 |
crate::coordinator::types::ChunkMetadata { |
| 563 |
chunk_id: chunk_id.clone(), |
| 564 |
hash: chunk_id, // For now, use chunk_id as hash |
| 565 |
size: self.config.storage.chunk_size as i64, // Default chunk size |
| 566 |
index: index as i32, |
| 567 |
} |
| 568 |
}).collect() |
| 569 |
} |
| 570 |
_ => Vec::new(), |
| 571 |
}; |
| 572 |
|
| 573 |
let response = coordinator.register_file( |
| 574 |
file_id.to_string(), |
| 575 |
filename.to_string(), |
| 576 |
file_size, |
| 577 |
file_hash.to_string(), |
| 578 |
chunks, |
| 579 |
).await?; |
| 580 |
|
| 581 |
if response.success { |
| 582 |
debug!("Successfully registered file {} with coordinator", file_id); |
| 583 |
if !response.chunk_placements.is_empty() { |
| 584 |
debug!("Coordinator provided {} chunk placement recommendations", response.chunk_placements.len()); |
| 585 |
// TODO: Handle chunk placement recommendations |
| 586 |
} |
| 587 |
} else { |
| 588 |
warn!("Failed to register file with coordinator: {}", response.message); |
| 589 |
} |
| 590 |
} |
| 591 |
|
| 592 |
Ok(()) |
| 593 |
} |
| 594 |
|
| 595 |
/// Find chunk locations using coordinator |
| 596 |
async fn find_chunk_locations_via_coordinator(&self, chunk_id: &str) -> Result<Vec<String>> { |
| 597 |
if let Some(coordinator) = &self.coordinator_manager { |
| 598 |
let response = coordinator.find_chunk_locations(chunk_id.to_string(), 3).await?; |
| 599 |
|
| 600 |
if response.success { |
| 601 |
debug!("Found {} locations for chunk {} via coordinator", response.node_addresses.len(), chunk_id); |
| 602 |
Ok(response.node_addresses) |
| 603 |
} else { |
| 604 |
debug!("Coordinator couldn't find locations for chunk {}: {}", chunk_id, response.message); |
| 605 |
Ok(Vec::new()) |
| 606 |
} |
| 607 |
} else { |
| 608 |
Ok(Vec::new()) |
| 609 |
} |
| 610 |
} |
| 611 |
|
| 612 |
/// Get coordinator registration status |
| 613 |
pub fn get_coordinator_status(&self) -> Option<&RegistrationStatus> { |
| 614 |
self.coordinator_manager.as_ref().map(|c| c.get_registration_status()) |
| 615 |
} |
| 616 |
} |
| 617 |
|
| 618 |
/// Comprehensive node status information
///
/// Transparency: Complete node metrics for monitoring and audit
///
/// Produced by `NodeManager::get_node_status` from the live statistics and
/// the storage layer's capacity report.
#[derive(Debug, Clone)]
pub struct NodeStatus {
    /// Coordinator-assigned id, or the configured/fallback local id
    pub node_id: String,
    /// Crate version (from `CARGO_PKG_VERSION`)
    pub version: String,
    /// Seconds elapsed since the node started
    pub uptime_seconds: u64,
    /// Number of active peer connections
    pub peer_connections: u32,
    /// Chunks served to other peers
    pub chunks_served: u64,
    /// Chunks retrieved from peers
    pub chunks_retrieved: u64,
    /// Total bytes sent to peers
    pub bytes_sent: u64,
    /// Total bytes received from peers
    pub bytes_received: u64,
    /// Failed chunk requests
    pub failed_requests: u64,
    /// Total local storage capacity in bytes
    pub storage_capacity: u64,
    /// Bytes of local storage currently used
    pub storage_used: u64,
    /// Bytes of local storage still available
    pub storage_available: u64,
    /// Number of files stored locally
    pub file_count: u64,
    /// Number of chunks stored locally
    pub chunk_count: u64,
}
| 638 |
|
| 639 |
// Import for the storage stats |
| 640 |
use crate::storage::StorageStats; |