diff --git a/crates/pinakes-server/src/auth.rs b/crates/pinakes-server/src/auth.rs index 6c3c989..6405612 100644 --- a/crates/pinakes-server/src/auth.rs +++ b/crates/pinakes-server/src/auth.rs @@ -9,23 +9,28 @@ use pinakes_core::config::UserRole; use crate::state::AppState; /// Constant-time string comparison to prevent timing attacks on API keys. +/// +/// Always iterates to `max(len_a, len_b)` so that neither a length difference +/// nor a byte mismatch causes an early return. fn constant_time_eq(a: &str, b: &str) -> bool { - if a.len() != b.len() { - return false; + let a = a.as_bytes(); + let b = b.as_bytes(); + let len = a.len().max(b.len()); + let mut result = a.len() ^ b.len(); // non-zero if lengths differ + for i in 0..len { + let ab = a.get(i).copied().unwrap_or(0); + let bb = b.get(i).copied().unwrap_or(0); + result |= usize::from(ab ^ bb); } - a.as_bytes() - .iter() - .zip(b.as_bytes()) - .fold(0u8, |acc, (x, y)| acc | (x ^ y)) - == 0 + result == 0 } /// Axum middleware that checks for a valid Bearer token. /// /// If `accounts.enabled == true`: look up bearer token in database session -/// store. If `accounts.enabled == false`: use existing api_key logic (unchanged -/// behavior). Skips authentication for the `/health` and `/auth/login` path -/// suffixes. +/// store. If `accounts.enabled == false`: use existing `api_key` logic +/// (unchanged behavior). Skips authentication for the `/health` and +/// `/auth/login` path suffixes. pub async fn require_auth( State(state): State, mut request: Request, @@ -58,7 +63,7 @@ pub async fn require_auth( .get("authorization") .and_then(|v| v.to_str().ok()) .and_then(|s| s.strip_prefix("Bearer ")) - .map(|s| s.to_string()); + .map(std::string::ToString::to_string); let Some(token) = token else { tracing::debug!(path = %path, "rejected: missing Authorization header"); @@ -83,7 +88,7 @@ pub async fn require_auth( // Check session expiry let now = chrono::Utc::now(); if session.expires_at < now { - let username = session.username.clone(); + let username = session.username; // Delete expired session in a bounded background task if let Ok(permit) = state.session_semaphore.clone().try_acquire_owned() { let storage = state.storage.clone(); @@ -124,7 +129,7 @@ pub async fn require_auth( // Inject role and username into request extensions request.extensions_mut().insert(role); - request.extensions_mut().insert(session.username.clone()); + request.extensions_mut().insert(session.username); } else { // Legacy API key auth let api_key = std::env::var("PINAKES_API_KEY") @@ -202,7 +207,7 @@ pub async fn require_admin(request: Request, next: Next) -> Response { } } -/// Resolve the authenticated username (from request extensions) to a UserId. +/// Resolve the authenticated username (from request extensions) to a `UserId`. /// /// Returns an error if the user cannot be found. pub async fn resolve_user_id( diff --git a/crates/pinakes-server/src/routes/analytics.rs b/crates/pinakes-server/src/routes/analytics.rs index 360fef5..19a3ef0 100644 --- a/crates/pinakes-server/src/routes/analytics.rs +++ b/crates/pinakes-server/src/routes/analytics.rs @@ -8,7 +8,19 @@ use pinakes_core::{ }; use uuid::Uuid; -use crate::{auth::resolve_user_id, dto::*, error::ApiError, state::AppState}; +use crate::{ + auth::resolve_user_id, + dto::{ + MediaResponse, + MostViewedResponse, + PaginationParams, + RecordUsageEventRequest, + WatchProgressRequest, + WatchProgressResponse, + }, + error::ApiError, + state::AppState, +}; const MAX_LIMIT: u64 = 100; @@ -87,6 +99,11 @@ pub async fn update_watch_progress( Path(id): Path, Json(req): Json, ) -> Result, ApiError> { + if !req.progress_secs.is_finite() || req.progress_secs < 0.0 { + return Err(ApiError::bad_request( + "progress_secs must be a non-negative finite number", + )); + } let user_id = resolve_user_id(&state.storage, &username).await?; state .storage diff --git a/crates/pinakes-server/src/routes/books.rs b/crates/pinakes-server/src/routes/books.rs index d716326..f513d9c 100644 --- a/crates/pinakes-server/src/routes/books.rs +++ b/crates/pinakes-server/src/routes/books.rs @@ -131,11 +131,11 @@ pub struct SearchBooksQuery { pub limit: u64, } -fn default_offset() -> u64 { +const fn default_offset() -> u64 { 0 } -fn default_limit() -> u64 { +const fn default_limit() -> u64 { 50 } @@ -178,7 +178,7 @@ pub async fn list_books( ) -> Result { let pagination = Pagination { offset: query.offset, - limit: query.limit, + limit: query.limit.min(1000), sort: None, }; @@ -290,6 +290,9 @@ pub async fn update_reading_progress( Path(media_id): Path, Json(req): Json, ) -> Result { + if req.current_page < 0 { + return Err(ApiError::bad_request("current_page must be non-negative")); + } let user_id = resolve_user_id(&state.storage, &username).await?; let media_id = MediaId(media_id); diff --git a/crates/pinakes-server/src/routes/media.rs b/crates/pinakes-server/src/routes/media.rs index 5570310..09d8410 100644 --- a/crates/pinakes-server/src/routes/media.rs +++ b/crates/pinakes-server/src/routes/media.rs @@ -8,11 +8,41 @@ use pinakes_core::{ }; use uuid::Uuid; -use crate::{dto::*, error::ApiError, state::AppState}; +use crate::{ + dto::{ + BatchCollectionRequest, + BatchDeleteRequest, + BatchImportItemResult, + BatchImportRequest, + BatchImportResponse, + BatchMoveRequest, + BatchOperationResponse, + BatchTagRequest, + BatchUpdateRequest, + DirectoryImportRequest, + DirectoryPreviewFile, + DirectoryPreviewResponse, + EmptyTrashResponse, + ImportRequest, + ImportResponse, + ImportWithOptionsRequest, + MediaCountResponse, + MediaResponse, + MoveMediaRequest, + PaginationParams, + RenameMediaRequest, + SetCustomFieldRequest, + TrashInfoResponse, + TrashResponse, + UpdateMediaRequest, + }, + error::ApiError, + state::AppState, +}; /// Apply tags and add to collection after a successful import. -/// Shared logic used by import_with_options, batch_import, and -/// import_directory_endpoint. +/// Shared logic used by `import_with_options`, `batch_import`, and +/// `import_directory_endpoint`. async fn apply_import_post_processing( storage: &DynStorageBackend, media_id: MediaId, @@ -59,6 +89,17 @@ pub async fn import_media( ) -> Result, ApiError> { let result = pinakes_core::import::import_file(&state.storage, &req.path).await?; + + if let Some(ref dispatcher) = state.webhook_dispatcher { + let id = result.media_id.0.to_string(); + dispatcher.dispatch(pinakes_core::webhooks::WebhookEvent::MediaCreated { + media_id: id.clone(), + }); + dispatcher.dispatch( + pinakes_core::webhooks::WebhookEvent::ImportCompleted { media_id: id }, + ); + } + Ok(Json(ImportResponse { media_id: result.media_id.0.to_string(), was_duplicate: result.was_duplicate, @@ -150,6 +191,12 @@ pub async fn update_media( ) .await?; + if let Some(ref dispatcher) = state.webhook_dispatcher { + dispatcher.dispatch(pinakes_core::webhooks::WebhookEvent::MediaUpdated { + media_id: item.id.0.to_string(), + }); + } + Ok(Json(MediaResponse::from(item))) } @@ -473,7 +520,7 @@ pub async fn preview_directory( })?; let recursive = req .get("recursive") - .and_then(|v| v.as_bool()) + .and_then(serde_json::Value::as_bool) .unwrap_or(true); let dir = std::path::PathBuf::from(path_str); if !dir.is_dir() { @@ -515,8 +562,7 @@ pub async fn preview_directory( // Skip hidden files/dirs if path .file_name() - .map(|n| n.to_string_lossy().starts_with('.')) - .unwrap_or(false) + .is_some_and(|n| n.to_string_lossy().starts_with('.')) { continue; } @@ -528,7 +574,7 @@ pub async fn preview_directory( && let Some(mt) = pinakes_core::media_type::MediaType::from_path(&path) { - let size = entry.metadata().ok().map(|m| m.len()).unwrap_or(0); + let size = entry.metadata().ok().map_or(0, |m| m.len()); let file_name = path .file_name() .map(|n| n.to_string_lossy().to_string()) @@ -579,8 +625,7 @@ pub async fn set_custom_field( if req.value.len() > MAX_LONG_TEXT { return Err(ApiError( pinakes_core::error::PinakesError::InvalidOperation(format!( - "field value exceeds {} characters", - MAX_LONG_TEXT + "field value exceeds {MAX_LONG_TEXT} characters" )), )); } @@ -747,7 +792,7 @@ pub async fn batch_add_to_collection( ) .await { - Ok(_) => processed += 1, + Ok(()) => processed += 1, Err(e) => errors.push(format!("{media_id}: {e}")), } } @@ -1113,10 +1158,7 @@ pub async fn permanent_delete_media( Query(params): Query>, ) -> Result, ApiError> { let media_id = MediaId(id); - let permanent = params - .get("permanent") - .map(|v| v == "true") - .unwrap_or(false); + let permanent = params.get("permanent").is_some_and(|v| v == "true"); if permanent { // Get item info before delete @@ -1161,6 +1203,12 @@ pub async fn permanent_delete_media( tracing::warn!(path = %thumb_path.display(), error = %e, "failed to remove thumbnail"); } + if let Some(ref dispatcher) = state.webhook_dispatcher { + dispatcher.dispatch(pinakes_core::webhooks::WebhookEvent::MediaDeleted { + media_id: id.to_string(), + }); + } + Ok(Json( serde_json::json!({"deleted": true, "permanent": true}), )) diff --git a/crates/pinakes-server/src/routes/shares.rs b/crates/pinakes-server/src/routes/shares.rs index c6236c6..06b7e6d 100644 --- a/crates/pinakes-server/src/routes/shares.rs +++ b/crates/pinakes-server/src/routes/shares.rs @@ -13,11 +13,13 @@ use pinakes_core::{ ShareActivity, ShareActivityAction, ShareId, + ShareMutatePermissions, ShareNotification, ShareNotificationType, SharePermissions, ShareRecipient, ShareTarget, + ShareViewPermissions, generate_share_token, hash_share_password, verify_share_password, @@ -37,6 +39,7 @@ use crate::{ ShareActivityResponse, ShareNotificationResponse, ShareResponse, + SharedContentResponse, UpdateShareRequest, }, error::{ApiError, ApiResult}, @@ -51,14 +54,25 @@ pub async fn create_share( Json(req): Json, ) -> ApiResult> { let config = state.config.read().await; - if !config.sharing.enabled { + if !config.sharing.enabled() { return Err(ApiError::bad_request("Sharing is not enabled")); } // Validate public links are allowed - if req.recipient_type == "public_link" && !config.sharing.allow_public_links { + if req.recipient_type == "public_link" && !config.sharing.allow_public_links() + { return Err(ApiError::bad_request("Public links are not allowed")); } + + // Enforce password requirement for public links if configured + if req.recipient_type == "public_link" + && config.sharing.require_public_link_password + && req.password.is_none() + { + return Err(ApiError::bad_request( + "Public links require a password per server policy", + )); + } drop(config); let owner_id = resolve_user_id(&state.storage, &username).await?; @@ -124,12 +138,16 @@ pub async fn create_share( // Parse permissions let permissions = if let Some(perms) = req.permissions { SharePermissions { - can_view: perms.can_view.unwrap_or(true), - can_download: perms.can_download.unwrap_or(false), - can_edit: perms.can_edit.unwrap_or(false), - can_delete: perms.can_delete.unwrap_or(false), - can_reshare: perms.can_reshare.unwrap_or(false), - can_add: perms.can_add.unwrap_or(false), + view: ShareViewPermissions { + can_view: perms.can_view.unwrap_or(true), + can_download: perms.can_download.unwrap_or(false), + can_reshare: perms.can_reshare.unwrap_or(false), + }, + mutate: ShareMutatePermissions { + can_edit: perms.can_edit.unwrap_or(false), + can_delete: perms.can_delete.unwrap_or(false), + can_add: perms.can_add.unwrap_or(false), + }, } } else { SharePermissions::view_only() @@ -156,9 +174,10 @@ pub async fn create_share( updated_at: Utc::now(), }; - let created = state.storage.create_share(&share).await.map_err(|e| { - ApiError::internal(format!("Failed to create share: {}", e)) - })?; + let created = + state.storage.create_share(&share).await.map_err(|e| { + ApiError::internal(format!("Failed to create share: {e}")) + })?; // Send notification to recipient if it's a user share if let ShareRecipient::User { user_id } = &created.recipient { @@ -198,7 +217,7 @@ pub async fn list_outgoing( .storage .list_shares_by_owner(user_id, &pagination) .await - .map_err(|e| ApiError::internal(format!("Failed to list shares: {}", e)))?; + .map_err(|e| ApiError::internal(format!("Failed to list shares: {e}")))?; Ok(Json(shares.into_iter().map(Into::into).collect())) } @@ -221,7 +240,7 @@ pub async fn list_incoming( .storage .list_shares_for_user(user_id, &pagination) .await - .map_err(|e| ApiError::internal(format!("Failed to list shares: {}", e)))?; + .map_err(|e| ApiError::internal(format!("Failed to list shares: {e}")))?; Ok(Json(shares.into_iter().map(Into::into).collect())) } @@ -238,7 +257,7 @@ pub async fn get_share( .storage .get_share(ShareId(id)) .await - .map_err(|e| ApiError::not_found(format!("Share not found: {}", e)))?; + .map_err(|e| ApiError::not_found(format!("Share not found: {e}")))?; // Check authorization let is_owner = share.owner_id == user_id; @@ -269,7 +288,7 @@ pub async fn update_share( .storage .get_share(ShareId(id)) .await - .map_err(|e| ApiError::not_found(format!("Share not found: {}", e)))?; + .map_err(|e| ApiError::not_found(format!("Share not found: {e}")))?; // Only owner can update if share.owner_id != user_id { @@ -279,14 +298,22 @@ pub async fn update_share( // Update fields if let Some(perms) = req.permissions { share.permissions = SharePermissions { - can_view: perms.can_view.unwrap_or(share.permissions.can_view), - can_download: perms - .can_download - .unwrap_or(share.permissions.can_download), - can_edit: perms.can_edit.unwrap_or(share.permissions.can_edit), - can_delete: perms.can_delete.unwrap_or(share.permissions.can_delete), - can_reshare: perms.can_reshare.unwrap_or(share.permissions.can_reshare), - can_add: perms.can_add.unwrap_or(share.permissions.can_add), + view: ShareViewPermissions { + can_view: perms.can_view.unwrap_or(share.permissions.view.can_view), + can_download: perms + .can_download + .unwrap_or(share.permissions.view.can_download), + can_reshare: perms + .can_reshare + .unwrap_or(share.permissions.view.can_reshare), + }, + mutate: ShareMutatePermissions { + can_edit: perms.can_edit.unwrap_or(share.permissions.mutate.can_edit), + can_delete: perms + .can_delete + .unwrap_or(share.permissions.mutate.can_delete), + can_add: perms.can_add.unwrap_or(share.permissions.mutate.can_add), + }, }; } @@ -304,9 +331,10 @@ pub async fn update_share( share.updated_at = Utc::now(); - let updated = state.storage.update_share(&share).await.map_err(|e| { - ApiError::internal(format!("Failed to update share: {}", e)) - })?; + let updated = + state.storage.update_share(&share).await.map_err(|e| { + ApiError::internal(format!("Failed to update share: {e}")) + })?; // Notify recipient of update if let ShareRecipient::User { user_id } = &updated.recipient { @@ -339,7 +367,7 @@ pub async fn delete_share( .storage .get_share(ShareId(id)) .await - .map_err(|e| ApiError::not_found(format!("Share not found: {}", e)))?; + .map_err(|e| ApiError::not_found(format!("Share not found: {e}")))?; // Only owner can delete if share.owner_id != user_id { @@ -362,9 +390,11 @@ pub async fn delete_share( } } - state.storage.delete_share(ShareId(id)).await.map_err(|e| { - ApiError::internal(format!("Failed to delete share: {}", e)) - })?; + state + .storage + .delete_share(ShareId(id)) + .await + .map_err(|e| ApiError::internal(format!("Failed to delete share: {e}")))?; Ok(StatusCode::NO_CONTENT) } @@ -386,7 +416,7 @@ pub async fn batch_delete( .storage .get_share(*share_id) .await - .map_err(|e| ApiError::not_found(format!("Share not found: {}", e)))?; + .map_err(|e| ApiError::not_found(format!("Share not found: {e}")))?; if share.owner_id != user_id { return Err(ApiError::forbidden(format!( @@ -400,9 +430,7 @@ pub async fn batch_delete( .storage .batch_delete_shares(&share_ids) .await - .map_err(|e| { - ApiError::internal(format!("Failed to batch delete: {}", e)) - })?; + .map_err(|e| ApiError::internal(format!("Failed to batch delete: {e}")))?; Ok(Json(serde_json::json!({ "deleted": deleted }))) } @@ -414,12 +442,12 @@ pub async fn access_shared( Path(token): Path, Query(params): Query, ConnectInfo(addr): ConnectInfo, -) -> ApiResult> { +) -> ApiResult> { let share = state .storage .get_share_by_token(&token) .await - .map_err(|e| ApiError::not_found(format!("Share not found: {}", e)))?; + .map_err(|e| ApiError::not_found(format!("Share not found: {e}")))?; // Check expiration if let Some(expires_at) = share.expires_at @@ -463,9 +491,7 @@ pub async fn access_shared( .storage .record_share_access(share.id) .await - .map_err(|e| { - ApiError::internal(format!("Failed to record access: {}", e)) - })?; + .map_err(|e| ApiError::internal(format!("Failed to record access: {e}")))?; // Log the access let activity = ShareActivity { @@ -482,17 +508,87 @@ pub async fn access_shared( // Return the shared content match &share.target { ShareTarget::Media { media_id } => { - let item = - state.storage.get_media(*media_id).await.map_err(|e| { - ApiError::not_found(format!("Media not found: {}", e)) + let item = state + .storage + .get_media(*media_id) + .await + .map_err(|e| ApiError::not_found(format!("Media not found: {e}")))?; + + Ok(Json(SharedContentResponse::Single(MediaResponse::from( + item, + )))) + }, + ShareTarget::Collection { collection_id } => { + let members = state + .storage + .get_collection_members(*collection_id) + .await + .map_err(|e| { + ApiError::not_found(format!("Collection not found: {e}")) })?; - Ok(Json(item.into())) + let items: Vec = + members.into_iter().map(MediaResponse::from).collect(); + + Ok(Json(SharedContentResponse::Multiple { items })) }, - _ => { - Err(ApiError::bad_request( - "Collection/tag sharing not yet fully implemented", - )) + ShareTarget::Tag { tag_id } => { + let tag = state + .storage + .get_tag(*tag_id) + .await + .map_err(|e| ApiError::not_found(format!("Tag not found: {e}")))?; + + let request = pinakes_core::search::SearchRequest { + query: pinakes_core::search::SearchQuery::TagFilter( + tag.name.clone(), + ), + sort: pinakes_core::search::SortOrder::default(), + pagination: Pagination::new(0, 100, None), + }; + + let results = state + .storage + .search(&request) + .await + .map_err(|e| ApiError::internal(format!("Search failed: {e}")))?; + + let items: Vec = + results.items.into_iter().map(MediaResponse::from).collect(); + + Ok(Json(SharedContentResponse::Multiple { items })) + }, + ShareTarget::SavedSearch { search_id } => { + let saved = + state + .storage + .get_saved_search(*search_id) + .await + .map_err(|e| { + ApiError::not_found(format!("Saved search not found: {e}")) + })?; + + let parsed_query = pinakes_core::search::parse_search_query(&saved.query) + .map_err(|e| { + ApiError::internal(format!("Failed to parse search query: {e}")) + })?; + + let request = pinakes_core::search::SearchRequest { + query: parsed_query, + sort: pinakes_core::search::SortOrder::default(), + pagination: Pagination::new(0, 100, None), + }; + + let results = state + .storage + .search(&request) + .await + .map_err(|e| ApiError::internal(format!("Search failed: {e}")))?; + + let items: Vec = + results.items.into_iter().map(MediaResponse::from).collect(); + + Ok(Json(SharedContentResponse::Multiple { items })) }, } } @@ -510,7 +606,7 @@ pub async fn get_activity( .storage .get_share(ShareId(id)) .await - .map_err(|e| ApiError::not_found(format!("Share not found: {}", e)))?; + .map_err(|e| ApiError::not_found(format!("Share not found: {e}")))?; // Only owner can view activity if share.owner_id != user_id { @@ -529,9 +625,7 @@ pub async fn get_activity( .storage .get_share_activity(ShareId(id), &pagination) .await - .map_err(|e| { - ApiError::internal(format!("Failed to get activity: {}", e)) - })?; + .map_err(|e| ApiError::internal(format!("Failed to get activity: {e}")))?; Ok(Json(activity.into_iter().map(Into::into).collect())) } @@ -548,7 +642,7 @@ pub async fn get_notifications( .get_unread_notifications(user_id) .await .map_err(|e| { - ApiError::internal(format!("Failed to get notifications: {}", e)) + ApiError::internal(format!("Failed to get notifications: {e}")) })?; Ok(Json(notifications.into_iter().map(Into::into).collect())) @@ -558,16 +652,15 @@ pub async fn get_notifications( /// POST /api/notifications/shares/{id}/read pub async fn mark_notification_read( State(state): State, - Extension(_username): Extension, + Extension(username): Extension, Path(id): Path, ) -> ApiResult { + let user_id = resolve_user_id(&state.storage, &username).await?; state .storage - .mark_notification_read(id) + .mark_notification_read(id, user_id) .await - .map_err(|e| { - ApiError::internal(format!("Failed to mark as read: {}", e)) - })?; + .map_err(|e| ApiError::internal(format!("Failed to mark as read: {e}")))?; Ok(StatusCode::OK) } @@ -584,7 +677,7 @@ pub async fn mark_all_read( .mark_all_notifications_read(user_id) .await .map_err(|e| { - ApiError::internal(format!("Failed to mark all as read: {}", e)) + ApiError::internal(format!("Failed to mark all as read: {e}")) })?; Ok(StatusCode::OK) diff --git a/crates/pinakes-server/src/routes/sync.rs b/crates/pinakes-server/src/routes/sync.rs index 802563a..e2ef48d 100644 --- a/crates/pinakes-server/src/routes/sync.rs +++ b/crates/pinakes-server/src/routes/sync.rs @@ -25,6 +25,7 @@ use pinakes_core::{ update_device_cursor, }, }; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio_util::io::ReaderStream; use uuid::Uuid; @@ -99,7 +100,7 @@ pub async fn register_device( .register_device(&device, &token_hash) .await .map_err(|e| { - ApiError::internal(format!("Failed to register device: {}", e)) + ApiError::internal(format!("Failed to register device: {e}")) })?; Ok(Json(DeviceRegistrationResponse { @@ -115,14 +116,11 @@ pub async fn list_devices( Extension(username): Extension, ) -> ApiResult>> { let user_id = resolve_user_id(&state.storage, &username).await?; - let devices = - state - .storage - .list_user_devices(user_id) - .await - .map_err(|e| { - ApiError::internal(format!("Failed to list devices: {}", e)) - })?; + let devices = state + .storage + .list_user_devices(user_id) + .await + .map_err(|e| ApiError::internal(format!("Failed to list devices: {e}")))?; Ok(Json(devices.into_iter().map(Into::into).collect())) } @@ -139,7 +137,7 @@ pub async fn get_device( .storage .get_device(DeviceId(id)) .await - .map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?; + .map_err(|e| ApiError::not_found(format!("Device not found: {e}")))?; // Verify ownership if device.user_id != user_id { @@ -162,7 +160,7 @@ pub async fn update_device( .storage .get_device(DeviceId(id)) .await - .map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?; + .map_err(|e| ApiError::not_found(format!("Device not found: {e}")))?; // Verify ownership if device.user_id != user_id { @@ -176,9 +174,11 @@ pub async fn update_device( device.enabled = enabled; } - state.storage.update_device(&device).await.map_err(|e| { - ApiError::internal(format!("Failed to update device: {}", e)) - })?; + state + .storage + .update_device(&device) + .await + .map_err(|e| ApiError::internal(format!("Failed to update device: {e}")))?; Ok(Json(device.into())) } @@ -195,7 +195,7 @@ pub async fn delete_device( .storage .get_device(DeviceId(id)) .await - .map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?; + .map_err(|e| ApiError::not_found(format!("Device not found: {e}")))?; // Verify ownership if device.user_id != user_id { @@ -206,9 +206,7 @@ pub async fn delete_device( .storage .delete_device(DeviceId(id)) .await - .map_err(|e| { - ApiError::internal(format!("Failed to delete device: {}", e)) - })?; + .map_err(|e| ApiError::internal(format!("Failed to delete device: {e}")))?; Ok(StatusCode::NO_CONTENT) } @@ -225,7 +223,7 @@ pub async fn regenerate_token( .storage .get_device(DeviceId(id)) .await - .map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?; + .map_err(|e| ApiError::not_found(format!("Device not found: {e}")))?; // Verify ownership if device.user_id != user_id { @@ -244,7 +242,7 @@ pub async fn regenerate_token( .register_device(&device, &token_hash) .await .map_err(|e| { - ApiError::internal(format!("Failed to regenerate token: {}", e)) + ApiError::internal(format!("Failed to regenerate token: {e}")) })?; Ok(Json(DeviceRegistrationResponse { @@ -272,7 +270,7 @@ pub async fn get_changes( .storage .get_changes_since(cursor, limit + 1) .await - .map_err(|e| ApiError::internal(format!("Failed to get changes: {}", e)))?; + .map_err(|e| ApiError::internal(format!("Failed to get changes: {e}")))?; let has_more = changes.len() > limit as usize; let changes: Vec = changes @@ -281,7 +279,7 @@ pub async fn get_changes( .map(Into::into) .collect(); - let new_cursor = changes.last().map(|c| c.sequence).unwrap_or(cursor); + let new_cursor = changes.last().map_or(cursor, |c| c.sequence); Ok(Json(ChangesResponse { changes, @@ -357,12 +355,8 @@ pub async fn report_changes( // No conflict, check if upload is needed match change.change_type.as_str() { - "created" | "modified" => { - if change.content_hash.is_some() { - upload_required.push(change.path); - } else { - accepted.push(change.path); - } + "created" | "modified" if change.content_hash.is_some() => { + upload_required.push(change.path); }, "deleted" => { // Record deletion @@ -415,15 +409,13 @@ pub async fn acknowledge_changes( .storage .get_device_by_token(&token_hash) .await - .map_err(|e| ApiError::internal(format!("Failed to get device: {}", e)))? + .map_err(|e| ApiError::internal(format!("Failed to get device: {e}")))? .ok_or_else(|| ApiError::unauthorized("Invalid device token"))?; // Update device cursor update_device_cursor(&state.storage, device.id, req.cursor) .await - .map_err(|e| { - ApiError::internal(format!("Failed to update cursor: {}", e)) - })?; + .map_err(|e| ApiError::internal(format!("Failed to update cursor: {e}")))?; Ok(StatusCode::OK) } @@ -445,16 +437,14 @@ pub async fn list_conflicts( .storage .get_device_by_token(&token_hash) .await - .map_err(|e| ApiError::internal(format!("Failed to get device: {}", e)))? + .map_err(|e| ApiError::internal(format!("Failed to get device: {e}")))? .ok_or_else(|| ApiError::unauthorized("Invalid device token"))?; let conflicts = state .storage .get_unresolved_conflicts(device.id) .await - .map_err(|e| { - ApiError::internal(format!("Failed to get conflicts: {}", e)) - })?; + .map_err(|e| ApiError::internal(format!("Failed to get conflicts: {e}")))?; Ok(Json(conflicts.into_iter().map(Into::into).collect())) } @@ -479,7 +469,7 @@ pub async fn resolve_conflict( .resolve_conflict(id, resolution) .await .map_err(|e| { - ApiError::internal(format!("Failed to resolve conflict: {}", e)) + ApiError::internal(format!("Failed to resolve conflict: {e}")) })?; Ok(StatusCode::OK) @@ -510,7 +500,7 @@ pub async fn create_upload( .storage .get_device_by_token(&token_hash) .await - .map_err(|e| ApiError::internal(format!("Failed to get device: {}", e)))? + .map_err(|e| ApiError::internal(format!("Failed to get device: {e}")))? .ok_or_else(|| ApiError::unauthorized("Invalid device token"))?; let chunk_size = req.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE); @@ -536,13 +526,13 @@ pub async fn create_upload( .create_upload_session(&session) .await .map_err(|e| { - ApiError::internal(format!("Failed to create upload session: {}", e)) + ApiError::internal(format!("Failed to create upload session: {e}")) })?; // Create temp file for chunked upload if manager is available if let Some(ref manager) = state.chunked_upload_manager { manager.create_temp_file(&session).await.map_err(|e| { - ApiError::internal(format!("Failed to create temp file: {}", e)) + ApiError::internal(format!("Failed to create temp file: {e}")) })?; } @@ -563,7 +553,7 @@ pub async fn upload_chunk( .get_upload_session(session_id) .await .map_err(|e| { - ApiError::not_found(format!("Upload session not found: {}", e)) + ApiError::not_found(format!("Upload session not found: {e}")) })?; if session.status == UploadStatus::Expired { @@ -583,16 +573,14 @@ pub async fn upload_chunk( let chunk = manager .write_chunk(&session, chunk_index, body.as_ref()) .await - .map_err(|e| ApiError::internal(format!("Failed to write chunk: {}", e)))?; + .map_err(|e| ApiError::internal(format!("Failed to write chunk: {e}")))?; // Record chunk metadata in database state .storage .record_chunk(session_id, &chunk) .await - .map_err(|e| { - ApiError::internal(format!("Failed to record chunk: {}", e)) - })?; + .map_err(|e| ApiError::internal(format!("Failed to record chunk: {e}")))?; Ok(Json(ChunkUploadedResponse { chunk_index, @@ -607,7 +595,7 @@ pub async fn get_upload_status( Path(id): Path, ) -> ApiResult> { let session = state.storage.get_upload_session(id).await.map_err(|e| { - ApiError::not_found(format!("Upload session not found: {}", e)) + ApiError::not_found(format!("Upload session not found: {e}")) })?; Ok(Json(session.into())) @@ -621,14 +609,15 @@ pub async fn complete_upload( ) -> ApiResult { let mut session = state.storage.get_upload_session(id).await.map_err(|e| { - ApiError::not_found(format!("Upload session not found: {}", e)) + ApiError::not_found(format!("Upload session not found: {e}")) })?; // Verify all chunks received - let chunks = - state.storage.get_upload_chunks(id).await.map_err(|e| { - ApiError::internal(format!("Failed to get chunks: {}", e)) - })?; + let chunks = state + .storage + .get_upload_chunks(id) + .await + .map_err(|e| ApiError::internal(format!("Failed to get chunks: {e}")))?; if chunks.len() != session.chunk_count as usize { return Err(ApiError::bad_request(format!( @@ -645,7 +634,7 @@ pub async fn complete_upload( // Verify and finalize the temp file let temp_path = manager.finalize(&session, &chunks).await.map_err(|e| { - ApiError::internal(format!("Failed to finalize upload: {}", e)) + ApiError::internal(format!("Failed to finalize upload: {e}")) })?; // Validate and resolve target path securely @@ -676,13 +665,13 @@ pub async fn complete_upload( // Canonicalize root to resolve symlinks and get absolute path let root_canon = tokio::fs::canonicalize(&root).await.map_err(|e| { - ApiError::internal(format!("Failed to canonicalize root: {}", e)) + ApiError::internal(format!("Failed to canonicalize root: {e}")) })?; // Ensure parent directory exists before canonicalizing candidate if let Some(parent) = candidate.parent() { tokio::fs::create_dir_all(parent).await.map_err(|e| { - ApiError::internal(format!("Failed to create directory: {}", e)) + ApiError::internal(format!("Failed to create directory: {e}")) })?; } @@ -694,7 +683,7 @@ pub async fn complete_upload( canon } else if let Some(parent) = candidate.parent() { let parent_canon = tokio::fs::canonicalize(parent).await.map_err(|e| { - ApiError::internal(format!("Failed to canonicalize parent: {}", e)) + ApiError::internal(format!("Failed to canonicalize parent: {e}")) })?; if let Some(filename) = candidate.file_name() { @@ -721,13 +710,11 @@ pub async fn complete_upload( // Fallback: copy then remove tokio::fs::copy(&temp_path, &final_path) .await - .map_err(|e| { - ApiError::internal(format!("Failed to copy file: {}", e)) - })?; + .map_err(|e| ApiError::internal(format!("Failed to copy file: {e}")))?; let _ = tokio::fs::remove_file(&temp_path).await; } else { - return Err(ApiError::internal(format!("Failed to move file: {}", e))); + return Err(ApiError::internal(format!("Failed to move file: {e}"))); } } @@ -744,7 +731,7 @@ pub async fn complete_upload( .update_upload_session(&session) .await .map_err(|e| { - ApiError::internal(format!("Failed to update session: {}", e)) + ApiError::internal(format!("Failed to update session: {e}")) })?; // Record the sync change @@ -765,9 +752,7 @@ pub async fn complete_upload( .storage .record_sync_change(&entry) .await - .map_err(|e| { - ApiError::internal(format!("Failed to record change: {}", e)) - })?; + .map_err(|e| ApiError::internal(format!("Failed to record change: {e}")))?; Ok(StatusCode::OK) } @@ -780,7 +765,7 @@ pub async fn cancel_upload( ) -> ApiResult { let mut session = state.storage.get_upload_session(id).await.map_err(|e| { - ApiError::not_found(format!("Upload session not found: {}", e)) + ApiError::not_found(format!("Upload session not found: {e}")) })?; // Clean up temp file if manager is available @@ -796,7 +781,7 @@ pub async fn cancel_upload( .update_upload_session(&session) .await .map_err(|e| { - ApiError::internal(format!("Failed to cancel session: {}", e)) + ApiError::internal(format!("Failed to cancel session: {e}")) })?; Ok(StatusCode::NO_CONTENT) @@ -813,16 +798,17 @@ pub async fn download_file( .storage .get_media_by_path(FilePath::new(&path)) .await - .map_err(|e| ApiError::internal(format!("Failed to get media: {}", e)))? + .map_err(|e| ApiError::internal(format!("Failed to get media: {e}")))? .ok_or_else(|| ApiError::not_found("File not found"))?; let file = tokio::fs::File::open(&item.path) .await - .map_err(|e| ApiError::not_found(format!("File not found: {}", e)))?; + .map_err(|e| ApiError::not_found(format!("File not found: {e}")))?; - let metadata = file.metadata().await.map_err(|e| { - ApiError::internal(format!("Failed to get metadata: {}", e)) - })?; + let metadata = file + .metadata() + .await + .map_err(|e| ApiError::internal(format!("Failed to get metadata: {e}")))?; let file_size = metadata.len(); @@ -835,11 +821,15 @@ pub async fn download_file( let (start, end) = range; let length = end - start + 1; - let file = tokio::fs::File::open(&item.path).await.map_err(|e| { - ApiError::internal(format!("Failed to reopen file: {}", e)) - })?; + let mut file = tokio::fs::File::open(&item.path) + .await + .map_err(|e| ApiError::internal(format!("Failed to reopen file: {e}")))?; + file + .seek(std::io::SeekFrom::Start(start)) + .await + .map_err(|e| ApiError::internal(format!("Failed to seek file: {e}")))?; - let stream = ReaderStream::new(file); + let stream = ReaderStream::new(file.take(length)); let body = Body::from_stream(stream); return Ok( @@ -850,7 +840,7 @@ pub async fn download_file( (header::CONTENT_LENGTH, length.to_string()), ( header::CONTENT_RANGE, - format!("bytes {}-{}/{}", start, end, file_size), + format!("bytes {start}-{end}/{file_size}"), ), (header::ACCEPT_RANGES, "bytes".to_string()), ], diff --git a/crates/pinakes-server/src/routes/upload.rs b/crates/pinakes-server/src/routes/upload.rs index 9db7932..947757f 100644 --- a/crates/pinakes-server/src/routes/upload.rs +++ b/crates/pinakes-server/src/routes/upload.rs @@ -52,22 +52,21 @@ pub async fn upload_file( .next_field() .await .map_err(|e| { - ApiError::bad_request(format!("Failed to read multipart field: {}", e)) + ApiError::bad_request(format!("Failed to read multipart field: {e}")) })? .ok_or_else(|| ApiError::bad_request("No file provided"))?; let original_filename = field .file_name() - .map(|s| s.to_string()) - .unwrap_or_else(|| "unknown".to_string()); + .map_or_else(|| "unknown".to_string(), std::string::ToString::to_string); - let content_type = field - .content_type() - .map(|s| s.to_string()) - .unwrap_or_else(|| "application/octet-stream".to_string()); + let content_type = field.content_type().map_or_else( + || "application/octet-stream".to_string(), + std::string::ToString::to_string, + ); let data = field.bytes().await.map_err(|e| { - ApiError::bad_request(format!("Failed to read file data: {}", e)) + ApiError::bad_request(format!("Failed to read file data: {e}")) })?; // Process the upload @@ -79,7 +78,7 @@ pub async fn upload_file( Some(&content_type), ) .await - .map_err(|e| ApiError::internal(format!("Upload failed: {}", e)))?; + .map_err(|e| ApiError::internal(format!("Upload failed: {e}")))?; Ok(Json(result.into())) } @@ -95,7 +94,7 @@ pub async fn download_file( .storage .get_media(media_id) .await - .map_err(|e| ApiError::not_found(format!("Media not found: {}", e)))?; + .map_err(|e| ApiError::not_found(format!("Media not found: {e}")))?; let managed_storage = state .managed_storage @@ -107,7 +106,7 @@ pub async fn download_file( // For external files, stream from their original path let file = tokio::fs::File::open(&item.path) .await - .map_err(|e| ApiError::not_found(format!("File not found: {}", e)))?; + .map_err(|e| ApiError::not_found(format!("File not found: {e}")))?; let stream = ReaderStream::new(file); let body = axum::body::Body::from_stream(stream); @@ -132,7 +131,7 @@ pub async fn download_file( let file = managed_storage .open(&item.content_hash) .await - .map_err(|e| ApiError::not_found(format!("Blob not found: {}", e)))?; + .map_err(|e| ApiError::not_found(format!("Blob not found: {e}")))?; let stream = ReaderStream::new(file); let body = axum::body::Body::from_stream(stream); @@ -171,7 +170,7 @@ pub async fn move_to_managed( media_id, ) .await - .map_err(|e| ApiError::internal(format!("Migration failed: {}", e)))?; + .map_err(|e| ApiError::internal(format!("Migration failed: {e}")))?; Ok(StatusCode::NO_CONTENT) } @@ -185,7 +184,7 @@ pub async fn managed_stats( .storage .managed_storage_stats() .await - .map_err(|e| ApiError::internal(format!("Failed to get stats: {}", e)))?; + .map_err(|e| ApiError::internal(format!("Failed to get stats: {e}")))?; Ok(Json(stats.into())) }