pinakes-server: integrate chunked upload manager into sync endpoints

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Ia2069c8c1f05d0dee8078d9eba5b1aa06a6a6964
This commit is contained in:
raf 2026-02-09 17:11:52 +03:00
commit 419e1d233b
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF

View file

@ -21,8 +21,8 @@ use crate::state::AppState;
use pinakes_core::config::ConflictResolution;
use pinakes_core::model::ContentHash;
use pinakes_core::sync::{
ChunkInfo, DeviceId, DeviceType, SyncChangeType, SyncConflict, SyncDevice, SyncLogEntry,
UploadSession, UploadStatus, generate_device_token, hash_device_token, update_device_cursor,
DeviceId, DeviceType, SyncChangeType, SyncConflict, SyncDevice, SyncLogEntry, UploadSession,
UploadStatus, generate_device_token, hash_device_token, update_device_cursor,
};
use std::path::Path as FilePath;
@ -498,6 +498,14 @@ pub async fn create_upload(
.await
.map_err(|e| ApiError::internal(format!("Failed to create upload session: {}", e)))?;
// Create temp file for chunked upload if manager is available
if let Some(ref manager) = state.chunked_upload_manager {
manager
.create_temp_file(&session)
.await
.map_err(|e| ApiError::internal(format!("Failed to create temp file: {}", e)))?;
}
Ok(Json(session.into()))
}
@ -523,28 +531,25 @@ pub async fn upload_chunk(
return Err(ApiError::bad_request("Invalid chunk index"));
}
// Calculate chunk hash
let hash = blake3::hash(&body);
let chunk_hash = hash.to_hex().to_string();
// Require chunked upload manager to be available
let manager = state
.chunked_upload_manager
.as_ref()
.ok_or_else(|| ApiError::internal("Chunked upload manager not available"))?;
let chunk = ChunkInfo {
upload_id: session_id,
chunk_index,
offset: chunk_index * session.chunk_size,
size: body.len() as u64,
hash: chunk_hash,
received_at: Utc::now(),
};
// Write chunk data to temp file
let chunk = manager
.write_chunk(&session, chunk_index, body.as_ref())
.await
.map_err(|e| ApiError::internal(format!("Failed to write chunk: {}", e)))?;
// Record chunk metadata in database
state
.storage
.record_chunk(session_id, &chunk)
.await
.map_err(|e| ApiError::internal(format!("Failed to record chunk: {}", e)))?;
// Store the chunk data (would integrate with managed storage)
// For now, this is a placeholder - actual implementation would write to temp storage
Ok(Json(ChunkUploadedResponse {
chunk_index,
received: true,
@ -593,6 +598,102 @@ pub async fn complete_upload(
)));
}
// Require chunked upload manager to be available
let manager = state
.chunked_upload_manager
.as_ref()
.ok_or_else(|| ApiError::internal("Chunked upload manager not available"))?;
// Verify and finalize the temp file
let temp_path = manager
.finalize(&session, &chunks)
.await
.map_err(|e| ApiError::internal(format!("Failed to finalize upload: {}", e)))?;
// Validate and resolve target path securely
let target_path = std::path::Path::new(&session.target_path);
// Reject absolute paths from client
if target_path.is_absolute() {
return Err(ApiError::bad_request("Absolute paths are not allowed"));
}
// Reject empty paths
if target_path.as_os_str().is_empty() {
return Err(ApiError::bad_request("Empty target path not allowed"));
}
// Get root directory from config
let config = state.config.read().await;
let root = config
.directories
.roots
.first()
.cloned()
.ok_or_else(|| ApiError::internal("No root directory configured"))?;
drop(config);
// Build candidate path
let candidate = root.join(target_path);
// Canonicalize root to resolve symlinks and get absolute path
let root_canon = tokio::fs::canonicalize(&root)
.await
.map_err(|e| ApiError::internal(format!("Failed to canonicalize root: {}", e)))?;
// Ensure parent directory exists before canonicalizing candidate
if let Some(parent) = candidate.parent() {
tokio::fs::create_dir_all(parent)
.await
.map_err(|e| ApiError::internal(format!("Failed to create directory: {}", e)))?;
}
// Try to canonicalize the candidate path (without the final file)
// If it exists, canonicalize it; otherwise canonicalize parent and append filename
let final_path = if let Ok(canon) = tokio::fs::canonicalize(&candidate).await {
canon
} else if let Some(parent) = candidate.parent() {
let parent_canon = tokio::fs::canonicalize(parent)
.await
.map_err(|e| ApiError::internal(format!("Failed to canonicalize parent: {}", e)))?;
if let Some(filename) = candidate.file_name() {
parent_canon.join(filename)
} else {
return Err(ApiError::bad_request("Invalid target path"));
}
} else {
return Err(ApiError::bad_request("Invalid target path"));
};
// Ensure resolved path is still under root (path traversal check)
if !final_path.starts_with(&root_canon) {
return Err(ApiError::bad_request("Path traversal not allowed"));
}
// Move temp file to final location (with cross-filesystem fallback)
if let Err(e) = tokio::fs::rename(&temp_path, &final_path).await {
// Check for cross-filesystem error
if e.kind() == std::io::ErrorKind::CrossesDevices || e.raw_os_error() == Some(18)
// EXDEV on Linux
{
// Fallback: copy then remove
tokio::fs::copy(&temp_path, &final_path)
.await
.map_err(|e| ApiError::internal(format!("Failed to copy file: {}", e)))?;
let _ = tokio::fs::remove_file(&temp_path).await;
} else {
return Err(ApiError::internal(format!("Failed to move file: {}", e)));
}
}
tracing::info!(
session_id = %id,
target = %final_path.display(),
"completed chunked upload"
);
// Mark session as completed
session.status = UploadStatus::Completed;
state
@ -607,8 +708,8 @@ pub async fn complete_upload(
sequence: 0,
change_type: SyncChangeType::Created,
media_id: None,
path: session.target_path,
content_hash: Some(session.expected_hash),
path: session.target_path.clone(),
content_hash: Some(session.expected_hash.clone()),
file_size: Some(session.expected_size),
metadata_json: None,
changed_by_device: Some(session.device_id),
@ -636,6 +737,13 @@ pub async fn cancel_upload(
.await
.map_err(|e| ApiError::not_found(format!("Upload session not found: {}", e)))?;
// Clean up temp file if manager is available
if let Some(ref manager) = state.chunked_upload_manager {
if let Err(e) = manager.cancel(id).await {
tracing::warn!(session_id = %id, error = %e, "failed to clean up temp file");
}
}
session.status = UploadStatus::Cancelled;
state
.storage