//! Storage manager: coordinates chunked, deduplicated file storage with
//! capacity tracking and integrity verification.
1 use anyhow::{Context, Result};
2 use sha2::{Digest, Sha256};
3 use std::path::{Path, PathBuf};
4 use std::sync::Arc;
5 use tokio::sync::RwLock;
6 use tracing::{debug, info, warn, error};
7
8 use crate::storage::{
9 chunk_store::{ChunkStore, StorageStats},
10 metadata_store::{MetadataStore, FileMetadata as FileMetaData},
11 file_chunker::{FileChunker, FileMetadata as ChunkerFileMetadata, ChunkInfo},
12 };
13
/// Storage capacity configuration and limits
///
/// Safety: Enforces storage limits to prevent disk exhaustion
#[derive(Debug, Clone)]
pub struct StorageConfig {
    /// Maximum storage capacity in bytes
    pub max_capacity: u64,

    /// Warning threshold as a fraction of capacity (0.0..=1.0).
    /// Crossing it only logs a warning; writes are still accepted.
    pub warning_threshold: f64,

    /// Critical threshold as a fraction of capacity (0.0..=1.0).
    /// Writes that would push usage past it are rejected.
    pub critical_threshold: f64,

    /// Default chunk size in bytes used when splitting files
    pub default_chunk_size: usize,

    /// Maximum size in bytes of a single file accepted for storage
    pub max_file_size: u64,

    /// Enable automatic garbage collection
    /// NOTE(review): no GC task is visible in this file — presumably
    /// consumed by another component; confirm.
    pub enable_gc: bool,

    /// Garbage collection interval in seconds
    pub gc_interval: u64,
}
40
41 impl Default for StorageConfig {
42 fn default() -> Self {
43 Self {
44 max_capacity: 10 * 1024 * 1024 * 1024, // 10GB default
45 warning_threshold: 0.8, // 80%
46 critical_threshold: 0.95, // 95%
47 default_chunk_size: 1024 * 1024, // 1MB
48 max_file_size: 1024 * 1024 * 1024, // 1GB max file
49 enable_gc: true,
50 gc_interval: 3600, // 1 hour
51 }
52 }
53 }
54
/// Comprehensive storage capacity metrics
///
/// Transparency: Detailed capacity tracking for monitoring
///
/// A point-in-time snapshot recomputed by `refresh_capacity_info`
/// after every mutating operation.
#[derive(Debug, Clone)]
pub struct CapacityInfo {
    /// Total configured capacity (mirrors `StorageConfig::max_capacity`)
    pub total_capacity: u64,

    /// Currently used space in bytes (physical size of stored chunks)
    pub used_space: u64,

    /// Available space (`total_capacity - used_space`, floored at 0)
    pub available_space: u64,

    /// Usage percentage (0.0 to 1.0): `used_space / total_capacity`
    pub usage_percentage: f64,

    /// Number of stored files (metadata records)
    pub file_count: u64,

    /// Number of stored chunks
    pub chunk_count: u64,

    /// Average chunk size in bytes (0 when no chunks are stored)
    pub avg_chunk_size: u64,

    /// Storage efficiency (deduplication ratio): logical bytes (sum of
    /// file sizes) over physical bytes stored; > 1.0 means dedup saves space
    pub efficiency_ratio: f64,
}
84
/// Main storage manager coordinating all storage operations
///
/// Safety: Enforces capacity limits and coordinates atomic operations
/// Transparency: Comprehensive logging and metrics collection
/// Privacy: Handles encrypted storage and secure deletion
/// NOTE(review): no encryption or secure-deletion logic is visible in this
/// file — presumably provided by the backends; confirm.
pub struct StorageManager {
    /// Chunk storage backend
    chunk_store: Arc<ChunkStore>,

    /// Metadata storage backend
    metadata_store: Arc<MetadataStore>,

    /// File chunking system
    file_chunker: Arc<FileChunker>,

    /// Storage configuration
    config: StorageConfig,

    /// Base storage path
    /// NOTE(review): only read during construction in this file; retained
    /// presumably for future use or external inspection.
    base_path: PathBuf,

