pinakes/crates/pinakes-core/src/managed_storage.rs
NotAShelf cd1161ee5d
chore: bump deps; fix clippy lints & cleanup
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I4c4815ad145650a07f108614034d2e996a6a6964
2026-03-06 18:29:35 +03:00

413 lines
11 KiB
Rust

//! Content-addressable managed storage service.
//!
//! Provides server-side file storage with:
//! - BLAKE3 content hashing for deduplication
//! - Hierarchical storage layout: `<root>/<hash[0:2]>/<hash[2:4]>/<full_hash>`
//! - Integrity verification on read (optional)
use std::path::{Path, PathBuf};
use tokio::{
fs,
io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader},
};
use tracing::{debug, info, warn};
use crate::{
error::{PinakesError, Result},
model::ContentHash,
};
/// Content-addressable storage service for managed files.
///
/// Blobs are keyed by their BLAKE3 hex digest and laid out in two levels of
/// shard directories under `root_dir` (see [`ManagedStorageService::path`]).
#[derive(Debug, Clone)]
pub struct ManagedStorageService {
    // Base directory for all blobs; also holds the `temp/` spool directory
    // used while uploads are streamed in.
    root_dir: PathBuf,
    // Hard cap, in bytes, for a single stored stream; exceeding it aborts
    // the upload with `UploadTooLarge`.
    max_upload_size: u64,
    // When true, blobs are re-hashed on every read/open and a mismatch is
    // reported as a `StorageIntegrity` error.
    verify_on_read: bool,
}
impl ManagedStorageService {
    /// Temp files older than this (seconds) are considered orphaned by
    /// [`Self::cleanup_temp`]; in-flight uploads are assumed to be younger.
    const MAX_TEMP_AGE_SECS: u64 = 3600;

    /// Create a new managed storage service.
    ///
    /// * `root_dir` — base directory for the blob store.
    /// * `max_upload_size` — maximum accepted stream size in bytes.
    /// * `verify_on_read` — re-hash blobs on read and fail on mismatch.
    pub fn new(
        root_dir: PathBuf,
        max_upload_size: u64,
        verify_on_read: bool,
    ) -> Self {
        Self {
            root_dir,
            max_upload_size,
            verify_on_read,
        }
    }

    /// Initialize the storage directory structure.
    ///
    /// Idempotent: creating an already-existing root is not an error.
    pub async fn init(&self) -> Result<()> {
        fs::create_dir_all(&self.root_dir).await?;
        info!(path = %self.root_dir.display(), "initialized managed storage");
        Ok(())
    }

    /// Get the storage path for a content hash.
    ///
    /// Layout: `<root>/<hash[0:2]>/<hash[2:4]>/<full_hash>`
    pub fn path(&self, hash: &ContentHash) -> PathBuf {
        let h = &hash.0;
        if h.len() >= 4 {
            self.root_dir.join(&h[0..2]).join(&h[2..4]).join(h)
        } else {
            // Fallback for short hashes (shouldn't happen with BLAKE3)
            self.root_dir.join(h)
        }
    }

    /// Check if a blob exists in storage.
    pub async fn exists(&self, hash: &ContentHash) -> bool {
        // Async metadata probe instead of the blocking `Path::exists`, so we
        // never stall the executor thread on filesystem syscalls.
        fs::try_exists(self.path(hash)).await.unwrap_or(false)
    }

    /// Store a file from an async reader, computing the hash as we go.
    ///
    /// Returns the content hash and file size.
    /// If the file already exists with the same hash, returns early
    /// (deduplication).
    ///
    /// # Errors
    /// `UploadTooLarge` when the stream exceeds the configured limit; any
    /// underlying I/O error otherwise. On every error path the partially
    /// written temp file is removed.
    pub async fn store_stream<R: AsyncRead + Unpin>(
        &self,
        mut reader: R,
    ) -> Result<(ContentHash, u64)> {
        let temp_dir = self.root_dir.join("temp");
        fs::create_dir_all(&temp_dir).await?;
        let temp_path = temp_dir.join(uuid::Uuid::now_v7().to_string());

        // Once the temp file exists, any failure must clean it up; otherwise
        // it would linger until `cleanup_temp` happens to run.
        let (hash, total_size) =
            match self.spool_to_temp(&mut reader, &temp_path).await {
                Ok(spooled) => spooled,
                Err(e) => {
                    let _ = fs::remove_file(&temp_path).await;
                    return Err(e);
                }
            };

        if let Err(e) = self.commit_temp(&temp_path, &hash, total_size).await {
            // `commit_temp` may have already consumed the temp file; removal
            // of a missing file is harmless and intentionally ignored.
            let _ = fs::remove_file(&temp_path).await;
            return Err(e);
        }
        Ok((hash, total_size))
    }

