From 01fc2021c0b9163b6f0a185f170b376f49505cf5 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sat, 7 Mar 2026 16:55:43 +0300 Subject: [PATCH] pinakes-server: bound session concurrency; handle JoinError; make analytics retention configurable Signed-off-by: NotAShelf Change-Id: Iaa35af821862eeadba0a4f384b2aec2c6a6a6964 --- crates/pinakes-server/src/auth.rs | 38 ++++++++++++++++----------- crates/pinakes-server/src/error.rs | 7 +++++ crates/pinakes-server/src/main.rs | 7 ++++- crates/pinakes-server/src/state.rs | 7 ++++- crates/pinakes-server/tests/api.rs | 2 ++ crates/pinakes-server/tests/plugin.rs | 1 + 6 files changed, 44 insertions(+), 18 deletions(-) diff --git a/crates/pinakes-server/src/auth.rs b/crates/pinakes-server/src/auth.rs index a6306b7..6c3c989 100644 --- a/crates/pinakes-server/src/auth.rs +++ b/crates/pinakes-server/src/auth.rs @@ -84,26 +84,32 @@ pub async fn require_auth( let now = chrono::Utc::now(); if session.expires_at < now { let username = session.username.clone(); - // Delete expired session asynchronously (fire-and-forget) - let storage = state.storage.clone(); - let token_owned = token.clone(); - tokio::spawn(async move { - if let Err(e) = storage.delete_session(&token_owned).await { - tracing::error!(error = %e, "failed to delete expired session"); - } - }); + // Delete expired session in a bounded background task + if let Ok(permit) = state.session_semaphore.clone().try_acquire_owned() { + let storage = state.storage.clone(); + let token_owned = token.clone(); + tokio::spawn(async move { + if let Err(e) = storage.delete_session(&token_owned).await { + tracing::error!(error = %e, "failed to delete expired session"); + } + drop(permit); + }); + } tracing::info!(username = %username, "session expired"); return unauthorized("session expired"); } - // Update last_accessed timestamp asynchronously (fire-and-forget) - let storage = state.storage.clone(); - let token_owned = token.clone(); - tokio::spawn(async move { - if let Err(e) = storage.touch_session(&token_owned).await { - tracing::warn!(error = %e, "failed to update session last_accessed"); - } - }); + // Update last_accessed timestamp in a bounded background task + if let Ok(permit) = state.session_semaphore.clone().try_acquire_owned() { + let storage = state.storage.clone(); + let token_owned = token.clone(); + tokio::spawn(async move { + if let Err(e) = storage.touch_session(&token_owned).await { + tracing::warn!(error = %e, "failed to update session last_accessed"); + } + drop(permit); + }); + } // Parse role from string let role = match session.role.as_str() { diff --git a/crates/pinakes-server/src/error.rs b/crates/pinakes-server/src/error.rs index bb57191..3d147cb 100644 --- a/crates/pinakes-server/src/error.rs +++ b/crates/pinakes-server/src/error.rs @@ -48,6 +48,13 @@ impl IntoResponse for ApiError { (StatusCode::UNAUTHORIZED, msg.clone()) }, PinakesError::Authorization(msg) => (StatusCode::FORBIDDEN, msg.clone()), + PinakesError::Serialization(msg) => { + tracing::error!(error = %msg, "serialization error"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + "data serialization error".to_string(), + ) + }, PinakesError::Config(_) => { tracing::error!(error = %self.0, "configuration error"); ( diff --git a/crates/pinakes-server/src/main.rs b/crates/pinakes-server/src/main.rs index 2c1eb28..8cbd67a 100644 --- a/crates/pinakes-server/src/main.rs +++ b/crates/pinakes-server/src/main.rs @@ -557,7 +557,9 @@ async fn main() -> Result<()> { .await; }, JobKind::CleanupAnalytics => { - let before = chrono::Utc::now() - chrono::Duration::days(90); + let retention_days = config.analytics.retention_days; + let before = chrono::Utc::now() + - chrono::Duration::days(retention_days as i64); match storage.cleanup_old_events(before).await { Ok(count) => { JobQueue::complete( @@ -712,6 +714,9 @@ async fn main() -> Result<()> { transcode_service, managed_storage, chunked_upload_manager, + session_semaphore: std::sync::Arc::new(tokio::sync::Semaphore::new( + pinakes_server::state::MAX_SESSION_BACKGROUND_TASKS, + )), }; // Periodic session cleanup (every 15 minutes) diff --git a/crates/pinakes-server/src/state.rs b/crates/pinakes-server/src/state.rs index 150ad3f..fd8b023 100644 --- a/crates/pinakes-server/src/state.rs +++ b/crates/pinakes-server/src/state.rs @@ -12,11 +12,15 @@ use pinakes_core::{ sync::ChunkedUploadManager, transcode::TranscodeService, }; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, Semaphore}; // Note: Sessions are now stored in the database via StorageBackend // See storage::SessionData and related methods +/// Max concurrent background session operations (touch/delete). +/// Prevents unbounded task spawning under high request load. +pub const MAX_SESSION_BACKGROUND_TASKS: usize = 64; + #[derive(Clone)] pub struct AppState { pub storage: DynStorageBackend, @@ -30,4 +34,5 @@ pub struct AppState { pub transcode_service: Option>, pub managed_storage: Option>, pub chunked_upload_manager: Option>, + pub session_semaphore: Arc, } diff --git a/crates/pinakes-server/tests/api.rs b/crates/pinakes-server/tests/api.rs index bd30789..5fe46f0 100644 --- a/crates/pinakes-server/tests/api.rs +++ b/crates/pinakes-server/tests/api.rs @@ -185,6 +185,7 @@ async fn setup_app() -> axum::Router { transcode_service: None, managed_storage: None, chunked_upload_manager: None, + session_semaphore: Arc::new(tokio::sync::Semaphore::new(64)), }; pinakes_server::app::create_router(state) @@ -259,6 +260,7 @@ async fn setup_app_with_auth() -> (axum::Router, String, String, String) { transcode_service: None, managed_storage: None, chunked_upload_manager: None, + session_semaphore: Arc::new(tokio::sync::Semaphore::new(64)), }; let app = pinakes_server::app::create_router(state); diff --git a/crates/pinakes-server/tests/plugin.rs b/crates/pinakes-server/tests/plugin.rs index 988e3ef..0a07462 100644 --- a/crates/pinakes-server/tests/plugin.rs +++ b/crates/pinakes-server/tests/plugin.rs @@ -144,6 +144,7 @@ async fn setup_app_with_plugins() transcode_service: None, managed_storage: None, chunked_upload_manager: None, + session_semaphore: Arc::new(tokio::sync::Semaphore::new(64)), }; let router = pinakes_server::app::create_router(state);