Rust · 12226 bytes Raw Blame History
1 use anyhow::Result;
2 use clap::Parser;
3 use tracing::{info, warn};
4 use tracing_subscriber;
5
6 mod config;
7 mod network;
8 mod storage;
9 mod protocol;
10 mod node_manager;
11
12 #[cfg(test)]
13 mod integration_tests;
14
15 use config::Config;
16 use node_manager::{NodeManager, DistributionStrategy};
17
18 #[derive(Parser, Debug)]
19 #[command(author, version, about = "ZephyrFS - Distributed P2P storage node", long_about = None)]
20 struct Args {
21 #[arg(short, long, value_name = "FILE", global = true)]
22 config: Option<std::path::PathBuf>,
23
24 #[arg(short, long, global = true)]
25 verbose: bool,
26
27 #[command(subcommand)]
28 command: Option<Commands>,
29 }
30
31 #[derive(Parser, Debug)]
32 enum Commands {
33 /// Initialize a new ZephyrFS node
34 Init {
35 /// Storage path for the node
36 #[arg(short, long, default_value = "./zephyrfs_storage")]
37 storage_path: std::path::PathBuf,
38
39 /// Maximum storage allocation in GB
40 #[arg(short, long, default_value = "10")]
41 max_storage_gb: u64,
42 },
43
44 /// Start the ZephyrFS node daemon
45 Start {
46 /// Run in background/daemon mode
47 #[arg(short, long)]
48 daemon: bool,
49 },
50
51 /// Join an existing ZephyrFS network
52 Join {
53 /// Bootstrap peer address (multiaddr format)
54 #[arg(required = true)]
55 bootstrap_peer: String,
56 },
57
58 /// Upload a file to the network
59 Upload {
60 /// File to upload
61 file_path: std::path::PathBuf,
62
63 /// Optional file ID (defaults to filename)
64 #[arg(short, long)]
65 file_id: Option<String>,
66
67 /// Distribution strategy
68 #[arg(short, long, default_value = "local-only")]
69 strategy: String,
70 },
71
72 /// Download a file from the network
73 Download {
74 /// File ID to download
75 file_id: String,
76
77 /// Output path (defaults to current directory)
78 #[arg(short, long)]
79 output: Option<std::path::PathBuf>,
80 },
81
82 /// List files in the network
83 Ls {
84 /// Show detailed information
85 #[arg(short, long)]
86 detailed: bool,
87
88 /// Limit number of results
89 #[arg(short, long)]
90 limit: Option<usize>,
91 },
92
93 /// Show node status
94 Status,
95
96 /// Health check (for Docker/monitoring)
97 HealthCheck,
98 }
99
100 #[tokio::main]
101 async fn main() -> Result<()> {
102 let args = Args::parse();
103
104 // Initialize logging
105 let log_level = if args.verbose { "debug" } else { "info" };
106 tracing_subscriber::fmt()
107 .with_env_filter(format!("zephyrfs_node={}", log_level))
108 .init();
109
110 match args.command {
111 Some(Commands::HealthCheck) | None if std::env::args().any(|arg| arg == "--health-check") => {
112 // Simple health check for Docker/monitoring
113 info!("Health check passed");
114 Ok(())
115 }
116
117 Some(Commands::Init { storage_path, max_storage_gb }) => {
118 init_node(storage_path, max_storage_gb, args.config.as_deref()).await
119 }
120
121 Some(Commands::Start { daemon }) | None => {
122 start_node(daemon, args.config.as_deref()).await
123 }
124
125 Some(Commands::Join { bootstrap_peer }) => {
126 join_network(bootstrap_peer, args.config.as_deref()).await
127 }
128
129 Some(Commands::Upload { file_path, file_id, strategy }) => {
130 upload_file(file_path, file_id, strategy, args.config.as_deref()).await
131 }
132
133 Some(Commands::Download { file_id, output }) => {
134 download_file(file_id, output, args.config.as_deref()).await
135 }
136
137 Some(Commands::Ls { detailed, limit }) => {
138 list_files(detailed, limit, args.config.as_deref()).await
139 }
140
141 Some(Commands::Status) => {
142 show_status(args.config.as_deref()).await
143 }
144 }
145 }
146
147 /// Initialize a new ZephyrFS node
148 async fn init_node(storage_path: std::path::PathBuf, max_storage_gb: u64, config_path: Option<&std::path::Path>) -> Result<()> {
149 info!("Initializing ZephyrFS node");
150 info!("Storage path: {:?}", storage_path);
151 info!("Max storage: {} GB", max_storage_gb);
152
153 // Create storage directory
154 std::fs::create_dir_all(&storage_path)?;
155
156 // Load or create config
157 let mut config = Config::load(config_path)?;
158 config.storage.max_storage = max_storage_gb * 1024 * 1024 * 1024; // Convert GB to bytes
159
160 // Test initialization by creating a NodeManager
161 let _node_manager = NodeManager::new(config, storage_path).await?;
162
163 info!("ZephyrFS node initialized successfully");
164 info!("Run 'zephyrfs-node start' to begin serving");
165 Ok(())
166 }
167
168 /// Start the ZephyrFS node daemon
169 async fn start_node(daemon: bool, config_path: Option<&std::path::Path>) -> Result<()> {
170 if daemon {
171 info!("Starting ZephyrFS node in daemon mode");
172 // TODO: Implement proper daemon mode with process forking
173 warn!("Daemon mode not yet implemented - running in foreground");
174 } else {
175 info!("Starting ZephyrFS node");
176 }
177
178 // Load configuration
179 let config = Config::load(config_path)?;
180
181 // Determine storage path
182 let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
183
184 // Initialize and start node manager
185 let mut node_manager = NodeManager::new(config, storage_path).await?;
186
187 info!("🚀 Starting integrated ZephyrFS node...");
188 node_manager.start().await?;
189
190 info!("✅ ZephyrFS Node is running. Press Ctrl+C to stop.");
191
192 // Keep running until shutdown signal
193 tokio::signal::ctrl_c().await?;
194 warn!("🛑 Shutdown signal received");
195
196 node_manager.shutdown().await?;
197 info!("✅ ZephyrFS Node stopped");
198
199 Ok(())
200 }
201
202 /// Join an existing ZephyrFS network
203 async fn join_network(bootstrap_peer: String, config_path: Option<&std::path::Path>) -> Result<()> {
204 info!("Joining ZephyrFS network via bootstrap peer: {}", bootstrap_peer);
205
206 // Load configuration
207 let config = Config::load(config_path)?;
208 let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
209
210 // Initialize node manager
211 let mut node_manager = NodeManager::new(config, storage_path).await?;
212
213 // TODO: Add bootstrap peer to configuration and connect
214 warn!("Bootstrap peer connection not yet implemented");
215 warn!("Starting node normally - implement P2P bootstrap in future");
216
217 // Start normally for now
218 node_manager.start().await?;
219 info!("✅ Node started (bootstrap connection pending implementation)");
220
221 tokio::signal::ctrl_c().await?;
222 node_manager.shutdown().await?;
223
224 Ok(())
225 }
226
227 /// Upload a file to the network
228 async fn upload_file(file_path: std::path::PathBuf, file_id: Option<String>, strategy: String, config_path: Option<&std::path::Path>) -> Result<()> {
229 info!("Uploading file: {:?}", file_path);
230
231 // Read file data
232 let data = std::fs::read(&file_path)?;
233 let filename = file_path.file_name()
234 .and_then(|n| n.to_str())
235 .unwrap_or("uploaded_file");
236
237 let file_id = file_id.unwrap_or_else(|| filename.to_string());
238
239 // Parse distribution strategy
240 let dist_strategy = match strategy.as_str() {
241 "local-only" | "local" => DistributionStrategy::LocalOnly,
242 "replicate" => DistributionStrategy::Replicate { redundancy: 3 },
243 "distribute" => DistributionStrategy::Distribute { min_peers: 2 },
244 _ => {
245 warn!("Unknown strategy '{}', using local-only", strategy);
246 DistributionStrategy::LocalOnly
247 }
248 };
249
250 // Initialize node manager for operation
251 let config = Config::load(config_path)?;
252 let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
253 let node_manager = NodeManager::new(config, storage_path).await?;
254
255 // Perform upload
256 let file_hash = node_manager.store_file(&file_id, &data, filename, dist_strategy).await?;
257
258 info!("✅ File uploaded successfully");
259 info!("[INFO] File ID: {}", file_id);
260 info!("[INFO] Hash: {}", file_hash);
261 info!("[INFO] Size: {} bytes", data.len());
262
263 Ok(())
264 }
265
266 /// Download a file from the network
267 async fn download_file(file_id: String, output: Option<std::path::PathBuf>, config_path: Option<&std::path::Path>) -> Result<()> {
268 info!("Downloading file: {}", file_id);
269
270 // Initialize node manager
271 let config = Config::load(config_path)?;
272 let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
273 let node_manager = NodeManager::new(config, storage_path).await?;
274
275 // Attempt download
276 match node_manager.retrieve_file(&file_id).await? {
277 Some(data) => {
278 let output_path = output.unwrap_or_else(|| std::path::PathBuf::from(&file_id));
279 std::fs::write(&output_path, &data)?;
280
281 info!("✅ File downloaded successfully");
282 info!("[SUCCESS] Saved to: {:?}", output_path);
283 info!("[INFO] Size: {} bytes", data.len());
284 }
285 None => {
286 warn!("[ERROR] File not found: {}", file_id);
287 std::process::exit(1);
288 }
289 }
290
291 Ok(())
292 }
293
294 /// List files in the network
295 async fn list_files(detailed: bool, limit: Option<usize>, config_path: Option<&std::path::Path>) -> Result<()> {
296 info!("Listing files in network");
297
298 // Initialize node manager
299 let config = Config::load(config_path)?;
300 let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
301 let node_manager = NodeManager::new(config, storage_path).await?;
302
303 // Get file list
304 let files = node_manager.storage_manager.list_files(limit).await?;
305
306 if files.is_empty() {
307 info!("[INFO] No files found");
308 return Ok(());
309 }
310
311 info!("[INFO] Found {} files:", files.len());
312
313 for (file_id, metadata) in files {
314 if detailed {
315 info!("[INFO] File: {}", file_id);
316 info!(" Name: {}", metadata.name);
317 info!(" Size: {} bytes", metadata.size);
318 info!(" Hash: {}", metadata.file_hash);
319 info!(" Created: {}", metadata.created_at);
320 info!(" Chunks: {}", metadata.chunk_ids.len());
321 if let Some(mime) = &metadata.mime_type {
322 info!(" Type: {}", mime);
323 }
324 info!("");
325 } else {
326 info!(" {} ({} bytes) - {}", file_id, metadata.size, metadata.name);
327 }
328 }
329
330 Ok(())
331 }
332
333 /// Show node status
334 async fn show_status(config_path: Option<&std::path::Path>) -> Result<()> {
335 info!("Checking ZephyrFS node status");
336
337 // Initialize node manager
338 let config = Config::load(config_path)?;
339 let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
340 let node_manager = NodeManager::new(config, storage_path).await?;
341
342 // Get comprehensive status
343 let status = node_manager.get_node_status().await;
344
345 info!("🚀 ZephyrFS Node Status");
346 info!(" 🆔 Node ID: {}", status.node_id);
347 info!(" 📦 Version: {}", status.version);
348 info!(" ⏱️ Uptime: {} seconds", status.uptime_seconds);
349 info!("");
350
351 info!("🌐 Network Status");
352 info!(" 🔗 Peer connections: {}", status.peer_connections);
353 info!(" 📤 Bytes sent: {}", status.bytes_sent);
354 info!(" 📥 Bytes received: {}", status.bytes_received);
355 info!(" ❌ Failed requests: {}", status.failed_requests);
356 info!("");
357
358 info!("💾 Storage Status");
359 info!(" 💽 Total capacity: {} GB", status.storage_capacity / (1024 * 1024 * 1024));
360 info!(" 📊 Used space: {} MB", status.storage_used / (1024 * 1024));
361 info!(" 💚 Available space: {} GB", status.storage_available / (1024 * 1024 * 1024));
362 info!(" 📁 File count: {}", status.file_count);
363 info!(" 🧩 Chunk count: {}", status.chunk_count);
364
365 let usage_percent = if status.storage_capacity > 0 {
366 (status.storage_used as f64 / status.storage_capacity as f64) * 100.0
367 } else {
368 0.0
369 };
370 info!(" 📈 Usage: {:.1}%", usage_percent);
371
372 Ok(())
373 }