From 419e1d233bc29262715105cce6c556c696bc15c5 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Mon, 9 Feb 2026 17:11:52 +0300 Subject: [PATCH] pinakes-server: integrate chunked upload manager into sync endpoints Signed-off-by: NotAShelf Change-Id: Ia2069c8c1f05d0dee8078d9eba5b1aa06a6a6964 --- crates/pinakes-server/src/routes/sync.rs | 144 ++++++++++++++++++++--- 1 file changed, 126 insertions(+), 18 deletions(-) diff --git a/crates/pinakes-server/src/routes/sync.rs b/crates/pinakes-server/src/routes/sync.rs index 620c6a7..f31d08a 100644 --- a/crates/pinakes-server/src/routes/sync.rs +++ b/crates/pinakes-server/src/routes/sync.rs @@ -21,8 +21,8 @@ use crate::state::AppState; use pinakes_core::config::ConflictResolution; use pinakes_core::model::ContentHash; use pinakes_core::sync::{ - ChunkInfo, DeviceId, DeviceType, SyncChangeType, SyncConflict, SyncDevice, SyncLogEntry, - UploadSession, UploadStatus, generate_device_token, hash_device_token, update_device_cursor, + DeviceId, DeviceType, SyncChangeType, SyncConflict, SyncDevice, SyncLogEntry, UploadSession, + UploadStatus, generate_device_token, hash_device_token, update_device_cursor, }; use std::path::Path as FilePath; @@ -498,6 +498,14 @@ pub async fn create_upload( .await .map_err(|e| ApiError::internal(format!("Failed to create upload session: {}", e)))?; + // Create temp file for chunked upload if manager is available + if let Some(ref manager) = state.chunked_upload_manager { + manager + .create_temp_file(&session) + .await + .map_err(|e| ApiError::internal(format!("Failed to create temp file: {}", e)))?; + } + Ok(Json(session.into())) } @@ -523,28 +531,25 @@ pub async fn upload_chunk( return Err(ApiError::bad_request("Invalid chunk index")); } - // Calculate chunk hash - let hash = blake3::hash(&body); - let chunk_hash = hash.to_hex().to_string(); + // Require chunked upload manager to be available + let manager = state + .chunked_upload_manager + .as_ref() + .ok_or_else(|| ApiError::internal("Chunked upload manager not available"))?; - let chunk = ChunkInfo { - upload_id: session_id, - chunk_index, - offset: chunk_index * session.chunk_size, - size: body.len() as u64, - hash: chunk_hash, - received_at: Utc::now(), - }; + // Write chunk data to temp file + let chunk = manager + .write_chunk(&session, chunk_index, body.as_ref()) + .await + .map_err(|e| ApiError::internal(format!("Failed to write chunk: {}", e)))?; + // Record chunk metadata in database state .storage .record_chunk(session_id, &chunk) .await .map_err(|e| ApiError::internal(format!("Failed to record chunk: {}", e)))?; - // Store the chunk data (would integrate with managed storage) - // For now, this is a placeholder - actual implementation would write to temp storage - Ok(Json(ChunkUploadedResponse { chunk_index, received: true, @@ -593,6 +598,102 @@ pub async fn complete_upload( ))); } + // Require chunked upload manager to be available + let manager = state + .chunked_upload_manager + .as_ref() + .ok_or_else(|| ApiError::internal("Chunked upload manager not available"))?; + + // Verify and finalize the temp file + let temp_path = manager + .finalize(&session, &chunks) + .await + .map_err(|e| ApiError::internal(format!("Failed to finalize upload: {}", e)))?; + + // Validate and resolve target path securely + let target_path = std::path::Path::new(&session.target_path); + + // Reject absolute paths from client + if target_path.is_absolute() { + return Err(ApiError::bad_request("Absolute paths are not allowed")); + } + + // Reject empty paths + if target_path.as_os_str().is_empty() { + return Err(ApiError::bad_request("Empty target path not allowed")); + } + + // Get root directory from config + let config = state.config.read().await; + let root = config + .directories + .roots + .first() + .cloned() + .ok_or_else(|| ApiError::internal("No root directory configured"))?; + drop(config); + + // Build candidate path + let candidate = root.join(target_path); + + // Canonicalize root to resolve symlinks and get absolute path + let root_canon = tokio::fs::canonicalize(&root) + .await + .map_err(|e| ApiError::internal(format!("Failed to canonicalize root: {}", e)))?; + + // Ensure parent directory exists before canonicalizing candidate + if let Some(parent) = candidate.parent() { + tokio::fs::create_dir_all(parent) + .await + .map_err(|e| ApiError::internal(format!("Failed to create directory: {}", e)))?; + } + + // Try to canonicalize the candidate path (without the final file) + // If it exists, canonicalize it; otherwise canonicalize parent and append filename + let final_path = if let Ok(canon) = tokio::fs::canonicalize(&candidate).await { + canon + } else if let Some(parent) = candidate.parent() { + let parent_canon = tokio::fs::canonicalize(parent) + .await + .map_err(|e| ApiError::internal(format!("Failed to canonicalize parent: {}", e)))?; + + if let Some(filename) = candidate.file_name() { + parent_canon.join(filename) + } else { + return Err(ApiError::bad_request("Invalid target path")); + } + } else { + return Err(ApiError::bad_request("Invalid target path")); + }; + + // Ensure resolved path is still under root (path traversal check) + if !final_path.starts_with(&root_canon) { + return Err(ApiError::bad_request("Path traversal not allowed")); + } + + // Move temp file to final location (with cross-filesystem fallback) + if let Err(e) = tokio::fs::rename(&temp_path, &final_path).await { + // Check for cross-filesystem error + if e.kind() == std::io::ErrorKind::CrossesDevices || e.raw_os_error() == Some(18) + // EXDEV on Linux + { + // Fallback: copy then remove + tokio::fs::copy(&temp_path, &final_path) + .await + .map_err(|e| ApiError::internal(format!("Failed to copy file: {}", e)))?; + + let _ = tokio::fs::remove_file(&temp_path).await; + } else { + return Err(ApiError::internal(format!("Failed to move file: {}", e))); + } + } + + tracing::info!( + session_id = %id, + target = %final_path.display(), + "completed chunked upload" + ); + // Mark session as completed session.status = UploadStatus::Completed; state @@ -607,8 +708,8 @@ pub async fn complete_upload( sequence: 0, change_type: SyncChangeType::Created, media_id: None, - path: session.target_path, - content_hash: Some(session.expected_hash), + path: session.target_path.clone(), + content_hash: Some(session.expected_hash.clone()), file_size: Some(session.expected_size), metadata_json: None, changed_by_device: Some(session.device_id), @@ -636,6 +737,13 @@ pub async fn cancel_upload( .await .map_err(|e| ApiError::not_found(format!("Upload session not found: {}", e)))?; + // Clean up temp file if manager is available + if let Some(ref manager) = state.chunked_upload_manager { + if let Err(e) = manager.cancel(id).await { + tracing::warn!(session_id = %id, error = %e, "failed to clean up temp file"); + } + } + session.status = UploadStatus::Cancelled; state .storage