pinakes/crates/pinakes-core/src/plugin/pipeline.rs
NotAShelf f831e58723
treewide: replace std hashers with rustc_hash alternatives; fix clippy
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I766c36cb53d3d7f9e85b91a67c4131a66a6a6964
2026-03-22 17:58:27 +03:00

1444 lines
39 KiB
Rust

//! Plugin pipeline for wiring plugins into the media processing stages.
//!
//! The [`PluginPipeline`] coordinates built-in handlers and WASM plugins for:
//!
//! - Media type resolution (first-match-wins)
//! - Metadata extraction (accumulating merge by priority)
//! - Thumbnail generation (first-success-wins)
//! - Search backend dispatch (merge results, rank by score)
//! - Theme provider dispatch (accumulate all themes)
//! - Event fan-out to interested handlers
//!
//! Each plugin has a priority (0-999). Built-in handlers run at implicit
//! priority 100. A circuit breaker disables plugins after consecutive failures.
use std::{
path::{Path, PathBuf},
sync::Arc,
time::{Duration, Instant},
};
use rustc_hash::FxHashMap;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use super::PluginManager;
use crate::{
config::PluginTimeoutConfig,
media_type::MediaType,
metadata::ExtractedMetadata,
model::MediaId,
plugin::rpc::{
CanHandleRequest,
CanHandleResponse,
ExtractMetadataRequest,
ExtractMetadataResponse,
GenerateThumbnailRequest,
GenerateThumbnailResponse,
HandleEventRequest,
IndexItemRequest,
LoadThemeResponse,
PluginMediaTypeDefinition,
PluginThemeDefinition,
RemoveItemRequest,
SearchRequest,
SearchResponse,
SearchResultItem,
},
};
/// Built-in handlers run at this implicit priority.
const BUILTIN_PRIORITY: u16 = 100;
/// Cooldown before the circuit breaker allows a single trial call.
///
/// Spelled in seconds: `Duration::from_mins` is gated behind the unstable
/// `duration_constructors` feature, while `from_secs` is stable everywhere.
const CIRCUIT_BREAKER_COOLDOWN: Duration = Duration::from_secs(60);
/// Tracks per-plugin health for the circuit breaker.
struct PluginHealth {
    /// Failures since the most recent successful call.
    consecutive_failures: u32,
    /// Timestamp of the most recent failure, if any.
    last_failure: Option<Instant>,
    /// Whether the breaker has tripped and calls are being skipped.
    disabled_by_circuit_breaker: bool,
}

impl PluginHealth {
    /// A fresh, fully healthy record: breaker closed, no failures seen.
    const fn new() -> Self {
        Self {
            disabled_by_circuit_breaker: false,
            last_failure: None,
            consecutive_failures: 0,
        }
    }
}
/// Capability information discovered from plugins at startup.
struct CachedCapabilities {
    /// Keyed by `(kind, plugin_id)` -> list of supported type strings.
    /// Separate entries for each kind avoid collisions when a plugin
    /// implements both `metadata_extractor` and `thumbnail_generator`.
    supported_types: FxHashMap<(String, String), Vec<String>>,
    /// `plugin_id` -> list of interested event type strings
    interested_events: FxHashMap<String, Vec<String>>,
    /// `plugin_id` -> list of media type definitions (for `MediaTypeProvider`)
    media_type_definitions: FxHashMap<String, Vec<PluginMediaTypeDefinition>>,
    /// `plugin_id` -> list of theme definitions (for `ThemeProvider`)
    theme_definitions: FxHashMap<String, Vec<PluginThemeDefinition>>,
}

