pinakes-core: drain both exchange_buffer and pending_events from the store via wrappers

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I3afcdf5be8eaf552b8e62a09cc10bc1a6a6a6964
This commit is contained in:
raf 2026-03-23 03:30:44 +03:00
commit ad3aff9a20
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
2 changed files with 98 additions and 18 deletions

View file

@ -690,7 +690,7 @@ impl PluginPipeline {
/// Internal dispatcher for events.
async fn dispatch_event(
&self,
self: &Arc<Self>,
event_type: &str,
payload: &serde_json::Value,
) {
@ -739,23 +739,35 @@ impl PluginPipeline {
payload: payload.clone(),
};
// Event handlers return nothing meaningful; we just care about
// success/failure.
match wasm
.call_function_json::<HandleEventRequest, serde_json::Value>(
.call_function_json_with_events::<HandleEventRequest, serde_json::Value>(
"handle_event",
&req,
timeout,
)
.await
{
Ok(_) => {
Ok((_resp, emitted_events)) => {
self.record_success(id).await;
debug!(
plugin_id = %id,
event_type = event_type,
"event handled"
);
// Re-dispatch any events the handler itself emitted.
for (emitted_type, payload_str) in emitted_events {
if let Ok(emitted_payload) =
serde_json::from_str::<serde_json::Value>(&payload_str)
{
self.emit_event(&emitted_type, &emitted_payload);
} else {
warn!(
plugin_id = %id,
event_type = %emitted_type,
"plugin emitted event with unparseable JSON payload; skipping"
);
}
}
},
Err(e) => {
warn!(

View file

@ -86,20 +86,23 @@ impl WasmPlugin {
&self.context
}
/// Execute a plugin function
/// Execute a plugin function, returning both the result bytes and any
/// events the plugin queued via `host_emit_event`.
///
/// Creates a fresh store and instance per invocation with host functions
/// linked, calls the requested exported function, and returns the result.
/// linked, calls the requested exported function, drains both the exchange
/// buffer and the pending events list before the store is dropped, and
/// returns both.
///
/// # Errors
///
/// Returns an error if the function cannot be found, instantiation fails,
/// or the function call returns an error.
pub async fn call_function(
pub async fn call_function_with_events(
&self,
function_name: &str,
params: &[u8],
) -> Result<Vec<u8>> {
) -> Result<(Vec<u8>, Vec<(String, String)>)> {
let engine = self.module.engine();
// Build memory limiter from capabilities
@ -205,18 +208,38 @@ impl WasmPlugin {
.await?;
}
// Prefer data written into the exchange buffer by host functions
// Drain both buffers before the store is dropped.
let pending_events = std::mem::take(&mut store.data_mut().pending_events);
let exchange = std::mem::take(&mut store.data_mut().exchange_buffer);
if !exchange.is_empty() {
return Ok(exchange);
}
// Fall back to serialising the WASM return value
if let Some(Val::I32(ret)) = results.first() {
Ok(ret.to_le_bytes().to_vec())
let result = if !exchange.is_empty() {
exchange
} else if let Some(Val::I32(ret)) = results.first() {
ret.to_le_bytes().to_vec()
} else {
Ok(Vec::new())
}
Vec::new()
};
Ok((result, pending_events))
}
/// Execute a plugin function, discarding any events the plugin queued.
///
/// This is a thin wrapper around [`Self::call_function_with_events`].
///
/// # Errors
///
/// Returns an error if the function cannot be found, instantiation fails,
/// or the function call returns an error.
pub async fn call_function(
&self,
function_name: &str,
params: &[u8],
) -> Result<Vec<u8>> {
let (data, _events) = self
.call_function_with_events(function_name, params)
.await?;
Ok(data)
}
/// Call a plugin function with JSON request/response serialization.
@ -259,6 +282,51 @@ impl WasmPlugin {
)
})
}
/// Call a plugin function with JSON serialization, also returning any
/// events the plugin queued via `host_emit_event`.
///
/// Mirrors [`Self::call_function_json`] but delegates to
/// [`Self::call_function_with_events`] so the pending events list is not
/// discarded before returning.
///
/// # Errors
///
/// Returns an error if serialization fails, the call times out, the plugin
/// traps, or the response is malformed JSON.
#[allow(clippy::future_not_send)] // Req doesn't need Sync; called within local tasks
pub async fn call_function_json_with_events<Req, Resp>(
&self,
function_name: &str,
request: &Req,
timeout: std::time::Duration,
) -> anyhow::Result<(Resp, Vec<(String, String)>)>
where
Req: serde::Serialize,
Resp: serde::de::DeserializeOwned,
{
let request_bytes = serde_json::to_vec(request)
.map_err(|e| anyhow::anyhow!("failed to serialize request: {e}"))?;
let (result, pending_events) = tokio::time::timeout(
timeout,
self.call_function_with_events(function_name, &request_bytes),
)
.await
.map_err(|_| {
anyhow::anyhow!(
"plugin call '{function_name}' timed out after {timeout:?}"
)
})??;
let resp = serde_json::from_slice(&result).map_err(|e| {
anyhow::anyhow!(
"failed to deserialize response from '{function_name}': {e}"
)
})?;
Ok((resp, pending_events))
}
}
#[cfg(test)]