    /// Cached capacity snapshot; refreshed after each mutating operation
    capacity_info: Arc<RwLock<CapacityInfo>>,
}
109
110 impl StorageManager {
111 /// Create a new StorageManager with specified configuration
112 ///
113 /// Safety: Initializes all storage backends with security settings
114 pub async fn new<P: AsRef<Path>>(base_path: P, config: StorageConfig) -> Result<Self> {
115 let base_path = base_path.as_ref().to_path_buf();
116 info!("Initializing StorageManager at: {:?}", base_path);
117
118 // Create subdirectories for different storage types
119 let chunk_path = base_path.join("chunks");
120 let metadata_path = base_path.join("metadata");
121
122 std::fs::create_dir_all(&chunk_path)
123 .context("Failed to create chunk storage directory")?;
124 std::fs::create_dir_all(&metadata_path)
125 .context("Failed to create metadata storage directory")?;
126
127 // Initialize storage backends
128 let chunk_store = Arc::new(ChunkStore::new(&chunk_path)
129 .context("Failed to initialize chunk store")?);
130
131 let metadata_store = Arc::new(MetadataStore::new(&metadata_path)
132 .context("Failed to initialize metadata store")?);
133
134 let file_chunker = Arc::new(FileChunker::new(Some(config.default_chunk_size))?);
135
136 // Initialize capacity tracking
137 let capacity_info = Arc::new(RwLock::new(CapacityInfo {
138 total_capacity: config.max_capacity,
139 used_space: 0,
140 available_space: config.max_capacity,
141 usage_percentage: 0.0,
142 file_count: 0,
143 chunk_count: 0,
144 avg_chunk_size: 0,
145 efficiency_ratio: 1.0,
146 }));
147
148 let manager = Self {
149 chunk_store,
150 metadata_store,
151 file_chunker,
152 config,
153 base_path,
154 capacity_info,
155 };
156
157 // Update capacity information from existing storage
158 manager.refresh_capacity_info().await?;
159
160 info!("StorageManager initialized successfully");
161 Ok(manager)
162 }
163
164 /// Store a file with automatic chunking and deduplication
165 ///
166 /// Safety: Enforces capacity limits and validates file integrity
167 /// Transparency: Logs all storage operations with file hashes
168 pub async fn store_file(&self, file_id: &str, data: &[u8], filename: &str) -> Result<String> {
169 info!("Storing file: {} ({} bytes) as {}", filename, data.len(), file_id);
170
171 // Check capacity limits before storing
172 self.check_capacity_limits(data.len() as u64).await?;
173
174 // Check file size limit
175 if data.len() as u64 > self.config.max_file_size {
176 return Err(anyhow::anyhow!(
177 "File size ({} bytes) exceeds maximum allowed size ({} bytes)",
178 data.len(),
179 self.config.max_file_size
180 ));
181 }
182
183 // Chunk the file
184 let metadata = self.file_chunker.chunk_bytes(data, file_id.to_string(), filename.to_string())?;
185 let file_hash = metadata.file_hash.clone();
186
187 debug!("File chunked into {} pieces", metadata.chunks.len());
188
189 // Store all chunks with deduplication
190 let mut chunk_ids = Vec::new();
191 let mut stored_chunks = 0;
192
193 // We need to get the actual chunk data from the original data
194 for chunk_info in &metadata.chunks {
195 let start = chunk_info.offset as usize;
196 let end = start + chunk_info.size as usize;
197 let chunk_data = &data[start..end];
198
199 let chunk_hash = self.chunk_store.store_chunk(&chunk_info.chunk_id, chunk_data).await?;
200 chunk_ids.push(chunk_info.chunk_id.clone());
201 stored_chunks += 1;
202
203 debug!("Stored chunk {}/{}: {} (hash: {})",
204 stored_chunks, metadata.chunks.len(), chunk_info.chunk_id, chunk_hash);
205 }
206
207 // Create file metadata for storage
208 let storage_metadata = FileMetaData {
209 name: filename.to_string(),
210 size: data.len() as u64,
211 mime_type: self.detect_mime_type(data, filename),
212 file_hash: file_hash.clone(),
213 chunk_ids,
214 created_at: std::time::SystemTime::now()
215 .duration_since(std::time::UNIX_EPOCH)?
216 .as_secs(),
217 modified_at: std::time::SystemTime::now()
218 .duration_since(std::time::UNIX_EPOCH)?
219 .as_secs(),
220 permissions: 0o644, // Default read-write for owner, read for others
221 checksum: String::new(), // Will be calculated by metadata store
222 };
223
224 // Store metadata
225 self.metadata_store.store_metadata(file_id, storage_metadata).await?;
226
227 // Update capacity information
228 self.refresh_capacity_info().await?;
229
230 info!("Successfully stored file: {} with hash: {}", file_id, file_hash);
231 Ok(file_hash)
232 }
233
/// Retrieve a complete file by reconstructing from chunks
///
/// Safety: Verifies integrity of all chunks before reconstruction
/// Transparency: Logs retrieval operations and verification steps
///
/// Returns `Ok(None)` when no metadata exists for `file_id`; errors if any
/// chunk is missing or the reconstructed bytes fail whole-file hash
/// verification.
pub async fn retrieve_file(&self, file_id: &str) -> Result<Option<Vec<u8>>> {
    debug!("Retrieving file: {}", file_id);

    // Get file metadata; absence means the file was never stored (or deleted).
    let metadata = match self.metadata_store.get_metadata(file_id).await? {
        Some(meta) => meta,
        None => {
            debug!("File metadata not found: {}", file_id);
            return Ok(None);
        }
    };

    info!("Retrieving file: {} ({} bytes, {} chunks)",
        metadata.name, metadata.size, metadata.chunk_ids.len());

    // Retrieve all chunks, in the order recorded in the metadata. Any
    // missing chunk makes reconstruction impossible, so fail hard.
    let mut chunk_data = Vec::new();
    for chunk_id in &metadata.chunk_ids {
        match self.chunk_store.retrieve_chunk(chunk_id).await? {
            Some(data) => chunk_data.push(data),
            None => {
                error!("Missing chunk {} for file {}", chunk_id, file_id);
                return Err(anyhow::anyhow!(
                    "File reconstruction failed: missing chunk {}", chunk_id
                ));
            }
        }
    }

    // Rebuild the chunker-level metadata the reconstructor expects.
    // Offsets are re-derived by accumulating the retrieved chunk lengths,
    // and per-chunk hashes are recomputed from the retrieved bytes — they
    // describe the chunks rather than verify them; the real integrity
    // guarantee is the whole-file hash check below.
    let chunker_metadata = ChunkerFileMetadata {
        file_id: file_id.to_string(),
        filename: metadata.name.clone(),
        total_size: metadata.size,
        file_hash: metadata.file_hash.clone(),
        chunks: {
            let mut offset = 0u64;
            metadata.chunk_ids.iter().enumerate().map(|(i, chunk_id)| {
                // Calculate the actual chunk hash
                let mut hasher = Sha256::new();
                hasher.update(&chunk_data[i]);
                let chunk_hash = hex::encode(hasher.finalize());

                let chunk_info = ChunkInfo {
                    chunk_id: chunk_id.clone(),
                    hash: chunk_hash,
                    size: chunk_data[i].len() as u64,
                    index: i as u32,
                    offset,
                };

                offset += chunk_data[i].len() as u64;
                chunk_info
            }).collect()
        },
        // NOTE(review): uses the *current* configured chunk size, which may
        // differ from the size in effect when the file was stored — confirm
        // the reconstructor does not rely on it.
        chunk_size: self.config.default_chunk_size,
        mime_type: metadata.mime_type.clone(),
        created_at: metadata.created_at,
    };

    // Reconstruct file
    let reconstructed = self.file_chunker.reconstruct_file(&chunker_metadata, chunk_data)?;

    // Verify file integrity: the reassembled bytes must hash back to the
    // value recorded at store time.
    let mut hasher = Sha256::new();
    hasher.update(&reconstructed);
    let computed_hash = hex::encode(hasher.finalize());
    if computed_hash != metadata.file_hash {
        error!("File integrity verification failed for {}: hash mismatch", file_id);
        return Err(anyhow::anyhow!(
            "File integrity verification failed: hash mismatch"
        ));
    }

    info!("Successfully retrieved and verified file: {}", file_id);
    Ok(Some(reconstructed))
}
315
316 /// Delete a file and its associated chunks (with reference counting)
317 ///
318 /// Safety: Uses atomic operations and reference counting
319 /// Transparency: Comprehensive logging of deletion process
320 pub async fn delete_file(&self, file_id: &str) -> Result<bool> {
321 info!("Deleting file: {}", file_id);
322
323 // Get file metadata first
324 let metadata = match self.metadata_store.get_metadata(file_id).await? {
325 Some(meta) => meta,
326 None => {
327 debug!("Cannot delete non-existent file: {}", file_id);
328 return Ok(false);
329 }
330 };
331
332 // Delete all associated chunks (with reference counting)
333 let mut deleted_chunks = 0;
334 for chunk_id in &metadata.chunk_ids {
335 if self.chunk_store.delete_chunk(chunk_id).await? {
336 deleted_chunks += 1;
337 debug!("Deleted chunk: {}", chunk_id);
338 } else {
339 debug!("Chunk {} still has references, not deleted", chunk_id);
340 }
341 }
342
343 // Delete metadata
344 let metadata_deleted = self.metadata_store.delete_metadata(file_id).await?;
345
346 // Update capacity information
347 self.refresh_capacity_info().await?;
348
349 info!("Successfully deleted file: {} (deleted {} chunks)",
350 file_id, deleted_chunks);
351 Ok(metadata_deleted)
352 }
353
/// List all stored files with metadata
///
/// Transparency: Provides comprehensive file listing for audit
///
/// Delegates to the metadata store. `limit` caps the number of entries
/// returned; `None` returns everything (as used by capacity refresh).
pub async fn list_files(&self, limit: Option<usize>) -> Result<Vec<(String, FileMetaData)>> {
    self.metadata_store.list_files(limit).await
}
360
/// Check if a file exists
///
/// Existence is determined by the presence of a metadata record, not by
/// probing the chunk store.
pub async fn file_exists(&self, file_id: &str) -> Result<bool> {
    self.metadata_store.file_exists(file_id).await
}
365
/// Retrieve a specific chunk by ID (for P2P sharing)
///
/// Safety: Verifies chunk integrity before returning
/// NOTE(review): verification is delegated to `ChunkStore` — confirm it
/// actually checks the hash on read.
/// Transparency: Logs all chunk access attempts
///
/// Returns `Ok(None)` when the chunk is unknown.
pub async fn retrieve_chunk(&self, chunk_id: &str) -> Result<Option<Vec<u8>>> {
    debug!("Retrieving chunk for P2P sharing: {}", chunk_id);
    self.chunk_store.retrieve_chunk(chunk_id).await
}
374
/// Store a chunk directly (for P2P receiving)
///
/// Safety: Includes full integrity verification
/// NOTE(review): verification is delegated to `ChunkStore::store_chunk` —
/// confirm. This path also bypasses the capacity checks that `store_file`
/// performs.
/// Transparency: Logs all chunk storage operations
pub async fn store_chunk(&self, chunk_id: &str, data: &[u8]) -> Result<String> {
    debug!("Storing chunk from P2P: {} ({} bytes)", chunk_id, data.len());
    self.chunk_store.store_chunk(chunk_id, data).await
}
383
384 /// Get current storage capacity information
385 ///
386 /// Transparency: Real-time capacity metrics for monitoring
387 pub async fn get_capacity_info(&self) -> CapacityInfo {
388 let info = self.capacity_info.read().await;
389 info.clone()
390 }
391
/// Get detailed storage statistics
///
/// Transparency: Comprehensive storage metrics
///
/// Thin wrapper over `ChunkStore::get_stats`; wrapped in `Result` for
/// interface consistency even though the call itself cannot fail here.
pub async fn get_storage_stats(&self) -> Result<StorageStats> {
    Ok(self.chunk_store.get_stats().await)
}
398
399 /// Check if storage has enough capacity for new data
400 ///
401 /// Safety: Prevents storage exhaustion
402 async fn check_capacity_limits(&self, required_space: u64) -> Result<()> {
403 let info = self.capacity_info.read().await;
404
405 // Check if we have enough space
406 if info.available_space < required_space {
407 return Err(anyhow::anyhow!(
408 "Insufficient storage space: required {} bytes, available {} bytes",
409 required_space, info.available_space
410 ));
411 }
412
413 // Check if we're approaching critical threshold
414 let projected_usage = (info.used_space + required_space) as f64 / info.total_capacity as f64;
415
416 if projected_usage > self.config.critical_threshold {
417 return Err(anyhow::anyhow!(
418 "Storage usage would exceed critical threshold: {:.1}% > {:.1}%",
419 projected_usage * 100.0,
420 self.config.critical_threshold * 100.0
421 ));
422 }
423
424 // Warn if approaching warning threshold
425 if projected_usage > self.config.warning_threshold {
426 warn!("Storage usage approaching warning threshold: {:.1}%",
427 projected_usage * 100.0);
428 }
429
430 Ok(())
431 }
432
433 /// Refresh capacity information from storage backends
434 ///
435 /// Transparency: Accurate real-time capacity tracking
436 async fn refresh_capacity_info(&self) -> Result<()> {
437 let chunk_stats = self.chunk_store.get_stats().await;
438 let files = self.metadata_store.list_files(None).await?;
439
440 let used_space = chunk_stats.total_size;
441 let available_space = self.config.max_capacity.saturating_sub(used_space);
442 let usage_percentage = used_space as f64 / self.config.max_capacity as f64;
443
444 // Calculate efficiency ratio (deduplication benefits)
445 let logical_size: u64 = files.iter().map(|(_, meta)| meta.size).sum();
446 let efficiency_ratio = if used_space > 0 {
447 logical_size as f64 / used_space as f64
448 } else {
449 1.0
450 };
451
452 let avg_chunk_size = if chunk_stats.total_chunks > 0 {
453 chunk_stats.total_size / chunk_stats.total_chunks
454 } else {
455 0
456 };
457
458 let mut info = self.capacity_info.write().await;
459 *info = CapacityInfo {
460 total_capacity: self.config.max_capacity,
461 used_space,
462 available_space,
463 usage_percentage,
464 file_count: files.len() as u64,
465 chunk_count: chunk_stats.total_chunks,
466 avg_chunk_size,
467 efficiency_ratio,
468 };
469
470 debug!("Capacity info updated: {:.1}% used ({} files, {} chunks)",
471 usage_percentage * 100.0, info.file_count, info.chunk_count);
472
473 Ok(())
474 }
475
476 /// Simple MIME type detection based on file extension and content
477 fn detect_mime_type(&self, data: &[u8], filename: &str) -> Option<String> {
478 // Check magic bytes for common formats
479 if data.len() >= 4 {
480 match &data[0..4] {
481 [0xFF, 0xD8, 0xFF, _] => return Some("image/jpeg".to_string()),
482 [0x89, 0x50, 0x4E, 0x47] => return Some("image/png".to_string()),
483 [0x47, 0x49, 0x46, _] => return Some("image/gif".to_string()),
484 [0x25, 0x50, 0x44, 0x46] => return Some("application/pdf".to_string()),
485 _ => {}
486 }
487 }
488
489 // Fallback to extension-based detection
490 if let Some(extension) = Path::new(filename).extension() {
491 match extension.to_str()?.to_lowercase().as_str() {
492 "txt" => Some("text/plain".to_string()),
493 "json" => Some("application/json".to_string()),
494 "xml" => Some("application/xml".to_string()),
495 "html" => Some("text/html".to_string()),
496 "css" => Some("text/css".to_string()),
497 "js" => Some("application/javascript".to_string()),
498 "mp4" => Some("video/mp4".to_string()),
499 "mp3" => Some("audio/mpeg".to_string()),
500 "zip" => Some("application/zip".to_string()),
501 _ => None,
502 }
503 } else {
504 None
505 }
506 }
507
508 /// Get chunk IDs for a file (needed for coordinator integration)
509 ///
510 /// Returns the list of chunk IDs that comprise the given file
511 pub async fn get_file_chunks(&self, file_id: &str) -> Result<Option<Vec<String>>> {
512 debug!("Getting chunk list for file: {}", file_id);
513
514 match self.metadata_store.get_file(file_id).await? {
515 Some(metadata) => {
516 Ok(Some(metadata.chunk_ids))
517 }
518 None => {
519 debug!("File not found: {}", file_id);
520 Ok(None)
521 }
522 }
523 }
524 }
525
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_storage_manager_creation() {
        let dir = tempdir().unwrap();
        let manager = StorageManager::new(dir.path(), StorageConfig::default())
            .await
            .unwrap();

        // A fresh manager over an empty directory reports nothing stored.
        let capacity = manager.get_capacity_info().await;
        assert_eq!(capacity.used_space, 0);
        assert_eq!(capacity.file_count, 0);
        assert_eq!(capacity.chunk_count, 0);
    }

    #[tokio::test]
    async fn test_store_and_retrieve_file() {
        let dir = tempdir().unwrap();
        let manager = StorageManager::new(dir.path(), StorageConfig::default())
            .await
            .unwrap();

        let data = b"Hello, ZephyrFS! This is a test file with some content.";

        // Round-trip: store, check existence, retrieve, compare bytes.
        let hash = manager
            .store_file("test-file-1", data, "test.txt")
            .await
            .unwrap();
        assert!(!hash.is_empty());
        assert!(manager.file_exists("test-file-1").await.unwrap());

        let retrieved = manager.retrieve_file("test-file-1").await.unwrap().unwrap();
        assert_eq!(retrieved, data);

        // Capacity accounting must reflect the stored file.
        let capacity = manager.get_capacity_info().await;
        assert_eq!(capacity.file_count, 1);
        assert!(capacity.used_space > 0);
        assert!(capacity.usage_percentage > 0.0);
    }

    #[tokio::test]
    async fn test_capacity_limits() {
        let dir = tempdir().unwrap();
        // Tiny capacity with a low critical threshold so a real write fails.
        let config = StorageConfig {
            max_capacity: 100,
            critical_threshold: 0.5,
            ..StorageConfig::default()
        };
        let manager = StorageManager::new(dir.path(), config).await.unwrap();

        // 200 bytes cannot fit into a 100-byte store.
        let data = vec![0u8; 200];
        let result = manager.store_file("large-file", &data, "large.txt").await;
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(
            error_msg.contains("Insufficient storage space")
                || error_msg.contains("Storage usage would exceed")
        );
    }

    #[tokio::test]
    async fn test_file_deletion() {
        let dir = tempdir().unwrap();
        let manager = StorageManager::new(dir.path(), StorageConfig::default())
            .await
            .unwrap();

        // Store a file, then delete it and verify it is gone.
        manager
            .store_file("delete-test", b"File to be deleted", "delete.txt")
            .await
            .unwrap();
        assert!(manager.file_exists("delete-test").await.unwrap());

        assert!(manager.delete_file("delete-test").await.unwrap());
        assert!(!manager.file_exists("delete-test").await.unwrap());

        // Capacity accounting must drop back to zero files.
        let capacity = manager.get_capacity_info().await;
        assert_eq!(capacity.file_count, 0);
    }

    #[tokio::test]
    async fn test_mime_type_detection() {
        let dir = tempdir().unwrap();
        let manager = StorageManager::new(dir.path(), StorageConfig::default())
            .await
            .unwrap();

        // Magic-byte sniffing: JPEG SOI marker wins regardless of content.
        let jpeg_data = vec![0xFF, 0xD8, 0xFF, 0xE0, 0x00, 0x10];
        assert_eq!(
            manager.detect_mime_type(&jpeg_data, "test.jpg"),
            Some("image/jpeg".to_string())
        );

        // No recognizable magic bytes: fall back to the extension.
        assert_eq!(
            manager.detect_mime_type(b"plain text content", "test.txt"),
            Some("text/plain".to_string())
        );
    }
}