pinakes/crates/pinakes-core/src/plugin/runtime.rs
NotAShelf 278bcaa4b0
pinakes-ui: streamline sidebar design
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I0176fa480e5ba40eea5a39685a4f97896a6a6964
2026-02-04 21:35:34 +03:00

579 lines
20 KiB
Rust

//! WASM runtime for executing plugins
use anyhow::{Result, anyhow};
use pinakes_plugin_api::PluginContext;
use std::path::Path;
use std::sync::Arc;
use wasmtime::*;
/// WASM runtime wrapper for executing plugins
pub struct WasmRuntime {
engine: Engine,
}
impl WasmRuntime {
/// Create a new WASM runtime
pub fn new() -> Result<Self> {
let mut config = Config::new();
// Enable WASM features
config.wasm_component_model(true);
config.async_support(true);
// Set resource limits
config.max_wasm_stack(1024 * 1024); // 1MB stack
config.consume_fuel(true); // Enable fuel metering for CPU limits
let engine = Engine::new(&config)?;
Ok(Self { engine })
}
/// Load a plugin from a WASM file
pub async fn load_plugin(
&self,
wasm_path: &Path,
context: PluginContext,
) -> Result<WasmPlugin> {
if !wasm_path.exists() {
return Err(anyhow!("WASM file not found: {:?}", wasm_path));
}
// Read WASM bytes
let wasm_bytes = std::fs::read(wasm_path)?;
// Compile module
let module = Module::new(&self.engine, &wasm_bytes)?;
Ok(WasmPlugin {
module: Arc::new(module),
context,
})
}
}
/// Store data passed to each WASM invocation
pub struct PluginStoreData {
pub context: PluginContext,
pub exchange_buffer: Vec<u8>,
}
/// A loaded WASM plugin instance
#[derive(Clone)]
pub struct WasmPlugin {
module: Arc<Module>,
context: PluginContext,
}
impl WasmPlugin {
/// Get the plugin context
pub fn context(&self) -> &PluginContext {
&self.context
}
/// Execute a plugin function
///
/// Creates a fresh store and instance per invocation with host functions
/// linked, calls the requested exported function, and returns the result.
pub async fn call_function(&self, function_name: &str, params: &[u8]) -> Result<Vec<u8>> {
let engine = self.module.engine();
// Create store with per-invocation data
let store_data = PluginStoreData {
context: self.context.clone(),
exchange_buffer: Vec::new(),
};
let mut store = Store::new(engine, store_data);
// Set fuel limit based on capabilities
if let Some(max_cpu_time_ms) = self.context.capabilities.max_cpu_time_ms {
let fuel = max_cpu_time_ms * 100_000;
store.set_fuel(fuel)?;
} else {
store.set_fuel(1_000_000_000)?;
}
// Set up linker with host functions
let mut linker = Linker::new(engine);
HostFunctions::setup_linker(&mut linker)?;
// Instantiate the module
let instance = linker.instantiate_async(&mut store, &self.module).await?;
// Get the memory export (if available)
let memory = instance.get_memory(&mut store, "memory");
// If there are params and memory is available, write them
let mut alloc_offset: i32 = 0;
if !params.is_empty()
&& let Some(mem) = &memory
{
// Call the plugin's alloc function if available, otherwise write at offset 0
let offset = if let Ok(alloc) = instance.get_typed_func::<i32, i32>(&mut store, "alloc")
{
let result = alloc.call_async(&mut store, params.len() as i32).await?;
if result < 0 {
return Err(anyhow!("plugin alloc returned negative offset: {}", result));
}
result as usize
} else {
0
};
alloc_offset = offset as i32;
let mem_data = mem.data_mut(&mut store);
if offset + params.len() <= mem_data.len() {
mem_data[offset..offset + params.len()].copy_from_slice(params);
}
}
// Look up the exported function and call it
let func = instance
.get_func(&mut store, function_name)
.ok_or_else(|| anyhow!("exported function '{}' not found", function_name))?;
let func_ty = func.ty(&store);
let param_count = func_ty.params().len();
let result_count = func_ty.results().len();
let mut results = vec![Val::I32(0); result_count];
// Call with appropriate params based on function signature
if param_count == 2 && !params.is_empty() {
// Convention: (ptr, len)
func.call_async(
&mut store,
&[Val::I32(alloc_offset), Val::I32(params.len() as i32)],
&mut results,
)
.await?;
} else if param_count == 0 {
func.call_async(&mut store, &[], &mut results).await?;
} else {
// Generic: fill with zeroes
let params_vals: Vec<Val> = (0..param_count).map(|_| Val::I32(0)).collect();
func.call_async(&mut store, &params_vals, &mut results)
.await?;
}
// Read result from exchange buffer (host functions may have written data)
let exchange = std::mem::take(&mut store.data_mut().exchange_buffer);
if !exchange.is_empty() {
return Ok(exchange);
}
// Otherwise serialize the return values
if let Some(Val::I32(ret)) = results.first() {
Ok(ret.to_le_bytes().to_vec())
} else {
Ok(Vec::new())
}
}
}
#[cfg(test)]
impl Default for WasmPlugin {
fn default() -> Self {
let engine = Engine::default();
let module = Module::new(&engine, br#"(module)"#).unwrap();
Self {
module: Arc::new(module),
context: PluginContext {
data_dir: std::env::temp_dir(),
cache_dir: std::env::temp_dir(),
config: std::collections::HashMap::new(),
capabilities: Default::default(),
},
}
}
}
/// Host functions that plugins can call
pub struct HostFunctions;
impl HostFunctions {
/// Set up host functions in a linker
pub fn setup_linker(linker: &mut Linker<PluginStoreData>) -> Result<()> {
// host_log: log a message from the plugin
linker.func_wrap(
"env",
"host_log",
|mut caller: Caller<'_, PluginStoreData>, level: i32, ptr: i32, len: i32| {
if ptr < 0 || len < 0 {
return;
}
let memory = caller.get_export("memory").and_then(|e| e.into_memory());
if let Some(mem) = memory {
let data = mem.data(&caller);
let start = ptr as usize;
let end = start + len as usize;
if end <= data.len()
&& let Ok(msg) = std::str::from_utf8(&data[start..end])
{
match level {
0 => tracing::error!(plugin = true, "{}", msg),
1 => tracing::warn!(plugin = true, "{}", msg),
2 => tracing::info!(plugin = true, "{}", msg),
_ => tracing::debug!(plugin = true, "{}", msg),
}
}
}
},
)?;
// host_read_file: read a file into the exchange buffer
linker.func_wrap(
"env",
"host_read_file",
|mut caller: Caller<'_, PluginStoreData>, path_ptr: i32, path_len: i32| -> i32 {
if path_ptr < 0 || path_len < 0 {
return -1;
}
let memory = caller.get_export("memory").and_then(|e| e.into_memory());
let Some(mem) = memory else { return -1 };
let data = mem.data(&caller);
let start = path_ptr as usize;
let end = start + path_len as usize;
if end > data.len() {
return -1;
}
let path_str = match std::str::from_utf8(&data[start..end]) {
Ok(s) => s.to_string(),
Err(_) => return -1,
};
// Canonicalize path before checking permissions to prevent traversal
let path = match std::path::Path::new(&path_str).canonicalize() {
Ok(p) => p,
Err(_) => return -1,
};
// Check read permission against canonicalized path
let can_read = caller
.data()
.context
.capabilities
.filesystem
.read
.iter()
.any(|allowed| allowed.canonicalize().is_ok_and(|a| path.starts_with(a)));
if !can_read {
tracing::warn!(path = %path_str, "plugin read access denied");
return -2;
}
match std::fs::read(&path) {
Ok(contents) => {
let len = contents.len() as i32;
caller.data_mut().exchange_buffer = contents;
len
}
Err(_) => -1,
}
},
)?;
// host_write_file: write data to a file
linker.func_wrap(
"env",
"host_write_file",
|mut caller: Caller<'_, PluginStoreData>,
path_ptr: i32,
path_len: i32,
data_ptr: i32,
data_len: i32|
-> i32 {
if path_ptr < 0 || path_len < 0 || data_ptr < 0 || data_len < 0 {
return -1;
}
let memory = caller.get_export("memory").and_then(|e| e.into_memory());
let Some(mem) = memory else { return -1 };
let mem_data = mem.data(&caller);
let path_start = path_ptr as usize;
let path_end = path_start + path_len as usize;
let data_start = data_ptr as usize;
let data_end = data_start + data_len as usize;
if path_end > mem_data.len() || data_end > mem_data.len() {
return -1;
}
let path_str = match std::str::from_utf8(&mem_data[path_start..path_end]) {
Ok(s) => s.to_string(),
Err(_) => return -1,
};
let file_data = mem_data[data_start..data_end].to_vec();
// Canonicalize path for write (file may not exist yet)
let path = std::path::Path::new(&path_str);
let canonical = if path.exists() {
path.canonicalize().ok()
} else {
path.parent()
.and_then(|p| p.canonicalize().ok())
.map(|p| p.join(path.file_name().unwrap_or_default()))
};
let Some(canonical) = canonical else {
return -1;
};
// Check write permission against canonicalized path
let can_write = caller
.data()
.context
.capabilities
.filesystem
.write
.iter()
.any(|allowed| {
allowed
.canonicalize()
.is_ok_and(|a| canonical.starts_with(a))
});
if !can_write {
tracing::warn!(path = %path_str, "plugin write access denied");
return -2;
}
match std::fs::write(&canonical, &file_data) {
Ok(()) => 0,
Err(_) => -1,
}
},
)?;
// host_http_request: make an HTTP request (blocking)
linker.func_wrap(
"env",
"host_http_request",
|mut caller: Caller<'_, PluginStoreData>, url_ptr: i32, url_len: i32| -> i32 {
if url_ptr < 0 || url_len < 0 {
return -1;
}
let memory = caller.get_export("memory").and_then(|e| e.into_memory());
let Some(mem) = memory else { return -1 };
let data = mem.data(&caller);
let start = url_ptr as usize;
let end = start + url_len as usize;
if end > data.len() {
return -1;
}
let url_str = match std::str::from_utf8(&data[start..end]) {
Ok(s) => s.to_string(),
Err(_) => return -1,
};
// Check network permission
if !caller.data().context.capabilities.network.enabled {
tracing::warn!(url = %url_str, "plugin network access denied");
return -2;
}
// Use block_in_place to avoid blocking the async runtime's thread pool.
// Falls back to a blocking client with timeout if block_in_place is unavailable.
let result = std::panic::catch_unwind(|| {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.map_err(|e| e.to_string())?;
let resp = client
.get(&url_str)
.send()
.await
.map_err(|e| e.to_string())?;
let bytes = resp.bytes().await.map_err(|e| e.to_string())?;
Ok::<_, String>(bytes)
})
})
});
match result {
Ok(Ok(bytes)) => {
let len = bytes.len() as i32;
caller.data_mut().exchange_buffer = bytes.to_vec();
len
}
Ok(Err(_)) => -1,
Err(_) => {
// block_in_place panicked (e.g. current-thread runtime);
// fall back to blocking client with timeout
let client = match reqwest::blocking::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
{
Ok(c) => c,
Err(_) => return -1,
};
match client.get(&url_str).send() {
Ok(resp) => match resp.bytes() {
Ok(bytes) => {
let len = bytes.len() as i32;
caller.data_mut().exchange_buffer = bytes.to_vec();
len
}
Err(_) => -1,
},
Err(_) => -1,
}
}
}
},
)?;
// host_get_config: read a config key into the exchange buffer
linker.func_wrap(
"env",
"host_get_config",
|mut caller: Caller<'_, PluginStoreData>, key_ptr: i32, key_len: i32| -> i32 {
if key_ptr < 0 || key_len < 0 {
return -1;
}
let memory = caller.get_export("memory").and_then(|e| e.into_memory());
let Some(mem) = memory else { return -1 };
let data = mem.data(&caller);
let start = key_ptr as usize;
let end = start + key_len as usize;
if end > data.len() {
return -1;
}
let key_str = match std::str::from_utf8(&data[start..end]) {
Ok(s) => s.to_string(),
Err(_) => return -1,
};
match caller.data().context.config.get(&key_str) {
Some(value) => {
let json = value.to_string();
let bytes = json.into_bytes();
let len = bytes.len() as i32;
caller.data_mut().exchange_buffer = bytes;
len
}
None => -1,
}
},
)?;
// host_get_buffer: copy the exchange buffer to WASM memory
linker.func_wrap(
"env",
"host_get_buffer",
|mut caller: Caller<'_, PluginStoreData>, dest_ptr: i32, dest_len: i32| -> i32 {
if dest_ptr < 0 || dest_len < 0 {
return -1;
}
let buf = caller.data().exchange_buffer.clone();
let copy_len = buf.len().min(dest_len as usize);
let memory = caller.get_export("memory").and_then(|e| e.into_memory());
let Some(mem) = memory else { return -1 };
let mem_data = mem.data_mut(&mut caller);
let start = dest_ptr as usize;
if start + copy_len > mem_data.len() {
return -1;
}
mem_data[start..start + copy_len].copy_from_slice(&buf[..copy_len]);
copy_len as i32
},
)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use pinakes_plugin_api::PluginContext;
use std::collections::HashMap;
#[test]
fn test_wasm_runtime_creation() {
let runtime = WasmRuntime::new();
assert!(runtime.is_ok());
}
#[test]
fn test_host_functions_file_access() {
let mut capabilities = pinakes_plugin_api::Capabilities::default();
capabilities.filesystem.read.push("/tmp".into());
capabilities.filesystem.write.push("/tmp/output".into());
let context = PluginContext {
data_dir: "/tmp/data".into(),
cache_dir: "/tmp/cache".into(),
config: HashMap::new(),
capabilities,
};
// Verify capability checks work via context fields
let can_read = context
.capabilities
.filesystem
.read
.iter()
.any(|p| Path::new("/tmp/test.txt").starts_with(p));
assert!(can_read);
let cant_read = context
.capabilities
.filesystem
.read
.iter()
.any(|p| Path::new("/etc/passwd").starts_with(p));
assert!(!cant_read);
let can_write = context
.capabilities
.filesystem
.write
.iter()
.any(|p| Path::new("/tmp/output/file.txt").starts_with(p));
assert!(can_write);
let cant_write = context
.capabilities
.filesystem
.write
.iter()
.any(|p| Path::new("/tmp/file.txt").starts_with(p));
assert!(!cant_write);
}
#[test]
fn test_host_functions_network_access() {
let mut context = PluginContext {
data_dir: "/tmp/data".into(),
cache_dir: "/tmp/cache".into(),
config: HashMap::new(),
capabilities: Default::default(),
};
assert!(!context.capabilities.network.enabled);
context.capabilities.network.enabled = true;
assert!(context.capabilities.network.enabled);
}
#[test]
fn test_linker_setup() {
let engine = Engine::default();
let mut linker = Linker::<PluginStoreData>::new(&engine);
let result = HostFunctions::setup_linker(&mut linker);
assert!(result.is_ok());
}
}