Rust · 12283 bytes Raw Blame History
1 use anyhow::Result;
2 use clap::Parser;
3 use tracing::{info, warn};
4 use tracing_subscriber;
5
6 mod config;
7 mod network;
8 mod storage;
9 mod protocol;
10 mod node_manager;
11 mod crypto;
12 mod coordinator;
13
14 #[cfg(test)]
15 mod integration_tests;
16
17 use config::Config;
18 use node_manager::{NodeManager, DistributionStrategy};
19
20 #[derive(Parser, Debug)]
21 #[command(author, version, about = "ZephyrFS - Distributed P2P storage node", long_about = None)]
22 struct Args {
23 #[arg(short, long, value_name = "FILE", global = true)]
24 config: Option<std::path::PathBuf>,
25
26 #[arg(short, long, global = true)]
27 verbose: bool,
28
29 #[command(subcommand)]
30 command: Option<Commands>,
31 }
32
// Subcommands accepted by the node binary; `main` dispatches every variant to
// its handler function.
//
// The `///` comments in this enum are deliberately left untouched: clap's
// derive macros use them as the `--help` text, so editing them changes what
// users see. Maintainer notes are therefore written as plain `//` comments.
#[derive(Parser, Debug)]
enum Commands {
    /// Initialize a new ZephyrFS node
    Init {
        /// Storage path for the node
        #[arg(short, long, default_value = "./zephyrfs_storage")]
        storage_path: std::path::PathBuf,

        /// Maximum storage allocation in GB
        // Converted to bytes (GB * 1024^3) by `init_node`.
        #[arg(short, long, default_value = "10")]
        max_storage_gb: u64,
    },

    /// Start the ZephyrFS node daemon
    Start {
        /// Run in background/daemon mode
        // Daemon mode is not implemented yet: `start_node` logs a warning and
        // keeps running in the foreground.
        #[arg(short, long)]
        daemon: bool,
    },

    /// Join an existing ZephyrFS network
    Join {
        /// Bootstrap peer address (multiaddr format)
        // NOTE(review): `join_network` currently only logs this value; the
        // actual bootstrap connection is still a TODO there.
        #[arg(required = true)]
        bootstrap_peer: String,
    },

    /// Upload a file to the network
    Upload {
        /// File to upload
        file_path: std::path::PathBuf,

        /// Optional file ID (defaults to filename)
        #[arg(short, long)]
        file_id: Option<String>,

        /// Distribution strategy
        // Parsed by hand in `upload_file`, which recognizes "local-only",
        // "local", "replicate" and "distribute"; any other value falls back
        // to local-only with a warning.
        #[arg(short, long, default_value = "local-only")]
        strategy: String,
    },

    /// Download a file from the network
    Download {
        /// File ID to download
        file_id: String,

        /// Output path (defaults to current directory)
        // When omitted, `download_file` uses the file ID itself as the path.
        #[arg(short, long)]
        output: Option<std::path::PathBuf>,
    },

    /// List files in the network
    Ls {
        /// Show detailed information
        #[arg(short, long)]
        detailed: bool,

        /// Limit number of results
        #[arg(short, long)]
        limit: Option<usize>,
    },

    /// Show node status
    Status,

