pinakes/crates/pinakes-ui/src/plugin_ui/data.rs
NotAShelf 185e3b562a
treewide: cleanup
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Ia01590cdeed872cc8ebd16f6ca95f3cc6a6a6964
2026-03-12 19:41:15 +03:00

768 lines
24 KiB
Rust

//! Data fetching system for plugin UI pages
//!
//! Provides data fetching and caching for plugin data sources.
use std::{
collections::{HashMap, HashSet},
time::Duration,
};
use dioxus::prelude::*;
use dioxus_core::Task;
use pinakes_plugin_api::{DataSource, Expression, HttpMethod};
use super::expr::{evaluate_expression, value_to_display_string};
use crate::client::ApiClient;
/// Cached data for a plugin page
///
/// Tracks, per data-source name: the last resolved JSON value, whether a
/// fetch is currently in flight, and the last error message (if any). A
/// source may appear in more than one map at once (e.g. stale data kept
/// while a new error is recorded).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PluginPageData {
    // Last successfully resolved value per source name.
    data: HashMap<String, serde_json::Value>,
    // Names of sources with a fetch currently in flight.
    loading: HashSet<String>,
    // Last error message per source name.
    errors: HashMap<String, String>,
}
impl PluginPageData {
    /// Look up the resolved value for `source`, if one has been stored.
    #[must_use]
    pub fn get(&self, source: &str) -> Option<&serde_json::Value> {
        self.data.get(source)
    }

    /// Whether a fetch for `source` is currently in flight.
    #[must_use]
    pub fn is_loading(&self, source: &str) -> bool {
        self.loading.contains(source)
    }

    /// The last recorded error message for `source`, if any.
    #[must_use]
    pub fn error(&self, source: &str) -> Option<&str> {
        self.errors.get(source).map(|e| e.as_str())
    }

    /// Whether a resolved value exists for `source`.
    #[must_use]
    pub fn has_data(&self, source: &str) -> bool {
        self.data.contains_key(source)
    }

    /// Store the resolved value for a source, replacing any previous one.
    pub fn set_data(&mut self, source: String, value: serde_json::Value) {
        self.data.insert(source, value);
    }

    /// Mark a source as loading (which clears any stale error) or done.
    pub fn set_loading(&mut self, source: &str, loading: bool) {
        if loading {
            // A new fetch invalidates the previous error for this source.
            self.errors.remove(source);
            self.loading.insert(source.to_string());
        } else {
            self.loading.remove(source);
        }
    }

    /// Record an error message for a source.
    pub fn set_error(&mut self, source: String, error: String) {
        self.errors.insert(source, error);
    }

    /// Convert all resolved data to a single JSON object for expression
    /// evaluation
    #[must_use]
    pub fn as_json(&self) -> serde_json::Value {
        let object: serde_json::Map<String, serde_json::Value> =
            self.data.clone().into_iter().collect();
        serde_json::Value::Object(object)
    }

