//! Content-addressable managed storage service. //! //! Provides server-side file storage with: //! - BLAKE3 content hashing for deduplication //! - Hierarchical storage layout: `///` //! - Integrity verification on read (optional) use std::path::{Path, PathBuf}; use tokio::{ fs, io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader}, }; use tracing::{debug, info, warn}; use crate::{ error::{PinakesError, Result}, model::ContentHash, }; /// Content-addressable storage service for managed files. #[derive(Debug, Clone)] pub struct ManagedStorageService { root_dir: PathBuf, max_upload_size: u64, verify_on_read: bool, } impl ManagedStorageService { /// Create a new managed storage service. #[must_use] pub const fn new( root_dir: PathBuf, max_upload_size: u64, verify_on_read: bool, ) -> Self { Self { root_dir, max_upload_size, verify_on_read, } } /// Initialize the storage directory structure. /// /// # Errors /// /// Returns [`PinakesError`] if the directory cannot be created. pub async fn init(&self) -> Result<()> { fs::create_dir_all(&self.root_dir).await?; info!(path = %self.root_dir.display(), "initialized managed storage"); Ok(()) } /// Get the storage path for a content hash. /// /// Layout: `///` #[must_use] pub fn path(&self, hash: &ContentHash) -> PathBuf { let h = &hash.0; if h.len() >= 4 { self.root_dir.join(&h[0..2]).join(&h[2..4]).join(h) } else { // Fallback for short hashes (shouldn't happen with BLAKE3) self.root_dir.join(h) } } /// Check if a blob exists in storage. #[must_use] pub fn exists(&self, hash: &ContentHash) -> bool { self.path(hash).exists() } /// Store a file from an async reader, computing the hash as we go. /// /// Returns the content hash and file size. /// If the file already exists with the same hash, returns early /// (deduplication). /// /// # Errors /// /// Returns [`PinakesError`] if the file cannot be stored or exceeds the size /// limit. 
pub async fn store_stream( &self, mut reader: R, ) -> Result<(ContentHash, u64)> { // First, stream to a temp file while computing the hash let temp_dir = self.root_dir.join("temp"); fs::create_dir_all(&temp_dir).await?; let temp_id = uuid::Uuid::now_v7(); let temp_path = temp_dir.join(temp_id.to_string()); let mut hasher = blake3::Hasher::new(); let mut temp_file = fs::File::create(&temp_path).await?; let mut total_size = 0u64; let mut buf = vec![0u8; 64 * 1024]; // 64KB buffer loop { let n = reader.read(&mut buf).await?; if n == 0 { break; } total_size += n as u64; if total_size > self.max_upload_size { // Clean up temp file drop(temp_file); let _ = fs::remove_file(&temp_path).await; return Err(PinakesError::UploadTooLarge(total_size)); } hasher.update(&buf[..n]); temp_file.write_all(&buf[..n]).await?; } temp_file.flush().await?; temp_file.sync_all().await?; drop(temp_file); let hash = ContentHash::new(hasher.finalize().to_hex().to_string()); let final_path = self.path(&hash); // Check if file already exists (deduplication) if final_path.exists() { // Verify size matches let existing_meta = fs::metadata(&final_path).await?; if existing_meta.len() == total_size { debug!(hash = %hash, "blob already exists, deduplicating"); let _ = fs::remove_file(&temp_path).await; return Ok((hash, total_size)); } warn!( hash = %hash, expected = total_size, actual = existing_meta.len(), "size mismatch for existing blob, replacing" ); } // Move temp file to final location if let Some(parent) = final_path.parent() { fs::create_dir_all(parent).await?; } fs::rename(&temp_path, &final_path).await?; info!(hash = %hash, size = total_size, "stored new blob"); Ok((hash, total_size)) } /// Store a file from a path. /// /// # Errors /// /// Returns [`PinakesError`] if the file cannot be opened or stored. 
pub async fn store_file(&self, path: &Path) -> Result<(ContentHash, u64)> {
        let file = fs::File::open(path).await?;
        self.store_stream(BufReader::new(file)).await
    }

    /// Store bytes directly.
    ///
    /// # Errors
    ///
    /// Returns [`PinakesError`] if the data cannot be stored or exceeds the size
    /// limit.
    pub async fn store_bytes(&self, data: &[u8]) -> Result<(ContentHash, u64)> {
        // Wrap the slice in a cursor and reuse the streaming path.
        self.store_stream(std::io::Cursor::new(data)).await
    }

    /// Open a blob for reading.
    ///
    /// When `verify_on_read` is enabled the blob is fully re-hashed via
    /// [`Self::verify`] before the file handle is returned.
    ///
    /// # Errors
    ///
    /// Returns [`PinakesError`] if the blob does not exist or cannot be opened.
    pub async fn open(&self, hash: &ContentHash) -> Result<fs::File> {
        let path = self.path(hash);
        // Async equivalent of `Path::exists`, which would block the runtime
        // thread on a synchronous stat call.
        if fs::metadata(&path).await.is_err() {
            return Err(PinakesError::BlobNotFound(hash.0.clone()));
        }

        if self.verify_on_read {
            self.verify(hash).await?;
        }

        fs::File::open(&path).await.map_err(PinakesError::Io)
    }

    /// Read a blob entirely into memory.
    ///
    /// When `verify_on_read` is enabled the loaded bytes are hashed with
    /// BLAKE3 and compared against `hash`.
    ///
    /// # Errors
    ///
    /// Returns [`PinakesError`] if the blob does not exist, cannot be read, or
    /// fails integrity check.
    pub async fn read(&self, hash: &ContentHash) -> Result<Vec<u8>> {
        let path = self.path(hash);
        // Async equivalent of `Path::exists` (see `open`).
        if fs::metadata(&path).await.is_err() {
            return Err(PinakesError::BlobNotFound(hash.0.clone()));
        }

        let data = fs::read(&path).await?;

        if self.verify_on_read {
            let computed = blake3::hash(&data);
            if computed.to_hex().to_string() != hash.0 {
                return Err(PinakesError::StorageIntegrity(format!(
                    "hash mismatch for blob {hash}"
                )));
            }
        }

        Ok(data)
    }

    /// Verify the integrity of a stored blob.
    ///
    /// Returns `Ok(true)` when the stored bytes hash to `hash`, and
    /// `Ok(false)` when no blob is stored under `hash`.
    ///
    /// # Errors
    ///
    /// Returns [`PinakesError`] if the blob cannot be read or has a hash
    /// mismatch.
pub async fn verify(&self, hash: &ContentHash) -> Result<bool> {
        let path = self.path(hash);
        // Async equivalent of `Path::exists`; a missing blob is reported as
        // `Ok(false)` rather than as an error.
        if fs::metadata(&path).await.is_err() {
            return Ok(false);
        }

        let file = fs::File::open(&path).await?;
        let mut reader = BufReader::new(file);
        let mut hasher = blake3::Hasher::new();
        let mut buf = vec![0u8; 64 * 1024];

        // Hash the file in 64 KiB chunks so it never has to fit in memory.
        loop {
            let n = reader.read(&mut buf).await?;
            if n == 0 {
                break;
            }
            hasher.update(&buf[..n]);
        }

        let computed = hasher.finalize().to_hex().to_string();
        if computed != hash.0 {
            warn!(
                expected = %hash,
                computed = %computed,
                "blob integrity check failed"
            );
            return Err(PinakesError::StorageIntegrity(format!(
                "hash mismatch: expected {hash}, computed {computed}"
            )));
        }

        debug!(hash = %hash, "blob integrity verified");
        Ok(true)
    }

    /// Delete a blob from storage.
    ///
    /// Deleting a blob that is not stored is a no-op. After a successful
    /// removal, up to two now-empty prefix directories are pruned on a
    /// best-effort basis.
    ///
    /// # Errors
    ///
    /// Returns [`PinakesError`] if the blob cannot be removed.
    pub async fn delete(&self, hash: &ContentHash) -> Result<()> {
        let path = self.path(hash);
        if fs::metadata(&path).await.is_ok() {
            fs::remove_file(&path).await?;
            info!(hash = %hash, "deleted blob");

            // Try to remove empty parent directories. Only directories
            // strictly inside the root are candidates: for a blob stored
            // directly under the root (short-hash fallback layout) the parent
            // IS the root, and it must never be removed.
            let root = self.root_dir.as_path();
            let mut dir = path.parent();
            for _ in 0..2 {
                let Some(current) = dir else { break };
                if current == root || !current.starts_with(root) {
                    break;
                }
                // `remove_dir` fails on non-empty directories, which is
                // exactly the case we want to leave alone.
                let _ = fs::remove_dir(current).await;
                dir = current.parent();
            }
        }
        Ok(())
    }

    /// Get the size of a stored blob.
    ///
    /// # Errors
    ///
    /// Returns [`PinakesError`] if the blob does not exist or metadata cannot be
    /// read.
    pub async fn size(&self, hash: &ContentHash) -> Result<u64> {
        let path = self.path(hash);
        // Any metadata failure is reported as "not found", matching what an
        // existence check followed by a metadata read would conclude.
        match fs::metadata(&path).await {
            Ok(meta) => Ok(meta.len()),
            Err(_) => Err(PinakesError::BlobNotFound(hash.0.clone())),
        }
    }

    /// Returns `true` when `path` is a directory whose final component is
    /// exactly two bytes long, i.e. looks like a hash-prefix directory.
    async fn is_prefix_dir(path: &Path) -> bool {
        // Cheap name check first so unrelated entries cost no syscall.
        path.file_name().map(std::ffi::OsStr::len) == Some(2)
            && fs::metadata(path).await.is_ok_and(|meta| meta.is_dir())
    }

    /// List all blob hashes in storage.
    ///
    /// Walks the two levels of two-character prefix directories under the
    /// root and returns the name of every regular file found in them.
    /// Entries that do not match that layout are ignored.
    ///
    /// # Errors
    ///
    /// Returns [`PinakesError`] if the storage directory cannot be read.
    pub async fn list_all(&self) -> Result<Vec<ContentHash>> {
        let mut hashes = Vec::new();
        let mut entries = fs::read_dir(&self.root_dir).await?;

        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            if !Self::is_prefix_dir(&path).await {
                continue;
            }

            let mut sub_entries = fs::read_dir(&path).await?;
            while let Some(sub_entry) = sub_entries.next_entry().await? {
                let sub_path = sub_entry.path();
                if !Self::is_prefix_dir(&sub_path).await {
                    continue;
                }

                let mut file_entries = fs::read_dir(&sub_path).await?;
                while let Some(file_entry) = file_entries.next_entry().await? {
                    let file_path = file_entry.path();
                    let is_file = fs::metadata(&file_path)
                        .await
                        .is_ok_and(|meta| meta.is_file());
                    if is_file && let Some(name) = file_path.file_name() {
                        hashes.push(ContentHash::new(
                            name.to_string_lossy().into_owned(),
                        ));
                    }
                }
            }
        }

        Ok(hashes)
    }

    /// Calculate total storage used by all blobs.
    ///
    /// Blobs whose size cannot be read are skipped rather than treated as an
    /// error.
    ///
    /// # Errors
    ///
    /// Returns [`PinakesError`] if listing blobs fails.
    pub async fn total_size(&self) -> Result<u64> {
        let hashes = self.list_all().await?;
        let mut total = 0u64;
        for hash in hashes {
            if let Ok(size) = self.size(&hash).await {
                total += size;
            }
        }
        Ok(total)
    }

    /// Clean up any orphaned temp files.
    ///
    /// Removes regular files in `<root>/temp` whose modification time is more
    /// than one hour in the past and returns how many were actually removed.
    ///
    /// # Errors
    ///
    /// Returns [`PinakesError`] if the temp directory cannot be read.
    pub async fn cleanup_temp(&self) -> Result<u64> {
        /// Temp files older than this many seconds are considered orphaned.
        const ORPHAN_AGE_SECS: u64 = 3600;

        let temp_dir = self.root_dir.join("temp");
        if fs::metadata(&temp_dir).await.is_err() {
            return Ok(0);
        }

        let mut count = 0u64;
        let mut entries = fs::read_dir(&temp_dir).await?;

        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            // Entries whose metadata or mtime cannot be read are left alone.
            let Ok(meta) = fs::metadata(&path).await else {
                continue;
            };
            if !meta.is_file() {
                continue;
            }
            let Ok(modified) = meta.modified() else {
                continue;
            };

            // A modification time in the future yields a zero age.
            let age = std::time::SystemTime::now()
                .duration_since(modified)
                .unwrap_or_default();
            // Only count files that were really removed.
            if age.as_secs() > ORPHAN_AGE_SECS
                && fs::remove_file(&path).await.is_ok()
            {
                count += 1;
            }
        }

        if count > 0 {
            info!(count, "cleaned up orphaned temp files");
        }
        Ok(count)
    }
}

