//! Chunked upload handling for large file sync. use std::path::{Path, PathBuf}; use chrono::Utc; use tokio::{ fs, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, }; use tracing::{debug, info}; use uuid::Uuid; use super::{ChunkInfo, UploadSession}; use crate::error::{PinakesError, Result}; /// Manager for chunked uploads. #[derive(Debug, Clone)] pub struct ChunkedUploadManager { temp_dir: PathBuf, } impl ChunkedUploadManager { /// Create a new chunked upload manager. pub fn new(temp_dir: PathBuf) -> Self { Self { temp_dir } } /// Initialize the temp directory. pub async fn init(&self) -> Result<()> { fs::create_dir_all(&self.temp_dir).await?; Ok(()) } /// Get the temp file path for an upload session. pub fn temp_path(&self, session_id: Uuid) -> PathBuf { self.temp_dir.join(format!("{}.upload", session_id)) } /// Create the temp file for a new upload session. pub async fn create_temp_file(&self, session: &UploadSession) -> Result<()> { let path = self.temp_path(session.id); // Create a sparse file of the expected size let file = fs::File::create(&path).await?; file.set_len(session.expected_size).await?; debug!( session_id = %session.id, size = session.expected_size, "created temp file for upload" ); Ok(()) } /// Write a chunk to the temp file. pub async fn write_chunk( &self, session: &UploadSession, chunk_index: u64, data: &[u8], ) -> Result { let path = self.temp_path(session.id); if !path.exists() { return Err(PinakesError::UploadSessionNotFound(session.id.to_string())); } // Calculate offset let offset = chunk_index * session.chunk_size; // Validate chunk if offset >= session.expected_size { return Err(PinakesError::ChunkOutOfOrder { expected: session.chunk_count - 1, actual: chunk_index, }); } // Calculate expected chunk size let expected_size = if chunk_index == session.chunk_count - 1 { // Last chunk may be smaller session.expected_size - offset } else { session.chunk_size }; if data.len() as u64 != expected_size { return Err(PinakesError::InvalidData(format!( "chunk {} has wrong size: expected {}, got {}", chunk_index, expected_size, data.len() ))); } // Write chunk to file at offset let mut file = fs::OpenOptions::new().write(true).open(&path).await?; file.seek(std::io::SeekFrom::Start(offset)).await?; file.write_all(data).await?; file.flush().await?; // Compute chunk hash let hash = blake3::hash(data).to_hex().to_string(); debug!( session_id = %session.id, chunk_index, offset, size = data.len(), "wrote chunk" ); Ok(ChunkInfo { upload_id: session.id, chunk_index, offset, size: data.len() as u64, hash, received_at: Utc::now(), }) } /// Verify and finalize the upload. /// /// Checks that: /// 1. All chunks are received /// 2. File size matches expected /// 3. Content hash matches expected pub async fn finalize( &self, session: &UploadSession, received_chunks: &[ChunkInfo], ) -> Result { let path = self.temp_path(session.id); // Check all chunks received if received_chunks.len() as u64 != session.chunk_count { return Err(PinakesError::InvalidData(format!( "missing chunks: expected {}, got {}", session.chunk_count, received_chunks.len() ))); } // Verify chunk indices let mut indices: Vec = received_chunks.iter().map(|c| c.chunk_index).collect(); indices.sort(); for (i, idx) in indices.iter().enumerate() { if *idx != i as u64 { return Err(PinakesError::InvalidData(format!( "chunk {} missing or out of order", i ))); } } // Verify file size let metadata = fs::metadata(&path).await?; if metadata.len() != session.expected_size { return Err(PinakesError::InvalidData(format!( "file size mismatch: expected {}, got {}", session.expected_size, metadata.len() ))); } // Verify content hash let computed_hash = compute_file_hash(&path).await?; if computed_hash != session.expected_hash.0 { return Err(PinakesError::StorageIntegrity(format!( "hash mismatch: expected {}, computed {}", session.expected_hash, computed_hash ))); } info!( session_id = %session.id, hash = %session.expected_hash, size = session.expected_size, "finalized chunked upload" ); Ok(path) } /// Cancel an upload and clean up temp file. pub async fn cancel(&self, session_id: Uuid) -> Result<()> { let path = self.temp_path(session_id); if path.exists() { fs::remove_file(&path).await?; debug!(session_id = %session_id, "cancelled upload, removed temp file"); } Ok(()) } /// Clean up expired temp files. pub async fn cleanup_expired(&self, max_age_hours: u64) -> Result { let mut count = 0u64; let max_age = std::time::Duration::from_secs(max_age_hours * 3600); let mut entries = fs::read_dir(&self.temp_dir).await?; while let Some(entry) = entries.next_entry().await? { let path = entry.path(); if path.extension().map(|e| e == "upload").unwrap_or(false) && let Ok(metadata) = fs::metadata(&path).await && let Ok(modified) = metadata.modified() { let age = std::time::SystemTime::now() .duration_since(modified) .unwrap_or_default(); if age > max_age { let _ = fs::remove_file(&path).await; count += 1; } } } if count > 0 { info!(count, "cleaned up expired upload temp files"); } Ok(count) } } /// Compute the BLAKE3 hash of a file. async fn compute_file_hash(path: &Path) -> Result { let mut file = fs::File::open(path).await?; let mut hasher = blake3::Hasher::new(); let mut buf = vec![0u8; 64 * 1024]; loop { let n = file.read(&mut buf).await?; if n == 0 { break; } hasher.update(&buf[..n]); } Ok(hasher.finalize().to_hex().to_string()) } #[cfg(test)] mod tests { use tempfile::tempdir; use super::*; use crate::{model::ContentHash, sync::UploadStatus}; #[tokio::test] async fn test_chunked_upload() { let dir = tempdir().unwrap(); let manager = ChunkedUploadManager::new(dir.path().to_path_buf()); manager.init().await.unwrap(); // Create test data let data = b"Hello, World! This is test data for chunked upload."; let hash = blake3::hash(data).to_hex().to_string(); let chunk_size = 20u64; let session = UploadSession { id: Uuid::now_v7(), device_id: super::super::DeviceId::new(), target_path: "/test/file.txt".to_string(), expected_hash: ContentHash::new(hash.clone()), expected_size: data.len() as u64, chunk_size, chunk_count: (data.len() as u64 + chunk_size - 1) / chunk_size, status: UploadStatus::InProgress, created_at: Utc::now(), expires_at: Utc::now() + chrono::Duration::hours(24), last_activity: Utc::now(), }; manager.create_temp_file(&session).await.unwrap(); // Write chunks let mut chunks = Vec::new(); for i in 0..session.chunk_count { let start = (i * chunk_size) as usize; let end = ((i + 1) * chunk_size).min(data.len() as u64) as usize; let chunk_data = &data[start..end]; let chunk = manager.write_chunk(&session, i, chunk_data).await.unwrap(); chunks.push(chunk); } // Finalize let final_path = manager.finalize(&session, &chunks).await.unwrap(); assert!(final_path.exists()); // Verify content let content = fs::read(&final_path).await.unwrap(); assert_eq!(&content[..], data); } }