    /// Drop all values, loading flags, and errors.
    pub fn clear(&mut self) {
        self.errors.clear();
        self.loading.clear();
        self.data.clear();
    }
}
/// Convert a plugin `HttpMethod` to a `reqwest::Method`.
pub(super) const fn to_reqwest_method(method: &HttpMethod) -> reqwest::Method {
match method {
HttpMethod::Get => reqwest::Method::GET,
HttpMethod::Post => reqwest::Method::POST,
HttpMethod::Put => reqwest::Method::PUT,
HttpMethod::Patch => reqwest::Method::PATCH,
HttpMethod::Delete => reqwest::Method::DELETE,
}
}
/// Fetch data from an endpoint, evaluating any params expressions against
/// the given context.
///
/// Params are sent as a query string for GET requests and as a JSON body for
/// all other methods. An empty `allowed_endpoints` slice disables the
/// allowlist check entirely.
async fn fetch_endpoint(
    client: &ApiClient,
    path: &str,
    method: HttpMethod,
    params: &HashMap<String, Expression>,
    ctx: &serde_json::Value,
    allowed_endpoints: &[String],
) -> Result<serde_json::Value, String> {
    // Enforce the plugin's declared endpoint allowlist (empty = allow all).
    let path_allowed = allowed_endpoints.is_empty()
        || allowed_endpoints.iter().any(|ep| ep.as_str() == path);
    if !path_allowed {
        return Err(format!(
            "Endpoint '{path}' is not in plugin's declared required_endpoints"
        ));
    }
    let http_method = to_reqwest_method(&method);
    let is_get = http_method == reqwest::Method::GET;
    let mut request = client.raw_request(http_method, path);
    if !params.is_empty() {
        if is_get {
            // Evaluate each param expression and attach as query pairs.
            let mut query: Vec<(String, String)> = Vec::with_capacity(params.len());
            for (key, expr) in params {
                let value = evaluate_expression(expr, ctx);
                query.push((key.clone(), value_to_display_string(&value)));
            }
            request = request.query(&query);
        } else {
            // Evaluate each param expression and send as a JSON object body.
            let mut body = serde_json::Map::new();
            for (key, expr) in params {
                body.insert(key.clone(), evaluate_expression(expr, ctx));
            }
            request = request.json(&body);
        }
    }
    // Send the request and surface transport errors as strings.
    let response = request
        .send()
        .await
        .map_err(|e| format!("Request failed: {e}"))?;
    let status = response.status();
    if !status.is_success() {
        // Include the response body to make server-side errors diagnosable.
        let body = response.text().await.unwrap_or_default();
        return Err(format!("HTTP {status}: {body}"));
    }
    response
        .json::<serde_json::Value>()
        .await
        .map_err(|e| format!("Failed to parse JSON: {e}"))
}
/// Fetch all data sources for a page
///
/// Endpoint sources are deduplicated by `(path, method, params)`: if multiple
/// sources share the same triplet, a single HTTP request is made and the raw
/// response is shared, with each source's own `transform` applied
/// independently. All unique Endpoint and Static sources are fetched
/// concurrently. Transform sources are applied after, in iteration order,
/// against the full result set.
///
/// # Errors
///
/// Returns an error if any data source fails to fetch
pub async fn fetch_page_data(
    client: &ApiClient,
    data_sources: &HashMap<String, DataSource>,
    allowed_endpoints: &[String],
) -> Result<HashMap<String, serde_json::Value>, String> {
    // Group non-Transform sources into dedup groups.
    //
    // For Endpoint sources, two entries are in the same group when they share
    // the same (path, method, params) - i.e., they would produce an identical
    // HTTP request. The per-source `transform` expression is kept separate so
    // each name can apply its own transform to the shared raw response.
    //
    // Static sources never share an HTTP request so each becomes its own group.
    struct Group {
        // (source name, per-name transform expression for Endpoint sources;
        // None for Static, whose value needs no transform at this stage)
        members: Vec<(String, Option<Expression>)>,
        // The representative source used to fire the request (transform ignored
        // for Endpoint - we apply per-member transforms after fetching)
        source: DataSource,
    }
    let mut groups: Vec<Group> = Vec::new();
    for (name, source) in data_sources {
        match source {
            // Transform sources are resolved after all fetches complete, in
            // the dependency-ordered loop below.
            DataSource::Transform { .. } => continue,
            DataSource::Endpoint {
                path,
                method,
                params,
                transform,
                poll_interval,
            } => {
                // Find an existing group with the same (path, method, params).
                let existing = groups.iter_mut().find(|g| {
                    matches!(
                        &g.source,
                        DataSource::Endpoint {
                            path: ep,
                            method: em,
                            params: epa,
                            ..
                        } if ep == path && em == method && epa == params
                    )
                });
                if let Some(group) = existing {
                    group.members.push((name.clone(), transform.clone()));
                } else {
                    groups.push(Group {
                        members: vec![(name.clone(), transform.clone())],
                        source: DataSource::Endpoint {
                            path: path.clone(),
                            method: method.clone(),
                            params: params.clone(),
                            poll_interval: *poll_interval,
                            transform: None,
                        },
                    });
                }
            },
            DataSource::Static { .. } => {
                // Static sources are trivially unique per name; no dedup needed.
                groups.push(Group {
                    members: vec![(name.clone(), None)],
                    source: source.clone(),
                });
            },
        }
    }
    // Fire one future per group concurrently.
    let futs: Vec<_> = groups
        .into_iter()
        .map(|group| {
            let client = client.clone();
            let allowed = allowed_endpoints.to_vec();
            async move {
                // Fetch the raw value for this group.
                let raw = match &group.source {
                    DataSource::Endpoint {
                        path,
                        method,
                        params,
                        ..
                    } => {
                        // Params are evaluated against an empty context: other
                        // sources have not been resolved at this point.
                        let empty_ctx = serde_json::json!({});
                        fetch_endpoint(
                            &client,
                            path,
                            method.clone(),
                            params,
                            &empty_ctx,
                            &allowed,
                        )
                        .await?
                    },
                    DataSource::Static { value } => value.clone(),
                    // Grouping above skips Transform sources entirely.
                    DataSource::Transform { .. } => unreachable!(),
                };
                // Apply per-member transforms and collect (name, value) pairs.
                let pairs: Vec<(String, serde_json::Value)> = group
                    .members
                    .into_iter()
                    .map(|(name, transform)| {
                        let value = if let Some(expr) = &transform {
                            evaluate_expression(expr, &raw)
                        } else {
                            raw.clone()
                        };
                        (name, value)
                    })
                    .collect();
                Ok::<_, String>(pairs)
            }
        })
        .collect();
    let mut results: HashMap<String, serde_json::Value> = HashMap::new();
    for group_result in futures::future::join_all(futs).await {
        // `?` propagates the first group error (e.g. blocked endpoint).
        for (name, value) in group_result? {
            results.insert(name, value);
        }
    }
    // Process Transform sources in dependency order. HashMap iteration order is
    // non-deterministic, so a Transform referencing another Transform could see
    // null if the upstream was not yet resolved. The pending loop below defers
    // any Transform whose upstream is not yet in results, making progress on
    // each pass until all are resolved. UiPage::validate guarantees no cycles,
    // so the loop always terminates.
    let mut pending: Vec<(&String, &String, &Expression)> = data_sources
        .iter()
        .filter_map(|(name, source)| {
            match source {
                DataSource::Transform {
                    source_name,
                    expression,
                } => Some((name, source_name, expression)),
                _ => None,
            }
        })
        .collect();
    while !pending.is_empty() {
        let prev_len = pending.len();
        let mut i = 0;
        while i < pending.len() {
            let (name, source_name, expression) = pending[i];
            if results.contains_key(source_name.as_str()) {
                // Evaluate against a snapshot of everything resolved so far.
                let ctx = serde_json::Value::Object(
                    results
                        .iter()
                        .map(|(k, v)| (k.clone(), v.clone()))
                        .collect(),
                );
                results.insert(name.clone(), evaluate_expression(expression, &ctx));
                pending.swap_remove(i);
            } else {
                i += 1;
            }
        }
        if pending.len() == prev_len {
            // No progress: upstream source is missing (should be caught by
            // UiPage::validate, but handled defensively here).
            tracing::warn!(
                "plugin transform dependency unresolvable; processing remaining in \
                 iteration order"
            );
            for (name, _, expression) in pending {
                let ctx = serde_json::Value::Object(
                    results
                        .iter()
                        .map(|(k, v)| (k.clone(), v.clone()))
                        .collect(),
                );
                results.insert(name.clone(), evaluate_expression(expression, &ctx));
            }
            break;
        }
    }
    Ok(results)
}
/// Hook to fetch and cache plugin page data
///
/// Returns a signal containing the data state. If any data source has a
/// non-zero `poll_interval`, a background loop re-fetches automatically at the
/// minimum interval. The `refresh` counter can be incremented to trigger an
/// immediate re-fetch outside of the polling interval.
///
/// * `client` - API client signal; read via `peek()` inside the task so the
///   effect does not subscribe to client changes.
/// * `data_sources` - named sources to fetch for this page.
/// * `refresh` - counter signal; incrementing it re-runs the fetch effect.
/// * `allowed_endpoints` - endpoint allowlist forwarded to `fetch_page_data`.
pub fn use_plugin_data(
    client: Signal<ApiClient>,
    data_sources: HashMap<String, DataSource>,
    refresh: Signal<u32>,
    allowed_endpoints: Vec<String>,
) -> Signal<PluginPageData> {
    let mut data = use_signal(PluginPageData::default);
    // Handle of the currently running fetch/poll task, so a re-run of the
    // effect can cancel the previous task before spawning a replacement.
    let mut poll_task: Signal<Option<Task>> = use_signal(|| None);
    use_effect(move || {
        // Subscribe to the refresh counter; incrementing it triggers a re-run.
        let _rev = refresh.read();
        let sources = data_sources.clone();
        let allowed = allowed_endpoints.clone();
        // Cancel the previous polling task before spawning a new one. Use
        // write() rather than read() so the effect does not subscribe to
        // poll_task and trigger an infinite re-run loop.
        if let Some(t) = poll_task.write().take() {
            t.cancel();
        }
        // Determine minimum poll interval (0 = no polling)
        let min_poll_secs: u64 = sources
            .values()
            .filter_map(|s| {
                if let DataSource::Endpoint { poll_interval, .. } = s {
                    if *poll_interval > 0 {
                        Some(*poll_interval)
                    } else {
                        None
                    }
                } else {
                    None
                }
            })
            .min()
            .unwrap_or(0);
        let handle = spawn(async move {
            // Clear previous data
            data.write().clear();
            // Mark all sources as loading
            for name in sources.keys() {
                data.write().set_loading(name, true);
            }
            // Initial fetch; clone to release the signal read borrow before await.
            let cl = client.peek().clone();
            match fetch_page_data(&cl, &sources, &allowed).await {
                Ok(results) => {
                    for (name, value) in results {
                        data.write().set_loading(&name, false);
                        data.write().set_data(name, value);
                    }
                },
                Err(e) => {
                    // A single fetch error fans out to every source so each
                    // consumer of the data can render the failure state.
                    for name in sources.keys() {
                        data.write().set_loading(name, false);
                        data.write().set_error(name.clone(), e.clone());
                    }
                },
            }
            // Polling loop; only runs if at least one source has poll_interval > 0
            if min_poll_secs > 0 {
                loop {
                    tokio::time::sleep(Duration::from_secs(min_poll_secs)).await;
                    let cl = client.peek().clone();
                    match fetch_page_data(&cl, &sources, &allowed).await {
                        Ok(results) => {
                            for (name, value) in results {
                                // Only write if data is new or has changed to avoid spurious
                                // signal updates that would force a re-render
                                let changed = !data.read().has_data(&name)
                                    || data.read().get(&name) != Some(&value);
                                if changed {
                                    data.write().set_data(name, value);
                                }
                            }
                        },
                        Err(e) => {
                            // On poll failure, keep showing the last good data
                            // rather than surfacing a transient error.
                            tracing::warn!("Poll fetch failed: {e}");
                        },
                    }
                }
            }
        });
        *poll_task.write() = Some(handle);
    });
    data
}
#[cfg(test)]
mod tests {
    use super::*;

