From ad3aff9a205f2b7742099a519488ac890fd9cedb Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Mon, 23 Mar 2026 03:30:44 +0300 Subject: [PATCH] pinakes-core: drain both `exchange_buffer` and `pending_events` from the store via wrappers Signed-off-by: NotAShelf Change-Id: I3afcdf5be8eaf552b8e62a09cc10bc1a6a6a6964 --- crates/pinakes-core/src/plugin/pipeline.rs | 22 +++-- crates/pinakes-core/src/plugin/runtime.rs | 94 +++++++++++++++++++--- 2 files changed, 98 insertions(+), 18 deletions(-) diff --git a/crates/pinakes-core/src/plugin/pipeline.rs b/crates/pinakes-core/src/plugin/pipeline.rs index f4301a5..f094e73 100644 --- a/crates/pinakes-core/src/plugin/pipeline.rs +++ b/crates/pinakes-core/src/plugin/pipeline.rs @@ -690,7 +690,7 @@ impl PluginPipeline { /// Internal dispatcher for events. async fn dispatch_event( - &self, + self: &Arc, event_type: &str, payload: &serde_json::Value, ) { @@ -739,23 +739,35 @@ impl PluginPipeline { payload: payload.clone(), }; - // Event handlers return nothing meaningful; we just care about - // success/failure. match wasm - .call_function_json::( + .call_function_json_with_events::( "handle_event", &req, timeout, ) .await { - Ok(_) => { + Ok((_resp, emitted_events)) => { self.record_success(id).await; debug!( plugin_id = %id, event_type = event_type, "event handled" ); + // Re-dispatch any events the handler itself emitted. + for (emitted_type, payload_str) in emitted_events { + if let Ok(emitted_payload) = + serde_json::from_str::(&payload_str) + { + self.emit_event(&emitted_type, &emitted_payload); + } else { + warn!( + plugin_id = %id, + event_type = %emitted_type, + "plugin emitted event with unparseable JSON payload; skipping" + ); + } + } }, Err(e) => { warn!( diff --git a/crates/pinakes-core/src/plugin/runtime.rs b/crates/pinakes-core/src/plugin/runtime.rs index b550b05..e07a1c4 100644 --- a/crates/pinakes-core/src/plugin/runtime.rs +++ b/crates/pinakes-core/src/plugin/runtime.rs @@ -86,20 +86,23 @@ impl WasmPlugin { &self.context } - /// Execute a plugin function + /// Execute a plugin function, returning both the result bytes and any + /// events the plugin queued via `host_emit_event`. /// /// Creates a fresh store and instance per invocation with host functions - /// linked, calls the requested exported function, and returns the result. + /// linked, calls the requested exported function, drains both the exchange + /// buffer and the pending events list before the store is dropped, and + /// returns both. /// /// # Errors /// /// Returns an error if the function cannot be found, instantiation fails, /// or the function call returns an error. - pub async fn call_function( + pub async fn call_function_with_events( &self, function_name: &str, params: &[u8], - ) -> Result> { + ) -> Result<(Vec, Vec<(String, String)>)> { let engine = self.module.engine(); // Build memory limiter from capabilities @@ -205,18 +208,38 @@ impl WasmPlugin { .await?; } - // Prefer data written into the exchange buffer by host functions + // Drain both buffers before the store is dropped. + let pending_events = std::mem::take(&mut store.data_mut().pending_events); let exchange = std::mem::take(&mut store.data_mut().exchange_buffer); - if !exchange.is_empty() { - return Ok(exchange); - } - // Fall back to serialising the WASM return value - if let Some(Val::I32(ret)) = results.first() { - Ok(ret.to_le_bytes().to_vec()) + let result = if !exchange.is_empty() { + exchange + } else if let Some(Val::I32(ret)) = results.first() { + ret.to_le_bytes().to_vec() } else { - Ok(Vec::new()) - } + Vec::new() + }; + + Ok((result, pending_events)) + } + + /// Execute a plugin function, discarding any events the plugin queued. + /// + /// This is a thin wrapper around [`Self::call_function_with_events`]. + /// + /// # Errors + /// + /// Returns an error if the function cannot be found, instantiation fails, + /// or the function call returns an error. + pub async fn call_function( + &self, + function_name: &str, + params: &[u8], + ) -> Result> { + let (data, _events) = self + .call_function_with_events(function_name, params) + .await?; + Ok(data) } /// Call a plugin function with JSON request/response serialization. @@ -259,6 +282,51 @@ impl WasmPlugin { ) }) } + + /// Call a plugin function with JSON serialization, also returning any + /// events the plugin queued via `host_emit_event`. + /// + /// Mirrors [`Self::call_function_json`] but delegates to + /// [`Self::call_function_with_events`] so the pending events list is not + /// discarded before returning. + /// + /// # Errors + /// + /// Returns an error if serialization fails, the call times out, the plugin + /// traps, or the response is malformed JSON. + #[allow(clippy::future_not_send)] // Req doesn't need Sync; called within local tasks + pub async fn call_function_json_with_events( + &self, + function_name: &str, + request: &Req, + timeout: std::time::Duration, + ) -> anyhow::Result<(Resp, Vec<(String, String)>)> + where + Req: serde::Serialize, + Resp: serde::de::DeserializeOwned, + { + let request_bytes = serde_json::to_vec(request) + .map_err(|e| anyhow::anyhow!("failed to serialize request: {e}"))?; + + let (result, pending_events) = tokio::time::timeout( + timeout, + self.call_function_with_events(function_name, &request_bytes), + ) + .await + .map_err(|_| { + anyhow::anyhow!( + "plugin call '{function_name}' timed out after {timeout:?}" + ) + })??; + + let resp = serde_json::from_slice(&result).map_err(|e| { + anyhow::anyhow!( + "failed to deserialize response from '{function_name}': {e}" + ) + })?; + + Ok((resp, pending_events)) + } } #[cfg(test)]