use axum::{ Json, body::Body, extract::{Extension, Path, Query, State}, http::{HeaderMap, StatusCode, header}, response::IntoResponse, }; use chrono::Utc; use tokio_util::io::ReaderStream; use uuid::Uuid; use crate::auth::resolve_user_id; use crate::dto::{ AcknowledgeChangesRequest, ChangesResponse, ChunkUploadedResponse, ConflictResponse, CreateUploadSessionRequest, DeviceRegistrationResponse, DeviceResponse, GetChangesParams, RegisterDeviceRequest, ReportChangesRequest, ReportChangesResponse, ResolveConflictRequest, SyncChangeResponse, UpdateDeviceRequest, UploadSessionResponse, }; use crate::error::{ApiError, ApiResult}; use crate::state::AppState; use pinakes_core::config::ConflictResolution; use pinakes_core::model::ContentHash; use pinakes_core::sync::{ DeviceId, DeviceType, SyncChangeType, SyncConflict, SyncDevice, SyncLogEntry, UploadSession, UploadStatus, generate_device_token, hash_device_token, update_device_cursor, }; use std::path::Path as FilePath; const DEFAULT_CHUNK_SIZE: u64 = 4 * 1024 * 1024; // 4MB const DEFAULT_CHANGES_LIMIT: u64 = 100; /// Register a new sync device /// POST /api/sync/devices pub async fn register_device( State(state): State, Extension(username): Extension, Json(req): Json, ) -> ApiResult> { let config = state.config.read().await; if !config.sync.enabled { return Err(ApiError::bad_request("Sync is not enabled")); } drop(config); let user_id = resolve_user_id(&state.storage, &username).await?; let device_type = req .device_type .parse::() .map_err(|_| ApiError::bad_request("Invalid device type"))?; // Generate device token let device_token = generate_device_token(); let token_hash = hash_device_token(&device_token); let now = Utc::now(); let device = SyncDevice { id: DeviceId(Uuid::now_v7()), user_id, name: req.name, device_type, client_version: req.client_version, os_info: req.os_info, last_sync_at: None, last_seen_at: now, sync_cursor: Some(0), enabled: true, created_at: now, updated_at: now, }; let registered = state .storage .register_device(&device, &token_hash) .await .map_err(|e| ApiError::internal(format!("Failed to register device: {}", e)))?; Ok(Json(DeviceRegistrationResponse { device: registered.into(), device_token, })) } /// List user's sync devices /// GET /api/sync/devices pub async fn list_devices( State(state): State, Extension(username): Extension, ) -> ApiResult>> { let user_id = resolve_user_id(&state.storage, &username).await?; let devices = state .storage .list_user_devices(user_id) .await .map_err(|e| ApiError::internal(format!("Failed to list devices: {}", e)))?; Ok(Json(devices.into_iter().map(Into::into).collect())) } /// Get device details /// GET /api/sync/devices/{id} pub async fn get_device( State(state): State, Extension(username): Extension, Path(id): Path, ) -> ApiResult> { let user_id = resolve_user_id(&state.storage, &username).await?; let device = state .storage .get_device(DeviceId(id)) .await .map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?; // Verify ownership if device.user_id != user_id { return Err(ApiError::forbidden("Not authorized to access this device")); } Ok(Json(device.into())) } /// Update a device /// PUT /api/sync/devices/{id} pub async fn update_device( State(state): State, Extension(username): Extension, Path(id): Path, Json(req): Json, ) -> ApiResult> { let user_id = resolve_user_id(&state.storage, &username).await?; let mut device = state .storage .get_device(DeviceId(id)) .await .map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?; // Verify ownership if device.user_id != user_id { return Err(ApiError::forbidden("Not authorized to update this device")); } if let Some(name) = req.name { device.name = name; } if let Some(enabled) = req.enabled { device.enabled = enabled; } state .storage .update_device(&device) .await .map_err(|e| ApiError::internal(format!("Failed to update device: {}", e)))?; Ok(Json(device.into())) } /// Delete a device /// DELETE /api/sync/devices/{id} pub async fn delete_device( State(state): State, Extension(username): Extension, Path(id): Path, ) -> ApiResult { let user_id = resolve_user_id(&state.storage, &username).await?; let device = state .storage .get_device(DeviceId(id)) .await .map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?; // Verify ownership if device.user_id != user_id { return Err(ApiError::forbidden("Not authorized to delete this device")); } state .storage .delete_device(DeviceId(id)) .await .map_err(|e| ApiError::internal(format!("Failed to delete device: {}", e)))?; Ok(StatusCode::NO_CONTENT) } /// Regenerate device token /// POST /api/sync/devices/{id}/token pub async fn regenerate_token( State(state): State, Extension(username): Extension, Path(id): Path, ) -> ApiResult> { let user_id = resolve_user_id(&state.storage, &username).await?; let device = state .storage .get_device(DeviceId(id)) .await .map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?; // Verify ownership if device.user_id != user_id { return Err(ApiError::forbidden( "Not authorized to regenerate token for this device", )); } // Generate new token let new_token = generate_device_token(); let token_hash = hash_device_token(&new_token); // Re-register with new token (this updates the token hash) let updated = state .storage .register_device(&device, &token_hash) .await .map_err(|e| ApiError::internal(format!("Failed to regenerate token: {}", e)))?; Ok(Json(DeviceRegistrationResponse { device: updated.into(), device_token: new_token, })) } /// Get changes since cursor /// GET /api/sync/changes pub async fn get_changes( State(state): State, Query(params): Query, ) -> ApiResult> { let config = state.config.read().await; if !config.sync.enabled { return Err(ApiError::bad_request("Sync is not enabled")); } drop(config); let cursor = params.cursor.unwrap_or(0); let limit = params.limit.unwrap_or(DEFAULT_CHANGES_LIMIT); let changes = state .storage .get_changes_since(cursor, limit + 1) .await .map_err(|e| ApiError::internal(format!("Failed to get changes: {}", e)))?; let has_more = changes.len() > limit as usize; let changes: Vec = changes .into_iter() .take(limit as usize) .map(Into::into) .collect(); let new_cursor = changes.last().map(|c| c.sequence).unwrap_or(cursor); Ok(Json(ChangesResponse { changes, cursor: new_cursor, has_more, })) } /// Report local changes from client /// POST /api/sync/report pub async fn report_changes( State(state): State, Extension(_username): Extension, Json(req): Json, ) -> ApiResult> { let config = state.config.read().await; if !config.sync.enabled { return Err(ApiError::bad_request("Sync is not enabled")); } let conflict_resolution = config.sync.default_conflict_resolution.clone(); drop(config); let mut accepted = Vec::new(); let mut conflicts = Vec::new(); let mut upload_required = Vec::new(); for change in req.changes { // Check for conflicts if let Some(content_hash) = &change.content_hash { let server_state = state .storage .get_media_by_path(FilePath::new(&change.path)) .await .ok() .flatten(); if let Some(server_item) = server_state { let client_hash = ContentHash(content_hash.clone()); if server_item.content_hash != client_hash { // Conflict detected let conflict = SyncConflict { id: Uuid::now_v7(), device_id: DeviceId(Uuid::nil()), // Will be set by device context path: change.path.clone(), local_hash: content_hash.clone(), local_mtime: change.local_mtime.unwrap_or(0), server_hash: server_item.content_hash.to_string(), server_mtime: server_item.updated_at.timestamp(), detected_at: Utc::now(), resolved_at: None, resolution: None, }; // Auto-resolve if configured match conflict_resolution { ConflictResolution::ServerWins => { // Client should download server version accepted.push(change.path); } ConflictResolution::ClientWins => { // Client should upload upload_required.push(change.path); } ConflictResolution::KeepBoth | ConflictResolution::Manual => { conflicts.push(conflict.into()); } } continue; } } } // No conflict, check if upload is needed match change.change_type.as_str() { "created" | "modified" => { if change.content_hash.is_some() { upload_required.push(change.path); } else { accepted.push(change.path); } } "deleted" => { // Record deletion let entry = SyncLogEntry { id: Uuid::now_v7(), sequence: 0, // Will be assigned by storage change_type: SyncChangeType::Deleted, media_id: None, path: change.path.clone(), content_hash: None, file_size: None, metadata_json: None, changed_by_device: None, timestamp: Utc::now(), }; if state.storage.record_sync_change(&entry).await.is_ok() { accepted.push(change.path); } } _ => { accepted.push(change.path); } } } Ok(Json(ReportChangesResponse { accepted, conflicts, upload_required, })) } /// Acknowledge processed changes /// POST /api/sync/ack pub async fn acknowledge_changes( State(state): State, Extension(_username): Extension, headers: HeaderMap, Json(req): Json, ) -> ApiResult { // Get device from header or context let device_token = headers .get("X-Device-Token") .and_then(|v| v.to_str().ok()) .ok_or_else(|| ApiError::bad_request("Missing X-Device-Token header"))?; let token_hash = hash_device_token(device_token); let device = state .storage .get_device_by_token(&token_hash) .await .map_err(|e| ApiError::internal(format!("Failed to get device: {}", e)))? .ok_or_else(|| ApiError::unauthorized("Invalid device token"))?; // Update device cursor update_device_cursor(&state.storage, device.id, req.cursor) .await .map_err(|e| ApiError::internal(format!("Failed to update cursor: {}", e)))?; Ok(StatusCode::OK) } /// List unresolved conflicts /// GET /api/sync/conflicts pub async fn list_conflicts( State(state): State, Extension(_username): Extension, headers: HeaderMap, ) -> ApiResult>> { let device_token = headers .get("X-Device-Token") .and_then(|v| v.to_str().ok()) .ok_or_else(|| ApiError::bad_request("Missing X-Device-Token header"))?; let token_hash = hash_device_token(device_token); let device = state .storage .get_device_by_token(&token_hash) .await .map_err(|e| ApiError::internal(format!("Failed to get device: {}", e)))? .ok_or_else(|| ApiError::unauthorized("Invalid device token"))?; let conflicts = state .storage .get_unresolved_conflicts(device.id) .await .map_err(|e| ApiError::internal(format!("Failed to get conflicts: {}", e)))?; Ok(Json(conflicts.into_iter().map(Into::into).collect())) } /// Resolve a sync conflict /// POST /api/sync/conflicts/{id}/resolve pub async fn resolve_conflict( State(state): State, Extension(_username): Extension, Path(id): Path, Json(req): Json, ) -> ApiResult { let resolution = match req.resolution.as_str() { "server_wins" => ConflictResolution::ServerWins, "client_wins" => ConflictResolution::ClientWins, "keep_both" => ConflictResolution::KeepBoth, _ => return Err(ApiError::bad_request("Invalid resolution type")), }; state .storage .resolve_conflict(id, resolution) .await .map_err(|e| ApiError::internal(format!("Failed to resolve conflict: {}", e)))?; Ok(StatusCode::OK) } /// Create an upload session for chunked upload /// POST /api/sync/upload pub async fn create_upload( State(state): State, Extension(_username): Extension, headers: HeaderMap, Json(req): Json, ) -> ApiResult> { let config = state.config.read().await; if !config.sync.enabled { return Err(ApiError::bad_request("Sync is not enabled")); } let upload_timeout_hours = config.sync.upload_timeout_hours; drop(config); let device_token = headers .get("X-Device-Token") .and_then(|v| v.to_str().ok()) .ok_or_else(|| ApiError::bad_request("Missing X-Device-Token header"))?; let token_hash = hash_device_token(device_token); let device = state .storage .get_device_by_token(&token_hash) .await .map_err(|e| ApiError::internal(format!("Failed to get device: {}", e)))? .ok_or_else(|| ApiError::unauthorized("Invalid device token"))?; let chunk_size = req.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE); let chunk_count = (req.expected_size + chunk_size - 1) / chunk_size; let now = Utc::now(); let session = UploadSession { id: Uuid::now_v7(), device_id: device.id, target_path: req.target_path, expected_hash: ContentHash(req.expected_hash), expected_size: req.expected_size, chunk_size, chunk_count, status: UploadStatus::Pending, created_at: now, expires_at: now + chrono::Duration::hours(upload_timeout_hours as i64), last_activity: now, }; state .storage .create_upload_session(&session) .await .map_err(|e| ApiError::internal(format!("Failed to create upload session: {}", e)))?; // Create temp file for chunked upload if manager is available if let Some(ref manager) = state.chunked_upload_manager { manager .create_temp_file(&session) .await .map_err(|e| ApiError::internal(format!("Failed to create temp file: {}", e)))?; } Ok(Json(session.into())) } /// Upload a chunk /// PUT /api/sync/upload/{id}/chunks/{index} pub async fn upload_chunk( State(state): State, Path((session_id, chunk_index)): Path<(Uuid, u64)>, _headers: HeaderMap, body: axum::body::Bytes, ) -> ApiResult> { let session = state .storage .get_upload_session(session_id) .await .map_err(|e| ApiError::not_found(format!("Upload session not found: {}", e)))?; if session.status == UploadStatus::Expired { return Err(ApiError::bad_request("Upload session has expired")); } if chunk_index >= session.chunk_count { return Err(ApiError::bad_request("Invalid chunk index")); } // Require chunked upload manager to be available let manager = state .chunked_upload_manager .as_ref() .ok_or_else(|| ApiError::internal("Chunked upload manager not available"))?; // Write chunk data to temp file let chunk = manager .write_chunk(&session, chunk_index, body.as_ref()) .await .map_err(|e| ApiError::internal(format!("Failed to write chunk: {}", e)))?; // Record chunk metadata in database state .storage .record_chunk(session_id, &chunk) .await .map_err(|e| ApiError::internal(format!("Failed to record chunk: {}", e)))?; Ok(Json(ChunkUploadedResponse { chunk_index, received: true, })) } /// Get upload session status /// GET /api/sync/upload/{id} pub async fn get_upload_status( State(state): State, Path(id): Path, ) -> ApiResult> { let session = state .storage .get_upload_session(id) .await .map_err(|e| ApiError::not_found(format!("Upload session not found: {}", e)))?; Ok(Json(session.into())) } /// Complete an upload session /// POST /api/sync/upload/{id}/complete pub async fn complete_upload( State(state): State, Path(id): Path, ) -> ApiResult { let mut session = state .storage .get_upload_session(id) .await .map_err(|e| ApiError::not_found(format!("Upload session not found: {}", e)))?; // Verify all chunks received let chunks = state .storage .get_upload_chunks(id) .await .map_err(|e| ApiError::internal(format!("Failed to get chunks: {}", e)))?; if chunks.len() != session.chunk_count as usize { return Err(ApiError::bad_request(format!( "Missing chunks: expected {}, got {}", session.chunk_count, chunks.len() ))); } // Require chunked upload manager to be available let manager = state .chunked_upload_manager .as_ref() .ok_or_else(|| ApiError::internal("Chunked upload manager not available"))?; // Verify and finalize the temp file let temp_path = manager .finalize(&session, &chunks) .await .map_err(|e| ApiError::internal(format!("Failed to finalize upload: {}", e)))?; // Validate and resolve target path securely let target_path = std::path::Path::new(&session.target_path); // Reject absolute paths from client if target_path.is_absolute() { return Err(ApiError::bad_request("Absolute paths are not allowed")); } // Reject empty paths if target_path.as_os_str().is_empty() { return Err(ApiError::bad_request("Empty target path not allowed")); } // Get root directory from config let config = state.config.read().await; let root = config .directories .roots .first() .cloned() .ok_or_else(|| ApiError::internal("No root directory configured"))?; drop(config); // Build candidate path let candidate = root.join(target_path); // Canonicalize root to resolve symlinks and get absolute path let root_canon = tokio::fs::canonicalize(&root) .await .map_err(|e| ApiError::internal(format!("Failed to canonicalize root: {}", e)))?; // Ensure parent directory exists before canonicalizing candidate if let Some(parent) = candidate.parent() { tokio::fs::create_dir_all(parent) .await .map_err(|e| ApiError::internal(format!("Failed to create directory: {}", e)))?; } // Try to canonicalize the candidate path (without the final file) // If it exists, canonicalize it; otherwise canonicalize parent and append filename let final_path = if let Ok(canon) = tokio::fs::canonicalize(&candidate).await { canon } else if let Some(parent) = candidate.parent() { let parent_canon = tokio::fs::canonicalize(parent) .await .map_err(|e| ApiError::internal(format!("Failed to canonicalize parent: {}", e)))?; if let Some(filename) = candidate.file_name() { parent_canon.join(filename) } else { return Err(ApiError::bad_request("Invalid target path")); } } else { return Err(ApiError::bad_request("Invalid target path")); }; // Ensure resolved path is still under root (path traversal check) if !final_path.starts_with(&root_canon) { return Err(ApiError::bad_request("Path traversal not allowed")); } // Move temp file to final location (with cross-filesystem fallback) if let Err(e) = tokio::fs::rename(&temp_path, &final_path).await { // Check for cross-filesystem error if e.kind() == std::io::ErrorKind::CrossesDevices || e.raw_os_error() == Some(18) // EXDEV on Linux { // Fallback: copy then remove tokio::fs::copy(&temp_path, &final_path) .await .map_err(|e| ApiError::internal(format!("Failed to copy file: {}", e)))?; let _ = tokio::fs::remove_file(&temp_path).await; } else { return Err(ApiError::internal(format!("Failed to move file: {}", e))); } } tracing::info!( session_id = %id, target = %final_path.display(), "completed chunked upload" ); // Mark session as completed session.status = UploadStatus::Completed; state .storage .update_upload_session(&session) .await .map_err(|e| ApiError::internal(format!("Failed to update session: {}", e)))?; // Record the sync change let entry = SyncLogEntry { id: Uuid::now_v7(), sequence: 0, change_type: SyncChangeType::Created, media_id: None, path: session.target_path.clone(), content_hash: Some(session.expected_hash.clone()), file_size: Some(session.expected_size), metadata_json: None, changed_by_device: Some(session.device_id), timestamp: Utc::now(), }; state .storage .record_sync_change(&entry) .await .map_err(|e| ApiError::internal(format!("Failed to record change: {}", e)))?; Ok(StatusCode::OK) } /// Cancel an upload session /// DELETE /api/sync/upload/{id} pub async fn cancel_upload( State(state): State, Path(id): Path, ) -> ApiResult { let mut session = state .storage .get_upload_session(id) .await .map_err(|e| ApiError::not_found(format!("Upload session not found: {}", e)))?; // Clean up temp file if manager is available if let Some(ref manager) = state.chunked_upload_manager { if let Err(e) = manager.cancel(id).await { tracing::warn!(session_id = %id, error = %e, "failed to clean up temp file"); } } session.status = UploadStatus::Cancelled; state .storage .update_upload_session(&session) .await .map_err(|e| ApiError::internal(format!("Failed to cancel session: {}", e)))?; Ok(StatusCode::NO_CONTENT) } /// Download a file for sync (supports Range header) /// GET /api/sync/download/{*path} pub async fn download_file( State(state): State, Path(path): Path, headers: HeaderMap, ) -> ApiResult { let item = state .storage .get_media_by_path(FilePath::new(&path)) .await .map_err(|e| ApiError::internal(format!("Failed to get media: {}", e)))? .ok_or_else(|| ApiError::not_found("File not found"))?; let file = tokio::fs::File::open(&item.path) .await .map_err(|e| ApiError::not_found(format!("File not found: {}", e)))?; let metadata = file .metadata() .await .map_err(|e| ApiError::internal(format!("Failed to get metadata: {}", e)))?; let file_size = metadata.len(); // Check for Range header if let Some(range_header) = headers.get(header::RANGE) { if let Ok(range_str) = range_header.to_str() { if let Some(range) = parse_range_header(range_str, file_size) { // Partial content response let (start, end) = range; let length = end - start + 1; let file = tokio::fs::File::open(&item.path) .await .map_err(|e| ApiError::internal(format!("Failed to reopen file: {}", e)))?; let stream = ReaderStream::new(file); let body = Body::from_stream(stream); return Ok(( StatusCode::PARTIAL_CONTENT, [ (header::CONTENT_TYPE, item.media_type.mime_type()), (header::CONTENT_LENGTH, length.to_string()), ( header::CONTENT_RANGE, format!("bytes {}-{}/{}", start, end, file_size), ), (header::ACCEPT_RANGES, "bytes".to_string()), ], body, ) .into_response()); } } } // Full content response let stream = ReaderStream::new(file); let body = Body::from_stream(stream); Ok(( StatusCode::OK, [ (header::CONTENT_TYPE, item.media_type.mime_type()), (header::CONTENT_LENGTH, file_size.to_string()), (header::ACCEPT_RANGES, "bytes".to_string()), ], body, ) .into_response()) } /// Parse HTTP Range header fn parse_range_header(range: &str, file_size: u64) -> Option<(u64, u64)> { let range = range.strip_prefix("bytes=")?; let parts: Vec<&str> = range.split('-').collect(); if parts.len() != 2 { return None; } let start: u64 = parts[0].parse().ok()?; let end: u64 = if parts[1].is_empty() { file_size - 1 } else { parts[1].parse().ok()? }; if start > end || end >= file_size { return None; } Some((start, end)) }