    // Exercises the basic state machine of `PluginPageData`: empty state,
    // data insertion, loading flags, and error recording.
    #[test]
    fn test_plugin_page_data() {
        let mut data = PluginPageData::default();
        // Test empty state
        assert!(!data.has_data("test"));
        assert!(!data.is_loading("test"));
        assert!(data.error("test").is_none());
        // Test setting data
        data.set_data("test".to_string(), serde_json::json!({"key": "value"}));
        assert!(data.has_data("test"));
        assert_eq!(data.get("test"), Some(&serde_json::json!({"key": "value"})));
        // Test loading state
        data.set_loading("loading", true);
        assert!(data.is_loading("loading"));
        data.set_loading("loading", false);
        assert!(!data.is_loading("loading"));
        // Test error state
        data.set_error("error".to_string(), "oops".to_string());
        assert_eq!(data.error("error"), Some("oops"));
    }

    // `as_json` on an empty store yields an empty JSON object.
    #[test]
    fn test_as_json_empty() {
        let data = PluginPageData::default();
        assert_eq!(data.as_json(), serde_json::json!({}));
    }

    // `as_json` keys the object by source name with each resolved value.
    #[test]
    fn test_as_json_with_data() {
        let mut data = PluginPageData::default();
        data.set_data("users".to_string(), serde_json::json!([{"id": 1}]));
        data.set_data("count".to_string(), serde_json::json!(42));
        let json = data.as_json();
        assert_eq!(json["users"], serde_json::json!([{"id": 1}]));
        assert_eq!(json["count"], serde_json::json!(42));
    }

