| 1 |
use anyhow::{Context, Result}; |
| 2 |
use rocksdb::{DB, Options, WriteBatch}; |
| 3 |
use serde::{Deserialize, Serialize}; |
| 4 |
use sha2::{Digest, Sha256}; |
| 5 |
use std::collections::HashMap; |
| 6 |
use std::path::Path; |
| 7 |
use std::sync::Arc; |
| 8 |
use tokio::sync::RwLock; |
| 9 |
use tracing::{debug, info, warn}; |
| 10 |
|
| 11 |
/// Metadata for stored chunks
///
/// Privacy: Only stores necessary operational data, no user content
///
/// NOTE(review): this struct is persisted with `bincode`, which encodes
/// fields in declaration order — do not reorder, remove, or insert fields
/// without migrating previously stored metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkMetadata {
    /// SHA-256 hash of the chunk content (lowercase hex).
    pub hash: String,

    /// Size of the chunk in bytes.
    pub size: u64,

    /// Timestamp when chunk was stored (seconds since the Unix epoch).
    pub stored_at: u64,

    /// Reference count (how many files reference this chunk).
    /// The chunk's data is only physically deleted once this reaches zero.
    pub ref_count: u32,

    /// Secondary verification checksum (BLAKE3 over data + hash) for
    /// integrity checks independent of the primary SHA-256 hash.
    pub checksum: String,
}
| 31 |
|
| 32 |
/// Thread-safe, persistent chunk storage using RocksDB
///
/// Safety: All operations include integrity checks and atomic updates
/// Transparency: All storage operations are logged for audit
/// Privacy: Chunk content is stored separately from indexing metadata
///
/// Keys in the database are namespaced: `meta:<chunk_id>` holds the
/// bincode-serialized `ChunkMetadata`, `data:<chunk_id>` holds the raw bytes.
pub struct ChunkStore {
    /// RocksDB instance for metadata and chunk data.
    /// `Arc` so the handle can be shared across async tasks.
    db: Arc<DB>,

    /// In-memory cache for frequently accessed metadata,
    /// keyed by chunk id. Guarded by an async `RwLock` so reads
    /// can proceed concurrently.
    metadata_cache: Arc<RwLock<HashMap<String, ChunkMetadata>>>,