impl CachedCapabilities {
    /// An empty cache; populated by capability discovery.
    fn new() -> Self {
        Self {
            theme_definitions: FxHashMap::default(),
            media_type_definitions: FxHashMap::default(),
            interested_events: FxHashMap::default(),
            supported_types: FxHashMap::default(),
        }
    }
}
/// Coordinates built-in handlers and WASM plugins in a priority-ordered
/// pipeline for media processing stages.
pub struct PluginPipeline {
/// Source of loaded plugin instances, queried by kind.
manager: Arc<PluginManager>,
/// Per-stage call timeouts (capability query, processing, events).
timeouts: PluginTimeoutConfig,
/// Failure threshold at which the circuit breaker trips.
max_consecutive_failures: u32,
/// Circuit breaker state per plugin ID.
health: RwLock<FxHashMap<String, PluginHealth>>,
/// Capabilities discovered via `discover_capabilities`.
capabilities: RwLock<CachedCapabilities>,
}
impl PluginPipeline {
/// Create a new plugin pipeline.
#[must_use]
pub fn new(
    manager: Arc<PluginManager>,
    timeouts: PluginTimeoutConfig,
    max_consecutive_failures: u32,
) -> Self {
    // Health and capability state start empty and are filled lazily /
    // during capability discovery.
    let health = RwLock::new(FxHashMap::default());
    let capabilities = RwLock::new(CachedCapabilities::new());
    Self {
        manager,
        timeouts,
        max_consecutive_failures,
        health,
        capabilities,
    }
}
/// Discover capabilities from all enabled plugins. Call on startup and
/// after plugin reload.
///
/// # Errors
///
/// Returns an error only for truly fatal problems. Individual plugin
/// discovery failures are logged and skipped.
pub async fn discover_capabilities(&self) -> crate::error::Result<()> {
info!("discovering plugin capabilities");
let timeout = Duration::from_secs(self.timeouts.capability_query_secs);
// Build a complete replacement cache first and swap it in at the end,
// so concurrent readers never observe a partially-populated cache.
let mut caps = CachedCapabilities::new();
// Discover metadata extractors
self
.discover_supported_types("metadata_extractor", timeout, &mut caps)
.await;
// Discover thumbnail generators
self
.discover_supported_types("thumbnail_generator", timeout, &mut caps)
.await;
// Discover media type providers
self
.discover_media_type_definitions(timeout, &mut caps)
.await;
// Discover search backends
self
.discover_supported_types("search_backend", timeout, &mut caps)
.await;
// Discover theme providers
self.discover_theme_definitions(timeout, &mut caps).await;
// Discover event handlers
self.discover_interested_events(timeout, &mut caps).await;
// Replace the cached capabilities atomically under the write lock.
let mut cached = self.capabilities.write().await;
*cached = caps;
drop(cached);
Ok(())
}
/// Query `supported_types` from all enabled plugins of a given kind.
///
/// Successful answers are cached under `(kind, plugin_id)`; failures are
/// logged and the plugin is simply left out of the cache.
async fn discover_supported_types(
    &self,
    kind: &str,
    timeout: Duration,
    caps: &mut CachedCapabilities,
) {
    let candidates = self.manager.get_enabled_by_kind_sorted(kind).await;
    for (id, _priority, _kinds, wasm) in &candidates {
        let call = wasm
            .call_function_json::<serde_json::Value, Vec<String>>(
                "supported_types",
                &serde_json::json!({}),
                timeout,
            )
            .await;
        match call {
            Err(e) => {
                warn!(
                    plugin_id = %id,
                    error = %e,
                    "failed to discover supported types"
                );
            },
            Ok(types) => {
                debug!(
                    plugin_id = %id,
                    kind = kind,
                    types = ?types,
                    "discovered supported types"
                );
                caps
                    .supported_types
                    .insert((kind.to_string(), id.clone()), types);
            },
        }
    }
}
/// Query `supported_media_types` from `MediaTypeProvider` plugins.
///
/// Definitions are cached per plugin ID; a failing plugin is logged and
/// skipped.
async fn discover_media_type_definitions(
    &self,
    timeout: Duration,
    caps: &mut CachedCapabilities,
) {
    let providers = self.manager.get_enabled_by_kind_sorted("media_type").await;
    for (id, _priority, _kinds, wasm) in &providers {
        let call = wasm
            .call_function_json::<serde_json::Value, Vec<PluginMediaTypeDefinition>>(
                "supported_media_types",
                &serde_json::json!({}),
                timeout,
            )
            .await;
        match call {
            Err(e) => {
                warn!(
                    plugin_id = %id,
                    error = %e,
                    "failed to discover media type definitions"
                );
            },
            Ok(defs) => {
                debug!(
                    plugin_id = %id,
                    count = defs.len(),
                    "discovered media type definitions"
                );
                caps.media_type_definitions.insert(id.clone(), defs);
            },
        }
    }
}
/// Query `interested_events` from `EventHandler` plugins.
///
/// The cached lists drive event fan-out in `dispatch_event`; a failing
/// plugin is logged and receives no events.
async fn discover_interested_events(
    &self,
    timeout: Duration,
    caps: &mut CachedCapabilities,
) {
    let handlers = self
        .manager
        .get_enabled_by_kind_sorted("event_handler")
        .await;
    for (id, _priority, _kinds, wasm) in &handlers {
        let call = wasm
            .call_function_json::<serde_json::Value, Vec<String>>(
                "interested_events",
                &serde_json::json!({}),
                timeout,
            )
            .await;
        match call {
            Err(e) => {
                warn!(
                    plugin_id = %id,
                    error = %e,
                    "failed to discover interested events"
                );
            },
            Ok(events) => {
                debug!(
                    plugin_id = %id,
                    events = ?events,
                    "discovered interested events"
                );
                caps.interested_events.insert(id.clone(), events);
            },
        }
    }
}
/// Query `get_themes` from `ThemeProvider` plugins.
///
/// Definitions are cached per plugin ID and later served by `get_themes`
/// / `load_theme`; a failing plugin is logged and skipped.
async fn discover_theme_definitions(
    &self,
    timeout: Duration,
    caps: &mut CachedCapabilities,
) {
    let providers = self
        .manager
        .get_enabled_by_kind_sorted("theme_provider")
        .await;
    for (id, _priority, _kinds, wasm) in &providers {
        let call = wasm
            .call_function_json::<serde_json::Value, Vec<PluginThemeDefinition>>(
                "get_themes",
                &serde_json::json!({}),
                timeout,
            )
            .await;
        match call {
            Err(e) => {
                warn!(
                    plugin_id = %id,
                    error = %e,
                    "failed to discover theme definitions"
                );
            },
            Ok(defs) => {
                debug!(
                    plugin_id = %id,
                    count = defs.len(),
                    "discovered theme definitions"
                );
                caps.theme_definitions.insert(id.clone(), defs);
            },
        }
    }
}
/// Resolve the media type for a file path.
///
/// Iterates `MediaTypeProvider` plugins in priority order, falling back to
/// the built-in resolver at implicit priority 100.
///
/// First match wins: the first plugin whose `can_handle` returns true (or
/// the built-in resolver, at its priority slot) decides the outcome.
pub async fn resolve_media_type(&self, path: &Path) -> Option<MediaType> {
let timeout = Duration::from_secs(self.timeouts.processing_secs);
let plugins = self.manager.get_enabled_by_kind_sorted("media_type").await;
// Tracks whether the built-in resolver has already taken its turn in
// the priority ordering.
let mut builtin_ran = false;
for (id, priority, kinds, wasm) in &plugins {
// Run built-in at its implicit priority slot
if *priority >= BUILTIN_PRIORITY && !builtin_ran {
builtin_ran = true;
if let Some(mt) = MediaType::from_path(path) {
return Some(mt);
}
}
// Skip plugins currently disabled by the circuit breaker.
if !self.is_healthy(id).await {
continue;
}
// Validate the call is allowed for this plugin kind
if !self
.manager
.enforcer()
.validate_function_call(kinds, "can_handle")
{
continue;
}
let req = CanHandleRequest {
path: path.to_path_buf(),
mime_type: None,
};
match wasm
.call_function_json::<CanHandleRequest, CanHandleResponse>(
"can_handle",
&req,
timeout,
)
.await
{
Ok(resp) if resp.can_handle => {
self.record_success(id).await;
// First match wins: the plugin claimed this file.
// Determine the custom media type ID from cached definitions.
if let Some(mt_id) = self.resolve_custom_media_type_id(id, path).await
{
return Some(MediaType::Custom(mt_id));
}
// Plugin claimed the file but has no matching definition.
// The claim still stops the chain per first-match semantics.
warn!(
plugin_id = %id,
path = %path.display(),
"plugin can_handle returned true but no matching media type definition"
);
return None;
},
Ok(_) => {
// Plugin declined the file; the call itself succeeded, so the
// circuit breaker counter is reset.
self.record_success(id).await;
},
Err(e) => {
warn!(
plugin_id = %id,
error = %e,
"media type resolution failed"
);
self.record_failure(id).await;
},
}
}
// If built-in hasn't run yet (all plugins had priority < 100)
if !builtin_ran {
return MediaType::from_path(path);
}
None
}
/// Look up the cached media type definitions for a plugin and find the
/// definition whose extensions match the file's extension.
///
/// Matching is ASCII case-insensitive. Returns `None` when the path has
/// no UTF-8 extension or no cached definition lists a matching extension.
async fn resolve_custom_media_type_id(
    &self,
    plugin_id: &str,
    path: &Path,
) -> Option<String> {
    // `eq_ignore_ascii_case` below already ignores case, so the extension
    // is compared as-is rather than allocating a lowercased copy.
    let ext = path.extension().and_then(|e| e.to_str())?;
    let caps = self.capabilities.read().await;
    let result = caps.media_type_definitions.get(plugin_id).and_then(|defs| {
        defs.iter().find_map(|def| {
            def
                .extensions
                .iter()
                .any(|e| e.eq_ignore_ascii_case(ext))
                .then(|| def.id.clone())
        })
    });
    drop(caps);
    result
}
/// Extract metadata using plugins and built-in extractors in priority
/// order. Later (higher priority) results overwrite earlier ones
/// field-by-field.
///
/// # Errors
///
/// Propagates errors from the built-in extractor. Plugin errors are
/// logged and skipped.
pub async fn extract_metadata(
&self,
path: &Path,
media_type: &MediaType,
) -> crate::error::Result<ExtractedMetadata> {
let timeout = Duration::from_secs(self.timeouts.processing_secs);
let plugins = self
.manager
.get_enabled_by_kind_sorted("metadata_extractor")
.await;
let media_type_str = media_type.id();
// Accumulator: each extractor's non-empty fields overwrite earlier ones.
let mut acc = ExtractedMetadata::default();
let mut builtin_ran = false;
for (id, priority, kinds, wasm) in &plugins {
// Insert built-in at its priority slot
if *priority >= BUILTIN_PRIORITY && !builtin_ran {
builtin_ran = true;
acc = self.run_builtin_metadata(path, media_type, acc).await?;
}
// Skip plugins currently disabled by the circuit breaker.
if !self.is_healthy(id).await {
continue;
}
// Check if this plugin supports the media type
let supported = {
let caps = self.capabilities.read().await;
let key = ("metadata_extractor".to_string(), id.clone());
caps
.supported_types
.get(&key)
.is_some_and(|types| types.contains(&media_type_str))
};
if !supported {
continue;
}
if !self
.manager
.enforcer()
.validate_function_call(kinds, "extract_metadata")
{
continue;
}
let req = ExtractMetadataRequest {
path: path.to_path_buf(),
};
match wasm
.call_function_json::<ExtractMetadataRequest, ExtractMetadataResponse>(
"extract_metadata",
&req,
timeout,
)
.await
{
Ok(resp) => {
self.record_success(id).await;
// Accumulating merge: this plugin's non-None fields win.
merge_metadata(&mut acc, &resp);
},
Err(e) => {
warn!(
plugin_id = %id,
error = %e,
"metadata extraction failed"
);
self.record_failure(id).await;
},
}
}
// If built-in hasn't run yet (all plugins had priority < 100)
if !builtin_ran {
acc = self.run_builtin_metadata(path, media_type, acc).await?;
}
Ok(acc)
}
/// Run the built-in metadata extractor (sync, wrapped in `spawn_blocking`).
///
/// The built-in result is merged into `acc` (its non-None fields win) and
/// the updated accumulator is returned.
async fn run_builtin_metadata(
    &self,
    path: &Path,
    media_type: &MediaType,
    mut acc: ExtractedMetadata,
) -> crate::error::Result<ExtractedMetadata> {
    // Clone owned copies so the blocking closure can be 'static.
    let owned_path = path.to_path_buf();
    let owned_type = media_type.clone();
    let joined = tokio::task::spawn_blocking(move || {
        crate::metadata::extract_metadata(&owned_path, &owned_type)
    })
    .await;
    // Outer `?` surfaces a panicked task; inner `?` surfaces the
    // extractor's own error.
    let builtin = joined.map_err(|e| {
        crate::error::PinakesError::MetadataExtraction(format!(
            "built-in metadata task panicked: {e}"
        ))
    })??;
    merge_extracted(&mut acc, builtin);
    Ok(acc)
}
// Thumbnail generation
/// Generate a thumbnail using plugins and the built-in generator in
/// priority order. Returns the first successful result.
///
/// # Errors
///
/// Propagates errors only from the built-in generator. Plugin errors are
/// logged and skipped.
pub async fn generate_thumbnail(
&self,
media_id: MediaId,
path: &Path,
media_type: &MediaType,
thumb_dir: &Path,
) -> crate::error::Result<Option<PathBuf>> {
let timeout = Duration::from_secs(self.timeouts.processing_secs);
let plugins = self
.manager
.get_enabled_by_kind_sorted("thumbnail_generator")
.await;
let media_type_str = media_type.id();
let mut builtin_ran = false;
for (id, priority, kinds, wasm) in &plugins {
// Insert built-in at its priority slot
if *priority >= BUILTIN_PRIORITY && !builtin_ran {
builtin_ran = true;
if let Some(result) = self
.run_builtin_thumbnail(media_id, path, media_type, thumb_dir)
.await?
{
return Ok(Some(result));
}
}
// Skip plugins currently disabled by the circuit breaker.
if !self.is_healthy(id).await {
continue;
}
// Check if this plugin supports the media type
let supported = {
let caps = self.capabilities.read().await;
let key = ("thumbnail_generator".to_string(), id.clone());
caps
.supported_types
.get(&key)
.is_some_and(|types| types.contains(&media_type_str))
};
if !supported {
continue;
}
if !self
.manager
.enforcer()
.validate_function_call(kinds, "generate_thumbnail")
{
continue;
}
let output_path = thumb_dir.join(format!("{media_id}.jpg"));
let req = GenerateThumbnailRequest {
source_path: path.to_path_buf(),
output_path: output_path.clone(),
max_width: 320,
max_height: 320,
format: "jpeg".to_string(),
};
match wasm
.call_function_json::<GenerateThumbnailRequest, GenerateThumbnailResponse>(
"generate_thumbnail",
&req,
timeout,
)
.await
{
Ok(resp) => {
self.record_success(id).await;
// First success wins.
// NOTE(review): `resp.path` is trusted as-is; it is not checked
// against `output_path` or for existence — confirm upstream.
return Ok(Some(resp.path));
},
Err(e) => {
warn!(
plugin_id = %id,
error = %e,
"thumbnail generation failed"
);
self.record_failure(id).await;
},
}
}
// If built-in hasn't run yet
if !builtin_ran {
return self
.run_builtin_thumbnail(media_id, path, media_type, thumb_dir)
.await;
}
Ok(None)
}
/// Run the built-in thumbnail generator (sync, wrapped in
/// `spawn_blocking`).
///
/// Returns the built-in generator's result, mapping a panicked blocking
/// task into a `ThumbnailGeneration` error.
async fn run_builtin_thumbnail(
    &self,
    media_id: MediaId,
    path: &Path,
    media_type: &MediaType,
    thumb_dir: &Path,
) -> crate::error::Result<Option<PathBuf>> {
    // Owned copies so the blocking closure can be 'static.
    let source = path.to_path_buf();
    let owned_type = media_type.clone();
    let out_dir = thumb_dir.to_path_buf();
    let joined = tokio::task::spawn_blocking(move || {
        crate::thumbnail::generate_thumbnail(media_id, &source, &owned_type, &out_dir)
    })
    .await;
    joined.map_err(|e| {
        crate::error::PinakesError::ThumbnailGeneration(format!(
            "built-in thumbnail task panicked: {e}"
        ))
    })?
}
/// Fan out an event to all interested event handler plugins.
///
/// The work runs in a spawned task; failures are logged, never
/// propagated.
pub fn emit_event(
    self: &Arc<Self>,
    event_type: &str,
    payload: &serde_json::Value,
) {
    // Clone everything the detached task needs to own.
    let this = Arc::clone(self);
    let evt = event_type.to_owned();
    let data = payload.clone();
    tokio::spawn(async move {
        this.dispatch_event(&evt, &data).await;
    });
}
/// Internal dispatcher for events.
///
/// Consults the capability cache for plugins that registered interest in
/// `event_type`, then calls `handle_event` on each (in priority order),
/// feeding the circuit breaker with the outcome. Failures are logged and
/// never propagated.
async fn dispatch_event(
    &self,
    event_type: &str,
    payload: &serde_json::Value,
) {
    let timeout = Duration::from_secs(self.timeouts.event_handler_secs);
    // Collect plugin IDs interested in this event. Compare as `&str`
    // instead of allocating a fresh `String` per registered handler.
    let interested_ids: Vec<String> = {
        let caps = self.capabilities.read().await;
        caps
            .interested_events
            .iter()
            .filter(|(_id, events)| events.iter().any(|e| e.as_str() == event_type))
            .map(|(id, _)| id.clone())
            .collect()
    };
    if interested_ids.is_empty() {
        return;
    }
    // Get event handler plugins sorted by priority
    let plugins = self
        .manager
        .get_enabled_by_kind_sorted("event_handler")
        .await;
    for (id, _priority, kinds, wasm) in &plugins {
        if !interested_ids.contains(id) {
            continue;
        }
        // Skip plugins currently disabled by the circuit breaker.
        if !self.is_healthy(id).await {
            continue;
        }
        if !self
            .manager
            .enforcer()
            .validate_function_call(kinds, "handle_event")
        {
            continue;
        }
        let req = HandleEventRequest {
            event_type: event_type.to_string(),
            payload: payload.clone(),
        };
        // Event handlers return nothing meaningful; we just care about
        // success/failure.
        match wasm
            .call_function_json::<HandleEventRequest, serde_json::Value>(
                "handle_event",
                &req,
                timeout,
            )
            .await
        {
            Ok(_) => {
                self.record_success(id).await;
                debug!(
                    plugin_id = %id,
                    event_type = event_type,
                    "event handled"
                );
            },
            Err(e) => {
                warn!(
                    plugin_id = %id,
                    event_type = event_type,
                    error = %e,
                    "event handler failed"
                );
                self.record_failure(id).await;
            },
        }
    }
}
/// Search via plugin search backends. Results from all backends are
/// merged, deduplicated by ID, and sorted by score (highest first).
pub async fn search(
&self,
query: &str,
limit: usize,
offset: usize,
) -> Vec<SearchResultItem> {
let timeout = Duration::from_secs(self.timeouts.processing_secs);
let plugins = self
.manager
.get_enabled_by_kind_sorted("search_backend")
.await;
// Raw union of every backend's results; deduplicated below.
let mut all_results: Vec<SearchResultItem> = Vec::new();
for (id, _priority, kinds, wasm) in &plugins {
// Skip plugins currently disabled by the circuit breaker.
if !self.is_healthy(id).await {
continue;
}
if !self
.manager
.enforcer()
.validate_function_call(kinds, "search")
{
continue;
}
let req = SearchRequest {
query: query.to_string(),
limit,
offset,
};
match wasm
.call_function_json::<SearchRequest, SearchResponse>(
"search", &req, timeout,
)
.await
{
Ok(resp) => {
self.record_success(id).await;
all_results.extend(resp.results);
},
Err(e) => {
warn!(
plugin_id = %id,
error = %e,
"search backend query failed"
);
self.record_failure(id).await;
},
}
}
// Deduplicate by ID, keeping the highest-scoring entry
// (`seen` maps a result ID to its index in `deduped`).
let mut seen: FxHashMap<String, usize> = FxHashMap::default();
let mut deduped: Vec<SearchResultItem> = Vec::new();
for item in all_results {
if let Some(&idx) = seen.get(&item.id) {
if item.score > deduped[idx].score {
deduped[idx] = item;
}
} else {
seen.insert(item.id.clone(), deduped.len());
deduped.push(item);
}
}
// Sort by score descending
// (incomparable scores, e.g. NaN, are treated as equal).
deduped.sort_by(|a, b| {
b.score
.partial_cmp(&a.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
deduped
}
/// Index a media item in all search backend plugins (fan-out).
///
/// Failures are logged and fed to the circuit breaker, never propagated.
pub async fn index_item(&self, req: &IndexItemRequest) {
    let timeout = Duration::from_secs(self.timeouts.processing_secs);
    let backends = self
        .manager
        .get_enabled_by_kind_sorted("search_backend")
        .await;
    for (id, _priority, kinds, wasm) in &backends {
        // Skip circuit-broken plugins and disallowed calls.
        if !self.is_healthy(id).await {
            continue;
        }
        let allowed = self
            .manager
            .enforcer()
            .validate_function_call(kinds, "index_item");
        if !allowed {
            continue;
        }
        let outcome = wasm
            .call_function_json::<IndexItemRequest, serde_json::Value>(
                "index_item",
                req,
                timeout,
            )
            .await;
        match outcome {
            Ok(_) => self.record_success(id).await,
            Err(e) => {
                warn!(
                    plugin_id = %id,
                    error = %e,
                    "search backend index_item failed"
                );
                self.record_failure(id).await;
            },
        }
    }
}
/// Remove a media item from all search backend plugins (fan-out).
///
/// Failures are logged and fed to the circuit breaker, never propagated.
pub async fn remove_item(&self, media_id: &str) {
    let timeout = Duration::from_secs(self.timeouts.processing_secs);
    let backends = self
        .manager
        .get_enabled_by_kind_sorted("search_backend")
        .await;
    // One request value is reused for every backend.
    let req = RemoveItemRequest {
        id: media_id.to_string(),
    };
    for (id, _priority, kinds, wasm) in &backends {
        // Skip circuit-broken plugins and disallowed calls.
        if !self.is_healthy(id).await {
            continue;
        }
        let allowed = self
            .manager
            .enforcer()
            .validate_function_call(kinds, "remove_item");
        if !allowed {
            continue;
        }
        let outcome = wasm
            .call_function_json::<RemoveItemRequest, serde_json::Value>(
                "remove_item",
                &req,
                timeout,
            )
            .await;
        match outcome {
            Ok(_) => self.record_success(id).await,
            Err(e) => {
                warn!(
                    plugin_id = %id,
                    error = %e,
                    "search backend remove_item failed"
                );
                self.record_failure(id).await;
            },
        }
    }
}
/// Get all available themes from theme provider plugins. Results from
/// all providers are accumulated.
pub async fn get_themes(&self) -> Vec<PluginThemeDefinition> {
    let caps = self.capabilities.read().await;
    // Flatten every provider's cached definitions into one list.
    let mut themes = Vec::new();
    for defs in caps.theme_definitions.values() {
        themes.extend(defs.iter().cloned());
    }
    themes
}
/// Load a specific theme by ID from the provider that registered it.
///
/// Returns `None` when no provider registered the theme, the owning
/// plugin is circuit-broken or not currently enabled, the call is not
/// permitted for its kind, or the plugin call fails.
pub async fn load_theme(&self, theme_id: &str) -> Option<LoadThemeResponse> {
let timeout = Duration::from_secs(self.timeouts.processing_secs);
// Find which plugin owns this theme
let owner_id = {
let caps = self.capabilities.read().await;
caps.theme_definitions.iter().find_map(|(plugin_id, defs)| {
defs
.iter()
.any(|d| d.id == theme_id)
.then(|| plugin_id.clone())
})
};
let owner_id = owner_id?;
if !self.is_healthy(&owner_id).await {
return None;
}
// Re-fetch the live plugin handle for the owner.
let plugins = self
.manager
.get_enabled_by_kind_sorted("theme_provider")
.await;
let plugin = plugins.iter().find(|(id, ..)| id == &owner_id)?;
let (id, _priority, kinds, wasm) = plugin;
if !self
.manager
.enforcer()
.validate_function_call(kinds, "load_theme")
{
return None;
}
let req = serde_json::json!({"theme_id": theme_id});
match wasm
.call_function_json::<serde_json::Value, LoadThemeResponse>(
"load_theme",
&req,
timeout,
)
.await
{
Ok(resp) => {
self.record_success(id).await;
Some(resp)
},
Err(e) => {
warn!(
plugin_id = %id,
theme_id = theme_id,
error = %e,
"theme loading failed"
);
self.record_failure(id).await;
None
},
}
}
/// Record a successful call, resetting the failure counter.
///
/// A success after a tripped breaker fully re-enables the plugin (the
/// half-open trial succeeded).
async fn record_success(&self, plugin_id: &str) {
    let mut health_map = self.health.write().await;
    let entry = health_map
        .entry(plugin_id.to_string())
        .or_insert_with(PluginHealth::new);
    if entry.disabled_by_circuit_breaker {
        info!(
            plugin_id = plugin_id,
            "circuit breaker recovered: re-enabling plugin after successful trial"
        );
    }
    // Reset to a pristine healthy state in one assignment.
    *entry = PluginHealth::new();
    drop(health_map);
}
/// Record a failed call. If consecutive failures exceed the threshold,
/// trip the circuit breaker. If a half-open trial fails, reset the
/// cooldown timer.
async fn record_failure(&self, plugin_id: &str) {
    let mut health_map = self.health.write().await;
    let entry = health_map
        .entry(plugin_id.to_string())
        .or_insert_with(PluginHealth::new);
    entry.consecutive_failures += 1;
    // Refreshing the timestamp also restarts the cooldown after a failed
    // half-open trial.
    entry.last_failure = Some(Instant::now());
    let tripped = entry.consecutive_failures >= self.max_consecutive_failures;
    if tripped {
        // Log only on the transition from enabled to disabled.
        if !entry.disabled_by_circuit_breaker {
            warn!(
                plugin_id = plugin_id,
                failures = entry.consecutive_failures,
                "circuit breaker tripped: disabling plugin"
            );
        }
        entry.disabled_by_circuit_breaker = true;
    }
    drop(health_map);
}
/// Check whether a plugin is healthy enough to receive calls.
///
/// # Returns
///
/// `true` if the plugin has no circuit breaker state, has not
/// tripped the breaker, or has tripped but the cooldown has elapsed
/// (half-open state: one trial call is permitted).
async fn is_healthy(&self, plugin_id: &str) -> bool {
    let health_map = self.health.read().await;
    health_map.get(plugin_id).map_or(true, |h| {
        // Closed breaker: healthy. Open breaker: permit a single trial
        // call once the cooldown has elapsed (half-open).
        !h.disabled_by_circuit_breaker
            || h
                .last_failure
                .is_some_and(|t| t.elapsed() >= CIRCUIT_BREAKER_COOLDOWN)
    })
}
}
/// Merge plugin metadata response fields into the accumulator.
/// Non-None fields overwrite existing values.
fn merge_metadata(
base: &mut ExtractedMetadata,
plugin_resp: &ExtractMetadataResponse,
) {
if let Some(ref v) = plugin_resp.title {
base.title = Some(v.clone());
}
if let Some(ref v) = plugin_resp.artist {
base.artist = Some(v.clone());
}
if let Some(ref v) = plugin_resp.album {
base.album = Some(v.clone());
}
if let Some(ref v) = plugin_resp.genre {
base.genre = Some(v.clone());
}
if let Some(v) = plugin_resp.year {
base.year = Some(v);
}
if let Some(v) = plugin_resp.duration_secs {
base.duration_secs = Some(v);
}
if let Some(ref v) = plugin_resp.description {
base.description = Some(v.clone());
}
for (k, v) in &plugin_resp.extra {
base.extra.insert(k.clone(), v.clone());
}
}
/// Merge a full [`ExtractedMetadata`] into an accumulator. Non-None
/// fields from `source` overwrite `base`.
fn merge_extracted(base: &mut ExtractedMetadata, source: ExtractedMetadata) {
    // `source` is consumed, so present values are moved into `base`
    // rather than cloned.
    if let Some(v) = source.title {
        base.title = Some(v);
    }
    if let Some(v) = source.artist {
        base.artist = Some(v);
    }
    if let Some(v) = source.album {
        base.album = Some(v);
    }
    if let Some(v) = source.genre {
        base.genre = Some(v);
    }
    if let Some(v) = source.year {
        base.year = Some(v);
    }
    if let Some(v) = source.duration_secs {
        base.duration_secs = Some(v);
    }
    if let Some(v) = source.description {
        base.description = Some(v);
    }
    // Extra key/value pairs merge in; `source` entries win on clash.
    base.extra.extend(source.extra);
    if let Some(v) = source.book_metadata {
        base.book_metadata = Some(v);
    }
    if let Some(v) = source.date_taken {
        base.date_taken = Some(v);
    }
    if let Some(v) = source.latitude {
        base.latitude = Some(v);
    }
    if let Some(v) = source.longitude {
        base.longitude = Some(v);
    }
    if let Some(v) = source.camera_make {
        base.camera_make = Some(v);
    }
    if let Some(v) = source.camera_model {
        base.camera_model = Some(v);
    }
    if let Some(v) = source.rating {
        base.rating = Some(v);
    }
}
#[cfg(test)]
mod tests {
use tempfile::TempDir;
use super::*;
use crate::plugin::{PluginManager, PluginManagerConfig};
/// Create a `PluginPipeline` backed by an empty `PluginManager`.
///
/// Returns the `TempDir` alongside the pipeline so the backing
/// directories outlive the test.
fn create_test_pipeline() -> (TempDir, Arc<PluginPipeline>) {
    let temp_dir = TempDir::new().unwrap();
    let manager = Arc::new(
        PluginManager::new(
            temp_dir.path().join("data"),
            temp_dir.path().join("cache"),
            PluginManagerConfig::default(),
        )
        .unwrap(),
    );
    // max_consecutive_failures = 5, matching the breaker tests below.
    let pipeline = Arc::new(PluginPipeline::new(
        manager,
        PluginTimeoutConfig::default(),
        5,
    ));
    (temp_dir, pipeline)
}
#[tokio::test]
async fn test_circuit_breaker_trips_after_max_failures() {
let (_dir, pipeline) = create_test_pipeline();
let plugin_id = "test-plugin";
// Initially healthy
assert!(pipeline.is_healthy(plugin_id).await);
// Record failures up to the threshold
// (the pipeline is built with max_consecutive_failures = 5)
for _ in 0..5 {
pipeline.record_failure(plugin_id).await;
}
// Should be disabled now
assert!(!pipeline.is_healthy(plugin_id).await);
}
#[tokio::test]
async fn test_circuit_breaker_resets_on_success() {
let (_dir, pipeline) = create_test_pipeline();
let plugin_id = "test-plugin";
// Record some failures but not enough to trip
// (threshold is 5; see create_test_pipeline)
for _ in 0..4 {
pipeline.record_failure(plugin_id).await;
}
assert!(pipeline.is_healthy(plugin_id).await);
// One success resets the counter
pipeline.record_success(plugin_id).await;
assert!(pipeline.is_healthy(plugin_id).await);
// Need another full run of failures to trip
for _ in 0..4 {
pipeline.record_failure(plugin_id).await;
}
assert!(pipeline.is_healthy(plugin_id).await);
}
#[tokio::test]
async fn test_circuit_breaker_reenabled_after_success() {
let (_dir, pipeline) = create_test_pipeline();
let plugin_id = "test-plugin";
// Trip the circuit breaker
for _ in 0..5 {
pipeline.record_failure(plugin_id).await;
}
assert!(!pipeline.is_healthy(plugin_id).await);
// Success re-enables it
// (e.g. a half-open trial that went through)
pipeline.record_success(plugin_id).await;
assert!(pipeline.is_healthy(plugin_id).await);
}
#[tokio::test]
async fn test_circuit_breaker_half_open_after_cooldown() {
let (_dir, pipeline) = create_test_pipeline();
let plugin_id = "test-plugin";
// Trip the circuit breaker
for _ in 0..5 {
pipeline.record_failure(plugin_id).await;
}
assert!(!pipeline.is_healthy(plugin_id).await);
// Simulate cooldown elapsed by backdating last_failure
// (cooldown + 1s in the past, so the >= comparison is safely true)
{
let mut health_map = pipeline.health.write().await;
let entry = health_map.get_mut(plugin_id).unwrap();
entry.last_failure = Some(
Instant::now()
.checked_sub(CIRCUIT_BREAKER_COOLDOWN)
.unwrap()
.checked_sub(Duration::from_secs(1))
.unwrap(),
);
}
// Half-open: should be healthy for a trial call
assert!(pipeline.is_healthy(plugin_id).await);
// If the trial fails, breaker re-trips with fresh cooldown
pipeline.record_failure(plugin_id).await;
assert!(!pipeline.is_healthy(plugin_id).await);
// If we backdate again and the trial succeeds, fully recovered
{
let mut health_map = pipeline.health.write().await;
let entry = health_map.get_mut(plugin_id).unwrap();
entry.last_failure = Some(
Instant::now()
.checked_sub(CIRCUIT_BREAKER_COOLDOWN)
.unwrap()
.checked_sub(Duration::from_secs(1))
.unwrap(),
);
}
assert!(pipeline.is_healthy(plugin_id).await);
pipeline.record_success(plugin_id).await;
assert!(pipeline.is_healthy(plugin_id).await);
// Verify fully reset: failure counter starts fresh
let health_map = pipeline.health.read().await;
let entry = health_map.get(plugin_id).unwrap();
assert_eq!(entry.consecutive_failures, 0);
assert!(!entry.disabled_by_circuit_breaker);
assert!(entry.last_failure.is_none());
}
#[tokio::test]
async fn test_empty_pipeline_resolve_media_type() {
let (_dir, pipeline) = create_test_pipeline();
// With no plugins, falls back to built-in
// (the file need not exist; resolution is extension-based here)
let result = pipeline
.resolve_media_type(Path::new("/tmp/test.mp3"))
.await;
assert!(result.is_some());
assert_eq!(result.unwrap().id(), "mp3");
}
#[tokio::test]
async fn test_empty_pipeline_resolve_unknown_type() {
let (_dir, pipeline) = create_test_pipeline();
// Unrecognized extension and no plugins -> no media type at all.
let result = pipeline
.resolve_media_type(Path::new("/tmp/test.xyzunknown"))
.await;
assert!(result.is_none());
}
#[tokio::test]
async fn test_empty_pipeline_extract_metadata() {
let (_dir, pipeline) = create_test_pipeline();
// Built-in extractor handles this (may return default for non-existent
// file, but should not panic)
let mt = MediaType::from_path(Path::new("/tmp/fake.txt"));
if let Some(mt) = mt {
let result = pipeline
.extract_metadata(Path::new("/tmp/fake.txt"), &mt)
.await;
// The built-in extractor may succeed with defaults or fail
// gracefully; either is fine for this test.
let _ = result;
}
}
#[tokio::test]
async fn test_empty_pipeline_discover_capabilities() {
let (_dir, pipeline) = create_test_pipeline();
// Discovery with zero plugins must still succeed.
let result = pipeline.discover_capabilities().await;
assert!(result.is_ok());
// Capabilities should be empty
let caps = pipeline.capabilities.read().await;
assert!(caps.supported_types.is_empty());
assert!(caps.interested_events.is_empty());
assert!(caps.media_type_definitions.is_empty());
assert!(caps.theme_definitions.is_empty());
}
#[test]
fn test_merge_metadata_overwrites_some_fields() {
    // Build the base with a struct literal instead of reassigning fields
    // after `default()` (clippy: field_reassign_with_default), matching
    // the style used elsewhere in this module.
    let mut base = ExtractedMetadata {
        title: Some("Original".to_string()),
        artist: Some("Original Artist".to_string()),
        ..Default::default()
    };
    let resp = ExtractMetadataResponse {
        title: Some("New Title".to_string()),
        artist: None, // should not overwrite
        album: Some("New Album".to_string()),
        genre: None,
        year: Some(2024),
        duration_secs: None,
        description: None,
        extra: FxHashMap::default(),
    };
    merge_metadata(&mut base, &resp);
    assert_eq!(base.title.as_deref(), Some("New Title"));
    assert_eq!(base.artist.as_deref(), Some("Original Artist"));
    assert_eq!(base.album.as_deref(), Some("New Album"));
    assert_eq!(base.year, Some(2024));
}
#[test]
fn test_merge_metadata_extra_fields() {
    // Existing entry that the response should overwrite.
    let mut base = ExtractedMetadata::default();
    base.extra.insert("key1".to_string(), "val1".to_string());
    // Response carries one new key and one clashing key.
    let mut incoming = FxHashMap::default();
    incoming.insert("key2".to_string(), "val2".to_string());
    incoming.insert("key1".to_string(), "overwritten".to_string());
    let resp = ExtractMetadataResponse {
        extra: incoming,
        ..Default::default()
    };
    merge_metadata(&mut base, &resp);
    assert_eq!(
        base.extra.get("key1").map(String::as_str),
        Some("overwritten")
    );
    assert_eq!(base.extra.get("key2").map(String::as_str), Some("val2"));
}
#[test]
fn test_merge_extracted_preserves_photo_fields() {
    let mut accumulator = ExtractedMetadata::default();
    // Photo-specific fields must survive the merge unchanged.
    let incoming = ExtractedMetadata {
        latitude: Some(48.8566),
        longitude: Some(2.3522),
        camera_make: Some("Canon".to_string()),
        camera_model: Some("EOS R5".to_string()),
        ..Default::default()
    };
    merge_extracted(&mut accumulator, incoming);
    assert_eq!(accumulator.latitude, Some(48.8566));
    assert_eq!(accumulator.longitude, Some(2.3522));
    assert_eq!(accumulator.camera_make.as_deref(), Some("Canon"));
    assert_eq!(accumulator.camera_model.as_deref(), Some("EOS R5"));
}
#[tokio::test]
async fn test_unknown_plugin_is_healthy() {
let (_dir, pipeline) = create_test_pipeline();
// A never-seen plugin is considered healthy
// (no entry in the health map means no breaker state).
assert!(pipeline.is_healthy("never-registered").await);
}
#[tokio::test]
async fn test_emit_event_with_empty_pipeline() {
let (_dir, pipeline) = create_test_pipeline();
// Discover capabilities first (empty, no plugins)
pipeline.discover_capabilities().await.unwrap();
// Emit an event with no interested handlers; should complete
// without panic. The spawned task returns immediately.
pipeline.emit_event(
"MediaImported",
&serde_json::json!({"media_id": "test-123"}),
);
// Give the spawned task time to run
tokio::time::sleep(Duration::from_millis(50)).await;
}
}