//! Plugin system for Pinakes //! //! This module provides a comprehensive plugin architecture that allows //! extending Pinakes with custom media types, metadata extractors, search //! backends, and more. //! //! # Architecture //! //! - Plugins are compiled to WASM and run in a sandboxed environment //! - Capability-based security controls what plugins can access //! - Hot-reload support for development //! - Automatic plugin discovery from configured directories use std::{path::PathBuf, sync::Arc}; use anyhow::Result; use pinakes_plugin_api::{PluginContext, PluginMetadata}; use tokio::sync::RwLock; use tracing::{debug, error, info, warn}; pub mod loader; pub mod pipeline; pub mod registry; pub mod rpc; pub mod runtime; pub mod security; pub mod signature; pub use loader::PluginLoader; pub use pipeline::PluginPipeline; pub use registry::{PluginRegistry, RegisteredPlugin}; pub use runtime::{WasmPlugin, WasmRuntime}; pub use security::CapabilityEnforcer; pub use signature::{SignatureStatus, verify_plugin_signature}; /// Plugin manager coordinates plugin lifecycle and operations pub struct PluginManager { /// Plugin registry registry: Arc>, /// WASM runtime for executing plugins runtime: Arc, /// Plugin loader for discovery and loading loader: PluginLoader, /// Capability enforcer for security enforcer: CapabilityEnforcer, /// Plugin data directory data_dir: PathBuf, /// Plugin cache directory cache_dir: PathBuf, /// Configuration config: PluginManagerConfig, } /// Configuration for the plugin manager #[derive(Debug, Clone)] pub struct PluginManagerConfig { /// Directories to search for plugins pub plugin_dirs: Vec, /// Whether to enable hot-reload (for development) pub enable_hot_reload: bool, /// Whether to allow unsigned plugins pub allow_unsigned: bool, /// Maximum number of concurrent plugin operations pub max_concurrent_ops: usize, /// Plugin timeout in seconds pub plugin_timeout_secs: u64, /// Timeout configuration for different call types pub timeouts: crate::config::PluginTimeoutConfig, /// Max consecutive failures before circuit breaker disables plugin pub max_consecutive_failures: u32, /// Trusted Ed25519 public keys for signature verification (hex-encoded) pub trusted_keys: Vec, } impl Default for PluginManagerConfig { fn default() -> Self { Self { plugin_dirs: vec![], enable_hot_reload: false, allow_unsigned: false, max_concurrent_ops: 4, plugin_timeout_secs: 30, timeouts: crate::config::PluginTimeoutConfig::default(), max_consecutive_failures: 5, trusted_keys: vec![], } } } impl From for PluginManagerConfig { fn from(cfg: crate::config::PluginsConfig) -> Self { Self { plugin_dirs: cfg.plugin_dirs, enable_hot_reload: cfg.enable_hot_reload, allow_unsigned: cfg.allow_unsigned, max_concurrent_ops: cfg.max_concurrent_ops, plugin_timeout_secs: cfg.plugin_timeout_secs, timeouts: cfg.timeouts, max_consecutive_failures: cfg.max_consecutive_failures, trusted_keys: cfg.trusted_keys, } } } impl PluginManager { /// Create a new plugin manager /// /// # Errors /// /// Returns an error if the data or cache directories cannot be created, or /// if the WASM runtime cannot be initialized. pub fn new( data_dir: PathBuf, cache_dir: PathBuf, config: PluginManagerConfig, ) -> Result { // Ensure directories exist std::fs::create_dir_all(&data_dir)?; std::fs::create_dir_all(&cache_dir)?; let runtime = Arc::new(WasmRuntime::new()?); let registry = Arc::new(RwLock::new(PluginRegistry::new())); let loader = PluginLoader::new(config.plugin_dirs.clone()); let enforcer = CapabilityEnforcer::new(); Ok(Self { registry, runtime, loader, enforcer, data_dir, cache_dir, config, }) } /// Discover and load all plugins from configured directories. /// /// Plugins are loaded in dependency order: if plugin A declares a /// dependency on plugin B, B is loaded first. Cycles and missing /// dependencies are detected and reported as warnings; affected plugins /// are skipped rather than causing a hard failure. /// /// # Errors /// /// Returns an error if plugin discovery fails. pub async fn discover_and_load_all(&self) -> Result> { info!("Discovering plugins from {:?}", self.config.plugin_dirs); let manifests = self.loader.discover_plugins()?; let ordered = Self::resolve_load_order(&manifests); let mut loaded_plugins = Vec::new(); for manifest in ordered { match self.load_plugin_from_manifest(&manifest).await { Ok(plugin_id) => { info!("Loaded plugin: {}", plugin_id); loaded_plugins.push(plugin_id); }, Err(e) => { warn!("Failed to load plugin {}: {}", manifest.plugin.name, e); }, } } Ok(loaded_plugins) } /// Topological sort of manifests by their declared `dependencies`. /// /// Uses Kahn's algorithm. Plugins whose dependencies are missing or form /// a cycle are logged as warnings and excluded from the result. fn resolve_load_order( manifests: &[pinakes_plugin_api::PluginManifest], ) -> Vec { use std::collections::{HashMap, HashSet, VecDeque}; // Index manifests by name for O(1) lookup let by_name: HashMap<&str, usize> = manifests .iter() .enumerate() .map(|(i, m)| (m.plugin.name.as_str(), i)) .collect(); // Check for missing dependencies and warn early let known: HashSet<&str> = by_name.keys().copied().collect(); for manifest in manifests { for dep in &manifest.plugin.dependencies { if !known.contains(dep.as_str()) { warn!( "Plugin '{}' depends on '{}' which was not discovered; it will be \ skipped", manifest.plugin.name, dep ); } } } // Build adjacency: in_degree[i] = number of deps that must load before i let mut in_degree = vec![0usize; manifests.len()]; // dependents[i] = indices that depend on i (i must load before them) let mut dependents: Vec> = vec![vec![]; manifests.len()]; for (i, manifest) in manifests.iter().enumerate() { for dep in &manifest.plugin.dependencies { if let Some(&dep_idx) = by_name.get(dep.as_str()) { in_degree[i] += 1; dependents[dep_idx].push(i); } else { // Missing dep: set in_degree impossibly high so it never resolves in_degree[i] = usize::MAX; } } } // Kahn's algorithm let mut queue: VecDeque = VecDeque::new(); for (i, °) in in_degree.iter().enumerate() { if deg == 0 { queue.push_back(i); } } let mut result = Vec::with_capacity(manifests.len()); while let Some(idx) = queue.pop_front() { result.push(manifests[idx].clone()); for &dependent in &dependents[idx] { if in_degree[dependent] == usize::MAX { continue; // already poisoned by missing dep } in_degree[dependent] -= 1; if in_degree[dependent] == 0 { queue.push_back(dependent); } } } // Anything not in `result` is part of a cycle or has a missing dep if result.len() < manifests.len() { let loaded: HashSet<&str> = result.iter().map(|m| m.plugin.name.as_str()).collect(); for manifest in manifests { if !loaded.contains(manifest.plugin.name.as_str()) { warn!( "Plugin '{}' was skipped due to unresolved dependencies or a \ dependency cycle", manifest.plugin.name ); } } } result } /// Load a plugin from a manifest file /// /// # Errors /// /// Returns an error if the plugin ID is invalid, capability validation /// fails, the WASM binary cannot be loaded, or the plugin cannot be /// registered. async fn load_plugin_from_manifest( &self, manifest: &pinakes_plugin_api::PluginManifest, ) -> Result { let plugin_id = manifest.plugin_id(); // Validate plugin_id to prevent path traversal if plugin_id.contains('/') || plugin_id.contains('\\') || plugin_id.contains("..") { return Err(anyhow::anyhow!("Invalid plugin ID: {plugin_id}")); } // Check if already loaded { let registry = self.registry.read().await; if registry.is_loaded(&plugin_id) { return Ok(plugin_id); } } // Validate capabilities let capabilities = manifest.to_capabilities(); self.enforcer.validate_capabilities(&capabilities)?; // Create plugin context let plugin_data_dir = self.data_dir.join(&plugin_id); let plugin_cache_dir = self.cache_dir.join(&plugin_id); tokio::fs::create_dir_all(&plugin_data_dir).await?; tokio::fs::create_dir_all(&plugin_cache_dir).await?; let context = PluginContext { data_dir: plugin_data_dir, cache_dir: plugin_cache_dir, config: manifest .config .iter() .map(|(k, v)| { ( k.clone(), serde_json::to_value(v).unwrap_or_else(|e| { tracing::warn!( "failed to serialize config value for key {}: {}", k, e ); serde_json::Value::Null }), ) }) .collect(), capabilities: capabilities.clone(), }; // Load WASM binary let wasm_path = self.loader.resolve_wasm_path(manifest)?; // Verify plugin signature unless unsigned plugins are allowed if !self.config.allow_unsigned { let plugin_dir = wasm_path .parent() .ok_or_else(|| anyhow::anyhow!("WASM path has no parent directory"))?; let trusted_keys: Vec = self .config .trusted_keys .iter() .filter_map(|hex| { signature::parse_public_key(hex) .map_err(|e| warn!("Ignoring malformed trusted key: {e}")) .ok() }) .collect(); match signature::verify_plugin_signature( plugin_dir, &wasm_path, &trusted_keys, )? { SignatureStatus::Valid => { debug!("Plugin '{plugin_id}' signature verified"); }, SignatureStatus::Unsigned => { return Err(anyhow::anyhow!( "Plugin '{plugin_id}' is unsigned and allow_unsigned is false" )); }, SignatureStatus::Invalid(reason) => { return Err(anyhow::anyhow!( "Plugin '{plugin_id}' has an invalid signature: {reason}" )); }, } } let wasm_plugin = self.runtime.load_plugin(&wasm_path, context)?; // Initialize plugin let init_succeeded = match wasm_plugin .call_function("initialize", &[]) .await { Ok(_) => true, Err(e) => { tracing::warn!(plugin_id = %plugin_id, "plugin initialization failed: {}", e); false }, }; // Register plugin let metadata = PluginMetadata { id: plugin_id.clone(), name: manifest.plugin.name.clone(), version: manifest.plugin.version.clone(), author: manifest.plugin.author.clone().unwrap_or_default(), description: manifest .plugin .description .clone() .unwrap_or_default(), api_version: manifest.plugin.api_version.clone(), capabilities_required: capabilities, }; // Derive manifest_path from the loader's plugin directories let manifest_path = self .loader .get_plugin_dir(&manifest.plugin.name) .map(|dir| dir.join("plugin.toml")); let registered = RegisteredPlugin { id: plugin_id.clone(), metadata, wasm_plugin, manifest: manifest.clone(), manifest_path, enabled: init_succeeded, }; { let mut registry = self.registry.write().await; registry.register(registered)?; } Ok(plugin_id) } /// Install a plugin from a file or URL /// /// # Errors /// /// Returns an error if the plugin cannot be downloaded, the manifest cannot /// be read, or the plugin cannot be loaded. pub async fn install_plugin(&self, source: &str) -> Result { info!("Installing plugin from: {}", source); // Download/copy plugin to plugins directory let plugin_path = if source.starts_with("http://") || source.starts_with("https://") { // Download from URL self.loader.download_plugin(source).await? } else { // Copy from local file PathBuf::from(source) }; // Load the manifest let manifest_path = plugin_path.join("plugin.toml"); let manifest = pinakes_plugin_api::PluginManifest::from_file(&manifest_path)?; // Load the plugin self.load_plugin_from_manifest(&manifest).await } /// Uninstall a plugin /// /// # Errors /// /// Returns an error if the plugin ID is invalid, the plugin cannot be shut /// down, cannot be unregistered, or its data directories cannot be removed. pub async fn uninstall_plugin(&self, plugin_id: &str) -> Result<()> { // Validate plugin_id to prevent path traversal if plugin_id.contains('/') || plugin_id.contains('\\') || plugin_id.contains("..") { return Err(anyhow::anyhow!("Invalid plugin ID: {plugin_id}")); } info!("Uninstalling plugin: {}", plugin_id); // Shutdown plugin first self.shutdown_plugin(plugin_id).await?; // Remove from registry { let mut registry = self.registry.write().await; registry.unregister(plugin_id)?; } // Remove plugin data and cache let plugin_data_dir = self.data_dir.join(plugin_id); let plugin_cache_dir = self.cache_dir.join(plugin_id); if plugin_data_dir.exists() { std::fs::remove_dir_all(&plugin_data_dir)?; } if plugin_cache_dir.exists() { std::fs::remove_dir_all(&plugin_cache_dir)?; } Ok(()) } /// Enable a plugin /// /// # Errors /// /// Returns an error if the plugin ID is not found in the registry. pub async fn enable_plugin(&self, plugin_id: &str) -> Result<()> { let mut registry = self.registry.write().await; registry.enable(plugin_id) } /// Disable a plugin /// /// # Errors /// /// Returns an error if the plugin ID is not found in the registry. pub async fn disable_plugin(&self, plugin_id: &str) -> Result<()> { let mut registry = self.registry.write().await; registry.disable(plugin_id) } /// Shutdown a specific plugin /// /// # Errors /// /// Returns an error if the plugin ID is not found in the registry. pub async fn shutdown_plugin(&self, plugin_id: &str) -> Result<()> { debug!("Shutting down plugin: {}", plugin_id); let registry = self.registry.read().await; if let Some(plugin) = registry.get(plugin_id) { let _ = plugin.wasm_plugin.call_function("shutdown", &[]).await; Ok(()) } else { Err(anyhow::anyhow!("Plugin not found: {plugin_id}")) } } /// Shutdown all plugins /// /// # Errors /// /// This function always returns `Ok(())`. Individual plugin shutdown errors /// are logged but do not cause the overall operation to fail. pub async fn shutdown_all(&self) -> Result<()> { info!("Shutting down all plugins"); let plugin_ids: Vec = { let registry = self.registry.read().await; registry.list_all().iter().map(|p| p.id.clone()).collect() }; for plugin_id in plugin_ids { if let Err(e) = self.shutdown_plugin(&plugin_id).await { error!("Failed to shutdown plugin {}: {}", plugin_id, e); } } Ok(()) } /// Get list of all registered plugins pub async fn list_plugins(&self) -> Vec { let registry = self.registry.read().await; registry .list_all() .iter() .map(|p| p.metadata.clone()) .collect() } /// Get plugin metadata by ID pub async fn get_plugin(&self, plugin_id: &str) -> Option { let registry = self.registry.read().await; registry.get(plugin_id).map(|p| p.metadata.clone()) } /// Get enabled plugins of a specific kind, sorted by priority (ascending). /// /// # Returns /// /// `(plugin_id, priority, kinds, wasm_plugin)` tuples. pub async fn get_enabled_by_kind_sorted( &self, kind: &str, ) -> Vec<(String, u16, Vec, WasmPlugin)> { let registry = self.registry.read().await; let mut plugins: Vec<_> = registry .get_by_kind(kind) .into_iter() .filter(|p| p.enabled) .map(|p| { ( p.id.clone(), p.manifest.plugin.priority, p.manifest.plugin.kind.clone(), p.wasm_plugin.clone(), ) }) .collect(); drop(registry); plugins.sort_by_key(|(_, priority, ..)| *priority); plugins } /// Get a reference to the capability enforcer. #[must_use] pub const fn enforcer(&self) -> &CapabilityEnforcer { &self.enforcer } /// List all UI pages provided by loaded plugins. /// /// Returns a vector of `(plugin_id, page)` tuples for all enabled plugins /// that provide pages in their manifests. Both inline and file-referenced /// page entries are resolved. pub async fn list_ui_pages( &self, ) -> Vec<(String, pinakes_plugin_api::UiPage)> { self .list_ui_pages_with_endpoints() .await .into_iter() .map(|(id, page, _)| (id, page)) .collect() } /// List all UI pages provided by loaded plugins, including each plugin's /// declared endpoint allowlist. /// /// Returns a vector of `(plugin_id, page, allowed_endpoints)` tuples. The /// `allowed_endpoints` list mirrors the `required_endpoints` field from the /// plugin manifest's `[ui]` section. pub async fn list_ui_pages_with_endpoints( &self, ) -> Vec<(String, pinakes_plugin_api::UiPage, Vec)> { let registry = self.registry.read().await; let mut pages = Vec::new(); for plugin in registry.list_all() { if !plugin.enabled { continue; } let allowed = plugin.manifest.ui.required_endpoints.clone(); let plugin_dir = plugin .manifest_path .as_ref() .and_then(|p| p.parent()) .map(std::path::Path::to_path_buf); let Some(plugin_dir) = plugin_dir else { for entry in &plugin.manifest.ui.pages { if let pinakes_plugin_api::manifest::UiPageEntry::Inline(page) = entry { pages.push((plugin.id.clone(), (**page).clone(), allowed.clone())); } } continue; }; match plugin.manifest.load_ui_pages(&plugin_dir) { Ok(loaded) => { for page in loaded { pages.push((plugin.id.clone(), page, allowed.clone())); } }, Err(e) => { tracing::warn!( "Failed to load UI pages for plugin '{}': {e}", plugin.id ); }, } } pages } /// Collect CSS custom property overrides declared by all enabled plugins. /// /// When multiple plugins declare the same property name, later-loaded plugins /// overwrite earlier ones. Returns an empty map if no plugins are loaded or /// none declare theme extensions. pub async fn list_ui_theme_extensions( &self, ) -> std::collections::HashMap { let registry = self.registry.read().await; let mut merged = std::collections::HashMap::new(); for plugin in registry.list_all() { if !plugin.enabled { continue; } for (k, v) in &plugin.manifest.ui.theme_extensions { merged.insert(k.clone(), v.clone()); } } merged } /// List all UI widgets provided by loaded plugins. /// /// Returns a vector of `(plugin_id, widget)` tuples for all enabled plugins /// that provide widgets in their manifests. pub async fn list_ui_widgets( &self, ) -> Vec<(String, pinakes_plugin_api::UiWidget)> { let registry = self.registry.read().await; let mut widgets = Vec::new(); for plugin in registry.list_all() { if !plugin.enabled { continue; } for widget in &plugin.manifest.ui.widgets { widgets.push((plugin.id.clone(), widget.clone())); } } widgets } /// Check if a plugin is loaded and enabled pub async fn is_plugin_enabled(&self, plugin_id: &str) -> bool { let registry = self.registry.read().await; registry.is_enabled(plugin_id).unwrap_or(false) } /// Reload a plugin (for hot-reload during development) /// /// # Errors /// /// Returns an error if hot-reload is disabled, the plugin is not found, it /// cannot be shut down, or the reloaded plugin cannot be registered. pub async fn reload_plugin(&self, plugin_id: &str) -> Result<()> { if !self.config.enable_hot_reload { return Err(anyhow::anyhow!("Hot-reload is disabled")); } info!("Reloading plugin: {}", plugin_id); // Re-read the manifest from disk if possible, falling back to cached // version let manifest = { let registry = self.registry.read().await; let plugin = registry .get(plugin_id) .ok_or_else(|| anyhow::anyhow!("Plugin not found"))?; let manifest = plugin.manifest_path.as_ref().map_or_else( || plugin.manifest.clone(), |manifest_path| { pinakes_plugin_api::PluginManifest::from_file(manifest_path) .unwrap_or_else(|e| { warn!( "Failed to re-read manifest from disk, using cached: {}", e ); plugin.manifest.clone() }) }, ); drop(registry); manifest }; // Shutdown and unload current version self.shutdown_plugin(plugin_id).await?; { let mut registry = self.registry.write().await; registry.unregister(plugin_id)?; } // Reload from manifest self.load_plugin_from_manifest(&manifest).await?; Ok(()) } } #[cfg(test)] mod tests { use tempfile::TempDir; use super::*; #[tokio::test] async fn test_plugin_manager_creation() { let temp_dir = TempDir::new().unwrap(); let data_dir = temp_dir.path().join("data"); let cache_dir = temp_dir.path().join("cache"); let config = PluginManagerConfig::default(); let manager = PluginManager::new(data_dir.clone(), cache_dir.clone(), config); assert!(manager.is_ok()); assert!(data_dir.exists()); assert!(cache_dir.exists()); } #[tokio::test] async fn test_list_plugins_empty() { let temp_dir = TempDir::new().unwrap(); let data_dir = temp_dir.path().join("data"); let cache_dir = temp_dir.path().join("cache"); let config = PluginManagerConfig::default(); let manager = PluginManager::new(data_dir, cache_dir, config).unwrap(); let plugins = manager.list_plugins().await; assert_eq!(plugins.len(), 0); } /// Build a minimal manifest for dependency resolution tests fn test_manifest( name: &str, deps: Vec, ) -> pinakes_plugin_api::PluginManifest { use pinakes_plugin_api::manifest::{PluginBinary, PluginInfo}; pinakes_plugin_api::PluginManifest { plugin: PluginInfo { name: name.to_string(), version: "1.0.0".to_string(), api_version: "1.0".to_string(), author: None, description: None, homepage: None, license: None, priority: 500, kind: vec!["media_type".to_string()], binary: PluginBinary { wasm: "plugin.wasm".to_string(), entrypoint: None, }, dependencies: deps, }, capabilities: Default::default(), config: Default::default(), ui: Default::default(), } } #[test] fn test_resolve_load_order_no_deps() { let manifests = vec![ test_manifest("alpha", vec![]), test_manifest("beta", vec![]), test_manifest("gamma", vec![]), ]; let ordered = PluginManager::resolve_load_order(&manifests); assert_eq!(ordered.len(), 3); } #[test] fn test_resolve_load_order_linear_chain() { // gamma depends on beta, beta depends on alpha let manifests = vec![ test_manifest("gamma", vec!["beta".to_string()]), test_manifest("alpha", vec![]), test_manifest("beta", vec!["alpha".to_string()]), ]; let ordered = PluginManager::resolve_load_order(&manifests); assert_eq!(ordered.len(), 3); let names: Vec<&str> = ordered.iter().map(|m| m.plugin.name.as_str()).collect(); let alpha_pos = names.iter().position(|&n| n == "alpha").unwrap(); let beta_pos = names.iter().position(|&n| n == "beta").unwrap(); let gamma_pos = names.iter().position(|&n| n == "gamma").unwrap(); assert!(alpha_pos < beta_pos, "alpha must load before beta"); assert!(beta_pos < gamma_pos, "beta must load before gamma"); } #[test] fn test_resolve_load_order_cycle_detected() { // A -> B -> C -> A (cycle) let manifests = vec![ test_manifest("a", vec!["c".to_string()]), test_manifest("b", vec!["a".to_string()]), test_manifest("c", vec!["b".to_string()]), ]; let ordered = PluginManager::resolve_load_order(&manifests); // All three should be excluded due to cycle assert_eq!(ordered.len(), 0); } #[test] fn test_resolve_load_order_missing_dependency() { let manifests = vec![ test_manifest("good", vec![]), test_manifest("bad", vec!["nonexistent".to_string()]), ]; let ordered = PluginManager::resolve_load_order(&manifests); // Only "good" should be loaded; "bad" depends on something missing assert_eq!(ordered.len(), 1); assert_eq!(ordered[0].plugin.name, "good"); } #[test] fn test_resolve_load_order_partial_cycle() { // "ok" has no deps, "cycle_a" and "cycle_b" form a cycle let manifests = vec![ test_manifest("ok", vec![]), test_manifest("cycle_a", vec!["cycle_b".to_string()]), test_manifest("cycle_b", vec!["cycle_a".to_string()]), ]; let ordered = PluginManager::resolve_load_order(&manifests); assert_eq!(ordered.len(), 1); assert_eq!(ordered[0].plugin.name, "ok"); } #[test] fn test_resolve_load_order_diamond() { // Man look at how beautiful my diamond is... // A // / \ // B C // \ / // D let manifests = vec![ test_manifest("d", vec!["b".to_string(), "c".to_string()]), test_manifest("b", vec!["a".to_string()]), test_manifest("c", vec!["a".to_string()]), test_manifest("a", vec![]), ]; let ordered = PluginManager::resolve_load_order(&manifests); assert_eq!(ordered.len(), 4); let names: Vec<&str> = ordered.iter().map(|m| m.plugin.name.as_str()).collect(); let a_pos = names.iter().position(|&n| n == "a").unwrap(); let b_pos = names.iter().position(|&n| n == "b").unwrap(); let c_pos = names.iter().position(|&n| n == "c").unwrap(); let d_pos = names.iter().position(|&n| n == "d").unwrap(); assert!(a_pos < b_pos); assert!(a_pos < c_pos); assert!(b_pos < d_pos); assert!(c_pos < d_pos); } }