    /// Storage statistics (chunk counts, sizes, cache hit/miss counters).
    stats: Arc<RwLock<StorageStats>>,
}
| 47 |
|
| 48 |
/// Aggregate storage metrics exposed by `ChunkStore::get_stats`.
///
/// Transparency: provides comprehensive storage metrics for auditing.
///
/// All fields are plain counters, so the type derives `Clone` and `Copy`
/// (backward compatible) — callers can snapshot and pass stats by value.
#[derive(Debug, Default, Clone, Copy)]
pub struct StorageStats {
    /// Number of unique chunks currently stored.
    pub total_chunks: u64,
    /// Total size in bytes of all stored chunk data.
    pub total_size: u64,
    /// Metadata lookups served from the in-memory cache.
    pub cache_hits: u64,
    /// Metadata lookups that had to go to the database.
    pub cache_misses: u64,
}
| 55 |
|
| 56 |
impl ChunkStore { |
| 57 |
/// Create a new ChunkStore |
| 58 |
/// |
| 59 |
/// Safety: Creates database with secure configuration |
| 60 |
pub fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> { |
| 61 |
info!("Initializing ChunkStore with security-focused configuration"); |
| 62 |
|
| 63 |
let mut opts = Options::default(); |
| 64 |
opts.create_if_missing(true); |
| 65 |
opts.set_paranoid_checks(true); // Safety: Enable paranoid consistency checks |
| 66 |
opts.set_use_fsync(true); // Safety: Force fsync for durability |
| 67 |
|
| 68 |
let db = DB::open(&opts, db_path) |
| 69 |
.context("Failed to open chunk metadata database")?; |
| 70 |
|
| 71 |
let store = Self { |
| 72 |
db: Arc::new(db), |
| 73 |
metadata_cache: Arc::new(RwLock::new(HashMap::new())), |
| 74 |
stats: Arc::new(RwLock::new(StorageStats::default())), |
| 75 |
}; |
| 76 |
|
| 77 |
// Load existing statistics |
| 78 |
store.load_stats_from_db()?; |
| 79 |
|
| 80 |
info!("ChunkStore initialized successfully"); |
| 81 |
Ok(store) |
| 82 |
} |
| 83 |
|
| 84 |
/// Store a chunk with full integrity verification |
| 85 |
/// |
| 86 |
/// Safety: Includes hash verification, atomic operations, and rollback on failure |
| 87 |
/// Transparency: All operations logged with chunk hashes |
| 88 |
pub async fn store_chunk(&self, chunk_id: &str, data: &[u8]) -> Result<String> { |
| 89 |
debug!("Storing chunk: {} ({} bytes)", chunk_id, data.len()); |
| 90 |
|
| 91 |
// Calculate and verify hash |
| 92 |
let mut hasher = Sha256::new(); |
| 93 |
hasher.update(data); |
| 94 |
let hash = hex::encode(hasher.finalize()); |
| 95 |
|
| 96 |
// Create checksum for integrity verification |
| 97 |
let checksum = self.calculate_checksum(data, &hash); |
| 98 |
|
| 99 |
let metadata = ChunkMetadata { |
| 100 |
hash: hash.clone(), |
| 101 |
size: data.len() as u64, |
| 102 |
stored_at: std::time::SystemTime::now() |
| 103 |
.duration_since(std::time::UNIX_EPOCH)? |
| 104 |
.as_secs(), |
| 105 |
ref_count: 1, |
| 106 |
checksum, |
| 107 |
}; |
| 108 |
|
| 109 |
// Atomic database update |
| 110 |
let mut batch = WriteBatch::default(); |
| 111 |
|
| 112 |
// Store metadata |
| 113 |
let metadata_key = format!("meta:{}", chunk_id); |
| 114 |
let metadata_bytes = bincode::serialize(&metadata) |
| 115 |
.context("Failed to serialize chunk metadata")?; |
| 116 |
batch.put(&metadata_key, metadata_bytes); |
| 117 |
|
| 118 |
// Store actual chunk data |
| 119 |
let data_key = format!("data:{}", chunk_id); |
| 120 |
batch.put(&data_key, data); |
| 121 |
|
| 122 |
// Check if chunk already exists and increment reference count |
| 123 |
if let Some(existing_metadata) = self.get_chunk_metadata(chunk_id).await? { |
| 124 |
warn!("Chunk {} already exists, incrementing reference count", chunk_id); |
| 125 |
let mut updated_metadata = existing_metadata; |
| 126 |
updated_metadata.ref_count += 1; |
| 127 |
|
| 128 |
// Update only metadata, not data |
| 129 |
let metadata_key = format!("meta:{}", chunk_id); |
| 130 |
let metadata_bytes = bincode::serialize(&updated_metadata)?; |
| 131 |
self.db.put(&metadata_key, metadata_bytes)?; |
| 132 |
|
| 133 |
// Update cache |
| 134 |
{ |
| 135 |
let mut cache = self.metadata_cache.write().await; |
| 136 |
cache.insert(chunk_id.to_string(), updated_metadata); |
| 137 |
} |
| 138 |
|
| 139 |
return Ok(hash); |
| 140 |
} |
| 141 |
|
| 142 |
// Commit atomic batch |
| 143 |
self.db.write(batch) |
| 144 |
.context("Failed to write chunk to database")?; |
| 145 |
|
| 146 |
// Update cache and statistics |
| 147 |
{ |
| 148 |
let mut cache = self.metadata_cache.write().await; |
| 149 |
cache.insert(chunk_id.to_string(), metadata.clone()); |
| 150 |
|
| 151 |
let mut stats = self.stats.write().await; |
| 152 |
if cache.len() == 1 { // New chunk |
| 153 |
stats.total_chunks += 1; |
| 154 |
stats.total_size += metadata.size; |
| 155 |
} |
| 156 |
} |
| 157 |
|
| 158 |
info!("Successfully stored chunk: {} with hash: {}", chunk_id, hash); |
| 159 |
Ok(hash) |
| 160 |
} |
| 161 |
|
| 162 |
/// Retrieve a chunk with integrity verification |
| 163 |
/// |
| 164 |
/// Safety: Verifies hash and checksum before returning data |
| 165 |
/// Transparency: Cache hits/misses are tracked and logged |
| 166 |
pub async fn retrieve_chunk(&self, chunk_id: &str) -> Result<Option<Vec<u8>>> { |
| 167 |
debug!("Retrieving chunk: {}", chunk_id); |
| 168 |
|
| 169 |
// Check metadata first |
| 170 |
let metadata = match self.get_chunk_metadata(chunk_id).await? { |
| 171 |
Some(meta) => meta, |
| 172 |
None => { |
| 173 |
debug!("Chunk {} not found", chunk_id); |
| 174 |
return Ok(None); |
| 175 |
} |
| 176 |
}; |
| 177 |
|
| 178 |
// Retrieve actual data |
| 179 |
let data_key = format!("data:{}", chunk_id); |
| 180 |
let data = match self.db.get(&data_key)? { |
| 181 |
Some(bytes) => bytes, |
| 182 |
None => { |
| 183 |
warn!("Chunk {} metadata exists but data is missing!", chunk_id); |
| 184 |
return Ok(None); |
| 185 |
} |
| 186 |
}; |
| 187 |
|
| 188 |
// Verify integrity |
| 189 |
if !self.verify_chunk_integrity(&data, &metadata).await? { |
| 190 |
warn!("Chunk {} failed integrity check!", chunk_id); |
| 191 |
return Err(anyhow::anyhow!("Chunk integrity verification failed")); |
| 192 |
} |
| 193 |
|
| 194 |
info!("Successfully retrieved and verified chunk: {}", chunk_id); |
| 195 |
Ok(Some(data)) |
| 196 |
} |
| 197 |
|
| 198 |
/// Delete a chunk (with reference counting) |
| 199 |
/// |
| 200 |
/// Safety: Uses reference counting to prevent accidental deletion |
| 201 |
/// Transparency: Deletion operations are fully logged |
| 202 |
pub async fn delete_chunk(&self, chunk_id: &str) -> Result<bool> { |
| 203 |
debug!("Attempting to delete chunk: {}", chunk_id); |
| 204 |
|
| 205 |
let mut metadata = match self.get_chunk_metadata(chunk_id).await? { |
| 206 |
Some(meta) => meta, |
| 207 |
None => { |
| 208 |
debug!("Cannot delete non-existent chunk: {}", chunk_id); |
| 209 |
return Ok(false); |
| 210 |
} |
| 211 |
}; |
| 212 |
|
| 213 |
// Decrement reference count |
| 214 |
metadata.ref_count = metadata.ref_count.saturating_sub(1); |
| 215 |
|
| 216 |
if metadata.ref_count > 0 { |
| 217 |
// Update metadata with new reference count |
| 218 |
let metadata_key = format!("meta:{}", chunk_id); |
| 219 |
let metadata_bytes = bincode::serialize(&metadata)?; |
| 220 |
self.db.put(&metadata_key, metadata_bytes)?; |
| 221 |
|
| 222 |
debug!("Decremented reference count for chunk: {} (now: {})", |
| 223 |
chunk_id, metadata.ref_count); |
| 224 |
|
| 225 |
// Update cache |
| 226 |
let mut cache = self.metadata_cache.write().await; |
| 227 |
cache.insert(chunk_id.to_string(), metadata); |
| 228 |
|
| 229 |
return Ok(false); // Not actually deleted |
| 230 |
} |
| 231 |
|
| 232 |
// Reference count is 0, actually delete |
| 233 |
let mut batch = WriteBatch::default(); |
| 234 |
batch.delete(format!("meta:{}", chunk_id)); |
| 235 |
batch.delete(format!("data:{}", chunk_id)); |
| 236 |
|
| 237 |
self.db.write(batch) |
| 238 |
.context("Failed to delete chunk from database")?; |
| 239 |
|
| 240 |
// Update cache and statistics |
| 241 |
{ |
| 242 |
let mut cache = self.metadata_cache.write().await; |
| 243 |
cache.remove(chunk_id); |
| 244 |
|
| 245 |
let mut stats = self.stats.write().await; |
| 246 |
stats.total_chunks = stats.total_chunks.saturating_sub(1); |
| 247 |
stats.total_size = stats.total_size.saturating_sub(metadata.size); |
| 248 |
} |
| 249 |
|
| 250 |
info!("Successfully deleted chunk: {}", chunk_id); |
| 251 |
Ok(true) |
| 252 |
} |
| 253 |
|
| 254 |
/// Check if a chunk exists |
| 255 |
pub async fn chunk_exists(&self, chunk_id: &str) -> Result<bool> { |
| 256 |
// Check cache first |
| 257 |
{ |
| 258 |
let cache = self.metadata_cache.read().await; |
| 259 |
if cache.contains_key(chunk_id) { |
| 260 |
let mut stats = self.stats.write().await; |
| 261 |
stats.cache_hits += 1; |
| 262 |
return Ok(true); |
| 263 |
} |
| 264 |
} |
| 265 |
|
| 266 |
// Check database |
| 267 |
let metadata_key = format!("meta:{}", chunk_id); |
| 268 |
let exists = self.db.get(&metadata_key)?.is_some(); |
| 269 |
|
| 270 |
// Update cache miss count |
| 271 |
{ |
| 272 |
let mut stats = self.stats.write().await; |
| 273 |
stats.cache_misses += 1; |
| 274 |
} |
| 275 |
|
| 276 |
Ok(exists) |
| 277 |
} |
| 278 |
|
| 279 |
/// Get chunk metadata (with caching) |
| 280 |
async fn get_chunk_metadata(&self, chunk_id: &str) -> Result<Option<ChunkMetadata>> { |
| 281 |
// Check cache first |
| 282 |
{ |
| 283 |
let cache = self.metadata_cache.read().await; |
| 284 |
if let Some(metadata) = cache.get(chunk_id) { |
| 285 |
let mut stats = self.stats.write().await; |
| 286 |
stats.cache_hits += 1; |
| 287 |
return Ok(Some(metadata.clone())); |
| 288 |
} |
| 289 |
} |
| 290 |
|
| 291 |
// Load from database |
| 292 |
let metadata_key = format!("meta:{}", chunk_id); |
| 293 |
let metadata_bytes = match self.db.get(&metadata_key)? { |
| 294 |
Some(bytes) => bytes, |
| 295 |
None => return Ok(None), |
| 296 |
}; |
| 297 |
|
| 298 |
let metadata: ChunkMetadata = bincode::deserialize(&metadata_bytes) |
| 299 |
.context("Failed to deserialize chunk metadata")?; |
| 300 |
|
| 301 |
// Update cache |
| 302 |
{ |
| 303 |
let mut cache = self.metadata_cache.write().await; |
| 304 |
cache.insert(chunk_id.to_string(), metadata.clone()); |
| 305 |
|
| 306 |
let mut stats = self.stats.write().await; |
| 307 |
stats.cache_misses += 1; |
| 308 |
} |
| 309 |
|
| 310 |
Ok(Some(metadata)) |
| 311 |
} |
| 312 |
|
| 313 |
/// Verify chunk integrity using hash and checksum |
| 314 |
/// |
| 315 |
/// Safety: Double verification prevents data corruption |
| 316 |
async fn verify_chunk_integrity(&self, data: &[u8], metadata: &ChunkMetadata) -> Result<bool> { |
| 317 |
// Verify size |
| 318 |
if data.len() as u64 != metadata.size { |
| 319 |
return Ok(false); |
| 320 |
} |
| 321 |
|
| 322 |
// Verify hash |
| 323 |
let mut hasher = Sha256::new(); |
| 324 |
hasher.update(data); |
| 325 |
let computed_hash = hex::encode(hasher.finalize()); |
| 326 |
|
| 327 |
if computed_hash != metadata.hash { |
| 328 |
return Ok(false); |
| 329 |
} |
| 330 |
|
| 331 |
// Verify checksum |
| 332 |
let computed_checksum = self.calculate_checksum(data, &computed_hash); |
| 333 |
if computed_checksum != metadata.checksum { |
| 334 |
return Ok(false); |
| 335 |
} |
| 336 |
|
| 337 |
Ok(true) |
| 338 |
} |
| 339 |
|
| 340 |
/// Calculate additional checksum for integrity verification |
| 341 |
fn calculate_checksum(&self, data: &[u8], hash: &str) -> String { |
| 342 |
let mut hasher = blake3::Hasher::new(); |
| 343 |
hasher.update(data); |
| 344 |
hasher.update(hash.as_bytes()); |
| 345 |
hex::encode(hasher.finalize().as_bytes()) |
| 346 |
} |
| 347 |
|
| 348 |
/// Load storage statistics from database |
| 349 |
fn load_stats_from_db(&self) -> Result<()> { |
| 350 |
// Implementation for loading stats would go here |
| 351 |
// For now, we'll calculate on-the-fly |
| 352 |
Ok(()) |
| 353 |
} |
| 354 |
|
| 355 |
/// Get current storage statistics |
| 356 |
/// |
| 357 |
/// Transparency: Provide comprehensive storage metrics |
| 358 |
pub async fn get_stats(&self) -> StorageStats { |
| 359 |
let stats = self.stats.read().await; |
| 360 |
StorageStats { |
| 361 |
total_chunks: stats.total_chunks, |
| 362 |
total_size: stats.total_size, |
| 363 |
cache_hits: stats.cache_hits, |
| 364 |
cache_misses: stats.cache_misses, |
| 365 |
} |
| 366 |
} |
| 367 |
} |
| 368 |
|
| 369 |
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Build a store backed by a fresh temporary directory.
    /// The `TempDir` is returned alongside the store to keep it alive.
    fn fresh_store() -> (tempfile::TempDir, ChunkStore) {
        let dir = tempdir().unwrap();
        let store = ChunkStore::new(dir.path()).unwrap();
        (dir, store)
    }

    #[tokio::test]
    async fn test_chunk_store_creation() {
        let (_dir, store) = fresh_store();

        // A brand-new store reports empty statistics.
        let stats = store.get_stats().await;
        assert_eq!(stats.total_chunks, 0);
        assert_eq!(stats.total_size, 0);
    }

    #[tokio::test]
    async fn test_store_and_retrieve_chunk() {
        let (_dir, store) = fresh_store();

        let id = "test-chunk-1";
        let payload = b"Hello, ZephyrFS! This is test data.";

        // Storing returns a non-empty content hash.
        let digest = store.store_chunk(id, payload).await.unwrap();
        assert!(!digest.is_empty());

        // The chunk is now visible and round-trips byte-for-byte.
        assert!(store.chunk_exists(id).await.unwrap());
        let fetched = store.retrieve_chunk(id).await.unwrap().unwrap();
        assert_eq!(fetched, payload);

        // Statistics reflect exactly one stored chunk.
        let stats = store.get_stats().await;
        assert_eq!(stats.total_chunks, 1);
        assert_eq!(stats.total_size, payload.len() as u64);
    }

    #[tokio::test]
    async fn test_chunk_reference_counting() {
        let (_dir, store) = fresh_store();

        let id = "ref-count-test";
        let payload = b"Reference counting test data";

        // Two stores of the same id bring the ref_count to 2.
        for _ in 0..2 {
            store.store_chunk(id, payload).await.unwrap();
        }

        // First delete only decrements — the chunk must survive.
        assert!(!store.delete_chunk(id).await.unwrap());
        assert!(store.chunk_exists(id).await.unwrap());

        // Second delete drops the count to zero and removes the data.
        assert!(store.delete_chunk(id).await.unwrap());
        assert!(!store.chunk_exists(id).await.unwrap());
    }

    #[tokio::test]
    async fn test_integrity_verification() {
        let (_dir, store) = fresh_store();

        let id = "integrity-test";
        let payload = b"Integrity verification test";

        store.store_chunk(id, payload).await.unwrap();

        // Retrieval succeeds and yields the original bytes, proving the
        // size/hash/checksum verification passed.
        let fetched = store.retrieve_chunk(id).await.unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap(), payload);
    }

    #[tokio::test]
    async fn test_nonexistent_chunk() {
        let (_dir, store) = fresh_store();
        let id = "does-not-exist";

        // Retrieval of a missing chunk yields None, not an error.
        assert!(store.retrieve_chunk(id).await.unwrap().is_none());

        // Existence check and delete both report "nothing there".
        assert!(!store.chunk_exists(id).await.unwrap());
        assert!(!store.delete_chunk(id).await.unwrap());
    }
}