finalize server-side plugin system #8

Merged
NotAShelf merged 10 commits from notashelf/push-wxzvxtntoxnn into main 2026-03-08 12:21:45 +00:00
2 changed files with 83 additions and 12 deletions
Showing only changes of commit f686e8a777 - Show all commits

pinakes-core: emit plugin events from scan and import pipelines

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Ib992e292a3272c52f9b7c18164ec61f56a6a6964
raf 2026-03-08 15:01:46 +03:00
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF

View file

@ -21,6 +21,7 @@ use crate::{
MediaItem,
StorageMode,
},
plugin::PluginPipeline,
storage::DynStorageBackend,
thumbnail,
};
@ -106,8 +107,10 @@ pub async fn validate_path_in_roots(
pub async fn import_file(
storage: &DynStorageBackend,
path: &Path,
pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<ImportResult> {
import_file_with_options(storage, path, &ImportOptions::default()).await
import_file_with_options(storage, path, &ImportOptions::default(), pipeline)
.await
}
/// Import a file with configurable options for incremental scanning
@ -119,6 +122,7 @@ pub async fn import_file_with_options(
storage: &DynStorageBackend,
path: &Path,
options: &ImportOptions,
pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<ImportResult> {
let path = path.canonicalize()?;
@ -128,8 +132,12 @@ pub async fn import_file_with_options(
validate_path_in_roots(storage, &path).await?;
let media_type = MediaType::from_path(&path)
.ok_or_else(|| PinakesError::UnsupportedMediaType(path.clone()))?;
let media_type = if let Some(pl) = pipeline {
pl.resolve_media_type(&path).await
} else {
MediaType::from_path(&path)
}
.ok_or_else(|| PinakesError::UnsupportedMediaType(path.clone()))?;
let current_mtime = get_file_mtime(&path);
@ -169,7 +177,9 @@ pub async fn import_file_with_options(
let file_meta = std::fs::metadata(&path)?;
let file_size = file_meta.len();
let extracted = {
let extracted = if let Some(pl) = pipeline {
pl.extract_metadata(&path, &media_type).await?
} else {
let path_clone = path.clone();
let media_type_clone = media_type.clone();
tokio::task::spawn_blocking(move || {
@ -189,7 +199,15 @@ pub async fn import_file_with_options(
let media_id = MediaId::new();
// Generate thumbnail for image types
let thumb_path = {
let thumb_path = if let Some(pl) = pipeline {
pl.generate_thumbnail(
media_id,
&path,
&media_type,
&thumbnail::default_thumbnail_dir(),
)
.await?
} else {
let source = path.clone();
let thumb_dir = thumbnail::default_thumbnail_dir();
let media_type_clone = media_type.clone();
@ -218,6 +236,9 @@ pub async fn import_file_with_options(
let is_markdown =
media_type == MediaType::Builtin(BuiltinMediaType::Markdown);
// Capture media type debug string before moving into MediaItem
let media_type_debug = format!("{media_type:?}");
let item = MediaItem {
id: media_id,
path: path.clone(),
@ -299,6 +320,15 @@ pub async fn import_file_with_options(
)
.await?;
if let Some(pl) = pipeline {
let payload = serde_json::json!({
"media_id": media_id.to_string(),
"path": path.to_string_lossy(),
"media_type": media_type_debug,
});
pl.emit_event("MediaImported", &payload);
}
info!(media_id = %media_id, path = %path.display(), "imported media file");
Ok(ImportResult {
@ -349,6 +379,7 @@ pub async fn import_directory(
storage: &DynStorageBackend,
dir: &Path,
ignore_patterns: &[String],
pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<Vec<std::result::Result<ImportResult, PinakesError>>> {
import_directory_with_options(
storage,
@ -356,6 +387,7 @@ pub async fn import_directory(
ignore_patterns,
DEFAULT_IMPORT_CONCURRENCY,
&ImportOptions::default(),
pipeline,
)
.await
}
@ -372,6 +404,7 @@ pub async fn import_directory_with_concurrency(
dir: &Path,
ignore_patterns: &[String],
concurrency: usize,
pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<Vec<std::result::Result<ImportResult, PinakesError>>> {
import_directory_with_options(
storage,
@ -379,6 +412,7 @@ pub async fn import_directory_with_concurrency(
ignore_patterns,
concurrency,
&ImportOptions::default(),
pipeline,
)
.await
}
@ -395,11 +429,13 @@ pub async fn import_directory_with_options(
ignore_patterns: &[String],
concurrency: usize,
options: &ImportOptions,
pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<Vec<std::result::Result<ImportResult, PinakesError>>> {
let concurrency = concurrency.clamp(1, 256);
let dir = dir.to_path_buf();
let patterns = ignore_patterns.to_vec();
let options = options.clone();
let pipeline = pipeline.cloned();
let entries: Vec<PathBuf> = {
let dir = dir.clone();
@ -425,9 +461,11 @@ pub async fn import_directory_with_options(
let storage = Arc::clone(storage);
let path = entry_path.clone();
let opts = options.clone();
let pl = pipeline.clone();
join_set.spawn(async move {
let result = import_file_with_options(&storage, &path, &opts).await;
let result =
import_file_with_options(&storage, &path, &opts, pl.as_ref()).await;
(path, result)
});

View file

@ -11,7 +11,12 @@ use notify::{PollWatcher, RecursiveMode, Watcher};
use tokio::sync::mpsc;
use tracing::{info, warn};
use crate::{error::Result, import, storage::DynStorageBackend};
use crate::{
error::Result,
import,
plugin::PluginPipeline,
storage::DynStorageBackend,
};
/// Status of a directory scan operation.
pub struct ScanStatus {
@ -122,6 +127,7 @@ pub async fn scan_directory(
storage: &DynStorageBackend,
dir: &Path,
ignore_patterns: &[String],
pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<ScanStatus> {
scan_directory_with_options(
storage,
@ -129,6 +135,7 @@ pub async fn scan_directory(
ignore_patterns,
None,
&ScanOptions::default(),
pipeline,
)
.await
}
@ -154,13 +161,21 @@ pub async fn scan_directory_incremental(
storage: &DynStorageBackend,
dir: &Path,
ignore_patterns: &[String],
pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<ScanStatus> {
let options = ScanOptions {
incremental: true,
force_full: false,
};
scan_directory_with_options(storage, dir, ignore_patterns, None, &options)
.await
scan_directory_with_options(
storage,
dir,
ignore_patterns,
None,
&options,
pipeline,
)
.await
}
/// Scans a directory with progress reporting.
@ -184,6 +199,7 @@ pub async fn scan_directory_with_progress(
dir: &Path,
ignore_patterns: &[String],
progress: Option<&ScanProgress>,
pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<ScanStatus> {
scan_directory_with_options(
storage,
@ -191,6 +207,7 @@ pub async fn scan_directory_with_progress(
ignore_patterns,
progress,
&ScanOptions::default(),
pipeline,
)
.await
}
@ -207,6 +224,7 @@ pub async fn scan_directory_with_options(
ignore_patterns: &[String],
progress: Option<&ScanProgress>,
scan_options: &ScanOptions,
pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<ScanStatus> {
info!(
dir = %dir.display(),
@ -230,8 +248,9 @@ pub async fn scan_directory_with_options(
storage,
dir,
ignore_patterns,
8, // Default concurrency
8, // default concurrency
&import_options,
pipeline,
)
.await?;
@ -301,12 +320,14 @@ pub async fn scan_directory_with_options(
pub async fn scan_all_roots(
storage: &DynStorageBackend,
ignore_patterns: &[String],
pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<Vec<ScanStatus>> {
scan_all_roots_with_options(
storage,
ignore_patterns,
None,
&ScanOptions::default(),
pipeline,
)
.await
}
@ -328,12 +349,20 @@ pub async fn scan_all_roots(
pub async fn scan_all_roots_incremental(
storage: &DynStorageBackend,
ignore_patterns: &[String],
pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<Vec<ScanStatus>> {
let options = ScanOptions {
incremental: true,
force_full: false,
};
scan_all_roots_with_options(storage, ignore_patterns, None, &options).await
scan_all_roots_with_options(
storage,
ignore_patterns,
None,
&options,
pipeline,
)
.await
}
/// Scans all root directories with progress reporting.
@ -355,12 +384,14 @@ pub async fn scan_all_roots_with_progress(
storage: &DynStorageBackend,
ignore_patterns: &[String],
progress: Option<&ScanProgress>,
pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<Vec<ScanStatus>> {
scan_all_roots_with_options(
storage,
ignore_patterns,
progress,
&ScanOptions::default(),
pipeline,
)
.await
}
@ -386,6 +417,7 @@ pub async fn scan_all_roots_with_options(
ignore_patterns: &[String],
progress: Option<&ScanProgress>,
scan_options: &ScanOptions,
pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<Vec<ScanStatus>> {
let roots = storage.list_root_dirs().await?;
let mut statuses = Vec::new();
@ -397,6 +429,7 @@ pub async fn scan_all_roots_with_options(
ignore_patterns,
progress,
scan_options,
pipeline,
)
.await
{
@ -536,7 +569,7 @@ pub async fn watch_and_import(
&& !crate::import::should_ignore(&path, &ignore_patterns)
{
info!(path = %path.display(), "detected file change, importing");
if let Err(e) = import::import_file(&storage, &path).await {
if let Err(e) = import::import_file(&storage, &path, None).await {
warn!(path = %path.display(), error = %e, "failed to import changed file");
}
}