pinakes-core: add plugin pipeline; impl signature verification & dependency resolution
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: Ida98135cf868db0f5a46a64b8ac562366a6a6964
This commit is contained in:
parent
8347a714d2
commit
4edda201e6
12 changed files with 2784 additions and 36 deletions
|
|
@ -19,14 +19,19 @@ use tokio::sync::RwLock;
|
|||
use tracing::{debug, error, info, warn};
|
||||
|
||||
pub mod loader;
|
||||
pub mod pipeline;
|
||||
pub mod registry;
|
||||
pub mod rpc;
|
||||
pub mod runtime;
|
||||
pub mod security;
|
||||
pub mod signature;
|
||||
|
||||
pub use loader::PluginLoader;
|
||||
pub use pipeline::PluginPipeline;
|
||||
pub use registry::{PluginRegistry, RegisteredPlugin};
|
||||
pub use runtime::{WasmPlugin, WasmRuntime};
|
||||
pub use security::CapabilityEnforcer;
|
||||
pub use signature::{SignatureStatus, verify_plugin_signature};
|
||||
|
||||
/// Plugin manager coordinates plugin lifecycle and operations
|
||||
pub struct PluginManager {
|
||||
|
|
@ -69,16 +74,28 @@ pub struct PluginManagerConfig {
|
|||
|
||||
/// Plugin timeout in seconds
|
||||
pub plugin_timeout_secs: u64,
|
||||
|
||||
/// Timeout configuration for different call types
|
||||
pub timeouts: crate::config::PluginTimeoutConfig,
|
||||
|
||||
/// Max consecutive failures before circuit breaker disables plugin
|
||||
pub max_consecutive_failures: u32,
|
||||
|
||||
/// Trusted Ed25519 public keys for signature verification (hex-encoded)
|
||||
pub trusted_keys: Vec<String>,
|
||||
}
|
||||
|
||||
impl Default for PluginManagerConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
plugin_dirs: vec![],
|
||||
enable_hot_reload: false,
|
||||
allow_unsigned: false,
|
||||
max_concurrent_ops: 4,
|
||||
plugin_timeout_secs: 30,
|
||||
plugin_dirs: vec![],
|
||||
enable_hot_reload: false,
|
||||
allow_unsigned: false,
|
||||
max_concurrent_ops: 4,
|
||||
plugin_timeout_secs: 30,
|
||||
timeouts: crate::config::PluginTimeoutConfig::default(),
|
||||
max_consecutive_failures: 5,
|
||||
trusted_keys: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -86,11 +103,14 @@ impl Default for PluginManagerConfig {
|
|||
impl From<crate::config::PluginsConfig> for PluginManagerConfig {
|
||||
fn from(cfg: crate::config::PluginsConfig) -> Self {
|
||||
Self {
|
||||
plugin_dirs: cfg.plugin_dirs,
|
||||
enable_hot_reload: cfg.enable_hot_reload,
|
||||
allow_unsigned: cfg.allow_unsigned,
|
||||
max_concurrent_ops: cfg.max_concurrent_ops,
|
||||
plugin_timeout_secs: cfg.plugin_timeout_secs,
|
||||
plugin_dirs: cfg.plugin_dirs,
|
||||
enable_hot_reload: cfg.enable_hot_reload,
|
||||
allow_unsigned: cfg.allow_unsigned,
|
||||
max_concurrent_ops: cfg.max_concurrent_ops,
|
||||
plugin_timeout_secs: cfg.plugin_timeout_secs,
|
||||
timeouts: cfg.timeouts,
|
||||
max_consecutive_failures: cfg.max_consecutive_failures,
|
||||
trusted_keys: cfg.trusted_keys,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -127,7 +147,12 @@ impl PluginManager {
|
|||
})
|
||||
}
|
||||
|
||||
/// Discover and load all plugins from configured directories
|
||||
/// Discover and load all plugins from configured directories.
|
||||
///
|
||||
/// Plugins are loaded in dependency order: if plugin A declares a
|
||||
/// dependency on plugin B, B is loaded first. Cycles and missing
|
||||
/// dependencies are detected and reported as warnings; affected plugins
|
||||
/// are skipped rather than causing a hard failure.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
|
|
@ -136,9 +161,10 @@ impl PluginManager {
|
|||
info!("Discovering plugins from {:?}", self.config.plugin_dirs);
|
||||
|
||||
let manifests = self.loader.discover_plugins()?;
|
||||
let ordered = Self::resolve_load_order(&manifests);
|
||||
let mut loaded_plugins = Vec::new();
|
||||
|
||||
for manifest in manifests {
|
||||
for manifest in ordered {
|
||||
match self.load_plugin_from_manifest(&manifest).await {
|
||||
Ok(plugin_id) => {
|
||||
info!("Loaded plugin: {}", plugin_id);
|
||||
|
|
@ -153,6 +179,93 @@ impl PluginManager {
|
|||
Ok(loaded_plugins)
|
||||
}
|
||||
|
||||
/// Topological sort of manifests by their declared `dependencies`.
|
||||
///
|
||||
/// Uses Kahn's algorithm. Plugins whose dependencies are missing or form
|
||||
/// a cycle are logged as warnings and excluded from the result.
|
||||
fn resolve_load_order(
|
||||
manifests: &[pinakes_plugin_api::PluginManifest],
|
||||
) -> Vec<pinakes_plugin_api::PluginManifest> {
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
|
||||
// Index manifests by name for O(1) lookup
|
||||
let by_name: HashMap<&str, usize> = manifests
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, m)| (m.plugin.name.as_str(), i))
|
||||
.collect();
|
||||
|
||||
// Check for missing dependencies and warn early
|
||||
let known: HashSet<&str> = by_name.keys().copied().collect();
|
||||
for manifest in manifests {
|
||||
for dep in &manifest.plugin.dependencies {
|
||||
if !known.contains(dep.as_str()) {
|
||||
warn!(
|
||||
"Plugin '{}' depends on '{}' which was not discovered; it will be \
|
||||
skipped",
|
||||
manifest.plugin.name, dep
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Build adjacency: in_degree[i] = number of deps that must load before i
|
||||
let mut in_degree = vec![0usize; manifests.len()];
|
||||
// dependents[i] = indices that depend on i (i must load before them)
|
||||
let mut dependents: Vec<Vec<usize>> = vec![vec![]; manifests.len()];
|
||||
|
||||
for (i, manifest) in manifests.iter().enumerate() {
|
||||
for dep in &manifest.plugin.dependencies {
|
||||
if let Some(&dep_idx) = by_name.get(dep.as_str()) {
|
||||
in_degree[i] += 1;
|
||||
dependents[dep_idx].push(i);
|
||||
} else {
|
||||
// Missing dep: set in_degree impossibly high so it never resolves
|
||||
in_degree[i] = usize::MAX;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Kahn's algorithm
|
||||
let mut queue: VecDeque<usize> = VecDeque::new();
|
||||
for (i, °) in in_degree.iter().enumerate() {
|
||||
if deg == 0 {
|
||||
queue.push_back(i);
|
||||
}
|
||||
}
|
||||
|
||||
let mut result = Vec::with_capacity(manifests.len());
|
||||
while let Some(idx) = queue.pop_front() {
|
||||
result.push(manifests[idx].clone());
|
||||
for &dependent in &dependents[idx] {
|
||||
if in_degree[dependent] == usize::MAX {
|
||||
continue; // already poisoned by missing dep
|
||||
}
|
||||
in_degree[dependent] -= 1;
|
||||
if in_degree[dependent] == 0 {
|
||||
queue.push_back(dependent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Anything not in `result` is part of a cycle or has a missing dep
|
||||
if result.len() < manifests.len() {
|
||||
let loaded: HashSet<&str> =
|
||||
result.iter().map(|m| m.plugin.name.as_str()).collect();
|
||||
for manifest in manifests {
|
||||
if !loaded.contains(manifest.plugin.name.as_str()) {
|
||||
warn!(
|
||||
"Plugin '{}' was skipped due to unresolved dependencies or a \
|
||||
dependency cycle",
|
||||
manifest.plugin.name
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Load a plugin from a manifest file
|
||||
///
|
||||
/// # Errors
|
||||
|
|
@ -217,6 +330,45 @@ impl PluginManager {
|
|||
|
||||
// Load WASM binary
|
||||
let wasm_path = self.loader.resolve_wasm_path(manifest)?;
|
||||
|
||||
// Verify plugin signature unless unsigned plugins are allowed
|
||||
if !self.config.allow_unsigned {
|
||||
let plugin_dir = wasm_path
|
||||
.parent()
|
||||
.ok_or_else(|| anyhow::anyhow!("WASM path has no parent directory"))?;
|
||||
|
||||
let trusted_keys: Vec<ed25519_dalek::VerifyingKey> = self
|
||||
.config
|
||||
.trusted_keys
|
||||
.iter()
|
||||
.filter_map(|hex| {
|
||||
signature::parse_public_key(hex)
|
||||
.map_err(|e| warn!("Ignoring malformed trusted key: {e}"))
|
||||
.ok()
|
||||
})
|
||||
.collect();
|
||||
|
||||
match signature::verify_plugin_signature(
|
||||
plugin_dir,
|
||||
&wasm_path,
|
||||
&trusted_keys,
|
||||
)? {
|
||||
SignatureStatus::Valid => {
|
||||
debug!("Plugin '{plugin_id}' signature verified");
|
||||
},
|
||||
SignatureStatus::Unsigned => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Plugin '{plugin_id}' is unsigned and allow_unsigned is false"
|
||||
));
|
||||
},
|
||||
SignatureStatus::Invalid(reason) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Plugin '{plugin_id}' has an invalid signature: {reason}"
|
||||
));
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
let wasm_plugin = self.runtime.load_plugin(&wasm_path, context)?;
|
||||
|
||||
// Initialize plugin
|
||||
|
|
@ -413,6 +565,40 @@ impl PluginManager {
|
|||
registry.get(plugin_id).map(|p| p.metadata.clone())
|
||||
}
|
||||
|
||||
/// Get enabled plugins of a specific kind, sorted by priority (ascending).
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// `(plugin_id, priority, kinds, wasm_plugin)` tuples.
|
||||
pub async fn get_enabled_by_kind_sorted(
|
||||
&self,
|
||||
kind: &str,
|
||||
) -> Vec<(String, u16, Vec<String>, WasmPlugin)> {
|
||||
let registry = self.registry.read().await;
|
||||
let mut plugins: Vec<_> = registry
|
||||
.get_by_kind(kind)
|
||||
.into_iter()
|
||||
.filter(|p| p.enabled)
|
||||
.map(|p| {
|
||||
(
|
||||
p.id.clone(),
|
||||
p.manifest.plugin.priority,
|
||||
p.manifest.plugin.kind.clone(),
|
||||
p.wasm_plugin.clone(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
drop(registry);
|
||||
plugins.sort_by_key(|(_, priority, ..)| *priority);
|
||||
plugins
|
||||
}
|
||||
|
||||
/// Get a reference to the capability enforcer.
|
||||
#[must_use]
|
||||
pub const fn enforcer(&self) -> &CapabilityEnforcer {
|
||||
&self.enforcer
|
||||
}
|
||||
|
||||
/// Check if a plugin is loaded and enabled
|
||||
pub async fn is_plugin_enabled(&self, plugin_id: &str) -> bool {
|
||||
let registry = self.registry.read().await;
|
||||
|
|
@ -503,4 +689,137 @@ mod tests {
|
|||
let plugins = manager.list_plugins().await;
|
||||
assert_eq!(plugins.len(), 0);
|
||||
}
|
||||
|
||||
/// Build a minimal manifest for dependency resolution tests
|
||||
fn test_manifest(
|
||||
name: &str,
|
||||
deps: Vec<String>,
|
||||
) -> pinakes_plugin_api::PluginManifest {
|
||||
use pinakes_plugin_api::manifest::{PluginBinary, PluginInfo};
|
||||
|
||||
pinakes_plugin_api::PluginManifest {
|
||||
plugin: PluginInfo {
|
||||
name: name.to_string(),
|
||||
version: "1.0.0".to_string(),
|
||||
api_version: "1.0".to_string(),
|
||||
author: None,
|
||||
description: None,
|
||||
homepage: None,
|
||||
license: None,
|
||||
priority: 500,
|
||||
kind: vec!["media_type".to_string()],
|
||||
binary: PluginBinary {
|
||||
wasm: "plugin.wasm".to_string(),
|
||||
entrypoint: None,
|
||||
},
|
||||
dependencies: deps,
|
||||
},
|
||||
capabilities: Default::default(),
|
||||
config: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_resolve_load_order_no_deps() {
|
||||
let manifests = vec![
|
||||
test_manifest("alpha", vec![]),
|
||||
test_manifest("beta", vec![]),
|
||||
test_manifest("gamma", vec![]),
|
||||
];
|
||||
|
||||
let ordered = PluginManager::resolve_load_order(&manifests);
|
||||
assert_eq!(ordered.len(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_resolve_load_order_linear_chain() {
|
||||
// gamma depends on beta, beta depends on alpha
|
||||
let manifests = vec![
|
||||
test_manifest("gamma", vec!["beta".to_string()]),
|
||||
test_manifest("alpha", vec![]),
|
||||
test_manifest("beta", vec!["alpha".to_string()]),
|
||||
];
|
||||
|
||||
let ordered = PluginManager::resolve_load_order(&manifests);
|
||||
assert_eq!(ordered.len(), 3);
|
||||
|
||||
let names: Vec<&str> =
|
||||
ordered.iter().map(|m| m.plugin.name.as_str()).collect();
|
||||
let alpha_pos = names.iter().position(|&n| n == "alpha").unwrap();
|
||||
let beta_pos = names.iter().position(|&n| n == "beta").unwrap();
|
||||
let gamma_pos = names.iter().position(|&n| n == "gamma").unwrap();
|
||||
assert!(alpha_pos < beta_pos, "alpha must load before beta");
|
||||
assert!(beta_pos < gamma_pos, "beta must load before gamma");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_resolve_load_order_cycle_detected() {
|
||||
// A -> B -> C -> A (cycle)
|
||||
let manifests = vec![
|
||||
test_manifest("a", vec!["c".to_string()]),
|
||||
test_manifest("b", vec!["a".to_string()]),
|
||||
test_manifest("c", vec!["b".to_string()]),
|
||||
];
|
||||
|
||||
let ordered = PluginManager::resolve_load_order(&manifests);
|
||||
// All three should be excluded due to cycle
|
||||
assert_eq!(ordered.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_resolve_load_order_missing_dependency() {
|
||||
let manifests = vec![
|
||||
test_manifest("good", vec![]),
|
||||
test_manifest("bad", vec!["nonexistent".to_string()]),
|
||||
];
|
||||
|
||||
let ordered = PluginManager::resolve_load_order(&manifests);
|
||||
// Only "good" should be loaded; "bad" depends on something missing
|
||||
assert_eq!(ordered.len(), 1);
|
||||
assert_eq!(ordered[0].plugin.name, "good");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_resolve_load_order_partial_cycle() {
|
||||
// "ok" has no deps, "cycle_a" and "cycle_b" form a cycle
|
||||
let manifests = vec![
|
||||
test_manifest("ok", vec![]),
|
||||
test_manifest("cycle_a", vec!["cycle_b".to_string()]),
|
||||
test_manifest("cycle_b", vec!["cycle_a".to_string()]),
|
||||
];
|
||||
|
||||
let ordered = PluginManager::resolve_load_order(&manifests);
|
||||
assert_eq!(ordered.len(), 1);
|
||||
assert_eq!(ordered[0].plugin.name, "ok");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_resolve_load_order_diamond() {
|
||||
// Man look at how beautiful my diamond is...
|
||||
// A
|
||||
// / \
|
||||
// B C
|
||||
// \ /
|
||||
// D
|
||||
let manifests = vec![
|
||||
test_manifest("d", vec!["b".to_string(), "c".to_string()]),
|
||||
test_manifest("b", vec!["a".to_string()]),
|
||||
test_manifest("c", vec!["a".to_string()]),
|
||||
test_manifest("a", vec![]),
|
||||
];
|
||||
|
||||
let ordered = PluginManager::resolve_load_order(&manifests);
|
||||
assert_eq!(ordered.len(), 4);
|
||||
|
||||
let names: Vec<&str> =
|
||||
ordered.iter().map(|m| m.plugin.name.as_str()).collect();
|
||||
let a_pos = names.iter().position(|&n| n == "a").unwrap();
|
||||
let b_pos = names.iter().position(|&n| n == "b").unwrap();
|
||||
let c_pos = names.iter().position(|&n| n == "c").unwrap();
|
||||
let d_pos = names.iter().position(|&n| n == "d").unwrap();
|
||||
assert!(a_pos < b_pos);
|
||||
assert!(a_pos < c_pos);
|
||||
assert!(b_pos < d_pos);
|
||||
assert!(c_pos < d_pos);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue