From 672e11b592f256efc3a007262c0b796ad2a262ee Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Mar 2026 00:42:06 +0300 Subject: [PATCH] pinakes-core: add configurable rate limits and cors; add webhook dispatcher; bound job history Signed-off-by: NotAShelf Change-Id: Ib0d34cd7878eb9e8d019497234a092466a6a6964 --- crates/pinakes-core/src/config.rs | 463 ++++++++++++++++++++++------ crates/pinakes-core/src/jobs.rs | 88 +++++- crates/pinakes-core/src/lib.rs | 1 + crates/pinakes-core/src/webhooks.rs | 142 +++++++++ 4 files changed, 585 insertions(+), 109 deletions(-) create mode 100644 crates/pinakes-core/src/webhooks.rs diff --git a/crates/pinakes-core/src/config.rs b/crates/pinakes-core/src/config.rs index 543ecff..8a47eca 100644 --- a/crates/pinakes-core/src/config.rs +++ b/crates/pinakes-core/src/config.rs @@ -91,6 +91,8 @@ pub struct Config { #[serde(default)] pub accounts: AccountsConfig, #[serde(default)] + pub rate_limits: RateLimitConfig, + #[serde(default)] pub jobs: JobsConfig, #[serde(default)] pub thumbnails: ThumbnailConfig, @@ -129,25 +131,147 @@ pub struct ScheduledTaskConfig { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct JobsConfig { - #[serde(default = "default_worker_count")] - pub worker_count: usize, - #[serde(default = "default_cache_ttl")] - pub cache_ttl_secs: u64, +pub struct RateLimitConfig { + /// Global rate limit: requests per second (token replenish interval). + /// Default: 1 (combined with `burst_size=100` gives ~100 req/sec) + #[serde(default = "default_global_per_second")] + pub global_per_second: u64, + /// Global rate limit: burst size (max concurrent requests per IP) + #[serde(default = "default_global_burst")] + pub global_burst_size: u32, + /// Login rate limit: seconds between token replenishment. + /// Default: 12 (one token every 12s, combined with burst=5 gives ~5 req/min) + #[serde(default = "default_login_per_second")] + pub login_per_second: u64, + /// Login rate limit: burst size + #[serde(default = "default_login_burst")] + pub login_burst_size: u32, + /// Search rate limit: seconds between token replenishment. + /// Default: 6 (one token every 6s, combined with burst=10 gives ~10 req/min) + #[serde(default = "default_search_per_second")] + pub search_per_second: u64, + /// Search rate limit: burst size + #[serde(default = "default_search_burst")] + pub search_burst_size: u32, + /// Streaming rate limit: seconds between token replenishment. + /// Default: 60 (one per minute) + #[serde(default = "default_stream_per_second")] + pub stream_per_second: u64, + /// Streaming rate limit: burst size (max concurrent streams) + #[serde(default = "default_stream_burst")] + pub stream_burst_size: u32, + /// Share token rate limit: seconds between token replenishment. + /// Default: 2 + #[serde(default = "default_share_per_second")] + pub share_per_second: u64, + /// Share token rate limit: burst size + #[serde(default = "default_share_burst")] + pub share_burst_size: u32, } -fn default_worker_count() -> usize { +const fn default_global_per_second() -> u64 { + 1 +} +const fn default_global_burst() -> u32 { + 100 +} +const fn default_login_per_second() -> u64 { + 12 +} +const fn default_login_burst() -> u32 { + 5 +} +const fn default_search_per_second() -> u64 { + 6 +} +const fn default_search_burst() -> u32 { + 10 +} +const fn default_stream_per_second() -> u64 { + 60 +} +const fn default_stream_burst() -> u32 { + 5 +} +const fn default_share_per_second() -> u64 { 2 } -fn default_cache_ttl() -> u64 { +const fn default_share_burst() -> u32 { + 20 +} + +impl Default for RateLimitConfig { + fn default() -> Self { + Self { + global_per_second: default_global_per_second(), + global_burst_size: default_global_burst(), + login_per_second: default_login_per_second(), + login_burst_size: default_login_burst(), + search_per_second: default_search_per_second(), + search_burst_size: default_search_burst(), + stream_per_second: default_stream_per_second(), + stream_burst_size: default_stream_burst(), + share_per_second: default_share_per_second(), + share_burst_size: default_share_burst(), + } + } +} + +impl RateLimitConfig { + /// Validate that all rate limit values are positive. + /// + /// # Errors + /// + /// Returns an error string if any rate limit value is zero. + pub fn validate(&self) -> Result<(), String> { + for (name, value) in [ + ("global_per_second", self.global_per_second), + ("global_burst_size", u64::from(self.global_burst_size)), + ("login_per_second", self.login_per_second), + ("login_burst_size", u64::from(self.login_burst_size)), + ("search_per_second", self.search_per_second), + ("search_burst_size", u64::from(self.search_burst_size)), + ("stream_per_second", self.stream_per_second), + ("stream_burst_size", u64::from(self.stream_burst_size)), + ("share_per_second", self.share_per_second), + ("share_burst_size", u64::from(self.share_burst_size)), + ] { + if value == 0 { + return Err(format!("{name} must be > 0")); + } + } + Ok(()) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobsConfig { + #[serde(default = "default_worker_count")] + pub worker_count: usize, + #[serde(default = "default_cache_ttl")] + pub cache_ttl_secs: u64, + /// Maximum time a job is allowed to run before being cancelled (in seconds). + /// Set to 0 to disable timeout. Default: 3600 (1 hour). + #[serde(default = "default_job_timeout")] + pub job_timeout_secs: u64, +} + +const fn default_worker_count() -> usize { + 2 +} +const fn default_cache_ttl() -> u64 { 60 } +const fn default_job_timeout() -> u64 { + 3600 +} impl Default for JobsConfig { fn default() -> Self { Self { - worker_count: default_worker_count(), - cache_ttl_secs: default_cache_ttl(), + worker_count: default_worker_count(), + cache_ttl_secs: default_cache_ttl(), + job_timeout_secs: default_job_timeout(), } } } @@ -164,13 +288,13 @@ pub struct ThumbnailConfig { pub video_seek_secs: u32, } -fn default_thumb_size() -> u32 { +const fn default_thumb_size() -> u32 { 320 } -fn default_thumb_quality() -> u8 { +const fn default_thumb_quality() -> u8 { 80 } -fn default_video_seek() -> u32 { +const fn default_video_seek() -> u32 { 2 } @@ -217,13 +341,13 @@ fn default_theme() -> String { fn default_view() -> String { "library".to_string() } -fn default_page_size() -> usize { - 48 +const fn default_page_size() -> usize { + 50 } fn default_view_mode() -> String { "grid".to_string() } -fn default_true() -> bool { +const fn default_true() -> bool { true } @@ -241,12 +365,29 @@ impl Default for UiConfig { } } -#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct AccountsConfig { #[serde(default)] - pub enabled: bool, + pub enabled: bool, #[serde(default)] - pub users: Vec, + pub users: Vec, + /// Session expiry in hours. Defaults to 24. + #[serde(default = "default_session_expiry_hours")] + pub session_expiry_hours: u64, +} + +const fn default_session_expiry_hours() -> u64 { + 24 +} + +impl Default for AccountsConfig { + fn default() -> Self { + Self { + enabled: false, + users: Vec::new(), + session_expiry_hours: default_session_expiry_hours(), + } + } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -269,15 +410,18 @@ pub enum UserRole { } impl UserRole { - pub fn can_read(self) -> bool { + #[must_use] + pub const fn can_read(self) -> bool { true } - pub fn can_write(self) -> bool { + #[must_use] + pub const fn can_write(self) -> bool { matches!(self, Self::Admin | Self::Editor) } - pub fn can_admin(self) -> bool { + #[must_use] + pub const fn can_admin(self) -> bool { matches!(self, Self::Admin) } } @@ -320,11 +464,11 @@ fn default_plugin_cache_dir() -> PathBuf { Config::default_data_dir().join("plugins").join("cache") } -fn default_max_concurrent_ops() -> usize { +const fn default_max_concurrent_ops() -> usize { 4 } -fn default_plugin_timeout() -> u64 { +const fn default_plugin_timeout() -> u64 { 30 } @@ -359,11 +503,11 @@ pub struct TranscodingConfig { pub profiles: Vec, } -fn default_cache_ttl_hours() -> u64 { +const fn default_cache_ttl_hours() -> u64 { 48 } -fn default_max_concurrent_transcodes() -> usize { +const fn default_max_concurrent_transcodes() -> usize { 2 } @@ -444,7 +588,7 @@ pub struct CloudConfig { pub accounts: Vec, } -fn default_auto_sync_interval() -> u64 { +const fn default_auto_sync_interval() -> u64 { 60 } @@ -493,7 +637,7 @@ pub struct AnalyticsConfig { pub retention_days: u64, } -fn default_retention_days() -> u64 { +const fn default_retention_days() -> u64 { 90 } @@ -507,8 +651,9 @@ impl Default for AnalyticsConfig { } } +/// Feature toggles for photo processing (image analysis features). #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct PhotoConfig { +pub struct PhotoFeatures { /// Generate perceptual hashes for image duplicate detection (CPU-intensive) #[serde(default = "default_true")] pub generate_perceptual_hash: bool, @@ -520,6 +665,23 @@ pub struct PhotoConfig { /// Generate multi-resolution thumbnails (tiny, grid, preview) #[serde(default)] pub multi_resolution_thumbnails: bool, +} + +impl Default for PhotoFeatures { + fn default() -> Self { + Self { + generate_perceptual_hash: true, + auto_tag_from_exif: false, + multi_resolution_thumbnails: false, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PhotoConfig { + /// Feature toggles for photo processing + #[serde(flatten)] + pub features: PhotoFeatures, /// Auto-detect photo events/albums based on time and location #[serde(default)] @@ -538,28 +700,46 @@ pub struct PhotoConfig { pub event_max_distance_km: f64, } -fn default_min_event_photos() -> usize { +impl PhotoConfig { + /// Returns true if perceptual hashing is enabled. + #[must_use] + pub const fn generate_perceptual_hash(&self) -> bool { + self.features.generate_perceptual_hash + } + + /// Returns true if auto-tagging from EXIF is enabled. + #[must_use] + pub const fn auto_tag_from_exif(&self) -> bool { + self.features.auto_tag_from_exif + } + + /// Returns true if multi-resolution thumbnails are enabled. + #[must_use] + pub const fn multi_resolution_thumbnails(&self) -> bool { + self.features.multi_resolution_thumbnails + } +} + +const fn default_min_event_photos() -> usize { 5 } -fn default_event_time_gap() -> i64 { +const fn default_event_time_gap() -> i64 { 2 * 60 * 60 // 2 hours } -fn default_event_distance() -> f64 { +const fn default_event_distance() -> f64 { 1.0 // 1 km } impl Default for PhotoConfig { fn default() -> Self { Self { - generate_perceptual_hash: true, - auto_tag_from_exif: false, - multi_resolution_thumbnails: false, - enable_event_detection: false, - min_event_photos: default_min_event_photos(), - event_time_gap_secs: default_event_time_gap(), - event_max_distance_km: default_event_distance(), + features: PhotoFeatures::default(), + enable_event_detection: false, + min_event_photos: default_min_event_photos(), + event_time_gap_secs: default_event_time_gap(), + event_max_distance_km: default_event_distance(), } } } @@ -590,7 +770,7 @@ fn default_managed_storage_dir() -> PathBuf { Config::default_data_dir().join("managed") } -fn default_max_upload_size() -> u64 { +const fn default_max_upload_size() -> u64 { 10 * 1024 * 1024 * 1024 // 10GB } @@ -647,23 +827,23 @@ pub struct SyncConfig { pub temp_upload_dir: PathBuf, } -fn default_max_sync_file_size() -> u64 { +const fn default_max_sync_file_size() -> u64 { 4096 // 4GB } -fn default_chunk_size() -> u64 { +const fn default_chunk_size() -> u64 { 4096 // 4MB } -fn default_upload_timeout() -> u64 { +const fn default_upload_timeout() -> u64 { 24 // 24 hours } -fn default_max_concurrent_uploads() -> usize { +const fn default_max_concurrent_uploads() -> usize { 3 } -fn default_sync_log_retention() -> u64 { +const fn default_sync_log_retention() -> u64 { 90 // 90 days } @@ -686,26 +866,44 @@ impl Default for SyncConfig { } } +/// Core permission flags for the sharing subsystem. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SharingConfig { +pub struct SharingPermissions { /// Enable sharing functionality #[serde(default = "default_true")] - pub enabled: bool, + pub enabled: bool, /// Allow creating public share links #[serde(default = "default_true")] - pub allow_public_links: bool, + pub allow_public_links: bool, + /// Allow users to reshare content shared with them + #[serde(default = "default_true")] + pub allow_reshare: bool, +} + +impl Default for SharingPermissions { + fn default() -> Self { + Self { + enabled: true, + allow_public_links: true, + allow_reshare: true, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SharingConfig { + /// Core permission flags for sharing + #[serde(flatten)] + pub permissions: SharingPermissions, /// Require password for public share links #[serde(default)] pub require_public_link_password: bool, - /// Maximum expiry time for public links in hours (0 = unlimited) - #[serde(default)] - pub max_public_link_expiry_hours: u64, - /// Allow users to reshare content shared with them - #[serde(default = "default_true")] - pub allow_reshare: bool, /// Enable share notifications #[serde(default = "default_true")] pub notifications_enabled: bool, + /// Maximum expiry time for public links in hours (0 = unlimited) + #[serde(default)] + pub max_public_link_expiry_hours: u64, /// Notification retention in days #[serde(default = "default_notification_retention")] pub notification_retention_days: u64, @@ -714,23 +912,41 @@ pub struct SharingConfig { pub activity_retention_days: u64, } -fn default_notification_retention() -> u64 { +impl SharingConfig { + /// Returns true if sharing is enabled. + #[must_use] + pub const fn enabled(&self) -> bool { + self.permissions.enabled + } + + /// Returns true if public links are allowed. + #[must_use] + pub const fn allow_public_links(&self) -> bool { + self.permissions.allow_public_links + } + + /// Returns true if resharing is allowed. + #[must_use] + pub const fn allow_reshare(&self) -> bool { + self.permissions.allow_reshare + } +} + +const fn default_notification_retention() -> u64 { 30 } -fn default_activity_retention() -> u64 { +const fn default_activity_retention() -> u64 { 90 } impl Default for SharingConfig { fn default() -> Self { Self { - enabled: true, - allow_public_links: true, + permissions: SharingPermissions::default(), require_public_link_password: false, - max_public_link_expiry_hours: 0, - allow_reshare: true, notifications_enabled: true, + max_public_link_expiry_hours: 0, notification_retention_days: default_notification_retention(), activity_retention_days: default_activity_retention(), } @@ -747,7 +963,7 @@ pub struct TrashConfig { pub auto_empty: bool, } -fn default_trash_retention_days() -> u64 { +const fn default_trash_retention_days() -> u64 { 30 } @@ -776,7 +992,8 @@ pub enum StorageBackendType { } impl StorageBackendType { - pub fn as_str(&self) -> &'static str { + #[must_use] + pub const fn as_str(&self) -> &'static str { match self { Self::Sqlite => "sqlite", Self::Postgres => "postgres", @@ -803,7 +1020,7 @@ pub struct PostgresConfig { pub username: String, pub password: String, pub max_connections: usize, - /// Enable TLS for PostgreSQL connections + /// Enable TLS for `PostgreSQL` connections #[serde(default)] pub tls_enabled: bool, /// Verify TLS certificates (default: true) @@ -828,7 +1045,7 @@ pub struct ScanningConfig { pub import_concurrency: usize, } -fn default_import_concurrency() -> usize { +const fn default_import_concurrency() -> usize { 8 } @@ -842,10 +1059,18 @@ pub struct ServerConfig { pub api_key: Option, /// Explicitly disable authentication (INSECURE - use only for development). /// When true, all requests are allowed without authentication. - /// This must be explicitly set to true; empty api_key alone is not + /// This must be explicitly set to true; empty `api_key` alone is not /// sufficient. #[serde(default)] pub authentication_disabled: bool, + /// Enable CORS (Cross-Origin Resource Sharing). + /// When false, default localhost origins are used. + #[serde(default)] + pub cors_enabled: bool, + /// Allowed CORS origins when `cors_enabled` is true. + /// If empty and `cors_enabled` is true, defaults to localhost origins. + #[serde(default)] + pub cors_origins: Vec, /// TLS/HTTPS configuration #[serde(default)] pub tls: TlsConfig, @@ -863,7 +1088,7 @@ pub struct TlsConfig { /// Path to the TLS private key file (PEM format) #[serde(default)] pub key_path: Option, - /// Enable HTTP to HTTPS redirect (starts a second listener on http_port) + /// Enable HTTP to HTTPS redirect (starts a second listener on `http_port`) #[serde(default)] pub redirect_http: bool, /// Port for HTTP redirect listener (default: 80) @@ -877,12 +1102,12 @@ pub struct TlsConfig { pub hsts_max_age: u64, } -fn default_http_port() -> u16 { +const fn default_http_port() -> u16 { 80 } -fn default_hsts_max_age() -> u64 { - 31536000 // 1 year in seconds +const fn default_hsts_max_age() -> u64 { + 31_536_000 // 1 year in seconds } impl Default for TlsConfig { @@ -901,6 +1126,11 @@ impl Default for TlsConfig { impl TlsConfig { /// Validate TLS configuration + /// + /// # Errors + /// + /// Returns an error string if TLS is enabled but required paths are missing + /// or invalid. pub fn validate(&self) -> Result<(), String> { if self.enabled { if self.cert_path.is_none() { @@ -928,6 +1158,13 @@ impl TlsConfig { } impl Config { + /// Load configuration from a TOML file, expanding environment variables in + /// secret fields. + /// + /// # Errors + /// + /// Returns [`crate::error::PinakesError`] if the file cannot be read, parsed, + /// or contains invalid environment variable references. pub fn from_file(path: &Path) -> crate::error::Result { let content = std::fs::read_to_string(path).map_err(|e| { crate::error::PinakesError::Config(format!( @@ -942,7 +1179,7 @@ impl Config { } /// Expand environment variables in secret fields. - /// Supports ${VAR_NAME} and $VAR_NAME syntax. + /// Supports ${`VAR_NAME`} and $`VAR_NAME` syntax. fn expand_env_vars(&mut self) -> crate::error::Result<()> { // Postgres password if let Some(ref mut postgres) = self.storage.postgres { @@ -979,6 +1216,11 @@ impl Config { } /// Try loading from file, falling back to defaults if the file doesn't exist. + /// + /// # Errors + /// + /// Returns [`crate::error::PinakesError`] if the file exists but cannot be + /// read or parsed. pub fn load_or_default(path: &Path) -> crate::error::Result { if path.exists() { Self::from_file(path) @@ -991,6 +1233,11 @@ impl Config { } /// Save the current config to a TOML file. + /// + /// # Errors + /// + /// Returns [`crate::error::PinakesError`] if the file cannot be written or + /// the config cannot be serialized. pub fn save_to_file(&self, path: &Path) -> crate::error::Result<()> { if let Some(parent) = path.parent() { std::fs::create_dir_all(parent)?; @@ -1005,6 +1252,11 @@ impl Config { } /// Ensure all directories needed by this config exist and are writable. + /// + /// # Errors + /// + /// Returns [`crate::error::PinakesError`] if a required directory cannot be + /// created or is read-only. pub fn ensure_dirs(&self) -> crate::error::Result<()> { if let Some(ref sqlite) = self.storage.sqlite && let Some(parent) = sqlite.path.parent() @@ -1026,20 +1278,29 @@ impl Config { } /// Returns the default config file path following XDG conventions. + #[must_use] pub fn default_config_path() -> PathBuf { - if let Ok(xdg) = std::env::var("XDG_CONFIG_HOME") { - PathBuf::from(xdg).join("pinakes").join("pinakes.toml") - } else if let Ok(home) = std::env::var("HOME") { - PathBuf::from(home) - .join(".config") - .join("pinakes") - .join("pinakes.toml") - } else { - PathBuf::from("pinakes.toml") - } + std::env::var("XDG_CONFIG_HOME").map_or_else( + |_| { + std::env::var("HOME").map_or_else( + |_| PathBuf::from("pinakes.toml"), + |home| { + PathBuf::from(home) + .join(".config") + .join("pinakes") + .join("pinakes.toml") + }, + ) + }, + |xdg| PathBuf::from(xdg).join("pinakes").join("pinakes.toml"), + ) } /// Validate configuration values for correctness. + /// + /// # Errors + /// + /// Returns an error string if any configuration value is invalid. pub fn validate(&self) -> Result<(), String> { if self.server.port == 0 { return Err("server port cannot be 0".into()); @@ -1098,23 +1359,31 @@ impl Config { ); } + // Validate rate limits + self.rate_limits.validate()?; + // Validate TLS configuration self.server.tls.validate()?; Ok(()) } /// Returns the default data directory following XDG conventions. + #[must_use] pub fn default_data_dir() -> PathBuf { - if let Ok(xdg) = std::env::var("XDG_DATA_HOME") { - PathBuf::from(xdg).join("pinakes") - } else if let Ok(home) = std::env::var("HOME") { - PathBuf::from(home) - .join(".local") - .join("share") - .join("pinakes") - } else { - PathBuf::from("pinakes-data") - } + std::env::var("XDG_DATA_HOME").map_or_else( + |_| { + std::env::var("HOME").map_or_else( + |_| PathBuf::from("pinakes-data"), + |home| { + PathBuf::from(home) + .join(".local") + .join("share") + .join("pinakes") + }, + ) + }, + |xdg| PathBuf::from(xdg).join("pinakes"), + ) } } @@ -1146,10 +1415,13 @@ impl Default for Config { port: 3000, api_key: None, authentication_disabled: false, + cors_enabled: false, + cors_origins: vec![], tls: TlsConfig::default(), }, ui: UiConfig::default(), accounts: AccountsConfig::default(), + rate_limits: RateLimitConfig::default(), jobs: JobsConfig::default(), thumbnails: ThumbnailConfig::default(), webhooks: vec![], @@ -1228,11 +1500,14 @@ mod tests { vars: &'a std::collections::HashMap<&str, &str>, ) -> impl Fn(&str) -> crate::error::Result + 'a { move |name| { - vars.get(name).map(|v| v.to_string()).ok_or_else(|| { - crate::error::PinakesError::Config(format!( - "environment variable not set: {name}" - )) - }) + vars + .get(name) + .map(std::string::ToString::to_string) + .ok_or_else(|| { + crate::error::PinakesError::Config(format!( + "environment variable not set: {name}" + )) + }) } } diff --git a/crates/pinakes-core/src/jobs.rs b/crates/pinakes-core/src/jobs.rs index 98fb633..f9487f8 100644 --- a/crates/pinakes-core/src/jobs.rs +++ b/crates/pinakes-core/src/jobs.rs @@ -81,7 +81,14 @@ impl JobQueue { /// /// The `executor` callback is invoked for each job; it receives the job kind, /// a progress-reporting callback, and a cancellation token. - pub fn new(worker_count: usize, executor: F) -> Arc + /// + /// `job_timeout_secs` sets the maximum time a job can run before being + /// cancelled. Set to 0 to disable the timeout. + pub fn new( + worker_count: usize, + job_timeout_secs: u64, + executor: F, + ) -> Arc where F: Fn( Uuid, @@ -103,10 +110,10 @@ impl JobQueue { let executor = Arc::new(executor); for _ in 0..worker_count { - let rx = rx.clone(); - let jobs = jobs.clone(); - let cancellations = cancellations.clone(); - let executor = executor.clone(); + let rx = Arc::clone(&rx); + let jobs = Arc::clone(&jobs); + let cancellations = Arc::clone(&cancellations); + let executor = Arc::clone(&executor); tokio::spawn(async move { loop { @@ -128,9 +135,26 @@ impl JobQueue { } } + let cancel_token = item.cancel.clone(); let handle = - executor(item.job_id, item.kind, item.cancel, jobs.clone()); - let _ = handle.await; + executor(item.job_id, item.kind, item.cancel, Arc::clone(&jobs)); + + if job_timeout_secs > 0 { + let timeout = std::time::Duration::from_secs(job_timeout_secs); + if tokio::time::timeout(timeout, handle).await.is_err() { + // Timeout: cancel the job and mark as failed + cancel_token.cancel(); + let mut map = jobs.write().await; + if let Some(job) = map.get_mut(&item.job_id) { + job.status = JobStatus::Failed { + error: format!("job timed out after {job_timeout_secs}s"), + }; + job.updated_at = Utc::now(); + } + } + } else { + let _ = handle.await; + } // Clean up cancellation token cancellations.write().await.remove(&item.job_id); @@ -159,7 +183,33 @@ impl JobQueue { updated_at: now, }; - self.jobs.write().await.insert(id, job); + { + let mut map = self.jobs.write().await; + map.insert(id, job); + // Prune old terminal jobs to prevent unbounded memory growth. + // Keep at most 500 completed/failed/cancelled entries, removing the + // oldest. + const MAX_TERMINAL_JOBS: usize = 500; + let mut terminal: Vec<(Uuid, chrono::DateTime)> = map + .iter() + .filter(|(_, j)| { + matches!( + j.status, + JobStatus::Completed { .. } + | JobStatus::Failed { .. } + | JobStatus::Cancelled + ) + }) + .map(|(k, j)| (*k, j.updated_at)) + .collect(); + if terminal.len() > MAX_TERMINAL_JOBS { + terminal.sort_by_key(|(_, t)| *t); + let to_remove = terminal.len() - MAX_TERMINAL_JOBS; + for (stale_id, _) in terminal.into_iter().take(to_remove) { + map.remove(&stale_id); + } + } + } self.cancellations.write().await.insert(id, cancel.clone()); let item = WorkerItem { @@ -180,20 +230,28 @@ impl JobQueue { /// List all jobs, most recent first. pub async fn list(&self) -> Vec { - let map = self.jobs.read().await; - let mut jobs: Vec = map.values().cloned().collect(); + let mut jobs: Vec = { + let map = self.jobs.read().await; + map.values().cloned().collect() + }; jobs.sort_by_key(|job| std::cmp::Reverse(job.created_at)); jobs } /// Cancel a running or pending job. pub async fn cancel(&self, id: Uuid) -> bool { - if let Some(token) = self.cancellations.read().await.get(&id) { + let token = { + let guard = self.cancellations.read().await; + guard.get(&id).cloned() + }; + if let Some(token) = token { token.cancel(); - let mut map = self.jobs.write().await; - if let Some(job) = map.get_mut(&id) { - job.status = JobStatus::Cancelled; - job.updated_at = Utc::now(); + { + let mut map = self.jobs.write().await; + if let Some(job) = map.get_mut(&id) { + job.status = JobStatus::Cancelled; + job.updated_at = Utc::now(); + } } true } else { diff --git a/crates/pinakes-core/src/lib.rs b/crates/pinakes-core/src/lib.rs index 871aa1d..17fdb19 100644 --- a/crates/pinakes-core/src/lib.rs +++ b/crates/pinakes-core/src/lib.rs @@ -34,3 +34,4 @@ pub mod thumbnail; pub mod transcode; pub mod upload; pub mod users; +pub mod webhooks; diff --git a/crates/pinakes-core/src/webhooks.rs b/crates/pinakes-core/src/webhooks.rs new file mode 100644 index 0000000..424d86a --- /dev/null +++ b/crates/pinakes-core/src/webhooks.rs @@ -0,0 +1,142 @@ +use std::sync::Arc; + +use chrono::Utc; +use serde::Serialize; +use tracing::{error, info, warn}; + +use crate::config::WebhookConfig; + +/// Events that can trigger webhook delivery. +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "event", content = "data")] +pub enum WebhookEvent { + #[serde(rename = "media.created")] + MediaCreated { media_id: String }, + #[serde(rename = "media.updated")] + MediaUpdated { media_id: String }, + #[serde(rename = "media.deleted")] + MediaDeleted { media_id: String }, + #[serde(rename = "scan.completed")] + ScanCompleted { + files_found: usize, + files_processed: usize, + }, + #[serde(rename = "import.completed")] + ImportCompleted { media_id: String }, + #[serde(rename = "test")] + Test, +} + +impl WebhookEvent { + /// Returns the event type string for matching against webhook config filters. + #[must_use] + pub const fn event_type(&self) -> &str { + match self { + Self::MediaCreated { .. } => "media.created", + Self::MediaUpdated { .. } => "media.updated", + Self::MediaDeleted { .. } => "media.deleted", + Self::ScanCompleted { .. } => "scan.completed", + Self::ImportCompleted { .. } => "import.completed", + Self::Test => "test", + } + } +} + +/// Payload sent to webhook endpoints. +#[derive(Debug, Serialize)] +struct WebhookPayload<'a> { + event: &'a WebhookEvent, + timestamp: String, +} + +/// Dispatches webhook events to configured endpoints. +pub struct WebhookDispatcher { + webhooks: Vec, + client: reqwest::Client, +} + +impl WebhookDispatcher { + /// Create a new dispatcher with the given webhook configurations. + #[must_use] + pub fn new(webhooks: Vec) -> Arc { + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .build() + .unwrap_or_default(); + Arc::new(Self { webhooks, client }) + } + + /// Dispatch an event to all matching webhooks. + /// This is fire-and-forget, errors are logged but not propagated. + pub fn dispatch(self: &Arc, event: WebhookEvent) { + let this = self.clone(); + tokio::spawn(async move { + this.dispatch_inner(&event).await; + }); + } + + async fn dispatch_inner(&self, event: &WebhookEvent) { + let event_type = event.event_type(); + let payload = WebhookPayload { + event, + timestamp: Utc::now().to_rfc3339(), + }; + + let body = match serde_json::to_vec(&payload) { + Ok(b) => b, + Err(e) => { + error!(error = %e, "failed to serialize webhook payload"); + return; + }, + }; + + for webhook in &self.webhooks { + // Check if this webhook is interested in this event type + if !webhook.events.is_empty() + && !webhook.events.iter().any(|e| e == event_type || e == "*") + { + continue; + } + + let mut req = self + .client + .post(&webhook.url) + .header("Content-Type", "application/json") + .header("X-Pinakes-Event", event_type); + + // Add keyed BLAKE3 signature if secret is configured + if let Some(ref secret) = webhook.secret { + // Derive a 32-byte key from the secret using BLAKE3 + let key = + blake3::derive_key("pinakes webhook signature", secret.as_bytes()); + let mut hasher = blake3::Hasher::new_keyed(&key); + hasher.update(&body); + let signature = hasher.finalize().to_hex(); + req = req.header("X-Pinakes-Signature", format!("blake3={signature}")); + } + + match req.body(body.clone()).send().await { + Ok(resp) => { + if resp.status().is_success() { + info!(url = %webhook.url, event = event_type, "webhook delivered"); + } else { + warn!( + url = %webhook.url, + event = event_type, + status = %resp.status(), + "webhook delivery returned non-success status" + ); + } + }, + Err(e) => { + warn!( + url = %webhook.url, + event = event_type, + error = %e, + "webhook delivery failed" + ); + }, + } + } + } +}