    /// Stream `reader` into `temp_path`, hashing as we go and enforcing the
    /// upload size limit. Returns the content hash and byte count.
    async fn spool_to_temp<R: AsyncRead + Unpin>(
        &self,
        reader: &mut R,
        temp_path: &Path,
    ) -> Result<(ContentHash, u64)> {
        let mut hasher = blake3::Hasher::new();
        let mut temp_file = fs::File::create(temp_path).await?;
        let mut total_size = 0u64;
        let mut buf = vec![0u8; 64 * 1024]; // 64 KiB copy buffer
        loop {
            let n = reader.read(&mut buf).await?;
            if n == 0 {
                break;
            }
            total_size += n as u64;
            if total_size > self.max_upload_size {
                // Caller removes the temp file on this error path.
                return Err(PinakesError::UploadTooLarge(total_size));
            }
            hasher.update(&buf[..n]);
            temp_file.write_all(&buf[..n]).await?;
        }
        temp_file.flush().await?;
        // Make the bytes durable before the caller renames into place.
        temp_file.sync_all().await?;
        let hash = ContentHash::new(hasher.finalize().to_hex().to_string());
        Ok((hash, total_size))
    }

    /// Move a fully-spooled temp file into its content-addressed location,
    /// deduplicating against an existing blob with the same hash.
    async fn commit_temp(
        &self,
        temp_path: &Path,
        hash: &ContentHash,
        total_size: u64,
    ) -> Result<()> {
        let final_path = self.path(hash);
        if fs::try_exists(&final_path).await.unwrap_or(false) {
            let existing_meta = fs::metadata(&final_path).await?;
            if existing_meta.len() == total_size {
                debug!(hash = %hash, "blob already exists, deduplicating");
                let _ = fs::remove_file(temp_path).await;
                return Ok(());
            }
            warn!(
                hash = %hash,
                expected = total_size,
                actual = existing_meta.len(),
                "size mismatch for existing blob, replacing"
            );
            // `rename` onto an existing file fails on some platforms
            // (e.g. Windows), so drop the stale blob first.
            let _ = fs::remove_file(&final_path).await;
        }
        if let Some(parent) = final_path.parent() {
            fs::create_dir_all(parent).await?;
        }
        fs::rename(temp_path, &final_path).await?;
        info!(hash = %hash, size = total_size, "stored new blob");
        Ok(())
    }

    /// Store a file from a path.
    pub async fn store_file(&self, path: &Path) -> Result<(ContentHash, u64)> {
        let file = fs::File::open(path).await?;
        let reader = BufReader::new(file);
        self.store_stream(reader).await
    }

    /// Store bytes directly.
    pub async fn store_bytes(&self, data: &[u8]) -> Result<(ContentHash, u64)> {
        use std::io::Cursor;
        let cursor = Cursor::new(data);
        self.store_stream(cursor).await
    }

    /// Open a blob for reading.
    ///
    /// # Errors
    /// `BlobNotFound` if no blob with this hash is stored;
    /// `StorageIntegrity` if `verify_on_read` is set and the re-hash fails.
    pub async fn open(&self, hash: &ContentHash) -> Result<fs::File> {
        let path = self.path(hash);
        if !fs::try_exists(&path).await.unwrap_or(false) {
            return Err(PinakesError::BlobNotFound(hash.0.clone()));
        }
        if self.verify_on_read {
            // Full streaming re-hash before handing out the handle.
            self.verify(hash).await?;
        }
        fs::File::open(&path).await.map_err(PinakesError::Io)
    }

