From e9c5390c453d60dcf89c967c9a41c35f4fab1693 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Mar 2026 15:06:11 +0300 Subject: [PATCH] pinakes-server: integrate plugin system into routes & application state Signed-off-by: NotAShelf Change-Id: Ib5d482326cae1dcb43603bffb76a6a186a6a6964 --- crates/pinakes-server/src/main.rs | 133 ++++++++++++------ .../pinakes-server/src/routes/collections.rs | 15 ++ crates/pinakes-server/src/routes/media.rs | 48 ++++++- crates/pinakes-server/src/routes/plugins.rs | 22 +++ crates/pinakes-server/src/routes/tags.rs | 18 +++ crates/pinakes-server/src/state.rs | 16 ++- 6 files changed, 206 insertions(+), 46 deletions(-) diff --git a/crates/pinakes-server/src/main.rs b/crates/pinakes-server/src/main.rs index a271630..28dbeb9 100644 --- a/crates/pinakes-server/src/main.rs +++ b/crates/pinakes-server/src/main.rs @@ -234,11 +234,57 @@ async fn main() -> Result<()> { )) }; + // Initialize plugin manager if plugins are enabled (needed before job queue) + let plugin_manager = if config.plugins.enabled { + match pinakes_core::plugin::PluginManager::new( + config.plugins.data_dir.clone(), + config.plugins.cache_dir.clone(), + config.plugins.clone().into(), + ) { + Ok(pm) => { + tracing::info!("Plugin manager initialized"); + Some(Arc::new(pm)) + }, + Err(e) => { + tracing::warn!("Failed to initialize plugin manager: {}", e); + None + }, + } + } else { + tracing::info!("Plugins disabled in configuration"); + None + }; + + // Initialize plugin pipeline if plugin manager is available + let plugin_pipeline = if let Some(ref pm) = plugin_manager { + match pm.discover_and_load_all().await { + Ok(loaded) => { + tracing::info!(count = loaded.len(), "loaded plugins"); + let pipeline = Arc::new(pinakes_core::plugin::PluginPipeline::new( + Arc::clone(pm), + config.plugins.timeouts.clone(), + config.plugins.max_consecutive_failures, + )); + if let Err(e) = pipeline.discover_capabilities().await { + tracing::warn!(error = %e, "failed to discover plugin capabilities"); + } + Some(pipeline) + }, + Err(e) => { + tracing::warn!(error = %e, "plugin discovery failed"); + None + }, + } + } else { + None + }; + // Initialize job queue with executor let job_storage = storage.clone(); let job_config = config.clone(); let job_transcode = transcode_service.clone(); let job_webhooks = webhook_dispatcher.clone(); + let job_pipeline = plugin_pipeline.clone(); let job_queue = pinakes_core::jobs::JobQueue::new( config.jobs.worker_count, config.jobs.job_timeout_secs, @@ -247,31 +293,50 @@ async fn main() -> Result<()> { let config = job_config.clone(); let transcode_svc = job_transcode.clone(); let webhooks = job_webhooks.clone(); + let pipeline = job_pipeline.clone(); tokio::spawn(async move { use pinakes_core::jobs::{JobKind, JobQueue}; match kind { JobKind::Scan { path } => { + if let Some(ref pl) = pipeline { + pl.emit_event( + "ScanStarted", + &serde_json::json!({"path": path.as_ref().map(|p| p.display().to_string())}), + ); + } let ignore = config.scanning.ignore_patterns.clone(); let res = if let Some(p) = path { - pinakes_core::scan::scan_directory(&storage, &p, &ignore).await + pinakes_core::scan::scan_directory( + &storage, + &p, + &ignore, + pipeline.as_ref(), + ) + .await } else { - pinakes_core::scan::scan_all_roots(&storage, &ignore) - .await - .map(|statuses| { - let total_found: usize = - statuses.iter().map(|s| s.files_found).sum(); - let total_processed: usize = - statuses.iter().map(|s| s.files_processed).sum(); - let all_errors: Vec = - statuses.into_iter().flat_map(|s| s.errors).collect(); - pinakes_core::scan::ScanStatus { - scanning: false, - files_found: total_found, - files_processed: total_processed, - files_skipped: 0, - errors: all_errors, - } - }) + pinakes_core::scan::scan_all_roots( + &storage, + &ignore, + pipeline.as_ref(), + ) + .await + .map(|statuses| { + let total_found: usize = + statuses.iter().map(|s| s.files_found).sum(); + let total_processed: usize = + statuses.iter().map(|s| s.files_processed).sum(); + let total_skipped: usize = + statuses.iter().map(|s| s.files_skipped).sum(); + let all_errors: Vec = + statuses.into_iter().flat_map(|s| s.errors).collect(); + pinakes_core::scan::ScanStatus { + scanning: false, + files_found: total_found, + files_processed: total_processed, + files_skipped: total_skipped, + errors: all_errors, + } + }) }; match res { Ok(status) => { @@ -283,6 +348,15 @@ async fn main() -> Result<()> { }, ); } + if let Some(ref pl) = pipeline { + pl.emit_event( + "ScanCompleted", + &serde_json::json!({ + "files_found": status.files_found, + "files_processed": status.files_processed, + }), + ); + } JobQueue::complete( &jobs, job_id, @@ -630,28 +704,6 @@ async fn main() -> Result<()> { config.jobs.cache_ttl_secs, )); - // Initialize plugin manager if plugins are enabled (before moving config into - // Arc) - let plugin_manager = if config.plugins.enabled { - match pinakes_core::plugin::PluginManager::new( - config.plugins.data_dir.clone(), - config.plugins.cache_dir.clone(), - config.plugins.clone().into(), - ) { - Ok(pm) => { - tracing::info!("Plugin manager initialized"); - Some(Arc::new(pm)) - }, - Err(e) => { - tracing::warn!("Failed to initialize plugin manager: {}", e); - None - }, - } - } else { - tracing::info!("Plugins disabled in configuration"); - None - }; - // Initialize scheduler with cancellation support let shutdown_token = tokio_util::sync::CancellationToken::new(); let config_arc = Arc::new(RwLock::new(config)); @@ -737,6 +789,7 @@ async fn main() -> Result<()> { cache, scheduler, plugin_manager, + plugin_pipeline, transcode_service, managed_storage, chunked_upload_manager, diff --git a/crates/pinakes-server/src/routes/collections.rs b/crates/pinakes-server/src/routes/collections.rs index 9ca72c3..159d125 100644 --- a/crates/pinakes-server/src/routes/collections.rs +++ b/crates/pinakes-server/src/routes/collections.rs @@ -48,6 +48,15 @@ pub async fn create_collection( req.filter_query.as_deref(), ) .await?; + + state.emit_plugin_event( + "CollectionCreated", + &serde_json::json!({ + "id": col.id.to_string(), + "name": col.name, + }), + ); + Ok(Json(CollectionResponse::from(col))) } @@ -73,6 +82,12 @@ pub async fn delete_collection( Path(id): Path, ) -> Result, ApiError> { state.storage.delete_collection(id).await?; + + state.emit_plugin_event( + "CollectionDeleted", + &serde_json::json!({"id": id.to_string()}), + ); + Ok(Json(serde_json::json!({"deleted": true}))) } diff --git a/crates/pinakes-server/src/routes/media.rs b/crates/pinakes-server/src/routes/media.rs index 09d8410..a2b3a4a 100644 --- a/crates/pinakes-server/src/routes/media.rs +++ b/crates/pinakes-server/src/routes/media.rs @@ -87,8 +87,12 @@ pub async fn import_media( State(state): State, Json(req): Json, ) -> Result, ApiError> { - let result = - pinakes_core::import::import_file(&state.storage, &req.path).await?; + let result = pinakes_core::import::import_file( + &state.storage, + &req.path, + state.plugin_pipeline.as_ref(), + ) + .await?; if let Some(ref dispatcher) = state.webhook_dispatcher { let id = result.media_id.0.to_string(); @@ -197,6 +201,11 @@ pub async fn update_media( }); } + state.emit_plugin_event( + "MediaUpdated", + &serde_json::json!({"media_id": item.id.to_string()}), + ); + Ok(Json(MediaResponse::from(item))) } @@ -227,6 +236,14 @@ pub async fn delete_media( tracing::warn!(path = %thumb_path.display(), error = %e, "failed to remove thumbnail"); } + state.emit_plugin_event( + "MediaDeleted", + &serde_json::json!({ + "media_id": media_id.to_string(), + "path": item.path.to_string_lossy(), + }), + ); + Ok(Json(serde_json::json!({"deleted": true}))) } @@ -362,8 +379,12 @@ pub async fn import_with_options( State(state): State, Json(req): Json, ) -> Result, ApiError> { - let result = - pinakes_core::import::import_file(&state.storage, &req.path).await?; + let result = pinakes_core::import::import_file( + &state.storage, + &req.path, + state.plugin_pipeline.as_ref(), + ) + .await?; if !result.was_duplicate { apply_import_post_processing( @@ -400,7 +421,13 @@ pub async fn batch_import( let mut errors = 0usize; for path in &req.paths { - match pinakes_core::import::import_file(&state.storage, path).await { + match pinakes_core::import::import_file( + &state.storage, + path, + state.plugin_pipeline.as_ref(), + ) + .await + { Ok(result) => { if result.was_duplicate { duplicates += 1; @@ -458,6 +485,7 @@ pub async fn import_directory_endpoint( &req.path, &ignore_patterns, concurrency, + state.plugin_pipeline.as_ref(), ) .await?; @@ -1065,6 +1093,11 @@ pub async fn soft_delete_media( ) .await?; + state.emit_plugin_event( + "MediaDeleted", + &serde_json::json!({"media_id": media_id.to_string(), "trashed": true}), + ); + Ok(Json(serde_json::json!({"deleted": true, "trashed": true}))) } @@ -1106,6 +1139,11 @@ pub async fn restore_media( ) .await?; + state.emit_plugin_event( + "MediaUpdated", + &serde_json::json!({"media_id": media_id.to_string(), "restored": true}), + ); + Ok(Json(MediaResponse::from(item))) } diff --git a/crates/pinakes-server/src/routes/plugins.rs b/crates/pinakes-server/src/routes/plugins.rs index 77a03a7..00b77f1 100644 --- a/crates/pinakes-server/src/routes/plugins.rs +++ b/crates/pinakes-server/src/routes/plugins.rs @@ -127,6 +127,17 @@ pub async fn toggle_plugin( })?; } + // Re-discover capabilities after toggle so cached data stays current + if let Some(ref pipeline) = state.plugin_pipeline + && let Err(e) = pipeline.discover_capabilities().await + { + tracing::warn!( + plugin_id = %id, + error = %e, + "failed to re-discover capabilities after plugin toggle" + ); + } + Ok(Json(serde_json::json!({ "id": id, "enabled": req.enabled @@ -150,5 +161,16 @@ pub async fn reload_plugin( )) })?; + // Re-discover capabilities after reload so cached data stays current + if let Some(ref pipeline) = state.plugin_pipeline + && let Err(e) = pipeline.discover_capabilities().await + { + tracing::warn!( + plugin_id = %id, + error = %e, + "failed to re-discover capabilities after plugin reload" + ); + } + Ok(Json(serde_json::json!({"reloaded": true}))) } diff --git a/crates/pinakes-server/src/routes/tags.rs b/crates/pinakes-server/src/routes/tags.rs index 449c78d..3c12ec2 100644 --- a/crates/pinakes-server/src/routes/tags.rs +++ b/crates/pinakes-server/src/routes/tags.rs @@ -58,6 +58,15 @@ pub async fn tag_media( ) -> Result, ApiError> { pinakes_core::tags::tag_media(&state.storage, MediaId(media_id), req.tag_id) .await?; + + state.emit_plugin_event( + "MediaTagged", + &serde_json::json!({ + "media_id": media_id.to_string(), + "tag_id": req.tag_id.to_string(), + }), + ); + Ok(Json(serde_json::json!({"tagged": true}))) } @@ -67,6 +76,15 @@ pub async fn untag_media( ) -> Result, ApiError> { pinakes_core::tags::untag_media(&state.storage, MediaId(media_id), tag_id) .await?; + + state.emit_plugin_event( + "MediaUntagged", + &serde_json::json!({ + "media_id": media_id.to_string(), + "tag_id": tag_id.to_string(), + }), + ); + Ok(Json(serde_json::json!({"untagged": true}))) } diff --git a/crates/pinakes-server/src/state.rs b/crates/pinakes-server/src/state.rs index 584b598..2917ed5 100644 --- a/crates/pinakes-server/src/state.rs +++ b/crates/pinakes-server/src/state.rs @@ -5,7 +5,7 @@ use pinakes_core::{ config::Config, jobs::JobQueue, managed_storage::ManagedStorageService, - plugin::PluginManager, + plugin::{PluginManager, PluginPipeline}, scan::ScanProgress, scheduler::TaskScheduler, storage::DynStorageBackend, @@ -32,9 +32,23 @@ pub struct AppState { pub cache: Arc, pub scheduler: Arc, pub plugin_manager: Option>, + pub plugin_pipeline: Option>, pub transcode_service: Option>, pub managed_storage: Option>, pub chunked_upload_manager: Option>, pub webhook_dispatcher: Option>, pub session_semaphore: Arc, } + +impl AppState { + /// Emit a plugin event if the pipeline is active. + pub fn emit_plugin_event( + &self, + event_type: &str, + payload: &serde_json::Value, + ) { + if let Some(ref pipeline) = self.plugin_pipeline { + pipeline.emit_event(event_type, payload); + } + } +}