finalize server-side plugin system #8

Merged
NotAShelf merged 10 commits from notashelf/push-wxzvxtntoxnn into main 2026-03-08 12:21:45 +00:00
6 changed files with 206 additions and 46 deletions
Showing only changes of commit e9c5390c45 - Show all commits

pinakes-server: integrate plugin system into routes & application state

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Ib5d482326cae1dcb43603bffb76a6a186a6a6964
raf 2026-03-08 15:06:11 +03:00
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF

View file

@ -234,11 +234,57 @@ async fn main() -> Result<()> {
)) ))
}; };
// Initialize plugin manager if plugins are enabled (needed before job queue)
let plugin_manager = if config.plugins.enabled {
match pinakes_core::plugin::PluginManager::new(
config.plugins.data_dir.clone(),
config.plugins.cache_dir.clone(),
config.plugins.clone().into(),
) {
Ok(pm) => {
tracing::info!("Plugin manager initialized");
Some(Arc::new(pm))
},
Err(e) => {
tracing::warn!("Failed to initialize plugin manager: {}", e);
None
},
}
} else {
tracing::info!("Plugins disabled in configuration");
None
};
// Initialize plugin pipeline if plugin manager is available
let plugin_pipeline = if let Some(ref pm) = plugin_manager {
match pm.discover_and_load_all().await {
Ok(loaded) => {
tracing::info!(count = loaded.len(), "loaded plugins");
let pipeline = Arc::new(pinakes_core::plugin::PluginPipeline::new(
Arc::clone(pm),
config.plugins.timeouts.clone(),
config.plugins.max_consecutive_failures,
));
if let Err(e) = pipeline.discover_capabilities().await {
tracing::warn!(error = %e, "failed to discover plugin capabilities");
}
Some(pipeline)
},
Err(e) => {
tracing::warn!(error = %e, "plugin discovery failed");
None
},
}
} else {
None
};
// Initialize job queue with executor // Initialize job queue with executor
let job_storage = storage.clone(); let job_storage = storage.clone();
let job_config = config.clone(); let job_config = config.clone();
let job_transcode = transcode_service.clone(); let job_transcode = transcode_service.clone();
let job_webhooks = webhook_dispatcher.clone(); let job_webhooks = webhook_dispatcher.clone();
let job_pipeline = plugin_pipeline.clone();
let job_queue = pinakes_core::jobs::JobQueue::new( let job_queue = pinakes_core::jobs::JobQueue::new(
config.jobs.worker_count, config.jobs.worker_count,
config.jobs.job_timeout_secs, config.jobs.job_timeout_secs,
@ -247,31 +293,50 @@ async fn main() -> Result<()> {
let config = job_config.clone(); let config = job_config.clone();
let transcode_svc = job_transcode.clone(); let transcode_svc = job_transcode.clone();
let webhooks = job_webhooks.clone(); let webhooks = job_webhooks.clone();
let pipeline = job_pipeline.clone();
tokio::spawn(async move { tokio::spawn(async move {
use pinakes_core::jobs::{JobKind, JobQueue}; use pinakes_core::jobs::{JobKind, JobQueue};
match kind { match kind {
JobKind::Scan { path } => { JobKind::Scan { path } => {
if let Some(ref pl) = pipeline {
pl.emit_event(
"ScanStarted",
&serde_json::json!({"path": path.as_ref().map(|p| p.display().to_string())}),
);
}
let ignore = config.scanning.ignore_patterns.clone(); let ignore = config.scanning.ignore_patterns.clone();
let res = if let Some(p) = path { let res = if let Some(p) = path {
pinakes_core::scan::scan_directory(&storage, &p, &ignore).await pinakes_core::scan::scan_directory(
&storage,
&p,
&ignore,
pipeline.as_ref(),
)
.await
} else { } else {
pinakes_core::scan::scan_all_roots(&storage, &ignore) pinakes_core::scan::scan_all_roots(
.await &storage,
.map(|statuses| { &ignore,
let total_found: usize = pipeline.as_ref(),
statuses.iter().map(|s| s.files_found).sum(); )
let total_processed: usize = .await
statuses.iter().map(|s| s.files_processed).sum(); .map(|statuses| {
let all_errors: Vec<String> = let total_found: usize =
statuses.into_iter().flat_map(|s| s.errors).collect(); statuses.iter().map(|s| s.files_found).sum();
pinakes_core::scan::ScanStatus { let total_processed: usize =
scanning: false, statuses.iter().map(|s| s.files_processed).sum();
files_found: total_found, let total_skipped: usize =
files_processed: total_processed, statuses.iter().map(|s| s.files_skipped).sum();
files_skipped: 0, let all_errors: Vec<String> =
errors: all_errors, statuses.into_iter().flat_map(|s| s.errors).collect();
} pinakes_core::scan::ScanStatus {
}) scanning: false,
files_found: total_found,
files_processed: total_processed,
files_skipped: total_skipped,
errors: all_errors,
}
})
}; };
match res { match res {
Ok(status) => { Ok(status) => {
@ -283,6 +348,15 @@ async fn main() -> Result<()> {
}, },
); );
} }
if let Some(ref pl) = pipeline {
pl.emit_event(
"ScanCompleted",
&serde_json::json!({
"files_found": status.files_found,
"files_processed": status.files_processed,
}),
);
}
JobQueue::complete( JobQueue::complete(
&jobs, &jobs,
job_id, job_id,
@ -630,28 +704,6 @@ async fn main() -> Result<()> {
config.jobs.cache_ttl_secs, config.jobs.cache_ttl_secs,
)); ));
// Initialize plugin manager if plugins are enabled (before moving config into
// Arc)
let plugin_manager = if config.plugins.enabled {
match pinakes_core::plugin::PluginManager::new(
config.plugins.data_dir.clone(),
config.plugins.cache_dir.clone(),
config.plugins.clone().into(),
) {
Ok(pm) => {
tracing::info!("Plugin manager initialized");
Some(Arc::new(pm))
},
Err(e) => {
tracing::warn!("Failed to initialize plugin manager: {}", e);
None
},
}
} else {
tracing::info!("Plugins disabled in configuration");
None
};
// Initialize scheduler with cancellation support // Initialize scheduler with cancellation support
let shutdown_token = tokio_util::sync::CancellationToken::new(); let shutdown_token = tokio_util::sync::CancellationToken::new();
let config_arc = Arc::new(RwLock::new(config)); let config_arc = Arc::new(RwLock::new(config));
@ -737,6 +789,7 @@ async fn main() -> Result<()> {
cache, cache,
scheduler, scheduler,
plugin_manager, plugin_manager,
plugin_pipeline,
transcode_service, transcode_service,
managed_storage, managed_storage,
chunked_upload_manager, chunked_upload_manager,

View file

@ -48,6 +48,15 @@ pub async fn create_collection(
req.filter_query.as_deref(), req.filter_query.as_deref(),
) )
.await?; .await?;
state.emit_plugin_event(
"CollectionCreated",
&serde_json::json!({
"id": col.id.to_string(),
"name": col.name,
}),
);
Ok(Json(CollectionResponse::from(col))) Ok(Json(CollectionResponse::from(col)))
} }
@ -73,6 +82,12 @@ pub async fn delete_collection(
Path(id): Path<Uuid>, Path(id): Path<Uuid>,
) -> Result<Json<serde_json::Value>, ApiError> { ) -> Result<Json<serde_json::Value>, ApiError> {
state.storage.delete_collection(id).await?; state.storage.delete_collection(id).await?;
state.emit_plugin_event(
"CollectionDeleted",
&serde_json::json!({"id": id.to_string()}),
);
Ok(Json(serde_json::json!({"deleted": true}))) Ok(Json(serde_json::json!({"deleted": true})))
} }

View file

@ -87,8 +87,12 @@ pub async fn import_media(
State(state): State<AppState>, State(state): State<AppState>,
Json(req): Json<ImportRequest>, Json(req): Json<ImportRequest>,
) -> Result<Json<ImportResponse>, ApiError> { ) -> Result<Json<ImportResponse>, ApiError> {
let result = let result = pinakes_core::import::import_file(
pinakes_core::import::import_file(&state.storage, &req.path).await?; &state.storage,
&req.path,
state.plugin_pipeline.as_ref(),
)
.await?;
if let Some(ref dispatcher) = state.webhook_dispatcher { if let Some(ref dispatcher) = state.webhook_dispatcher {
let id = result.media_id.0.to_string(); let id = result.media_id.0.to_string();
@ -197,6 +201,11 @@ pub async fn update_media(
}); });
} }
state.emit_plugin_event(
"MediaUpdated",
&serde_json::json!({"media_id": item.id.to_string()}),
);
Ok(Json(MediaResponse::from(item))) Ok(Json(MediaResponse::from(item)))
} }
@ -227,6 +236,14 @@ pub async fn delete_media(
tracing::warn!(path = %thumb_path.display(), error = %e, "failed to remove thumbnail"); tracing::warn!(path = %thumb_path.display(), error = %e, "failed to remove thumbnail");
} }
state.emit_plugin_event(
"MediaDeleted",
&serde_json::json!({
"media_id": media_id.to_string(),
"path": item.path.to_string_lossy(),
}),
);
Ok(Json(serde_json::json!({"deleted": true}))) Ok(Json(serde_json::json!({"deleted": true})))
} }
@ -362,8 +379,12 @@ pub async fn import_with_options(
State(state): State<AppState>, State(state): State<AppState>,
Json(req): Json<ImportWithOptionsRequest>, Json(req): Json<ImportWithOptionsRequest>,
) -> Result<Json<ImportResponse>, ApiError> { ) -> Result<Json<ImportResponse>, ApiError> {
let result = let result = pinakes_core::import::import_file(
pinakes_core::import::import_file(&state.storage, &req.path).await?; &state.storage,
&req.path,
state.plugin_pipeline.as_ref(),
)
.await?;
if !result.was_duplicate { if !result.was_duplicate {
apply_import_post_processing( apply_import_post_processing(
@ -400,7 +421,13 @@ pub async fn batch_import(
let mut errors = 0usize; let mut errors = 0usize;
for path in &req.paths { for path in &req.paths {
match pinakes_core::import::import_file(&state.storage, path).await { match pinakes_core::import::import_file(
&state.storage,
path,
state.plugin_pipeline.as_ref(),
)
.await
{
Ok(result) => { Ok(result) => {
if result.was_duplicate { if result.was_duplicate {
duplicates += 1; duplicates += 1;
@ -458,6 +485,7 @@ pub async fn import_directory_endpoint(
&req.path, &req.path,
&ignore_patterns, &ignore_patterns,
concurrency, concurrency,
state.plugin_pipeline.as_ref(),
) )
.await?; .await?;
@ -1065,6 +1093,11 @@ pub async fn soft_delete_media(
) )
.await?; .await?;
state.emit_plugin_event(
"MediaDeleted",
&serde_json::json!({"media_id": media_id.to_string(), "trashed": true}),
);
Ok(Json(serde_json::json!({"deleted": true, "trashed": true}))) Ok(Json(serde_json::json!({"deleted": true, "trashed": true})))
} }
@ -1106,6 +1139,11 @@ pub async fn restore_media(
) )
.await?; .await?;
state.emit_plugin_event(
"MediaUpdated",
&serde_json::json!({"media_id": media_id.to_string(), "restored": true}),
);
Ok(Json(MediaResponse::from(item))) Ok(Json(MediaResponse::from(item)))
} }

View file

@ -127,6 +127,17 @@ pub async fn toggle_plugin(
})?; })?;
} }
// Re-discover capabilities after toggle so cached data stays current
if let Some(ref pipeline) = state.plugin_pipeline
&& let Err(e) = pipeline.discover_capabilities().await
{
tracing::warn!(
plugin_id = %id,
error = %e,
"failed to re-discover capabilities after plugin toggle"
);
}
Ok(Json(serde_json::json!({ Ok(Json(serde_json::json!({
"id": id, "id": id,
"enabled": req.enabled "enabled": req.enabled
@ -150,5 +161,16 @@ pub async fn reload_plugin(
)) ))
})?; })?;
// Re-discover capabilities after reload so cached data stays current
if let Some(ref pipeline) = state.plugin_pipeline
&& let Err(e) = pipeline.discover_capabilities().await
{
tracing::warn!(
plugin_id = %id,
error = %e,
"failed to re-discover capabilities after plugin reload"
);
}
Ok(Json(serde_json::json!({"reloaded": true}))) Ok(Json(serde_json::json!({"reloaded": true})))
} }

View file

@ -58,6 +58,15 @@ pub async fn tag_media(
) -> Result<Json<serde_json::Value>, ApiError> { ) -> Result<Json<serde_json::Value>, ApiError> {
pinakes_core::tags::tag_media(&state.storage, MediaId(media_id), req.tag_id) pinakes_core::tags::tag_media(&state.storage, MediaId(media_id), req.tag_id)
.await?; .await?;
state.emit_plugin_event(
"MediaTagged",
&serde_json::json!({
"media_id": media_id.to_string(),
"tag_id": req.tag_id.to_string(),
}),
);
Ok(Json(serde_json::json!({"tagged": true}))) Ok(Json(serde_json::json!({"tagged": true})))
} }
@ -67,6 +76,15 @@ pub async fn untag_media(
) -> Result<Json<serde_json::Value>, ApiError> { ) -> Result<Json<serde_json::Value>, ApiError> {
pinakes_core::tags::untag_media(&state.storage, MediaId(media_id), tag_id) pinakes_core::tags::untag_media(&state.storage, MediaId(media_id), tag_id)
.await?; .await?;
state.emit_plugin_event(
"MediaUntagged",
&serde_json::json!({
"media_id": media_id.to_string(),
"tag_id": tag_id.to_string(),
}),
);
Ok(Json(serde_json::json!({"untagged": true}))) Ok(Json(serde_json::json!({"untagged": true})))
} }

