diff --git a/crates/pinakes-core/src/import.rs b/crates/pinakes-core/src/import.rs index 5d4935d..6d3c657 100644 --- a/crates/pinakes-core/src/import.rs +++ b/crates/pinakes-core/src/import.rs @@ -21,6 +21,7 @@ use crate::{ MediaItem, StorageMode, }, + plugin::PluginPipeline, storage::DynStorageBackend, thumbnail, }; @@ -106,8 +107,10 @@ pub async fn validate_path_in_roots( pub async fn import_file( storage: &DynStorageBackend, path: &Path, + pipeline: Option<&Arc>, ) -> Result { - import_file_with_options(storage, path, &ImportOptions::default()).await + import_file_with_options(storage, path, &ImportOptions::default(), pipeline) + .await } /// Import a file with configurable options for incremental scanning @@ -119,6 +122,7 @@ pub async fn import_file_with_options( storage: &DynStorageBackend, path: &Path, options: &ImportOptions, + pipeline: Option<&Arc>, ) -> Result { let path = path.canonicalize()?; @@ -128,8 +132,12 @@ pub async fn import_file_with_options( validate_path_in_roots(storage, &path).await?; - let media_type = MediaType::from_path(&path) - .ok_or_else(|| PinakesError::UnsupportedMediaType(path.clone()))?; + let media_type = if let Some(pl) = pipeline { + pl.resolve_media_type(&path).await + } else { + MediaType::from_path(&path) + } + .ok_or_else(|| PinakesError::UnsupportedMediaType(path.clone()))?; let current_mtime = get_file_mtime(&path); @@ -169,7 +177,9 @@ pub async fn import_file_with_options( let file_meta = std::fs::metadata(&path)?; let file_size = file_meta.len(); - let extracted = { + let extracted = if let Some(pl) = pipeline { + pl.extract_metadata(&path, &media_type).await? + } else { let path_clone = path.clone(); let media_type_clone = media_type.clone(); tokio::task::spawn_blocking(move || { @@ -189,7 +199,15 @@ pub async fn import_file_with_options( let media_id = MediaId::new(); // Generate thumbnail for image types - let thumb_path = { + let thumb_path = if let Some(pl) = pipeline { + pl.generate_thumbnail( + media_id, + &path, + &media_type, + &thumbnail::default_thumbnail_dir(), + ) + .await? + } else { let source = path.clone(); let thumb_dir = thumbnail::default_thumbnail_dir(); let media_type_clone = media_type.clone(); @@ -218,6 +236,9 @@ pub async fn import_file_with_options( let is_markdown = media_type == MediaType::Builtin(BuiltinMediaType::Markdown); + // Capture media type debug string before moving into MediaItem + let media_type_debug = format!("{media_type:?}"); + let item = MediaItem { id: media_id, path: path.clone(), @@ -299,6 +320,15 @@ pub async fn import_file_with_options( ) .await?; + if let Some(pl) = pipeline { + let payload = serde_json::json!({ + "media_id": media_id.to_string(), + "path": path.to_string_lossy(), + "media_type": media_type_debug, + }); + pl.emit_event("MediaImported", &payload); + } + info!(media_id = %media_id, path = %path.display(), "imported media file"); Ok(ImportResult { @@ -349,6 +379,7 @@ pub async fn import_directory( storage: &DynStorageBackend, dir: &Path, ignore_patterns: &[String], + pipeline: Option<&Arc>, ) -> Result>> { import_directory_with_options( storage, @@ -356,6 +387,7 @@ pub async fn import_directory( ignore_patterns, DEFAULT_IMPORT_CONCURRENCY, &ImportOptions::default(), + pipeline, ) .await } @@ -372,6 +404,7 @@ pub async fn import_directory_with_concurrency( dir: &Path, ignore_patterns: &[String], concurrency: usize, + pipeline: Option<&Arc>, ) -> Result>> { import_directory_with_options( storage, @@ -379,6 +412,7 @@ pub async fn import_directory_with_concurrency( ignore_patterns, concurrency, &ImportOptions::default(), + pipeline, ) .await } @@ -395,11 +429,13 @@ pub async fn import_directory_with_options( ignore_patterns: &[String], concurrency: usize, options: &ImportOptions, + pipeline: Option<&Arc>, ) -> Result>> { let concurrency = concurrency.clamp(1, 256); let dir = dir.to_path_buf(); let patterns = ignore_patterns.to_vec(); let options = options.clone(); + let pipeline = pipeline.cloned(); let entries: Vec = { let dir = dir.clone(); @@ -425,9 +461,11 @@ pub async fn import_directory_with_options( let storage = Arc::clone(storage); let path = entry_path.clone(); let opts = options.clone(); + let pl = pipeline.clone(); join_set.spawn(async move { - let result = import_file_with_options(&storage, &path, &opts).await; + let result = + import_file_with_options(&storage, &path, &opts, pl.as_ref()).await; (path, result) }); diff --git a/crates/pinakes-core/src/scan.rs b/crates/pinakes-core/src/scan.rs index 8fe459f..ce43896 100644 --- a/crates/pinakes-core/src/scan.rs +++ b/crates/pinakes-core/src/scan.rs @@ -11,7 +11,12 @@ use notify::{PollWatcher, RecursiveMode, Watcher}; use tokio::sync::mpsc; use tracing::{info, warn}; -use crate::{error::Result, import, storage::DynStorageBackend}; +use crate::{ + error::Result, + import, + plugin::PluginPipeline, + storage::DynStorageBackend, +}; /// Status of a directory scan operation. pub struct ScanStatus { @@ -122,6 +127,7 @@ pub async fn scan_directory( storage: &DynStorageBackend, dir: &Path, ignore_patterns: &[String], + pipeline: Option<&Arc>, ) -> Result { scan_directory_with_options( storage, @@ -129,6 +135,7 @@ pub async fn scan_directory( ignore_patterns, None, &ScanOptions::default(), + pipeline, ) .await } @@ -154,13 +161,21 @@ pub async fn scan_directory_incremental( storage: &DynStorageBackend, dir: &Path, ignore_patterns: &[String], + pipeline: Option<&Arc>, ) -> Result { let options = ScanOptions { incremental: true, force_full: false, }; - scan_directory_with_options(storage, dir, ignore_patterns, None, &options) - .await + scan_directory_with_options( + storage, + dir, + ignore_patterns, + None, + &options, + pipeline, + ) + .await } /// Scans a directory with progress reporting. @@ -184,6 +199,7 @@ pub async fn scan_directory_with_progress( dir: &Path, ignore_patterns: &[String], progress: Option<&ScanProgress>, + pipeline: Option<&Arc>, ) -> Result { scan_directory_with_options( storage, @@ -191,6 +207,7 @@ pub async fn scan_directory_with_progress( ignore_patterns, progress, &ScanOptions::default(), + pipeline, ) .await } @@ -207,6 +224,7 @@ pub async fn scan_directory_with_options( ignore_patterns: &[String], progress: Option<&ScanProgress>, scan_options: &ScanOptions, + pipeline: Option<&Arc>, ) -> Result { info!( dir = %dir.display(), @@ -230,8 +248,9 @@ pub async fn scan_directory_with_options( storage, dir, ignore_patterns, - 8, // Default concurrency + 8, // default concurrency &import_options, + pipeline, ) .await?; @@ -301,12 +320,14 @@ pub async fn scan_directory_with_options( pub async fn scan_all_roots( storage: &DynStorageBackend, ignore_patterns: &[String], + pipeline: Option<&Arc>, ) -> Result> { scan_all_roots_with_options( storage, ignore_patterns, None, &ScanOptions::default(), + pipeline, ) .await } @@ -328,12 +349,20 @@ pub async fn scan_all_roots( pub async fn scan_all_roots_incremental( storage: &DynStorageBackend, ignore_patterns: &[String], + pipeline: Option<&Arc>, ) -> Result> { let options = ScanOptions { incremental: true, force_full: false, }; - scan_all_roots_with_options(storage, ignore_patterns, None, &options).await + scan_all_roots_with_options( + storage, + ignore_patterns, + None, + &options, + pipeline, + ) + .await } /// Scans all root directories with progress reporting. @@ -355,12 +384,14 @@ pub async fn scan_all_roots_with_progress( storage: &DynStorageBackend, ignore_patterns: &[String], progress: Option<&ScanProgress>, + pipeline: Option<&Arc>, ) -> Result> { scan_all_roots_with_options( storage, ignore_patterns, progress, &ScanOptions::default(), + pipeline, ) .await } @@ -386,6 +417,7 @@ pub async fn scan_all_roots_with_options( ignore_patterns: &[String], progress: Option<&ScanProgress>, scan_options: &ScanOptions, + pipeline: Option<&Arc>, ) -> Result> { let roots = storage.list_root_dirs().await?; let mut statuses = Vec::new(); @@ -397,6 +429,7 @@ pub async fn scan_all_roots_with_options( ignore_patterns, progress, scan_options, + pipeline, ) .await { @@ -536,7 +569,7 @@ pub async fn watch_and_import( && !crate::import::should_ignore(&path, &ignore_patterns) { info!(path = %path.display(), "detected file change, importing"); - if let Err(e) = import::import_file(&storage, &path).await { + if let Err(e) = import::import_file(&storage, &path, None).await { warn!(path = %path.display(), error = %e, "failed to import changed file"); } }