    /// Health check (for Docker/monitoring)
    // Currently this only logs "Health check passed" and returns success; no
    // node state is inspected (see `main`).
    HealthCheck,
}
101
102 #[tokio::main]
103 async fn main() -> Result<()> {
104 let args = Args::parse();
105
106 // Initialize logging
107 let log_level = if args.verbose { "debug" } else { "info" };
108 tracing_subscriber::fmt()
109 .with_env_filter(format!("zephyrfs_node={}", log_level))
110 .init();
111
112 match args.command {
113 Some(Commands::HealthCheck) => {
114 // Simple health check for Docker/monitoring
115 info!("Health check passed");
116 Ok(())
117 }
118
119 Some(Commands::Init { storage_path, max_storage_gb }) => {
120 init_node(storage_path, max_storage_gb, args.config.as_deref()).await
121 }
122
123 Some(Commands::Start { daemon }) => {
124 start_node(daemon, args.config.as_deref()).await
125 }
126
127 None => {
128 start_node(false, args.config.as_deref()).await
129 }
130
131 Some(Commands::Join { bootstrap_peer }) => {
132 join_network(bootstrap_peer, args.config.as_deref()).await
133 }
134
135 Some(Commands::Upload { file_path, file_id, strategy }) => {
136 upload_file(file_path, file_id, strategy, args.config.as_deref()).await
137 }
138
139 Some(Commands::Download { file_id, output }) => {
140 download_file(file_id, output, args.config.as_deref()).await
141 }
142
143 Some(Commands::Ls { detailed, limit }) => {
144 list_files(detailed, limit, args.config.as_deref()).await
145 }
146
147 Some(Commands::Status) => {
148 show_status(args.config.as_deref()).await
149 }
150 }
151 }
152
153 /// Initialize a new ZephyrFS node
154 async fn init_node(storage_path: std::path::PathBuf, max_storage_gb: u64, config_path: Option<&std::path::Path>) -> Result<()> {
155 info!("Initializing ZephyrFS node");
156 info!("Storage path: {:?}", storage_path);
157 info!("Max storage: {} GB", max_storage_gb);
158
159 // Create storage directory
160 std::fs::create_dir_all(&storage_path)?;
161
162 // Load or create config
163 let mut config = Config::load(config_path)?;
164 config.storage.max_storage = max_storage_gb * 1024 * 1024 * 1024; // Convert GB to bytes
165
166 // Test initialization by creating a NodeManager
167 let _node_manager = NodeManager::new(config, storage_path).await?;
168
169 info!("ZephyrFS node initialized successfully");
170 info!("Run 'zephyrfs-node start' to begin serving");
171 Ok(())
172 }
173
174 /// Start the ZephyrFS node daemon
175 async fn start_node(daemon: bool, config_path: Option<&std::path::Path>) -> Result<()> {
176 if daemon {
177 info!("Starting ZephyrFS node in daemon mode");
178 // TODO: Implement proper daemon mode with process forking
179 warn!("Daemon mode not yet implemented - running in foreground");
180 } else {
181 info!("Starting ZephyrFS node");
182 }
183
184 // Load configuration
185 let config = Config::load(config_path)?;
186
187 // Determine storage path
188 let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
189
190 // Initialize and start node manager
191 let mut node_manager = NodeManager::new(config, storage_path).await?;
192
193 info!("🚀 Starting integrated ZephyrFS node...");
194 node_manager.start().await?;
195
196 info!("✅ ZephyrFS Node is running. Press Ctrl+C to stop.");
197
198 // Keep running until shutdown signal
199 tokio::signal::ctrl_c().await?;
200 warn!("🛑 Shutdown signal received");
201
202 node_manager.shutdown().await?;
203 info!("✅ ZephyrFS Node stopped");
204
205 Ok(())
206 }
207
208 /// Join an existing ZephyrFS network
209 async fn join_network(bootstrap_peer: String, config_path: Option<&std::path::Path>) -> Result<()> {
210 info!("Joining ZephyrFS network via bootstrap peer: {}", bootstrap_peer);
211
212 // Load configuration
213 let config = Config::load(config_path)?;
214 let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
215
216 // Initialize node manager
217 let mut node_manager = NodeManager::new(config, storage_path).await?;
218
219 // TODO: Add bootstrap peer to configuration and connect
220 warn!("Bootstrap peer connection not yet implemented");
221 warn!("Starting node normally - implement P2P bootstrap in future");
222
223 // Start normally for now
224 node_manager.start().await?;
225 info!("✅ Node started (bootstrap connection pending implementation)");
226
227 tokio::signal::ctrl_c().await?;
228 node_manager.shutdown().await?;
229
230 Ok(())
231 }
232
233 /// Upload a file to the network
234 async fn upload_file(file_path: std::path::PathBuf, file_id: Option<String>, strategy: String, config_path: Option<&std::path::Path>) -> Result<()> {
235 info!("Uploading file: {:?}", file_path);
236
237 // Read file data
238 let data = std::fs::read(&file_path)?;
239 let filename = file_path.file_name()
240 .and_then(|n| n.to_str())
241 .unwrap_or("uploaded_file");
242
243 let file_id = file_id.unwrap_or_else(|| filename.to_string());
244
245 // Parse distribution strategy
246 let dist_strategy = match strategy.as_str() {
247 "local-only" | "local" => DistributionStrategy::LocalOnly,
248 "replicate" => DistributionStrategy::Replicate { redundancy: 3 },
249 "distribute" => DistributionStrategy::Distribute { min_peers: 2 },
250 _ => {
251 warn!("Unknown strategy '{}', using local-only", strategy);
252 DistributionStrategy::LocalOnly
253 }
254 };
255
256 // Initialize node manager for operation
257 let config = Config::load(config_path)?;
258 let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
259 let node_manager = NodeManager::new(config, storage_path).await?;
260
261 // Perform upload
262 let file_hash = node_manager.store_file(&file_id, &data, filename, dist_strategy).await?;
263
264 info!("✅ File uploaded successfully");
265 info!("[INFO] File ID: {}", file_id);
266 info!("[INFO] Hash: {}", file_hash);
267 info!("[INFO] Size: {} bytes", data.len());
268
269 Ok(())
270 }
271
272 /// Download a file from the network
273 async fn download_file(file_id: String, output: Option<std::path::PathBuf>, config_path: Option<&std::path::Path>) -> Result<()> {
274 info!("Downloading file: {}", file_id);
275
276 // Initialize node manager
277 let config = Config::load(config_path)?;
278 let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
279 let node_manager = NodeManager::new(config, storage_path).await?;
280
281 // Attempt download
282 match node_manager.retrieve_file(&file_id).await? {
283 Some(data) => {
284 let output_path = output.unwrap_or_else(|| std::path::PathBuf::from(&file_id));
285 std::fs::write(&output_path, &data)?;
286
287 info!("✅ File downloaded successfully");
288 info!("[SUCCESS] Saved to: {:?}", output_path);
289 info!("[INFO] Size: {} bytes", data.len());
290 }
291 None => {
292 warn!("[ERROR] File not found: {}", file_id);
293 std::process::exit(1);
294 }
295 }
296
297 Ok(())
298 }
299
300 /// List files in the network
301 async fn list_files(detailed: bool, limit: Option<usize>, config_path: Option<&std::path::Path>) -> Result<()> {
302 info!("Listing files in network");
303
304 // Initialize node manager
305 let config = Config::load(config_path)?;
306 let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
307 let node_manager = NodeManager::new(config, storage_path).await?;
308
309 // Get file list
310 let files = node_manager.storage_manager.list_files(limit).await?;
311
312 if files.is_empty() {
313 info!("[INFO] No files found");
314 return Ok(());
315 }
316
317 info!("[INFO] Found {} files:", files.len());
318
319 for (file_id, metadata) in files {
320 if detailed {
321 info!("[INFO] File: {}", file_id);
322 info!(" Name: {}", metadata.name);
323 info!(" Size: {} bytes", metadata.size);
324 info!(" Hash: {}", metadata.file_hash);
325 info!(" Created: {}", metadata.created_at);
326 info!(" Chunks: {}", metadata.chunk_ids.len());
327 if let Some(mime) = &metadata.mime_type {
328 info!(" Type: {}", mime);
329 }
330 info!("");
331 } else {
332 info!(" {} ({} bytes) - {}", file_id, metadata.size, metadata.name);
333 }
334 }
335
336 Ok(())
337 }
338
339 /// Show node status
340 async fn show_status(config_path: Option<&std::path::Path>) -> Result<()> {
341 info!("Checking ZephyrFS node status");
342
343 // Initialize node manager
344 let config = Config::load(config_path)?;
345 let storage_path = std::env::current_dir()?.join("zephyrfs_storage");
346 let node_manager = NodeManager::new(config, storage_path).await?;
347
348 // Get comprehensive status
349 let status = node_manager.get_node_status().await;
350
351 info!("🚀 ZephyrFS Node Status");
352 info!(" 🆔 Node ID: {}", status.node_id);
353 info!(" 📦 Version: {}", status.version);
354 info!(" ⏱️ Uptime: {} seconds", status.uptime_seconds);
355 info!("");
356
357 info!("🌐 Network Status");
358 info!(" 🔗 Peer connections: {}", status.peer_connections);
359 info!(" 📤 Bytes sent: {}", status.bytes_sent);
360 info!(" 📥 Bytes received: {}", status.bytes_received);
361 info!(" ❌ Failed requests: {}", status.failed_requests);
362 info!("");
363
364 info!("💾 Storage Status");
365 info!(" 💽 Total capacity: {} GB", status.storage_capacity / (1024 * 1024 * 1024));
366 info!(" 📊 Used space: {} MB", status.storage_used / (1024 * 1024));
367 info!(" 💚 Available space: {} GB", status.storage_available / (1024 * 1024 * 1024));
368 info!(" 📁 File count: {}", status.file_count);
369 info!(" 🧩 Chunk count: {}", status.chunk_count);
370
371 let usage_percent = if status.storage_capacity > 0 {
372 (status.storage_used as f64 / status.storage_capacity as f64) * 100.0
373 } else {
374 0.0
375 };
376 info!(" 📈 Usage: {:.1}%", usage_percent);
377
378 Ok(())
379 }