    // Starting a new load must clear a previously recorded error.
    #[test]
    fn test_set_loading_true_clears_error() {
        let mut data = PluginPageData::default();
        data.set_error("src".to_string(), "oops".to_string());
        assert!(data.error("src").is_some());
        data.set_loading("src", true);
        assert!(data.error("src").is_none());
        assert!(data.is_loading("src"));
    }

    // Finishing a load removes the loading flag for that source only.
    #[test]
    fn test_set_loading_false_removes_flag() {
        let mut data = PluginPageData::default();
        data.set_loading("src", true);
        assert!(data.is_loading("src"));
        data.set_loading("src", false);
        assert!(!data.is_loading("src"));
    }

    // `clear` must reset all three internal maps at once.
    #[test]
    fn test_clear_resets_all_state() {
        let mut data = PluginPageData::default();
        data.set_data("x".to_string(), serde_json::json!(1));
        data.set_loading("x", true);
        data.set_error("y".to_string(), "err".to_string());
        data.clear();
        assert!(!data.has_data("x"));
        assert!(!data.is_loading("x"));
        assert!(data.error("y").is_none());
    }

    // Derived PartialEq compares all internal state, not just identity.
    #[test]
    fn test_partial_eq() {
        let mut a = PluginPageData::default();
        let mut b = PluginPageData::default();
        assert_eq!(a, b);
        a.set_data("k".to_string(), serde_json::json!(1));
        assert_ne!(a, b);
        b.set_data("k".to_string(), serde_json::json!(1));
        assert_eq!(a, b);
    }

