zephyrfs/zephyrfs-node / 4588949

Browse files

Update node manager and main entry point

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
45889491ea7f908e9b79eb4e075d679996e66065
Parents
5e270a4
Tree
7def099

2 changed files

StatusFile+-
M src/main.rs 322 23
M src/node_manager.rs 1 1
src/main.rsmodified
@@ -16,16 +16,85 @@ use config::Config;
1616
 use node_manager::{NodeManager, DistributionStrategy};
1717
 
1818
 #[derive(Parser, Debug)]
19
-#[command(author, version, about, long_about = None)]
19
+#[command(author, version, about = "ZephyrFS - Distributed P2P storage node", long_about = None)]
2020
 struct Args {
21
-    #[arg(short, long, value_name = "FILE")]
21
+    #[arg(short, long, value_name = "FILE", global = true)]
2222
     config: Option<std::path::PathBuf>,
2323
     
24
-    #[arg(long)]
25
-    health_check: bool,
26
-    
27
-    #[arg(short, long)]
24
+    #[arg(short, long, global = true)]
2825
     verbose: bool,
26
+    
27
+    #[command(subcommand)]
28
+    command: Option<Commands>,
29
+}
30
+
31
+#[derive(Parser, Debug)]
32
+enum Commands {
33
+    /// Initialize a new ZephyrFS node
34
+    Init {
35
+        /// Storage path for the node
36
+        #[arg(short, long, default_value = "./zephyrfs_storage")]
37
+        storage_path: std::path::PathBuf,
38
+        
39
+        /// Maximum storage allocation in GB
40
+        #[arg(short, long, default_value = "10")]
41
+        max_storage_gb: u64,
42
+    },
43
+    
44
+    /// Start the ZephyrFS node daemon
45
+    Start {
46
+        /// Run in background/daemon mode
47
+        #[arg(short, long)]
48
+        daemon: bool,
49
+    },
50
+    
51
+    /// Join an existing ZephyrFS network
52
+    Join {
53
+        /// Bootstrap peer address (multiaddr format)
54
+        #[arg(required = true)]
55
+        bootstrap_peer: String,
56
+    },
57
+    
58
+    /// Upload a file to the network
59
+    Upload {
60
+        /// File to upload
61
+        file_path: std::path::PathBuf,
62
+        
63
+        /// Optional file ID (defaults to filename)
64
+        #[arg(short, long)]
65
+        file_id: Option<String>,
66
+        
67
+        /// Distribution strategy
68
+        #[arg(short, long, default_value = "local-only")]
69
+        strategy: String,
70
+    },
71
+    
72
+    /// Download a file from the network
73
+    Download {
74
+        /// File ID to download
75
+        file_id: String,
76
+        
77
+        /// Output path (defaults to current directory)
78
+        #[arg(short, long)]
79
+        output: Option<std::path::PathBuf>,
80
+    },
81
+    
82
+    /// List files in the network
83
+    Ls {
84
+        /// Show detailed information
85
+        #[arg(short, long)]
86
+        detailed: bool,
87
+        
88
+        /// Limit number of results
89
+        #[arg(short, long)]
90
+        limit: Option<usize>,
91
+    },
92
+    
93
+    /// Show node status
94
+    Status,
95
+    
96
+    /// Health check (for Docker/monitoring)
97
+    HealthCheck,
2998
 }
3099
 
31100
 #[tokio::main]