    /// Read a blob entirely into memory.
    ///
    /// # Errors
    /// `BlobNotFound` if absent; `StorageIntegrity` on hash mismatch when
    /// `verify_on_read` is enabled.
    pub async fn read(&self, hash: &ContentHash) -> Result<Vec<u8>> {
        let path = self.path(hash);
        if !fs::try_exists(&path).await.unwrap_or(false) {
            return Err(PinakesError::BlobNotFound(hash.0.clone()));
        }
        let data = fs::read(&path).await?;
        if self.verify_on_read {
            // The bytes are already in memory, so hash them directly rather
            // than re-reading the file via `verify`.
            let computed = blake3::hash(&data);
            if computed.to_hex().to_string() != hash.0 {
                return Err(PinakesError::StorageIntegrity(format!(
                    "hash mismatch for blob {}",
                    hash
                )));
            }
        }
        Ok(data)
    }

    /// Verify the integrity of a stored blob.
    ///
    /// Returns `Ok(false)` if the blob does not exist, `Ok(true)` if the
    /// stored bytes hash to `hash`, and `StorageIntegrity` on mismatch.
    pub async fn verify(&self, hash: &ContentHash) -> Result<bool> {
        let path = self.path(hash);
        if !fs::try_exists(&path).await.unwrap_or(false) {
            return Ok(false);
        }
        let file = fs::File::open(&path).await?;
        let mut reader = BufReader::new(file);
        let mut hasher = blake3::Hasher::new();
        let mut buf = vec![0u8; 64 * 1024];
        loop {
            let n = reader.read(&mut buf).await?;
            if n == 0 {
                break;
            }
            hasher.update(&buf[..n]);
        }
        let computed = hasher.finalize().to_hex().to_string();
        if computed != hash.0 {
            warn!(
                expected = %hash,
                computed = %computed,
                "blob integrity check failed"
            );
            return Err(PinakesError::StorageIntegrity(format!(
                "hash mismatch: expected {}, computed {}",
                hash, computed
            )));
        }
        debug!(hash = %hash, "blob integrity verified");
        Ok(true)
    }

    /// Delete a blob from storage. Deleting an absent blob is a no-op.
    pub async fn delete(&self, hash: &ContentHash) -> Result<()> {
        let path = self.path(hash);
        if fs::try_exists(&path).await.unwrap_or(false) {
            fs::remove_file(&path).await?;
            info!(hash = %hash, "deleted blob");
            // Best-effort pruning of the shard directories; `remove_dir`
            // fails harmlessly when they still contain other blobs.
            if let Some(parent) = path.parent() {
                let _ = fs::remove_dir(parent).await;
                if let Some(grandparent) = parent.parent() {
                    let _ = fs::remove_dir(grandparent).await;
                }
            }
        }
        Ok(())
    }

    /// Get the size of a stored blob.
    ///
    /// # Errors
    /// `BlobNotFound` if no blob with this hash is stored.
    pub async fn size(&self, hash: &ContentHash) -> Result<u64> {
        let path = self.path(hash);
        if !fs::try_exists(&path).await.unwrap_or(false) {
            return Err(PinakesError::BlobNotFound(hash.0.clone()));
        }
        let meta = fs::metadata(&path).await?;
        Ok(meta.len())
    }

    /// List all blob hashes in storage.
    ///
    /// Walks the two-level shard layout (`<aa>/<bb>/<hash>`); other entries
    /// under the root (e.g. the `temp/` spool dir) are skipped because their
    /// names are not exactly two bytes long.
    pub async fn list_all(&self) -> Result<Vec<ContentHash>> {
        let mut hashes = Vec::new();
        let mut level1 = fs::read_dir(&self.root_dir).await?;
        while let Some(shard1) = level1.next_entry().await? {
            if !Self::is_shard_dir(&shard1).await? {
                continue;
            }
            let mut level2 = fs::read_dir(shard1.path()).await?;
            while let Some(shard2) = level2.next_entry().await? {
                if !Self::is_shard_dir(&shard2).await? {
                    continue;
                }
                let mut blobs = fs::read_dir(shard2.path()).await?;
                while let Some(blob) = blobs.next_entry().await? {
                    if blob.file_type().await?.is_file() {
                        hashes.push(ContentHash::new(
                            blob.file_name().to_string_lossy().to_string(),
                        ));
                    }
                }
            }
        }
        Ok(hashes)
    }

