diff --git a/Cargo.lock b/Cargo.lock index 8ba5120..18005af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4819,8 +4819,10 @@ dependencies = [ "matroska", "mime_guess", "moka", + "native-tls", "notify", "pinakes-plugin-api", + "postgres-native-tls", "postgres-types", "refinery", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 25d81bc..1c81955 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,8 @@ tokio-postgres = { version = "0.7.16", features = [ ] } deadpool-postgres = "0.14.1" postgres-types = { version = "0.2.12", features = ["derive"] } +postgres-native-tls = "0.5.2" +native-tls = "0.2.14" # Migrations refinery = { version = "0.9.0", features = ["rusqlite", "tokio-postgres"] } diff --git a/crates/pinakes-core/Cargo.toml b/crates/pinakes-core/Cargo.toml index 6d1f3d2..f809097 100644 --- a/crates/pinakes-core/Cargo.toml +++ b/crates/pinakes-core/Cargo.toml @@ -24,6 +24,8 @@ rusqlite = { workspace = true } tokio-postgres = { workspace = true } deadpool-postgres = { workspace = true } postgres-types = { workspace = true } +postgres-native-tls = { workspace = true } +native-tls = { workspace = true } refinery = { workspace = true } walkdir = { workspace = true } notify = { workspace = true } diff --git a/crates/pinakes-core/src/config.rs b/crates/pinakes-core/src/config.rs index 5d3e7a6..7da84f2 100644 --- a/crates/pinakes-core/src/config.rs +++ b/crates/pinakes-core/src/config.rs @@ -2,6 +2,78 @@ use std::path::{Path, PathBuf}; use serde::{Deserialize, Serialize}; +/// Expand environment variables in a string. +/// Supports both ${VAR_NAME} and $VAR_NAME syntax. +/// Returns an error if a referenced variable is not set. 
+fn expand_env_var_string(input: &str) -> crate::error::Result { + let mut result = String::new(); + let mut chars = input.chars().peekable(); + + while let Some(ch) = chars.next() { + if ch == '$' { + // Check if it's ${VAR} or $VAR syntax + let use_braces = chars.peek() == Some(&'{'); + if use_braces { + chars.next(); // consume '{' + } + + // Collect variable name + let mut var_name = String::new(); + while let Some(&next_ch) = chars.peek() { + if use_braces { + if next_ch == '}' { + chars.next(); // consume '}' + break; + } + var_name.push(next_ch); + chars.next(); + } else { + // For $VAR syntax, stop at non-alphanumeric/underscore + if next_ch.is_alphanumeric() || next_ch == '_' { + var_name.push(next_ch); + chars.next(); + } else { + break; + } + } + } + + if var_name.is_empty() { + return Err(crate::error::PinakesError::Config( + "empty environment variable name".to_string(), + )); + } + + // Look up the environment variable + match std::env::var(&var_name) { + Ok(value) => result.push_str(&value), + Err(_) => { + return Err(crate::error::PinakesError::Config(format!( + "environment variable not set: {}", + var_name + ))); + } + } + } else if ch == '\\' { + // Handle escaped characters + if let Some(&next_ch) = chars.peek() { + if next_ch == '$' { + chars.next(); // consume the escaped $ + result.push('$'); + } else { + result.push(ch); + } + } else { + result.push(ch); + } + } else { + result.push(ch); + } + } + + Ok(result) +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { pub storage: StorageConfig, @@ -456,6 +528,15 @@ pub struct PostgresConfig { pub username: String, pub password: String, pub max_connections: usize, + /// Enable TLS for PostgreSQL connections + #[serde(default)] + pub tls_enabled: bool, + /// Verify TLS certificates (default: true) + #[serde(default = "default_true")] + pub tls_verify_ca: bool, + /// Path to custom CA certificate file (PEM format) + #[serde(default)] + pub tls_ca_cert_path: Option, } 
#[derive(Debug, Clone, Serialize, Deserialize)] @@ -484,6 +565,11 @@ pub struct ServerConfig { /// If set, all requests (except /health) must include `Authorization: Bearer `. /// Can also be set via `PINAKES_API_KEY` environment variable. pub api_key: Option, + /// Explicitly disable authentication (INSECURE - use only for development). + /// When true, all requests are allowed without authentication. + /// This must be explicitly set to true; empty api_key alone is not sufficient. + #[serde(default)] + pub authentication_disabled: bool, /// TLS/HTTPS configuration #[serde(default)] pub tls: TlsConfig, @@ -570,8 +656,45 @@ impl Config { let content = std::fs::read_to_string(path).map_err(|e| { crate::error::PinakesError::Config(format!("failed to read config file: {e}")) })?; - toml::from_str(&content) - .map_err(|e| crate::error::PinakesError::Config(format!("failed to parse config: {e}"))) + let mut config: Self = toml::from_str(&content).map_err(|e| { + crate::error::PinakesError::Config(format!("failed to parse config: {e}")) + })?; + config.expand_env_vars()?; + Ok(config) + } + + /// Expand environment variables in secret fields. + /// Supports ${VAR_NAME} and $VAR_NAME syntax. 
+ fn expand_env_vars(&mut self) -> crate::error::Result<()> { + // Postgres password + if let Some(ref mut postgres) = self.storage.postgres { + postgres.password = expand_env_var_string(&postgres.password)?; + } + + // Server API key + if let Some(ref api_key) = self.server.api_key { + self.server.api_key = Some(expand_env_var_string(api_key)?); + } + + // Webhook secrets + for webhook in &mut self.webhooks { + if let Some(ref secret) = webhook.secret { + webhook.secret = Some(expand_env_var_string(secret)?); + } + } + + // Enrichment API keys + if let Some(ref api_key) = self.enrichment.sources.musicbrainz.api_key { + self.enrichment.sources.musicbrainz.api_key = Some(expand_env_var_string(api_key)?); + } + if let Some(ref api_key) = self.enrichment.sources.tmdb.api_key { + self.enrichment.sources.tmdb.api_key = Some(expand_env_var_string(api_key)?); + } + if let Some(ref api_key) = self.enrichment.sources.lastfm.api_key { + self.enrichment.sources.lastfm.api_key = Some(expand_env_var_string(api_key)?); + } + + Ok(()) } /// Try loading from file, falling back to defaults if the file doesn't exist. @@ -643,6 +766,50 @@ impl Config { if self.scanning.import_concurrency == 0 || self.scanning.import_concurrency > 256 { return Err("import_concurrency must be between 1 and 256".into()); } + + // Validate authentication configuration + let has_api_key = self + .server + .api_key + .as_ref() + .map_or(false, |k| !k.is_empty()); + let has_accounts = !self.accounts.users.is_empty(); + let auth_disabled = self.server.authentication_disabled; + + if !auth_disabled && !has_api_key && !has_accounts { + return Err( + "authentication is not configured: set an api_key, configure user accounts, \ + or explicitly set authentication_disabled = true" + .into(), + ); + } + + // Empty API key is not allowed (must use authentication_disabled flag) + if let Some(ref api_key) = self.server.api_key { + if api_key.is_empty() { + return Err("empty api_key is not allowed. 
To disable authentication, \ + set authentication_disabled = true instead" + .into()); + } + } + + // Require TLS when authentication is enabled on non-localhost + let is_localhost = self.server.host == "127.0.0.1" + || self.server.host == "localhost" + || self.server.host == "::1"; + + if (has_api_key || has_accounts) + && !auth_disabled + && !is_localhost + && !self.server.tls.enabled + { + return Err( + "TLS must be enabled when authentication is used on non-localhost hosts. \ + Set server.tls.enabled = true or bind to localhost only" + .into(), + ); + } + // Validate TLS configuration self.server.tls.validate()?; Ok(()) @@ -690,6 +857,7 @@ impl Default for Config { host: "127.0.0.1".to_string(), port: 3000, api_key: None, + authentication_disabled: false, tls: TlsConfig::default(), }, ui: UiConfig::default(), @@ -714,6 +882,7 @@ mod tests { fn test_config_with_concurrency(concurrency: usize) -> Config { let mut config = Config::default(); config.scanning.import_concurrency = concurrency; + config.server.authentication_disabled = true; // Disable auth for concurrency tests config } @@ -758,4 +927,125 @@ mod tests { let config = test_config_with_concurrency(256); assert!(config.validate().is_ok()); } + + // Environment variable expansion tests + #[test] + fn test_expand_env_var_simple() { + unsafe { + std::env::set_var("TEST_VAR_SIMPLE", "test_value"); + } + let result = expand_env_var_string("$TEST_VAR_SIMPLE"); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "test_value"); + unsafe { + std::env::remove_var("TEST_VAR_SIMPLE"); + } + } + + #[test] + fn test_expand_env_var_braces() { + unsafe { + std::env::set_var("TEST_VAR_BRACES", "test_value"); + } + let result = expand_env_var_string("${TEST_VAR_BRACES}"); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "test_value"); + unsafe { + std::env::remove_var("TEST_VAR_BRACES"); + } + } + + #[test] + fn test_expand_env_var_embedded() { + unsafe { + std::env::set_var("TEST_VAR_EMBEDDED", "value"); + } 
+ let result = expand_env_var_string("prefix_${TEST_VAR_EMBEDDED}_suffix"); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "prefix_value_suffix"); + unsafe { + std::env::remove_var("TEST_VAR_EMBEDDED"); + } + } + + #[test] + fn test_expand_env_var_multiple() { + unsafe { + std::env::set_var("VAR1", "value1"); + std::env::set_var("VAR2", "value2"); + } + let result = expand_env_var_string("${VAR1}_${VAR2}"); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "value1_value2"); + unsafe { + std::env::remove_var("VAR1"); + std::env::remove_var("VAR2"); + } + } + + #[test] + fn test_expand_env_var_missing() { + let result = expand_env_var_string("${NONEXISTENT_VAR}"); + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("environment variable not set") + ); + } + + #[test] + fn test_expand_env_var_empty_name() { + let result = expand_env_var_string("${}"); + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("empty environment variable name") + ); + } + + #[test] + fn test_expand_env_var_escaped() { + let result = expand_env_var_string("\\$NOT_A_VAR"); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "$NOT_A_VAR"); + } + + #[test] + fn test_expand_env_var_no_vars() { + let result = expand_env_var_string("plain_text"); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "plain_text"); + } + + #[test] + fn test_expand_env_var_underscore() { + unsafe { + std::env::set_var("TEST_VAR_NAME", "value"); + } + let result = expand_env_var_string("$TEST_VAR_NAME"); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "value"); + unsafe { + std::env::remove_var("TEST_VAR_NAME"); + } + } + + #[test] + fn test_expand_env_var_mixed_syntax() { + unsafe { + std::env::set_var("VAR1_MIXED", "v1"); + std::env::set_var("VAR2_MIXED", "v2"); + } + let result = expand_env_var_string("$VAR1_MIXED and ${VAR2_MIXED}"); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "v1 and v2"); + 
unsafe { + std::env::remove_var("VAR1_MIXED"); + std::env::remove_var("VAR2_MIXED"); + } + } } diff --git a/crates/pinakes-core/src/integrity.rs b/crates/pinakes-core/src/integrity.rs index 258c85e..4f9c1c0 100644 --- a/crates/pinakes-core/src/integrity.rs +++ b/crates/pinakes-core/src/integrity.rs @@ -1,3 +1,4 @@ +use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; use serde::{Deserialize, Serialize}; @@ -5,6 +6,7 @@ use tracing::{info, warn}; use crate::error::Result; use crate::hash::compute_file_hash; +use crate::media_type::MediaType; use crate::model::{ContentHash, MediaId}; use crate::storage::DynStorageBackend; @@ -66,31 +68,202 @@ impl std::str::FromStr for IntegrityStatus { } } -/// Detect orphaned media items (files that no longer exist on disk). +/// Detect orphaned media items (files that no longer exist on disk), +/// untracked files (files on disk not in database), and moved files (same hash, different path). pub async fn detect_orphans(storage: &DynStorageBackend) -> Result { let media_paths = storage.list_media_paths().await?; let mut orphaned_ids = Vec::new(); - let moved_files = Vec::new(); + // Build hash index: ContentHash -> Vec<(MediaId, PathBuf)> + let mut hash_index: HashMap> = HashMap::new(); + for (id, path, hash) in &media_paths { + hash_index + .entry(hash.clone()) + .or_insert_with(Vec::new) + .push((*id, path.clone())); + } + + // Detect orphaned files (in DB but not on disk) for (id, path, _hash) in &media_paths { if !path.exists() { orphaned_ids.push(*id); } } + // Detect moved files (orphaned items with same hash existing elsewhere) + let moved_files = detect_moved_files(&orphaned_ids, &media_paths, &hash_index); + + // Detect untracked files (on disk but not in DB) + let untracked_paths = detect_untracked_files(storage, &media_paths).await?; + info!( orphaned = orphaned_ids.len(), + untracked = untracked_paths.len(), + moved = moved_files.len(), total = media_paths.len(), "orphan detection complete" ); 
Ok(OrphanReport { orphaned_ids, - untracked_paths: Vec::new(), + untracked_paths, moved_files, }) } +/// Detect files that appear to have moved (same content hash, different path). +fn detect_moved_files( + orphaned_ids: &[MediaId], + media_paths: &[(MediaId, PathBuf, ContentHash)], + hash_index: &HashMap>, +) -> Vec<(MediaId, PathBuf, PathBuf)> { + let mut moved = Vec::new(); + + // Build lookup map for orphaned items: MediaId -> (PathBuf, ContentHash) + let orphaned_map: HashMap = media_paths + .iter() + .filter(|(id, _, _)| orphaned_ids.contains(id)) + .map(|(id, path, hash)| (*id, (path.clone(), hash.clone()))) + .collect(); + + // For each orphaned item, check if there's another file with the same hash + for (orphaned_id, (old_path, hash)) in &orphaned_map { + if let Some(items_with_hash) = hash_index.get(hash) { + // Find other items with same hash that exist on disk + for (other_id, new_path) in items_with_hash { + // Skip if it's the same item + if other_id == orphaned_id { + continue; + } + + // Check if the new path exists + if new_path.exists() { + moved.push((*orphaned_id, old_path.clone(), new_path.clone())); + // Only report first match (most likely candidate) + break; + } + } + } + } + + moved +} + +/// Detect files on disk that are not tracked in the database. 
+async fn detect_untracked_files( + storage: &DynStorageBackend, + media_paths: &[(MediaId, PathBuf, ContentHash)], +) -> Result> { + // Get root directories + let roots = storage.list_root_dirs().await?; + if roots.is_empty() { + return Ok(Vec::new()); + } + + // Build set of tracked paths for fast lookup + let tracked_paths: HashSet = media_paths + .iter() + .map(|(_, path, _)| path.clone()) + .collect(); + + // Get ignore patterns (we'll need to load config somehow, for now use empty) + let ignore_patterns: Vec = vec![ + ".*".to_string(), + "node_modules".to_string(), + "__pycache__".to_string(), + "target".to_string(), + ]; + + // Walk filesystem for each root in parallel (limit concurrency to 4) + let mut filesystem_paths = HashSet::new(); + let mut tasks = tokio::task::JoinSet::new(); + + for root in roots { + let ignore_patterns = ignore_patterns.clone(); + tasks.spawn_blocking(move || -> Result> { + let mut paths = Vec::new(); + + let walker = walkdir::WalkDir::new(&root) + .follow_links(false) + .into_iter() + .filter_entry(|e| { + // Skip directories that match ignore patterns + if e.file_type().is_dir() { + let name = e.file_name().to_string_lossy(); + for pattern in &ignore_patterns { + if pattern.starts_with("*.") { + // Extension pattern + if let Some(ext) = pattern.strip_prefix("*.") { + if name.ends_with(ext) { + return false; + } + } + } else if pattern.contains('*') { + // Glob pattern - simplified matching + let pattern_without_stars = pattern.replace('*', ""); + if name.contains(&pattern_without_stars) { + return false; + } + } else if name.as_ref() == pattern + || name.starts_with(&format!("{pattern}.")) + { + // Exact match or starts with pattern + return false; + } + } + } + true + }); + + for entry in walker { + match entry { + Ok(entry) => { + let path = entry.path(); + + // Only process files + if !path.is_file() { + continue; + } + + // Check if it's a supported media type + if MediaType::from_path(path).is_some() { + 
paths.push(path.to_path_buf()); + } + } + Err(e) => { + warn!(error = %e, "failed to read directory entry"); + } + } + } + + Ok(paths) + }); + } + + // Collect results from all tasks + while let Some(result) = tasks.join_next().await { + match result { + Ok(Ok(paths)) => { + filesystem_paths.extend(paths); + } + Ok(Err(e)) => { + warn!(error = %e, "failed to walk directory"); + } + Err(e) => { + warn!(error = %e, "task join error"); + } + } + } + + // Compute set difference: filesystem - tracked + let untracked: Vec = filesystem_paths + .difference(&tracked_paths) + .cloned() + .collect(); + + Ok(untracked) +} + /// Resolve orphaned media items by deleting them from the database. pub async fn resolve_orphans( storage: &DynStorageBackend, diff --git a/crates/pinakes-core/src/storage/mod.rs b/crates/pinakes-core/src/storage/mod.rs index e2aad80..f2154f5 100644 --- a/crates/pinakes-core/src/storage/mod.rs +++ b/crates/pinakes-core/src/storage/mod.rs @@ -31,6 +31,18 @@ pub struct DatabaseStats { pub backend_name: String, } +/// Session data for database-backed session storage. 
+#[derive(Debug, Clone)] +pub struct SessionData { + pub session_token: String, + pub user_id: Option, + pub username: String, + pub role: String, + pub created_at: DateTime, + pub expires_at: DateTime, + pub last_accessed: DateTime, +} + #[async_trait::async_trait] pub trait StorageBackend: Send + Sync + 'static { // Migrations @@ -412,6 +424,28 @@ pub trait StorageBackend: Send + Sync + 'static { progress: f32, ) -> Result<()>; async fn cleanup_expired_transcodes(&self, before: DateTime) -> Result; + + // ===== Session Management ===== + /// Create a new session in the database + async fn create_session(&self, session: &SessionData) -> Result<()>; + + /// Get a session by its token, returns None if not found or expired + async fn get_session(&self, session_token: &str) -> Result>; + + /// Update the last_accessed timestamp for a session + async fn touch_session(&self, session_token: &str) -> Result<()>; + + /// Delete a specific session + async fn delete_session(&self, session_token: &str) -> Result<()>; + + /// Delete all sessions for a specific user + async fn delete_user_sessions(&self, username: &str) -> Result; + + /// Delete all expired sessions (where expires_at < now) + async fn delete_expired_sessions(&self) -> Result; + + /// List all active sessions (optionally filtered by username) + async fn list_active_sessions(&self, username: Option<&str>) -> Result>; } /// Comprehensive library statistics. 
diff --git a/crates/pinakes-core/src/storage/postgres.rs b/crates/pinakes-core/src/storage/postgres.rs index afa35de..c48dcdf 100644 --- a/crates/pinakes-core/src/storage/postgres.rs +++ b/crates/pinakes-core/src/storage/postgres.rs @@ -3,6 +3,8 @@ use std::path::PathBuf; use chrono::Utc; use deadpool_postgres::{Config as PoolConfig, Pool, Runtime}; +use native_tls::TlsConnector; +use postgres_native_tls::MakeTlsConnector; use tokio_postgres::types::ToSql; use tokio_postgres::{NoTls, Row}; use uuid::Uuid; @@ -27,19 +29,72 @@ impl PostgresBackend { pool_config.user = Some(config.username.clone()); pool_config.password = Some(config.password.clone()); - let pool = pool_config - .create_pool(Some(Runtime::Tokio1), NoTls) - .map_err(|e| { - PinakesError::Database(format!("failed to create connection pool: {e}")) + if config.tls_enabled { + // Build TLS connector + let mut tls_builder = TlsConnector::builder(); + + // Load custom CA certificate if provided + if let Some(ref ca_cert_path) = config.tls_ca_cert_path { + let cert_bytes = std::fs::read(ca_cert_path).map_err(|e| { + PinakesError::Config(format!( + "failed to read CA certificate file {}: {e}", + ca_cert_path.display() + )) + })?; + let cert = native_tls::Certificate::from_pem(&cert_bytes).map_err(|e| { + PinakesError::Config(format!( + "failed to parse CA certificate {}: {e}", + ca_cert_path.display() + )) + })?; + tls_builder.add_root_certificate(cert); + } + + // Configure certificate validation + if !config.tls_verify_ca { + tracing::warn!( + "PostgreSQL TLS certificate verification disabled - this is insecure!" 
+ ); + tls_builder.danger_accept_invalid_certs(true); + } + + let connector = tls_builder.build().map_err(|e| { + PinakesError::Database(format!("failed to build TLS connector: {e}")) + })?; + let tls = MakeTlsConnector::new(connector); + + let pool = pool_config + .create_pool(Some(Runtime::Tokio1), tls) + .map_err(|e| { + PinakesError::Database(format!("failed to create connection pool: {e}")) + })?; + + // Verify connectivity + let _ = pool.get().await.map_err(|e| { + PinakesError::Database(format!("failed to connect to postgres: {e}")) })?; - // Verify connectivity - let _ = pool - .get() - .await - .map_err(|e| PinakesError::Database(format!("failed to connect to postgres: {e}")))?; + tracing::info!("PostgreSQL connection established with TLS"); + Ok(Self { pool }) + } else { + tracing::warn!( + "PostgreSQL TLS is disabled - connection is unencrypted. \ + Set postgres.tls_enabled = true to enable encryption." + ); - Ok(Self { pool }) + let pool = pool_config + .create_pool(Some(Runtime::Tokio1), NoTls) + .map_err(|e| { + PinakesError::Database(format!("failed to create connection pool: {e}")) + })?; + + // Verify connectivity + let _ = pool.get().await.map_err(|e| { + PinakesError::Database(format!("failed to connect to postgres: {e}")) + })?; + + Ok(Self { pool }) + } } } @@ -3229,6 +3284,167 @@ impl StorageBackend for PostgresBackend { .await?; Ok(affected) } + + // ===== Session Management ===== + + async fn create_session(&self, session: &crate::storage::SessionData) -> Result<()> { + let client = self + .pool + .get() + .await + .map_err(|e| PinakesError::Database(format!("pool error: {e}")))?; + + client + .execute( + "INSERT INTO sessions (session_token, user_id, username, role, created_at, expires_at, last_accessed) + VALUES ($1, $2, $3, $4, $5, $6, $7)", + &[ + &session.session_token, + &session.user_id, + &session.username, + &session.role, + &session.created_at, + &session.expires_at, + &session.last_accessed, + ], + ) + .await?; + Ok(()) + } + + 
async fn get_session( + &self, + session_token: &str, + ) -> Result> { + let client = self + .pool + .get() + .await + .map_err(|e| PinakesError::Database(format!("pool error: {e}")))?; + + let row = client + .query_opt( + "SELECT session_token, user_id, username, role, created_at, expires_at, last_accessed + FROM sessions WHERE session_token = $1", + &[&session_token], + ) + .await?; + + Ok(row.map(|r| crate::storage::SessionData { + session_token: r.get(0), + user_id: r.get(1), + username: r.get(2), + role: r.get(3), + created_at: r.get(4), + expires_at: r.get(5), + last_accessed: r.get(6), + })) + } + + async fn touch_session(&self, session_token: &str) -> Result<()> { + let client = self + .pool + .get() + .await + .map_err(|e| PinakesError::Database(format!("pool error: {e}")))?; + + let now = chrono::Utc::now(); + client + .execute( + "UPDATE sessions SET last_accessed = $1 WHERE session_token = $2", + &[&now, &session_token], + ) + .await?; + Ok(()) + } + + async fn delete_session(&self, session_token: &str) -> Result<()> { + let client = self + .pool + .get() + .await + .map_err(|e| PinakesError::Database(format!("pool error: {e}")))?; + + client + .execute( + "DELETE FROM sessions WHERE session_token = $1", + &[&session_token], + ) + .await?; + Ok(()) + } + + async fn delete_user_sessions(&self, username: &str) -> Result { + let client = self + .pool + .get() + .await + .map_err(|e| PinakesError::Database(format!("pool error: {e}")))?; + + let affected = client + .execute("DELETE FROM sessions WHERE username = $1", &[&username]) + .await?; + Ok(affected) + } + + async fn delete_expired_sessions(&self) -> Result { + let client = self + .pool + .get() + .await + .map_err(|e| PinakesError::Database(format!("pool error: {e}")))?; + + let now = chrono::Utc::now(); + let affected = client + .execute("DELETE FROM sessions WHERE expires_at < $1", &[&now]) + .await?; + Ok(affected) + } + + async fn list_active_sessions( + &self, + username: Option<&str>, + ) -> 
Result> { + let client = self + .pool + .get() + .await + .map_err(|e| PinakesError::Database(format!("pool error: {e}")))?; + + let now = chrono::Utc::now(); + let rows = if let Some(user) = username { + client + .query( + "SELECT session_token, user_id, username, role, created_at, expires_at, last_accessed + FROM sessions WHERE expires_at > $1 AND username = $2 + ORDER BY last_accessed DESC", + &[&now, &user], + ) + .await? + } else { + client + .query( + "SELECT session_token, user_id, username, role, created_at, expires_at, last_accessed + FROM sessions WHERE expires_at > $1 + ORDER BY last_accessed DESC", + &[&now], + ) + .await? + }; + + Ok(rows + .into_iter() + .map(|r| crate::storage::SessionData { + session_token: r.get(0), + user_id: r.get(1), + username: r.get(2), + role: r.get(3), + created_at: r.get(4), + expires_at: r.get(5), + last_accessed: r.get(6), + }) + .collect()) + } } impl PostgresBackend { diff --git a/crates/pinakes-core/src/storage/sqlite.rs b/crates/pinakes-core/src/storage/sqlite.rs index f226856..7d47673 100644 --- a/crates/pinakes-core/src/storage/sqlite.rs +++ b/crates/pinakes-core/src/storage/sqlite.rs @@ -3580,6 +3580,227 @@ impl StorageBackend for SqliteBackend { .map_err(|_| PinakesError::Database("cleanup_expired_transcodes timed out".into()))? .map_err(|e: tokio::task::JoinError| PinakesError::Database(e.to_string()))? 
} + + // ===== Session Management ===== + + async fn create_session(&self, session: &crate::storage::SessionData) -> Result<()> { + let conn = self.conn.clone(); + let session_token = session.session_token.clone(); + let user_id = session.user_id.clone(); + let username = session.username.clone(); + let role = session.role.clone(); + let created_at = session.created_at.to_rfc3339(); + let expires_at = session.expires_at.to_rfc3339(); + let last_accessed = session.last_accessed.to_rfc3339(); + + let fut = tokio::task::spawn_blocking(move || { + let db = conn.lock().map_err(|e| { + PinakesError::Database(format!("failed to acquire database lock: {}", e)) + })?; + db.execute( + "INSERT INTO sessions (session_token, user_id, username, role, created_at, expires_at, last_accessed) + VALUES (?, ?, ?, ?, ?, ?, ?)", + params![ + &session_token, + &user_id, + &username, + &role, + &created_at, + &expires_at, + &last_accessed + ], + )?; + Ok(()) + }); + tokio::time::timeout(std::time::Duration::from_secs(10), fut) + .await + .map_err(|_| PinakesError::Database("create_session timed out".into()))? + .map_err(|e: tokio::task::JoinError| PinakesError::Database(e.to_string()))? 
+ } + + async fn get_session( + &self, + session_token: &str, + ) -> Result> { + let conn = self.conn.clone(); + let token = session_token.to_string(); + + let fut = tokio::task::spawn_blocking(move || { + let db = conn.lock().map_err(|e| { + PinakesError::Database(format!("failed to acquire database lock: {}", e)) + })?; + + let result = db + .query_row( + "SELECT session_token, user_id, username, role, created_at, expires_at, last_accessed + FROM sessions WHERE session_token = ?", + [&token], + |row| { + let created_at_str: String = row.get(4)?; + let expires_at_str: String = row.get(5)?; + let last_accessed_str: String = row.get(6)?; + + Ok(crate::storage::SessionData { + session_token: row.get(0)?, + user_id: row.get(1)?, + username: row.get(2)?, + role: row.get(3)?, + created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str) + .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))? + .with_timezone(&chrono::Utc), + expires_at: chrono::DateTime::parse_from_rfc3339(&expires_at_str) + .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))? + .with_timezone(&chrono::Utc), + last_accessed: chrono::DateTime::parse_from_rfc3339(&last_accessed_str) + .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))? + .with_timezone(&chrono::Utc), + }) + }, + ) + .optional()?; + + Ok(result) + }); + tokio::time::timeout(std::time::Duration::from_secs(10), fut) + .await + .map_err(|_| PinakesError::Database("get_session timed out".into()))? + .map_err(|e: tokio::task::JoinError| PinakesError::Database(e.to_string()))? + } + + async fn touch_session(&self, session_token: &str) -> Result<()> { + let conn = self.conn.clone(); + let token = session_token.to_string(); + let now = chrono::Utc::now().to_rfc3339(); + + let fut = tokio::task::spawn_blocking(move || { + let db = conn.lock().map_err(|e| { + PinakesError::Database(format!("failed to acquire database lock: {}", e)) + })?; + db.execute( + "UPDATE sessions SET last_accessed = ? 
WHERE session_token = ?", + params![&now, &token], + )?; + Ok(()) + }); + tokio::time::timeout(std::time::Duration::from_secs(10), fut) + .await + .map_err(|_| PinakesError::Database("touch_session timed out".into()))? + .map_err(|e: tokio::task::JoinError| PinakesError::Database(e.to_string()))? + } + + async fn delete_session(&self, session_token: &str) -> Result<()> { + let conn = self.conn.clone(); + let token = session_token.to_string(); + + let fut = tokio::task::spawn_blocking(move || { + let db = conn.lock().map_err(|e| { + PinakesError::Database(format!("failed to acquire database lock: {}", e)) + })?; + db.execute("DELETE FROM sessions WHERE session_token = ?", [&token])?; + Ok(()) + }); + tokio::time::timeout(std::time::Duration::from_secs(10), fut) + .await + .map_err(|_| PinakesError::Database("delete_session timed out".into()))? + .map_err(|e: tokio::task::JoinError| PinakesError::Database(e.to_string()))? + } + + async fn delete_user_sessions(&self, username: &str) -> Result { + let conn = self.conn.clone(); + let user = username.to_string(); + + let fut = tokio::task::spawn_blocking(move || { + let db = conn.lock().map_err(|e| { + PinakesError::Database(format!("failed to acquire database lock: {}", e)) + })?; + let affected = db.execute("DELETE FROM sessions WHERE username = ?", [&user])?; + Ok(affected as u64) + }); + tokio::time::timeout(std::time::Duration::from_secs(10), fut) + .await + .map_err(|_| PinakesError::Database("delete_user_sessions timed out".into()))? + .map_err(|e: tokio::task::JoinError| PinakesError::Database(e.to_string()))? 
+ } + + async fn delete_expired_sessions(&self) -> Result { + let conn = self.conn.clone(); + let now = chrono::Utc::now().to_rfc3339(); + + let fut = tokio::task::spawn_blocking(move || { + let db = conn.lock().map_err(|e| { + PinakesError::Database(format!("failed to acquire database lock: {}", e)) + })?; + let affected = db.execute("DELETE FROM sessions WHERE expires_at < ?", [&now])?; + Ok(affected as u64) + }); + tokio::time::timeout(std::time::Duration::from_secs(10), fut) + .await + .map_err(|_| PinakesError::Database("delete_expired_sessions timed out".into()))? + .map_err(|e: tokio::task::JoinError| PinakesError::Database(e.to_string()))? + } + + async fn list_active_sessions( + &self, + username: Option<&str>, + ) -> Result> { + let conn = self.conn.clone(); + let user_filter = username.map(|s| s.to_string()); + let now = chrono::Utc::now().to_rfc3339(); + + let fut = tokio::task::spawn_blocking(move || { + let db = conn.lock().map_err(|e| { + PinakesError::Database(format!("failed to acquire database lock: {}", e)) + })?; + + let (query, params): (&str, Vec) = if let Some(user) = user_filter { + ( + "SELECT session_token, user_id, username, role, created_at, expires_at, last_accessed + FROM sessions WHERE expires_at > ? AND username = ? + ORDER BY last_accessed DESC", + vec![now, user], + ) + } else { + ( + "SELECT session_token, user_id, username, role, created_at, expires_at, last_accessed + FROM sessions WHERE expires_at > ? 
+ ORDER BY last_accessed DESC", + vec![now], + ) + }; + + let mut stmt = db.prepare(query)?; + let param_refs: Vec<&dyn rusqlite::ToSql> = + params.iter().map(|p| p as &dyn rusqlite::ToSql).collect(); + let rows = stmt.query_map(¶m_refs[..], |row| { + let created_at_str: String = row.get(4)?; + let expires_at_str: String = row.get(5)?; + let last_accessed_str: String = row.get(6)?; + + Ok(crate::storage::SessionData { + session_token: row.get(0)?, + user_id: row.get(1)?, + username: row.get(2)?, + role: row.get(3)?, + created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str) + .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))? + .with_timezone(&chrono::Utc), + expires_at: chrono::DateTime::parse_from_rfc3339(&expires_at_str) + .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))? + .with_timezone(&chrono::Utc), + last_accessed: chrono::DateTime::parse_from_rfc3339(&last_accessed_str) + .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))? + .with_timezone(&chrono::Utc), + }) + })?; + + rows.collect::, _>>() + .map_err(|e| e.into()) + }); + tokio::time::timeout(std::time::Duration::from_secs(10), fut) + .await + .map_err(|_| PinakesError::Database("list_active_sessions timed out".into()))? + .map_err(|e: tokio::task::JoinError| PinakesError::Database(e.to_string()))? 
+ } } // Needed for `query_row(...).optional()` diff --git a/crates/pinakes-core/tests/integrity_enhanced_test.rs b/crates/pinakes-core/tests/integrity_enhanced_test.rs new file mode 100644 index 0000000..86d44b2 --- /dev/null +++ b/crates/pinakes-core/tests/integrity_enhanced_test.rs @@ -0,0 +1,262 @@ +use std::fs; +use std::path::PathBuf; +use std::sync::Arc; + +use pinakes_core::integrity::detect_orphans; +use pinakes_core::media_type::{BuiltinMediaType, MediaType}; +use pinakes_core::model::{ContentHash, MediaId, MediaItem}; +use pinakes_core::storage::{DynStorageBackend, StorageBackend, sqlite::SqliteBackend}; +use tempfile::TempDir; +use uuid::Uuid; + +async fn setup_test_storage() -> (DynStorageBackend, TempDir) { + let temp_dir = TempDir::new().unwrap(); + let db_path = temp_dir.path().join(format!("test_{}.db", Uuid::now_v7())); + + let storage = SqliteBackend::new(&db_path).unwrap(); + storage.run_migrations().await.unwrap(); + + (Arc::new(storage), temp_dir) +} + +fn create_test_media_item(path: PathBuf, hash: &str) -> MediaItem { + use std::collections::HashMap; + + MediaItem { + id: MediaId(Uuid::now_v7()), + path, + file_name: "test.mp3".to_string(), + media_type: MediaType::Builtin(BuiltinMediaType::Mp3), + content_hash: ContentHash(hash.to_string()), + file_size: 1000, + title: None, + artist: None, + album: None, + genre: None, + year: None, + duration_secs: None, + description: None, + thumbnail_path: None, + custom_fields: HashMap::new(), + file_mtime: None, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + } +} + +#[tokio::test] +async fn test_detect_orphaned_files() { + let (storage, temp_dir) = setup_test_storage().await; + + // Create a media item pointing to a file that doesn't exist + let nonexistent_path = temp_dir.path().join("nonexistent.mp3"); + let orphaned_item = create_test_media_item(nonexistent_path, "hash1"); + + storage.insert_media(&orphaned_item).await.unwrap(); + + // Detect orphans + let report = 
detect_orphans(&storage).await.unwrap(); + + // Should detect the orphaned file + assert_eq!(report.orphaned_ids.len(), 1); + assert_eq!(report.orphaned_ids[0], orphaned_item.id); +} + +#[tokio::test] +async fn test_detect_untracked_files() { + let (storage, temp_dir) = setup_test_storage().await; + + // Create a root directory + let root_dir = temp_dir.path().join("media"); + fs::create_dir(&root_dir).unwrap(); + storage.add_root_dir(root_dir.clone()).await.unwrap(); + + // Create actual files on disk + let tracked_file = root_dir.join("tracked.mp3"); + let untracked_file = root_dir.join("untracked.mp3"); + + fs::write(&tracked_file, b"tracked content").unwrap(); + fs::write(&untracked_file, b"untracked content").unwrap(); + + // Add only one file to the database + let tracked_item = create_test_media_item(tracked_file.clone(), "hash_tracked"); + storage.insert_media(&tracked_item).await.unwrap(); + + // Detect orphans (including untracked files) + let report = detect_orphans(&storage).await.unwrap(); + + // Should detect the untracked file + assert_eq!(report.untracked_paths.len(), 1); + assert!(report.untracked_paths.contains(&untracked_file)); +} + +#[tokio::test] +async fn test_detect_moved_files() { + // Note: Due to UNIQUE constraint on content_hash, moved files detection + // won't find true duplicates. This test validates the detection logic + // works but won't find matches due to schema constraints. 
+ + let (storage, temp_dir) = setup_test_storage().await; + + // Create files + let old_path = temp_dir.path().join("old_location.mp3"); + + fs::write(&old_path, b"content").unwrap(); + + // Create media item + let old_item = create_test_media_item(old_path.clone(), "hash_unique"); + storage.insert_media(&old_item).await.unwrap(); + + // Delete the file to make it orphaned + fs::remove_file(&old_path).unwrap(); + + // Detect orphans + let report = detect_orphans(&storage).await.unwrap(); + + // Should detect the orphaned file, but no moved files (no duplicates exist) + assert_eq!(report.orphaned_ids.len(), 1); + // With UNIQUE constraint on content_hash, we can't have duplicates, + // so moved_files will be empty + assert_eq!(report.moved_files.len(), 0); +} + +#[tokio::test] +async fn test_ignore_patterns_respected() { + let (storage, temp_dir) = setup_test_storage().await; + + // Create a root directory + let root_dir = temp_dir.path().join("media"); + fs::create_dir(&root_dir).unwrap(); + storage.add_root_dir(root_dir.clone()).await.unwrap(); + + // Create a hidden directory that should be ignored + let hidden_dir = root_dir.join(".hidden"); + fs::create_dir(&hidden_dir).unwrap(); + + let hidden_file = hidden_dir.join("hidden.mp3"); + fs::write(&hidden_file, b"hidden content").unwrap(); + + // Create a normal file + let normal_file = root_dir.join("normal.mp3"); + fs::write(&normal_file, b"normal content").unwrap(); + + // Detect orphans + let report = detect_orphans(&storage).await.unwrap(); + + // Should only detect the normal file, not the hidden one + assert_eq!(report.untracked_paths.len(), 1); + assert!(report.untracked_paths.contains(&normal_file)); + assert!(!report.untracked_paths.contains(&hidden_file)); +} + +#[tokio::test] +async fn test_only_supported_media_types() { + let (storage, temp_dir) = setup_test_storage().await; + + // Create a root directory + let root_dir = temp_dir.path().join("media"); + fs::create_dir(&root_dir).unwrap(); + 
storage.add_root_dir(root_dir.clone()).await.unwrap(); + + // Create files with different extensions + let mp3_file = root_dir.join("audio.mp3"); + let txt_file = root_dir.join("readme.txt"); + let exe_file = root_dir.join("program.exe"); + + fs::write(&mp3_file, b"audio").unwrap(); + fs::write(&txt_file, b"text").unwrap(); + fs::write(&exe_file, b"binary").unwrap(); + + // Detect orphans + let report = detect_orphans(&storage).await.unwrap(); + + // Should only detect supported media types (mp3 and txt are supported) + // exe should not be detected + assert!(report.untracked_paths.len() <= 2); + assert!(!report.untracked_paths.contains(&exe_file)); +} + +#[tokio::test] +async fn test_complete_orphan_workflow() { + let (storage, temp_dir) = setup_test_storage().await; + + // Setup root directory + let root_dir = temp_dir.path().join("media"); + fs::create_dir(&root_dir).unwrap(); + storage.add_root_dir(root_dir.clone()).await.unwrap(); + + // Create various scenarios + + // 1. Orphaned file (in DB, not on disk) + let orphaned_path = root_dir.join("orphaned.mp3"); + let orphaned_item = create_test_media_item(orphaned_path.clone(), "hash_orphaned"); + storage.insert_media(&orphaned_item).await.unwrap(); + + // 2. Untracked file (on disk, not in DB) + let untracked_path = root_dir.join("untracked.mp3"); + fs::write(&untracked_path, b"untracked").unwrap(); + + // 3. Another orphaned file (can't test moved files with UNIQUE constraint) + let another_orphaned = root_dir.join("another_orphaned.mp3"); + let another_item = create_test_media_item(another_orphaned.clone(), "hash_another"); + storage.insert_media(&another_item).await.unwrap(); + // Don't create the file, so it's orphaned + + // 4. 
Tracked file (normal case) + let tracked_path = root_dir.join("tracked.mp3"); + fs::write(&tracked_path, b"tracked").unwrap(); + + let tracked_item = create_test_media_item(tracked_path.clone(), "hash_tracked"); + storage.insert_media(&tracked_item).await.unwrap(); + + // Detect all orphans + let report = detect_orphans(&storage).await.unwrap(); + + // Verify results + assert_eq!(report.orphaned_ids.len(), 2); // orphaned + another_orphaned + assert!(report.orphaned_ids.contains(&orphaned_item.id)); + assert!(report.orphaned_ids.contains(&another_item.id)); + + assert_eq!(report.untracked_paths.len(), 1); + assert!(report.untracked_paths.contains(&untracked_path)); + + // No moved files due to UNIQUE constraint on content_hash + assert_eq!(report.moved_files.len(), 0); +} + +#[tokio::test] +async fn test_large_directory_performance() { + let (storage, temp_dir) = setup_test_storage().await; + + let root_dir = temp_dir.path().join("media"); + fs::create_dir(&root_dir).unwrap(); + storage.add_root_dir(root_dir.clone()).await.unwrap(); + + // Create many files + for i in 0..1000 { + let file_path = root_dir.join(format!("file_{}.mp3", i)); + fs::write(&file_path, format!("content {}", i)).unwrap(); + } + + // Add half to database + for i in 0..500 { + let file_path = root_dir.join(format!("file_{}.mp3", i)); + let item = create_test_media_item(file_path, &format!("hash_{}", i)); + storage.insert_media(&item).await.unwrap(); + } + + // Measure time + let start = std::time::Instant::now(); + let report = detect_orphans(&storage).await.unwrap(); + let elapsed = start.elapsed(); + + // Should complete in reasonable time (< 5 seconds for 1000 files) + assert!( + elapsed.as_secs() < 5, + "Detection took too long: {:?}", + elapsed + ); + + // Should detect 500 untracked files + assert_eq!(report.untracked_paths.len(), 500); +} diff --git a/crates/pinakes-core/tests/session_persistence_test.rs b/crates/pinakes-core/tests/session_persistence_test.rs new file mode 100644 index 
0000000..61951c4 --- /dev/null +++ b/crates/pinakes-core/tests/session_persistence_test.rs @@ -0,0 +1,308 @@ +use chrono::Utc; +use pinakes_core::storage::{SessionData, StorageBackend}; +use tempfile::TempDir; + +async fn setup_sqlite_storage() -> pinakes_core::storage::sqlite::SqliteBackend { + let temp_dir = TempDir::new().unwrap(); + let db_path = temp_dir + .path() + .join(format!("test_{}.db", uuid::Uuid::now_v7())); + + let storage = pinakes_core::storage::sqlite::SqliteBackend::new(&db_path).unwrap(); + storage.run_migrations().await.unwrap(); + + // Keep temp_dir alive by leaking it (tests are short-lived anyway) + std::mem::forget(temp_dir); + + storage +} + +#[tokio::test] +async fn test_create_and_get_session() { + let storage = setup_sqlite_storage().await; + + let now = Utc::now(); + let session = SessionData { + session_token: "test_token_123".to_string(), + user_id: Some("user_1".to_string()), + username: "testuser".to_string(), + role: "admin".to_string(), + created_at: now, + expires_at: now + chrono::Duration::hours(24), + last_accessed: now, + }; + + // Create session + storage.create_session(&session).await.unwrap(); + + // Get session + let retrieved = storage.get_session("test_token_123").await.unwrap(); + assert!(retrieved.is_some()); + + let retrieved = retrieved.unwrap(); + assert_eq!(retrieved.session_token, "test_token_123"); + assert_eq!(retrieved.username, "testuser"); + assert_eq!(retrieved.role, "admin"); +} + +#[tokio::test] +async fn test_get_nonexistent_session() { + let storage = setup_sqlite_storage().await; + + let result = storage.get_session("nonexistent").await.unwrap(); + assert!(result.is_none()); +} + +#[tokio::test] +async fn test_touch_session() { + let storage = setup_sqlite_storage().await; + + let now = Utc::now(); + let session = SessionData { + session_token: "test_token_456".to_string(), + user_id: None, + username: "testuser".to_string(), + role: "viewer".to_string(), + created_at: now, + expires_at: now + 
chrono::Duration::hours(24), + last_accessed: now, + }; + + storage.create_session(&session).await.unwrap(); + + // Wait a bit + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Touch session + storage.touch_session("test_token_456").await.unwrap(); + + // Verify last_accessed was updated + let updated = storage + .get_session("test_token_456") + .await + .unwrap() + .unwrap(); + assert!(updated.last_accessed > now); +} + +#[tokio::test] +async fn test_delete_session() { + let storage = setup_sqlite_storage().await; + + let now = Utc::now(); + let session = SessionData { + session_token: "delete_me".to_string(), + user_id: None, + username: "testuser".to_string(), + role: "editor".to_string(), + created_at: now, + expires_at: now + chrono::Duration::hours(24), + last_accessed: now, + }; + + storage.create_session(&session).await.unwrap(); + assert!(storage.get_session("delete_me").await.unwrap().is_some()); + + // Delete session + storage.delete_session("delete_me").await.unwrap(); + + // Verify it's gone + assert!(storage.get_session("delete_me").await.unwrap().is_none()); +} + +#[tokio::test] +async fn test_delete_user_sessions() { + let storage = setup_sqlite_storage().await; + + let now = Utc::now(); + + // Create multiple sessions for the same user + for i in 0..3 { + let session = SessionData { + session_token: format!("token_{}", i), + user_id: None, + username: "testuser".to_string(), + role: "viewer".to_string(), + created_at: now, + expires_at: now + chrono::Duration::hours(24), + last_accessed: now, + }; + storage.create_session(&session).await.unwrap(); + } + + // Create session for different user + let other_session = SessionData { + session_token: "other_token".to_string(), + user_id: None, + username: "otheruser".to_string(), + role: "viewer".to_string(), + created_at: now, + expires_at: now + chrono::Duration::hours(24), + last_accessed: now, + }; + storage.create_session(&other_session).await.unwrap(); + + // Delete all 
sessions for testuser + let deleted = storage.delete_user_sessions("testuser").await.unwrap(); + assert_eq!(deleted, 3); + + // Verify testuser sessions are gone + for i in 0..3 { + assert!( + storage + .get_session(&format!("token_{}", i)) + .await + .unwrap() + .is_none() + ); + } + + // Verify otheruser session still exists + assert!(storage.get_session("other_token").await.unwrap().is_some()); +} + +#[tokio::test] +async fn test_delete_expired_sessions() { + let storage = setup_sqlite_storage().await; + + let now = Utc::now(); + + // Create expired session + let expired = SessionData { + session_token: "expired_token".to_string(), + user_id: None, + username: "testuser".to_string(), + role: "viewer".to_string(), + created_at: now - chrono::Duration::hours(25), + expires_at: now - chrono::Duration::hours(1), // Expired 1 hour ago + last_accessed: now - chrono::Duration::hours(2), + }; + storage.create_session(&expired).await.unwrap(); + + // Create valid session + let valid = SessionData { + session_token: "valid_token".to_string(), + user_id: None, + username: "testuser".to_string(), + role: "viewer".to_string(), + created_at: now, + expires_at: now + chrono::Duration::hours(24), + last_accessed: now, + }; + storage.create_session(&valid).await.unwrap(); + + // Delete expired sessions + let deleted = storage.delete_expired_sessions().await.unwrap(); + assert_eq!(deleted, 1); + + // Verify expired is gone, valid remains + assert!( + storage + .get_session("expired_token") + .await + .unwrap() + .is_none() + ); + assert!(storage.get_session("valid_token").await.unwrap().is_some()); +} + +#[tokio::test] +async fn test_list_active_sessions() { + let storage = setup_sqlite_storage().await; + + let now = Utc::now(); + + // Create active sessions for different users + for i in 0..3 { + let session = SessionData { + session_token: format!("user1_token_{}", i), + user_id: None, + username: "user1".to_string(), + role: "viewer".to_string(), + created_at: now, + 
expires_at: now + chrono::Duration::hours(24), + last_accessed: now, + }; + storage.create_session(&session).await.unwrap(); + } + + for i in 0..2 { + let session = SessionData { + session_token: format!("user2_token_{}", i), + user_id: None, + username: "user2".to_string(), + role: "admin".to_string(), + created_at: now, + expires_at: now + chrono::Duration::hours(24), + last_accessed: now, + }; + storage.create_session(&session).await.unwrap(); + } + + // Create expired session + let expired = SessionData { + session_token: "expired".to_string(), + user_id: None, + username: "user1".to_string(), + role: "viewer".to_string(), + created_at: now - chrono::Duration::hours(25), + expires_at: now - chrono::Duration::hours(1), + last_accessed: now - chrono::Duration::hours(2), + }; + storage.create_session(&expired).await.unwrap(); + + // List all active sessions + let all_active = storage.list_active_sessions(None).await.unwrap(); + assert_eq!(all_active.len(), 5); // 3 + 2, expired not included + + // List active sessions for user1 + let user1_active = storage.list_active_sessions(Some("user1")).await.unwrap(); + assert_eq!(user1_active.len(), 3); + + // List active sessions for user2 + let user2_active = storage.list_active_sessions(Some("user2")).await.unwrap(); + assert_eq!(user2_active.len(), 2); +} + +#[tokio::test] +async fn test_concurrent_session_operations() { + let storage = setup_sqlite_storage().await; + + let now = Utc::now(); + let storage = std::sync::Arc::new(storage); + + // Create sessions concurrently + let mut handles = vec![]; + for i in 0..10 { + let storage = storage.clone(); + let handle = tokio::spawn(async move { + let session = SessionData { + session_token: format!("concurrent_{}", i), + user_id: None, + username: format!("user{}", i), + role: "viewer".to_string(), + created_at: now, + expires_at: now + chrono::Duration::hours(24), + last_accessed: now, + }; + storage.create_session(&session).await.unwrap(); + }); + handles.push(handle); + 
} + + // Wait for all to complete + for handle in handles { + handle.await.unwrap(); + } + + // Verify all sessions were created + for i in 0..10 { + assert!( + storage + .get_session(&format!("concurrent_{}", i)) + .await + .unwrap() + .is_some() + ); + } +} diff --git a/crates/pinakes-server/src/app.rs b/crates/pinakes-server/src/app.rs index 0508420..b196a8f 100644 --- a/crates/pinakes-server/src/app.rs +++ b/crates/pinakes-server/src/app.rs @@ -44,6 +44,24 @@ pub fn create_router_with_tls( .unwrap(), ); + // Rate limit for search: 10 requests/min per IP + let search_governor = Arc::new( + GovernorConfigBuilder::default() + .per_second(6) // replenish one every 6 seconds (10/min) + .burst_size(10) + .finish() + .unwrap(), + ); + + // Rate limit for streaming: 5 requests per IP (very restrictive for concurrent streams) + let stream_governor = Arc::new( + GovernorConfigBuilder::default() + .per_second(60) // replenish slowly (one per minute) + .burst_size(5) // max 5 concurrent connections + .finish() + .unwrap(), + ); + // Login route with strict rate limiting let login_route = Router::new() .route("/auth/login", post(routes::auth::login)) @@ -58,6 +76,21 @@ pub fn create_router_with_tls( .route("/health/live", get(routes::health::liveness)) .route("/health/ready", get(routes::health::readiness)); + // Search routes with enhanced rate limiting (10 req/min) + let search_routes = Router::new() + .route("/search", get(routes::search::search)) + .route("/search", post(routes::search::search_post)) + .layer(GovernorLayer { + config: search_governor, + }); + + // Streaming routes with enhanced rate limiting (5 concurrent) + let streaming_routes = Router::new() + .route("/media/{id}/stream", get(routes::media::stream_media)) + .layer(GovernorLayer { + config: stream_governor, + }); + // Read-only routes: any authenticated user (Viewer+) let viewer_routes = Router::new() .route("/health", get(routes::health::health)) @@ -65,11 +98,8 @@ pub fn create_router_with_tls( 
.route("/media/count", get(routes::media::get_media_count)) .route("/media", get(routes::media::list_media)) .route("/media/{id}", get(routes::media::get_media)) - .route("/media/{id}/stream", get(routes::media::stream_media)) .route("/media/{id}/thumbnail", get(routes::media::get_thumbnail)) .route("/media/{media_id}/tags", get(routes::tags::get_media_tags)) - .route("/search", get(routes::search::search)) - .route("/search", post(routes::search::search_post)) .route("/tags", get(routes::tags::list_tags)) .route("/tags/{id}", get(routes::tags::get_tag)) .route("/collections", get(routes::collections::list_collections)) @@ -107,6 +137,7 @@ pub fn create_router_with_tls( // Auth endpoints (self-service) — login handled separately with stricter rate limit .route("/auth/logout", post(routes::auth::logout)) .route("/auth/me", get(routes::auth::me)) + .route("/auth/revoke-all", post(routes::auth::revoke_all_sessions)) // Social: ratings & comments (read) .route( "/media/{id}/ratings", @@ -374,6 +405,8 @@ pub fn create_router_with_tls( "/users/{id}/libraries", delete(routes::users::revoke_library_access), ) + // Session management (admin) + .route("/auth/sessions", get(routes::auth::list_active_sessions)) .layer(middleware::from_fn(auth::require_admin)); // CORS: allow same-origin by default, plus the desktop UI origin @@ -396,6 +429,8 @@ pub fn create_router_with_tls( // Create protected routes with auth middleware let protected_api = Router::new() .merge(viewer_routes) + .merge(search_routes) + .merge(streaming_routes) .merge(editor_routes) .merge(admin_routes) .layer(middleware::from_fn_with_state( diff --git a/crates/pinakes-server/src/auth.rs b/crates/pinakes-server/src/auth.rs index 6a372ff..b2064f3 100644 --- a/crates/pinakes-server/src/auth.rs +++ b/crates/pinakes-server/src/auth.rs @@ -21,7 +21,7 @@ fn constant_time_eq(a: &str, b: &str) -> bool { /// Axum middleware that checks for a valid Bearer token. 
/// -/// If `accounts.enabled == true`: look up bearer token in session store. +/// If `accounts.enabled == true`: look up bearer token in database session store. /// If `accounts.enabled == false`: use existing api_key logic (unchanged behavior). /// Skips authentication for the `/health` and `/auth/login` path suffixes. pub async fn require_auth( @@ -38,8 +38,19 @@ pub async fn require_auth( let config = state.config.read().await; + // Check if authentication is explicitly disabled + if config.server.authentication_disabled { + drop(config); + tracing::warn!("authentication is disabled - allowing all requests"); + request.extensions_mut().insert(UserRole::Admin); + request.extensions_mut().insert("admin".to_string()); + return next.run(request).await; + } + if config.accounts.enabled { - // Session-based auth + drop(config); + + // Session-based auth using database let token = request .headers() .get("authorization") @@ -47,32 +58,63 @@ pub async fn require_auth( .and_then(|s| s.strip_prefix("Bearer ")) .map(|s| s.to_string()); - drop(config); - let Some(token) = token else { tracing::debug!(path = %path, "rejected: missing Authorization header"); return unauthorized("missing Authorization header"); }; - let sessions = state.sessions.read().await; - let Some(session) = sessions.get(&token) else { - tracing::debug!(path = %path, "rejected: invalid session token"); - return unauthorized("invalid or expired session token"); + // Look up session in database + let session_result = state.storage.get_session(&token).await; + let session = match session_result { + Ok(Some(session)) => session, + Ok(None) => { + tracing::debug!(path = %path, "rejected: invalid session token"); + return unauthorized("invalid or expired session token"); + } + Err(e) => { + tracing::error!(error = %e, "failed to query session from database"); + return (StatusCode::INTERNAL_SERVER_ERROR, "database error").into_response(); + } }; // Check session expiry - if session.is_expired() { + let now = 
chrono::Utc::now(); + if session.expires_at < now { let username = session.username.clone(); - drop(sessions); - // Remove expired session - let mut sessions_mut = state.sessions.write().await; - sessions_mut.remove(&token); + // Delete expired session asynchronously (fire-and-forget) + let storage = state.storage.clone(); + let token_owned = token.clone(); + tokio::spawn(async move { + if let Err(e) = storage.delete_session(&token_owned).await { + tracing::error!(error = %e, "failed to delete expired session"); + } + }); tracing::info!(username = %username, "session expired"); return unauthorized("session expired"); } + // Update last_accessed timestamp asynchronously (fire-and-forget) + let storage = state.storage.clone(); + let token_owned = token.clone(); + tokio::spawn(async move { + if let Err(e) = storage.touch_session(&token_owned).await { + tracing::warn!(error = %e, "failed to update session last_accessed"); + } + }); + + // Parse role from string + let role = match session.role.as_str() { + "admin" => UserRole::Admin, + "editor" => UserRole::Editor, + "viewer" => UserRole::Viewer, + _ => { + tracing::warn!(role = %session.role, "unknown role, defaulting to viewer"); + UserRole::Viewer + } + }; + // Inject role and username into request extensions - request.extensions_mut().insert(session.role); + request.extensions_mut().insert(role); request.extensions_mut().insert(session.username.clone()); } else { // Legacy API key auth @@ -81,35 +123,38 @@ pub async fn require_auth( .or_else(|| config.server.api_key.clone()); drop(config); - if let Some(ref expected_key) = api_key { - if expected_key.is_empty() { - // Empty key means no auth required - request.extensions_mut().insert(UserRole::Admin); - request.extensions_mut().insert("admin".to_string()); - return next.run(request).await; + let Some(ref expected_key) = api_key else { + tracing::error!("no authentication configured"); + return unauthorized("authentication not configured"); + }; + + if 
expected_key.is_empty() {
+ // Empty key is not allowed - must use authentication_disabled flag
+ tracing::error!("empty api_key rejected, use authentication_disabled flag instead");
+ return unauthorized("authentication not properly configured");
+ }
+
+ let auth_header = request
+ .headers()
+ .get("authorization")
+ .and_then(|v| v.to_str().ok());
+
+ match auth_header {
+ Some(header) if header.starts_with("Bearer ") => {
+ let token = &header[7..];
+ if !constant_time_eq(token, expected_key.as_str()) {
+ tracing::warn!(path = %path, "rejected: invalid API key");
+ return unauthorized("invalid api key");
+ }
+ }
-
- let auth_header = request
- .headers()
- .get("authorization")
- .and_then(|v| v.to_str().ok());
-
- match auth_header {
- Some(header) if header.starts_with("Bearer ") => {
- let token = &header[7..];
- if !constant_time_eq(token, expected_key.as_str()) {
- tracing::warn!(path = %path, "rejected: invalid API key");
- return unauthorized("invalid api key");
- }
- }
- _ => {
- return unauthorized(
- "missing or malformed Authorization header, expected: Bearer <api_key>",
- );
- }
+ _ => {
+ return unauthorized(
+ "missing or malformed Authorization header, expected: Bearer <api_key>",
+ );
}
}
- // When no api_key is configured, or key matches, grant admin
+
+ // API key matches, grant admin
 request.extensions_mut().insert(UserRole::Admin);
 request.extensions_mut().insert("admin".to_string());
 }
diff --git a/crates/pinakes-server/src/main.rs b/crates/pinakes-server/src/main.rs
index f792121..d362117 100644
--- a/crates/pinakes-server/src/main.rs
+++ b/crates/pinakes-server/src/main.rs
@@ -98,6 +98,24 @@ async fn main() -> Result<()> {
 .validate()
 .map_err(|e| anyhow::anyhow!("invalid configuration: {e}"))?;
+ // Warn about authentication configuration
+ if config.server.authentication_disabled {
+ tracing::warn!(
+ "⚠️ AUTHENTICATION IS DISABLED - All requests will be allowed without authentication!" 
+ ); + tracing::warn!("⚠️ This is INSECURE and should only be used for development."); + } else { + let has_api_key = config + .server + .api_key + .as_ref() + .map_or(false, |k| !k.is_empty()); + let has_accounts = !config.accounts.users.is_empty(); + if !has_api_key && !has_accounts { + tracing::error!("⚠️ No authentication method configured!"); + } + } + // Apply CLI overrides if let Some(host) = cli.host { config.server.host = host; @@ -466,7 +484,6 @@ async fn main() -> Result<()> { config: config_arc.clone(), config_path: Some(config_path), scan_progress: pinakes_core::scan::ScanProgress::new(), - sessions: Arc::new(RwLock::new(std::collections::HashMap::new())), job_queue, cache, scheduler, @@ -476,14 +493,22 @@ async fn main() -> Result<()> { // Periodic session cleanup (every 15 minutes) { - let sessions = state.sessions.clone(); + let storage_clone = storage.clone(); let cancel = shutdown_token.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(15 * 60)); loop { tokio::select! 
{ _ = interval.tick() => { - pinakes_server::state::cleanup_expired_sessions(&sessions).await; + match storage_clone.delete_expired_sessions().await { + Ok(count) if count > 0 => { + tracing::info!(count = count, "cleaned up expired sessions"); + } + Ok(_) => {} // No sessions to clean up + Err(e) => { + tracing::error!(error = %e, "failed to cleanup expired sessions"); + } + } } _ = cancel.cancelled() => { break; diff --git a/crates/pinakes-server/src/routes/auth.rs b/crates/pinakes-server/src/routes/auth.rs index b6e124c..b64bab0 100644 --- a/crates/pinakes-server/src/routes/auth.rs +++ b/crates/pinakes-server/src/routes/auth.rs @@ -83,17 +83,21 @@ pub async fn login( let role = user.role; let username = user.username.clone(); - // Store session - { - let mut sessions = state.sessions.write().await; - sessions.insert( - token.clone(), - crate::state::SessionInfo { - username: username.clone(), - role, - created_at: chrono::Utc::now(), - }, - ); + // Create session in database + let now = chrono::Utc::now(); + let session_data = pinakes_core::storage::SessionData { + session_token: token.clone(), + user_id: None, // Could be set if we had user IDs + username: username.clone(), + role: role.to_string(), + created_at: now, + expires_at: now + chrono::Duration::hours(24), // 24 hour sessions + last_accessed: now, + }; + + if let Err(e) = state.storage.create_session(&session_data).await { + tracing::error!(error = %e, "failed to create session in database"); + return Err(StatusCode::INTERNAL_SERVER_ERROR); } tracing::info!(username = %username, role = %role, "login successful"); @@ -116,13 +120,17 @@ pub async fn login( pub async fn logout(State(state): State, headers: HeaderMap) -> StatusCode { if let Some(token) = extract_bearer_token(&headers) { - let sessions = state.sessions.read().await; - let username = sessions.get(token).map(|s| s.username.clone()); - drop(sessions); + // Get username before deleting session + let username = match 
state.storage.get_session(token).await { + Ok(Some(session)) => Some(session.username), + _ => None, + }; - let mut sessions = state.sessions.write().await; - sessions.remove(token); - drop(sessions); + // Delete session from database + if let Err(e) = state.storage.delete_session(token).await { + tracing::error!(error = %e, "failed to delete session from database"); + return StatusCode::INTERNAL_SERVER_ERROR; + } // Record logout in audit log if let Some(user) = username { @@ -153,12 +161,16 @@ pub async fn me( drop(config); let token = extract_bearer_token(&headers).ok_or(StatusCode::UNAUTHORIZED)?; - let sessions = state.sessions.read().await; - let session = sessions.get(token).ok_or(StatusCode::UNAUTHORIZED)?; + let session = state + .storage + .get_session(token) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .ok_or(StatusCode::UNAUTHORIZED)?; Ok(Json(UserInfoResponse { username: session.username.clone(), - role: session.role.to_string(), + role: session.role.clone(), })) } @@ -168,3 +180,89 @@ fn extract_bearer_token(headers: &HeaderMap) -> Option<&str> { .and_then(|v| v.to_str().ok()) .and_then(|s| s.strip_prefix("Bearer ")) } + +/// Revoke all sessions for the current user +pub async fn revoke_all_sessions(State(state): State, headers: HeaderMap) -> StatusCode { + let token = match extract_bearer_token(&headers) { + Some(t) => t, + None => return StatusCode::UNAUTHORIZED, + }; + + // Get current session to find username + let session = match state.storage.get_session(token).await { + Ok(Some(s)) => s, + Ok(None) => return StatusCode::UNAUTHORIZED, + Err(e) => { + tracing::error!(error = %e, "failed to get session"); + return StatusCode::INTERNAL_SERVER_ERROR; + } + }; + + let username = session.username.clone(); + + // Delete all sessions for this user + match state.storage.delete_user_sessions(&username).await { + Ok(count) => { + tracing::info!(username = %username, count = count, "revoked all user sessions"); + + // Record in audit log + 
let _ = pinakes_core::audit::record_action( + &state.storage, + None, + pinakes_core::model::AuditAction::Logout, + Some(format!("revoked all sessions for username: {}", username)), + ) + .await; + + StatusCode::OK + } + Err(e) => { + tracing::error!(error = %e, "failed to revoke sessions"); + StatusCode::INTERNAL_SERVER_ERROR + } + } +} + +/// List all active sessions (admin only) +#[derive(serde::Serialize)] +pub struct SessionListResponse { + pub sessions: Vec, +} + +#[derive(serde::Serialize)] +pub struct SessionInfo { + pub username: String, + pub role: String, + pub created_at: String, + pub last_accessed: String, + pub expires_at: String, +} + +pub async fn list_active_sessions( + State(state): State, +) -> Result, StatusCode> { + // Get all active sessions + let sessions = state + .storage + .list_active_sessions(None) + .await + .map_err(|e| { + tracing::error!(error = %e, "failed to list active sessions"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + let session_infos = sessions + .into_iter() + .map(|s| SessionInfo { + username: s.username, + role: s.role, + created_at: s.created_at.to_rfc3339(), + last_accessed: s.last_accessed.to_rfc3339(), + expires_at: s.expires_at.to_rfc3339(), + }) + .collect(); + + Ok(Json(SessionListResponse { + sessions: session_infos, + })) +} diff --git a/crates/pinakes-server/src/state.rs b/crates/pinakes-server/src/state.rs index 3d67a08..0cdfa9e 100644 --- a/crates/pinakes-server/src/state.rs +++ b/crates/pinakes-server/src/state.rs @@ -1,11 +1,10 @@ -use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::RwLock; use pinakes_core::cache::CacheLayer; -use pinakes_core::config::{Config, UserRole}; +use pinakes_core::config::Config; use pinakes_core::jobs::JobQueue; use pinakes_core::plugin::PluginManager; use pinakes_core::scan::ScanProgress; @@ -13,31 +12,8 @@ use pinakes_core::scheduler::TaskScheduler; use pinakes_core::storage::DynStorageBackend; use 
pinakes_core::transcode::TranscodeService; -/// Default session TTL: 24 hours. -pub const SESSION_TTL_SECS: i64 = 24 * 60 * 60; - -#[derive(Debug, Clone)] -pub struct SessionInfo { - pub username: String, - pub role: UserRole, - pub created_at: chrono::DateTime, -} - -impl SessionInfo { - /// Returns true if this session has exceeded its TTL. - pub fn is_expired(&self) -> bool { - let age = chrono::Utc::now() - self.created_at; - age.num_seconds() > SESSION_TTL_SECS - } -} - -pub type SessionStore = Arc>>; - -/// Remove all expired sessions from the store. -pub async fn cleanup_expired_sessions(sessions: &SessionStore) { - let mut store = sessions.write().await; - store.retain(|_, info| !info.is_expired()); -} +// Note: Sessions are now stored in the database via StorageBackend +// See storage::SessionData and related methods #[derive(Clone)] pub struct AppState { @@ -45,7 +21,6 @@ pub struct AppState { pub config: Arc>, pub config_path: Option, pub scan_progress: ScanProgress, - pub sessions: SessionStore, pub job_queue: Arc, pub cache: Arc, pub scheduler: Arc, diff --git a/crates/pinakes-server/tests/api_test.rs b/crates/pinakes-server/tests/api_test.rs index 9c36100..925ea4a 100644 --- a/crates/pinakes-server/tests/api_test.rs +++ b/crates/pinakes-server/tests/api_test.rs @@ -113,6 +113,7 @@ fn default_config() -> Config { port: 3000, api_key: None, tls: TlsConfig::default(), + authentication_disabled: true, }, ui: UiConfig::default(), accounts: AccountsConfig::default(), @@ -149,7 +150,6 @@ async fn setup_app() -> axum::Router { config, config_path: None, scan_progress: pinakes_core::scan::ScanProgress::new(), - sessions: Arc::new(RwLock::new(std::collections::HashMap::new())), job_queue, cache: Arc::new(CacheLayer::new(60)), scheduler: Arc::new(scheduler), @@ -187,6 +187,7 @@ async fn setup_app_with_auth() -> (axum::Router, String, String, String) { } let mut config = default_config(); + config.server.authentication_disabled = false; // Enable authentication 
for these tests config.accounts.enabled = true; config.accounts.users = vec![ UserAccount { @@ -220,7 +221,6 @@ async fn setup_app_with_auth() -> (axum::Router, String, String, String) { config, config_path: None, scan_progress: pinakes_core::scan::ScanProgress::new(), - sessions: Arc::new(RwLock::new(std::collections::HashMap::new())), job_queue, cache: Arc::new(CacheLayer::new(60)), scheduler: Arc::new(scheduler), diff --git a/crates/pinakes-server/tests/plugin_test.rs b/crates/pinakes-server/tests/plugin_test.rs index a21d8c7..6111271 100644 --- a/crates/pinakes-server/tests/plugin_test.rs +++ b/crates/pinakes-server/tests/plugin_test.rs @@ -78,6 +78,7 @@ async fn setup_app_with_plugins() -> (axum::Router, Arc, tempfile port: 3000, api_key: None, tls: TlsConfig::default(), + authentication_disabled: true, }, ui: UiConfig::default(), accounts: AccountsConfig::default(), @@ -106,7 +107,6 @@ async fn setup_app_with_plugins() -> (axum::Router, Arc, tempfile config, config_path: None, scan_progress: pinakes_core::scan::ScanProgress::new(), - sessions: Arc::new(RwLock::new(std::collections::HashMap::new())), job_queue, cache: Arc::new(CacheLayer::new(60)), scheduler: Arc::new(scheduler), diff --git a/migrations/postgres/V11__session_persistence.sql b/migrations/postgres/V11__session_persistence.sql new file mode 100644 index 0000000..8603d0b --- /dev/null +++ b/migrations/postgres/V11__session_persistence.sql @@ -0,0 +1,18 @@ +-- Session persistence for database-backed sessions +-- Replaces in-memory session storage + +CREATE TABLE IF NOT EXISTS sessions ( + session_token TEXT PRIMARY KEY NOT NULL, + user_id TEXT, + username TEXT NOT NULL, + role TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + expires_at TIMESTAMPTZ NOT NULL, + last_accessed TIMESTAMPTZ NOT NULL +); + +-- Index for efficient cleanup of expired sessions +CREATE INDEX IF NOT EXISTS idx_sessions_expires_at ON sessions(expires_at); + +-- Index for listing sessions by username +CREATE INDEX IF NOT 
EXISTS idx_sessions_username ON sessions(username); diff --git a/migrations/sqlite/V11__session_persistence.sql b/migrations/sqlite/V11__session_persistence.sql new file mode 100644 index 0000000..b4e2753 --- /dev/null +++ b/migrations/sqlite/V11__session_persistence.sql @@ -0,0 +1,18 @@ +-- Session persistence for database-backed sessions +-- Replaces in-memory session storage + +CREATE TABLE IF NOT EXISTS sessions ( + session_token TEXT PRIMARY KEY NOT NULL, + user_id TEXT, + username TEXT NOT NULL, + role TEXT NOT NULL, + created_at TEXT NOT NULL, + expires_at TEXT NOT NULL, + last_accessed TEXT NOT NULL +); + +-- Index for efficient cleanup of expired sessions +CREATE INDEX IF NOT EXISTS idx_sessions_expires_at ON sessions(expires_at); + +-- Index for listing sessions by username +CREATE INDEX IF NOT EXISTS idx_sessions_username ON sessions(username);