From 52f0b5defc2ad93a11872ad7b7d665fa1fef2c7f Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Mar 2026 00:42:14 +0300 Subject: [PATCH] pinakes-server: wire backup, session refresh, webhooks, and rate limit config Signed-off-by: NotAShelf Change-Id: If2855d44cc700c0f65a5f5ac850ee3866a6a6964 --- crates/pinakes-server/src/app.rs | 145 ++++++++++++------- crates/pinakes-server/src/error.rs | 18 +-- crates/pinakes-server/src/main.rs | 73 +++++++--- crates/pinakes-server/src/routes/auth.rs | 55 +++++-- crates/pinakes-server/src/routes/backup.rs | 47 ++++++ crates/pinakes-server/src/routes/mod.rs | 1 + crates/pinakes-server/src/routes/webhooks.rs | 21 ++- crates/pinakes-server/src/state.rs | 2 + 8 files changed, 257 insertions(+), 105 deletions(-) create mode 100644 crates/pinakes-server/src/routes/backup.rs diff --git a/crates/pinakes-server/src/app.rs b/crates/pinakes-server/src/app.rs index 778a2fa..39a2509 100644 --- a/crates/pinakes-server/src/app.rs +++ b/crates/pinakes-server/src/app.rs @@ -18,62 +18,69 @@ use tower_http::{ use crate::{auth, routes, state::AppState}; /// Create the router with optional TLS configuration for HSTS headers -pub fn create_router(state: AppState) -> Router { - create_router_with_tls(state, None) +pub fn create_router( + state: AppState, + rate_limits: &pinakes_core::config::RateLimitConfig, +) -> Router { + create_router_with_tls(state, rate_limits, None) +} + +/// Build a governor rate limiter from per-second and burst-size values. +/// Panics if the config is invalid (callers must validate before use). +fn build_governor( + per_second: u64, + burst_size: u32, +) -> Arc< + tower_governor::governor::GovernorConfig< + tower_governor::key_extractor::PeerIpKeyExtractor, + governor::middleware::NoOpMiddleware, + >, +> { + Arc::new( + GovernorConfigBuilder::default() + .per_second(per_second) + .burst_size(burst_size) + .finish() + .expect("rate limit config was validated at startup"), + ) } /// Create the router with TLS configuration for security headers pub fn create_router_with_tls( state: AppState, + rate_limits: &pinakes_core::config::RateLimitConfig, tls_config: Option<&pinakes_core::config::TlsConfig>, ) -> Router { - // Global rate limit: 100 requests/sec per IP - let global_governor = Arc::new( - GovernorConfigBuilder::default() - .per_second(1) - .burst_size(100) - .finish() - .expect("valid global rate limit config"), + let global_governor = build_governor( + rate_limits.global_per_second, + rate_limits.global_burst_size, ); - - // Strict rate limit for login: 5 requests/min per IP - let login_governor = Arc::new( - GovernorConfigBuilder::default() - .per_second(12) // replenish one every 12 seconds - .burst_size(5) - .finish() - .expect("valid login rate limit config"), + let login_governor = + build_governor(rate_limits.login_per_second, rate_limits.login_burst_size); + let search_governor = build_governor( + rate_limits.search_per_second, + rate_limits.search_burst_size, ); - - // Rate limit for search: 10 requests/min per IP - let search_governor = Arc::new( - GovernorConfigBuilder::default() - .per_second(6) // replenish one every 6 seconds (10/min) - .burst_size(10) - .finish() - .expect("valid search rate limit config"), - ); - - // Rate limit for streaming: 5 requests per IP (very restrictive for - // concurrent streams) - let stream_governor = Arc::new( - GovernorConfigBuilder::default() - .per_second(60) // replenish slowly (one per minute) - .burst_size(5) // max 5 concurrent connections - .finish() - .expect("valid stream rate limit config"), + let stream_governor = build_governor( + rate_limits.stream_per_second, + rate_limits.stream_burst_size, ); + let share_governor = + build_governor(rate_limits.share_per_second, rate_limits.share_burst_size); // Login route with strict rate limiting let login_route = Router::new() .route("/auth/login", post(routes::auth::login)) .layer(GovernorLayer::new(login_governor)); + // Share routes with dedicated rate limiting + let share_routes = Router::new() + .route("/s/{token}", get(routes::social::access_shared_media)) + .route("/shared/{token}", get(routes::shares::access_shared)) + .layer(GovernorLayer::new(share_governor)); + // Public routes (no auth required) let public_routes = Router::new() - .route("/s/{token}", get(routes::social::access_shared_media)) - // Enhanced sharing: public share access - .route("/shared/{token}", get(routes::shares::access_shared)) // Kubernetes-style health probes (no auth required for orchestration) .route("/health/live", get(routes::health::liveness)) .route("/health/ready", get(routes::health::readiness)); @@ -139,6 +146,7 @@ pub fn create_router_with_tls( // Auth endpoints (self-service); login is handled separately with a stricter rate limit .route("/auth/logout", post(routes::auth::logout)) .route("/auth/me", get(routes::auth::me)) + .route("/auth/refresh", post(routes::auth::refresh)) .route("/auth/revoke-all", post(routes::auth::revoke_all_sessions)) // Social: ratings & comments (read) .route( @@ -469,6 +477,7 @@ pub fn create_router_with_tls( .route("/config/ui", put(routes::config::update_ui_config)) .route("/database/vacuum", post(routes::database::vacuum_database)) .route("/database/clear", post(routes::database::clear_database)) + .route("/database/backup", post(routes::backup::create_backup)) // Plugin management .route("/plugins", get(routes::plugins::list_plugins)) .route("/plugins/{id}", get(routes::plugins::get_plugin)) @@ -498,22 +507,47 @@ pub fn create_router_with_tls( .route("/auth/sessions", get(routes::auth::list_active_sessions)) .layer(middleware::from_fn(auth::require_admin)); - // CORS: allow same-origin by default, plus the desktop UI origin - let cors = CorsLayer::new() - .allow_origin([ - HeaderValue::from_static("http://localhost:3000"), - HeaderValue::from_static("http://127.0.0.1:3000"), - HeaderValue::from_static("tauri://localhost"), - ]) - .allow_methods([ - Method::GET, - Method::POST, - Method::PUT, - Method::PATCH, - Method::DELETE, - ]) - .allow_headers([header::CONTENT_TYPE, header::AUTHORIZATION]) - .allow_credentials(true); + // CORS configuration: use config-driven origins if specified, + // otherwise fall back to default localhost origins + let cors = { + let origins: Vec = + if let Ok(config_read) = state.config.try_read() { + if config_read.server.cors_enabled + && !config_read.server.cors_origins.is_empty() + { + config_read + .server + .cors_origins + .iter() + .filter_map(|o| HeaderValue::from_str(o).ok()) + .collect() + } else { + vec![ + HeaderValue::from_static("http://localhost:3000"), + HeaderValue::from_static("http://127.0.0.1:3000"), + HeaderValue::from_static("tauri://localhost"), + ] + } + } else { + vec![ + HeaderValue::from_static("http://localhost:3000"), + HeaderValue::from_static("http://127.0.0.1:3000"), + HeaderValue::from_static("tauri://localhost"), + ] + }; + + CorsLayer::new() + .allow_origin(origins) + .allow_methods([ + Method::GET, + Method::POST, + Method::PUT, + Method::PATCH, + Method::DELETE, + ]) + .allow_headers([header::CONTENT_TYPE, header::AUTHORIZATION]) + .allow_credentials(true) + }; // Create protected routes with auth middleware let protected_api = Router::new() @@ -527,10 +561,11 @@ pub fn create_router_with_tls( auth::require_auth, )); - // Combine protected and public routes + // Combine protected, public, and share routes let full_api = Router::new() .merge(login_route) .merge(public_routes) + .merge(share_routes) .merge(protected_api); // Build security headers layer diff --git a/crates/pinakes-server/src/error.rs b/crates/pinakes-server/src/error.rs index 3d147cb..8a4f345 100644 --- a/crates/pinakes-server/src/error.rs +++ b/crates/pinakes-server/src/error.rs @@ -18,10 +18,10 @@ impl IntoResponse for ApiError { PinakesError::NotFound(msg) => (StatusCode::NOT_FOUND, msg.clone()), PinakesError::FileNotFound(path) => { // Only expose the file name, not the full path - let name = path - .file_name() - .map(|n| n.to_string_lossy().to_string()) - .unwrap_or_else(|| "unknown".to_string()); + let name = path.file_name().map_or_else( + || "unknown".to_string(), + |n| n.to_string_lossy().to_string(), + ); tracing::debug!(path = %path.display(), "file not found"); (StatusCode::NOT_FOUND, format!("file not found: {name}")) }, @@ -31,10 +31,10 @@ impl IntoResponse for ApiError { }, PinakesError::DuplicateHash(msg) => (StatusCode::CONFLICT, msg.clone()), PinakesError::UnsupportedMediaType(path) => { - let name = path - .file_name() - .map(|n| n.to_string_lossy().to_string()) - .unwrap_or_else(|| "unknown".to_string()); + let name = path.file_name().map_or_else( + || "unknown".to_string(), + |n| n.to_string_lossy().to_string(), + ); ( StatusCode::BAD_REQUEST, format!("unsupported media type: {name}"), @@ -74,7 +74,7 @@ impl IntoResponse for ApiError { let body = serde_json::to_string(&ErrorResponse { error: message.clone(), }) - .unwrap_or_else(|_| format!(r#"{{"error":"{}"}}"#, message)); + .unwrap_or_else(|_| format!(r#"{{"error":"{message}"}}"#)); (status, [("content-type", "application/json")], body).into_response() } } diff --git a/crates/pinakes-server/src/main.rs b/crates/pinakes-server/src/main.rs index 8cbd67a..a271630 100644 --- a/crates/pinakes-server/src/main.rs +++ b/crates/pinakes-server/src/main.rs @@ -39,8 +39,8 @@ struct Cli { } /// Resolve the configuration file path. -/// Returns (path, was_explicit) where was_explicit indicates if the path was -/// explicitly provided by the user (vs discovered). +/// Returns (path, `was_explicit`) where `was_explicit` indicates if the path +/// was explicitly provided by the user (vs discovered). fn resolve_config_path(explicit: Option<&std::path::Path>) -> (PathBuf, bool) { if let Some(path) = explicit { return (path.to_path_buf(), true); @@ -219,16 +219,34 @@ async fn main() -> Result<()> { None }; + // Initialize webhook dispatcher early so the job queue executor can use it + let webhook_dispatcher: Option< + std::sync::Arc, + > = if config.webhooks.is_empty() { + None + } else { + tracing::info!( + count = config.webhooks.len(), + "webhook dispatcher initialized" + ); + Some(pinakes_core::webhooks::WebhookDispatcher::new( + config.webhooks.clone(), + )) + }; + // Initialize job queue with executor let job_storage = storage.clone(); let job_config = config.clone(); let job_transcode = transcode_service.clone(); + let job_webhooks = webhook_dispatcher.clone(); let job_queue = pinakes_core::jobs::JobQueue::new( config.jobs.worker_count, + config.jobs.job_timeout_secs, move |job_id, kind, cancel, jobs| { let storage = job_storage.clone(); let config = job_config.clone(); let transcode_svc = job_transcode.clone(); + let webhooks = job_webhooks.clone(); tokio::spawn(async move { use pinakes_core::jobs::{JobKind, JobQueue}; match kind { @@ -257,6 +275,14 @@ async fn main() -> Result<()> { }; match res { Ok(status) => { + if let Some(ref dispatcher) = webhooks { + dispatcher.dispatch( + pinakes_core::webhooks::WebhookEvent::ScanCompleted { + files_found: status.files_found, + files_processed: status.files_processed, + }, + ); + } JobQueue::complete( &jobs, job_id, @@ -287,7 +313,7 @@ async fn main() -> Result<()> { &jobs, job_id, i as f32 / total as f32, - format!("{}/{}", i, total), + format!("{i}/{total}"), ) .await; match storage.get_media(*mid).await { @@ -299,7 +325,7 @@ async fn main() -> Result<()> { let tc = thumb_config.clone(); let res = tokio::task::spawn_blocking(move || { pinakes_core::thumbnail::generate_thumbnail_with_config( - id, &source, mt, &td, &tc, + id, &source, &mt, &td, &tc, ) }) .await; @@ -311,11 +337,11 @@ async fn main() -> Result<()> { generated += 1; }, Ok(Ok(None)) => {}, - Ok(Err(e)) => errors.push(format!("{}: {}", mid, e)), - Err(e) => errors.push(format!("{}: {}", mid, e)), + Ok(Err(e)) => errors.push(format!("{mid}: {e}")), + Err(e) => errors.push(format!("{mid}: {e}")), } }, - Err(e) => errors.push(format!("{}: {}", mid, e)), + Err(e) => errors.push(format!("{mid}: {e}")), } } JobQueue::complete( @@ -422,7 +448,7 @@ async fn main() -> Result<()> { .await; }, Err(e) => { - JobQueue::fail(&jobs, job_id, e.to_string()).await + JobQueue::fail(&jobs, job_id, e.to_string()).await; }, } }, @@ -593,7 +619,7 @@ async fn main() -> Result<()> { }, } }, - }; + } drop(cancel); }) }, @@ -714,6 +740,7 @@ async fn main() -> Result<()> { transcode_service, managed_storage, chunked_upload_manager, + webhook_dispatcher, session_semaphore: std::sync::Arc::new(tokio::sync::Semaphore::new( pinakes_server::state::MAX_SESSION_BACKGROUND_TASKS, )), @@ -725,7 +752,7 @@ async fn main() -> Result<()> { let cancel = shutdown_token.clone(); tokio::spawn(async move { let mut interval = - tokio::time::interval(std::time::Duration::from_secs(15 * 60)); + tokio::time::interval(std::time::Duration::from_mins(15)); loop { tokio::select! { _ = interval.tick() => { @@ -739,7 +766,7 @@ async fn main() -> Result<()> { _ => {} } } - _ = cancel.cancelled() => { + () = cancel.cancelled() => { break; } } @@ -753,7 +780,7 @@ async fn main() -> Result<()> { let cancel = shutdown_token.clone(); tokio::spawn(async move { let mut interval = - tokio::time::interval(std::time::Duration::from_secs(60 * 60)); + tokio::time::interval(std::time::Duration::from_hours(1)); loop { tokio::select! { _ = interval.tick() => { @@ -767,7 +794,7 @@ async fn main() -> Result<()> { _ => {} } } - _ = cancel.cancelled() => { + () = cancel.cancelled() => { break; } } @@ -777,13 +804,14 @@ async fn main() -> Result<()> { let config_read = config_arc.read().await; let tls_config = config_read.server.tls.clone(); + let rate_limits = config_read.rate_limits.clone(); drop(config_read); // Create router with TLS config for HSTS headers let router = if tls_config.enabled { - app::create_router_with_tls(state, Some(&tls_config)) + app::create_router_with_tls(state, &rate_limits, Some(&tls_config)) } else { - app::create_router(state) + app::create_router(state, &rate_limits) }; if tls_config.enabled { @@ -836,7 +864,7 @@ async fn main() -> Result<()> { tracing::warn!(error = %e, "HTTP redirect server error"); } } - _ = shutdown.cancelled() => { + () = shutdown.cancelled() => { info!("HTTP redirect server shutting down"); } } @@ -884,13 +912,14 @@ fn create_https_redirect_router(https_host: String, https_port: u16) -> Router { Router::new().fallback(any(move |uri: axum::http::Uri| { let https_host = https_host.clone(); async move { - let path_and_query = - uri.path_and_query().map(|pq| pq.as_str()).unwrap_or("/"); + let path_and_query = uri + .path_and_query() + .map_or("/", axum::http::uri::PathAndQuery::as_str); let https_url = if https_port == 443 { - format!("https://{}{}", https_host, path_and_query) + format!("https://{https_host}{path_and_query}") } else { - format!("https://{}:{}{}", https_host, https_port, path_and_query) + format!("https://{https_host}:{https_port}{path_and_query}") }; Redirect::permanent(&https_url) @@ -928,7 +957,7 @@ async fn shutdown_signal() { let terminate = std::future::pending::<()>(); tokio::select! { - _ = ctrl_c => info!("received Ctrl+C, shutting down"), - _ = terminate => info!("received SIGTERM, shutting down"), + () = ctrl_c => info!("received Ctrl+C, shutting down"), + () = terminate => info!("received SIGTERM, shutting down"), } } diff --git a/crates/pinakes-server/src/routes/auth.rs b/crates/pinakes-server/src/routes/auth.rs index af8c45b..3b4672e 100644 --- a/crates/pinakes-server/src/routes/auth.rs +++ b/crates/pinakes-server/src/routes/auth.rs @@ -57,10 +57,10 @@ pub async fn login( // Authentication fails if user wasn't found OR password was invalid if !user_found || !password_valid { // Log different messages for debugging but return same error - if !user_found { - tracing::warn!(username = %req.username, "login failed: unknown user"); - } else { + if user_found { tracing::warn!(username = %req.username, "login failed: invalid password"); + } else { + tracing::warn!(username = %req.username, "login failed: unknown user"); } // Record failed login attempt in audit log @@ -103,7 +103,8 @@ pub async fn login( username: username.clone(), role: role.to_string(), created_at: now, - expires_at: now + chrono::Duration::hours(24), // 24 hour sessions + expires_at: now + + chrono::Duration::hours(config.accounts.session_expiry_hours as i64), last_accessed: now, }; @@ -119,7 +120,7 @@ pub async fn login( &state.storage, None, pinakes_core::model::AuditAction::LoginSuccess, - Some(format!("username: {}, role: {}", username, role)), + Some(format!("username: {username}, role: {role}")), ) .await { @@ -151,17 +152,16 @@ pub async fn logout( } // Record logout in audit log - if let Some(user) = username { - if let Err(e) = pinakes_core::audit::record_action( + if let Some(user) = username + && let Err(e) = pinakes_core::audit::record_action( &state.storage, None, pinakes_core::model::AuditAction::Logout, - Some(format!("username: {}", user)), + Some(format!("username: {user}")), ) .await - { - tracing::warn!(error = %e, "failed to record logout audit"); - } + { + tracing::warn!(error = %e, "failed to record logout audit"); } } StatusCode::OK @@ -191,7 +191,7 @@ pub async fn me( Ok(Json(UserInfoResponse { username: session.username.clone(), - role: session.role.clone(), + role: session.role, })) } @@ -202,6 +202,35 @@ fn extract_bearer_token(headers: &HeaderMap) -> Option<&str> { .and_then(|s| s.strip_prefix("Bearer ")) } +/// Refresh the current session, extending its expiry by the configured +/// duration. +pub async fn refresh( + State(state): State, + headers: HeaderMap, +) -> Result, StatusCode> { + let token = extract_bearer_token(&headers).ok_or(StatusCode::UNAUTHORIZED)?; + + let config = state.config.read().await; + let expiry_hours = config.accounts.session_expiry_hours as i64; + drop(config); + + let new_expires_at = + chrono::Utc::now() + chrono::Duration::hours(expiry_hours); + + match state.storage.extend_session(token, new_expires_at).await { + Ok(Some(expires)) => { + Ok(Json(serde_json::json!({ + "expires_at": expires.to_rfc3339() + }))) + }, + Ok(None) => Err(StatusCode::UNAUTHORIZED), + Err(e) => { + tracing::error!(error = %e, "failed to extend session"); + Err(StatusCode::INTERNAL_SERVER_ERROR) + }, + } +} + /// Revoke all sessions for the current user pub async fn revoke_all_sessions( State(state): State, @@ -234,7 +263,7 @@ pub async fn revoke_all_sessions( &state.storage, None, pinakes_core::model::AuditAction::Logout, - Some(format!("revoked all sessions for username: {}", username)), + Some(format!("revoked all sessions for username: {username}")), ) .await { diff --git a/crates/pinakes-server/src/routes/backup.rs b/crates/pinakes-server/src/routes/backup.rs new file mode 100644 index 0000000..dcca2b7 --- /dev/null +++ b/crates/pinakes-server/src/routes/backup.rs @@ -0,0 +1,47 @@ +use axum::{ + extract::State, + http::header::{CONTENT_DISPOSITION, CONTENT_TYPE}, + response::{IntoResponse, Response}, +}; + +use crate::{error::ApiError, state::AppState}; + +/// Create a database backup and return it as a downloadable file. +/// POST /api/v1/admin/backup +/// +/// For `SQLite`: creates a backup via VACUUM INTO and returns the file. +/// For `PostgreSQL`: returns unsupported error (use `pg_dump` instead). +pub async fn create_backup( + State(state): State, +) -> Result { + // Use a unique temp directory to avoid predictable paths + let backup_dir = std::env::temp_dir() + .join(format!("pinakes-backup-{}", uuid::Uuid::now_v7())); + tokio::fs::create_dir_all(&backup_dir) + .await + .map_err(|e| ApiError(pinakes_core::error::PinakesError::Io(e)))?; + + let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S"); + let filename = format!("pinakes_backup_{timestamp}.db"); + let backup_path = backup_dir.join(&filename); + + state.storage.backup(&backup_path).await?; + + // Read the backup into memory and clean up the temp file + let bytes = tokio::fs::read(&backup_path) + .await + .map_err(|e| ApiError(pinakes_core::error::PinakesError::Io(e)))?; + let _ = tokio::fs::remove_dir_all(&backup_dir).await; + + let disposition = format!("attachment; filename=\"{filename}\""); + Ok( + ( + [ + (CONTENT_TYPE, "application/octet-stream".to_owned()), + (CONTENT_DISPOSITION, disposition), + ], + bytes, + ) + .into_response(), + ) +} diff --git a/crates/pinakes-server/src/routes/mod.rs b/crates/pinakes-server/src/routes/mod.rs index 137512c..97819da 100644 --- a/crates/pinakes-server/src/routes/mod.rs +++ b/crates/pinakes-server/src/routes/mod.rs @@ -1,6 +1,7 @@ pub mod analytics; pub mod audit; pub mod auth; +pub mod backup; pub mod books; pub mod collections; pub mod config; diff --git a/crates/pinakes-server/src/routes/webhooks.rs b/crates/pinakes-server/src/routes/webhooks.rs index 8c900bc..b2c5ca5 100644 --- a/crates/pinakes-server/src/routes/webhooks.rs +++ b/crates/pinakes-server/src/routes/webhooks.rs @@ -31,10 +31,19 @@ pub async fn test_webhook( ) -> Result, ApiError> { let config = state.config.read().await; let count = config.webhooks.len(); - // Emit a test event to all configured webhooks - // In production, the event bus would handle delivery - Ok(Json(serde_json::json!({ - "webhooks_configured": count, - "test_sent": true - }))) + drop(config); + + if let Some(ref dispatcher) = state.webhook_dispatcher { + dispatcher.dispatch(pinakes_core::webhooks::WebhookEvent::Test); + Ok(Json(serde_json::json!({ + "webhooks_configured": count, + "test_sent": true + }))) + } else { + Ok(Json(serde_json::json!({ + "webhooks_configured": 0, + "test_sent": false, + "message": "no webhooks configured" + }))) + } } diff --git a/crates/pinakes-server/src/state.rs b/crates/pinakes-server/src/state.rs index fd8b023..584b598 100644 --- a/crates/pinakes-server/src/state.rs +++ b/crates/pinakes-server/src/state.rs @@ -11,6 +11,7 @@ use pinakes_core::{ storage::DynStorageBackend, sync::ChunkedUploadManager, transcode::TranscodeService, + webhooks::WebhookDispatcher, }; use tokio::sync::{RwLock, Semaphore}; @@ -34,5 +35,6 @@ pub struct AppState { pub transcode_service: Option>, pub managed_storage: Option>, pub chunked_upload_manager: Option>, + pub webhook_dispatcher: Option>, pub session_semaphore: Arc, }