@@ -37,38 +106,268 @@ async fn main() -> Result<()> {
37106
     tracing_subscriber::fmt()
38107
         .with_env_filter(format!("zephyrfs_node={}", log_level))
39108
         .init();
40
-
41
-    if args.health_check {
42
-        // Simple health check for Docker
43
-        info!("Health check passed");
44
-        return Ok(());
109
+    
110
+    match args.command {
111
+        Some(Commands::HealthCheck) | None if std::env::args().any(|arg| arg == "--health-check") => {
112
+            // Simple health check for Docker/monitoring
113
+            info!("Health check passed");
114
+            Ok(())
115
+        }
116
+        
117
+        Some(Commands::Init { storage_path, max_storage_gb }) => {
118
+            init_node(storage_path, max_storage_gb, args.config.as_deref()).await
119
+        }
120
+        
121
+        Some(Commands::Start { daemon }) | None => {
122
+            start_node(daemon, args.config.as_deref()).await
123
+        }
124
+        
125
+        Some(Commands::Join { bootstrap_peer }) => {
126
+            join_network(bootstrap_peer, args.config.as_deref()).await
127
+        }
128
+        
129
+        Some(Commands::Upload { file_path, file_id, strategy }) => {
130
+            upload_file(file_path, file_id, strategy, args.config.as_deref()).await
131
+        }
132
+        
133
+        Some(Commands::Download { file_id, output }) => {
134
+            download_file(file_id, output, args.config.as_deref()).await
135
+        }
136
+        
137
+        Some(Commands::Ls { detailed, limit }) => {
138
+            list_files(detailed, limit, args.config.as_deref()).await
139
+        }
140
+        
141
+        Some(Commands::Status) => {
142
+            show_status(args.config.as_deref()).await
143
+        }
45144
     }
145
+}
146
+
147
+/// Initialize a new ZephyrFS node
148
+async fn init_node(storage_path: std::path::PathBuf, max_storage_gb: u64, config_path: Option<&std::path::Path>) -> Result<()> {
149
+    info!("Initializing ZephyrFS node");
150
+    info!("Storage path: {:?}", storage_path);
151
+    info!("Max storage: {} GB", max_storage_gb);
152
+    
153
+    // Create storage directory
154
+    std::fs::create_dir_all(&storage_path)?;
155
+    
156
+    // Load or create config
157
+    let mut config = Config::load(config_path)?;
158
+    config.storage.max_storage = max_storage_gb * 1024 * 1024 * 1024; // Convert GB to bytes
159
+    
160
+    // Test initialization by creating a NodeManager
161
+    let _node_manager = NodeManager::new(config, storage_path).await?;
162
+    
163
+    info!("ZephyrFS node initialized successfully");
164
+    info!("Run 'zephyrfs-node start' to begin serving");
165
+    Ok(())
166
+}
46167
 
47
-    info!("Starting ZephyrFS Node");
168
+/// Start the ZephyrFS node daemon
169
+async fn start_node(daemon: bool, config_path: Option<&std::path::Path>) -> Result<()> {
170
+    if daemon {
171
+        info!("Starting ZephyrFS node in daemon mode");
172
+        // TODO: Implement proper daemon mode with process forking
173
+        warn!("Daemon mode not yet implemented - running in foreground");
174
+    } else {
175
+        info!("Starting ZephyrFS node");
176
+    }
48177
     
49178
     // Load configuration
50
-    let config = Config::load(args.config.as_deref())?;
51
-    info!("Configuration loaded: {:?}", config);
179
+    let config = Config::load(config_path)?;
52180
     
53181
     // Determine storage path
54
-    let storage_path = std::env::current_dir()?
55
-        .join("zephyrfs_storage");
182
+    let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
56183
     
57
-    // Initialize integrated node manager
58
-    let mut node_manager = NodeManager::new(config.clone(), storage_path).await?;
184
+    // Initialize and start node manager
185
+    let mut node_manager = NodeManager::new(config, storage_path).await?;
59186
     
60
-    // Start the integrated node
61
-    info!("Starting integrated ZephyrFS node...");
187
+    info!("🚀 Starting integrated ZephyrFS node...");
62188
     node_manager.start().await?;
63189
     
64
-    info!("ZephyrFS Node is running. Press Ctrl+C to stop.");
190
+    info!("✅ ZephyrFS Node is running. Press Ctrl+C to stop.");
65191
     
66192
     // Keep running until shutdown signal
67193
     tokio::signal::ctrl_c().await?;
68
-    warn!("Shutdown signal received");
194
+    warn!("🛑 Shutdown signal received");
69195
     
70196
     node_manager.shutdown().await?;
71
-    info!("ZephyrFS Node stopped");
197
+    info!("✅ ZephyrFS Node stopped");
198
+    
199
+    Ok(())
200
+}
201
+
202
+/// Join an existing ZephyrFS network
203
+async fn join_network(bootstrap_peer: String, config_path: Option<&std::path::Path>) -> Result<()> {
204
+    info!("Joining ZephyrFS network via bootstrap peer: {}", bootstrap_peer);
205
+    
206
+    // Load configuration
207
+    let config = Config::load(config_path)?;
208
+    let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
209
+    
210
+    // Initialize node manager
211
+    let mut node_manager = NodeManager::new(config, storage_path).await?;
212
+    
213
+    // TODO: Add bootstrap peer to configuration and connect
214
+    warn!("Bootstrap peer connection not yet implemented");
215
+    warn!("Starting node normally - implement P2P bootstrap in future");
216
+    
217
+    // Start normally for now
218
+    node_manager.start().await?;
219
+    info!("✅ Node started (bootstrap connection pending implementation)");
220
+    
221
+    tokio::signal::ctrl_c().await?;
222
+    node_manager.shutdown().await?;
223
+    
224
+    Ok(())
225
+}
226
+
227
+/// Upload a file to the network
228
+async fn upload_file(file_path: std::path::PathBuf, file_id: Option<String>, strategy: String, config_path: Option<&std::path::Path>) -> Result<()> {
229
+    info!("Uploading file: {:?}", file_path);
230
+    
231
+    // Read file data
232
+    let data = std::fs::read(&file_path)?;
233
+    let filename = file_path.file_name()
234
+        .and_then(|n| n.to_str())
235
+        .unwrap_or("uploaded_file");
236
+    
237
+    let file_id = file_id.unwrap_or_else(|| filename.to_string());
238
+    
239
+    // Parse distribution strategy
240
+    let dist_strategy = match strategy.as_str() {
241
+        "local-only" | "local" => DistributionStrategy::LocalOnly,
242
+        "replicate" => DistributionStrategy::Replicate { redundancy: 3 },
243
+        "distribute" => DistributionStrategy::Distribute { min_peers: 2 },
244
+        _ => {
245
+            warn!("Unknown strategy '{}', using local-only", strategy);
246
+            DistributionStrategy::LocalOnly
247
+        }
248
+    };
249
+    
250
+    // Initialize node manager for operation
251
+    let config = Config::load(config_path)?;
252
+    let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
253
+    let node_manager = NodeManager::new(config, storage_path).await?;
254
+    
255
+    // Perform upload
256
+    let file_hash = node_manager.store_file(&file_id, &data, filename, dist_strategy).await?;
257
+    
258
+    info!("✅ File uploaded successfully");
259
+    info!("[INFO] File ID: {}", file_id);
260
+    info!("[INFO] Hash: {}", file_hash);
261
+    info!("[INFO] Size: {} bytes", data.len());
262
+    
263
+    Ok(())
264
+}
265
+
266
+/// Download a file from the network
267
+async fn download_file(file_id: String, output: Option<std::path::PathBuf>, config_path: Option<&std::path::Path>) -> Result<()> {
268
+    info!("Downloading file: {}", file_id);
269
+    
270
+    // Initialize node manager
271
+    let config = Config::load(config_path)?;
272
+    let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
273
+    let node_manager = NodeManager::new(config, storage_path).await?;
274
+    
275
+    // Attempt download
276
+    match node_manager.retrieve_file(&file_id).await? {
277
+        Some(data) => {
278
+            let output_path = output.unwrap_or_else(|| std::path::PathBuf::from(&file_id));
279
+            std::fs::write(&output_path, &data)?;
280
+            
281
+            info!("✅ File downloaded successfully");
282
+            info!("[SUCCESS] Saved to: {:?}", output_path);
283
+            info!("[INFO] Size: {} bytes", data.len());
284
+        }
285
+        None => {
286
+            warn!("[ERROR] File not found: {}", file_id);
287
+            std::process::exit(1);
288
+        }
289
+    }
290
+    
291
+    Ok(())
292
+}
293
+
294
+/// List files in the network
295
+async fn list_files(detailed: bool, limit: Option<usize>, config_path: Option<&std::path::Path>) -> Result<()> {
296
+    info!("Listing files in network");
297
+    
298
+    // Initialize node manager
299
+    let config = Config::load(config_path)?;
300
+    let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
301
+    let node_manager = NodeManager::new(config, storage_path).await?;
302
+    
303
+    // Get file list
304
+    let files = node_manager.storage_manager.list_files(limit).await?;
305
+    
306
+    if files.is_empty() {
307
+        info!("[INFO] No files found");
308
+        return Ok(());
309
+    }
310
+    
311
+    info!("[INFO] Found {} files:", files.len());
312
+    
313
+    for (file_id, metadata) in files {
314
+        if detailed {
315
+            info!("[INFO] File: {}", file_id);
316
+            info!("  Name: {}", metadata.name);
317
+            info!("  Size: {} bytes", metadata.size);
318
+            info!("  Hash: {}", metadata.file_hash);
319
+            info!("  Created: {}", metadata.created_at);
320
+            info!("  Chunks: {}", metadata.chunk_ids.len());
321
+            if let Some(mime) = &metadata.mime_type {
322
+                info!("  Type: {}", mime);
323
+            }
324
+            info!("");
325
+        } else {
326
+            info!("  {} ({} bytes) - {}", file_id, metadata.size, metadata.name);
327
+        }
328
+    }
329
+    
330
+    Ok(())
331
+}
332
+
333
+/// Show node status
334
+async fn show_status(config_path: Option<&std::path::Path>) -> Result<()> {
335
+    info!("Checking ZephyrFS node status");
336
+    
337
+    // Initialize node manager
338
+    let config = Config::load(config_path)?;
339
+    let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
340
+    let node_manager = NodeManager::new(config, storage_path).await?;
341
+    
342
+    // Get comprehensive status
343
+    let status = node_manager.get_node_status().await;
344
+    
345
+    info!("🚀 ZephyrFS Node Status");
346
+    info!("  🆔 Node ID: {}", status.node_id);
347
+    info!("  📦 Version: {}", status.version);
348
+    info!("  ⏱️  Uptime: {} seconds", status.uptime_seconds);
349
+    info!("");
350
+    
351
+    info!("🌐 Network Status");
352
+    info!("  🔗 Peer connections: {}", status.peer_connections);
353
+    info!("  📤 Bytes sent: {}", status.bytes_sent);
354
+    info!("  📥 Bytes received: {}", status.bytes_received);
355
+    info!("  ❌ Failed requests: {}", status.failed_requests);
356
+    info!("");
357
+    
358
+    info!("💾 Storage Status");
359
+    info!("  💽 Total capacity: {} GB", status.storage_capacity / (1024 * 1024 * 1024));
360
+    info!("  📊 Used space: {} MB", status.storage_used / (1024 * 1024));
361
+    info!("  💚 Available space: {} GB", status.storage_available / (1024 * 1024 * 1024));
362
+    info!("  📁 File count: {}", status.file_count);
363
+    info!("  🧩 Chunk count: {}", status.chunk_count);
364
+    
365
+    let usage_percent = if status.storage_capacity > 0 {
366
+        (status.storage_used as f64 / status.storage_capacity as f64) * 100.0
367
+    } else {
368
+        0.0
369
+    };
370
+    info!("  📈 Usage: {:.1}%", usage_percent);
72371
     
73372
     Ok(())
74373
 }
src/node_manager.rsmodified
@@ -18,7 +18,7 @@ pub struct NodeManager {
1818
     network_manager: NetworkManager,
1919
     
2020
     /// Storage layer manager
21
-    storage_manager: Arc<StorageManager>,
21
+    pub storage_manager: Arc<StorageManager>,
2222
     
2323
     /// Configuration
2424
     config: Config,