    // Static sources resolve without any network activity.
    #[tokio::test]
    async fn test_fetch_page_data_static_only() {
        use pinakes_plugin_api::DataSource;
        use crate::client::ApiClient;
        let client = ApiClient::default();
        let mut sources = HashMap::new();
        sources.insert("nums".to_string(), DataSource::Static {
            value: serde_json::json!([1, 2, 3]),
        });
        sources.insert("flag".to_string(), DataSource::Static {
            value: serde_json::json!(true),
        });
        let results = super::fetch_page_data(&client, &sources, &[])
            .await
            .unwrap();
        assert_eq!(results["nums"], serde_json::json!([1, 2, 3]));
        assert_eq!(results["flag"], serde_json::json!(true));
    }

    // A Transform source sees the already-resolved sources in its context,
    // regardless of HashMap iteration order.
    #[tokio::test]
    async fn test_fetch_page_data_transform_evaluates_expression() {
        use pinakes_plugin_api::{DataSource, Expression};
        use crate::client::ApiClient;
        let client = ApiClient::default();
        let mut sources = HashMap::new();
        // The Transform expression accesses "raw" from the context
        sources.insert("derived".to_string(), DataSource::Transform {
            source_name: "raw".to_string(),
            expression: Expression::Path("raw".to_string()),
        });
        sources.insert("raw".to_string(), DataSource::Static {
            value: serde_json::json!({"ok": true}),
        });
        let results = super::fetch_page_data(&client, &sources, &[])
            .await
            .unwrap();
        assert_eq!(results["raw"], serde_json::json!({"ok": true}));
        // derived should return the value of "raw" from context
        assert_eq!(results["derived"], serde_json::json!({"ok": true}));
    }

    // A Literal transform expression ignores the upstream value entirely.
    #[tokio::test]
    async fn test_fetch_page_data_transform_literal_expression() {
        use pinakes_plugin_api::{DataSource, Expression};
        use crate::client::ApiClient;
        let client = ApiClient::default();
        let mut sources = HashMap::new();
        sources.insert("raw".to_string(), DataSource::Static {
            value: serde_json::json!(42),
        });
        sources.insert("derived".to_string(), DataSource::Transform {
            source_name: "raw".to_string(),
            expression: Expression::Literal(serde_json::json!("constant")),
        });
        let results = super::fetch_page_data(&client, &sources, &[])
            .await
            .unwrap();
        // A Literal expression returns the literal value, not the source data
        assert_eq!(results["derived"], serde_json::json!("constant"));
    }

    // Every source name must appear in the output, even when values coincide.
    #[tokio::test]
    async fn test_fetch_page_data_deduplicates_identical_endpoints() {
        use pinakes_plugin_api::DataSource;
        use crate::client::ApiClient;
        let client = ApiClient::default();
        let mut sources = HashMap::new();
        // Two Static sources with the same payload; dedup is for Endpoint sources,
        // but both names must appear in the output regardless.
        sources.insert("a".to_string(), DataSource::Static {
            value: serde_json::json!(1),
        });
        sources.insert("b".to_string(), DataSource::Static {
            value: serde_json::json!(1),
        });
        let results = super::fetch_page_data(&client, &sources, &[])
            .await
            .unwrap();
        assert_eq!(results["a"], serde_json::json!(1));
        assert_eq!(results["b"], serde_json::json!(1));
        assert_eq!(results.len(), 2);
    }

