From 99b3c01d2241677f86cb46f7a7d62e4eeed0c5db Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Mar 2026 00:42:41 +0300 Subject: [PATCH 01/10] chore: tag 0.3.0-dev Signed-off-by: NotAShelf Change-Id: I46e0922237f99736aec4b11ecb84b12a6a6a6964 --- Cargo.lock | 10 +++---- Cargo.toml | 84 +++++++++++++++++++++++++++--------------------------- 2 files changed, 47 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1431626..a39cbca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5315,7 +5315,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pinakes-core" -version = "0.2.0-dev" +version = "0.3.0-dev" dependencies = [ "anyhow", "argon2", @@ -5360,7 +5360,7 @@ dependencies = [ [[package]] name = "pinakes-plugin-api" -version = "0.2.0-dev" +version = "0.3.0-dev" dependencies = [ "async-trait", "chrono", @@ -5376,7 +5376,7 @@ dependencies = [ [[package]] name = "pinakes-server" -version = "0.2.0-dev" +version = "0.3.0-dev" dependencies = [ "anyhow", "argon2", @@ -5409,7 +5409,7 @@ dependencies = [ [[package]] name = "pinakes-tui" -version = "0.2.0-dev" +version = "0.3.0-dev" dependencies = [ "anyhow", "chrono", @@ -5428,7 +5428,7 @@ dependencies = [ [[package]] name = "pinakes-ui" -version = "0.2.0-dev" +version = "0.3.0-dev" dependencies = [ "ammonia", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index e287f8a..52a67a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ resolver = "3" [workspace.package] edition = "2024" # keep in sync with .rustfmt.toml -version = "0.2.0-dev" +version = "0.3.0-dev" license = "EUPL-1.2" readme = true rust-version = "1.95.0" # follows nightly Rust @@ -139,46 +139,46 @@ wit-bindgen = "0.53.1" # See: # [workspace.lints.clippy] -cargo = { level = "warn", priority = -1 } +cargo = { level = "warn", priority = -1 } complexity = { level = "warn", priority = -1 } -nursery = { level = "warn", priority = -1 } -pedantic = { level = "warn", priority = -1 } -perf = { level 
= "warn", priority = -1 } -style = { level = "warn", priority = -1 } +nursery = { level = "warn", priority = -1 } +pedantic = { level = "warn", priority = -1 } +perf = { level = "warn", priority = -1 } +style = { level = "warn", priority = -1 } # The lint groups above enable some less-than-desirable rules, we should manually # enable those to keep our sanity. -absolute_paths = "allow" -arbitrary_source_item_ordering = "allow" -clone_on_ref_ptr = "warn" -dbg_macro = "warn" -empty_drop = "warn" -empty_structs_with_brackets = "warn" -exit = "warn" -filetype_is_file = "warn" -get_unwrap = "warn" -implicit_return = "allow" -infinite_loop = "warn" +absolute_paths = "allow" +arbitrary_source_item_ordering = "allow" +clone_on_ref_ptr = "warn" +dbg_macro = "warn" +empty_drop = "warn" +empty_structs_with_brackets = "warn" +exit = "warn" +filetype_is_file = "warn" +get_unwrap = "warn" +implicit_return = "allow" +infinite_loop = "warn" map_with_unused_argument_over_ranges = "warn" -missing_docs_in_private_items = "allow" -multiple_crate_versions = "allow" # :( -non_ascii_literal = "allow" -non_std_lazy_statics = "warn" -pathbuf_init_then_push = "warn" -pattern_type_mismatch = "allow" -question_mark_used = "allow" -rc_buffer = "warn" -rc_mutex = "warn" -rest_pat_in_fully_bound_structs = "warn" -similar_names = "allow" -single_call_fn = "allow" -std_instead_of_core = "allow" -too_long_first_doc_paragraph = "allow" -too_many_lines = "allow" -undocumented_unsafe_blocks = "warn" -unnecessary_safety_comment = "warn" -unused_result_ok = "warn" -unused_trait_names = "allow" +missing_docs_in_private_items = "allow" +multiple_crate_versions = "allow" # :( +non_ascii_literal = "allow" +non_std_lazy_statics = "warn" +pathbuf_init_then_push = "warn" +pattern_type_mismatch = "allow" +question_mark_used = "allow" +rc_buffer = "warn" +rc_mutex = "warn" +rest_pat_in_fully_bound_structs = "warn" +similar_names = "allow" +single_call_fn = "allow" +std_instead_of_core = "allow" 
+too_long_first_doc_paragraph = "allow" +too_many_lines = "allow" +undocumented_unsafe_blocks = "warn" +unnecessary_safety_comment = "warn" +unused_result_ok = "warn" +unused_trait_names = "allow" # False positive: # clippy's build script check doesn't recognize workspace-inherited metadata @@ -186,17 +186,17 @@ unused_trait_names = "allow" cargo_common_metadata = "allow" # In the honor of a recent Cloudflare regression -panic = "deny" +panic = "deny" unwrap_used = "deny" # Less dangerous, but we'd like to know # Those must be opt-in, and are fine ONLY in tests and examples. -expect_used = "warn" -print_stderr = "warn" -print_stdout = "warn" -todo = "warn" +expect_used = "warn" +print_stderr = "warn" +print_stdout = "warn" +todo = "warn" unimplemented = "warn" -unreachable = "warn" +unreachable = "warn" [profile.dev.package] blake3 = { opt-level = 3 } From cb10c84809008135069a11d449dbb60392ae650d Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Mar 2026 01:06:57 +0300 Subject: [PATCH 02/10] chore: update example config with ratelimit opts; format TOML Signed-off-by: NotAShelf Change-Id: Iefb657f4564caa8fd9a0ec375522c4726a6a6964 --- .cargo/config.toml | 8 ++--- .deny.toml | 76 ++++++++++++++++++++++---------------------- .rustfmt.toml | 47 ++++++++++++++------------- pinakes.example.toml | 37 ++++++++++++++++++--- 4 files changed, 96 insertions(+), 72 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index bf15bbc..abd0872 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -3,9 +3,9 @@ build-std = ["std", "panic_abort", "core", "alloc"] [build] rustflags = [ - "-Clto", - "-Zvirtual-function-elimination", - "-Zlocation-detail=none", + "-Clto", + "-Zvirtual-function-elimination", + "-Zlocation-detail=none", ] @@ -23,5 +23,3 @@ codegen-units = 1 panic = "abort" strip = true incremental = false - - diff --git a/.deny.toml b/.deny.toml index 06a92b5..5613325 100644 --- a/.deny.toml +++ b/.deny.toml @@ -23,13 +23,13 @@ # dependencies not 
shared by any other crates, would be ignored, as the target # list here is effectively saying which targets you are building for. targets = [ - # The triple can be any string, but only the target triples built in to - # rustc (as of 1.40) can be checked against actual config expressions - #"x86_64-unknown-linux-musl", - # You can also specify which target_features you promise are enabled for a - # particular target. target_features are currently not validated against - # the actual valid features supported by the target architecture. - #{ triple = "wasm32-unknown-unknown", features = ["atomics"] }, + # The triple can be any string, but only the target triples built in to + # rustc (as of 1.40) can be checked against actual config expressions + #"x86_64-unknown-linux-musl", + # You can also specify which target_features you promise are enabled for a + # particular target. target_features are currently not validated against + # the actual valid features supported by the target architecture. + #{ triple = "wasm32-unknown-unknown", features = ["atomics"] }, ] # When creating the dependency graph used as the source of truth when checks are # executed, this field can be used to prune crates from the graph, removing them @@ -70,23 +70,23 @@ feature-depth = 1 # A list of advisory IDs to ignore. Note that ignored advisories will still # output a note when they are encountered. ignore = [ - # Dioxus pulls a whole bunch of GTK3 dependencies that are all deprecated and - # marked insecure. Unfortunately, there doesn't seem to be a GTK4 migration - # in sight, so we'll have to ignore them for now. 
- { id = "RUSTSEC-2024-0370", reason = "Used by GTK3 and there is no alternative!"}, - { id = "RUSTSEC-2024-0411", reason = "Used by Dioxus and there is no alternative!"}, - { id = "RUSTSEC-2024-0412", reason = "Used by Dioxus and there is no alternative!"}, - { id = "RUSTSEC-2024-0413", reason = "Used by Dioxus and there is no alternative!"}, - { id = "RUSTSEC-2024-0415", reason = "Used by Dioxus and there is no alternative!"}, - { id = "RUSTSEC-2024-0416", reason = "Used by Dioxus and there is no alternative!"}, - { id = "RUSTSEC-2024-0418", reason = "Used by Dioxus and there is no alternative!"}, - { id = "RUSTSEC-2024-0419", reason = "Used by Dioxus and there is no alternative!"}, - { id = "RUSTSEC-2024-0420", reason = "Used by Dioxus and there is no alternative!"} + # Dioxus pulls a whole bunch of GTK3 dependencies that are all deprecated and + # marked insecure. Unfortunately, there doesn't seem to be a GTK4 migration + # in sight, so we'll have to ignore them for now. + { id = "RUSTSEC-2024-0370", reason = "Used by GTK3 and there is no alternative!" }, + { id = "RUSTSEC-2024-0411", reason = "Used by Dioxus and there is no alternative!" }, + { id = "RUSTSEC-2024-0412", reason = "Used by Dioxus and there is no alternative!" }, + { id = "RUSTSEC-2024-0413", reason = "Used by Dioxus and there is no alternative!" }, + { id = "RUSTSEC-2024-0415", reason = "Used by Dioxus and there is no alternative!" }, + { id = "RUSTSEC-2024-0416", reason = "Used by Dioxus and there is no alternative!" }, + { id = "RUSTSEC-2024-0418", reason = "Used by Dioxus and there is no alternative!" }, + { id = "RUSTSEC-2024-0419", reason = "Used by Dioxus and there is no alternative!" }, + { id = "RUSTSEC-2024-0420", reason = "Used by Dioxus and there is no alternative!" 
}, - #"RUSTSEC-0000-0000", - #{ id = "RUSTSEC-0000-0000", reason = "you can specify a reason the advisory is ignored" }, - #"a-crate-that-is-yanked@0.1.1", # you can also ignore yanked crate versions if you wish - #{ crate = "a-crate-that-is-yanked@0.1.1", reason = "you can specify why you are ignoring the yanked crate" }, + #"RUSTSEC-0000-0000", + #{ id = "RUSTSEC-0000-0000", reason = "you can specify a reason the advisory is ignored" }, + #"a-crate-that-is-yanked@0.1.1", # you can also ignore yanked crate versions if you wish + #{ crate = "a-crate-that-is-yanked@0.1.1", reason = "you can specify why you are ignoring the yanked crate" }, ] # If this is true, then cargo deny will use the git executable to fetch advisory database. # If this is false, then it uses a built-in git library. @@ -120,9 +120,9 @@ confidence-threshold = 0.8 # Allow 1 or more licenses on a per-crate basis, so that particular licenses # aren't accepted for every possible crate as with the normal allow list exceptions = [ - # Each entry is the crate and version constraint, and its specific allow - # list - #{ allow = ["Zlib"], crate = "adler32" }, + # Each entry is the crate and version constraint, and its specific allow + # list + #{ allow = ["Zlib"], crate = "adler32" }, ] # Some crates don't have (easily) machine readable licensing information, @@ -153,7 +153,7 @@ ignore = false # is only published to private registries, and ignore is true, the crate will # not have its license(s) checked registries = [ - #"https://sekretz.com/registry + #"https://sekretz.com/registry ] # This section is considered when running `cargo deny check bans`. @@ -180,8 +180,8 @@ workspace-default-features = "allow" external-default-features = "allow" # List of crates that are allowed. Use with care! 
allow = [ - #"ansi_term@0.11.0", - #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is allowed" }, + #"ansi_term@0.11.0", + #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is allowed" }, ] # If true, workspace members are automatically allowed even when using deny-by-default # This is useful for organizations that want to deny all external dependencies by default @@ -189,11 +189,11 @@ allow = [ allow-workspace = false # List of crates to deny deny = [ - #"ansi_term@0.11.0", - #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is banned" }, - # Wrapper crates can optionally be specified to allow the crate when it - # is a direct dependency of the otherwise banned crate - #{ crate = "ansi_term@0.11.0", wrappers = ["this-crate-directly-depends-on-ansi_term"] }, + #"ansi_term@0.11.0", + #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is banned" }, + # Wrapper crates can optionally be specified to allow the crate when it + # is a direct dependency of the otherwise banned crate + #{ crate = "ansi_term@0.11.0", wrappers = ["this-crate-directly-depends-on-ansi_term"] }, ] # List of features to allow/deny @@ -221,16 +221,16 @@ deny = [ # Certain crates/versions that will be skipped when doing duplicate detection. skip = [ - #"ansi_term@0.11.0", - #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason why it can't be updated/removed" }, + #"ansi_term@0.11.0", + #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason why it can't be updated/removed" }, ] # Similarly to `skip` allows you to skip certain crates during duplicate # detection. Unlike skip, it also includes the entire tree of transitive # dependencies starting at the specified crate, up to a certain depth, which is # by default infinite. 
skip-tree = [ - #"ansi_term@0.11.0", # will be skipped along with _all_ of its direct and transitive dependencies - #{ crate = "ansi_term@0.11.0", depth = 20 }, + #"ansi_term@0.11.0", # will be skipped along with _all_ of its direct and transitive dependencies + #{ crate = "ansi_term@0.11.0", depth = 20 }, ] # This section is considered when running `cargo deny check sources`. diff --git a/.rustfmt.toml b/.rustfmt.toml index 8af4b10..324bf8b 100644 --- a/.rustfmt.toml +++ b/.rustfmt.toml @@ -1,27 +1,26 @@ -condense_wildcard_suffixes = true +condense_wildcard_suffixes = true doc_comment_code_block_width = 80 -edition = "2024" # Keep in sync with Cargo.toml. +edition = "2024" # Keep in sync with Cargo.toml. enum_discrim_align_threshold = 60 -force_explicit_abi = false -force_multiline_blocks = true -format_code_in_doc_comments = true -format_macro_matchers = true -format_strings = true -group_imports = "StdExternalCrate" -hex_literal_case = "Upper" -imports_granularity = "Crate" -imports_layout = "HorizontalVertical" -inline_attribute_width = 60 -match_block_trailing_comma = true -max_width = 80 -newline_style = "Unix" -normalize_comments = true -normalize_doc_attributes = true -overflow_delimited_expr = true +force_explicit_abi = false +force_multiline_blocks = true +format_code_in_doc_comments = true +format_macro_matchers = true +format_strings = true +group_imports = "StdExternalCrate" +hex_literal_case = "Upper" +imports_granularity = "Crate" +imports_layout = "HorizontalVertical" +inline_attribute_width = 60 +match_block_trailing_comma = true +max_width = 80 +newline_style = "Unix" +normalize_comments = true +normalize_doc_attributes = true +overflow_delimited_expr = true struct_field_align_threshold = 60 -tab_spaces = 2 -unstable_features = true -use_field_init_shorthand = true -use_try_shorthand = true -wrap_comments = true - +tab_spaces = 2 +unstable_features = true +use_field_init_shorthand = true +use_try_shorthand = true +wrap_comments = true diff --git 
a/pinakes.example.toml b/pinakes.example.toml index 57c9e38..0a3c7c7 100644 --- a/pinakes.example.toml +++ b/pinakes.example.toml @@ -95,10 +95,9 @@ port = 3000 # session_timeout_secs = 86400 # Enable CORS (Cross-Origin Resource Sharing) +# When enabled with origins, replaces default localhost origins # Default: false # cors_enabled = true - -# Allowed CORS origins (if CORS is enabled) # cors_origins = ["http://localhost:5173", "https://app.example.com"] # Enable TLS/HTTPS @@ -198,6 +197,10 @@ sidebar_collapsed = false ## User Accounts & Authentication [accounts] +# Session expiry in hours +# Default: 24 +# session_expiry_hours = 24 + # Require email verification for new accounts # Default: false # require_email_verification = false @@ -222,6 +225,22 @@ sidebar_collapsed = false # Default: 900 (15 minutes) # lockout_duration_secs = 900 +## Rate Limiting Configuration +## All rate limits are per-IP. Values control token bucket parameters: +## per_second = interval in seconds between token replenishment +## burst_size = maximum tokens (concurrent requests) allowed +# [rate_limits] +# global_per_second = 1 # ~100 req/sec with burst_size=100 +# global_burst_size = 100 +# login_per_second = 12 # ~5 req/min with burst_size=5 +# login_burst_size = 5 +# search_per_second = 6 # ~10 req/min with burst_size=10 +# search_burst_size = 10 +# stream_per_second = 60 # 1 per minute, max 5 concurrent +# stream_burst_size = 5 +# share_per_second = 2 # Share token access rate limit +# share_burst_size = 20 + ## Background Jobs Configuration [jobs] # Number of concurrent background job workers @@ -232,6 +251,11 @@ worker_count = 2 # Default: 60 cache_ttl_secs = 60 +# Maximum time a job can run before being cancelled (in seconds) +# Set to 0 to disable timeout +# Default: 3600 (1 hour) +# job_timeout_secs = 3600 + ## Metadata Enrichment Configuration [enrichment] # Enable automatic metadata enrichment from online sources @@ -333,11 +357,14 @@ enabled = false # retention_days = 90 ## 
Webhook Configuration -# Send HTTP notifications for events +# Send HTTP notifications for events. +# Supported events: media.created, media.updated, media.deleted, +# scan.completed, import.completed, test +# Use "*" to receive all events. # [[webhooks]] # url = "https://example.com/webhook" -# events = ["media.imported", "media.deleted", "tag.created"] -# secret = "webhook-secret-for-signature" # Optional HMAC secret +# events = ["media.created", "media.deleted", "scan.completed"] +# secret = "webhook-secret-for-signature" # Optional BLAKE3 HMAC secret ## Scheduled Tasks # Configure periodic background tasks From 8347a714d283721175a937990d791a004e85b4b6 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Mar 2026 01:07:43 +0300 Subject: [PATCH 03/10] pinakes-plugin-api: extend manifest with dependencies; basic WASM exchange buffer Signed-off-by: NotAShelf Change-Id: I60c0607fe27092a43826ac956e20a9a16a6a6964 --- crates/pinakes-plugin-api/src/manifest.rs | 72 ++++++++++++++++++++++- crates/pinakes-plugin-api/src/wasm.rs | 3 + 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/crates/pinakes-plugin-api/src/manifest.rs b/crates/pinakes-plugin-api/src/manifest.rs index 0b6d9f4..a334fba 100644 --- a/crates/pinakes-plugin-api/src/manifest.rs +++ b/crates/pinakes-plugin-api/src/manifest.rs @@ -24,6 +24,10 @@ pub struct PluginManifest { pub config: HashMap, } +const fn default_priority() -> u16 { + 500 +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PluginInfo { pub name: String, @@ -34,6 +38,11 @@ pub struct PluginInfo { pub homepage: Option, pub license: Option, + /// Pipeline priority (0-999). Lower values run first. Built-in handlers run + /// at 100. Default: 500. 
+ #[serde(default = "default_priority")] + pub priority: u16, + /// Plugin kind(s) - e.g., `media_type`, `metadata_extractor` pub kind: Vec, @@ -62,6 +71,9 @@ pub struct ManifestCapabilities { #[serde(default)] pub network: bool, + #[serde(default)] + pub allowed_domains: Option>, + #[serde(default)] pub environment: Option>, @@ -175,6 +187,12 @@ impl PluginManifest { )); } + if self.plugin.priority > 999 { + return Err(ManifestError::ValidationError( + "priority must be 0-999".to_string(), + )); + } + Ok(()) } @@ -200,7 +218,7 @@ impl PluginManifest { }, network: NetworkCapability { enabled: self.capabilities.network, - allowed_domains: None, + allowed_domains: self.capabilities.allowed_domains.clone(), }, environment: EnvironmentCapability { enabled: self.capabilities.environment.is_some(), @@ -277,6 +295,58 @@ version = "1.0.0" api_version = "1.0" kind = ["invalid_kind"] +[plugin.binary] +wasm = "plugin.wasm" +"#; + + assert!(PluginManifest::parse_str(toml).is_err()); + } + + #[test] + fn test_priority_default() { + let toml = r#" +[plugin] +name = "test" +version = "1.0.0" +api_version = "1.0" +kind = ["media_type"] + +[plugin.binary] +wasm = "plugin.wasm" +"#; + + let manifest = PluginManifest::parse_str(toml).unwrap(); + assert_eq!(manifest.plugin.priority, 500); + } + + #[test] + fn test_priority_custom() { + let toml = r#" +[plugin] +name = "test" +version = "1.0.0" +api_version = "1.0" +priority = 50 +kind = ["media_type"] + +[plugin.binary] +wasm = "plugin.wasm" +"#; + + let manifest = PluginManifest::parse_str(toml).unwrap(); + assert_eq!(manifest.plugin.priority, 50); + } + + #[test] + fn test_priority_out_of_range() { + let toml = r#" +[plugin] +name = "test" +version = "1.0.0" +api_version = "1.0" +priority = 1000 +kind = ["media_type"] + [plugin.binary] wasm = "plugin.wasm" "#; diff --git a/crates/pinakes-plugin-api/src/wasm.rs b/crates/pinakes-plugin-api/src/wasm.rs index 7df1018..166785f 100644 --- a/crates/pinakes-plugin-api/src/wasm.rs +++ 
b/crates/pinakes-plugin-api/src/wasm.rs @@ -72,6 +72,9 @@ pub mod host_functions { /// Emit an event pub const EMIT_EVENT: &str = "host_emit_event"; + + /// Set result data from plugin + pub const SET_RESULT: &str = "host_set_result"; } /// Log level for plugin logging From 4edda201e62674299acb6fe38be3bcea79138974 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Mar 2026 14:23:02 +0300 Subject: [PATCH 04/10] pinakes-core: add plugin pipeline; impl signature verification & dependency resolution Signed-off-by: NotAShelf Change-Id: Ida98135cf868db0f5a46a64b8ac562366a6a6964 --- Cargo.lock | 105 ++ Cargo.toml | 8 + crates/pinakes-core/Cargo.toml | 5 +- crates/pinakes-core/src/config.rs | 84 +- crates/pinakes-core/src/error.rs | 3 + crates/pinakes-core/src/plugin/mod.rs | 343 ++++- crates/pinakes-core/src/plugin/pipeline.rs | 1442 +++++++++++++++++++ crates/pinakes-core/src/plugin/registry.rs | 3 +- crates/pinakes-core/src/plugin/rpc.rs | 239 +++ crates/pinakes-core/src/plugin/runtime.rs | 232 ++- crates/pinakes-core/src/plugin/security.rs | 104 +- crates/pinakes-core/src/plugin/signature.rs | 252 ++++ 12 files changed, 2784 insertions(+), 36 deletions(-) create mode 100644 crates/pinakes-core/src/plugin/pipeline.rs create mode 100644 crates/pinakes-core/src/plugin/rpc.rs create mode 100644 crates/pinakes-core/src/plugin/signature.rs diff --git a/Cargo.lock b/Cargo.lock index a39cbca..9dde6a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -862,6 +862,12 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "const-serialize" version = "0.7.2" @@ -1374,6 +1380,33 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "curve25519-dalek" +version = "4.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +dependencies = [ + "cfg-if", + "cpufeatures 0.2.17", + "curve25519-dalek-derive", + "digest", + "fiat-crypto", + "rustc_version", + "subtle", + "zeroize", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "darling" version = "0.21.3" @@ -1512,6 +1545,16 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5729f5117e208430e437df2f4843f5e5952997175992d1414f94c57d61e270b4" +[[package]] +name = "der" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" +dependencies = [ + "const-oid", + "zeroize", +] + [[package]] name = "deranged" version = "0.5.8" @@ -2273,6 +2316,30 @@ dependencies = [ "cipher", ] +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "pkcs8", + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9" +dependencies = [ + "curve25519-dalek", + "ed25519", + "serde", + "sha2", + "subtle", + "zeroize", +] + [[package]] name = "either" version = "1.15.0" @@ -2444,6 +2511,12 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "field-offset" version = "0.3.6" @@ -5323,6 
+5396,7 @@ dependencies = [ "blake3", "chrono", "deadpool-postgres", + "ed25519-dalek", "epub", "gray_matter", "image", @@ -5338,6 +5412,7 @@ dependencies = [ "pinakes-plugin-api", "postgres-native-tls", "postgres-types", + "rand 0.10.0", "refinery", "regex", "reqwest 0.13.2", @@ -5351,6 +5426,7 @@ dependencies = [ "tokio-util", "toml 1.0.6+spec-1.1.0", "tracing", + "url", "urlencoding", "uuid", "walkdir", @@ -5454,6 +5530,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.32" @@ -6856,6 +6942,15 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "simd-adler32" version = "0.3.8" @@ -6989,6 +7084,16 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" diff --git a/Cargo.toml b/Cargo.toml index 52a67a4..53a54b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = ["crates/*"] +exclude = ["crates/pinakes-core/tests/fixtures/test-plugin"] resolver = "3" [workspace.package] @@ -46,6 +47,9 @@ tracing-subscriber = { version = "0.3.22", features = ["env-filter", "json"] } # Hashing blake3 = "1.8.3" +# Cryptographic signatures (plugin verification) +ed25519-dalek = { version = "2.1.1", features = ["std"] } + # Metadata extraction lofty = "0.23.2" lopdf = "0.39.0" @@ -88,6 +92,7 @@ 
tower_governor = "0.8.0" # HTTP client reqwest = { version = "0.13.2", features = ["json", "query", "blocking"] } +url = "2.5" # TUI ratatui = "0.30.0" @@ -136,6 +141,9 @@ http = "1.4.0" wasmtime = { version = "42.0.1", features = ["component-model"] } wit-bindgen = "0.53.1" +# Misc +tempfile = "3.26.0" + # See: # [workspace.lints.clippy] diff --git a/crates/pinakes-core/Cargo.toml b/crates/pinakes-core/Cargo.toml index 98d825b..5a09080 100644 --- a/crates/pinakes-core/Cargo.toml +++ b/crates/pinakes-core/Cargo.toml @@ -36,6 +36,7 @@ kamadak-exif = { workspace = true } image = { workspace = true } tokio-util = { workspace = true } reqwest = { workspace = true } +url = { workspace = true } argon2 = { workspace = true } regex = { workspace = true } moka = { workspace = true } @@ -45,9 +46,11 @@ image_hasher = { workspace = true } # Plugin system pinakes-plugin-api.workspace = true wasmtime.workspace = true +ed25519-dalek.workspace = true [lints] workspace = true [dev-dependencies] -tempfile = "3.25.0" +tempfile = { workspace = true } +rand = { workspace = true } diff --git a/crates/pinakes-core/src/config.rs b/crates/pinakes-core/src/config.rs index 8a47eca..3abe942 100644 --- a/crates/pinakes-core/src/config.rs +++ b/crates/pinakes-core/src/config.rs @@ -436,24 +436,69 @@ impl std::fmt::Display for UserRole { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PluginTimeoutConfig { + /// Timeout for capability discovery queries (`supported_types`, + /// `interested_events`) + #[serde(default = "default_capability_query_timeout")] + pub capability_query_secs: u64, + /// Timeout for processing calls (`extract_metadata`, `generate_thumbnail`) + #[serde(default = "default_processing_timeout")] + pub processing_secs: u64, + /// Timeout for event handler calls + #[serde(default = "default_event_handler_timeout")] + pub event_handler_secs: u64, +} + +const fn default_capability_query_timeout() -> u64 { + 2 +} + +const fn default_processing_timeout() -> u64 { + 
30 +} + +const fn default_event_handler_timeout() -> u64 { + 10 +} + +impl Default for PluginTimeoutConfig { + fn default() -> Self { + Self { + capability_query_secs: default_capability_query_timeout(), + processing_secs: default_processing_timeout(), + event_handler_secs: default_event_handler_timeout(), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PluginsConfig { #[serde(default)] - pub enabled: bool, + pub enabled: bool, #[serde(default = "default_plugin_data_dir")] - pub data_dir: PathBuf, + pub data_dir: PathBuf, #[serde(default = "default_plugin_cache_dir")] - pub cache_dir: PathBuf, + pub cache_dir: PathBuf, #[serde(default)] - pub plugin_dirs: Vec, + pub plugin_dirs: Vec, #[serde(default)] - pub enable_hot_reload: bool, + pub enable_hot_reload: bool, #[serde(default)] - pub allow_unsigned: bool, + pub allow_unsigned: bool, #[serde(default = "default_max_concurrent_ops")] - pub max_concurrent_ops: usize, + pub max_concurrent_ops: usize, #[serde(default = "default_plugin_timeout")] - pub plugin_timeout_secs: u64, + pub plugin_timeout_secs: u64, + #[serde(default)] + pub timeouts: PluginTimeoutConfig, + #[serde(default = "default_max_consecutive_failures")] + pub max_consecutive_failures: u32, + + /// Hex-encoded Ed25519 public keys trusted for plugin signature + /// verification. Each entry is 64 hex characters (32 bytes). 
+ #[serde(default)] + pub trusted_keys: Vec, } fn default_plugin_data_dir() -> PathBuf { @@ -472,17 +517,24 @@ const fn default_plugin_timeout() -> u64 { 30 } +const fn default_max_consecutive_failures() -> u32 { + 5 +} + impl Default for PluginsConfig { fn default() -> Self { Self { - enabled: false, - data_dir: default_plugin_data_dir(), - cache_dir: default_plugin_cache_dir(), - plugin_dirs: vec![], - enable_hot_reload: false, - allow_unsigned: false, - max_concurrent_ops: default_max_concurrent_ops(), - plugin_timeout_secs: default_plugin_timeout(), + enabled: false, + data_dir: default_plugin_data_dir(), + cache_dir: default_plugin_cache_dir(), + plugin_dirs: vec![], + enable_hot_reload: false, + allow_unsigned: false, + max_concurrent_ops: default_max_concurrent_ops(), + plugin_timeout_secs: default_plugin_timeout(), + timeouts: PluginTimeoutConfig::default(), + max_consecutive_failures: default_max_consecutive_failures(), + trusted_keys: vec![], } } } diff --git a/crates/pinakes-core/src/error.rs b/crates/pinakes-core/src/error.rs index 941a23d..c987dc6 100644 --- a/crates/pinakes-core/src/error.rs +++ b/crates/pinakes-core/src/error.rs @@ -28,6 +28,9 @@ pub enum PinakesError { #[error("metadata extraction failed: {0}")] MetadataExtraction(String), + #[error("thumbnail generation failed: {0}")] + ThumbnailGeneration(String), + #[error("search query parse error: {0}")] SearchParse(String), diff --git a/crates/pinakes-core/src/plugin/mod.rs b/crates/pinakes-core/src/plugin/mod.rs index 74a7ed2..8ad12cf 100644 --- a/crates/pinakes-core/src/plugin/mod.rs +++ b/crates/pinakes-core/src/plugin/mod.rs @@ -19,14 +19,19 @@ use tokio::sync::RwLock; use tracing::{debug, error, info, warn}; pub mod loader; +pub mod pipeline; pub mod registry; +pub mod rpc; pub mod runtime; pub mod security; +pub mod signature; pub use loader::PluginLoader; +pub use pipeline::PluginPipeline; pub use registry::{PluginRegistry, RegisteredPlugin}; pub use runtime::{WasmPlugin, WasmRuntime}; 
pub use security::CapabilityEnforcer; +pub use signature::{SignatureStatus, verify_plugin_signature}; /// Plugin manager coordinates plugin lifecycle and operations pub struct PluginManager { @@ -69,16 +74,28 @@ pub struct PluginManagerConfig { /// Plugin timeout in seconds pub plugin_timeout_secs: u64, + + /// Timeout configuration for different call types + pub timeouts: crate::config::PluginTimeoutConfig, + + /// Max consecutive failures before circuit breaker disables plugin + pub max_consecutive_failures: u32, + + /// Trusted Ed25519 public keys for signature verification (hex-encoded) + pub trusted_keys: Vec, } impl Default for PluginManagerConfig { fn default() -> Self { Self { - plugin_dirs: vec![], - enable_hot_reload: false, - allow_unsigned: false, - max_concurrent_ops: 4, - plugin_timeout_secs: 30, + plugin_dirs: vec![], + enable_hot_reload: false, + allow_unsigned: false, + max_concurrent_ops: 4, + plugin_timeout_secs: 30, + timeouts: crate::config::PluginTimeoutConfig::default(), + max_consecutive_failures: 5, + trusted_keys: vec![], } } } @@ -86,11 +103,14 @@ impl Default for PluginManagerConfig { impl From for PluginManagerConfig { fn from(cfg: crate::config::PluginsConfig) -> Self { Self { - plugin_dirs: cfg.plugin_dirs, - enable_hot_reload: cfg.enable_hot_reload, - allow_unsigned: cfg.allow_unsigned, - max_concurrent_ops: cfg.max_concurrent_ops, - plugin_timeout_secs: cfg.plugin_timeout_secs, + plugin_dirs: cfg.plugin_dirs, + enable_hot_reload: cfg.enable_hot_reload, + allow_unsigned: cfg.allow_unsigned, + max_concurrent_ops: cfg.max_concurrent_ops, + plugin_timeout_secs: cfg.plugin_timeout_secs, + timeouts: cfg.timeouts, + max_consecutive_failures: cfg.max_consecutive_failures, + trusted_keys: cfg.trusted_keys, } } } @@ -127,7 +147,12 @@ impl PluginManager { }) } - /// Discover and load all plugins from configured directories + /// Discover and load all plugins from configured directories. 
+ /// + /// Plugins are loaded in dependency order: if plugin A declares a + /// dependency on plugin B, B is loaded first. Cycles and missing + /// dependencies are detected and reported as warnings; affected plugins + /// are skipped rather than causing a hard failure. /// /// # Errors /// @@ -136,9 +161,10 @@ impl PluginManager { info!("Discovering plugins from {:?}", self.config.plugin_dirs); let manifests = self.loader.discover_plugins()?; + let ordered = Self::resolve_load_order(&manifests); let mut loaded_plugins = Vec::new(); - for manifest in manifests { + for manifest in ordered { match self.load_plugin_from_manifest(&manifest).await { Ok(plugin_id) => { info!("Loaded plugin: {}", plugin_id); @@ -153,6 +179,93 @@ impl PluginManager { Ok(loaded_plugins) } + /// Topological sort of manifests by their declared `dependencies`. + /// + /// Uses Kahn's algorithm. Plugins whose dependencies are missing or form + /// a cycle are logged as warnings and excluded from the result. + fn resolve_load_order( + manifests: &[pinakes_plugin_api::PluginManifest], + ) -> Vec { + use std::collections::{HashMap, HashSet, VecDeque}; + + // Index manifests by name for O(1) lookup + let by_name: HashMap<&str, usize> = manifests + .iter() + .enumerate() + .map(|(i, m)| (m.plugin.name.as_str(), i)) + .collect(); + + // Check for missing dependencies and warn early + let known: HashSet<&str> = by_name.keys().copied().collect(); + for manifest in manifests { + for dep in &manifest.plugin.dependencies { + if !known.contains(dep.as_str()) { + warn!( + "Plugin '{}' depends on '{}' which was not discovered; it will be \ + skipped", + manifest.plugin.name, dep + ); + } + } + } + + // Build adjacency: in_degree[i] = number of deps that must load before i + let mut in_degree = vec![0usize; manifests.len()]; + // dependents[i] = indices that depend on i (i must load before them) + let mut dependents: Vec> = vec![vec![]; manifests.len()]; + + for (i, manifest) in manifests.iter().enumerate() 
{ + for dep in &manifest.plugin.dependencies { + if let Some(&dep_idx) = by_name.get(dep.as_str()) { + in_degree[i] += 1; + dependents[dep_idx].push(i); + } else { + // Missing dep: set in_degree impossibly high so it never resolves + in_degree[i] = usize::MAX; + } + } + } + + // Kahn's algorithm + let mut queue: VecDeque = VecDeque::new(); + for (i, °) in in_degree.iter().enumerate() { + if deg == 0 { + queue.push_back(i); + } + } + + let mut result = Vec::with_capacity(manifests.len()); + while let Some(idx) = queue.pop_front() { + result.push(manifests[idx].clone()); + for &dependent in &dependents[idx] { + if in_degree[dependent] == usize::MAX { + continue; // already poisoned by missing dep + } + in_degree[dependent] -= 1; + if in_degree[dependent] == 0 { + queue.push_back(dependent); + } + } + } + + // Anything not in `result` is part of a cycle or has a missing dep + if result.len() < manifests.len() { + let loaded: HashSet<&str> = + result.iter().map(|m| m.plugin.name.as_str()).collect(); + for manifest in manifests { + if !loaded.contains(manifest.plugin.name.as_str()) { + warn!( + "Plugin '{}' was skipped due to unresolved dependencies or a \ + dependency cycle", + manifest.plugin.name + ); + } + } + } + + result + } + /// Load a plugin from a manifest file /// /// # Errors @@ -217,6 +330,45 @@ impl PluginManager { // Load WASM binary let wasm_path = self.loader.resolve_wasm_path(manifest)?; + + // Verify plugin signature unless unsigned plugins are allowed + if !self.config.allow_unsigned { + let plugin_dir = wasm_path + .parent() + .ok_or_else(|| anyhow::anyhow!("WASM path has no parent directory"))?; + + let trusted_keys: Vec = self + .config + .trusted_keys + .iter() + .filter_map(|hex| { + signature::parse_public_key(hex) + .map_err(|e| warn!("Ignoring malformed trusted key: {e}")) + .ok() + }) + .collect(); + + match signature::verify_plugin_signature( + plugin_dir, + &wasm_path, + &trusted_keys, + )? 
{ + SignatureStatus::Valid => { + debug!("Plugin '{plugin_id}' signature verified"); + }, + SignatureStatus::Unsigned => { + return Err(anyhow::anyhow!( + "Plugin '{plugin_id}' is unsigned and allow_unsigned is false" + )); + }, + SignatureStatus::Invalid(reason) => { + return Err(anyhow::anyhow!( + "Plugin '{plugin_id}' has an invalid signature: {reason}" + )); + }, + } + } + let wasm_plugin = self.runtime.load_plugin(&wasm_path, context)?; // Initialize plugin @@ -413,6 +565,40 @@ impl PluginManager { registry.get(plugin_id).map(|p| p.metadata.clone()) } + /// Get enabled plugins of a specific kind, sorted by priority (ascending). + /// + /// # Returns + /// + /// `(plugin_id, priority, kinds, wasm_plugin)` tuples. + pub async fn get_enabled_by_kind_sorted( + &self, + kind: &str, + ) -> Vec<(String, u16, Vec, WasmPlugin)> { + let registry = self.registry.read().await; + let mut plugins: Vec<_> = registry + .get_by_kind(kind) + .into_iter() + .filter(|p| p.enabled) + .map(|p| { + ( + p.id.clone(), + p.manifest.plugin.priority, + p.manifest.plugin.kind.clone(), + p.wasm_plugin.clone(), + ) + }) + .collect(); + drop(registry); + plugins.sort_by_key(|(_, priority, ..)| *priority); + plugins + } + + /// Get a reference to the capability enforcer. 
+ #[must_use] + pub const fn enforcer(&self) -> &CapabilityEnforcer { + &self.enforcer + } + /// Check if a plugin is loaded and enabled pub async fn is_plugin_enabled(&self, plugin_id: &str) -> bool { let registry = self.registry.read().await; @@ -503,4 +689,137 @@ mod tests { let plugins = manager.list_plugins().await; assert_eq!(plugins.len(), 0); } + + /// Build a minimal manifest for dependency resolution tests + fn test_manifest( + name: &str, + deps: Vec, + ) -> pinakes_plugin_api::PluginManifest { + use pinakes_plugin_api::manifest::{PluginBinary, PluginInfo}; + + pinakes_plugin_api::PluginManifest { + plugin: PluginInfo { + name: name.to_string(), + version: "1.0.0".to_string(), + api_version: "1.0".to_string(), + author: None, + description: None, + homepage: None, + license: None, + priority: 500, + kind: vec!["media_type".to_string()], + binary: PluginBinary { + wasm: "plugin.wasm".to_string(), + entrypoint: None, + }, + dependencies: deps, + }, + capabilities: Default::default(), + config: Default::default(), + } + } + + #[test] + fn test_resolve_load_order_no_deps() { + let manifests = vec![ + test_manifest("alpha", vec![]), + test_manifest("beta", vec![]), + test_manifest("gamma", vec![]), + ]; + + let ordered = PluginManager::resolve_load_order(&manifests); + assert_eq!(ordered.len(), 3); + } + + #[test] + fn test_resolve_load_order_linear_chain() { + // gamma depends on beta, beta depends on alpha + let manifests = vec![ + test_manifest("gamma", vec!["beta".to_string()]), + test_manifest("alpha", vec![]), + test_manifest("beta", vec!["alpha".to_string()]), + ]; + + let ordered = PluginManager::resolve_load_order(&manifests); + assert_eq!(ordered.len(), 3); + + let names: Vec<&str> = + ordered.iter().map(|m| m.plugin.name.as_str()).collect(); + let alpha_pos = names.iter().position(|&n| n == "alpha").unwrap(); + let beta_pos = names.iter().position(|&n| n == "beta").unwrap(); + let gamma_pos = names.iter().position(|&n| n == "gamma").unwrap(); + 
assert!(alpha_pos < beta_pos, "alpha must load before beta"); + assert!(beta_pos < gamma_pos, "beta must load before gamma"); + } + + #[test] + fn test_resolve_load_order_cycle_detected() { + // A -> B -> C -> A (cycle) + let manifests = vec![ + test_manifest("a", vec!["c".to_string()]), + test_manifest("b", vec!["a".to_string()]), + test_manifest("c", vec!["b".to_string()]), + ]; + + let ordered = PluginManager::resolve_load_order(&manifests); + // All three should be excluded due to cycle + assert_eq!(ordered.len(), 0); + } + + #[test] + fn test_resolve_load_order_missing_dependency() { + let manifests = vec![ + test_manifest("good", vec![]), + test_manifest("bad", vec!["nonexistent".to_string()]), + ]; + + let ordered = PluginManager::resolve_load_order(&manifests); + // Only "good" should be loaded; "bad" depends on something missing + assert_eq!(ordered.len(), 1); + assert_eq!(ordered[0].plugin.name, "good"); + } + + #[test] + fn test_resolve_load_order_partial_cycle() { + // "ok" has no deps, "cycle_a" and "cycle_b" form a cycle + let manifests = vec![ + test_manifest("ok", vec![]), + test_manifest("cycle_a", vec!["cycle_b".to_string()]), + test_manifest("cycle_b", vec!["cycle_a".to_string()]), + ]; + + let ordered = PluginManager::resolve_load_order(&manifests); + assert_eq!(ordered.len(), 1); + assert_eq!(ordered[0].plugin.name, "ok"); + } + + #[test] + fn test_resolve_load_order_diamond() { + // Man look at how beautiful my diamond is... 
+ // A + // / \ + // B C + // \ / + // D + let manifests = vec![ + test_manifest("d", vec!["b".to_string(), "c".to_string()]), + test_manifest("b", vec!["a".to_string()]), + test_manifest("c", vec!["a".to_string()]), + test_manifest("a", vec![]), + ]; + + let ordered = PluginManager::resolve_load_order(&manifests); + assert_eq!(ordered.len(), 4); + + let names: Vec<&str> = + ordered.iter().map(|m| m.plugin.name.as_str()).collect(); + let a_pos = names.iter().position(|&n| n == "a").unwrap(); + let b_pos = names.iter().position(|&n| n == "b").unwrap(); + let c_pos = names.iter().position(|&n| n == "c").unwrap(); + let d_pos = names.iter().position(|&n| n == "d").unwrap(); + assert!(a_pos < b_pos); + assert!(a_pos < c_pos); + assert!(b_pos < d_pos); + assert!(c_pos < d_pos); + } } diff --git a/crates/pinakes-core/src/plugin/pipeline.rs b/crates/pinakes-core/src/plugin/pipeline.rs new file mode 100644 index 0000000..c2ed430 --- /dev/null +++ b/crates/pinakes-core/src/plugin/pipeline.rs @@ -0,0 +1,1442 @@ +//! Plugin pipeline for wiring plugins into the media processing stages. +//! +//! The [`PluginPipeline`] coordinates built-in handlers and WASM plugins for: +//! +//! - Media type resolution (first-match-wins) +//! - Metadata extraction (accumulating merge by priority) +//! - Thumbnail generation (first-success-wins) +//! - Search backend dispatch (merge results, rank by score) +//! - Theme provider dispatch (accumulate all themes) +//! - Event fan-out to interested handlers +//! +//! Each plugin has a priority (0-999). Built-in handlers run at implicit +//! priority 100. A circuit breaker disables plugins after consecutive failures. 
+ +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::Arc, + time::{Duration, Instant}, +}; + +use tokio::sync::RwLock; +use tracing::{debug, info, warn}; + +use super::PluginManager; +use crate::{ + config::PluginTimeoutConfig, + media_type::MediaType, + metadata::ExtractedMetadata, + model::MediaId, + plugin::rpc::{ + CanHandleRequest, + CanHandleResponse, + ExtractMetadataRequest, + ExtractMetadataResponse, + GenerateThumbnailRequest, + GenerateThumbnailResponse, + HandleEventRequest, + IndexItemRequest, + LoadThemeResponse, + PluginMediaTypeDefinition, + PluginThemeDefinition, + RemoveItemRequest, + SearchRequest, + SearchResponse, + SearchResultItem, + }, +}; + +/// Built-in handlers run at this implicit priority. +const BUILTIN_PRIORITY: u16 = 100; + +/// Cooldown before the circuit breaker allows a single trial call. +const CIRCUIT_BREAKER_COOLDOWN: Duration = Duration::from_mins(1); + +/// Tracks per-plugin health for the circuit breaker. +struct PluginHealth { + consecutive_failures: u32, + last_failure: Option, + disabled_by_circuit_breaker: bool, +} + +impl PluginHealth { + const fn new() -> Self { + Self { + consecutive_failures: 0, + last_failure: None, + disabled_by_circuit_breaker: false, + } + } +} + +/// Capability information discovered from plugins at startup. +struct CachedCapabilities { + /// Keyed by `(kind, plugin_id)` -> list of supported type strings. + /// Separate entries for each kind avoid collisions when a plugin + /// implements both `metadata_extractor` and `thumbnail_generator`. 
+ supported_types: HashMap<(String, String), Vec>, + /// `plugin_id` -> list of interested event type strings + interested_events: HashMap>, + /// `plugin_id` -> list of media type definitions (for `MediaTypeProvider`) + media_type_definitions: HashMap>, + /// `plugin_id` -> list of theme definitions (for `ThemeProvider`) + theme_definitions: HashMap>, +} + +impl CachedCapabilities { + fn new() -> Self { + Self { + supported_types: HashMap::new(), + interested_events: HashMap::new(), + media_type_definitions: HashMap::new(), + theme_definitions: HashMap::new(), + } + } +} + +/// Coordinates built-in handlers and WASM plugins in a priority-ordered +/// pipeline for media processing stages. +pub struct PluginPipeline { + manager: Arc, + timeouts: PluginTimeoutConfig, + max_consecutive_failures: u32, + health: RwLock>, + capabilities: RwLock, +} + +impl PluginPipeline { + /// Create a new plugin pipeline. + #[must_use] + pub fn new( + manager: Arc, + timeouts: PluginTimeoutConfig, + max_consecutive_failures: u32, + ) -> Self { + Self { + manager, + timeouts, + max_consecutive_failures, + health: RwLock::new(HashMap::new()), + capabilities: RwLock::new(CachedCapabilities::new()), + } + } + + /// Discover capabilities from all enabled plugins. Call on startup and + /// after plugin reload. + /// + /// # Errors + /// + /// Returns an error only for truly fatal problems. Individual plugin + /// discovery failures are logged and skipped. 
+ pub async fn discover_capabilities(&self) -> crate::error::Result<()> { + info!("discovering plugin capabilities"); + + let timeout = Duration::from_secs(self.timeouts.capability_query_secs); + let mut caps = CachedCapabilities::new(); + + // Discover metadata extractors + self + .discover_supported_types("metadata_extractor", timeout, &mut caps) + .await; + + // Discover thumbnail generators + self + .discover_supported_types("thumbnail_generator", timeout, &mut caps) + .await; + + // Discover media type providers + self + .discover_media_type_definitions(timeout, &mut caps) + .await; + + // Discover search backends + self + .discover_supported_types("search_backend", timeout, &mut caps) + .await; + + // Discover theme providers + self.discover_theme_definitions(timeout, &mut caps).await; + + // Discover event handlers + self.discover_interested_events(timeout, &mut caps).await; + + let mut cached = self.capabilities.write().await; + *cached = caps; + drop(cached); + + Ok(()) + } + + /// Query `supported_types` from all enabled plugins of a given kind. + async fn discover_supported_types( + &self, + kind: &str, + timeout: Duration, + caps: &mut CachedCapabilities, + ) { + let plugins = self.manager.get_enabled_by_kind_sorted(kind).await; + for (id, _priority, _kinds, wasm) in &plugins { + match wasm + .call_function_json::>( + "supported_types", + &serde_json::json!({}), + timeout, + ) + .await + { + Ok(types) => { + debug!( + plugin_id = %id, + kind = kind, + types = ?types, + "discovered supported types" + ); + caps + .supported_types + .insert((kind.to_string(), id.clone()), types); + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "failed to discover supported types" + ); + }, + } + } + } + + /// Query `supported_media_types` from `MediaTypeProvider` plugins. 
+ async fn discover_media_type_definitions( + &self, + timeout: Duration, + caps: &mut CachedCapabilities, + ) { + let plugins = self.manager.get_enabled_by_kind_sorted("media_type").await; + for (id, _priority, _kinds, wasm) in &plugins { + match wasm + .call_function_json::>( + "supported_media_types", + &serde_json::json!({}), + timeout, + ) + .await + { + Ok(defs) => { + debug!( + plugin_id = %id, + count = defs.len(), + "discovered media type definitions" + ); + caps.media_type_definitions.insert(id.clone(), defs); + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "failed to discover media type definitions" + ); + }, + } + } + } + + /// Query `interested_events` from `EventHandler` plugins. + async fn discover_interested_events( + &self, + timeout: Duration, + caps: &mut CachedCapabilities, + ) { + let plugins = self + .manager + .get_enabled_by_kind_sorted("event_handler") + .await; + for (id, _priority, _kinds, wasm) in &plugins { + match wasm + .call_function_json::>( + "interested_events", + &serde_json::json!({}), + timeout, + ) + .await + { + Ok(events) => { + debug!( + plugin_id = %id, + events = ?events, + "discovered interested events" + ); + caps.interested_events.insert(id.clone(), events); + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "failed to discover interested events" + ); + }, + } + } + } + + /// Query `get_themes` from `ThemeProvider` plugins. 
+ async fn discover_theme_definitions( + &self, + timeout: Duration, + caps: &mut CachedCapabilities, + ) { + let plugins = self + .manager + .get_enabled_by_kind_sorted("theme_provider") + .await; + for (id, _priority, _kinds, wasm) in &plugins { + match wasm + .call_function_json::>( + "get_themes", + &serde_json::json!({}), + timeout, + ) + .await + { + Ok(defs) => { + debug!( + plugin_id = %id, + count = defs.len(), + "discovered theme definitions" + ); + caps.theme_definitions.insert(id.clone(), defs); + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "failed to discover theme definitions" + ); + }, + } + } + } + + /// Resolve the media type for a file path. + /// + /// Iterates `MediaTypeProvider` plugins in priority order, falling back to + /// the built-in resolver at implicit priority 100. + pub async fn resolve_media_type(&self, path: &Path) -> Option { + let timeout = Duration::from_secs(self.timeouts.processing_secs); + let plugins = self.manager.get_enabled_by_kind_sorted("media_type").await; + + let mut builtin_ran = false; + + for (id, priority, kinds, wasm) in &plugins { + // Run built-in at its implicit priority slot + if *priority >= BUILTIN_PRIORITY && !builtin_ran { + builtin_ran = true; + if let Some(mt) = MediaType::from_path(path) { + return Some(mt); + } + } + + if !self.is_healthy(id).await { + continue; + } + + // Validate the call is allowed for this plugin kind + if !self + .manager + .enforcer() + .validate_function_call(kinds, "can_handle") + { + continue; + } + + let req = CanHandleRequest { + path: path.to_path_buf(), + mime_type: None, + }; + + match wasm + .call_function_json::( + "can_handle", + &req, + timeout, + ) + .await + { + Ok(resp) if resp.can_handle => { + self.record_success(id).await; + // First match wins: the plugin claimed this file. + // Determine the custom media type ID from cached definitions. 
+ if let Some(mt_id) = self.resolve_custom_media_type_id(id, path).await + { + return Some(MediaType::Custom(mt_id)); + } + + // Plugin claimed the file but has no matching definition. + // The claim still stops the chain per first-match semantics. + warn!( + plugin_id = %id, + path = %path.display(), + "plugin can_handle returned true but no matching media type definition" + ); + return None; + }, + Ok(_) => { + self.record_success(id).await; + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "media type resolution failed" + ); + self.record_failure(id).await; + }, + } + } + + // If built-in hasn't run yet (all plugins had priority < 100) + if !builtin_ran { + return MediaType::from_path(path); + } + + None + } + + /// Look up the cached media type definitions for a plugin and find the + /// definition whose extensions match the file's extension. + async fn resolve_custom_media_type_id( + &self, + plugin_id: &str, + path: &Path, + ) -> Option { + let ext = path + .extension() + .and_then(|e| e.to_str()) + .map(str::to_ascii_lowercase)?; + + let caps = self.capabilities.read().await; + let result = caps.media_type_definitions.get(plugin_id).and_then(|defs| { + defs.iter().find_map(|def| { + def + .extensions + .iter() + .any(|e| e.eq_ignore_ascii_case(&ext)) + .then(|| def.id.clone()) + }) + }); + drop(caps); + result + } + + /// Extract metadata using plugins and built-in extractors in priority + /// order. Later (higher priority) results overwrite earlier ones + /// field-by-field. + /// + /// # Errors + /// + /// Propagates errors from the built-in extractor. Plugin errors are + /// logged and skipped. 
+ pub async fn extract_metadata( + &self, + path: &Path, + media_type: &MediaType, + ) -> crate::error::Result { + let timeout = Duration::from_secs(self.timeouts.processing_secs); + let plugins = self + .manager + .get_enabled_by_kind_sorted("metadata_extractor") + .await; + + let media_type_str = media_type.id(); + let mut acc = ExtractedMetadata::default(); + let mut builtin_ran = false; + + for (id, priority, kinds, wasm) in &plugins { + // Insert built-in at its priority slot + if *priority >= BUILTIN_PRIORITY && !builtin_ran { + builtin_ran = true; + acc = self.run_builtin_metadata(path, media_type, acc).await?; + } + + if !self.is_healthy(id).await { + continue; + } + + // Check if this plugin supports the media type + let supported = { + let caps = self.capabilities.read().await; + let key = ("metadata_extractor".to_string(), id.clone()); + caps + .supported_types + .get(&key) + .is_some_and(|types| types.contains(&media_type_str)) + }; + if !supported { + continue; + } + + if !self + .manager + .enforcer() + .validate_function_call(kinds, "extract_metadata") + { + continue; + } + + let req = ExtractMetadataRequest { + path: path.to_path_buf(), + }; + + match wasm + .call_function_json::( + "extract_metadata", + &req, + timeout, + ) + .await + { + Ok(resp) => { + self.record_success(id).await; + merge_metadata(&mut acc, &resp); + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "metadata extraction failed" + ); + self.record_failure(id).await; + }, + } + } + + // If built-in hasn't run yet (all plugins had priority < 100) + if !builtin_ran { + acc = self.run_builtin_metadata(path, media_type, acc).await?; + } + + Ok(acc) + } + + /// Run the built-in metadata extractor (sync, wrapped in `spawn_blocking`). 
+ async fn run_builtin_metadata( + &self, + path: &Path, + media_type: &MediaType, + mut acc: ExtractedMetadata, + ) -> crate::error::Result { + let path = path.to_path_buf(); + let mt = media_type.clone(); + let builtin = tokio::task::spawn_blocking(move || { + crate::metadata::extract_metadata(&path, &mt) + }) + .await + .map_err(|e| { + crate::error::PinakesError::MetadataExtraction(format!( + "built-in metadata task panicked: {e}" + )) + })??; + + // Merge built-in result into accumulator + merge_extracted(&mut acc, builtin); + Ok(acc) + } + + // Thumbnail generation + + /// Generate a thumbnail using plugins and the built-in generator in + /// priority order. Returns the first successful result. + /// + /// # Errors + /// + /// Propagates errors only from the built-in generator. Plugin errors are + /// logged and skipped. + pub async fn generate_thumbnail( + &self, + media_id: MediaId, + path: &Path, + media_type: &MediaType, + thumb_dir: &Path, + ) -> crate::error::Result> { + let timeout = Duration::from_secs(self.timeouts.processing_secs); + let plugins = self + .manager + .get_enabled_by_kind_sorted("thumbnail_generator") + .await; + + let media_type_str = media_type.id(); + let mut builtin_ran = false; + + for (id, priority, kinds, wasm) in &plugins { + // Insert built-in at its priority slot + if *priority >= BUILTIN_PRIORITY && !builtin_ran { + builtin_ran = true; + if let Some(result) = self + .run_builtin_thumbnail(media_id, path, media_type, thumb_dir) + .await? 
+ { + return Ok(Some(result)); + } + } + + if !self.is_healthy(id).await { + continue; + } + + // Check if this plugin supports the media type + let supported = { + let caps = self.capabilities.read().await; + let key = ("thumbnail_generator".to_string(), id.clone()); + caps + .supported_types + .get(&key) + .is_some_and(|types| types.contains(&media_type_str)) + }; + if !supported { + continue; + } + + if !self + .manager + .enforcer() + .validate_function_call(kinds, "generate_thumbnail") + { + continue; + } + + let output_path = thumb_dir.join(format!("{media_id}.jpg")); + let req = GenerateThumbnailRequest { + source_path: path.to_path_buf(), + output_path: output_path.clone(), + max_width: 320, + max_height: 320, + format: "jpeg".to_string(), + }; + + match wasm + .call_function_json::( + "generate_thumbnail", + &req, + timeout, + ) + .await + { + Ok(resp) => { + self.record_success(id).await; + return Ok(Some(resp.path)); + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "thumbnail generation failed" + ); + self.record_failure(id).await; + }, + } + } + + // If built-in hasn't run yet + if !builtin_ran { + return self + .run_builtin_thumbnail(media_id, path, media_type, thumb_dir) + .await; + } + + Ok(None) + } + + /// Run the built-in thumbnail generator (sync, wrapped in + /// `spawn_blocking`). + async fn run_builtin_thumbnail( + &self, + media_id: MediaId, + path: &Path, + media_type: &MediaType, + thumb_dir: &Path, + ) -> crate::error::Result> { + let path = path.to_path_buf(); + let mt = media_type.clone(); + let td = thumb_dir.to_path_buf(); + tokio::task::spawn_blocking(move || { + crate::thumbnail::generate_thumbnail(media_id, &path, &mt, &td) + }) + .await + .map_err(|e| { + crate::error::PinakesError::ThumbnailGeneration(format!( + "built-in thumbnail task panicked: {e}" + )) + })? + } + + /// Fan out an event to all interested event handler plugins. 
+ /// + /// The work runs in a spawned task; failures are logged, never + /// propagated. + pub fn emit_event( + self: &Arc, + event_type: &str, + payload: &serde_json::Value, + ) { + let pipeline = Arc::clone(self); + let event_type = event_type.to_string(); + let payload = payload.clone(); + + tokio::spawn(async move { + pipeline.dispatch_event(&event_type, &payload).await; + }); + } + + /// Internal dispatcher for events. + async fn dispatch_event( + &self, + event_type: &str, + payload: &serde_json::Value, + ) { + let timeout = Duration::from_secs(self.timeouts.event_handler_secs); + + // Collect plugin IDs interested in this event + let interested_ids: Vec = { + let caps = self.capabilities.read().await; + caps + .interested_events + .iter() + .filter(|(_id, events)| events.contains(&event_type.to_string())) + .map(|(id, _)| id.clone()) + .collect() + }; + + if interested_ids.is_empty() { + return; + } + + // Get event handler plugins sorted by priority + let plugins = self + .manager + .get_enabled_by_kind_sorted("event_handler") + .await; + + for (id, _priority, kinds, wasm) in &plugins { + if !interested_ids.contains(id) { + continue; + } + + if !self.is_healthy(id).await { + continue; + } + + if !self + .manager + .enforcer() + .validate_function_call(kinds, "handle_event") + { + continue; + } + + let req = HandleEventRequest { + event_type: event_type.to_string(), + payload: payload.clone(), + }; + + // Event handlers return nothing meaningful; we just care about + // success/failure. + match wasm + .call_function_json::( + "handle_event", + &req, + timeout, + ) + .await + { + Ok(_) => { + self.record_success(id).await; + debug!( + plugin_id = %id, + event_type = event_type, + "event handled" + ); + }, + Err(e) => { + warn!( + plugin_id = %id, + event_type = event_type, + error = %e, + "event handler failed" + ); + self.record_failure(id).await; + }, + } + } + } + + /// Search via plugin search backends. 
Results from all backends are + /// merged, deduplicated by ID, and sorted by score (highest first). + pub async fn search( + &self, + query: &str, + limit: usize, + offset: usize, + ) -> Vec { + let timeout = Duration::from_secs(self.timeouts.processing_secs); + let plugins = self + .manager + .get_enabled_by_kind_sorted("search_backend") + .await; + + let mut all_results: Vec = Vec::new(); + + for (id, _priority, kinds, wasm) in &plugins { + if !self.is_healthy(id).await { + continue; + } + if !self + .manager + .enforcer() + .validate_function_call(kinds, "search") + { + continue; + } + + let req = SearchRequest { + query: query.to_string(), + limit, + offset, + }; + + match wasm + .call_function_json::( + "search", &req, timeout, + ) + .await + { + Ok(resp) => { + self.record_success(id).await; + all_results.extend(resp.results); + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "search backend query failed" + ); + self.record_failure(id).await; + }, + } + } + + // Deduplicate by ID, keeping the highest-scoring entry + let mut seen: HashMap = HashMap::new(); + let mut deduped: Vec = Vec::new(); + for item in all_results { + if let Some(&idx) = seen.get(&item.id) { + if item.score > deduped[idx].score { + deduped[idx] = item; + } + } else { + seen.insert(item.id.clone(), deduped.len()); + deduped.push(item); + } + } + + // Sort by score descending + deduped.sort_by(|a, b| { + b.score + .partial_cmp(&a.score) + .unwrap_or(std::cmp::Ordering::Equal) + }); + + deduped + } + + /// Index a media item in all search backend plugins (fan-out). 
+ pub async fn index_item(&self, req: &IndexItemRequest) { + let timeout = Duration::from_secs(self.timeouts.processing_secs); + let plugins = self + .manager + .get_enabled_by_kind_sorted("search_backend") + .await; + + for (id, _priority, kinds, wasm) in &plugins { + if !self.is_healthy(id).await { + continue; + } + if !self + .manager + .enforcer() + .validate_function_call(kinds, "index_item") + { + continue; + } + + match wasm + .call_function_json::( + "index_item", + req, + timeout, + ) + .await + { + Ok(_) => { + self.record_success(id).await; + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "search backend index_item failed" + ); + self.record_failure(id).await; + }, + } + } + } + + /// Remove a media item from all search backend plugins (fan-out). + pub async fn remove_item(&self, media_id: &str) { + let timeout = Duration::from_secs(self.timeouts.processing_secs); + let plugins = self + .manager + .get_enabled_by_kind_sorted("search_backend") + .await; + + let req = RemoveItemRequest { + id: media_id.to_string(), + }; + + for (id, _priority, kinds, wasm) in &plugins { + if !self.is_healthy(id).await { + continue; + } + if !self + .manager + .enforcer() + .validate_function_call(kinds, "remove_item") + { + continue; + } + + match wasm + .call_function_json::( + "remove_item", + &req, + timeout, + ) + .await + { + Ok(_) => { + self.record_success(id).await; + }, + Err(e) => { + warn!( + plugin_id = %id, + error = %e, + "search backend remove_item failed" + ); + self.record_failure(id).await; + }, + } + } + } + + /// Get all available themes from theme provider plugins. Results from + /// all providers are accumulated. + pub async fn get_themes(&self) -> Vec { + let caps = self.capabilities.read().await; + caps + .theme_definitions + .values() + .flat_map(|defs| defs.iter().cloned()) + .collect() + } + + /// Load a specific theme by ID from the provider that registered it. 
+ pub async fn load_theme(&self, theme_id: &str) -> Option { + let timeout = Duration::from_secs(self.timeouts.processing_secs); + + // Find which plugin owns this theme + let owner_id = { + let caps = self.capabilities.read().await; + caps.theme_definitions.iter().find_map(|(plugin_id, defs)| { + defs + .iter() + .any(|d| d.id == theme_id) + .then(|| plugin_id.clone()) + }) + }; + + let owner_id = owner_id?; + + if !self.is_healthy(&owner_id).await { + return None; + } + + let plugins = self + .manager + .get_enabled_by_kind_sorted("theme_provider") + .await; + + let plugin = plugins.iter().find(|(id, ..)| id == &owner_id)?; + let (id, _priority, kinds, wasm) = plugin; + + if !self + .manager + .enforcer() + .validate_function_call(kinds, "load_theme") + { + return None; + } + + let req = serde_json::json!({"theme_id": theme_id}); + match wasm + .call_function_json::( + "load_theme", + &req, + timeout, + ) + .await + { + Ok(resp) => { + self.record_success(id).await; + Some(resp) + }, + Err(e) => { + warn!( + plugin_id = %id, + theme_id = theme_id, + error = %e, + "theme loading failed" + ); + self.record_failure(id).await; + None + }, + } + } + + /// Record a successful call, resetting the failure counter. + async fn record_success(&self, plugin_id: &str) { + let mut health_map = self.health.write().await; + let entry = health_map + .entry(plugin_id.to_string()) + .or_insert_with(PluginHealth::new); + + if entry.disabled_by_circuit_breaker { + info!( + plugin_id = plugin_id, + "circuit breaker recovered: re-enabling plugin after successful trial" + ); + } + + entry.consecutive_failures = 0; + entry.last_failure = None; + entry.disabled_by_circuit_breaker = false; + drop(health_map); + } + + /// Record a failed call. If consecutive failures exceed the threshold, + /// trip the circuit breaker. If a half-open trial fails, reset the + /// cooldown timer. 
+ async fn record_failure(&self, plugin_id: &str) { + let mut health_map = self.health.write().await; + let entry = health_map + .entry(plugin_id.to_string()) + .or_insert_with(PluginHealth::new); + entry.consecutive_failures += 1; + entry.last_failure = Some(Instant::now()); + + if entry.consecutive_failures >= self.max_consecutive_failures { + if !entry.disabled_by_circuit_breaker { + warn!( + plugin_id = plugin_id, + failures = entry.consecutive_failures, + "circuit breaker tripped: disabling plugin" + ); + } + entry.disabled_by_circuit_breaker = true; + } + drop(health_map); + } + + /// Check whether a plugin is healthy enough to receive calls. + /// + /// # Returns + /// + /// `true` if the plugin has no circuit breaker state, has not + /// tripped the breaker, or has tripped but the cooldown has elapsed + /// (half-open state: one trial call is permitted). + async fn is_healthy(&self, plugin_id: &str) -> bool { + let health_map = self.health.read().await; + match health_map.get(plugin_id) { + None => true, + Some(h) if !h.disabled_by_circuit_breaker => true, + Some(h) => { + // Half-open: allow a trial call after the cooldown period + h.last_failure + .is_some_and(|t| t.elapsed() >= CIRCUIT_BREAKER_COOLDOWN) + }, + } + } +} + +/// Merge plugin metadata response fields into the accumulator. +/// Non-None fields overwrite existing values. 
+fn merge_metadata( + base: &mut ExtractedMetadata, + plugin_resp: &ExtractMetadataResponse, +) { + if let Some(ref v) = plugin_resp.title { + base.title = Some(v.clone()); + } + if let Some(ref v) = plugin_resp.artist { + base.artist = Some(v.clone()); + } + if let Some(ref v) = plugin_resp.album { + base.album = Some(v.clone()); + } + if let Some(ref v) = plugin_resp.genre { + base.genre = Some(v.clone()); + } + if let Some(v) = plugin_resp.year { + base.year = Some(v); + } + if let Some(v) = plugin_resp.duration_secs { + base.duration_secs = Some(v); + } + if let Some(ref v) = plugin_resp.description { + base.description = Some(v.clone()); + } + for (k, v) in &plugin_resp.extra { + base.extra.insert(k.clone(), v.clone()); + } +} + +/// Merge a full [`ExtractedMetadata`] into an accumulator. Non-None +/// fields from `source` overwrite `base`. +fn merge_extracted(base: &mut ExtractedMetadata, source: ExtractedMetadata) { + if source.title.is_some() { + base.title = source.title; + } + if source.artist.is_some() { + base.artist = source.artist; + } + if source.album.is_some() { + base.album = source.album; + } + if source.genre.is_some() { + base.genre = source.genre; + } + if source.year.is_some() { + base.year = source.year; + } + if source.duration_secs.is_some() { + base.duration_secs = source.duration_secs; + } + if source.description.is_some() { + base.description = source.description; + } + for (k, v) in source.extra { + base.extra.insert(k, v); + } + if source.book_metadata.is_some() { + base.book_metadata = source.book_metadata; + } + if source.date_taken.is_some() { + base.date_taken = source.date_taken; + } + if source.latitude.is_some() { + base.latitude = source.latitude; + } + if source.longitude.is_some() { + base.longitude = source.longitude; + } + if source.camera_make.is_some() { + base.camera_make = source.camera_make; + } + if source.camera_model.is_some() { + base.camera_model = source.camera_model; + } + if source.rating.is_some() { + 
base.rating = source.rating; + } +} + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use super::*; + use crate::plugin::{PluginManager, PluginManagerConfig}; + + /// Create a `PluginPipeline` backed by an empty `PluginManager`. + fn create_test_pipeline() -> (TempDir, Arc) { + let temp_dir = TempDir::new().unwrap(); + let data_dir = temp_dir.path().join("data"); + let cache_dir = temp_dir.path().join("cache"); + + let config = PluginManagerConfig::default(); + let manager = + Arc::new(PluginManager::new(data_dir, cache_dir, config).unwrap()); + let pipeline = Arc::new(PluginPipeline::new( + manager, + PluginTimeoutConfig::default(), + 5, + )); + (temp_dir, pipeline) + } + + #[tokio::test] + async fn test_circuit_breaker_trips_after_max_failures() { + let (_dir, pipeline) = create_test_pipeline(); + let plugin_id = "test-plugin"; + + // Initially healthy + assert!(pipeline.is_healthy(plugin_id).await); + + // Record failures up to the threshold + for _ in 0..5 { + pipeline.record_failure(plugin_id).await; + } + + // Should be disabled now + assert!(!pipeline.is_healthy(plugin_id).await); + } + + #[tokio::test] + async fn test_circuit_breaker_resets_on_success() { + let (_dir, pipeline) = create_test_pipeline(); + let plugin_id = "test-plugin"; + + // Record some failures but not enough to trip + for _ in 0..4 { + pipeline.record_failure(plugin_id).await; + } + assert!(pipeline.is_healthy(plugin_id).await); + + // One success resets the counter + pipeline.record_success(plugin_id).await; + assert!(pipeline.is_healthy(plugin_id).await); + + // Need another full run of failures to trip + for _ in 0..4 { + pipeline.record_failure(plugin_id).await; + } + assert!(pipeline.is_healthy(plugin_id).await); + } + + #[tokio::test] + async fn test_circuit_breaker_reenabled_after_success() { + let (_dir, pipeline) = create_test_pipeline(); + let plugin_id = "test-plugin"; + + // Trip the circuit breaker + for _ in 0..5 { + pipeline.record_failure(plugin_id).await; + } + 
assert!(!pipeline.is_healthy(plugin_id).await); + + // Success re-enables it + pipeline.record_success(plugin_id).await; + assert!(pipeline.is_healthy(plugin_id).await); + } + + #[tokio::test] + async fn test_circuit_breaker_half_open_after_cooldown() { + let (_dir, pipeline) = create_test_pipeline(); + let plugin_id = "test-plugin"; + + // Trip the circuit breaker + for _ in 0..5 { + pipeline.record_failure(plugin_id).await; + } + assert!(!pipeline.is_healthy(plugin_id).await); + + // Simulate cooldown elapsed by backdating last_failure + { + let mut health_map = pipeline.health.write().await; + let entry = health_map.get_mut(plugin_id).unwrap(); + entry.last_failure = Some( + Instant::now() + .checked_sub(CIRCUIT_BREAKER_COOLDOWN) + .unwrap() + - Duration::from_secs(1), + ); + } + + // Half-open: should be healthy for a trial call + assert!(pipeline.is_healthy(plugin_id).await); + + // If the trial fails, breaker re-trips with fresh cooldown + pipeline.record_failure(plugin_id).await; + assert!(!pipeline.is_healthy(plugin_id).await); + + // If we backdate again and the trial succeeds, fully recovered + { + let mut health_map = pipeline.health.write().await; + let entry = health_map.get_mut(plugin_id).unwrap(); + entry.last_failure = Some( + Instant::now() + .checked_sub(CIRCUIT_BREAKER_COOLDOWN) + .unwrap() + - Duration::from_secs(1), + ); + } + assert!(pipeline.is_healthy(plugin_id).await); + pipeline.record_success(plugin_id).await; + assert!(pipeline.is_healthy(plugin_id).await); + + // Verify fully reset: failure counter starts fresh + let health_map = pipeline.health.read().await; + let entry = health_map.get(plugin_id).unwrap(); + assert_eq!(entry.consecutive_failures, 0); + assert!(!entry.disabled_by_circuit_breaker); + assert!(entry.last_failure.is_none()); + } + + #[tokio::test] + async fn test_empty_pipeline_resolve_media_type() { + let (_dir, pipeline) = create_test_pipeline(); + + // With no plugins, falls back to built-in + let result = pipeline + 
.resolve_media_type(Path::new("/tmp/test.mp3")) + .await; + assert!(result.is_some()); + assert_eq!(result.unwrap().id(), "mp3"); + } + + #[tokio::test] + async fn test_empty_pipeline_resolve_unknown_type() { + let (_dir, pipeline) = create_test_pipeline(); + + let result = pipeline + .resolve_media_type(Path::new("/tmp/test.xyzunknown")) + .await; + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_empty_pipeline_extract_metadata() { + let (_dir, pipeline) = create_test_pipeline(); + + // Built-in extractor handles this (may return default for non-existent + // file, but should not panic) + let mt = MediaType::from_path(Path::new("/tmp/fake.txt")); + if let Some(mt) = mt { + let result = pipeline + .extract_metadata(Path::new("/tmp/fake.txt"), &mt) + .await; + + // The built-in extractor may succeed with defaults or fail + // gracefully; either is fine for this test. + let _ = result; + } + } + + #[tokio::test] + async fn test_empty_pipeline_discover_capabilities() { + let (_dir, pipeline) = create_test_pipeline(); + + let result = pipeline.discover_capabilities().await; + assert!(result.is_ok()); + + // Capabilities should be empty + let caps = pipeline.capabilities.read().await; + assert!(caps.supported_types.is_empty()); + assert!(caps.interested_events.is_empty()); + assert!(caps.media_type_definitions.is_empty()); + assert!(caps.theme_definitions.is_empty()); + } + + #[test] + fn test_merge_metadata_overwrites_some_fields() { + let mut base = ExtractedMetadata::default(); + base.title = Some("Original".to_string()); + base.artist = Some("Original Artist".to_string()); + + let resp = ExtractMetadataResponse { + title: Some("New Title".to_string()), + artist: None, // should not overwrite + album: Some("New Album".to_string()), + genre: None, + year: Some(2024), + duration_secs: None, + description: None, + extra: HashMap::new(), + }; + + merge_metadata(&mut base, &resp); + + assert_eq!(base.title.as_deref(), Some("New Title")); + 
assert_eq!(base.artist.as_deref(), Some("Original Artist")); + assert_eq!(base.album.as_deref(), Some("New Album")); + assert_eq!(base.year, Some(2024)); + } + + #[test] + fn test_merge_metadata_extra_fields() { + let mut base = ExtractedMetadata::default(); + base.extra.insert("key1".to_string(), "val1".to_string()); + + let mut extra = HashMap::new(); + extra.insert("key2".to_string(), "val2".to_string()); + extra.insert("key1".to_string(), "overwritten".to_string()); + + let resp = ExtractMetadataResponse { + extra, + ..Default::default() + }; + + merge_metadata(&mut base, &resp); + + assert_eq!( + base.extra.get("key1").map(String::as_str), + Some("overwritten") + ); + assert_eq!(base.extra.get("key2").map(String::as_str), Some("val2")); + } + + #[test] + fn test_merge_extracted_preserves_photo_fields() { + let mut base = ExtractedMetadata::default(); + let source = ExtractedMetadata { + latitude: Some(48.8566), + longitude: Some(2.3522), + camera_make: Some("Canon".to_string()), + camera_model: Some("EOS R5".to_string()), + ..Default::default() + }; + + merge_extracted(&mut base, source); + + assert_eq!(base.latitude, Some(48.8566)); + assert_eq!(base.longitude, Some(2.3522)); + assert_eq!(base.camera_make.as_deref(), Some("Canon")); + assert_eq!(base.camera_model.as_deref(), Some("EOS R5")); + } + + #[tokio::test] + async fn test_unknown_plugin_is_healthy() { + let (_dir, pipeline) = create_test_pipeline(); + + // A never-seen plugin is considered healthy + assert!(pipeline.is_healthy("never-registered").await); + } + + #[tokio::test] + async fn test_emit_event_with_empty_pipeline() { + let (_dir, pipeline) = create_test_pipeline(); + + // Discover capabilities first (empty, no plugins) + pipeline.discover_capabilities().await.unwrap(); + + // Emit an event with no interested handlers; should complete + // without panic. The spawned task returns immediately. 
+ pipeline.emit_event( + "MediaImported", + &serde_json::json!({"media_id": "test-123"}), + ); + + // Give the spawned task time to run + tokio::time::sleep(Duration::from_millis(50)).await; + } +} diff --git a/crates/pinakes-core/src/plugin/registry.rs b/crates/pinakes-core/src/plugin/registry.rs index 1439280..76ef7ad 100644 --- a/crates/pinakes-core/src/plugin/registry.rs +++ b/crates/pinakes-core/src/plugin/registry.rs @@ -178,8 +178,9 @@ mod tests { entrypoint: None, }, dependencies: vec![], + priority: 0, }, - capabilities: Default::default(), + capabilities: ManifestCapabilities::default(), config: HashMap::new(), }; diff --git a/crates/pinakes-core/src/plugin/rpc.rs b/crates/pinakes-core/src/plugin/rpc.rs new file mode 100644 index 0000000..40d4d13 --- /dev/null +++ b/crates/pinakes-core/src/plugin/rpc.rs @@ -0,0 +1,239 @@ +//! JSON RPC types for structured plugin function calls. +//! +//! Each extension point maps to well-known exported function names. +//! Requests are serialized to JSON, passed to the plugin, and responses +//! are deserialized from JSON written by the plugin via `host_set_result`. 
+ +use std::{collections::HashMap, path::PathBuf}; + +use serde::{Deserialize, Serialize}; + +/// Request to check if a plugin can handle a file +#[derive(Debug, Serialize)] +pub struct CanHandleRequest { + pub path: PathBuf, + pub mime_type: Option, +} + +/// Response from `can_handle` +#[derive(Debug, Deserialize)] +pub struct CanHandleResponse { + pub can_handle: bool, +} + +/// Media type definition returned by `supported_media_types` +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PluginMediaTypeDefinition { + pub id: String, + pub name: String, + pub category: Option, + pub extensions: Vec, + pub mime_types: Vec, +} + +/// Request to extract metadata from a file +#[derive(Debug, Serialize)] +pub struct ExtractMetadataRequest { + pub path: PathBuf, +} + +/// Metadata response from a plugin (all fields optional for partial results) +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct ExtractMetadataResponse { + #[serde(default)] + pub title: Option, + #[serde(default)] + pub artist: Option, + #[serde(default)] + pub album: Option, + #[serde(default)] + pub genre: Option, + #[serde(default)] + pub year: Option, + #[serde(default)] + pub duration_secs: Option, + #[serde(default)] + pub description: Option, + #[serde(default)] + pub extra: HashMap, +} + +/// Request to generate a thumbnail +#[derive(Debug, Serialize)] +pub struct GenerateThumbnailRequest { + pub source_path: PathBuf, + pub output_path: PathBuf, + pub max_width: u32, + pub max_height: u32, + pub format: String, +} + +/// Response from thumbnail generation +#[derive(Debug, Deserialize)] +pub struct GenerateThumbnailResponse { + pub path: PathBuf, + pub width: u32, + pub height: u32, + pub format: String, +} + +/// Event sent to event handler plugins +#[derive(Debug, Serialize)] +pub struct HandleEventRequest { + pub event_type: String, + pub payload: serde_json::Value, +} + +/// Search request for search backend plugins +#[derive(Debug, Serialize)] +pub struct 
SearchRequest { + pub query: String, + pub limit: usize, + pub offset: usize, +} + +/// Search response +#[derive(Debug, Clone, Deserialize)] +pub struct SearchResponse { + pub results: Vec, + #[serde(default)] + pub total_count: Option, +} + +/// Individual search result +#[derive(Debug, Clone, Deserialize)] +pub struct SearchResultItem { + pub id: String, + pub score: f64, + pub snippet: Option, +} + +/// Request to index a media item in a search backend +#[derive(Debug, Serialize)] +pub struct IndexItemRequest { + pub id: String, + pub title: Option, + pub artist: Option, + pub album: Option, + pub description: Option, + pub tags: Vec, + pub media_type: String, + pub path: PathBuf, +} + +/// Request to remove a media item from a search backend +#[derive(Debug, Serialize)] +pub struct RemoveItemRequest { + pub id: String, +} + +/// A theme definition returned by a theme provider plugin +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PluginThemeDefinition { + pub id: String, + pub name: String, + pub description: Option, + pub dark: bool, +} + +/// Response from `load_theme` +#[derive(Debug, Clone, Deserialize)] +pub struct LoadThemeResponse { + pub css: Option, + pub colors: HashMap, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_metadata_request_serialization() { + let req = ExtractMetadataRequest { + path: "/tmp/test.mp3".into(), + }; + let json = serde_json::to_string(&req).unwrap(); + assert!(json.contains("/tmp/test.mp3")); + } + + #[test] + fn test_extract_metadata_response_partial() { + let json = r#"{"title":"My Song","extra":{"bpm":"120"}}"#; + let resp: ExtractMetadataResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.title.as_deref(), Some("My Song")); + assert_eq!(resp.artist, None); + assert_eq!(resp.extra.get("bpm").map(String::as_str), Some("120")); + } + + #[test] + fn test_extract_metadata_response_empty() { + let json = "{}"; + let resp: ExtractMetadataResponse = 
serde_json::from_str(json).unwrap(); + assert_eq!(resp.title, None); + assert!(resp.extra.is_empty()); + } + + #[test] + fn test_can_handle_response() { + let json = r#"{"can_handle":true}"#; + let resp: CanHandleResponse = serde_json::from_str(json).unwrap(); + assert!(resp.can_handle); + } + + #[test] + fn test_can_handle_response_false() { + let json = r#"{"can_handle":false}"#; + let resp: CanHandleResponse = serde_json::from_str(json).unwrap(); + assert!(!resp.can_handle); + } + + #[test] + fn test_plugin_media_type_definition_round_trip() { + let def = PluginMediaTypeDefinition { + id: "heif".to_string(), + name: "HEIF Image".to_string(), + category: Some("image".to_string()), + extensions: vec!["heif".to_string(), "heic".to_string()], + mime_types: vec!["image/heif".to_string()], + }; + let json = serde_json::to_string(&def).unwrap(); + let parsed: PluginMediaTypeDefinition = + serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.id, "heif"); + assert_eq!(parsed.extensions.len(), 2); + } + + #[test] + fn test_search_response() { + let json = + r#"{"results":[{"id":"abc","score":0.95,"snippet":"match here"}]}"#; + let resp: SearchResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.results.len(), 1); + assert_eq!(resp.results[0].id, "abc"); + } + + #[test] + fn test_generate_thumbnail_request_serialization() { + let req = GenerateThumbnailRequest { + source_path: "/media/photo.heif".into(), + output_path: "/tmp/thumb.jpg".into(), + max_width: 256, + max_height: 256, + format: "jpeg".to_string(), + }; + let json = serde_json::to_string(&req).unwrap(); + assert!(json.contains("photo.heif")); + assert!(json.contains("256")); + } + + #[test] + fn test_handle_event_request_serialization() { + let req = HandleEventRequest { + event_type: "MediaImported".to_string(), + payload: serde_json::json!({"id": "abc-123"}), + }; + let json = serde_json::to_string(&req).unwrap(); + assert!(json.contains("MediaImported")); + assert!(json.contains("abc-123")); 
+ } +} diff --git a/crates/pinakes-core/src/plugin/runtime.rs b/crates/pinakes-core/src/plugin/runtime.rs index c47f12e..8ad22e3 100644 --- a/crates/pinakes-core/src/plugin/runtime.rs +++ b/crates/pinakes-core/src/plugin/runtime.rs @@ -4,7 +4,17 @@ use std::{path::Path, sync::Arc}; use anyhow::{Result, anyhow}; use pinakes_plugin_api::PluginContext; -use wasmtime::{Caller, Config, Engine, Linker, Module, Store, Val, anyhow}; +use wasmtime::{ + Caller, + Config, + Engine, + Linker, + Module, + Store, + StoreLimitsBuilder, + Val, + anyhow, +}; /// WASM runtime wrapper for executing plugins pub struct WasmRuntime { @@ -58,6 +68,8 @@ impl WasmRuntime { pub struct PluginStoreData { pub context: PluginContext, pub exchange_buffer: Vec, + pub pending_events: Vec<(String, String)>, + pub limiter: wasmtime::StoreLimits, } /// A loaded WASM plugin instance @@ -90,11 +102,23 @@ impl WasmPlugin { ) -> Result> { let engine = self.module.engine(); + // Build memory limiter from capabilities + let memory_limit = self + .context + .capabilities + .max_memory_bytes + .unwrap_or(512 * 1024 * 1024); // default 512 MB + + let limiter = StoreLimitsBuilder::new().memory_size(memory_limit).build(); + let store_data = PluginStoreData { - context: self.context.clone(), + context: self.context.clone(), exchange_buffer: Vec::new(), + pending_events: Vec::new(), + limiter, }; let mut store = Store::new(engine, store_data); + store.limiter(|data| &mut data.limiter); // Set fuel limit based on capabilities if let Some(max_cpu_time_ms) = self.context.capabilities.max_cpu_time_ms { @@ -194,6 +218,47 @@ impl WasmPlugin { Ok(Vec::new()) } } + + /// Call a plugin function with JSON request/response serialization. + /// + /// Serializes `request` to JSON, calls the named function, deserializes + /// the response. Wraps the call with `tokio::time::timeout`. 
+ /// + /// # Errors + /// + /// Returns an error if serialization fails, the call times out, the plugin + /// traps, or the response is malformed JSON. + #[allow(clippy::future_not_send)] // Req doesn't need Sync; called within local tasks + pub async fn call_function_json( + &self, + function_name: &str, + request: &Req, + timeout: std::time::Duration, + ) -> anyhow::Result + where + Req: serde::Serialize, + Resp: serde::de::DeserializeOwned, + { + let request_bytes = serde_json::to_vec(request) + .map_err(|e| anyhow::anyhow!("failed to serialize request: {e}"))?; + + let result = tokio::time::timeout( + timeout, + self.call_function(function_name, &request_bytes), + ) + .await + .map_err(|_| { + anyhow::anyhow!( + "plugin call '{function_name}' timed out after {timeout:?}" + ) + })??; + + serde_json::from_slice(&result).map_err(|e| { + anyhow::anyhow!( + "failed to deserialize response from '{function_name}': {e}" + ) + }) + } } #[cfg(test)] @@ -220,7 +285,8 @@ pub struct HostFunctions; impl HostFunctions { /// Registers all host ABI functions (`host_log`, `host_read_file`, /// `host_write_file`, `host_http_request`, `host_get_config`, - /// `host_get_buffer`) into the given linker. + /// `host_get_env`, `host_get_buffer`, `host_set_result`, + /// `host_emit_event`) into the given linker. /// /// # Errors /// @@ -423,6 +489,29 @@ impl HostFunctions { return -2; } + // Check domain whitelist if configured + if let Some(ref allowed) = + caller.data().context.capabilities.network.allowed_domains + { + let parsed = match url::Url::parse(&url_str) { + Ok(u) => u, + _ => { + tracing::warn!(url = %url_str, "plugin provided invalid URL"); + return -1; + }, + }; + let domain = parsed.host_str().unwrap_or(""); + + if !allowed.iter().any(|d| d.eq_ignore_ascii_case(domain)) { + tracing::warn!( + url = %url_str, + domain = domain, + "plugin domain not in allowlist" + ); + return -3; + } + } + // Use block_in_place to avoid blocking the async runtime's thread pool. 
// Falls back to a blocking client with timeout if block_in_place is // unavailable. @@ -513,6 +602,66 @@ impl HostFunctions { }, )?; + linker.func_wrap( + "env", + "host_get_env", + |mut caller: Caller<'_, PluginStoreData>, + key_ptr: i32, + key_len: i32| + -> i32 { + if key_ptr < 0 || key_len < 0 { + return -1; + } + let memory = caller + .get_export("memory") + .and_then(wasmtime::Extern::into_memory); + let Some(mem) = memory else { return -1 }; + + let data = mem.data(&caller); + let start = u32::try_from(key_ptr).unwrap_or(0) as usize; + let end = start + u32::try_from(key_len).unwrap_or(0) as usize; + if end > data.len() { + return -1; + } + + let key_str = match std::str::from_utf8(&data[start..end]) { + Ok(s) => s.to_string(), + Err(_) => return -1, + }; + + // Check environment capability + let env_cap = &caller.data().context.capabilities.environment; + if !env_cap.enabled { + tracing::warn!( + var = %key_str, + "plugin environment access denied" + ); + return -2; + } + + // Check against allowed variables list if configured + if let Some(ref allowed) = env_cap.allowed_vars + && !allowed.iter().any(|v| v == &key_str) + { + tracing::warn!( + var = %key_str, + "plugin env var not in allowlist" + ); + return -2; + } + + match std::env::var(&key_str) { + Ok(value) => { + let bytes = value.into_bytes(); + let len = i32::try_from(bytes.len()).unwrap_or(i32::MAX); + caller.data_mut().exchange_buffer = bytes; + len + }, + Err(_) => -1, + } + }, + )?; + linker.func_wrap( "env", "host_get_buffer", @@ -543,6 +692,83 @@ impl HostFunctions { }, )?; + linker.func_wrap( + "env", + "host_set_result", + |mut caller: Caller<'_, PluginStoreData>, ptr: i32, len: i32| { + if ptr < 0 || len < 0 { + return; + } + let memory = caller + .get_export("memory") + .and_then(wasmtime::Extern::into_memory); + let Some(mem) = memory else { return }; + + let data = mem.data(&caller); + let start = u32::try_from(ptr).unwrap_or(0) as usize; + let end = start + 
u32::try_from(len).unwrap_or(0) as usize; + if end <= data.len() { + caller.data_mut().exchange_buffer = data[start..end].to_vec(); + } + }, + )?; + + linker.func_wrap( + "env", + "host_emit_event", + |mut caller: Caller<'_, PluginStoreData>, + type_ptr: i32, + type_len: i32, + payload_ptr: i32, + payload_len: i32| + -> i32 { + const MAX_PENDING_EVENTS: usize = 1000; + + if type_ptr < 0 || type_len < 0 || payload_ptr < 0 || payload_len < 0 { + return -1; + } + let memory = caller + .get_export("memory") + .and_then(wasmtime::Extern::into_memory); + let Some(mem) = memory else { return -1 }; + + let type_start = u32::try_from(type_ptr).unwrap_or(0) as usize; + let type_end = + type_start + u32::try_from(type_len).unwrap_or(0) as usize; + let payload_start = u32::try_from(payload_ptr).unwrap_or(0) as usize; + let payload_end = + payload_start + u32::try_from(payload_len).unwrap_or(0) as usize; + + // Extract owned strings in a block so the immutable borrow of + // `caller` (via `mem.data`) is dropped before `caller.data_mut()`. 
+ let (event_type, payload) = { + let data = mem.data(&caller); + if type_end > data.len() || payload_end > data.len() { + return -1; + } + let event_type = + match std::str::from_utf8(&data[type_start..type_end]) { + Ok(s) => s.to_string(), + Err(_) => return -1, + }; + let payload = + match std::str::from_utf8(&data[payload_start..payload_end]) { + Ok(s) => s.to_string(), + Err(_) => return -1, + }; + (event_type, payload) + }; + + if caller.data().pending_events.len() >= MAX_PENDING_EVENTS { + tracing::warn!("plugin exceeded max pending events limit"); + return -4; + } + + caller.data_mut().pending_events.push((event_type, payload)); + 0 + }, + )?; + Ok(()) } } diff --git a/crates/pinakes-core/src/plugin/security.rs b/crates/pinakes-core/src/plugin/security.rs index 5f887fb..6bebb94 100644 --- a/crates/pinakes-core/src/plugin/security.rs +++ b/crates/pinakes-core/src/plugin/security.rs @@ -235,6 +235,54 @@ impl CapabilityEnforcer { .unwrap_or(self.max_cpu_time_limit) .min(self.max_cpu_time_limit) } + + /// Validate that a function call is allowed for a plugin's declared kinds. + /// + /// Defense-in-depth: even though the pipeline filters by kind, this prevents + /// bugs from calling wrong functions on plugins. Returns `true` if allowed. 
+ #[must_use] + pub fn validate_function_call( + &self, + plugin_kinds: &[String], + function_name: &str, + ) -> bool { + match function_name { + // Lifecycle functions are always allowed + "initialize" | "shutdown" | "health_check" => true, + // MediaTypeProvider + "supported_media_types" | "can_handle" => { + plugin_kinds.iter().any(|k| k == "media_type") + }, + // supported_types is shared by metadata_extractor and thumbnail_generator + "supported_types" => { + plugin_kinds + .iter() + .any(|k| k == "metadata_extractor" || k == "thumbnail_generator") + }, + // MetadataExtractor + "extract_metadata" => { + plugin_kinds.iter().any(|k| k == "metadata_extractor") + }, + // ThumbnailGenerator + "generate_thumbnail" => { + plugin_kinds.iter().any(|k| k == "thumbnail_generator") + }, + // SearchBackend + "search" | "index_item" | "remove_item" | "get_stats" => { + plugin_kinds.iter().any(|k| k == "search_backend") + }, + // EventHandler + "interested_events" | "handle_event" => { + plugin_kinds.iter().any(|k| k == "event_handler") + }, + // ThemeProvider + "get_themes" | "load_theme" => { + plugin_kinds.iter().any(|k| k == "theme_provider") + }, + // Unknown function names are not allowed + _ => false, + } + } } impl Default for CapabilityEnforcer { @@ -356,20 +404,70 @@ mod tests { let mut caps = Capabilities::default(); - // No limits specified - use defaults + // No limits specified, use the defaults assert_eq!(enforcer.get_memory_limit(&caps), 100 * 1024 * 1024); assert_eq!(enforcer.get_cpu_time_limit(&caps), 30_000); - // Plugin requests lower limits - use plugin's + // Plugin requests lower limits, use plugin's caps.max_memory_bytes = Some(50 * 1024 * 1024); caps.max_cpu_time_ms = Some(10_000); assert_eq!(enforcer.get_memory_limit(&caps), 50 * 1024 * 1024); assert_eq!(enforcer.get_cpu_time_limit(&caps), 10_000); - // Plugin requests higher limits - cap at system max + // Plugin requests higher limits, cap at system max caps.max_memory_bytes = Some(200 * 1024 * 
1024); caps.max_cpu_time_ms = Some(60_000); assert_eq!(enforcer.get_memory_limit(&caps), 100 * 1024 * 1024); assert_eq!(enforcer.get_cpu_time_limit(&caps), 30_000); } + + #[test] + fn test_validate_function_call_lifecycle_always_allowed() { + let enforcer = CapabilityEnforcer::new(); + let kinds = vec!["metadata_extractor".to_string()]; + assert!(enforcer.validate_function_call(&kinds, "initialize")); + assert!(enforcer.validate_function_call(&kinds, "shutdown")); + assert!(enforcer.validate_function_call(&kinds, "health_check")); + } + + #[test] + fn test_validate_function_call_metadata_extractor() { + let enforcer = CapabilityEnforcer::new(); + let kinds = vec!["metadata_extractor".to_string()]; + assert!(enforcer.validate_function_call(&kinds, "extract_metadata")); + assert!(enforcer.validate_function_call(&kinds, "supported_types")); + assert!(!enforcer.validate_function_call(&kinds, "search")); + assert!(!enforcer.validate_function_call(&kinds, "generate_thumbnail")); + assert!(!enforcer.validate_function_call(&kinds, "can_handle")); + } + + #[test] + fn test_validate_function_call_multi_kind() { + let enforcer = CapabilityEnforcer::new(); + let kinds = + vec!["media_type".to_string(), "metadata_extractor".to_string()]; + assert!(enforcer.validate_function_call(&kinds, "can_handle")); + assert!(enforcer.validate_function_call(&kinds, "supported_media_types")); + assert!(enforcer.validate_function_call(&kinds, "extract_metadata")); + assert!(!enforcer.validate_function_call(&kinds, "search")); + } + + #[test] + fn test_validate_function_call_unknown_function() { + let enforcer = CapabilityEnforcer::new(); + let kinds = vec!["metadata_extractor".to_string()]; + assert!(!enforcer.validate_function_call(&kinds, "unknown_func")); + assert!(!enforcer.validate_function_call(&kinds, "")); + } + + #[test] + fn test_validate_function_call_shared_supported_types() { + let enforcer = CapabilityEnforcer::new(); + let extractor = vec!["metadata_extractor".to_string()]; + 
let generator = vec!["thumbnail_generator".to_string()]; + let search = vec!["search_backend".to_string()]; + assert!(enforcer.validate_function_call(&extractor, "supported_types")); + assert!(enforcer.validate_function_call(&generator, "supported_types")); + assert!(!enforcer.validate_function_call(&search, "supported_types")); + } } diff --git a/crates/pinakes-core/src/plugin/signature.rs b/crates/pinakes-core/src/plugin/signature.rs new file mode 100644 index 0000000..64f9dc5 --- /dev/null +++ b/crates/pinakes-core/src/plugin/signature.rs @@ -0,0 +1,252 @@ +//! Plugin signature verification using Ed25519 + BLAKE3 +//! +//! Each plugin directory may contain a `plugin.sig` file alongside its +//! `plugin.toml`. The signature covers the BLAKE3 hash of the WASM binary +//! referenced by the manifest. Verification uses Ed25519 public keys +//! configured as trusted in the server's plugin settings. +//! +//! When `allow_unsigned` is false, plugins _must_ carry a valid signature +//! from one of the trusted keys or they will be rejected at load time. + +use std::path::Path; + +use anyhow::{Result, anyhow}; +use ed25519_dalek::{Signature, Verifier, VerifyingKey}; + +/// Outcome of a signature check on a plugin package. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SignatureStatus { + /// Signature is present and valid against a trusted key. + Valid, + /// No signature file found. + Unsigned, + /// Signature file exists but does not match any trusted key. + Invalid(String), +} + +/// Verify the signature of a plugin's WASM binary. +/// +/// Reads `plugin.sig` from `plugin_dir`, computes the BLAKE3 hash of the +/// WASM binary at `wasm_path`, and verifies the signature against each of +/// the `trusted_keys`. The signature file is raw 64-byte Ed25519 signature +/// over the 32-byte BLAKE3 digest. +/// +/// # Errors +/// +/// Returns an error only on I/O failures, never for cryptographic rejection, +/// which is reported via [`SignatureStatus`] instead. 
+pub fn verify_plugin_signature(
+  plugin_dir: &Path,
+  wasm_path: &Path,
+  trusted_keys: &[VerifyingKey],
+) -> Result<SignatureStatus> {
+  let sig_path = plugin_dir.join("plugin.sig");
+  if !sig_path.exists() {
+    return Ok(SignatureStatus::Unsigned);
+  }
+
+  let sig_bytes = std::fs::read(&sig_path)
+    .map_err(|e| anyhow!("failed to read plugin.sig: {e}"))?;
+
+  let signature = Signature::from_slice(&sig_bytes).map_err(|e| {
+    // Malformed signature file is an invalid signature, not an I/O error
+    tracing::warn!(path = %sig_path.display(), "malformed plugin.sig: {e}");
+    anyhow!("malformed plugin.sig: {e}")
+  });
+  let Ok(signature) = signature else {
+    return Ok(SignatureStatus::Invalid(
+      "malformed signature file".to_string(),
+    ));
+  };
+
+  // BLAKE3 hash of the WASM binary is the signed message
+  let wasm_bytes = std::fs::read(wasm_path)
+    .map_err(|e| anyhow!("failed to read WASM binary for verification: {e}"))?;
+  let digest = blake3::hash(&wasm_bytes);
+  let message = digest.as_bytes();
+
+  for key in trusted_keys {
+    if key.verify(message, &signature).is_ok() {
+      return Ok(SignatureStatus::Valid);
+    }
+  }
+
+  Ok(SignatureStatus::Invalid(
+    "signature did not match any trusted key".to_string(),
+  ))
+}
+
+/// Parse a hex-encoded Ed25519 public key (64 hex characters = 32 bytes).
+///
+/// # Errors
+///
+/// Returns an error if the string is not valid hex or is the wrong length.
+pub fn parse_public_key(hex_str: &str) -> Result<VerifyingKey> {
+  let hex_str = hex_str.trim();
+  if hex_str.len() != 64 {
+    return Err(anyhow!(
+      "expected 64 hex characters for Ed25519 public key, got {}",
+      hex_str.len()
+    ));
+  }
+
+  let mut bytes = [0u8; 32];
+  for (i, byte) in bytes.iter_mut().enumerate() {
+    *byte = u8::from_str_radix(&hex_str[i * 2..i * 2 + 2], 16)
+      .map_err(|e| anyhow!("invalid hex in public key: {e}"))?;
+  }
+
+  VerifyingKey::from_bytes(&bytes)
+    .map_err(|e| anyhow!("invalid Ed25519 public key: {e}"))
+}
+
+#[cfg(test)]
+mod tests {
+  use ed25519_dalek::{Signer, SigningKey};
+  use rand::RngExt;
+
+  use super::*;
+
+  fn make_keypair() -> (SigningKey, VerifyingKey) {
+    let secret_bytes: [u8; 32] = rand::rng().random();
+    let signing = SigningKey::from_bytes(&secret_bytes);
+    let verifying = signing.verifying_key();
+    (signing, verifying)
+  }
+
+  #[test]
+  fn test_verify_unsigned_plugin() {
+    let dir = tempfile::tempdir().unwrap();
+    let wasm_path = dir.path().join("plugin.wasm");
+    std::fs::write(&wasm_path, b"\0asm\x01\x00\x00\x00").unwrap();
+
+    let (_, vk) = make_keypair();
+    let status =
+      verify_plugin_signature(dir.path(), &wasm_path, &[vk]).unwrap();
+    assert_eq!(status, SignatureStatus::Unsigned);
+  }
+
+  #[test]
+  fn test_verify_valid_signature() {
+    let dir = tempfile::tempdir().unwrap();
+    let wasm_path = dir.path().join("plugin.wasm");
+    let wasm_bytes = b"\0asm\x01\x00\x00\x00some_code_here";
+    std::fs::write(&wasm_path, wasm_bytes).unwrap();
+
+    let (sk, vk) = make_keypair();
+
+    // Sign the BLAKE3 hash of the WASM binary
+    let digest = blake3::hash(wasm_bytes);
+    let signature = sk.sign(digest.as_bytes());
+    std::fs::write(dir.path().join("plugin.sig"), signature.to_bytes())
+      .unwrap();
+
+    let status =
+      verify_plugin_signature(dir.path(), &wasm_path, &[vk]).unwrap();
+    assert_eq!(status, SignatureStatus::Valid);
+  }
+
+  #[test]
+  fn test_verify_wrong_key() {
+    let dir = tempfile::tempdir().unwrap();
+    let wasm_path =
dir.path().join("plugin.wasm"); + let wasm_bytes = b"\0asm\x01\x00\x00\x00some_code"; + std::fs::write(&wasm_path, wasm_bytes).unwrap(); + + let (sk, _) = make_keypair(); + let (_, wrong_vk) = make_keypair(); + + let digest = blake3::hash(wasm_bytes); + let signature = sk.sign(digest.as_bytes()); + std::fs::write(dir.path().join("plugin.sig"), signature.to_bytes()) + .unwrap(); + + let status = + verify_plugin_signature(dir.path(), &wasm_path, &[wrong_vk]).unwrap(); + assert!(matches!(status, SignatureStatus::Invalid(_))); + } + + #[test] + fn test_verify_tampered_wasm() { + let dir = tempfile::tempdir().unwrap(); + let wasm_path = dir.path().join("plugin.wasm"); + let original = b"\0asm\x01\x00\x00\x00original"; + std::fs::write(&wasm_path, original).unwrap(); + + let (sk, vk) = make_keypair(); + let digest = blake3::hash(original); + let signature = sk.sign(digest.as_bytes()); + std::fs::write(dir.path().join("plugin.sig"), signature.to_bytes()) + .unwrap(); + + // Tamper with the WASM file after signing + std::fs::write(&wasm_path, b"\0asm\x01\x00\x00\x00tampered").unwrap(); + + let status = + verify_plugin_signature(dir.path(), &wasm_path, &[vk]).unwrap(); + assert!(matches!(status, SignatureStatus::Invalid(_))); + } + + #[test] + fn test_verify_malformed_sig_file() { + let dir = tempfile::tempdir().unwrap(); + let wasm_path = dir.path().join("plugin.wasm"); + std::fs::write(&wasm_path, b"\0asm\x01\x00\x00\x00").unwrap(); + + // Write garbage to plugin.sig (wrong length) + std::fs::write(dir.path().join("plugin.sig"), b"not a signature").unwrap(); + + let (_, vk) = make_keypair(); + let status = + verify_plugin_signature(dir.path(), &wasm_path, &[vk]).unwrap(); + assert!(matches!(status, SignatureStatus::Invalid(_))); + } + + #[test] + fn test_verify_multiple_trusted_keys() { + let dir = tempfile::tempdir().unwrap(); + let wasm_path = dir.path().join("plugin.wasm"); + let wasm_bytes = b"\0asm\x01\x00\x00\x00multi_key_test"; + std::fs::write(&wasm_path, 
wasm_bytes).unwrap(); + + let (sk2, vk2) = make_keypair(); + let (_, vk1) = make_keypair(); + let (_, vk3) = make_keypair(); + + // Sign with key 2 + let digest = blake3::hash(wasm_bytes); + let signature = sk2.sign(digest.as_bytes()); + std::fs::write(dir.path().join("plugin.sig"), signature.to_bytes()) + .unwrap(); + + // Verify against [vk1, vk2, vk3]; should find vk2 + let status = + verify_plugin_signature(dir.path(), &wasm_path, &[vk1, vk2, vk3]) + .unwrap(); + assert_eq!(status, SignatureStatus::Valid); + } + + #[test] + fn test_parse_public_key_valid() { + let (_, vk) = make_keypair(); + let hex = hex_encode(vk.as_bytes()); + let parsed = parse_public_key(&hex).unwrap(); + assert_eq!(parsed, vk); + } + + #[test] + fn test_parse_public_key_wrong_length() { + assert!(parse_public_key("abcdef").is_err()); + } + + #[test] + fn test_parse_public_key_invalid_hex() { + let bad = + "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"; + assert!(parse_public_key(bad).is_err()); + } + + fn hex_encode(bytes: &[u8]) -> String { + bytes.iter().map(|b| format!("{b:02x}")).collect() + } +} From 57f440d62e29d64894073bd7043480051f9f5c13 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Mar 2026 14:54:40 +0300 Subject: [PATCH 05/10] nix: add wasm32-wasip1 target Signed-off-by: NotAShelf Change-Id: I4bfc9e9749c17b6950b5489fa42d43c26a6a6964 --- nix/shell.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nix/shell.nix b/nix/shell.nix index 819f926..ec76d19 100644 --- a/nix/shell.nix +++ b/nix/shell.nix @@ -44,7 +44,7 @@ in # with the least amount of friction. 
(rust-bin.nightly.latest.default.override { extensions = ["rustfmt" "rust-src" "rust-analyzer" "clippy" "rust-analyzer"]; - targets = ["wasm32-unknown-unknown"]; # web + targets = ["wasm32-unknown-unknown" "wasm32-wasip1"]; # web + plugins }) # Handy CLI for packaging Dioxus apps and such From 61ebc6824caded2f2c4834ec42b25997ed9f4a52 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Mar 2026 14:56:08 +0300 Subject: [PATCH 06/10] meta: ignore compiled test fixtures in git Signed-off-by: NotAShelf Change-Id: Ic2f8776f5a55acc4281bfe6fcbe9d1116a6a6964 --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 79ac36b..28274bf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ # Rust target/ +**/*.wasm # Nix .direnv/ From 7d3c2052c2c0775124638015a040ec3bf328872a Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Mar 2026 14:56:31 +0300 Subject: [PATCH 07/10] pinakes-core: add plugin integration tests and test fixtures Signed-off-by: NotAShelf Change-Id: If4372ea33b93306486170353f9edf4a76a6a6964 --- .../tests/fixtures/test-plugin/.gitignore | 2 + .../tests/fixtures/test-plugin/Cargo.toml | 14 ++ .../tests/fixtures/test-plugin/plugin.toml | 17 ++ .../tests/fixtures/test-plugin/src/lib.rs | 181 +++++++++++++++++ crates/pinakes-core/tests/integration.rs | 4 +- .../pinakes-core/tests/plugin_integration.rs | 188 ++++++++++++++++++ 6 files changed, 404 insertions(+), 2 deletions(-) create mode 100644 crates/pinakes-core/tests/fixtures/test-plugin/.gitignore create mode 100644 crates/pinakes-core/tests/fixtures/test-plugin/Cargo.toml create mode 100644 crates/pinakes-core/tests/fixtures/test-plugin/plugin.toml create mode 100644 crates/pinakes-core/tests/fixtures/test-plugin/src/lib.rs create mode 100644 crates/pinakes-core/tests/plugin_integration.rs diff --git a/crates/pinakes-core/tests/fixtures/test-plugin/.gitignore b/crates/pinakes-core/tests/fixtures/test-plugin/.gitignore new file mode 100644 index 0000000..ca98cd9 --- 
/dev/null +++ b/crates/pinakes-core/tests/fixtures/test-plugin/.gitignore @@ -0,0 +1,2 @@ +/target/ +Cargo.lock diff --git a/crates/pinakes-core/tests/fixtures/test-plugin/Cargo.toml b/crates/pinakes-core/tests/fixtures/test-plugin/Cargo.toml new file mode 100644 index 0000000..eeb8116 --- /dev/null +++ b/crates/pinakes-core/tests/fixtures/test-plugin/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "test-plugin" +version = "1.0.0" +edition = "2024" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +dlmalloc = { version = "0.2", features = ["global"] } + +[profile.release] +opt-level = "s" +lto = true diff --git a/crates/pinakes-core/tests/fixtures/test-plugin/plugin.toml b/crates/pinakes-core/tests/fixtures/test-plugin/plugin.toml new file mode 100644 index 0000000..a9c9e5f --- /dev/null +++ b/crates/pinakes-core/tests/fixtures/test-plugin/plugin.toml @@ -0,0 +1,17 @@ +[plugin] +name = "test-plugin" +version = "1.0.0" +api_version = "1.0" +description = "Test fixture plugin for integration tests" +kind = ["media_type", "metadata_extractor", "thumbnail_generator", "event_handler"] +priority = 50 + +[plugin.binary] +wasm = "test_plugin.wasm" + +[capabilities] +network = false + +[capabilities.filesystem] +read = ["/tmp"] +write = ["/tmp"] diff --git a/crates/pinakes-core/tests/fixtures/test-plugin/src/lib.rs b/crates/pinakes-core/tests/fixtures/test-plugin/src/lib.rs new file mode 100644 index 0000000..5ebfa3c --- /dev/null +++ b/crates/pinakes-core/tests/fixtures/test-plugin/src/lib.rs @@ -0,0 +1,181 @@ +//! Test fixture plugin for integration tests. +//! +//! Exercises all extension points: MediaTypeProvider, MetadataExtractor, +//! ThumbnailGenerator, and EventHandler. +//! +//! To build in the project directory, you might need to set `RUSTFLAGS=""` or +//! explicitly specify a non-Clang linking pipeline. Below command is the +//! easiest way to build this: +//! +//! ``` +//! $ RUSTFLAGS="" cargo build --target wasm32-unknown-unknown --release` +//! 
```
+
+#![no_std]
+
+extern crate alloc;
+
+use alloc::{format, vec::Vec};
+use core::alloc::Layout;
+
+// Global allocator for no_std WASM
+#[global_allocator]
+static ALLOC: dlmalloc::GlobalDlmalloc = dlmalloc::GlobalDlmalloc;
+
+#[panic_handler]
+fn panic_handler(_info: &core::panic::PanicInfo) -> ! {
+  core::arch::wasm32::unreachable()
+}
+
+// Host functions provided by the runtime
+unsafe extern "C" {
+  fn host_set_result(ptr: i32, len: i32);
+  fn host_log(level: i32, ptr: i32, len: i32);
+}
+
+/// Write a JSON response back to the host via host_set_result.
+fn set_response(json: &[u8]) {
+  unsafe {
+    host_set_result(json.as_ptr() as i32, json.len() as i32);
+  }
+}
+
+/// Log at info level.
+fn log_info(msg: &str) {
+  unsafe {
+    host_log(2, msg.as_ptr() as i32, msg.len() as i32);
+  }
+}
+
+/// Read the JSON request bytes from memory at (ptr, len).
+unsafe fn read_request(ptr: i32, len: i32) -> Vec<u8> {
+  if ptr < 0 || len <= 0 {
+    return Vec::new();
+  }
+  let slice =
+    unsafe { core::slice::from_raw_parts(ptr as *const u8, len as usize) };
+  slice.to_vec()
+}
+
+/// JSON string value extraction, without Serde.
+/// Finds `"key": "value"` and returns `value`.
+/// FIXME: this is a hack, I need to look into serde without std...
+fn json_get_str<'a>(json: &'a [u8], key: &str) -> Option<&'a str> {
+  let json_str = core::str::from_utf8(json).ok()?;
+  let pattern = format!("\"{}\"", key);
+  let key_pos = json_str.find(&pattern)?;
+  let after_key = &json_str[key_pos + pattern.len()..];
+  // Skip `: ` or `:`
+  let after_colon = after_key.trim_start().strip_prefix(':')?;
+  let after_colon = after_colon.trim_start();
+
+  if after_colon.starts_with('"') {
+    let value_start = 1;
+    let value_end = after_colon[value_start..].find('"')?;
+    Some(&after_colon[value_start..value_start + value_end])
+  } else {
+    None
+  }
+}
+
+/// Allocate memory for the host to write request data into.
+#[unsafe(no_mangle)] +pub extern "C" fn alloc(size: i32) -> i32 { + if size <= 0 { + return 0; + } + unsafe { + let layout = Layout::from_size_align(size as usize, 1).unwrap(); + let ptr = alloc::alloc::alloc(layout); + if ptr.is_null() { + return -1; + } + ptr as i32 + } +} + +// Lifecycle +#[unsafe(no_mangle)] +pub extern "C" fn initialize() -> i32 { + log_info("test-plugin initialized"); + 0 +} + +#[unsafe(no_mangle)] +pub extern "C" fn shutdown() -> i32 { + log_info("test-plugin shutdown"); + 0 +} + +/// Returns custom media type definitions: a `.testfile` type. +#[unsafe(no_mangle)] +pub extern "C" fn supported_media_types(_ptr: i32, _len: i32) { + let response = br#"[{"id":"testfile","name":"Test File","category":"document","extensions":["testfile"],"mime_types":["application/x-testfile"]}]"#; + set_response(response); +} + +/// Check if this plugin can handle a given file path. +#[unsafe(no_mangle)] +pub extern "C" fn can_handle(ptr: i32, len: i32) { + let req = unsafe { read_request(ptr, len) }; + + let path = json_get_str(&req, "path").unwrap_or(""); + let can = path.ends_with(".testfile"); + if can { + set_response(br#"{"can_handle":true}"#); + } else { + set_response(br#"{"can_handle":false}"#); + } +} + +/// Returns the list of media types this extractor supports. +#[unsafe(no_mangle)] +pub extern "C" fn supported_types(_ptr: i32, _len: i32) { + set_response(br#"["testfile"]"#); +} + +/// Extract metadata from a file. 
+#[unsafe(no_mangle)] +pub extern "C" fn extract_metadata(ptr: i32, len: i32) { + let req = unsafe { read_request(ptr, len) }; + let path = json_get_str(&req, "path").unwrap_or("unknown"); + + // Extract filename from path + let filename = path.rsplit('/').next().unwrap_or(path); + + let response = format!( + r#"{{"title":"{}","artist":"test-plugin","description":"Extracted by test-plugin","extra":{{"plugin_source":"test-plugin"}}}}"#, + filename, + ); + set_response(response.as_bytes()); +} + +/// Generate a thumbnail (returns a fixed path for testing). +#[unsafe(no_mangle)] +pub extern "C" fn generate_thumbnail(ptr: i32, len: i32) { + let req = unsafe { read_request(ptr, len) }; + let output = + json_get_str(&req, "output_path").unwrap_or("/tmp/test-thumb.jpg"); + + let response = format!( + r#"{{"path":"{}","width":320,"height":320,"format":"jpeg"}}"#, + output + ); + set_response(response.as_bytes()); +} + +/// Returns the event types this handler is interested in. +#[unsafe(no_mangle)] +pub extern "C" fn interested_events(_ptr: i32, _len: i32) { + set_response(br#"["MediaImported","MediaUpdated","MediaDeleted"]"#); +} + +/// Handle an event. 
+#[unsafe(no_mangle)] +pub extern "C" fn handle_event(ptr: i32, len: i32) { + let req = unsafe { read_request(ptr, len) }; + let event_type = json_get_str(&req, "event_type").unwrap_or("unknown"); + let msg = format!("test-plugin handled event: {}", event_type); + log_info(&msg); + set_response(b"{}"); +} diff --git a/crates/pinakes-core/tests/integration.rs b/crates/pinakes-core/tests/integration.rs index 161755f..da1035a 100644 --- a/crates/pinakes-core/tests/integration.rs +++ b/crates/pinakes-core/tests/integration.rs @@ -407,13 +407,13 @@ async fn test_import_with_dedup() { std::fs::write(&file_path, "hello world").unwrap(); // First import - let result1 = pinakes_core::import::import_file(&storage, &file_path) + let result1 = pinakes_core::import::import_file(&storage, &file_path, None) .await .unwrap(); assert!(!result1.was_duplicate); // Second import of same file - let result2 = pinakes_core::import::import_file(&storage, &file_path) + let result2 = pinakes_core::import::import_file(&storage, &file_path, None) .await .unwrap(); assert!(result2.was_duplicate); diff --git a/crates/pinakes-core/tests/plugin_integration.rs b/crates/pinakes-core/tests/plugin_integration.rs new file mode 100644 index 0000000..c48a250 --- /dev/null +++ b/crates/pinakes-core/tests/plugin_integration.rs @@ -0,0 +1,188 @@ +//! Integration tests for the plugin pipeline with real WASM execution. +//! +//! These tests use the test-plugin fixture at +//! `tests/fixtures/test-plugin/` which is a compiled WASM binary +//! exercising all extension points. + +// FIXME: add a Justfile and make sure the fixture is compiled for tests... + +#![allow(clippy::print_stderr, reason = "Fine for tests")] +use std::{path::Path, sync::Arc}; + +use pinakes_core::{ + config::PluginTimeoutConfig, + plugin::{PluginManager, PluginManagerConfig, PluginPipeline}, +}; +use tempfile::TempDir; + +/// Path to the compiled test plugin fixture. 
+fn fixture_dir() -> std::path::PathBuf { + Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/test-plugin") +} + +/// Check the WASM binary exists (skip tests if not built). +fn wasm_binary_exists() -> bool { + fixture_dir().join("test_plugin.wasm").exists() +} + +/// Set up a `PluginManager` pointing at the fixture directory as a +/// `plugin_dir`. The loader expects `plugin_dirs/test-plugin/plugin.toml`. +/// So we point at the fixtures directory (parent of test-plugin/). +fn setup_manager(temp: &TempDir) -> PluginManager { + let fixtures_dir = + Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures"); + + let config = PluginManagerConfig { + plugin_dirs: vec![fixtures_dir], + allow_unsigned: true, + max_concurrent_ops: 2, + plugin_timeout_secs: 30, + timeouts: PluginTimeoutConfig::default(), + max_consecutive_failures: 5, + ..Default::default() + }; + + PluginManager::new( + temp.path().join("data"), + temp.path().join("cache"), + config, + ) + .expect("create plugin manager") +} + +#[tokio::test] +async fn test_plugin_discovery_loads_fixture() { + if !wasm_binary_exists() { + eprintln!("WASM binary not found, skipping test"); + return; + } + + let temp = TempDir::new().unwrap(); + let manager = setup_manager(&temp); + + let loaded = manager.discover_and_load_all().await.unwrap(); + assert!( + !loaded.is_empty(), + "should discover at least the test-plugin" + ); + assert!( + loaded.contains(&"test-plugin".to_string()), + "should load test-plugin, got: {loaded:?}" + ); +} + +#[tokio::test] +async fn test_plugin_capability_discovery() { + if !wasm_binary_exists() { + return; + } + + let temp = TempDir::new().unwrap(); + let manager = Arc::new(setup_manager(&temp)); + manager.discover_and_load_all().await.unwrap(); + + let pipeline = Arc::new(PluginPipeline::new( + manager, + PluginTimeoutConfig::default(), + 5, + )); + + pipeline.discover_capabilities().await.unwrap(); + + // The test plugin registers a custom .testfile media type, so + // 
resolve_media_type should recognise it (even if no physical file exists, + // can_handle checks the extension). + let resolved = pipeline + .resolve_media_type(Path::new("/tmp/example.testfile")) + .await; + assert!( + resolved.is_some(), + "pipeline should resolve .testfile via test-plugin" + ); +} + +#[tokio::test] +async fn test_pipeline_builtin_fallback_still_works() { + if !wasm_binary_exists() { + return; + } + + let temp = TempDir::new().unwrap(); + let manager = Arc::new(setup_manager(&temp)); + manager.discover_and_load_all().await.unwrap(); + + let pipeline = Arc::new(PluginPipeline::new( + manager, + PluginTimeoutConfig::default(), + 5, + )); + pipeline.discover_capabilities().await.unwrap(); + + // Built-in types should still resolve (test-plugin at priority 50 runs + // first but won't claim .mp3). + let mp3 = pipeline + .resolve_media_type(Path::new("/tmp/song.mp3")) + .await; + assert!(mp3.is_some(), "built-in .mp3 resolution should still work"); + + // Unknown types should still resolve to None + let unknown = pipeline + .resolve_media_type(Path::new("/tmp/data.xyz999")) + .await; + assert!( + unknown.is_none(), + "unknown extension should resolve to None" + ); +} + +#[tokio::test] +async fn test_emit_event_with_loaded_plugin() { + if !wasm_binary_exists() { + return; + } + + let temp = TempDir::new().unwrap(); + let manager = Arc::new(setup_manager(&temp)); + manager.discover_and_load_all().await.unwrap(); + + let pipeline = Arc::new(PluginPipeline::new( + manager, + PluginTimeoutConfig::default(), + 5, + )); + pipeline.discover_capabilities().await.unwrap(); + + // Emit an event the test-plugin is interested in (MediaImported). + // Should not panic; the handler runs in a spawned task. 
+ pipeline.emit_event( + "MediaImported", + &serde_json::json!({"media_id": "test-123", "path": "/tmp/test.mp3"}), + ); + + // Give the spawned task a moment to run + tokio::time::sleep(std::time::Duration::from_millis(200)).await; +} + +#[tokio::test] +async fn test_event_not_interested_is_ignored() { + if !wasm_binary_exists() { + return; + } + + let temp = TempDir::new().unwrap(); + let manager = Arc::new(setup_manager(&temp)); + manager.discover_and_load_all().await.unwrap(); + + let pipeline = Arc::new(PluginPipeline::new( + manager, + PluginTimeoutConfig::default(), + 5, + )); + pipeline.discover_capabilities().await.unwrap(); + + // Emit an event the test-plugin is NOT interested in. + // Should complete silently. + pipeline.emit_event("ScanStarted", &serde_json::json!({"roots": ["/tmp"]})); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; +} From f686e8a777003f5c5d3e1c160a5bbe5ec5f8ab98 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Mar 2026 15:01:46 +0300 Subject: [PATCH 08/10] pinakes-core: emit plugin events from scan and import pipelines Signed-off-by: NotAShelf Change-Id: Ib992e292a3272c52f9b7c18164ec61f56a6a6964 --- crates/pinakes-core/src/import.rs | 50 +++++++++++++++++++++++++++---- crates/pinakes-core/src/scan.rs | 45 ++++++++++++++++++++++++---- 2 files changed, 83 insertions(+), 12 deletions(-) diff --git a/crates/pinakes-core/src/import.rs b/crates/pinakes-core/src/import.rs index 5d4935d..6d3c657 100644 --- a/crates/pinakes-core/src/import.rs +++ b/crates/pinakes-core/src/import.rs @@ -21,6 +21,7 @@ use crate::{ MediaItem, StorageMode, }, + plugin::PluginPipeline, storage::DynStorageBackend, thumbnail, }; @@ -106,8 +107,10 @@ pub async fn validate_path_in_roots( pub async fn import_file( storage: &DynStorageBackend, path: &Path, + pipeline: Option<&Arc>, ) -> Result { - import_file_with_options(storage, path, &ImportOptions::default()).await + import_file_with_options(storage, path, &ImportOptions::default(), 
pipeline) + .await } /// Import a file with configurable options for incremental scanning @@ -119,6 +122,7 @@ pub async fn import_file_with_options( storage: &DynStorageBackend, path: &Path, options: &ImportOptions, + pipeline: Option<&Arc>, ) -> Result { let path = path.canonicalize()?; @@ -128,8 +132,12 @@ pub async fn import_file_with_options( validate_path_in_roots(storage, &path).await?; - let media_type = MediaType::from_path(&path) - .ok_or_else(|| PinakesError::UnsupportedMediaType(path.clone()))?; + let media_type = if let Some(pl) = pipeline { + pl.resolve_media_type(&path).await + } else { + MediaType::from_path(&path) + } + .ok_or_else(|| PinakesError::UnsupportedMediaType(path.clone()))?; let current_mtime = get_file_mtime(&path); @@ -169,7 +177,9 @@ pub async fn import_file_with_options( let file_meta = std::fs::metadata(&path)?; let file_size = file_meta.len(); - let extracted = { + let extracted = if let Some(pl) = pipeline { + pl.extract_metadata(&path, &media_type).await? + } else { let path_clone = path.clone(); let media_type_clone = media_type.clone(); tokio::task::spawn_blocking(move || { @@ -189,7 +199,15 @@ pub async fn import_file_with_options( let media_id = MediaId::new(); // Generate thumbnail for image types - let thumb_path = { + let thumb_path = if let Some(pl) = pipeline { + pl.generate_thumbnail( + media_id, + &path, + &media_type, + &thumbnail::default_thumbnail_dir(), + ) + .await? 
+ } else { let source = path.clone(); let thumb_dir = thumbnail::default_thumbnail_dir(); let media_type_clone = media_type.clone(); @@ -218,6 +236,9 @@ pub async fn import_file_with_options( let is_markdown = media_type == MediaType::Builtin(BuiltinMediaType::Markdown); + // Capture media type debug string before moving into MediaItem + let media_type_debug = format!("{media_type:?}"); + let item = MediaItem { id: media_id, path: path.clone(), @@ -299,6 +320,15 @@ pub async fn import_file_with_options( ) .await?; + if let Some(pl) = pipeline { + let payload = serde_json::json!({ + "media_id": media_id.to_string(), + "path": path.to_string_lossy(), + "media_type": media_type_debug, + }); + pl.emit_event("MediaImported", &payload); + } + info!(media_id = %media_id, path = %path.display(), "imported media file"); Ok(ImportResult { @@ -349,6 +379,7 @@ pub async fn import_directory( storage: &DynStorageBackend, dir: &Path, ignore_patterns: &[String], + pipeline: Option<&Arc>, ) -> Result>> { import_directory_with_options( storage, @@ -356,6 +387,7 @@ pub async fn import_directory( ignore_patterns, DEFAULT_IMPORT_CONCURRENCY, &ImportOptions::default(), + pipeline, ) .await } @@ -372,6 +404,7 @@ pub async fn import_directory_with_concurrency( dir: &Path, ignore_patterns: &[String], concurrency: usize, + pipeline: Option<&Arc>, ) -> Result>> { import_directory_with_options( storage, @@ -379,6 +412,7 @@ pub async fn import_directory_with_concurrency( ignore_patterns, concurrency, &ImportOptions::default(), + pipeline, ) .await } @@ -395,11 +429,13 @@ pub async fn import_directory_with_options( ignore_patterns: &[String], concurrency: usize, options: &ImportOptions, + pipeline: Option<&Arc>, ) -> Result>> { let concurrency = concurrency.clamp(1, 256); let dir = dir.to_path_buf(); let patterns = ignore_patterns.to_vec(); let options = options.clone(); + let pipeline = pipeline.cloned(); let entries: Vec = { let dir = dir.clone(); @@ -425,9 +461,11 @@ pub async fn 
import_directory_with_options( let storage = Arc::clone(storage); let path = entry_path.clone(); let opts = options.clone(); + let pl = pipeline.clone(); join_set.spawn(async move { - let result = import_file_with_options(&storage, &path, &opts).await; + let result = + import_file_with_options(&storage, &path, &opts, pl.as_ref()).await; (path, result) }); diff --git a/crates/pinakes-core/src/scan.rs b/crates/pinakes-core/src/scan.rs index 8fe459f..ce43896 100644 --- a/crates/pinakes-core/src/scan.rs +++ b/crates/pinakes-core/src/scan.rs @@ -11,7 +11,12 @@ use notify::{PollWatcher, RecursiveMode, Watcher}; use tokio::sync::mpsc; use tracing::{info, warn}; -use crate::{error::Result, import, storage::DynStorageBackend}; +use crate::{ + error::Result, + import, + plugin::PluginPipeline, + storage::DynStorageBackend, +}; /// Status of a directory scan operation. pub struct ScanStatus { @@ -122,6 +127,7 @@ pub async fn scan_directory( storage: &DynStorageBackend, dir: &Path, ignore_patterns: &[String], + pipeline: Option<&Arc>, ) -> Result { scan_directory_with_options( storage, @@ -129,6 +135,7 @@ pub async fn scan_directory( ignore_patterns, None, &ScanOptions::default(), + pipeline, ) .await } @@ -154,13 +161,21 @@ pub async fn scan_directory_incremental( storage: &DynStorageBackend, dir: &Path, ignore_patterns: &[String], + pipeline: Option<&Arc>, ) -> Result { let options = ScanOptions { incremental: true, force_full: false, }; - scan_directory_with_options(storage, dir, ignore_patterns, None, &options) - .await + scan_directory_with_options( + storage, + dir, + ignore_patterns, + None, + &options, + pipeline, + ) + .await } /// Scans a directory with progress reporting. 
@@ -184,6 +199,7 @@ pub async fn scan_directory_with_progress( dir: &Path, ignore_patterns: &[String], progress: Option<&ScanProgress>, + pipeline: Option<&Arc>, ) -> Result { scan_directory_with_options( storage, @@ -191,6 +207,7 @@ pub async fn scan_directory_with_progress( ignore_patterns, progress, &ScanOptions::default(), + pipeline, ) .await } @@ -207,6 +224,7 @@ pub async fn scan_directory_with_options( ignore_patterns: &[String], progress: Option<&ScanProgress>, scan_options: &ScanOptions, + pipeline: Option<&Arc>, ) -> Result { info!( dir = %dir.display(), @@ -230,8 +248,9 @@ pub async fn scan_directory_with_options( storage, dir, ignore_patterns, - 8, // Default concurrency + 8, // default concurrency &import_options, + pipeline, ) .await?; @@ -301,12 +320,14 @@ pub async fn scan_directory_with_options( pub async fn scan_all_roots( storage: &DynStorageBackend, ignore_patterns: &[String], + pipeline: Option<&Arc>, ) -> Result> { scan_all_roots_with_options( storage, ignore_patterns, None, &ScanOptions::default(), + pipeline, ) .await } @@ -328,12 +349,20 @@ pub async fn scan_all_roots( pub async fn scan_all_roots_incremental( storage: &DynStorageBackend, ignore_patterns: &[String], + pipeline: Option<&Arc>, ) -> Result> { let options = ScanOptions { incremental: true, force_full: false, }; - scan_all_roots_with_options(storage, ignore_patterns, None, &options).await + scan_all_roots_with_options( + storage, + ignore_patterns, + None, + &options, + pipeline, + ) + .await } /// Scans all root directories with progress reporting. 
@@ -355,12 +384,14 @@ pub async fn scan_all_roots_with_progress( storage: &DynStorageBackend, ignore_patterns: &[String], progress: Option<&ScanProgress>, + pipeline: Option<&Arc>, ) -> Result> { scan_all_roots_with_options( storage, ignore_patterns, progress, &ScanOptions::default(), + pipeline, ) .await } @@ -386,6 +417,7 @@ pub async fn scan_all_roots_with_options( ignore_patterns: &[String], progress: Option<&ScanProgress>, scan_options: &ScanOptions, + pipeline: Option<&Arc>, ) -> Result> { let roots = storage.list_root_dirs().await?; let mut statuses = Vec::new(); @@ -397,6 +429,7 @@ pub async fn scan_all_roots_with_options( ignore_patterns, progress, scan_options, + pipeline, ) .await { @@ -536,7 +569,7 @@ pub async fn watch_and_import( && !crate::import::should_ignore(&path, &ignore_patterns) { info!(path = %path.display(), "detected file change, importing"); - if let Err(e) = import::import_file(&storage, &path).await { + if let Err(e) = import::import_file(&storage, &path, None).await { warn!(path = %path.display(), error = %e, "failed to import changed file"); } } From e9c5390c453d60dcf89c967c9a41c35f4fab1693 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Mar 2026 15:06:11 +0300 Subject: [PATCH 09/10] pinakes-server: integrate plugin system into routes & application state Signed-off-by: NotAShelf Change-Id: Ib5d482326cae1dcb43603bffb76a6a186a6a6964 --- crates/pinakes-server/src/main.rs | 133 ++++++++++++------ .../pinakes-server/src/routes/collections.rs | 15 ++ crates/pinakes-server/src/routes/media.rs | 48 ++++++- crates/pinakes-server/src/routes/plugins.rs | 22 +++ crates/pinakes-server/src/routes/tags.rs | 18 +++ crates/pinakes-server/src/state.rs | 16 ++- 6 files changed, 206 insertions(+), 46 deletions(-) diff --git a/crates/pinakes-server/src/main.rs b/crates/pinakes-server/src/main.rs index a271630..28dbeb9 100644 --- a/crates/pinakes-server/src/main.rs +++ b/crates/pinakes-server/src/main.rs @@ -234,11 +234,57 @@ async fn main() -> 
Result<()> { )) }; + // Initialize plugin manager if plugins are enabled (needed before job queue) + let plugin_manager = if config.plugins.enabled { + match pinakes_core::plugin::PluginManager::new( + config.plugins.data_dir.clone(), + config.plugins.cache_dir.clone(), + config.plugins.clone().into(), + ) { + Ok(pm) => { + tracing::info!("Plugin manager initialized"); + Some(Arc::new(pm)) + }, + Err(e) => { + tracing::warn!("Failed to initialize plugin manager: {}", e); + None + }, + } + } else { + tracing::info!("Plugins disabled in configuration"); + None + }; + + // Initialize plugin pipeline if plugin manager is available + let plugin_pipeline = if let Some(ref pm) = plugin_manager { + match pm.discover_and_load_all().await { + Ok(loaded) => { + tracing::info!(count = loaded.len(), "loaded plugins"); + let pipeline = Arc::new(pinakes_core::plugin::PluginPipeline::new( + Arc::clone(pm), + config.plugins.timeouts.clone(), + config.plugins.max_consecutive_failures, + )); + if let Err(e) = pipeline.discover_capabilities().await { + tracing::warn!(error = %e, "failed to discover plugin capabilities"); + } + Some(pipeline) + }, + Err(e) => { + tracing::warn!(error = %e, "plugin discovery failed"); + None + }, + } + } else { + None + }; + // Initialize job queue with executor let job_storage = storage.clone(); let job_config = config.clone(); let job_transcode = transcode_service.clone(); let job_webhooks = webhook_dispatcher.clone(); + let job_pipeline = plugin_pipeline.clone(); let job_queue = pinakes_core::jobs::JobQueue::new( config.jobs.worker_count, config.jobs.job_timeout_secs, @@ -247,31 +293,50 @@ async fn main() -> Result<()> { let config = job_config.clone(); let transcode_svc = job_transcode.clone(); let webhooks = job_webhooks.clone(); + let pipeline = job_pipeline.clone(); tokio::spawn(async move { use pinakes_core::jobs::{JobKind, JobQueue}; match kind { JobKind::Scan { path } => { + if let Some(ref pl) = pipeline { + pl.emit_event( + "ScanStarted", + 
&serde_json::json!({"path": path.as_ref().map(|p| p.display().to_string())}), + ); + } let ignore = config.scanning.ignore_patterns.clone(); let res = if let Some(p) = path { - pinakes_core::scan::scan_directory(&storage, &p, &ignore).await + pinakes_core::scan::scan_directory( + &storage, + &p, + &ignore, + pipeline.as_ref(), + ) + .await } else { - pinakes_core::scan::scan_all_roots(&storage, &ignore) - .await - .map(|statuses| { - let total_found: usize = - statuses.iter().map(|s| s.files_found).sum(); - let total_processed: usize = - statuses.iter().map(|s| s.files_processed).sum(); - let all_errors: Vec = - statuses.into_iter().flat_map(|s| s.errors).collect(); - pinakes_core::scan::ScanStatus { - scanning: false, - files_found: total_found, - files_processed: total_processed, - files_skipped: 0, - errors: all_errors, - } - }) + pinakes_core::scan::scan_all_roots( + &storage, + &ignore, + pipeline.as_ref(), + ) + .await + .map(|statuses| { + let total_found: usize = + statuses.iter().map(|s| s.files_found).sum(); + let total_processed: usize = + statuses.iter().map(|s| s.files_processed).sum(); + let total_skipped: usize = + statuses.iter().map(|s| s.files_skipped).sum(); + let all_errors: Vec = + statuses.into_iter().flat_map(|s| s.errors).collect(); + pinakes_core::scan::ScanStatus { + scanning: false, + files_found: total_found, + files_processed: total_processed, + files_skipped: total_skipped, + errors: all_errors, + } + }) }; match res { Ok(status) => { @@ -283,6 +348,15 @@ async fn main() -> Result<()> { }, ); } + if let Some(ref pl) = pipeline { + pl.emit_event( + "ScanCompleted", + &serde_json::json!({ + "files_found": status.files_found, + "files_processed": status.files_processed, + }), + ); + } JobQueue::complete( &jobs, job_id, @@ -630,28 +704,6 @@ async fn main() -> Result<()> { config.jobs.cache_ttl_secs, )); - // Initialize plugin manager if plugins are enabled (before moving config into - // Arc) - let plugin_manager = if 
config.plugins.enabled { - match pinakes_core::plugin::PluginManager::new( - config.plugins.data_dir.clone(), - config.plugins.cache_dir.clone(), - config.plugins.clone().into(), - ) { - Ok(pm) => { - tracing::info!("Plugin manager initialized"); - Some(Arc::new(pm)) - }, - Err(e) => { - tracing::warn!("Failed to initialize plugin manager: {}", e); - None - }, - } - } else { - tracing::info!("Plugins disabled in configuration"); - None - }; - // Initialize scheduler with cancellation support let shutdown_token = tokio_util::sync::CancellationToken::new(); let config_arc = Arc::new(RwLock::new(config)); @@ -737,6 +789,7 @@ async fn main() -> Result<()> { cache, scheduler, plugin_manager, + plugin_pipeline, transcode_service, managed_storage, chunked_upload_manager, diff --git a/crates/pinakes-server/src/routes/collections.rs b/crates/pinakes-server/src/routes/collections.rs index 9ca72c3..159d125 100644 --- a/crates/pinakes-server/src/routes/collections.rs +++ b/crates/pinakes-server/src/routes/collections.rs @@ -48,6 +48,15 @@ pub async fn create_collection( req.filter_query.as_deref(), ) .await?; + + state.emit_plugin_event( + "CollectionCreated", + &serde_json::json!({ + "id": col.id.to_string(), + "name": col.name, + }), + ); + Ok(Json(CollectionResponse::from(col))) } @@ -73,6 +82,12 @@ pub async fn delete_collection( Path(id): Path, ) -> Result, ApiError> { state.storage.delete_collection(id).await?; + + state.emit_plugin_event( + "CollectionDeleted", + &serde_json::json!({"id": id.to_string()}), + ); + Ok(Json(serde_json::json!({"deleted": true}))) } diff --git a/crates/pinakes-server/src/routes/media.rs b/crates/pinakes-server/src/routes/media.rs index 09d8410..a2b3a4a 100644 --- a/crates/pinakes-server/src/routes/media.rs +++ b/crates/pinakes-server/src/routes/media.rs @@ -87,8 +87,12 @@ pub async fn import_media( State(state): State, Json(req): Json, ) -> Result, ApiError> { - let result = - pinakes_core::import::import_file(&state.storage, 
&req.path).await?; + let result = pinakes_core::import::import_file( + &state.storage, + &req.path, + state.plugin_pipeline.as_ref(), + ) + .await?; if let Some(ref dispatcher) = state.webhook_dispatcher { let id = result.media_id.0.to_string(); @@ -197,6 +201,11 @@ pub async fn update_media( }); } + state.emit_plugin_event( + "MediaUpdated", + &serde_json::json!({"media_id": item.id.to_string()}), + ); + Ok(Json(MediaResponse::from(item))) } @@ -227,6 +236,14 @@ pub async fn delete_media( tracing::warn!(path = %thumb_path.display(), error = %e, "failed to remove thumbnail"); } + state.emit_plugin_event( + "MediaDeleted", + &serde_json::json!({ + "media_id": media_id.to_string(), + "path": item.path.to_string_lossy(), + }), + ); + Ok(Json(serde_json::json!({"deleted": true}))) } @@ -362,8 +379,12 @@ pub async fn import_with_options( State(state): State, Json(req): Json, ) -> Result, ApiError> { - let result = - pinakes_core::import::import_file(&state.storage, &req.path).await?; + let result = pinakes_core::import::import_file( + &state.storage, + &req.path, + state.plugin_pipeline.as_ref(), + ) + .await?; if !result.was_duplicate { apply_import_post_processing( @@ -400,7 +421,13 @@ pub async fn batch_import( let mut errors = 0usize; for path in &req.paths { - match pinakes_core::import::import_file(&state.storage, path).await { + match pinakes_core::import::import_file( + &state.storage, + path, + state.plugin_pipeline.as_ref(), + ) + .await + { Ok(result) => { if result.was_duplicate { duplicates += 1; @@ -458,6 +485,7 @@ pub async fn import_directory_endpoint( &req.path, &ignore_patterns, concurrency, + state.plugin_pipeline.as_ref(), ) .await?; @@ -1065,6 +1093,11 @@ pub async fn soft_delete_media( ) .await?; + state.emit_plugin_event( + "MediaDeleted", + &serde_json::json!({"media_id": media_id.to_string(), "trashed": true}), + ); + Ok(Json(serde_json::json!({"deleted": true, "trashed": true}))) } @@ -1106,6 +1139,11 @@ pub async fn restore_media( ) .await?; + 
state.emit_plugin_event( + "MediaUpdated", + &serde_json::json!({"media_id": media_id.to_string(), "restored": true}), + ); + Ok(Json(MediaResponse::from(item))) } diff --git a/crates/pinakes-server/src/routes/plugins.rs b/crates/pinakes-server/src/routes/plugins.rs index 77a03a7..00b77f1 100644 --- a/crates/pinakes-server/src/routes/plugins.rs +++ b/crates/pinakes-server/src/routes/plugins.rs @@ -127,6 +127,17 @@ pub async fn toggle_plugin( })?; } + // Re-discover capabilities after toggle so cached data stays current + if let Some(ref pipeline) = state.plugin_pipeline + && let Err(e) = pipeline.discover_capabilities().await + { + tracing::warn!( + plugin_id = %id, + error = %e, + "failed to re-discover capabilities after plugin toggle" + ); + } + Ok(Json(serde_json::json!({ "id": id, "enabled": req.enabled @@ -150,5 +161,16 @@ pub async fn reload_plugin( )) })?; + // Re-discover capabilities after reload so cached data stays current + if let Some(ref pipeline) = state.plugin_pipeline + && let Err(e) = pipeline.discover_capabilities().await + { + tracing::warn!( + plugin_id = %id, + error = %e, + "failed to re-discover capabilities after plugin reload" + ); + } + Ok(Json(serde_json::json!({"reloaded": true}))) } diff --git a/crates/pinakes-server/src/routes/tags.rs b/crates/pinakes-server/src/routes/tags.rs index 449c78d..3c12ec2 100644 --- a/crates/pinakes-server/src/routes/tags.rs +++ b/crates/pinakes-server/src/routes/tags.rs @@ -58,6 +58,15 @@ pub async fn tag_media( ) -> Result, ApiError> { pinakes_core::tags::tag_media(&state.storage, MediaId(media_id), req.tag_id) .await?; + + state.emit_plugin_event( + "MediaTagged", + &serde_json::json!({ + "media_id": media_id.to_string(), + "tag_id": req.tag_id.to_string(), + }), + ); + Ok(Json(serde_json::json!({"tagged": true}))) } @@ -67,6 +76,15 @@ pub async fn untag_media( ) -> Result, ApiError> { pinakes_core::tags::untag_media(&state.storage, MediaId(media_id), tag_id) .await?; + + state.emit_plugin_event( + 
"MediaUntagged", + &serde_json::json!({ + "media_id": media_id.to_string(), + "tag_id": tag_id.to_string(), + }), + ); + Ok(Json(serde_json::json!({"untagged": true}))) } diff --git a/crates/pinakes-server/src/state.rs b/crates/pinakes-server/src/state.rs index 584b598..2917ed5 100644 --- a/crates/pinakes-server/src/state.rs +++ b/crates/pinakes-server/src/state.rs @@ -5,7 +5,7 @@ use pinakes_core::{ config::Config, jobs::JobQueue, managed_storage::ManagedStorageService, - plugin::PluginManager, + plugin::{PluginManager, PluginPipeline}, scan::ScanProgress, scheduler::TaskScheduler, storage::DynStorageBackend, @@ -32,9 +32,23 @@ pub struct AppState { pub cache: Arc, pub scheduler: Arc, pub plugin_manager: Option>, + pub plugin_pipeline: Option>, pub transcode_service: Option>, pub managed_storage: Option>, pub chunked_upload_manager: Option>, pub webhook_dispatcher: Option>, pub session_semaphore: Arc, } + +impl AppState { + /// Emit a plugin event if the pipeline is active. + pub fn emit_plugin_event( + &self, + event_type: &str, + payload: &serde_json::Value, + ) { + if let Some(ref pipeline) = self.plugin_pipeline { + pipeline.emit_event(event_type, payload); + } + } +} From ce9c27d41076af99da75a33348f90baa6fcaf021 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Mar 2026 15:16:12 +0300 Subject: [PATCH 10/10] pinakes-server: update tests with plugin configuration Signed-off-by: NotAShelf Change-Id: I320426c6b2cc9119a44570b4534e08d66a6a6964 --- crates/pinakes-server/tests/api.rs | 2 ++ crates/pinakes-server/tests/plugin.rs | 21 +++++++++++++-------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/crates/pinakes-server/tests/api.rs b/crates/pinakes-server/tests/api.rs index 379b07f..7a66859 100644 --- a/crates/pinakes-server/tests/api.rs +++ b/crates/pinakes-server/tests/api.rs @@ -186,6 +186,7 @@ async fn setup_app() -> axum::Router { cache: Arc::new(CacheLayer::new(60)), scheduler: Arc::new(scheduler), plugin_manager: None, + plugin_pipeline: 
None, transcode_service: None, managed_storage: None, chunked_upload_manager: None, @@ -262,6 +263,7 @@ async fn setup_app_with_auth() -> (axum::Router, String, String, String) { cache: Arc::new(CacheLayer::new(60)), scheduler: Arc::new(scheduler), plugin_manager: None, + plugin_pipeline: None, transcode_service: None, managed_storage: None, chunked_upload_manager: None, diff --git a/crates/pinakes-server/tests/plugin.rs b/crates/pinakes-server/tests/plugin.rs index c9d1560..0dcb250 100644 --- a/crates/pinakes-server/tests/plugin.rs +++ b/crates/pinakes-server/tests/plugin.rs @@ -68,14 +68,18 @@ async fn setup_app_with_plugins() std::fs::create_dir_all(&cache_dir).expect("create cache dir"); let plugin_config = PluginsConfig { - enabled: true, - data_dir: data_dir.clone(), - cache_dir: cache_dir.clone(), - plugin_dirs: vec![], - enable_hot_reload: false, - allow_unsigned: true, - max_concurrent_ops: 2, - plugin_timeout_secs: 10, + enabled: true, + data_dir: data_dir.clone(), + cache_dir: cache_dir.clone(), + plugin_dirs: vec![], + enable_hot_reload: false, + allow_unsigned: true, + max_concurrent_ops: 2, + plugin_timeout_secs: 10, + timeouts: + pinakes_core::config::PluginTimeoutConfig::default(), + max_consecutive_failures: 5, + trusted_keys: vec![], }; let plugin_manager = @@ -145,6 +149,7 @@ async fn setup_app_with_plugins() cache: Arc::new(CacheLayer::new(60)), scheduler: Arc::new(scheduler), plugin_manager: Some(plugin_manager.clone()), + plugin_pipeline: None, transcode_service: None, managed_storage: None, chunked_upload_manager: None,