View file

@ -5,7 +5,7 @@ use pinakes_core::{
config::Config, config::Config,
jobs::JobQueue, jobs::JobQueue,
managed_storage::ManagedStorageService, managed_storage::ManagedStorageService,
plugin::PluginManager, plugin::{PluginManager, PluginPipeline},
scan::ScanProgress, scan::ScanProgress,
scheduler::TaskScheduler, scheduler::TaskScheduler,
storage::DynStorageBackend, storage::DynStorageBackend,
@ -32,9 +32,23 @@ pub struct AppState {
pub cache: Arc<CacheLayer>, pub cache: Arc<CacheLayer>,
pub scheduler: Arc<TaskScheduler>, pub scheduler: Arc<TaskScheduler>,
pub plugin_manager: Option<Arc<PluginManager>>, pub plugin_manager: Option<Arc<PluginManager>>,
pub plugin_pipeline: Option<Arc<PluginPipeline>>,
pub transcode_service: Option<Arc<TranscodeService>>, pub transcode_service: Option<Arc<TranscodeService>>,
pub managed_storage: Option<Arc<ManagedStorageService>>, pub managed_storage: Option<Arc<ManagedStorageService>>,
pub chunked_upload_manager: Option<Arc<ChunkedUploadManager>>, pub chunked_upload_manager: Option<Arc<ChunkedUploadManager>>,
pub webhook_dispatcher: Option<Arc<WebhookDispatcher>>, pub webhook_dispatcher: Option<Arc<WebhookDispatcher>>,
pub session_semaphore: Arc<Semaphore>, pub session_semaphore: Arc<Semaphore>,
} }
impl AppState {
/// Emit a plugin event if the pipeline is active.
pub fn emit_plugin_event(
&self,
event_type: &str,
payload: &serde_json::Value,
) {
if let Some(ref pipeline) = self.plugin_pipeline {
pipeline.emit_event(event_type, payload);
}
}
}