pinakes-server: wire backup, session refresh, webhooks, and rate limit config

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: If2855d44cc700c0f65a5f5ac850ee3866a6a6964
This commit is contained in:
raf 2026-03-08 00:42:14 +03:00
commit 52f0b5defc
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
8 changed files with 257 additions and 105 deletions

View file

@ -18,62 +18,69 @@ use tower_http::{
use crate::{auth, routes, state::AppState};
/// Create the router with optional TLS configuration for HSTS headers
pub fn create_router(state: AppState) -> Router {
create_router_with_tls(state, None)
pub fn create_router(
state: AppState,
rate_limits: &pinakes_core::config::RateLimitConfig,
) -> Router {
create_router_with_tls(state, rate_limits, None)
}
/// Build a governor rate limiter from per-second and burst-size values.
/// Panics if the config is invalid (callers must validate before use).
fn build_governor(
per_second: u64,
burst_size: u32,
) -> Arc<
tower_governor::governor::GovernorConfig<
tower_governor::key_extractor::PeerIpKeyExtractor,
governor::middleware::NoOpMiddleware,
>,
> {
Arc::new(
GovernorConfigBuilder::default()
.per_second(per_second)
.burst_size(burst_size)
.finish()
.expect("rate limit config was validated at startup"),
)
}
/// Create the router with TLS configuration for security headers
pub fn create_router_with_tls(
state: AppState,
rate_limits: &pinakes_core::config::RateLimitConfig,
tls_config: Option<&pinakes_core::config::TlsConfig>,
) -> Router {
// Global rate limit: 100 requests/sec per IP
let global_governor = Arc::new(
GovernorConfigBuilder::default()
.per_second(1)
.burst_size(100)
.finish()
.expect("valid global rate limit config"),
let global_governor = build_governor(
rate_limits.global_per_second,
rate_limits.global_burst_size,
);
// Strict rate limit for login: 5 requests/min per IP
let login_governor = Arc::new(
GovernorConfigBuilder::default()
.per_second(12) // replenish one every 12 seconds
.burst_size(5)
.finish()
.expect("valid login rate limit config"),
let login_governor =
build_governor(rate_limits.login_per_second, rate_limits.login_burst_size);
let search_governor = build_governor(
rate_limits.search_per_second,
rate_limits.search_burst_size,
);
// Rate limit for search: 10 requests/min per IP
let search_governor = Arc::new(
GovernorConfigBuilder::default()
.per_second(6) // replenish one every 6 seconds (10/min)
.burst_size(10)
.finish()
.expect("valid search rate limit config"),
);
// Rate limit for streaming: 5 requests per IP (very restrictive for
// concurrent streams)
let stream_governor = Arc::new(
GovernorConfigBuilder::default()
.per_second(60) // replenish slowly (one per minute)
.burst_size(5) // max 5 concurrent connections
.finish()
.expect("valid stream rate limit config"),
let stream_governor = build_governor(
rate_limits.stream_per_second,
rate_limits.stream_burst_size,
);
let share_governor =
build_governor(rate_limits.share_per_second, rate_limits.share_burst_size);
// Login route with strict rate limiting
let login_route = Router::new()
.route("/auth/login", post(routes::auth::login))
.layer(GovernorLayer::new(login_governor));
// Share routes with dedicated rate limiting
let share_routes = Router::new()
.route("/s/{token}", get(routes::social::access_shared_media))
.route("/shared/{token}", get(routes::shares::access_shared))
.layer(GovernorLayer::new(share_governor));
// Public routes (no auth required)
let public_routes = Router::new()
.route("/s/{token}", get(routes::social::access_shared_media))
// Enhanced sharing: public share access
.route("/shared/{token}", get(routes::shares::access_shared))
// Kubernetes-style health probes (no auth required for orchestration)
.route("/health/live", get(routes::health::liveness))
.route("/health/ready", get(routes::health::readiness));
@ -139,6 +146,7 @@ pub fn create_router_with_tls(
// Auth endpoints (self-service); login is handled separately with a stricter rate limit
.route("/auth/logout", post(routes::auth::logout))
.route("/auth/me", get(routes::auth::me))
.route("/auth/refresh", post(routes::auth::refresh))
.route("/auth/revoke-all", post(routes::auth::revoke_all_sessions))
// Social: ratings & comments (read)
.route(
@ -469,6 +477,7 @@ pub fn create_router_with_tls(
.route("/config/ui", put(routes::config::update_ui_config))
.route("/database/vacuum", post(routes::database::vacuum_database))
.route("/database/clear", post(routes::database::clear_database))
.route("/database/backup", post(routes::backup::create_backup))
// Plugin management
.route("/plugins", get(routes::plugins::list_plugins))
.route("/plugins/{id}", get(routes::plugins::get_plugin))
@ -498,22 +507,47 @@ pub fn create_router_with_tls(
.route("/auth/sessions", get(routes::auth::list_active_sessions))
.layer(middleware::from_fn(auth::require_admin));
// CORS: allow same-origin by default, plus the desktop UI origin
let cors = CorsLayer::new()
.allow_origin([
HeaderValue::from_static("http://localhost:3000"),
HeaderValue::from_static("http://127.0.0.1:3000"),
HeaderValue::from_static("tauri://localhost"),
])
.allow_methods([
Method::GET,
Method::POST,
Method::PUT,
Method::PATCH,
Method::DELETE,
])
.allow_headers([header::CONTENT_TYPE, header::AUTHORIZATION])
.allow_credentials(true);
// CORS configuration: use config-driven origins if specified,
// otherwise fall back to default localhost origins
let cors = {
let origins: Vec<HeaderValue> =
if let Ok(config_read) = state.config.try_read() {
if config_read.server.cors_enabled
&& !config_read.server.cors_origins.is_empty()
{
config_read
.server
.cors_origins
.iter()
.filter_map(|o| HeaderValue::from_str(o).ok())
.collect()
} else {
vec![
HeaderValue::from_static("http://localhost:3000"),
HeaderValue::from_static("http://127.0.0.1:3000"),
HeaderValue::from_static("tauri://localhost"),
]
}
} else {
vec![
HeaderValue::from_static("http://localhost:3000"),
HeaderValue::from_static("http://127.0.0.1:3000"),
HeaderValue::from_static("tauri://localhost"),
]
};
CorsLayer::new()
.allow_origin(origins)
.allow_methods([
Method::GET,
Method::POST,
Method::PUT,
Method::PATCH,
Method::DELETE,
])
.allow_headers([header::CONTENT_TYPE, header::AUTHORIZATION])
.allow_credentials(true)
};
// Create protected routes with auth middleware
let protected_api = Router::new()
@ -527,10 +561,11 @@ pub fn create_router_with_tls(
auth::require_auth,
));
// Combine protected and public routes
// Combine protected, public, and share routes
let full_api = Router::new()
.merge(login_route)
.merge(public_routes)
.merge(share_routes)
.merge(protected_api);
// Build security headers layer

View file

@ -18,10 +18,10 @@ impl IntoResponse for ApiError {
PinakesError::NotFound(msg) => (StatusCode::NOT_FOUND, msg.clone()),
PinakesError::FileNotFound(path) => {
// Only expose the file name, not the full path
let name = path
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".to_string());
let name = path.file_name().map_or_else(
|| "unknown".to_string(),
|n| n.to_string_lossy().to_string(),
);
tracing::debug!(path = %path.display(), "file not found");
(StatusCode::NOT_FOUND, format!("file not found: {name}"))
},
@ -31,10 +31,10 @@ impl IntoResponse for ApiError {
},
PinakesError::DuplicateHash(msg) => (StatusCode::CONFLICT, msg.clone()),
PinakesError::UnsupportedMediaType(path) => {
let name = path
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".to_string());
let name = path.file_name().map_or_else(
|| "unknown".to_string(),
|n| n.to_string_lossy().to_string(),
);
(
StatusCode::BAD_REQUEST,
format!("unsupported media type: {name}"),
@ -74,7 +74,7 @@ impl IntoResponse for ApiError {
let body = serde_json::to_string(&ErrorResponse {
error: message.clone(),
})
.unwrap_or_else(|_| format!(r#"{{"error":"{}"}}"#, message));
.unwrap_or_else(|_| format!(r#"{{"error":"{message}"}}"#));
(status, [("content-type", "application/json")], body).into_response()
}
}

View file

@ -39,8 +39,8 @@ struct Cli {
}
/// Resolve the configuration file path.
/// Returns (path, was_explicit) where was_explicit indicates if the path was
/// explicitly provided by the user (vs discovered).
/// Returns (path, `was_explicit`) where `was_explicit` indicates if the path
/// was explicitly provided by the user (vs discovered).
fn resolve_config_path(explicit: Option<&std::path::Path>) -> (PathBuf, bool) {
if let Some(path) = explicit {
return (path.to_path_buf(), true);
@ -219,16 +219,34 @@ async fn main() -> Result<()> {
None
};
// Initialize webhook dispatcher early so the job queue executor can use it
let webhook_dispatcher: Option<
std::sync::Arc<pinakes_core::webhooks::WebhookDispatcher>,
> = if config.webhooks.is_empty() {
None
} else {
tracing::info!(
count = config.webhooks.len(),
"webhook dispatcher initialized"
);
Some(pinakes_core::webhooks::WebhookDispatcher::new(
config.webhooks.clone(),
))
};
// Initialize job queue with executor
let job_storage = storage.clone();
let job_config = config.clone();
let job_transcode = transcode_service.clone();
let job_webhooks = webhook_dispatcher.clone();
let job_queue = pinakes_core::jobs::JobQueue::new(
config.jobs.worker_count,
config.jobs.job_timeout_secs,
move |job_id, kind, cancel, jobs| {
let storage = job_storage.clone();
let config = job_config.clone();
let transcode_svc = job_transcode.clone();
let webhooks = job_webhooks.clone();
tokio::spawn(async move {
use pinakes_core::jobs::{JobKind, JobQueue};
match kind {
@ -257,6 +275,14 @@ async fn main() -> Result<()> {
};
match res {
Ok(status) => {
if let Some(ref dispatcher) = webhooks {
dispatcher.dispatch(
pinakes_core::webhooks::WebhookEvent::ScanCompleted {
files_found: status.files_found,
files_processed: status.files_processed,
},
);
}
JobQueue::complete(
&jobs,
job_id,
@ -287,7 +313,7 @@ async fn main() -> Result<()> {
&jobs,
job_id,
i as f32 / total as f32,
format!("{}/{}", i, total),
format!("{i}/{total}"),
)
.await;
match storage.get_media(*mid).await {
@ -299,7 +325,7 @@ async fn main() -> Result<()> {
let tc = thumb_config.clone();
let res = tokio::task::spawn_blocking(move || {
pinakes_core::thumbnail::generate_thumbnail_with_config(
id, &source, mt, &td, &tc,
id, &source, &mt, &td, &tc,
)
})
.await;
@ -311,11 +337,11 @@ async fn main() -> Result<()> {
generated += 1;
},
Ok(Ok(None)) => {},
Ok(Err(e)) => errors.push(format!("{}: {}", mid, e)),
Err(e) => errors.push(format!("{}: {}", mid, e)),
Ok(Err(e)) => errors.push(format!("{mid}: {e}")),
Err(e) => errors.push(format!("{mid}: {e}")),
}
},
Err(e) => errors.push(format!("{}: {}", mid, e)),
Err(e) => errors.push(format!("{mid}: {e}")),
}
}
JobQueue::complete(
@ -422,7 +448,7 @@ async fn main() -> Result<()> {
.await;
},
Err(e) => {
JobQueue::fail(&jobs, job_id, e.to_string()).await
JobQueue::fail(&jobs, job_id, e.to_string()).await;
},
}
},
@ -593,7 +619,7 @@ async fn main() -> Result<()> {
},
}
},
};
}
drop(cancel);
})
},
@ -714,6 +740,7 @@ async fn main() -> Result<()> {
transcode_service,
managed_storage,
chunked_upload_manager,
webhook_dispatcher,
session_semaphore: std::sync::Arc::new(tokio::sync::Semaphore::new(
pinakes_server::state::MAX_SESSION_BACKGROUND_TASKS,
)),
@ -725,7 +752,7 @@ async fn main() -> Result<()> {
let cancel = shutdown_token.clone();
tokio::spawn(async move {
let mut interval =
tokio::time::interval(std::time::Duration::from_secs(15 * 60));
tokio::time::interval(std::time::Duration::from_mins(15));
loop {
tokio::select! {
_ = interval.tick() => {
@ -739,7 +766,7 @@ async fn main() -> Result<()> {
_ => {}
}
}
_ = cancel.cancelled() => {
() = cancel.cancelled() => {
break;
}
}
@ -753,7 +780,7 @@ async fn main() -> Result<()> {
let cancel = shutdown_token.clone();
tokio::spawn(async move {
let mut interval =
tokio::time::interval(std::time::Duration::from_secs(60 * 60));
tokio::time::interval(std::time::Duration::from_hours(1));
loop {
tokio::select! {
_ = interval.tick() => {
@ -767,7 +794,7 @@ async fn main() -> Result<()> {
_ => {}
}
}
_ = cancel.cancelled() => {
() = cancel.cancelled() => {
break;
}
}
@ -777,13 +804,14 @@ async fn main() -> Result<()> {
let config_read = config_arc.read().await;
let tls_config = config_read.server.tls.clone();
let rate_limits = config_read.rate_limits.clone();
drop(config_read);
// Create router with TLS config for HSTS headers
let router = if tls_config.enabled {
app::create_router_with_tls(state, Some(&tls_config))
app::create_router_with_tls(state, &rate_limits, Some(&tls_config))
} else {
app::create_router(state)
app::create_router(state, &rate_limits)
};
if tls_config.enabled {
@ -836,7 +864,7 @@ async fn main() -> Result<()> {
tracing::warn!(error = %e, "HTTP redirect server error");
}
}
_ = shutdown.cancelled() => {
() = shutdown.cancelled() => {
info!("HTTP redirect server shutting down");
}
}
@ -884,13 +912,14 @@ fn create_https_redirect_router(https_host: String, https_port: u16) -> Router {
Router::new().fallback(any(move |uri: axum::http::Uri| {
let https_host = https_host.clone();
async move {
let path_and_query =
uri.path_and_query().map(|pq| pq.as_str()).unwrap_or("/");
let path_and_query = uri
.path_and_query()
.map_or("/", axum::http::uri::PathAndQuery::as_str);
let https_url = if https_port == 443 {
format!("https://{}{}", https_host, path_and_query)
format!("https://{https_host}{path_and_query}")
} else {
format!("https://{}:{}{}", https_host, https_port, path_and_query)
format!("https://{https_host}:{https_port}{path_and_query}")
};
Redirect::permanent(&https_url)
@ -928,7 +957,7 @@ async fn shutdown_signal() {
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => info!("received Ctrl+C, shutting down"),
_ = terminate => info!("received SIGTERM, shutting down"),
() = ctrl_c => info!("received Ctrl+C, shutting down"),
() = terminate => info!("received SIGTERM, shutting down"),
}
}

View file

@ -57,10 +57,10 @@ pub async fn login(
// Authentication fails if user wasn't found OR password was invalid
if !user_found || !password_valid {
// Log different messages for debugging but return same error
if !user_found {
tracing::warn!(username = %req.username, "login failed: unknown user");
} else {
if user_found {
tracing::warn!(username = %req.username, "login failed: invalid password");
} else {
tracing::warn!(username = %req.username, "login failed: unknown user");
}
// Record failed login attempt in audit log
@ -103,7 +103,8 @@ pub async fn login(
username: username.clone(),
role: role.to_string(),
created_at: now,
expires_at: now + chrono::Duration::hours(24), // 24 hour sessions
expires_at: now
+ chrono::Duration::hours(config.accounts.session_expiry_hours as i64),
last_accessed: now,
};
@ -119,7 +120,7 @@ pub async fn login(
&state.storage,
None,
pinakes_core::model::AuditAction::LoginSuccess,
Some(format!("username: {}, role: {}", username, role)),
Some(format!("username: {username}, role: {role}")),
)
.await
{
@ -151,17 +152,16 @@ pub async fn logout(
}
// Record logout in audit log
if let Some(user) = username {
if let Err(e) = pinakes_core::audit::record_action(
if let Some(user) = username
&& let Err(e) = pinakes_core::audit::record_action(
&state.storage,
None,
pinakes_core::model::AuditAction::Logout,
Some(format!("username: {}", user)),
Some(format!("username: {user}")),
)
.await
{
tracing::warn!(error = %e, "failed to record logout audit");
}
{
tracing::warn!(error = %e, "failed to record logout audit");
}
}
StatusCode::OK
@ -191,7 +191,7 @@ pub async fn me(
Ok(Json(UserInfoResponse {
username: session.username.clone(),
role: session.role.clone(),
role: session.role,
}))
}
@ -202,6 +202,35 @@ fn extract_bearer_token(headers: &HeaderMap) -> Option<&str> {
.and_then(|s| s.strip_prefix("Bearer "))
}
/// Refresh the current session, extending its expiry by the configured
/// duration.
pub async fn refresh(
State(state): State<AppState>,
headers: HeaderMap,
) -> Result<Json<serde_json::Value>, StatusCode> {
let token = extract_bearer_token(&headers).ok_or(StatusCode::UNAUTHORIZED)?;
let config = state.config.read().await;
let expiry_hours = config.accounts.session_expiry_hours as i64;
drop(config);
let new_expires_at =
chrono::Utc::now() + chrono::Duration::hours(expiry_hours);
match state.storage.extend_session(token, new_expires_at).await {
Ok(Some(expires)) => {
Ok(Json(serde_json::json!({
"expires_at": expires.to_rfc3339()
})))
},
Ok(None) => Err(StatusCode::UNAUTHORIZED),
Err(e) => {
tracing::error!(error = %e, "failed to extend session");
Err(StatusCode::INTERNAL_SERVER_ERROR)
},
}
}
/// Revoke all sessions for the current user
pub async fn revoke_all_sessions(
State(state): State<AppState>,
@ -234,7 +263,7 @@ pub async fn revoke_all_sessions(
&state.storage,
None,
pinakes_core::model::AuditAction::Logout,
Some(format!("revoked all sessions for username: {}", username)),
Some(format!("revoked all sessions for username: {username}")),
)
.await
{

View file

@ -0,0 +1,47 @@
use axum::{
extract::State,
http::header::{CONTENT_DISPOSITION, CONTENT_TYPE},
response::{IntoResponse, Response},
};
use crate::{error::ApiError, state::AppState};
/// Create a database backup and return it as a downloadable file.
/// POST /api/v1/admin/backup
///
/// For `SQLite`: creates a backup via VACUUM INTO and returns the file.
/// For `PostgreSQL`: returns unsupported error (use `pg_dump` instead).
pub async fn create_backup(
State(state): State<AppState>,
) -> Result<Response, ApiError> {
// Use a unique temp directory to avoid predictable paths
let backup_dir = std::env::temp_dir()
.join(format!("pinakes-backup-{}", uuid::Uuid::now_v7()));
tokio::fs::create_dir_all(&backup_dir)
.await
.map_err(|e| ApiError(pinakes_core::error::PinakesError::Io(e)))?;
let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
let filename = format!("pinakes_backup_{timestamp}.db");
let backup_path = backup_dir.join(&filename);
state.storage.backup(&backup_path).await?;
// Read the backup into memory and clean up the temp file
let bytes = tokio::fs::read(&backup_path)
.await
.map_err(|e| ApiError(pinakes_core::error::PinakesError::Io(e)))?;
let _ = tokio::fs::remove_dir_all(&backup_dir).await;
let disposition = format!("attachment; filename=\"{filename}\"");
Ok(
(
[
(CONTENT_TYPE, "application/octet-stream".to_owned()),
(CONTENT_DISPOSITION, disposition),
],
bytes,
)
.into_response(),
)
}

View file

@ -1,6 +1,7 @@
pub mod analytics;
pub mod audit;
pub mod auth;
pub mod backup;
pub mod books;
pub mod collections;
pub mod config;

View file

@ -31,10 +31,19 @@ pub async fn test_webhook(
) -> Result<Json<serde_json::Value>, ApiError> {
let config = state.config.read().await;
let count = config.webhooks.len();
// Emit a test event to all configured webhooks
// In production, the event bus would handle delivery
Ok(Json(serde_json::json!({
"webhooks_configured": count,
"test_sent": true
})))
drop(config);
if let Some(ref dispatcher) = state.webhook_dispatcher {
dispatcher.dispatch(pinakes_core::webhooks::WebhookEvent::Test);
Ok(Json(serde_json::json!({
"webhooks_configured": count,
"test_sent": true
})))
} else {
Ok(Json(serde_json::json!({
"webhooks_configured": 0,
"test_sent": false,
"message": "no webhooks configured"
})))
}
}

View file

@ -11,6 +11,7 @@ use pinakes_core::{
storage::DynStorageBackend,
sync::ChunkedUploadManager,
transcode::TranscodeService,
webhooks::WebhookDispatcher,
};
use tokio::sync::{RwLock, Semaphore};
@ -34,5 +35,6 @@ pub struct AppState {
pub transcode_service: Option<Arc<TranscodeService>>,
pub managed_storage: Option<Arc<ManagedStorageService>>,
pub chunked_upload_manager: Option<Arc<ChunkedUploadManager>>,
pub webhook_dispatcher: Option<Arc<WebhookDispatcher>>,
pub session_semaphore: Arc<Semaphore>,
}