From 4edda201e62674299acb6fe38be3bcea79138974 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Mar 2026 14:23:02 +0300 Subject: [PATCH] pinakes-core: add plugin pipeline; impl signature verification & dependency resolution Signed-off-by: NotAShelf Change-Id: Ida98135cf868db0f5a46a64b8ac562366a6a6964 --- Cargo.lock | 105 ++ Cargo.toml | 8 + crates/pinakes-core/Cargo.toml | 5 +- crates/pinakes-core/src/config.rs | 84 +- crates/pinakes-core/src/error.rs | 3 + crates/pinakes-core/src/plugin/mod.rs | 343 ++++- crates/pinakes-core/src/plugin/pipeline.rs | 1442 +++++++++++++++++++ crates/pinakes-core/src/plugin/registry.rs | 3 +- crates/pinakes-core/src/plugin/rpc.rs | 239 +++ crates/pinakes-core/src/plugin/runtime.rs | 232 ++- crates/pinakes-core/src/plugin/security.rs | 104 +- crates/pinakes-core/src/plugin/signature.rs | 252 ++++ 12 files changed, 2784 insertions(+), 36 deletions(-) create mode 100644 crates/pinakes-core/src/plugin/pipeline.rs create mode 100644 crates/pinakes-core/src/plugin/rpc.rs create mode 100644 crates/pinakes-core/src/plugin/signature.rs diff --git a/Cargo.lock b/Cargo.lock index a39cbca..9dde6a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -862,6 +862,12 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "const-serialize" version = "0.7.2" @@ -1374,6 +1380,33 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "curve25519-dalek" +version = "4.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +dependencies = [ + "cfg-if", + "cpufeatures 0.2.17", + "curve25519-dalek-derive", + "digest", + "fiat-crypto", + "rustc_version", + "subtle", + "zeroize", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "darling" version = "0.21.3" @@ -1512,6 +1545,16 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5729f5117e208430e437df2f4843f5e5952997175992d1414f94c57d61e270b4" +[[package]] +name = "der" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" +dependencies = [ + "const-oid", + "zeroize", +] + [[package]] name = "deranged" version = "0.5.8" @@ -2273,6 +2316,30 @@ dependencies = [ "cipher", ] +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "pkcs8", + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9" +dependencies = [ + "curve25519-dalek", + "ed25519", + "serde", + "sha2", + "subtle", + "zeroize", +] + [[package]] name = "either" version = "1.15.0" @@ -2444,6 +2511,12 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "field-offset" version = "0.3.6" @@ -5323,6 +5396,7 @@ dependencies = [ "blake3", "chrono", "deadpool-postgres", + "ed25519-dalek", "epub", "gray_matter", "image", @@ -5338,6 +5412,7 @@ dependencies = [ "pinakes-plugin-api", "postgres-native-tls", "postgres-types", + "rand 0.10.0", "refinery", "regex", "reqwest 0.13.2", @@ -5351,6 +5426,7 @@ 
dependencies = [ "tokio-util", "toml 1.0.6+spec-1.1.0", "tracing", + "url", "urlencoding", "uuid", "walkdir", @@ -5454,6 +5530,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.32" @@ -6856,6 +6942,15 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "simd-adler32" version = "0.3.8" @@ -6989,6 +7084,16 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" diff --git a/Cargo.toml b/Cargo.toml index 52a67a4..53a54b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = ["crates/*"] +exclude = ["crates/pinakes-core/tests/fixtures/test-plugin"] resolver = "3" [workspace.package] @@ -46,6 +47,9 @@ tracing-subscriber = { version = "0.3.22", features = ["env-filter", "json"] } # Hashing blake3 = "1.8.3" +# Cryptographic signatures (plugin verification) +ed25519-dalek = { version = "2.1.1", features = ["std"] } + # Metadata extraction lofty = "0.23.2" lopdf = "0.39.0" @@ -88,6 +92,7 @@ tower_governor = "0.8.0" # HTTP client reqwest = { version = "0.13.2", features = ["json", "query", "blocking"] } +url = "2.5" # TUI ratatui = "0.30.0" @@ -136,6 +141,9 @@ http = "1.4.0" wasmtime = { version = "42.0.1", features = ["component-model"] } wit-bindgen = "0.53.1" +# Misc +tempfile = 
"3.26.0" + # See: # [workspace.lints.clippy] diff --git a/crates/pinakes-core/Cargo.toml b/crates/pinakes-core/Cargo.toml index 98d825b..5a09080 100644 --- a/crates/pinakes-core/Cargo.toml +++ b/crates/pinakes-core/Cargo.toml @@ -36,6 +36,7 @@ kamadak-exif = { workspace = true } image = { workspace = true } tokio-util = { workspace = true } reqwest = { workspace = true } +url = { workspace = true } argon2 = { workspace = true } regex = { workspace = true } moka = { workspace = true } @@ -45,9 +46,11 @@ image_hasher = { workspace = true } # Plugin system pinakes-plugin-api.workspace = true wasmtime.workspace = true +ed25519-dalek.workspace = true [lints] workspace = true [dev-dependencies] -tempfile = "3.25.0" +tempfile = { workspace = true } +rand = { workspace = true } diff --git a/crates/pinakes-core/src/config.rs b/crates/pinakes-core/src/config.rs index 8a47eca..3abe942 100644 --- a/crates/pinakes-core/src/config.rs +++ b/crates/pinakes-core/src/config.rs @@ -436,24 +436,69 @@ impl std::fmt::Display for UserRole { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PluginTimeoutConfig { + /// Timeout for capability discovery queries (`supported_types`, + /// `interested_events`) + #[serde(default = "default_capability_query_timeout")] + pub capability_query_secs: u64, + /// Timeout for processing calls (`extract_metadata`, `generate_thumbnail`) + #[serde(default = "default_processing_timeout")] + pub processing_secs: u64, + /// Timeout for event handler calls + #[serde(default = "default_event_handler_timeout")] + pub event_handler_secs: u64, +} + +const fn default_capability_query_timeout() -> u64 { + 2 +} + +const fn default_processing_timeout() -> u64 { + 30 +} + +const fn default_event_handler_timeout() -> u64 { + 10 +} + +impl Default for PluginTimeoutConfig { + fn default() -> Self { + Self { + capability_query_secs: default_capability_query_timeout(), + processing_secs: default_processing_timeout(), + event_handler_secs: 
default_event_handler_timeout(), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PluginsConfig { #[serde(default)] - pub enabled: bool, + pub enabled: bool, #[serde(default = "default_plugin_data_dir")] - pub data_dir: PathBuf, + pub data_dir: PathBuf, #[serde(default = "default_plugin_cache_dir")] - pub cache_dir: PathBuf, + pub cache_dir: PathBuf, #[serde(default)] - pub plugin_dirs: Vec, + pub plugin_dirs: Vec, #[serde(default)] - pub enable_hot_reload: bool, + pub enable_hot_reload: bool, #[serde(default)] - pub allow_unsigned: bool, + pub allow_unsigned: bool, #[serde(default = "default_max_concurrent_ops")] - pub max_concurrent_ops: usize, + pub max_concurrent_ops: usize, #[serde(default = "default_plugin_timeout")] - pub plugin_timeout_secs: u64, + pub plugin_timeout_secs: u64, + #[serde(default)] + pub timeouts: PluginTimeoutConfig, + #[serde(default = "default_max_consecutive_failures")] + pub max_consecutive_failures: u32, + + /// Hex-encoded Ed25519 public keys trusted for plugin signature + /// verification. Each entry is 64 hex characters (32 bytes). 
+ #[serde(default)] + pub trusted_keys: Vec, } fn default_plugin_data_dir() -> PathBuf { @@ -472,17 +517,24 @@ const fn default_plugin_timeout() -> u64 { 30 } +const fn default_max_consecutive_failures() -> u32 { + 5 +} + impl Default for PluginsConfig { fn default() -> Self { Self { - enabled: false, - data_dir: default_plugin_data_dir(), - cache_dir: default_plugin_cache_dir(), - plugin_dirs: vec![], - enable_hot_reload: false, - allow_unsigned: false, - max_concurrent_ops: default_max_concurrent_ops(), - plugin_timeout_secs: default_plugin_timeout(), + enabled: false, + data_dir: default_plugin_data_dir(), + cache_dir: default_plugin_cache_dir(), + plugin_dirs: vec![], + enable_hot_reload: false, + allow_unsigned: false, + max_concurrent_ops: default_max_concurrent_ops(), + plugin_timeout_secs: default_plugin_timeout(), + timeouts: PluginTimeoutConfig::default(), + max_consecutive_failures: default_max_consecutive_failures(), + trusted_keys: vec![], } } } diff --git a/crates/pinakes-core/src/error.rs b/crates/pinakes-core/src/error.rs index 941a23d..c987dc6 100644 --- a/crates/pinakes-core/src/error.rs +++ b/crates/pinakes-core/src/error.rs @@ -28,6 +28,9 @@ pub enum PinakesError { #[error("metadata extraction failed: {0}")] MetadataExtraction(String), + #[error("thumbnail generation failed: {0}")] + ThumbnailGeneration(String), + #[error("search query parse error: {0}")] SearchParse(String), diff --git a/crates/pinakes-core/src/plugin/mod.rs b/crates/pinakes-core/src/plugin/mod.rs index 74a7ed2..8ad12cf 100644 --- a/crates/pinakes-core/src/plugin/mod.rs +++ b/crates/pinakes-core/src/plugin/mod.rs @@ -19,14 +19,19 @@ use tokio::sync::RwLock; use tracing::{debug, error, info, warn}; pub mod loader; +pub mod pipeline; pub mod registry; +pub mod rpc; pub mod runtime; pub mod security; +pub mod signature; pub use loader::PluginLoader; +pub use pipeline::PluginPipeline; pub use registry::{PluginRegistry, RegisteredPlugin}; pub use runtime::{WasmPlugin, WasmRuntime}; 
pub use security::CapabilityEnforcer; +pub use signature::{SignatureStatus, verify_plugin_signature}; /// Plugin manager coordinates plugin lifecycle and operations pub struct PluginManager { @@ -69,16 +74,28 @@ pub struct PluginManagerConfig { /// Plugin timeout in seconds pub plugin_timeout_secs: u64, + + /// Timeout configuration for different call types + pub timeouts: crate::config::PluginTimeoutConfig, + + /// Max consecutive failures before circuit breaker disables plugin + pub max_consecutive_failures: u32, + + /// Trusted Ed25519 public keys for signature verification (hex-encoded) + pub trusted_keys: Vec, } impl Default for PluginManagerConfig { fn default() -> Self { Self { - plugin_dirs: vec![], - enable_hot_reload: false, - allow_unsigned: false, - max_concurrent_ops: 4, - plugin_timeout_secs: 30, + plugin_dirs: vec![], + enable_hot_reload: false, + allow_unsigned: false, + max_concurrent_ops: 4, + plugin_timeout_secs: 30, + timeouts: crate::config::PluginTimeoutConfig::default(), + max_consecutive_failures: 5, + trusted_keys: vec![], } } } @@ -86,11 +103,14 @@ impl Default for PluginManagerConfig { impl From for PluginManagerConfig { fn from(cfg: crate::config::PluginsConfig) -> Self { Self { - plugin_dirs: cfg.plugin_dirs, - enable_hot_reload: cfg.enable_hot_reload, - allow_unsigned: cfg.allow_unsigned, - max_concurrent_ops: cfg.max_concurrent_ops, - plugin_timeout_secs: cfg.plugin_timeout_secs, + plugin_dirs: cfg.plugin_dirs, + enable_hot_reload: cfg.enable_hot_reload, + allow_unsigned: cfg.allow_unsigned, + max_concurrent_ops: cfg.max_concurrent_ops, + plugin_timeout_secs: cfg.plugin_timeout_secs, + timeouts: cfg.timeouts, + max_consecutive_failures: cfg.max_consecutive_failures, + trusted_keys: cfg.trusted_keys, } } } @@ -127,7 +147,12 @@ impl PluginManager { }) } - /// Discover and load all plugins from configured directories + /// Discover and load all plugins from configured directories. 
+ /// + /// Plugins are loaded in dependency order: if plugin A declares a + /// dependency on plugin B, B is loaded first. Cycles and missing + /// dependencies are detected and reported as warnings; affected plugins + /// are skipped rather than causing a hard failure. /// /// # Errors /// @@ -136,9 +161,10 @@ impl PluginManager { info!("Discovering plugins from {:?}", self.config.plugin_dirs); let manifests = self.loader.discover_plugins()?; + let ordered = Self::resolve_load_order(&manifests); let mut loaded_plugins = Vec::new(); - for manifest in manifests { + for manifest in ordered { match self.load_plugin_from_manifest(&manifest).await { Ok(plugin_id) => { info!("Loaded plugin: {}", plugin_id); @@ -153,6 +179,93 @@ impl PluginManager { Ok(loaded_plugins) } + /// Topological sort of manifests by their declared `dependencies`. + /// + /// Uses Kahn's algorithm. Plugins whose dependencies are missing or form + /// a cycle are logged as warnings and excluded from the result. + fn resolve_load_order( + manifests: &[pinakes_plugin_api::PluginManifest], + ) -> Vec { + use std::collections::{HashMap, HashSet, VecDeque}; + + // Index manifests by name for O(1) lookup + let by_name: HashMap<&str, usize> = manifests + .iter() + .enumerate() + .map(|(i, m)| (m.plugin.name.as_str(), i)) + .collect(); + + // Check for missing dependencies and warn early + let known: HashSet<&str> = by_name.keys().copied().collect(); + for manifest in manifests { + for dep in &manifest.plugin.dependencies { + if !known.contains(dep.as_str()) { + warn!( + "Plugin '{}' depends on '{}' which was not discovered; it will be \ + skipped", + manifest.plugin.name, dep + ); + } + } + } + + // Build adjacency: in_degree[i] = number of deps that must load before i + let mut in_degree = vec![0usize; manifests.len()]; + // dependents[i] = indices that depend on i (i must load before them) + let mut dependents: Vec> = vec![vec![]; manifests.len()]; + + for (i, manifest) in manifests.iter().enumerate() 
{ + for dep in &manifest.plugin.dependencies { + if let Some(&dep_idx) = by_name.get(dep.as_str()) { + in_degree[i] += 1; + dependents[dep_idx].push(i); + } else { + // Missing dep: set in_degree impossibly high so it never resolves + in_degree[i] = usize::MAX; + } + } + } + + // Kahn's algorithm + let mut queue: VecDeque<usize> = VecDeque::new(); + for (i, &deg) in in_degree.iter().enumerate() { + if deg == 0 { + queue.push_back(i); + } + } + + let mut result = Vec::with_capacity(manifests.len()); + while let Some(idx) = queue.pop_front() { + result.push(manifests[idx].clone()); + for &dependent in &dependents[idx] { + if in_degree[dependent] == usize::MAX { + continue; // already poisoned by missing dep + } + in_degree[dependent] -= 1; + if in_degree[dependent] == 0 { + queue.push_back(dependent); + } + } + } + + // Anything not in `result` is part of a cycle or has a missing dep + if result.len() < manifests.len() { + let loaded: HashSet<&str> = + result.iter().map(|m| m.plugin.name.as_str()).collect(); + for manifest in manifests { + if !loaded.contains(manifest.plugin.name.as_str()) { + warn!( + "Plugin '{}' was skipped due to unresolved dependencies or a \ + dependency cycle", + manifest.plugin.name + ); + } + } + } + + result + } + /// Load a plugin from a manifest file /// # Errors @@ -217,6 +330,45 @@ impl PluginManager { // Load WASM binary let wasm_path = self.loader.resolve_wasm_path(manifest)?; + + // Verify plugin signature unless unsigned plugins are allowed + if !self.config.allow_unsigned { + let plugin_dir = wasm_path + .parent() + .ok_or_else(|| anyhow::anyhow!("WASM path has no parent directory"))?; + + let trusted_keys: Vec<_> = self + .config + .trusted_keys + .iter() + .filter_map(|hex| { + signature::parse_public_key(hex) + .map_err(|e| warn!("Ignoring malformed trusted key: {e}")) + .ok() + }) + .collect(); + + match signature::verify_plugin_signature( + plugin_dir, + &wasm_path, + &trusted_keys, + )? 
{ + SignatureStatus::Valid => { + debug!("Plugin '{plugin_id}' signature verified"); + }, + SignatureStatus::Unsigned => { + return Err(anyhow::anyhow!( + "Plugin '{plugin_id}' is unsigned and allow_unsigned is false" + )); + }, + SignatureStatus::Invalid(reason) => { + return Err(anyhow::anyhow!( + "Plugin '{plugin_id}' has an invalid signature: {reason}" + )); + }, + } + } + let wasm_plugin = self.runtime.load_plugin(&wasm_path, context)?; // Initialize plugin @@ -413,6 +565,40 @@ impl PluginManager { registry.get(plugin_id).map(|p| p.metadata.clone()) } + /// Get enabled plugins of a specific kind, sorted by priority (ascending). + /// + /// # Returns + /// + /// `(plugin_id, priority, kinds, wasm_plugin)` tuples. + pub async fn get_enabled_by_kind_sorted( + &self, + kind: &str, + ) -> Vec<(String, u16, Vec, WasmPlugin)> { + let registry = self.registry.read().await; + let mut plugins: Vec<_> = registry + .get_by_kind(kind) + .into_iter() + .filter(|p| p.enabled) + .map(|p| { + ( + p.id.clone(), + p.manifest.plugin.priority, + p.manifest.plugin.kind.clone(), + p.wasm_plugin.clone(), + ) + }) + .collect(); + drop(registry); + plugins.sort_by_key(|(_, priority, ..)| *priority); + plugins + } + + /// Get a reference to the capability enforcer. 
+ #[must_use] + pub const fn enforcer(&self) -> &CapabilityEnforcer { + &self.enforcer + } + /// Check if a plugin is loaded and enabled pub async fn is_plugin_enabled(&self, plugin_id: &str) -> bool { let registry = self.registry.read().await; @@ -503,4 +689,137 @@ mod tests { let plugins = manager.list_plugins().await; assert_eq!(plugins.len(), 0); } + + /// Build a minimal manifest for dependency resolution tests + fn test_manifest( + name: &str, + deps: Vec, + ) -> pinakes_plugin_api::PluginManifest { + use pinakes_plugin_api::manifest::{PluginBinary, PluginInfo}; + + pinakes_plugin_api::PluginManifest { + plugin: PluginInfo { + name: name.to_string(), + version: "1.0.0".to_string(), + api_version: "1.0".to_string(), + author: None, + description: None, + homepage: None, + license: None, + priority: 500, + kind: vec!["media_type".to_string()], + binary: PluginBinary { + wasm: "plugin.wasm".to_string(), + entrypoint: None, + }, + dependencies: deps, + }, + capabilities: Default::default(), + config: Default::default(), + } + } + + #[test] + fn test_resolve_load_order_no_deps() { + let manifests = vec![ + test_manifest("alpha", vec![]), + test_manifest("beta", vec![]), + test_manifest("gamma", vec![]), + ]; + + let ordered = PluginManager::resolve_load_order(&manifests); + assert_eq!(ordered.len(), 3); + } + + #[test] + fn test_resolve_load_order_linear_chain() { + // gamma depends on beta, beta depends on alpha + let manifests = vec![ + test_manifest("gamma", vec!["beta".to_string()]), + test_manifest("alpha", vec![]), + test_manifest("beta", vec!["alpha".to_string()]), + ]; + + let ordered = PluginManager::resolve_load_order(&manifests); + assert_eq!(ordered.len(), 3); + + let names: Vec<&str> = + ordered.iter().map(|m| m.plugin.name.as_str()).collect(); + let alpha_pos = names.iter().position(|&n| n == "alpha").unwrap(); + let beta_pos = names.iter().position(|&n| n == "beta").unwrap(); + let gamma_pos = names.iter().position(|&n| n == "gamma").unwrap(); + 
assert!(alpha_pos < beta_pos, "alpha must load before beta"); + assert!(beta_pos < gamma_pos, "beta must load before gamma"); + } + + #[test] + fn test_resolve_load_order_cycle_detected() { + // A -> B -> C -> A (cycle) + let manifests = vec![ + test_manifest("a", vec!["c".to_string()]), + test_manifest("b", vec!["a".to_string()]), + test_manifest("c", vec!["b".to_string()]), + ]; + + let ordered = PluginManager::resolve_load_order(&manifests); + // All three should be excluded due to cycle + assert_eq!(ordered.len(), 0); + } + + #[test] + fn test_resolve_load_order_missing_dependency() { + let manifests = vec![ + test_manifest("good", vec![]), + test_manifest("bad", vec!["nonexistent".to_string()]), + ]; + + let ordered = PluginManager::resolve_load_order(&manifests); + // Only "good" should be loaded; "bad" depends on something missing + assert_eq!(ordered.len(), 1); + assert_eq!(ordered[0].plugin.name, "good"); + } + + #[test] + fn test_resolve_load_order_partial_cycle() { + // "ok" has no deps, "cycle_a" and "cycle_b" form a cycle + let manifests = vec![ + test_manifest("ok", vec![]), + test_manifest("cycle_a", vec!["cycle_b".to_string()]), + test_manifest("cycle_b", vec!["cycle_a".to_string()]), + ]; + + let ordered = PluginManager::resolve_load_order(&manifests); + assert_eq!(ordered.len(), 1); + assert_eq!(ordered[0].plugin.name, "ok"); + } + + #[test] + fn test_resolve_load_order_diamond() { + // Man look at how beautiful my diamond is... 
+ // A + // / \ + // B C + // \ / + // D + let manifests = vec![ + test_manifest("d", vec!["b".to_string(), "c".to_string()]), + test_manifest("b", vec!["a".to_string()]), + test_manifest("c", vec!["a".to_string()]), + test_manifest("a", vec![]), + ]; + + let ordered = PluginManager::resolve_load_order(&manifests); + assert_eq!(ordered.len(), 4); + + let names: Vec<&str> = + ordered.iter().map(|m| m.plugin.name.as_str()).collect(); + let a_pos = names.iter().position(|&n| n == "a").unwrap(); + let b_pos = names.iter().position(|&n| n == "b").unwrap(); + let c_pos = names.iter().position(|&n| n == "c").unwrap(); + let d_pos = names.iter().position(|&n| n == "d").unwrap(); + assert!(a_pos < b_pos); + assert!(a_pos < c_pos); + assert!(b_pos < d_pos); + assert!(c_pos < d_pos); + } } diff --git a/crates/pinakes-core/src/plugin/pipeline.rs b/crates/pinakes-core/src/plugin/pipeline.rs new file mode 100644 index 0000000..c2ed430 --- /dev/null +++ b/crates/pinakes-core/src/plugin/pipeline.rs @@ -0,0 +1,1442 @@ +//! Plugin pipeline for wiring plugins into the media processing stages. +//! +//! The [`PluginPipeline`] coordinates built-in handlers and WASM plugins for: +//! +//! - Media type resolution (first-match-wins) +//! - Metadata extraction (accumulating merge by priority) +//! - Thumbnail generation (first-success-wins) +//! - Search backend dispatch (merge results, rank by score) +//! - Theme provider dispatch (accumulate all themes) +//! - Event fan-out to interested handlers +//! +//! Each plugin has a priority (0-999). Built-in handlers run at implicit +//! priority 100. A circuit breaker disables plugins after consecutive failures. 
+ +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::Arc, + time::{Duration, Instant}, +}; + +use tokio::sync::RwLock; +use tracing::{debug, info, warn}; + +use super::PluginManager; +use crate::{ + config::PluginTimeoutConfig, + media_type::MediaType, + metadata::ExtractedMetadata, + model::MediaId, + plugin::rpc::{ + CanHandleRequest, + CanHandleResponse, + ExtractMetadataRequest, + ExtractMetadataResponse, + GenerateThumbnailRequest, + GenerateThumbnailResponse, + HandleEventRequest, + IndexItemRequest, + LoadThemeResponse, + PluginMediaTypeDefinition, + PluginThemeDefinition, + RemoveItemRequest, + SearchRequest, + SearchResponse, + SearchResultItem, + }, +}; + +/// Built-in handlers run at this implicit priority. +const BUILTIN_PRIORITY: u16 = 100; + +/// Cooldown before the circuit breaker allows a single trial call. +const CIRCUIT_BREAKER_COOLDOWN: Duration = Duration::from_mins(1); + +/// Tracks per-plugin health for the circuit breaker. +struct PluginHealth { + consecutive_failures: u32, + last_failure: Option, + disabled_by_circuit_breaker: bool, +} + +impl PluginHealth { + const fn new() -> Self { + Self { + consecutive_failures: 0, + last_failure: None, + disabled_by_circuit_breaker: false, + } + } +} + +/// Capability information discovered from plugins at startup. +struct CachedCapabilities { + /// Keyed by `(kind, plugin_id)` -> list of supported type strings. + /// Separate entries for each kind avoid collisions when a plugin + /// implements both `metadata_extractor` and `thumbnail_generator`. 
+ supported_types: HashMap<(String, String), Vec>, + /// `plugin_id` -> list of interested event type strings + interested_events: HashMap>, + /// `plugin_id` -> list of media type definitions (for `MediaTypeProvider`) + media_type_definitions: HashMap>, + /// `plugin_id` -> list of theme definitions (for `ThemeProvider`) + theme_definitions: HashMap>, +} + +impl CachedCapabilities { + fn new() -> Self { + Self { + supported_types: HashMap::new(), + interested_events: HashMap::new(), + media_type_definitions: HashMap::new(), + theme_definitions: HashMap::new(), + } + } +} + +/// Coordinates built-in handlers and WASM plugins in a priority-ordered +/// pipeline for media processing stages. +pub struct PluginPipeline { + manager: Arc, + timeouts: PluginTimeoutConfig, + max_consecutive_failures: u32, + health: RwLock>, + capabilities: RwLock, +} + +impl PluginPipeline { + /// Create a new plugin pipeline. + #[must_use] + pub fn new( + manager: Arc, + timeouts: PluginTimeoutConfig, + max_consecutive_failures: u32, + ) -> Self { + Self { + manager, + timeouts, + max_consecutive_failures, + health: RwLock::new(HashMap::new()), + capabilities: RwLock::new(CachedCapabilities::new()), + } + } + + /// Discover capabilities from all enabled plugins. Call on startup and + /// after plugin reload. + /// + /// # Errors + /// + /// Returns an error only for truly fatal problems. Individual plugin + /// discovery failures are logged and skipped. 
+ pub async fn discover_capabilities(&self) -> crate::error::Result<()> { + info!("discovering plugin capabilities"); + + let timeout = Duration::from_secs(self.timeouts.capability_query_secs); + let mut caps = CachedCapabilities::new(); + + // Discover metadata extractors + self + .discover_supported_types("metadata_extractor", timeout, &mut caps) + .await; + + // Discover thumbnail generators + self + .discover_supported_types("thumbnail_generator", timeout, &mut caps) + .await; + + // Discover media type providers + self + .discover_media_type_definitions(timeout, &mut caps) + .await; + + // Discover search backends + self + .discover_supported_types("search_backend", timeout, &mut caps) + .await; + + // Discover theme providers + self.discover_theme_definitions(timeout, &mut caps).await; + + // Discover event handlers + self.discover_interested_events(timeout, &mut caps).await; + + let mut cached = self.capabilities.write().await; + *cached = caps; + drop(cached); + + Ok(()) + } + + /// Query `supported_types` from all enabled plugins of a given kind. + async fn discover_supported_types( + &self, + kind: &str, + timeout: Duration, + caps: &mut CachedCapabilities, + ) { + let plugins = self.manager.get_enabled_by_kind_sorted(kind).await; + for (id, _priority, _kinds, wasm) in &plugins { + match wasm + .call_function_json::>( + "supported_types", + &serde_json::json!({}), + timeout, + ) + .await + { + Ok(types) => { + debug!( + plugin_id = %id, + kind = kind, + types = ?types, + "discovered supported types" + ); + caps + .supported_types + .insert((kind.to_string(), id.clone()), types); + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "failed to discover supported types" + ); + }, + } + } + } + + /// Query `supported_media_types` from `MediaTypeProvider` plugins. 
+ async fn discover_media_type_definitions( + &self, + timeout: Duration, + caps: &mut CachedCapabilities, + ) { + let plugins = self.manager.get_enabled_by_kind_sorted("media_type").await; + for (id, _priority, _kinds, wasm) in &plugins { + match wasm + .call_function_json::>( + "supported_media_types", + &serde_json::json!({}), + timeout, + ) + .await + { + Ok(defs) => { + debug!( + plugin_id = %id, + count = defs.len(), + "discovered media type definitions" + ); + caps.media_type_definitions.insert(id.clone(), defs); + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "failed to discover media type definitions" + ); + }, + } + } + } + + /// Query `interested_events` from `EventHandler` plugins. + async fn discover_interested_events( + &self, + timeout: Duration, + caps: &mut CachedCapabilities, + ) { + let plugins = self + .manager + .get_enabled_by_kind_sorted("event_handler") + .await; + for (id, _priority, _kinds, wasm) in &plugins { + match wasm + .call_function_json::>( + "interested_events", + &serde_json::json!({}), + timeout, + ) + .await + { + Ok(events) => { + debug!( + plugin_id = %id, + events = ?events, + "discovered interested events" + ); + caps.interested_events.insert(id.clone(), events); + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "failed to discover interested events" + ); + }, + } + } + } + + /// Query `get_themes` from `ThemeProvider` plugins. 
+ async fn discover_theme_definitions( + &self, + timeout: Duration, + caps: &mut CachedCapabilities, + ) { + let plugins = self + .manager + .get_enabled_by_kind_sorted("theme_provider") + .await; + for (id, _priority, _kinds, wasm) in &plugins { + match wasm + .call_function_json::>( + "get_themes", + &serde_json::json!({}), + timeout, + ) + .await + { + Ok(defs) => { + debug!( + plugin_id = %id, + count = defs.len(), + "discovered theme definitions" + ); + caps.theme_definitions.insert(id.clone(), defs); + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "failed to discover theme definitions" + ); + }, + } + } + } + + /// Resolve the media type for a file path. + /// + /// Iterates `MediaTypeProvider` plugins in priority order, falling back to + /// the built-in resolver at implicit priority 100. + pub async fn resolve_media_type(&self, path: &Path) -> Option { + let timeout = Duration::from_secs(self.timeouts.processing_secs); + let plugins = self.manager.get_enabled_by_kind_sorted("media_type").await; + + let mut builtin_ran = false; + + for (id, priority, kinds, wasm) in &plugins { + // Run built-in at its implicit priority slot + if *priority >= BUILTIN_PRIORITY && !builtin_ran { + builtin_ran = true; + if let Some(mt) = MediaType::from_path(path) { + return Some(mt); + } + } + + if !self.is_healthy(id).await { + continue; + } + + // Validate the call is allowed for this plugin kind + if !self + .manager + .enforcer() + .validate_function_call(kinds, "can_handle") + { + continue; + } + + let req = CanHandleRequest { + path: path.to_path_buf(), + mime_type: None, + }; + + match wasm + .call_function_json::( + "can_handle", + &req, + timeout, + ) + .await + { + Ok(resp) if resp.can_handle => { + self.record_success(id).await; + // First match wins: the plugin claimed this file. + // Determine the custom media type ID from cached definitions. 
+ if let Some(mt_id) = self.resolve_custom_media_type_id(id, path).await + { + return Some(MediaType::Custom(mt_id)); + } + + // Plugin claimed the file but has no matching definition. + // The claim still stops the chain per first-match semantics. + warn!( + plugin_id = %id, + path = %path.display(), + "plugin can_handle returned true but no matching media type definition" + ); + return None; + }, + Ok(_) => { + self.record_success(id).await; + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "media type resolution failed" + ); + self.record_failure(id).await; + }, + } + } + + // If built-in hasn't run yet (all plugins had priority < 100) + if !builtin_ran { + return MediaType::from_path(path); + } + + None + } + + /// Look up the cached media type definitions for a plugin and find the + /// definition whose extensions match the file's extension. + async fn resolve_custom_media_type_id( + &self, + plugin_id: &str, + path: &Path, + ) -> Option { + let ext = path + .extension() + .and_then(|e| e.to_str()) + .map(str::to_ascii_lowercase)?; + + let caps = self.capabilities.read().await; + let result = caps.media_type_definitions.get(plugin_id).and_then(|defs| { + defs.iter().find_map(|def| { + def + .extensions + .iter() + .any(|e| e.eq_ignore_ascii_case(&ext)) + .then(|| def.id.clone()) + }) + }); + drop(caps); + result + } + + /// Extract metadata using plugins and built-in extractors in priority + /// order. Later (higher priority) results overwrite earlier ones + /// field-by-field. + /// + /// # Errors + /// + /// Propagates errors from the built-in extractor. Plugin errors are + /// logged and skipped. 
+ pub async fn extract_metadata( + &self, + path: &Path, + media_type: &MediaType, + ) -> crate::error::Result { + let timeout = Duration::from_secs(self.timeouts.processing_secs); + let plugins = self + .manager + .get_enabled_by_kind_sorted("metadata_extractor") + .await; + + let media_type_str = media_type.id(); + let mut acc = ExtractedMetadata::default(); + let mut builtin_ran = false; + + for (id, priority, kinds, wasm) in &plugins { + // Insert built-in at its priority slot + if *priority >= BUILTIN_PRIORITY && !builtin_ran { + builtin_ran = true; + acc = self.run_builtin_metadata(path, media_type, acc).await?; + } + + if !self.is_healthy(id).await { + continue; + } + + // Check if this plugin supports the media type + let supported = { + let caps = self.capabilities.read().await; + let key = ("metadata_extractor".to_string(), id.clone()); + caps + .supported_types + .get(&key) + .is_some_and(|types| types.contains(&media_type_str)) + }; + if !supported { + continue; + } + + if !self + .manager + .enforcer() + .validate_function_call(kinds, "extract_metadata") + { + continue; + } + + let req = ExtractMetadataRequest { + path: path.to_path_buf(), + }; + + match wasm + .call_function_json::( + "extract_metadata", + &req, + timeout, + ) + .await + { + Ok(resp) => { + self.record_success(id).await; + merge_metadata(&mut acc, &resp); + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "metadata extraction failed" + ); + self.record_failure(id).await; + }, + } + } + + // If built-in hasn't run yet (all plugins had priority < 100) + if !builtin_ran { + acc = self.run_builtin_metadata(path, media_type, acc).await?; + } + + Ok(acc) + } + + /// Run the built-in metadata extractor (sync, wrapped in `spawn_blocking`). 
+ async fn run_builtin_metadata( + &self, + path: &Path, + media_type: &MediaType, + mut acc: ExtractedMetadata, + ) -> crate::error::Result { + let path = path.to_path_buf(); + let mt = media_type.clone(); + let builtin = tokio::task::spawn_blocking(move || { + crate::metadata::extract_metadata(&path, &mt) + }) + .await + .map_err(|e| { + crate::error::PinakesError::MetadataExtraction(format!( + "built-in metadata task panicked: {e}" + )) + })??; + + // Merge built-in result into accumulator + merge_extracted(&mut acc, builtin); + Ok(acc) + } + + // Thumbnail generation + + /// Generate a thumbnail using plugins and the built-in generator in + /// priority order. Returns the first successful result. + /// + /// # Errors + /// + /// Propagates errors only from the built-in generator. Plugin errors are + /// logged and skipped. + pub async fn generate_thumbnail( + &self, + media_id: MediaId, + path: &Path, + media_type: &MediaType, + thumb_dir: &Path, + ) -> crate::error::Result> { + let timeout = Duration::from_secs(self.timeouts.processing_secs); + let plugins = self + .manager + .get_enabled_by_kind_sorted("thumbnail_generator") + .await; + + let media_type_str = media_type.id(); + let mut builtin_ran = false; + + for (id, priority, kinds, wasm) in &plugins { + // Insert built-in at its priority slot + if *priority >= BUILTIN_PRIORITY && !builtin_ran { + builtin_ran = true; + if let Some(result) = self + .run_builtin_thumbnail(media_id, path, media_type, thumb_dir) + .await? 
+ { + return Ok(Some(result)); + } + } + + if !self.is_healthy(id).await { + continue; + } + + // Check if this plugin supports the media type + let supported = { + let caps = self.capabilities.read().await; + let key = ("thumbnail_generator".to_string(), id.clone()); + caps + .supported_types + .get(&key) + .is_some_and(|types| types.contains(&media_type_str)) + }; + if !supported { + continue; + } + + if !self + .manager + .enforcer() + .validate_function_call(kinds, "generate_thumbnail") + { + continue; + } + + let output_path = thumb_dir.join(format!("{media_id}.jpg")); + let req = GenerateThumbnailRequest { + source_path: path.to_path_buf(), + output_path: output_path.clone(), + max_width: 320, + max_height: 320, + format: "jpeg".to_string(), + }; + + match wasm + .call_function_json::( + "generate_thumbnail", + &req, + timeout, + ) + .await + { + Ok(resp) => { + self.record_success(id).await; + return Ok(Some(resp.path)); + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "thumbnail generation failed" + ); + self.record_failure(id).await; + }, + } + } + + // If built-in hasn't run yet + if !builtin_ran { + return self + .run_builtin_thumbnail(media_id, path, media_type, thumb_dir) + .await; + } + + Ok(None) + } + + /// Run the built-in thumbnail generator (sync, wrapped in + /// `spawn_blocking`). + async fn run_builtin_thumbnail( + &self, + media_id: MediaId, + path: &Path, + media_type: &MediaType, + thumb_dir: &Path, + ) -> crate::error::Result> { + let path = path.to_path_buf(); + let mt = media_type.clone(); + let td = thumb_dir.to_path_buf(); + tokio::task::spawn_blocking(move || { + crate::thumbnail::generate_thumbnail(media_id, &path, &mt, &td) + }) + .await + .map_err(|e| { + crate::error::PinakesError::ThumbnailGeneration(format!( + "built-in thumbnail task panicked: {e}" + )) + })? + } + + /// Fan out an event to all interested event handler plugins. 
+ /// + /// The work runs in a spawned task; failures are logged, never + /// propagated. + pub fn emit_event( + self: &Arc, + event_type: &str, + payload: &serde_json::Value, + ) { + let pipeline = Arc::clone(self); + let event_type = event_type.to_string(); + let payload = payload.clone(); + + tokio::spawn(async move { + pipeline.dispatch_event(&event_type, &payload).await; + }); + } + + /// Internal dispatcher for events. + async fn dispatch_event( + &self, + event_type: &str, + payload: &serde_json::Value, + ) { + let timeout = Duration::from_secs(self.timeouts.event_handler_secs); + + // Collect plugin IDs interested in this event + let interested_ids: Vec = { + let caps = self.capabilities.read().await; + caps + .interested_events + .iter() + .filter(|(_id, events)| events.contains(&event_type.to_string())) + .map(|(id, _)| id.clone()) + .collect() + }; + + if interested_ids.is_empty() { + return; + } + + // Get event handler plugins sorted by priority + let plugins = self + .manager + .get_enabled_by_kind_sorted("event_handler") + .await; + + for (id, _priority, kinds, wasm) in &plugins { + if !interested_ids.contains(id) { + continue; + } + + if !self.is_healthy(id).await { + continue; + } + + if !self + .manager + .enforcer() + .validate_function_call(kinds, "handle_event") + { + continue; + } + + let req = HandleEventRequest { + event_type: event_type.to_string(), + payload: payload.clone(), + }; + + // Event handlers return nothing meaningful; we just care about + // success/failure. + match wasm + .call_function_json::( + "handle_event", + &req, + timeout, + ) + .await + { + Ok(_) => { + self.record_success(id).await; + debug!( + plugin_id = %id, + event_type = event_type, + "event handled" + ); + }, + Err(e) => { + warn!( + plugin_id = %id, + event_type = event_type, + error = %e, + "event handler failed" + ); + self.record_failure(id).await; + }, + } + } + } + + /// Search via plugin search backends. 
Results from all backends are
  /// merged, deduplicated by ID, and sorted by score (highest first).
  ///
  /// The merged list is truncated to `limit` so callers never receive more
  /// results than requested, even when multiple backends each return up to
  /// `limit` items.
  // NOTE(review): `offset` is forwarded to each backend individually, so
  // global pagination across multiple backends is approximate — confirm this
  // is acceptable for the callers of this API.
  pub async fn search(
    &self,
    query: &str,
    limit: usize,
    offset: usize,
  ) -> Vec<SearchResultItem> {
    let timeout = Duration::from_secs(self.timeouts.processing_secs);
    let plugins = self
      .manager
      .get_enabled_by_kind_sorted("search_backend")
      .await;

    let mut all_results: Vec<SearchResultItem> = Vec::new();

    for (id, _priority, kinds, wasm) in &plugins {
      if !self.is_healthy(id).await {
        continue;
      }
      if !self
        .manager
        .enforcer()
        .validate_function_call(kinds, "search")
      {
        continue;
      }

      let req = SearchRequest {
        query: query.to_string(),
        limit,
        offset,
      };

      match wasm
        .call_function_json::<_, SearchResponse>(
          "search", &req, timeout,
        )
        .await
      {
        Ok(resp) => {
          self.record_success(id).await;
          all_results.extend(resp.results);
        },
        Err(e) => {
          warn!(
            plugin_id = %id,
            error = %e,
            "search backend query failed"
          );
          self.record_failure(id).await;
        },
      }
    }

    // Deduplicate by ID, keeping the highest-scoring entry
    let mut seen: HashMap<String, usize> = HashMap::new();
    let mut deduped: Vec<SearchResultItem> = Vec::new();
    for item in all_results {
      if let Some(&idx) = seen.get(&item.id) {
        if item.score > deduped[idx].score {
          deduped[idx] = item;
        }
      } else {
        seen.insert(item.id.clone(), deduped.len());
        deduped.push(item);
      }
    }

    // Sort by score descending; NaN scores compare as equal
    deduped.sort_by(|a, b| {
      b.score
        .partial_cmp(&a.score)
        .unwrap_or(std::cmp::Ordering::Equal)
    });

    // Each backend applied `limit` individually, so the merged list may be
    // larger than requested; enforce the documented contract here.
    deduped.truncate(limit);

    deduped
  }

  /// Index a media item in all search backend plugins (fan-out).
+ pub async fn index_item(&self, req: &IndexItemRequest) { + let timeout = Duration::from_secs(self.timeouts.processing_secs); + let plugins = self + .manager + .get_enabled_by_kind_sorted("search_backend") + .await; + + for (id, _priority, kinds, wasm) in &plugins { + if !self.is_healthy(id).await { + continue; + } + if !self + .manager + .enforcer() + .validate_function_call(kinds, "index_item") + { + continue; + } + + match wasm + .call_function_json::( + "index_item", + req, + timeout, + ) + .await + { + Ok(_) => { + self.record_success(id).await; + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "search backend index_item failed" + ); + self.record_failure(id).await; + }, + } + } + } + + /// Remove a media item from all search backend plugins (fan-out). + pub async fn remove_item(&self, media_id: &str) { + let timeout = Duration::from_secs(self.timeouts.processing_secs); + let plugins = self + .manager + .get_enabled_by_kind_sorted("search_backend") + .await; + + let req = RemoveItemRequest { + id: media_id.to_string(), + }; + + for (id, _priority, kinds, wasm) in &plugins { + if !self.is_healthy(id).await { + continue; + } + if !self + .manager + .enforcer() + .validate_function_call(kinds, "remove_item") + { + continue; + } + + match wasm + .call_function_json::( + "remove_item", + &req, + timeout, + ) + .await + { + Ok(_) => { + self.record_success(id).await; + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "search backend remove_item failed" + ); + self.record_failure(id).await; + }, + } + } + } + + /// Get all available themes from theme provider plugins. Results from + /// all providers are accumulated. + pub async fn get_themes(&self) -> Vec { + let caps = self.capabilities.read().await; + caps + .theme_definitions + .values() + .flat_map(|defs| defs.iter().cloned()) + .collect() + } + + /// Load a specific theme by ID from the provider that registered it. 
+ pub async fn load_theme(&self, theme_id: &str) -> Option { + let timeout = Duration::from_secs(self.timeouts.processing_secs); + + // Find which plugin owns this theme + let owner_id = { + let caps = self.capabilities.read().await; + caps.theme_definitions.iter().find_map(|(plugin_id, defs)| { + defs + .iter() + .any(|d| d.id == theme_id) + .then(|| plugin_id.clone()) + }) + }; + + let owner_id = owner_id?; + + if !self.is_healthy(&owner_id).await { + return None; + } + + let plugins = self + .manager + .get_enabled_by_kind_sorted("theme_provider") + .await; + + let plugin = plugins.iter().find(|(id, ..)| id == &owner_id)?; + let (id, _priority, kinds, wasm) = plugin; + + if !self + .manager + .enforcer() + .validate_function_call(kinds, "load_theme") + { + return None; + } + + let req = serde_json::json!({"theme_id": theme_id}); + match wasm + .call_function_json::( + "load_theme", + &req, + timeout, + ) + .await + { + Ok(resp) => { + self.record_success(id).await; + Some(resp) + }, + Err(e) => { + warn!( + plugin_id = %id, + theme_id = theme_id, + error = %e, + "theme loading failed" + ); + self.record_failure(id).await; + None + }, + } + } + + /// Record a successful call, resetting the failure counter. + async fn record_success(&self, plugin_id: &str) { + let mut health_map = self.health.write().await; + let entry = health_map + .entry(plugin_id.to_string()) + .or_insert_with(PluginHealth::new); + + if entry.disabled_by_circuit_breaker { + info!( + plugin_id = plugin_id, + "circuit breaker recovered: re-enabling plugin after successful trial" + ); + } + + entry.consecutive_failures = 0; + entry.last_failure = None; + entry.disabled_by_circuit_breaker = false; + drop(health_map); + } + + /// Record a failed call. If consecutive failures exceed the threshold, + /// trip the circuit breaker. If a half-open trial fails, reset the + /// cooldown timer. 
+ async fn record_failure(&self, plugin_id: &str) { + let mut health_map = self.health.write().await; + let entry = health_map + .entry(plugin_id.to_string()) + .or_insert_with(PluginHealth::new); + entry.consecutive_failures += 1; + entry.last_failure = Some(Instant::now()); + + if entry.consecutive_failures >= self.max_consecutive_failures { + if !entry.disabled_by_circuit_breaker { + warn!( + plugin_id = plugin_id, + failures = entry.consecutive_failures, + "circuit breaker tripped: disabling plugin" + ); + } + entry.disabled_by_circuit_breaker = true; + } + drop(health_map); + } + + /// Check whether a plugin is healthy enough to receive calls. + /// + /// # Returns + /// + /// `true` if the plugin has no circuit breaker state, has not + /// tripped the breaker, or has tripped but the cooldown has elapsed + /// (half-open state: one trial call is permitted). + async fn is_healthy(&self, plugin_id: &str) -> bool { + let health_map = self.health.read().await; + match health_map.get(plugin_id) { + None => true, + Some(h) if !h.disabled_by_circuit_breaker => true, + Some(h) => { + // Half-open: allow a trial call after the cooldown period + h.last_failure + .is_some_and(|t| t.elapsed() >= CIRCUIT_BREAKER_COOLDOWN) + }, + } + } +} + +/// Merge plugin metadata response fields into the accumulator. +/// Non-None fields overwrite existing values. 
+fn merge_metadata( + base: &mut ExtractedMetadata, + plugin_resp: &ExtractMetadataResponse, +) { + if let Some(ref v) = plugin_resp.title { + base.title = Some(v.clone()); + } + if let Some(ref v) = plugin_resp.artist { + base.artist = Some(v.clone()); + } + if let Some(ref v) = plugin_resp.album { + base.album = Some(v.clone()); + } + if let Some(ref v) = plugin_resp.genre { + base.genre = Some(v.clone()); + } + if let Some(v) = plugin_resp.year { + base.year = Some(v); + } + if let Some(v) = plugin_resp.duration_secs { + base.duration_secs = Some(v); + } + if let Some(ref v) = plugin_resp.description { + base.description = Some(v.clone()); + } + for (k, v) in &plugin_resp.extra { + base.extra.insert(k.clone(), v.clone()); + } +} + +/// Merge a full [`ExtractedMetadata`] into an accumulator. Non-None +/// fields from `source` overwrite `base`. +fn merge_extracted(base: &mut ExtractedMetadata, source: ExtractedMetadata) { + if source.title.is_some() { + base.title = source.title; + } + if source.artist.is_some() { + base.artist = source.artist; + } + if source.album.is_some() { + base.album = source.album; + } + if source.genre.is_some() { + base.genre = source.genre; + } + if source.year.is_some() { + base.year = source.year; + } + if source.duration_secs.is_some() { + base.duration_secs = source.duration_secs; + } + if source.description.is_some() { + base.description = source.description; + } + for (k, v) in source.extra { + base.extra.insert(k, v); + } + if source.book_metadata.is_some() { + base.book_metadata = source.book_metadata; + } + if source.date_taken.is_some() { + base.date_taken = source.date_taken; + } + if source.latitude.is_some() { + base.latitude = source.latitude; + } + if source.longitude.is_some() { + base.longitude = source.longitude; + } + if source.camera_make.is_some() { + base.camera_make = source.camera_make; + } + if source.camera_model.is_some() { + base.camera_model = source.camera_model; + } + if source.rating.is_some() { + 
base.rating = source.rating; + } +} + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use super::*; + use crate::plugin::{PluginManager, PluginManagerConfig}; + + /// Create a `PluginPipeline` backed by an empty `PluginManager`. + fn create_test_pipeline() -> (TempDir, Arc) { + let temp_dir = TempDir::new().unwrap(); + let data_dir = temp_dir.path().join("data"); + let cache_dir = temp_dir.path().join("cache"); + + let config = PluginManagerConfig::default(); + let manager = + Arc::new(PluginManager::new(data_dir, cache_dir, config).unwrap()); + let pipeline = Arc::new(PluginPipeline::new( + manager, + PluginTimeoutConfig::default(), + 5, + )); + (temp_dir, pipeline) + } + + #[tokio::test] + async fn test_circuit_breaker_trips_after_max_failures() { + let (_dir, pipeline) = create_test_pipeline(); + let plugin_id = "test-plugin"; + + // Initially healthy + assert!(pipeline.is_healthy(plugin_id).await); + + // Record failures up to the threshold + for _ in 0..5 { + pipeline.record_failure(plugin_id).await; + } + + // Should be disabled now + assert!(!pipeline.is_healthy(plugin_id).await); + } + + #[tokio::test] + async fn test_circuit_breaker_resets_on_success() { + let (_dir, pipeline) = create_test_pipeline(); + let plugin_id = "test-plugin"; + + // Record some failures but not enough to trip + for _ in 0..4 { + pipeline.record_failure(plugin_id).await; + } + assert!(pipeline.is_healthy(plugin_id).await); + + // One success resets the counter + pipeline.record_success(plugin_id).await; + assert!(pipeline.is_healthy(plugin_id).await); + + // Need another full run of failures to trip + for _ in 0..4 { + pipeline.record_failure(plugin_id).await; + } + assert!(pipeline.is_healthy(plugin_id).await); + } + + #[tokio::test] + async fn test_circuit_breaker_reenabled_after_success() { + let (_dir, pipeline) = create_test_pipeline(); + let plugin_id = "test-plugin"; + + // Trip the circuit breaker + for _ in 0..5 { + pipeline.record_failure(plugin_id).await; + } + 
assert!(!pipeline.is_healthy(plugin_id).await); + + // Success re-enables it + pipeline.record_success(plugin_id).await; + assert!(pipeline.is_healthy(plugin_id).await); + } + + #[tokio::test] + async fn test_circuit_breaker_half_open_after_cooldown() { + let (_dir, pipeline) = create_test_pipeline(); + let plugin_id = "test-plugin"; + + // Trip the circuit breaker + for _ in 0..5 { + pipeline.record_failure(plugin_id).await; + } + assert!(!pipeline.is_healthy(plugin_id).await); + + // Simulate cooldown elapsed by backdating last_failure + { + let mut health_map = pipeline.health.write().await; + let entry = health_map.get_mut(plugin_id).unwrap(); + entry.last_failure = Some( + Instant::now() + .checked_sub(CIRCUIT_BREAKER_COOLDOWN) + .unwrap() + - Duration::from_secs(1), + ); + } + + // Half-open: should be healthy for a trial call + assert!(pipeline.is_healthy(plugin_id).await); + + // If the trial fails, breaker re-trips with fresh cooldown + pipeline.record_failure(plugin_id).await; + assert!(!pipeline.is_healthy(plugin_id).await); + + // If we backdate again and the trial succeeds, fully recovered + { + let mut health_map = pipeline.health.write().await; + let entry = health_map.get_mut(plugin_id).unwrap(); + entry.last_failure = Some( + Instant::now() + .checked_sub(CIRCUIT_BREAKER_COOLDOWN) + .unwrap() + - Duration::from_secs(1), + ); + } + assert!(pipeline.is_healthy(plugin_id).await); + pipeline.record_success(plugin_id).await; + assert!(pipeline.is_healthy(plugin_id).await); + + // Verify fully reset: failure counter starts fresh + let health_map = pipeline.health.read().await; + let entry = health_map.get(plugin_id).unwrap(); + assert_eq!(entry.consecutive_failures, 0); + assert!(!entry.disabled_by_circuit_breaker); + assert!(entry.last_failure.is_none()); + } + + #[tokio::test] + async fn test_empty_pipeline_resolve_media_type() { + let (_dir, pipeline) = create_test_pipeline(); + + // With no plugins, falls back to built-in + let result = pipeline + 
.resolve_media_type(Path::new("/tmp/test.mp3")) + .await; + assert!(result.is_some()); + assert_eq!(result.unwrap().id(), "mp3"); + } + + #[tokio::test] + async fn test_empty_pipeline_resolve_unknown_type() { + let (_dir, pipeline) = create_test_pipeline(); + + let result = pipeline + .resolve_media_type(Path::new("/tmp/test.xyzunknown")) + .await; + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_empty_pipeline_extract_metadata() { + let (_dir, pipeline) = create_test_pipeline(); + + // Built-in extractor handles this (may return default for non-existent + // file, but should not panic) + let mt = MediaType::from_path(Path::new("/tmp/fake.txt")); + if let Some(mt) = mt { + let result = pipeline + .extract_metadata(Path::new("/tmp/fake.txt"), &mt) + .await; + + // The built-in extractor may succeed with defaults or fail + // gracefully; either is fine for this test. + let _ = result; + } + } + + #[tokio::test] + async fn test_empty_pipeline_discover_capabilities() { + let (_dir, pipeline) = create_test_pipeline(); + + let result = pipeline.discover_capabilities().await; + assert!(result.is_ok()); + + // Capabilities should be empty + let caps = pipeline.capabilities.read().await; + assert!(caps.supported_types.is_empty()); + assert!(caps.interested_events.is_empty()); + assert!(caps.media_type_definitions.is_empty()); + assert!(caps.theme_definitions.is_empty()); + } + + #[test] + fn test_merge_metadata_overwrites_some_fields() { + let mut base = ExtractedMetadata::default(); + base.title = Some("Original".to_string()); + base.artist = Some("Original Artist".to_string()); + + let resp = ExtractMetadataResponse { + title: Some("New Title".to_string()), + artist: None, // should not overwrite + album: Some("New Album".to_string()), + genre: None, + year: Some(2024), + duration_secs: None, + description: None, + extra: HashMap::new(), + }; + + merge_metadata(&mut base, &resp); + + assert_eq!(base.title.as_deref(), Some("New Title")); + 
assert_eq!(base.artist.as_deref(), Some("Original Artist")); + assert_eq!(base.album.as_deref(), Some("New Album")); + assert_eq!(base.year, Some(2024)); + } + + #[test] + fn test_merge_metadata_extra_fields() { + let mut base = ExtractedMetadata::default(); + base.extra.insert("key1".to_string(), "val1".to_string()); + + let mut extra = HashMap::new(); + extra.insert("key2".to_string(), "val2".to_string()); + extra.insert("key1".to_string(), "overwritten".to_string()); + + let resp = ExtractMetadataResponse { + extra, + ..Default::default() + }; + + merge_metadata(&mut base, &resp); + + assert_eq!( + base.extra.get("key1").map(String::as_str), + Some("overwritten") + ); + assert_eq!(base.extra.get("key2").map(String::as_str), Some("val2")); + } + + #[test] + fn test_merge_extracted_preserves_photo_fields() { + let mut base = ExtractedMetadata::default(); + let source = ExtractedMetadata { + latitude: Some(48.8566), + longitude: Some(2.3522), + camera_make: Some("Canon".to_string()), + camera_model: Some("EOS R5".to_string()), + ..Default::default() + }; + + merge_extracted(&mut base, source); + + assert_eq!(base.latitude, Some(48.8566)); + assert_eq!(base.longitude, Some(2.3522)); + assert_eq!(base.camera_make.as_deref(), Some("Canon")); + assert_eq!(base.camera_model.as_deref(), Some("EOS R5")); + } + + #[tokio::test] + async fn test_unknown_plugin_is_healthy() { + let (_dir, pipeline) = create_test_pipeline(); + + // A never-seen plugin is considered healthy + assert!(pipeline.is_healthy("never-registered").await); + } + + #[tokio::test] + async fn test_emit_event_with_empty_pipeline() { + let (_dir, pipeline) = create_test_pipeline(); + + // Discover capabilities first (empty, no plugins) + pipeline.discover_capabilities().await.unwrap(); + + // Emit an event with no interested handlers; should complete + // without panic. The spawned task returns immediately. 
+ pipeline.emit_event( + "MediaImported", + &serde_json::json!({"media_id": "test-123"}), + ); + + // Give the spawned task time to run + tokio::time::sleep(Duration::from_millis(50)).await; + } +} diff --git a/crates/pinakes-core/src/plugin/registry.rs b/crates/pinakes-core/src/plugin/registry.rs index 1439280..76ef7ad 100644 --- a/crates/pinakes-core/src/plugin/registry.rs +++ b/crates/pinakes-core/src/plugin/registry.rs @@ -178,8 +178,9 @@ mod tests { entrypoint: None, }, dependencies: vec![], + priority: 0, }, - capabilities: Default::default(), + capabilities: ManifestCapabilities::default(), config: HashMap::new(), }; diff --git a/crates/pinakes-core/src/plugin/rpc.rs b/crates/pinakes-core/src/plugin/rpc.rs new file mode 100644 index 0000000..40d4d13 --- /dev/null +++ b/crates/pinakes-core/src/plugin/rpc.rs @@ -0,0 +1,239 @@ +//! JSON RPC types for structured plugin function calls. +//! +//! Each extension point maps to well-known exported function names. +//! Requests are serialized to JSON, passed to the plugin, and responses +//! are deserialized from JSON written by the plugin via `host_set_result`. 
+ +use std::{collections::HashMap, path::PathBuf}; + +use serde::{Deserialize, Serialize}; + +/// Request to check if a plugin can handle a file +#[derive(Debug, Serialize)] +pub struct CanHandleRequest { + pub path: PathBuf, + pub mime_type: Option, +} + +/// Response from `can_handle` +#[derive(Debug, Deserialize)] +pub struct CanHandleResponse { + pub can_handle: bool, +} + +/// Media type definition returned by `supported_media_types` +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PluginMediaTypeDefinition { + pub id: String, + pub name: String, + pub category: Option, + pub extensions: Vec, + pub mime_types: Vec, +} + +/// Request to extract metadata from a file +#[derive(Debug, Serialize)] +pub struct ExtractMetadataRequest { + pub path: PathBuf, +} + +/// Metadata response from a plugin (all fields optional for partial results) +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct ExtractMetadataResponse { + #[serde(default)] + pub title: Option, + #[serde(default)] + pub artist: Option, + #[serde(default)] + pub album: Option, + #[serde(default)] + pub genre: Option, + #[serde(default)] + pub year: Option, + #[serde(default)] + pub duration_secs: Option, + #[serde(default)] + pub description: Option, + #[serde(default)] + pub extra: HashMap, +} + +/// Request to generate a thumbnail +#[derive(Debug, Serialize)] +pub struct GenerateThumbnailRequest { + pub source_path: PathBuf, + pub output_path: PathBuf, + pub max_width: u32, + pub max_height: u32, + pub format: String, +} + +/// Response from thumbnail generation +#[derive(Debug, Deserialize)] +pub struct GenerateThumbnailResponse { + pub path: PathBuf, + pub width: u32, + pub height: u32, + pub format: String, +} + +/// Event sent to event handler plugins +#[derive(Debug, Serialize)] +pub struct HandleEventRequest { + pub event_type: String, + pub payload: serde_json::Value, +} + +/// Search request for search backend plugins +#[derive(Debug, Serialize)] +pub struct 
SearchRequest { + pub query: String, + pub limit: usize, + pub offset: usize, +} + +/// Search response +#[derive(Debug, Clone, Deserialize)] +pub struct SearchResponse { + pub results: Vec, + #[serde(default)] + pub total_count: Option, +} + +/// Individual search result +#[derive(Debug, Clone, Deserialize)] +pub struct SearchResultItem { + pub id: String, + pub score: f64, + pub snippet: Option, +} + +/// Request to index a media item in a search backend +#[derive(Debug, Serialize)] +pub struct IndexItemRequest { + pub id: String, + pub title: Option, + pub artist: Option, + pub album: Option, + pub description: Option, + pub tags: Vec, + pub media_type: String, + pub path: PathBuf, +} + +/// Request to remove a media item from a search backend +#[derive(Debug, Serialize)] +pub struct RemoveItemRequest { + pub id: String, +} + +/// A theme definition returned by a theme provider plugin +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PluginThemeDefinition { + pub id: String, + pub name: String, + pub description: Option, + pub dark: bool, +} + +/// Response from `load_theme` +#[derive(Debug, Clone, Deserialize)] +pub struct LoadThemeResponse { + pub css: Option, + pub colors: HashMap, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_metadata_request_serialization() { + let req = ExtractMetadataRequest { + path: "/tmp/test.mp3".into(), + }; + let json = serde_json::to_string(&req).unwrap(); + assert!(json.contains("/tmp/test.mp3")); + } + + #[test] + fn test_extract_metadata_response_partial() { + let json = r#"{"title":"My Song","extra":{"bpm":"120"}}"#; + let resp: ExtractMetadataResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.title.as_deref(), Some("My Song")); + assert_eq!(resp.artist, None); + assert_eq!(resp.extra.get("bpm").map(String::as_str), Some("120")); + } + + #[test] + fn test_extract_metadata_response_empty() { + let json = "{}"; + let resp: ExtractMetadataResponse = 
serde_json::from_str(json).unwrap(); + assert_eq!(resp.title, None); + assert!(resp.extra.is_empty()); + } + + #[test] + fn test_can_handle_response() { + let json = r#"{"can_handle":true}"#; + let resp: CanHandleResponse = serde_json::from_str(json).unwrap(); + assert!(resp.can_handle); + } + + #[test] + fn test_can_handle_response_false() { + let json = r#"{"can_handle":false}"#; + let resp: CanHandleResponse = serde_json::from_str(json).unwrap(); + assert!(!resp.can_handle); + } + + #[test] + fn test_plugin_media_type_definition_round_trip() { + let def = PluginMediaTypeDefinition { + id: "heif".to_string(), + name: "HEIF Image".to_string(), + category: Some("image".to_string()), + extensions: vec!["heif".to_string(), "heic".to_string()], + mime_types: vec!["image/heif".to_string()], + }; + let json = serde_json::to_string(&def).unwrap(); + let parsed: PluginMediaTypeDefinition = + serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.id, "heif"); + assert_eq!(parsed.extensions.len(), 2); + } + + #[test] + fn test_search_response() { + let json = + r#"{"results":[{"id":"abc","score":0.95,"snippet":"match here"}]}"#; + let resp: SearchResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.results.len(), 1); + assert_eq!(resp.results[0].id, "abc"); + } + + #[test] + fn test_generate_thumbnail_request_serialization() { + let req = GenerateThumbnailRequest { + source_path: "/media/photo.heif".into(), + output_path: "/tmp/thumb.jpg".into(), + max_width: 256, + max_height: 256, + format: "jpeg".to_string(), + }; + let json = serde_json::to_string(&req).unwrap(); + assert!(json.contains("photo.heif")); + assert!(json.contains("256")); + } + + #[test] + fn test_handle_event_request_serialization() { + let req = HandleEventRequest { + event_type: "MediaImported".to_string(), + payload: serde_json::json!({"id": "abc-123"}), + }; + let json = serde_json::to_string(&req).unwrap(); + assert!(json.contains("MediaImported")); + assert!(json.contains("abc-123")); 
+ } +} diff --git a/crates/pinakes-core/src/plugin/runtime.rs b/crates/pinakes-core/src/plugin/runtime.rs index c47f12e..8ad22e3 100644 --- a/crates/pinakes-core/src/plugin/runtime.rs +++ b/crates/pinakes-core/src/plugin/runtime.rs @@ -4,7 +4,17 @@ use std::{path::Path, sync::Arc}; use anyhow::{Result, anyhow}; use pinakes_plugin_api::PluginContext; -use wasmtime::{Caller, Config, Engine, Linker, Module, Store, Val, anyhow}; +use wasmtime::{ + Caller, + Config, + Engine, + Linker, + Module, + Store, + StoreLimitsBuilder, + Val, + anyhow, +}; /// WASM runtime wrapper for executing plugins pub struct WasmRuntime { @@ -58,6 +68,8 @@ impl WasmRuntime { pub struct PluginStoreData { pub context: PluginContext, pub exchange_buffer: Vec, + pub pending_events: Vec<(String, String)>, + pub limiter: wasmtime::StoreLimits, } /// A loaded WASM plugin instance @@ -90,11 +102,23 @@ impl WasmPlugin { ) -> Result> { let engine = self.module.engine(); + // Build memory limiter from capabilities + let memory_limit = self + .context + .capabilities + .max_memory_bytes + .unwrap_or(512 * 1024 * 1024); // default 512 MB + + let limiter = StoreLimitsBuilder::new().memory_size(memory_limit).build(); + let store_data = PluginStoreData { - context: self.context.clone(), + context: self.context.clone(), exchange_buffer: Vec::new(), + pending_events: Vec::new(), + limiter, }; let mut store = Store::new(engine, store_data); + store.limiter(|data| &mut data.limiter); // Set fuel limit based on capabilities if let Some(max_cpu_time_ms) = self.context.capabilities.max_cpu_time_ms { @@ -194,6 +218,47 @@ impl WasmPlugin { Ok(Vec::new()) } } + + /// Call a plugin function with JSON request/response serialization. + /// + /// Serializes `request` to JSON, calls the named function, deserializes + /// the response. Wraps the call with `tokio::time::timeout`. 
+ /// + /// # Errors + /// + /// Returns an error if serialization fails, the call times out, the plugin + /// traps, or the response is malformed JSON. + #[allow(clippy::future_not_send)] // Req doesn't need Sync; called within local tasks + pub async fn call_function_json( + &self, + function_name: &str, + request: &Req, + timeout: std::time::Duration, + ) -> anyhow::Result + where + Req: serde::Serialize, + Resp: serde::de::DeserializeOwned, + { + let request_bytes = serde_json::to_vec(request) + .map_err(|e| anyhow::anyhow!("failed to serialize request: {e}"))?; + + let result = tokio::time::timeout( + timeout, + self.call_function(function_name, &request_bytes), + ) + .await + .map_err(|_| { + anyhow::anyhow!( + "plugin call '{function_name}' timed out after {timeout:?}" + ) + })??; + + serde_json::from_slice(&result).map_err(|e| { + anyhow::anyhow!( + "failed to deserialize response from '{function_name}': {e}" + ) + }) + } } #[cfg(test)] @@ -220,7 +285,8 @@ pub struct HostFunctions; impl HostFunctions { /// Registers all host ABI functions (`host_log`, `host_read_file`, /// `host_write_file`, `host_http_request`, `host_get_config`, - /// `host_get_buffer`) into the given linker. + /// `host_get_env`, `host_get_buffer`, `host_set_result`, + /// `host_emit_event`) into the given linker. /// /// # Errors /// @@ -423,6 +489,29 @@ impl HostFunctions { return -2; } + // Check domain whitelist if configured + if let Some(ref allowed) = + caller.data().context.capabilities.network.allowed_domains + { + let parsed = match url::Url::parse(&url_str) { + Ok(u) => u, + _ => { + tracing::warn!(url = %url_str, "plugin provided invalid URL"); + return -1; + }, + }; + let domain = parsed.host_str().unwrap_or(""); + + if !allowed.iter().any(|d| d.eq_ignore_ascii_case(domain)) { + tracing::warn!( + url = %url_str, + domain = domain, + "plugin domain not in allowlist" + ); + return -3; + } + } + // Use block_in_place to avoid blocking the async runtime's thread pool. 
// Falls back to a blocking client with timeout if block_in_place is // unavailable. @@ -513,6 +602,66 @@ impl HostFunctions { }, )?; + linker.func_wrap( + "env", + "host_get_env", + |mut caller: Caller<'_, PluginStoreData>, + key_ptr: i32, + key_len: i32| + -> i32 { + if key_ptr < 0 || key_len < 0 { + return -1; + } + let memory = caller + .get_export("memory") + .and_then(wasmtime::Extern::into_memory); + let Some(mem) = memory else { return -1 }; + + let data = mem.data(&caller); + let start = u32::try_from(key_ptr).unwrap_or(0) as usize; + let end = start + u32::try_from(key_len).unwrap_or(0) as usize; + if end > data.len() { + return -1; + } + + let key_str = match std::str::from_utf8(&data[start..end]) { + Ok(s) => s.to_string(), + Err(_) => return -1, + }; + + // Check environment capability + let env_cap = &caller.data().context.capabilities.environment; + if !env_cap.enabled { + tracing::warn!( + var = %key_str, + "plugin environment access denied" + ); + return -2; + } + + // Check against allowed variables list if configured + if let Some(ref allowed) = env_cap.allowed_vars + && !allowed.iter().any(|v| v == &key_str) + { + tracing::warn!( + var = %key_str, + "plugin env var not in allowlist" + ); + return -2; + } + + match std::env::var(&key_str) { + Ok(value) => { + let bytes = value.into_bytes(); + let len = i32::try_from(bytes.len()).unwrap_or(i32::MAX); + caller.data_mut().exchange_buffer = bytes; + len + }, + Err(_) => -1, + } + }, + )?; + linker.func_wrap( "env", "host_get_buffer", @@ -543,6 +692,83 @@ impl HostFunctions { }, )?; + linker.func_wrap( + "env", + "host_set_result", + |mut caller: Caller<'_, PluginStoreData>, ptr: i32, len: i32| { + if ptr < 0 || len < 0 { + return; + } + let memory = caller + .get_export("memory") + .and_then(wasmtime::Extern::into_memory); + let Some(mem) = memory else { return }; + + let data = mem.data(&caller); + let start = u32::try_from(ptr).unwrap_or(0) as usize; + let end = start + 
u32::try_from(len).unwrap_or(0) as usize; + if end <= data.len() { + caller.data_mut().exchange_buffer = data[start..end].to_vec(); + } + }, + )?; + + linker.func_wrap( + "env", + "host_emit_event", + |mut caller: Caller<'_, PluginStoreData>, + type_ptr: i32, + type_len: i32, + payload_ptr: i32, + payload_len: i32| + -> i32 { + const MAX_PENDING_EVENTS: usize = 1000; + + if type_ptr < 0 || type_len < 0 || payload_ptr < 0 || payload_len < 0 { + return -1; + } + let memory = caller + .get_export("memory") + .and_then(wasmtime::Extern::into_memory); + let Some(mem) = memory else { return -1 }; + + let type_start = u32::try_from(type_ptr).unwrap_or(0) as usize; + let type_end = + type_start + u32::try_from(type_len).unwrap_or(0) as usize; + let payload_start = u32::try_from(payload_ptr).unwrap_or(0) as usize; + let payload_end = + payload_start + u32::try_from(payload_len).unwrap_or(0) as usize; + + // Extract owned strings in a block so the immutable borrow of + // `caller` (via `mem.data`) is dropped before `caller.data_mut()`. 
+ let (event_type, payload) = { + let data = mem.data(&caller); + if type_end > data.len() || payload_end > data.len() { + return -1; + } + let event_type = + match std::str::from_utf8(&data[type_start..type_end]) { + Ok(s) => s.to_string(), + Err(_) => return -1, + }; + let payload = + match std::str::from_utf8(&data[payload_start..payload_end]) { + Ok(s) => s.to_string(), + Err(_) => return -1, + }; + (event_type, payload) + }; + + if caller.data().pending_events.len() >= MAX_PENDING_EVENTS { + tracing::warn!("plugin exceeded max pending events limit"); + return -4; + } + + caller.data_mut().pending_events.push((event_type, payload)); + 0 + }, + )?; + Ok(()) } } diff --git a/crates/pinakes-core/src/plugin/security.rs b/crates/pinakes-core/src/plugin/security.rs index 5f887fb..6bebb94 100644 --- a/crates/pinakes-core/src/plugin/security.rs +++ b/crates/pinakes-core/src/plugin/security.rs @@ -235,6 +235,54 @@ impl CapabilityEnforcer { .unwrap_or(self.max_cpu_time_limit) .min(self.max_cpu_time_limit) } + + /// Validate that a function call is allowed for a plugin's declared kinds. + /// + /// Defense-in-depth: even though the pipeline filters by kind, this prevents + /// bugs from calling wrong functions on plugins. Returns `true` if allowed. 
+ #[must_use] + pub fn validate_function_call( + &self, + plugin_kinds: &[String], + function_name: &str, + ) -> bool { + match function_name { + // Lifecycle functions are always allowed + "initialize" | "shutdown" | "health_check" => true, + // MediaTypeProvider + "supported_media_types" | "can_handle" => { + plugin_kinds.iter().any(|k| k == "media_type") + }, + // supported_types is shared by metadata_extractor and thumbnail_generator + "supported_types" => { + plugin_kinds + .iter() + .any(|k| k == "metadata_extractor" || k == "thumbnail_generator") + }, + // MetadataExtractor + "extract_metadata" => { + plugin_kinds.iter().any(|k| k == "metadata_extractor") + }, + // ThumbnailGenerator + "generate_thumbnail" => { + plugin_kinds.iter().any(|k| k == "thumbnail_generator") + }, + // SearchBackend + "search" | "index_item" | "remove_item" | "get_stats" => { + plugin_kinds.iter().any(|k| k == "search_backend") + }, + // EventHandler + "interested_events" | "handle_event" => { + plugin_kinds.iter().any(|k| k == "event_handler") + }, + // ThemeProvider + "get_themes" | "load_theme" => { + plugin_kinds.iter().any(|k| k == "theme_provider") + }, + // Unknown function names are not allowed + _ => false, + } + } } impl Default for CapabilityEnforcer { @@ -356,20 +404,70 @@ mod tests { let mut caps = Capabilities::default(); - // No limits specified - use defaults + // No limits specified, use the defaults assert_eq!(enforcer.get_memory_limit(&caps), 100 * 1024 * 1024); assert_eq!(enforcer.get_cpu_time_limit(&caps), 30_000); - // Plugin requests lower limits - use plugin's + // Plugin requests lower limits, use plugin's caps.max_memory_bytes = Some(50 * 1024 * 1024); caps.max_cpu_time_ms = Some(10_000); assert_eq!(enforcer.get_memory_limit(&caps), 50 * 1024 * 1024); assert_eq!(enforcer.get_cpu_time_limit(&caps), 10_000); - // Plugin requests higher limits - cap at system max + // Plugin requests higher limits, cap at system max caps.max_memory_bytes = Some(200 * 1024 * 
/// Outcome of a signature check on a plugin package.
///
/// Cryptographic rejection is expressed through this enum rather than as an
/// error, so callers can apply policy (e.g. `allow_unsigned`) to the result.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SignatureStatus {
    /// A signature was present and verified against one of the trusted keys.
    Valid,
    /// No `plugin.sig` file was found alongside the manifest.
    Unsigned,
    /// A signature file exists but failed verification; the payload explains
    /// why (malformed file, or no trusted key matched).
    Invalid(String),
}
+pub fn verify_plugin_signature( + plugin_dir: &Path, + wasm_path: &Path, + trusted_keys: &[VerifyingKey], +) -> Result { + let sig_path = plugin_dir.join("plugin.sig"); + if !sig_path.exists() { + return Ok(SignatureStatus::Unsigned); + } + + let sig_bytes = std::fs::read(&sig_path) + .map_err(|e| anyhow!("failed to read plugin.sig: {e}"))?; + + let signature = Signature::from_slice(&sig_bytes).map_err(|e| { + // Malformed signature file is an invalid signature, not an I/O error + tracing::warn!(path = %sig_path.display(), "malformed plugin.sig: {e}"); + anyhow!("malformed plugin.sig: {e}") + }); + let Ok(signature) = signature else { + return Ok(SignatureStatus::Invalid( + "malformed signature file".to_string(), + )); + }; + + // BLAKE3 hash of the WASM binary is the signed message + let wasm_bytes = std::fs::read(wasm_path) + .map_err(|e| anyhow!("failed to read WASM binary for verification: {e}"))?; + let digest = blake3::hash(&wasm_bytes); + let message = digest.as_bytes(); + + for key in trusted_keys { + if key.verify(message, &signature).is_ok() { + return Ok(SignatureStatus::Valid); + } + } + + Ok(SignatureStatus::Invalid( + "signature did not match any trusted key".to_string(), + )) +} + +/// Parse a hex-encoded Ed25519 public key (64 hex characters = 32 bytes). +/// +/// # Errors +/// +/// Returns an error if the string is not valid hex or is the wrong length. 
+pub fn parse_public_key(hex_str: &str) -> Result { + let hex_str = hex_str.trim(); + if hex_str.len() != 64 { + return Err(anyhow!( + "expected 64 hex characters for Ed25519 public key, got {}", + hex_str.len() + )); + } + + let mut bytes = [0u8; 32]; + for (i, byte) in bytes.iter_mut().enumerate() { + *byte = u8::from_str_radix(&hex_str[i * 2..i * 2 + 2], 16) + .map_err(|e| anyhow!("invalid hex in public key: {e}"))?; + } + + VerifyingKey::from_bytes(&bytes) + .map_err(|e| anyhow!("invalid Ed25519 public key: {e}")) +} + +#[cfg(test)] +mod tests { + use ed25519_dalek::{Signer, SigningKey}; + use rand::RngExt; + + use super::*; + + fn make_keypair() -> (SigningKey, VerifyingKey) { + let secret_bytes: [u8; 32] = rand::rng().random(); + let signing = SigningKey::from_bytes(&secret_bytes); + let verifying = signing.verifying_key(); + (signing, verifying) + } + + #[test] + fn test_verify_unsigned_plugin() { + let dir = tempfile::tempdir().unwrap(); + let wasm_path = dir.path().join("plugin.wasm"); + std::fs::write(&wasm_path, b"\0asm\x01\x00\x00\x00").unwrap(); + + let (_, vk) = make_keypair(); + let status = + verify_plugin_signature(dir.path(), &wasm_path, &[vk]).unwrap(); + assert_eq!(status, SignatureStatus::Unsigned); + } + + #[test] + fn test_verify_valid_signature() { + let dir = tempfile::tempdir().unwrap(); + let wasm_path = dir.path().join("plugin.wasm"); + let wasm_bytes = b"\0asm\x01\x00\x00\x00some_code_here"; + std::fs::write(&wasm_path, wasm_bytes).unwrap(); + + let (sk, vk) = make_keypair(); + + // Sign the BLAKE3 hash of the WASM binary + let digest = blake3::hash(wasm_bytes); + let signature = sk.sign(digest.as_bytes()); + std::fs::write(dir.path().join("plugin.sig"), signature.to_bytes()) + .unwrap(); + + let status = + verify_plugin_signature(dir.path(), &wasm_path, &[vk]).unwrap(); + assert_eq!(status, SignatureStatus::Valid); + } + + #[test] + fn test_verify_wrong_key() { + let dir = tempfile::tempdir().unwrap(); + let wasm_path = 
dir.path().join("plugin.wasm"); + let wasm_bytes = b"\0asm\x01\x00\x00\x00some_code"; + std::fs::write(&wasm_path, wasm_bytes).unwrap(); + + let (sk, _) = make_keypair(); + let (_, wrong_vk) = make_keypair(); + + let digest = blake3::hash(wasm_bytes); + let signature = sk.sign(digest.as_bytes()); + std::fs::write(dir.path().join("plugin.sig"), signature.to_bytes()) + .unwrap(); + + let status = + verify_plugin_signature(dir.path(), &wasm_path, &[wrong_vk]).unwrap(); + assert!(matches!(status, SignatureStatus::Invalid(_))); + } + + #[test] + fn test_verify_tampered_wasm() { + let dir = tempfile::tempdir().unwrap(); + let wasm_path = dir.path().join("plugin.wasm"); + let original = b"\0asm\x01\x00\x00\x00original"; + std::fs::write(&wasm_path, original).unwrap(); + + let (sk, vk) = make_keypair(); + let digest = blake3::hash(original); + let signature = sk.sign(digest.as_bytes()); + std::fs::write(dir.path().join("plugin.sig"), signature.to_bytes()) + .unwrap(); + + // Tamper with the WASM file after signing + std::fs::write(&wasm_path, b"\0asm\x01\x00\x00\x00tampered").unwrap(); + + let status = + verify_plugin_signature(dir.path(), &wasm_path, &[vk]).unwrap(); + assert!(matches!(status, SignatureStatus::Invalid(_))); + } + + #[test] + fn test_verify_malformed_sig_file() { + let dir = tempfile::tempdir().unwrap(); + let wasm_path = dir.path().join("plugin.wasm"); + std::fs::write(&wasm_path, b"\0asm\x01\x00\x00\x00").unwrap(); + + // Write garbage to plugin.sig (wrong length) + std::fs::write(dir.path().join("plugin.sig"), b"not a signature").unwrap(); + + let (_, vk) = make_keypair(); + let status = + verify_plugin_signature(dir.path(), &wasm_path, &[vk]).unwrap(); + assert!(matches!(status, SignatureStatus::Invalid(_))); + } + + #[test] + fn test_verify_multiple_trusted_keys() { + let dir = tempfile::tempdir().unwrap(); + let wasm_path = dir.path().join("plugin.wasm"); + let wasm_bytes = b"\0asm\x01\x00\x00\x00multi_key_test"; + std::fs::write(&wasm_path, 
wasm_bytes).unwrap(); + + let (sk2, vk2) = make_keypair(); + let (_, vk1) = make_keypair(); + let (_, vk3) = make_keypair(); + + // Sign with key 2 + let digest = blake3::hash(wasm_bytes); + let signature = sk2.sign(digest.as_bytes()); + std::fs::write(dir.path().join("plugin.sig"), signature.to_bytes()) + .unwrap(); + + // Verify against [vk1, vk2, vk3]; should find vk2 + let status = + verify_plugin_signature(dir.path(), &wasm_path, &[vk1, vk2, vk3]) + .unwrap(); + assert_eq!(status, SignatureStatus::Valid); + } + + #[test] + fn test_parse_public_key_valid() { + let (_, vk) = make_keypair(); + let hex = hex_encode(vk.as_bytes()); + let parsed = parse_public_key(&hex).unwrap(); + assert_eq!(parsed, vk); + } + + #[test] + fn test_parse_public_key_wrong_length() { + assert!(parse_public_key("abcdef").is_err()); + } + + #[test] + fn test_parse_public_key_invalid_hex() { + let bad = + "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"; + assert!(parse_public_key(bad).is_err()); + } + + fn hex_encode(bytes: &[u8]) -> String { + bytes.iter().map(|b| format!("{b:02x}")).collect() + } +}