pinakes-server: bound session concurrency; handle JoinError; make analytics

retention configurable

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Iaa35af821862eeadba0a4f384b2aec2c6a6a6964
This commit is contained in:
raf 2026-03-07 16:55:43 +03:00
commit 01fc2021c0
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
6 changed files with 42 additions and 16 deletions

View file

@ -84,26 +84,32 @@ pub async fn require_auth(
let now = chrono::Utc::now(); let now = chrono::Utc::now();
if session.expires_at < now { if session.expires_at < now {
let username = session.username.clone(); let username = session.username.clone();
// Delete expired session asynchronously (fire-and-forget) // Delete expired session in a bounded background task
let storage = state.storage.clone(); if let Ok(permit) = state.session_semaphore.clone().try_acquire_owned() {
let token_owned = token.clone(); let storage = state.storage.clone();
tokio::spawn(async move { let token_owned = token.clone();
if let Err(e) = storage.delete_session(&token_owned).await { tokio::spawn(async move {
tracing::error!(error = %e, "failed to delete expired session"); if let Err(e) = storage.delete_session(&token_owned).await {
} tracing::error!(error = %e, "failed to delete expired session");
}); }
drop(permit);
});
}
tracing::info!(username = %username, "session expired"); tracing::info!(username = %username, "session expired");
return unauthorized("session expired"); return unauthorized("session expired");
} }
// Update last_accessed timestamp asynchronously (fire-and-forget) // Update last_accessed timestamp in a bounded background task
let storage = state.storage.clone(); if let Ok(permit) = state.session_semaphore.clone().try_acquire_owned() {
let token_owned = token.clone(); let storage = state.storage.clone();
tokio::spawn(async move { let token_owned = token.clone();
if let Err(e) = storage.touch_session(&token_owned).await { tokio::spawn(async move {
tracing::warn!(error = %e, "failed to update session last_accessed"); if let Err(e) = storage.touch_session(&token_owned).await {
} tracing::warn!(error = %e, "failed to update session last_accessed");
}); }
drop(permit);
});
}
// Parse role from string // Parse role from string
let role = match session.role.as_str() { let role = match session.role.as_str() {

View file

@ -48,6 +48,13 @@ impl IntoResponse for ApiError {
(StatusCode::UNAUTHORIZED, msg.clone()) (StatusCode::UNAUTHORIZED, msg.clone())
}, },
PinakesError::Authorization(msg) => (StatusCode::FORBIDDEN, msg.clone()), PinakesError::Authorization(msg) => (StatusCode::FORBIDDEN, msg.clone()),
PinakesError::Serialization(msg) => {
tracing::error!(error = %msg, "serialization error");
(
StatusCode::INTERNAL_SERVER_ERROR,
"data serialization error".to_string(),
)
},
PinakesError::Config(_) => { PinakesError::Config(_) => {
tracing::error!(error = %self.0, "configuration error"); tracing::error!(error = %self.0, "configuration error");
( (

View file

@ -557,7 +557,9 @@ async fn main() -> Result<()> {
.await; .await;
}, },
JobKind::CleanupAnalytics => { JobKind::CleanupAnalytics => {
let before = chrono::Utc::now() - chrono::Duration::days(90); let retention_days = config.analytics.retention_days;
let before = chrono::Utc::now()
- chrono::Duration::days(retention_days as i64);
match storage.cleanup_old_events(before).await { match storage.cleanup_old_events(before).await {
Ok(count) => { Ok(count) => {
JobQueue::complete( JobQueue::complete(
@ -712,6 +714,9 @@ async fn main() -> Result<()> {
transcode_service, transcode_service,
managed_storage, managed_storage,
chunked_upload_manager, chunked_upload_manager,
session_semaphore: std::sync::Arc::new(tokio::sync::Semaphore::new(
pinakes_server::state::MAX_SESSION_BACKGROUND_TASKS,
)),
}; };
// Periodic session cleanup (every 15 minutes) // Periodic session cleanup (every 15 minutes)

View file

@ -12,11 +12,15 @@ use pinakes_core::{
sync::ChunkedUploadManager, sync::ChunkedUploadManager,
transcode::TranscodeService, transcode::TranscodeService,
}; };
use tokio::sync::RwLock; use tokio::sync::{RwLock, Semaphore};
// Note: Sessions are now stored in the database via StorageBackend // Note: Sessions are now stored in the database via StorageBackend
// See storage::SessionData and related methods // See storage::SessionData and related methods
/// Max concurrent background session operations (touch/delete).
/// Prevents unbounded task spawning under high request load.
pub const MAX_SESSION_BACKGROUND_TASKS: usize = 64;
#[derive(Clone)] #[derive(Clone)]
pub struct AppState { pub struct AppState {
pub storage: DynStorageBackend, pub storage: DynStorageBackend,
@ -30,4 +34,5 @@ pub struct AppState {
pub transcode_service: Option<Arc<TranscodeService>>, pub transcode_service: Option<Arc<TranscodeService>>,
pub managed_storage: Option<Arc<ManagedStorageService>>, pub managed_storage: Option<Arc<ManagedStorageService>>,
pub chunked_upload_manager: Option<Arc<ChunkedUploadManager>>, pub chunked_upload_manager: Option<Arc<ChunkedUploadManager>>,
pub session_semaphore: Arc<Semaphore>,
} }

View file

@ -185,6 +185,7 @@ async fn setup_app() -> axum::Router {
transcode_service: None, transcode_service: None,
managed_storage: None, managed_storage: None,
chunked_upload_manager: None, chunked_upload_manager: None,
session_semaphore: Arc::new(tokio::sync::Semaphore::new(64)),
}; };
pinakes_server::app::create_router(state) pinakes_server::app::create_router(state)
@ -259,6 +260,7 @@ async fn setup_app_with_auth() -> (axum::Router, String, String, String) {
transcode_service: None, transcode_service: None,
managed_storage: None, managed_storage: None,
chunked_upload_manager: None, chunked_upload_manager: None,
session_semaphore: Arc::new(tokio::sync::Semaphore::new(64)),
}; };
let app = pinakes_server::app::create_router(state); let app = pinakes_server::app::create_router(state);

View file

@ -144,6 +144,7 @@ async fn setup_app_with_plugins()
transcode_service: None, transcode_service: None,
managed_storage: None, managed_storage: None,
chunked_upload_manager: None, chunked_upload_manager: None,
session_semaphore: Arc::new(tokio::sync::Semaphore::new(64)),
}; };
let router = pinakes_server::app::create_router(state); let router = pinakes_server::app::create_router(state);