    // Verifies that endpoint sources with identical (path, method, params) are
    // deduplicated correctly. Because there is no real server, the allowlist
    // rejection fires before any network call; both names seeing the same error
    // proves they were grouped and that the single rejection propagated to all.
    #[tokio::test]
    async fn test_dedup_groups_endpoint_sources_with_same_key() {
        use pinakes_plugin_api::{DataSource, Expression, HttpMethod};
        use crate::client::ApiClient;
        let client = ApiClient::default();
        let mut sources = HashMap::new();
        // Two endpoints with identical (path, method, params=empty) but different
        // transforms. Both should produce the same error when the path is blocked.
        sources.insert("x".to_string(), DataSource::Endpoint {
            path: "/api/v1/media".to_string(),
            method: HttpMethod::Get,
            params: Default::default(),
            poll_interval: 0,
            transform: Some(Expression::Literal(serde_json::json!("from_x"))),
        });
        sources.insert("y".to_string(), DataSource::Endpoint {
            path: "/api/v1/media".to_string(),
            method: HttpMethod::Get,
            params: Default::default(),
            poll_interval: 0,
            transform: Some(Expression::Literal(serde_json::json!("from_y"))),
        });
        // Both sources point to the same blocked endpoint; expect an error.
        let allowed = vec!["/api/v1/tags".to_string()];
        let result = super::fetch_page_data(&client, &sources, &allowed).await;
        assert!(
            result.is_err(),
            "fetch_page_data must return Err for blocked deduplicated endpoints"
        );
        let msg = result.unwrap_err();
        assert!(
            msg.contains("not in plugin's declared required_endpoints"),
            "unexpected error: {msg}"
        );
    }

    // Verifies the transform fan-out behavior: each member of a dedup group
    // applies its own transform to the shared raw value independently. This
    // mirrors what Endpoint dedup does after a single shared HTTP request.
    //
    // Testing Endpoint dedup with real per-member transforms requires a mock HTTP
    // server and belongs in an integration test.
    #[tokio::test]
    async fn test_dedup_transform_applied_per_source() {
        use pinakes_plugin_api::{DataSource, Expression};
        use crate::client::ApiClient;
        let client = ApiClient::default();
        let mut sources = HashMap::new();
        sources.insert("raw_data".to_string(), DataSource::Static {
            value: serde_json::json!({"count": 42, "name": "test"}),
        });
        // Two Transform sources referencing "raw_data" with different expressions;
        // each must produce its own independently derived value.
        sources.insert("derived_count".to_string(), DataSource::Transform {
            source_name: "raw_data".to_string(),
            expression: Expression::Path("raw_data.count".to_string()),
        });
        sources.insert("derived_name".to_string(), DataSource::Transform {
            source_name: "raw_data".to_string(),
            expression: Expression::Path("raw_data.name".to_string()),
        });
        let results = super::fetch_page_data(&client, &sources, &[])
            .await
            .unwrap();
        assert_eq!(
            results["raw_data"],
            serde_json::json!({"count": 42, "name": "test"})
        );
        assert_eq!(results["derived_count"], serde_json::json!(42));
        assert_eq!(results["derived_name"], serde_json::json!("test"));
        assert_eq!(results.len(), 3);
    }

    // A non-empty allowlist that omits the requested path must fail fast,
    // before any network I/O, with a descriptive error message.
    #[tokio::test]
    async fn test_endpoint_blocked_when_not_in_allowlist() {
        use pinakes_plugin_api::{DataSource, HttpMethod};
        use crate::client::ApiClient;
        let client = ApiClient::default();
        let mut sources = HashMap::new();
        sources.insert("items".to_string(), DataSource::Endpoint {
            path: "/api/v1/media".to_string(),
            method: HttpMethod::Get,
            params: Default::default(),
            poll_interval: 0,
            transform: None,
        });
        // Provide a non-empty allowlist that does NOT include the endpoint path.
        let allowed = vec!["/api/v1/tags".to_string()];
        let result = super::fetch_page_data(&client, &sources, &allowed).await;
        assert!(
            result.is_err(),
            "fetch_page_data must return Err when endpoint is not in \
             allowed_endpoints"
        );
        let msg = result.unwrap_err();
        assert!(
            msg.contains("not in plugin's declared required_endpoints"),
            "error must explain that the endpoint is not declared, got: {msg}"
        );
    }
}