#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use super::*;

    #[tokio::test]
    async fn test_store_and_retrieve() {
        let dir = tempdir().unwrap();
        let service =
            ManagedStorageService::new(dir.path().to_path_buf(), 1024 * 1024, false);
        service.init().await.unwrap();

        let data = b"hello, world!";
        let (hash, size) = service.store_bytes(data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(service.exists(&hash));

        let retrieved = service.read(&hash).await.unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_deduplication() {
        let dir = tempdir().unwrap();
        let service =
            ManagedStorageService::new(dir.path().to_path_buf(), 1024 * 1024, false);
        service.init().await.unwrap();

        let data = b"duplicate content";
        let (hash1, _) = service.store_bytes(data).await.unwrap();
        let (hash2, _) = service.store_bytes(data).await.unwrap();

        assert_eq!(hash1.0, hash2.0);
        assert_eq!(service.list_all().await.unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_verify_integrity() {
        let dir = tempdir().unwrap();
        let service =
            ManagedStorageService::new(dir.path().to_path_buf(), 1024 * 1024, true);
        service.init().await.unwrap();

        let data = b"verify me";
        let (hash, _) = service.store_bytes(data).await.unwrap();

        assert!(service.verify(&hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_upload_too_large() {
        let dir = tempdir().unwrap();
        let service =
            ManagedStorageService::new(dir.path().to_path_buf(), 100, false);
        service.init().await.unwrap();

        let data = vec![0u8; 200];
        let result = service.store_bytes(&data).await;

        assert!(matches!(result, Err(PinakesError::UploadTooLarge(_))));
    }

    #[tokio::test]
    async fn test_delete() {
        let dir = tempdir().unwrap();
        let service =
            ManagedStorageService::new(dir.path().to_path_buf(), 1024 * 1024, false);
        service.init().await.unwrap();

        let data = b"delete me";
        let (hash, _) = service.store_bytes(data).await.unwrap();
        assert!(service.exists(&hash));

        service.delete(&hash).await.unwrap();
        assert!(!service.exists(&hash));
    }
}