use std::path::Path as FilePath; use axum::{ Json, body::Body, extract::{Extension, Path, Query, State}, http::{HeaderMap, StatusCode, header}, response::IntoResponse, }; use chrono::Utc; use pinakes_core::{ config::ConflictResolution, model::ContentHash, sync::{ DeviceId, DeviceType, SyncChangeType, SyncConflict, SyncDevice, SyncLogEntry, UploadSession, UploadStatus, generate_device_token, hash_device_token, update_device_cursor, }, }; use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio_util::io::ReaderStream; use uuid::Uuid; use crate::{ auth::resolve_user_id, dto::{ AcknowledgeChangesRequest, ChangesResponse, ChunkUploadedResponse, ConflictResponse, CreateUploadSessionRequest, DeviceRegistrationResponse, DeviceResponse, GetChangesParams, RegisterDeviceRequest, ReportChangesRequest, ReportChangesResponse, ResolveConflictRequest, SyncChangeResponse, UpdateDeviceRequest, UploadSessionResponse, }, error::{ApiError, ApiResult}, state::AppState, }; const DEFAULT_CHUNK_SIZE: u64 = 4 * 1024 * 1024; // 4MB const DEFAULT_CHANGES_LIMIT: u64 = 100; /// Register a new sync device /// POST /api/sync/devices #[utoipa::path( post, path = "/api/v1/sync/devices", tag = "sync", request_body = RegisterDeviceRequest, responses( (status = 200, description = "Device registered", body = DeviceRegistrationResponse), (status = 400, description = "Bad request"), (status = 401, description = "Unauthorized"), (status = 500, description = "Internal server error"), ), security(("bearer_auth" = [])) )] pub async fn register_device( State(state): State, Extension(username): Extension, Json(req): Json, ) -> ApiResult> { let config = state.config.read().await; if !config.sync.enabled { return Err(ApiError::bad_request("Sync is not enabled")); } drop(config); let user_id = resolve_user_id(&state.storage, &username).await?; let device_type = req .device_type .parse::() .map_err(|_| ApiError::bad_request("Invalid device type"))?; // Generate device token let device_token = generate_device_token(); let token_hash = hash_device_token(&device_token); let now = Utc::now(); let device = SyncDevice { id: DeviceId(Uuid::now_v7()), user_id, name: req.name, device_type, client_version: req.client_version, os_info: req.os_info, last_sync_at: None, last_seen_at: now, sync_cursor: Some(0), enabled: true, created_at: now, updated_at: now, }; let registered = state .storage .register_device(&device, &token_hash) .await .map_err(|e| { ApiError::internal(format!("Failed to register device: {e}")) })?; Ok(Json(DeviceRegistrationResponse { device: registered.into(), device_token, })) } /// List user's sync devices /// GET /api/sync/devices #[utoipa::path( get, path = "/api/v1/sync/devices", tag = "sync", responses( (status = 200, description = "List of devices", body = Vec), (status = 401, description = "Unauthorized"), ), security(("bearer_auth" = [])) )] pub async fn list_devices( State(state): State, Extension(username): Extension, ) -> ApiResult>> { let user_id = resolve_user_id(&state.storage, &username).await?; let devices = state .storage .list_user_devices(user_id) .await .map_err(|e| ApiError::internal(format!("Failed to list devices: {e}")))?; Ok(Json(devices.into_iter().map(Into::into).collect())) } /// Get device details /// GET /api/sync/devices/{id} #[utoipa::path( get, path = "/api/v1/sync/devices/{id}", tag = "sync", params(("id" = Uuid, Path, description = "Device ID")), responses( (status = 200, description = "Device details", body = DeviceResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden"), (status = 404, description = "Not found"), ), security(("bearer_auth" = [])) )] pub async fn get_device( State(state): State, Extension(username): Extension, Path(id): Path, ) -> ApiResult> { let user_id = resolve_user_id(&state.storage, &username).await?; let device = state .storage .get_device(DeviceId(id)) .await .map_err(|e| ApiError::not_found(format!("Device not found: {e}")))?; // Verify ownership if device.user_id != user_id { return Err(ApiError::forbidden("Not authorized to access this device")); } Ok(Json(device.into())) } /// Update a device /// PUT /api/sync/devices/{id} #[utoipa::path( put, path = "/api/v1/sync/devices/{id}", tag = "sync", params(("id" = Uuid, Path, description = "Device ID")), request_body = UpdateDeviceRequest, responses( (status = 200, description = "Device updated", body = DeviceResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden"), (status = 404, description = "Not found"), ), security(("bearer_auth" = [])) )] pub async fn update_device( State(state): State, Extension(username): Extension, Path(id): Path, Json(req): Json, ) -> ApiResult> { let user_id = resolve_user_id(&state.storage, &username).await?; let mut device = state .storage .get_device(DeviceId(id)) .await .map_err(|e| ApiError::not_found(format!("Device not found: {e}")))?; // Verify ownership if device.user_id != user_id { return Err(ApiError::forbidden("Not authorized to update this device")); } if let Some(name) = req.name { device.name = name; } if let Some(enabled) = req.enabled { device.enabled = enabled; } state .storage .update_device(&device) .await .map_err(|e| ApiError::internal(format!("Failed to update device: {e}")))?; Ok(Json(device.into())) } /// Delete a device /// DELETE /api/sync/devices/{id} #[utoipa::path( delete, path = "/api/v1/sync/devices/{id}", tag = "sync", params(("id" = Uuid, Path, description = "Device ID")), responses( (status = 204, description = "Device deleted"), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden"), (status = 404, description = "Not found"), ), security(("bearer_auth" = [])) )] pub async fn delete_device( State(state): State, Extension(username): Extension, Path(id): Path, ) -> ApiResult { let user_id = resolve_user_id(&state.storage, &username).await?; let device = state .storage .get_device(DeviceId(id)) .await .map_err(|e| ApiError::not_found(format!("Device not found: {e}")))?; // Verify ownership if device.user_id != user_id { return Err(ApiError::forbidden("Not authorized to delete this device")); } state .storage .delete_device(DeviceId(id)) .await .map_err(|e| ApiError::internal(format!("Failed to delete device: {e}")))?; Ok(StatusCode::NO_CONTENT) } /// Regenerate device token /// POST /api/sync/devices/{id}/token #[utoipa::path( post, path = "/api/v1/sync/devices/{id}/token", tag = "sync", params(("id" = Uuid, Path, description = "Device ID")), responses( (status = 200, description = "Token regenerated", body = DeviceRegistrationResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden"), (status = 404, description = "Not found"), ), security(("bearer_auth" = [])) )] pub async fn regenerate_token( State(state): State, Extension(username): Extension, Path(id): Path, ) -> ApiResult> { let user_id = resolve_user_id(&state.storage, &username).await?; let device = state .storage .get_device(DeviceId(id)) .await .map_err(|e| ApiError::not_found(format!("Device not found: {e}")))?; // Verify ownership if device.user_id != user_id { return Err(ApiError::forbidden( "Not authorized to regenerate token for this device", )); } // Generate new token let new_token = generate_device_token(); let token_hash = hash_device_token(&new_token); // Re-register with new token (this updates the token hash) let updated = state .storage .register_device(&device, &token_hash) .await .map_err(|e| { ApiError::internal(format!("Failed to regenerate token: {e}")) })?; Ok(Json(DeviceRegistrationResponse { device: updated.into(), device_token: new_token, })) } /// Get changes since cursor /// GET /api/sync/changes #[utoipa::path( get, path = "/api/v1/sync/changes", tag = "sync", params( ("cursor" = Option, Query, description = "Sync cursor"), ("limit" = Option, Query, description = "Max changes (max 1000)"), ), responses( (status = 200, description = "Changes since cursor", body = ChangesResponse), (status = 400, description = "Bad request"), (status = 401, description = "Unauthorized"), ), security(("bearer_auth" = [])) )] pub async fn get_changes( State(state): State, Query(params): Query, ) -> ApiResult> { let config = state.config.read().await; if !config.sync.enabled { return Err(ApiError::bad_request("Sync is not enabled")); } drop(config); let cursor = params.cursor.unwrap_or(0); let limit = params.limit.unwrap_or(DEFAULT_CHANGES_LIMIT).min(1000); let changes = state .storage .get_changes_since(cursor, limit + 1) .await .map_err(|e| ApiError::internal(format!("Failed to get changes: {e}")))?; let has_more = changes.len() > limit as usize; let changes: Vec = changes .into_iter() .take(limit as usize) .map(Into::into) .collect(); let new_cursor = changes.last().map_or(cursor, |c| c.sequence); Ok(Json(ChangesResponse { changes, cursor: new_cursor, has_more, })) } /// Report local changes from client /// POST /api/sync/report #[utoipa::path( post, path = "/api/v1/sync/report", tag = "sync", request_body = ReportChangesRequest, responses( (status = 200, description = "Changes processed", body = ReportChangesResponse), (status = 400, description = "Bad request"), (status = 401, description = "Unauthorized"), ), security(("bearer_auth" = [])) )] pub async fn report_changes( State(state): State, Extension(_username): Extension, Json(req): Json, ) -> ApiResult> { let config = state.config.read().await; if !config.sync.enabled { return Err(ApiError::bad_request("Sync is not enabled")); } let conflict_resolution = config.sync.default_conflict_resolution; drop(config); let mut accepted = Vec::new(); let mut conflicts = Vec::new(); let mut upload_required = Vec::new(); for change in req.changes { // Check for conflicts if let Some(content_hash) = &change.content_hash { let server_state = state .storage .get_media_by_path(FilePath::new(&change.path)) .await .ok() .flatten(); if let Some(server_item) = server_state { let client_hash = ContentHash(content_hash.clone()); if server_item.content_hash != client_hash { // Conflict detected let conflict = SyncConflict { id: Uuid::now_v7(), device_id: DeviceId(Uuid::nil()), /* Will be set by device * context */ path: change.path.clone(), local_hash: content_hash.clone(), local_mtime: change.local_mtime.unwrap_or(0), server_hash: server_item.content_hash.to_string(), server_mtime: server_item.updated_at.timestamp(), detected_at: Utc::now(), resolved_at: None, resolution: None, }; // Auto-resolve if configured match conflict_resolution { ConflictResolution::ServerWins => { // Client should download server version accepted.push(change.path); }, ConflictResolution::ClientWins => { // Client should upload upload_required.push(change.path); }, ConflictResolution::KeepBoth | ConflictResolution::Manual => { conflicts.push(conflict.into()); }, } continue; } } } // No conflict, check if upload is needed match change.change_type.as_str() { "created" | "modified" if change.content_hash.is_some() => { upload_required.push(change.path); }, "deleted" => { // Record deletion let entry = SyncLogEntry { id: Uuid::now_v7(), sequence: 0, // Will be assigned by storage change_type: SyncChangeType::Deleted, media_id: None, path: change.path.clone(), content_hash: None, file_size: None, metadata_json: None, changed_by_device: None, timestamp: Utc::now(), }; if state.storage.record_sync_change(&entry).await.is_ok() { accepted.push(change.path); } }, _ => { accepted.push(change.path); }, } } Ok(Json(ReportChangesResponse { accepted, conflicts, upload_required, })) } /// Acknowledge processed changes /// POST /api/sync/ack #[utoipa::path( post, path = "/api/v1/sync/ack", tag = "sync", request_body = AcknowledgeChangesRequest, responses( (status = 200, description = "Changes acknowledged"), (status = 400, description = "Bad request"), (status = 401, description = "Unauthorized"), ), security(("bearer_auth" = [])) )] pub async fn acknowledge_changes( State(state): State, Extension(_username): Extension, headers: HeaderMap, Json(req): Json, ) -> ApiResult { // Get device from header or context let device_token = headers .get("X-Device-Token") .and_then(|v| v.to_str().ok()) .ok_or_else(|| ApiError::bad_request("Missing X-Device-Token header"))?; let token_hash = hash_device_token(device_token); let device = state .storage .get_device_by_token(&token_hash) .await .map_err(|e| ApiError::internal(format!("Failed to get device: {e}")))? .ok_or_else(|| ApiError::unauthorized("Invalid device token"))?; // Update device cursor update_device_cursor(&state.storage, device.id, req.cursor) .await .map_err(|e| ApiError::internal(format!("Failed to update cursor: {e}")))?; Ok(StatusCode::OK) } /// List unresolved conflicts /// GET /api/sync/conflicts #[utoipa::path( get, path = "/api/v1/sync/conflicts", tag = "sync", responses( (status = 200, description = "Unresolved conflicts", body = Vec), (status = 401, description = "Unauthorized"), ), security(("bearer_auth" = [])) )] pub async fn list_conflicts( State(state): State, Extension(_username): Extension, headers: HeaderMap, ) -> ApiResult>> { let device_token = headers .get("X-Device-Token") .and_then(|v| v.to_str().ok()) .ok_or_else(|| ApiError::bad_request("Missing X-Device-Token header"))?; let token_hash = hash_device_token(device_token); let device = state .storage .get_device_by_token(&token_hash) .await .map_err(|e| ApiError::internal(format!("Failed to get device: {e}")))? .ok_or_else(|| ApiError::unauthorized("Invalid device token"))?; let conflicts = state .storage .get_unresolved_conflicts(device.id) .await .map_err(|e| ApiError::internal(format!("Failed to get conflicts: {e}")))?; Ok(Json(conflicts.into_iter().map(Into::into).collect())) } /// Resolve a sync conflict /// POST /api/sync/conflicts/{id}/resolve #[utoipa::path( post, path = "/api/v1/sync/conflicts/{id}/resolve", tag = "sync", params(("id" = Uuid, Path, description = "Conflict ID")), request_body = ResolveConflictRequest, responses( (status = 200, description = "Conflict resolved"), (status = 400, description = "Bad request"), (status = 401, description = "Unauthorized"), ), security(("bearer_auth" = [])) )] pub async fn resolve_conflict( State(state): State, Extension(_username): Extension, Path(id): Path, Json(req): Json, ) -> ApiResult { let resolution = match req.resolution.as_str() { "server_wins" => ConflictResolution::ServerWins, "client_wins" => ConflictResolution::ClientWins, "keep_both" => ConflictResolution::KeepBoth, _ => return Err(ApiError::bad_request("Invalid resolution type")), }; state .storage .resolve_conflict(id, resolution) .await .map_err(|e| { ApiError::internal(format!("Failed to resolve conflict: {e}")) })?; Ok(StatusCode::OK) } /// Create an upload session for chunked upload /// POST /api/sync/upload #[utoipa::path( post, path = "/api/v1/sync/upload", tag = "sync", request_body = CreateUploadSessionRequest, responses( (status = 200, description = "Upload session created", body = UploadSessionResponse), (status = 400, description = "Bad request"), (status = 401, description = "Unauthorized"), ), security(("bearer_auth" = [])) )] pub async fn create_upload( State(state): State, Extension(_username): Extension, headers: HeaderMap, Json(req): Json, ) -> ApiResult> { let config = state.config.read().await; if !config.sync.enabled { return Err(ApiError::bad_request("Sync is not enabled")); } let upload_timeout_hours = config.sync.upload_timeout_hours; drop(config); let device_token = headers .get("X-Device-Token") .and_then(|v| v.to_str().ok()) .ok_or_else(|| ApiError::bad_request("Missing X-Device-Token header"))?; let token_hash = hash_device_token(device_token); let device = state .storage .get_device_by_token(&token_hash) .await .map_err(|e| ApiError::internal(format!("Failed to get device: {e}")))? .ok_or_else(|| ApiError::unauthorized("Invalid device token"))?; let chunk_size = req.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE); let chunk_count = req.expected_size.div_ceil(chunk_size); let now = Utc::now(); let session = UploadSession { id: Uuid::now_v7(), device_id: device.id, target_path: req.target_path, expected_hash: ContentHash(req.expected_hash), expected_size: req.expected_size, chunk_size, chunk_count, status: UploadStatus::Pending, created_at: now, expires_at: now + chrono::Duration::hours(upload_timeout_hours as i64), last_activity: now, }; state .storage .create_upload_session(&session) .await .map_err(|e| { ApiError::internal(format!("Failed to create upload session: {e}")) })?; // Create temp file for chunked upload if manager is available if let Some(ref manager) = state.chunked_upload_manager { manager.create_temp_file(&session).await.map_err(|e| { ApiError::internal(format!("Failed to create temp file: {e}")) })?; } Ok(Json(session.into())) } /// Upload a chunk /// PUT /api/sync/upload/{id}/chunks/{index} #[utoipa::path( put, path = "/api/v1/sync/upload/{id}/chunks/{index}", tag = "sync", params( ("id" = Uuid, Path, description = "Upload session ID"), ("index" = u64, Path, description = "Chunk index"), ), request_body(content = Vec, description = "Chunk binary data", content_type = "application/octet-stream"), responses( (status = 200, description = "Chunk received", body = ChunkUploadedResponse), (status = 400, description = "Bad request"), (status = 401, description = "Unauthorized"), (status = 404, description = "Not found"), ), security(("bearer_auth" = [])) )] pub async fn upload_chunk( State(state): State, Path((session_id, chunk_index)): Path<(Uuid, u64)>, _headers: HeaderMap, body: axum::body::Bytes, ) -> ApiResult> { let session = state .storage .get_upload_session(session_id) .await .map_err(|e| { ApiError::not_found(format!("Upload session not found: {e}")) })?; if session.status == UploadStatus::Expired { return Err(ApiError::bad_request("Upload session has expired")); } if chunk_index >= session.chunk_count { return Err(ApiError::bad_request("Invalid chunk index")); } // Require chunked upload manager to be available let manager = state.chunked_upload_manager.as_ref().ok_or_else(|| { ApiError::internal("Chunked upload manager not available") })?; // Write chunk data to temp file let chunk = manager .write_chunk(&session, chunk_index, body.as_ref()) .await .map_err(|e| ApiError::internal(format!("Failed to write chunk: {e}")))?; // Record chunk metadata in database state .storage .record_chunk(session_id, &chunk) .await .map_err(|e| ApiError::internal(format!("Failed to record chunk: {e}")))?; Ok(Json(ChunkUploadedResponse { chunk_index, received: true, })) } /// Get upload session status /// GET /api/sync/upload/{id} #[utoipa::path( get, path = "/api/v1/sync/upload/{id}", tag = "sync", params(("id" = Uuid, Path, description = "Upload session ID")), responses( (status = 200, description = "Upload session status", body = UploadSessionResponse), (status = 401, description = "Unauthorized"), (status = 404, description = "Not found"), ), security(("bearer_auth" = [])) )] pub async fn get_upload_status( State(state): State, Path(id): Path, ) -> ApiResult> { let session = state.storage.get_upload_session(id).await.map_err(|e| { ApiError::not_found(format!("Upload session not found: {e}")) })?; Ok(Json(session.into())) } /// Complete an upload session /// POST /api/sync/upload/{id}/complete #[utoipa::path( post, path = "/api/v1/sync/upload/{id}/complete", tag = "sync", params(("id" = Uuid, Path, description = "Upload session ID")), responses( (status = 200, description = "Upload completed"), (status = 400, description = "Bad request"), (status = 401, description = "Unauthorized"), (status = 404, description = "Not found"), ), security(("bearer_auth" = [])) )] pub async fn complete_upload( State(state): State, Path(id): Path, ) -> ApiResult { let mut session = state.storage.get_upload_session(id).await.map_err(|e| { ApiError::not_found(format!("Upload session not found: {e}")) })?; // Verify all chunks received let chunks = state .storage .get_upload_chunks(id) .await .map_err(|e| ApiError::internal(format!("Failed to get chunks: {e}")))?; if chunks.len() != session.chunk_count as usize { return Err(ApiError::bad_request(format!( "Missing chunks: expected {}, got {}", session.chunk_count, chunks.len() ))); } // Require chunked upload manager to be available let manager = state.chunked_upload_manager.as_ref().ok_or_else(|| { ApiError::internal("Chunked upload manager not available") })?; // Verify and finalize the temp file let temp_path = manager.finalize(&session, &chunks).await.map_err(|e| { ApiError::internal(format!("Failed to finalize upload: {e}")) })?; // Validate and resolve target path securely let target_path = std::path::Path::new(&session.target_path); // Reject absolute paths from client if target_path.is_absolute() { return Err(ApiError::bad_request("Absolute paths are not allowed")); } // Reject empty paths if target_path.as_os_str().is_empty() { return Err(ApiError::bad_request("Empty target path not allowed")); } // Get root directory from config let config = state.config.read().await; let root = config .directories .roots .first() .cloned() .ok_or_else(|| ApiError::internal("No root directory configured"))?; drop(config); // Build candidate path let candidate = root.join(target_path); // Canonicalize root to resolve symlinks and get absolute path let root_canon = tokio::fs::canonicalize(&root).await.map_err(|e| { ApiError::internal(format!("Failed to canonicalize root: {e}")) })?; // Ensure parent directory exists before canonicalizing candidate if let Some(parent) = candidate.parent() { tokio::fs::create_dir_all(parent).await.map_err(|e| { ApiError::internal(format!("Failed to create directory: {e}")) })?; } // Try to canonicalize the candidate path (without the final file) // If it exists, canonicalize it; otherwise canonicalize parent and append // filename let final_path = if let Ok(canon) = tokio::fs::canonicalize(&candidate).await { canon } else if let Some(parent) = candidate.parent() { let parent_canon = tokio::fs::canonicalize(parent).await.map_err(|e| { ApiError::internal(format!("Failed to canonicalize parent: {e}")) })?; if let Some(filename) = candidate.file_name() { parent_canon.join(filename) } else { return Err(ApiError::bad_request("Invalid target path")); } } else { return Err(ApiError::bad_request("Invalid target path")); }; // Ensure resolved path is still under root (path traversal check) if !final_path.starts_with(&root_canon) { return Err(ApiError::bad_request("Path traversal not allowed")); } // Move temp file to final location (with cross-filesystem fallback) if let Err(e) = tokio::fs::rename(&temp_path, &final_path).await { // Check for cross-filesystem error if e.kind() == std::io::ErrorKind::CrossesDevices || e.raw_os_error() == Some(18) // EXDEV on Linux { // Fallback: copy then remove tokio::fs::copy(&temp_path, &final_path) .await .map_err(|e| ApiError::internal(format!("Failed to copy file: {e}")))?; let _ = tokio::fs::remove_file(&temp_path).await; } else { return Err(ApiError::internal(format!("Failed to move file: {e}"))); } } tracing::info!( session_id = %id, target = %final_path.display(), "completed chunked upload" ); // Mark session as completed session.status = UploadStatus::Completed; state .storage .update_upload_session(&session) .await .map_err(|e| { ApiError::internal(format!("Failed to update session: {e}")) })?; // Record the sync change let entry = SyncLogEntry { id: Uuid::now_v7(), sequence: 0, change_type: SyncChangeType::Created, media_id: None, path: session.target_path.clone(), content_hash: Some(session.expected_hash.clone()), file_size: Some(session.expected_size), metadata_json: None, changed_by_device: Some(session.device_id), timestamp: Utc::now(), }; state .storage .record_sync_change(&entry) .await .map_err(|e| ApiError::internal(format!("Failed to record change: {e}")))?; Ok(StatusCode::OK) } /// Cancel an upload session /// DELETE /api/sync/upload/{id} #[utoipa::path( delete, path = "/api/v1/sync/upload/{id}", tag = "sync", params(("id" = Uuid, Path, description = "Upload session ID")), responses( (status = 204, description = "Upload cancelled"), (status = 401, description = "Unauthorized"), (status = 404, description = "Not found"), ), security(("bearer_auth" = [])) )] pub async fn cancel_upload( State(state): State, Path(id): Path, ) -> ApiResult { let mut session = state.storage.get_upload_session(id).await.map_err(|e| { ApiError::not_found(format!("Upload session not found: {e}")) })?; // Clean up temp file if manager is available if let Some(ref manager) = state.chunked_upload_manager && let Err(e) = manager.cancel(id).await { tracing::warn!(session_id = %id, error = %e, "failed to clean up temp file"); } session.status = UploadStatus::Cancelled; state .storage .update_upload_session(&session) .await .map_err(|e| { ApiError::internal(format!("Failed to cancel session: {e}")) })?; Ok(StatusCode::NO_CONTENT) } /// Download a file for sync (supports Range header) /// GET /api/sync/download/{*path} #[utoipa::path( get, path = "/api/v1/sync/download/{path}", tag = "sync", params(("path" = String, Path, description = "File path")), responses( (status = 200, description = "File content"), (status = 206, description = "Partial content"), (status = 401, description = "Unauthorized"), (status = 404, description = "Not found"), ), security(("bearer_auth" = [])) )] pub async fn download_file( State(state): State, Path(path): Path, headers: HeaderMap, ) -> ApiResult { let item = state .storage .get_media_by_path(FilePath::new(&path)) .await .map_err(|e| ApiError::internal(format!("Failed to get media: {e}")))? .ok_or_else(|| ApiError::not_found("File not found"))?; let file = tokio::fs::File::open(&item.path) .await .map_err(|e| ApiError::not_found(format!("File not found: {e}")))?; let metadata = file .metadata() .await .map_err(|e| ApiError::internal(format!("Failed to get metadata: {e}")))?; let file_size = metadata.len(); // Check for Range header if let Some(range_header) = headers.get(header::RANGE) && let Ok(range_str) = range_header.to_str() && let Some(range) = parse_range_header(range_str, file_size) { // Partial content response let (start, end) = range; let length = end - start + 1; let mut file = tokio::fs::File::open(&item.path) .await .map_err(|e| ApiError::internal(format!("Failed to reopen file: {e}")))?; file .seek(std::io::SeekFrom::Start(start)) .await .map_err(|e| ApiError::internal(format!("Failed to seek file: {e}")))?; let stream = ReaderStream::new(file.take(length)); let body = Body::from_stream(stream); return Ok( ( StatusCode::PARTIAL_CONTENT, [ (header::CONTENT_TYPE, item.media_type.mime_type()), (header::CONTENT_LENGTH, length.to_string()), ( header::CONTENT_RANGE, format!("bytes {start}-{end}/{file_size}"), ), (header::ACCEPT_RANGES, "bytes".to_string()), ], body, ) .into_response(), ); } // Full content response let stream = ReaderStream::new(file); let body = Body::from_stream(stream); Ok( ( StatusCode::OK, [ (header::CONTENT_TYPE, item.media_type.mime_type()), (header::CONTENT_LENGTH, file_size.to_string()), (header::ACCEPT_RANGES, "bytes".to_string()), ], body, ) .into_response(), ) } /// Parse HTTP Range header fn parse_range_header(range: &str, file_size: u64) -> Option<(u64, u64)> { let range = range.strip_prefix("bytes=")?; let parts: Vec<&str> = range.split('-').collect(); if parts.len() != 2 { return None; } let start: u64 = parts[0].parse().ok()?; let end: u64 = if parts[1].is_empty() { file_size - 1 } else { parts[1].parse().ok()? }; if start > end || end >= file_size { return None; } Some((start, end)) }