    /// True when `entry` is a two-character shard directory (e.g. `ab/`).
    async fn is_shard_dir(entry: &fs::DirEntry) -> Result<bool> {
        Ok(entry.file_type().await?.is_dir() && entry.file_name().len() == 2)
    }

    /// Calculate total storage used by all blobs.
    pub async fn total_size(&self) -> Result<u64> {
        let hashes = self.list_all().await?;
        let mut total = 0u64;
        for hash in hashes {
            // Blobs deleted concurrently between list and stat are skipped.
            if let Ok(size) = self.size(&hash).await {
                total += size;
            }
        }
        Ok(total)
    }

    /// Clean up any orphaned temp files.
    ///
    /// Removes spool files older than [`Self::MAX_TEMP_AGE_SECS`] and
    /// returns how many were deleted.
    pub async fn cleanup_temp(&self) -> Result<u64> {
        let temp_dir = self.root_dir.join("temp");
        if !fs::try_exists(&temp_dir).await.unwrap_or(false) {
            return Ok(0);
        }
        let mut count = 0u64;
        let mut entries = fs::read_dir(&temp_dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let is_file = entry
                .file_type()
                .await
                .map(|t| t.is_file())
                .unwrap_or(false);
            if !is_file {
                continue;
            }
            // Only remove files old enough that no in-flight upload can
            // still own them.
            let Ok(meta) = entry.metadata().await else { continue };
            let Ok(modified) = meta.modified() else { continue };
            let age = std::time::SystemTime::now()
                .duration_since(modified)
                .unwrap_or_default();
            if age.as_secs() > Self::MAX_TEMP_AGE_SECS {
                let _ = fs::remove_file(entry.path()).await;
                count += 1;
            }
        }
        if count > 0 {
            info!(count, "cleaned up orphaned temp files");
        }
        Ok(count)
    }
}
#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use super::*;

    /// Build an initialized service rooted in a fresh temp directory.
    /// The returned `TempDir` guard must be kept alive for the test's
    /// duration or the backing directory is deleted.
    async fn fresh_service(
        max_upload_size: u64,
        verify_on_read: bool,
    ) -> (tempfile::TempDir, ManagedStorageService) {
        let dir = tempdir().unwrap();
        let service = ManagedStorageService::new(
            dir.path().to_path_buf(),
            max_upload_size,
            verify_on_read,
        );
        service.init().await.unwrap();
        (dir, service)
    }

    #[tokio::test]
    async fn test_store_and_retrieve() {
        let (_guard, service) = fresh_service(1024 * 1024, false).await;
        let payload = b"hello, world!";
        let (hash, size) = service.store_bytes(payload).await.unwrap();
        assert_eq!(size, payload.len() as u64);
        assert!(service.exists(&hash).await);
        assert_eq!(service.read(&hash).await.unwrap(), payload);
    }

    #[tokio::test]
    async fn test_deduplication() {
        let (_guard, service) = fresh_service(1024 * 1024, false).await;
        let payload = b"duplicate content";
        let (first, _) = service.store_bytes(payload).await.unwrap();
        let (second, _) = service.store_bytes(payload).await.unwrap();
        assert_eq!(first.0, second.0);
        // Identical content must yield exactly one stored blob.
        assert_eq!(service.list_all().await.unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_verify_integrity() {
        let (_guard, service) = fresh_service(1024 * 1024, true).await;
        let (hash, _) = service.store_bytes(b"verify me").await.unwrap();
        assert!(service.verify(&hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_upload_too_large() {
        // Limit of 100 bytes; a 200-byte payload must be rejected.
        let (_guard, service) = fresh_service(100, false).await;
        let oversized = vec![0u8; 200];
        let outcome = service.store_bytes(&oversized).await;
        assert!(matches!(outcome, Err(PinakesError::UploadTooLarge(_))));
    }

    #[tokio::test]
    async fn test_delete() {
        let (_guard, service) = fresh_service(1024 * 1024, false).await;
        let (hash, _) = service.store_bytes(b"delete me").await.unwrap();
        assert!(service.exists(&hash).await);
        service.delete(&hash).await.unwrap();
        assert!(!service.exists(&hash).await);
    }
}