//! Data fetching system for plugin UI pages //! //! Provides data fetching and caching for plugin data sources. use std::{ collections::{HashMap, HashSet}, time::Duration, }; use dioxus::prelude::*; use dioxus_core::Task; use pinakes_plugin_api::{DataSource, Expression, HttpMethod}; use super::expr::{evaluate_expression, value_to_display_string}; use crate::client::ApiClient; /// Cached data for a plugin page #[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct PluginPageData { data: HashMap, loading: HashSet, errors: HashMap, } impl PluginPageData { /// Get data for a specific source #[must_use] pub fn get(&self, source: &str) -> Option<&serde_json::Value> { self.data.get(source) } /// Check if a source is currently loading #[must_use] pub fn is_loading(&self, source: &str) -> bool { self.loading.contains(source) } /// Get error for a specific source #[must_use] pub fn error(&self, source: &str) -> Option<&str> { self.errors.get(source).map(String::as_str) } /// Check if there is data for a specific source #[must_use] pub fn has_data(&self, source: &str) -> bool { self.data.contains_key(source) } /// Set data for a source pub fn set_data(&mut self, source: String, value: serde_json::Value) { self.data.insert(source, value); } /// Set loading state for a source pub fn set_loading(&mut self, source: &str, loading: bool) { if loading { self.loading.insert(source.to_string()); self.errors.remove(source); } else { self.loading.remove(source); } } /// Set error for a source pub fn set_error(&mut self, source: String, error: String) { self.errors.insert(source, error); } /// Convert all resolved data to a single JSON object for expression /// evaluation #[must_use] pub fn as_json(&self) -> serde_json::Value { serde_json::Value::Object( self .data .iter() .map(|(k, v)| (k.clone(), v.clone())) .collect(), ) } /// Clear all data pub fn clear(&mut self) { self.data.clear(); self.loading.clear(); self.errors.clear(); } } /// Convert a plugin `HttpMethod` to a `reqwest::Method`. pub(super) const fn to_reqwest_method(method: &HttpMethod) -> reqwest::Method { match method { HttpMethod::Get => reqwest::Method::GET, HttpMethod::Post => reqwest::Method::POST, HttpMethod::Put => reqwest::Method::PUT, HttpMethod::Patch => reqwest::Method::PATCH, HttpMethod::Delete => reqwest::Method::DELETE, } } /// Fetch data from an endpoint, evaluating any params expressions against /// the given context. async fn fetch_endpoint( client: &ApiClient, path: &str, method: HttpMethod, params: &HashMap, ctx: &serde_json::Value, allowed_endpoints: &[String], ) -> Result { if !allowed_endpoints.is_empty() && !allowed_endpoints.iter().any(|ep| path == ep.as_str()) { return Err(format!( "Endpoint '{path}' is not in plugin's declared required_endpoints" )); } let reqwest_method = to_reqwest_method(&method); let mut request = client.raw_request(reqwest_method.clone(), path); if !params.is_empty() { if reqwest_method == reqwest::Method::GET { // Evaluate each param expression and add as query string let query_pairs: Vec<(String, String)> = params .iter() .map(|(k, expr)| { let v = evaluate_expression(expr, ctx); (k.clone(), value_to_display_string(&v)) }) .collect(); request = request.query(&query_pairs); } else { // Evaluate params and send as JSON body let body: serde_json::Map = params .iter() .map(|(k, expr)| (k.clone(), evaluate_expression(expr, ctx))) .collect(); request = request.json(&body); } } // Send request and parse response let response = request .send() .await .map_err(|e| format!("Request failed: {e}"))?; if !response.status().is_success() { let status = response.status(); let body = response.text().await.unwrap_or_default(); return Err(format!("HTTP {status}: {body}")); } response .json::() .await .map_err(|e| format!("Failed to parse JSON: {e}")) } /// Fetch all data sources for a page /// /// Endpoint sources are deduplicated by `(path, method, params)`: if multiple /// sources share the same triplet, a single HTTP request is made and the raw /// response is shared, with each source's own `transform` applied /// independently. All unique Endpoint and Static sources are fetched /// concurrently. Transform sources are applied after, in iteration order, /// against the full result set. /// /// # Errors /// /// Returns an error if any data source fails to fetch pub async fn fetch_page_data( client: &ApiClient, data_sources: &HashMap, allowed_endpoints: &[String], ) -> Result, String> { // Group non-Transform sources into dedup groups. // // For Endpoint sources, two entries are in the same group when they share // the same (path, method, params) - i.e., they would produce an identical // HTTP request. The per-source `transform` expression is kept separate so // each name can apply its own transform to the shared raw response. // // Static sources never share an HTTP request so each becomes its own group. // // Each group is: (names_and_transforms, representative_source) // where names_and_transforms is Vec<(name, Option)> for Endpoint, // or Vec<(name, ())> for Static (transform is baked in). struct Group { // (source name, per-name transform expression for Endpoint sources) members: Vec<(String, Option)>, // The representative source used to fire the request (transform ignored // for Endpoint - we apply per-member transforms after fetching) source: DataSource, } let mut groups: Vec = Vec::new(); for (name, source) in data_sources { if matches!(source, DataSource::Transform { .. }) { continue; } match source { DataSource::Endpoint { path, method, params, transform, poll_interval, } => { // Find an existing group with the same (path, method, params). let existing = groups.iter_mut().find(|g| { matches!( &g.source, DataSource::Endpoint { path: ep, method: em, params: epa, .. } if ep == path && em == method && epa == params ) }); if let Some(group) = existing { group.members.push((name.clone(), transform.clone())); } else { groups.push(Group { members: vec![(name.clone(), transform.clone())], source: DataSource::Endpoint { path: path.clone(), method: method.clone(), params: params.clone(), poll_interval: *poll_interval, transform: None, }, }); } }, DataSource::Static { .. } => { // Static sources are trivially unique per name; no dedup needed. groups.push(Group { members: vec![(name.clone(), None)], source: source.clone(), }); }, DataSource::Transform { .. } => unreachable!(), } } // Fire one future per group concurrently. let futs: Vec<_> = groups .into_iter() .map(|group| { let client = client.clone(); let allowed = allowed_endpoints.to_vec(); async move { // Fetch the raw value for this group. let raw = match &group.source { DataSource::Endpoint { path, method, params, .. } => { let empty_ctx = serde_json::json!({}); fetch_endpoint( &client, path, method.clone(), params, &empty_ctx, &allowed, ) .await? }, DataSource::Static { value } => value.clone(), DataSource::Transform { .. } => unreachable!(), }; // Apply per-member transforms and collect (name, value) pairs. let pairs: Vec<(String, serde_json::Value)> = group .members .into_iter() .map(|(name, transform)| { let value = if let Some(expr) = &transform { evaluate_expression(expr, &raw) } else { raw.clone() }; (name, value) }) .collect(); Ok::<_, String>(pairs) } }) .collect(); let mut results: HashMap = HashMap::new(); for group_result in futures::future::join_all(futs).await { for (name, value) in group_result? { results.insert(name, value); } } // Process Transform sources in dependency order. HashMap iteration order is // non-deterministic, so a Transform referencing another Transform could see // null if the upstream was not yet resolved. The pending loop below defers // any Transform whose upstream is not yet in results, making progress on // each pass until all are resolved. UiPage::validate guarantees no cycles, // so the loop always terminates. let mut pending: Vec<(&String, &String, &Expression)> = data_sources .iter() .filter_map(|(name, source)| { match source { DataSource::Transform { source_name, expression, } => Some((name, source_name, expression)), _ => None, } }) .collect(); while !pending.is_empty() { let prev_len = pending.len(); let mut i = 0; while i < pending.len() { let (name, source_name, expression) = pending[i]; if results.contains_key(source_name.as_str()) { let ctx = serde_json::Value::Object( results .iter() .map(|(k, v)| (k.clone(), v.clone())) .collect(), ); results.insert(name.clone(), evaluate_expression(expression, &ctx)); pending.swap_remove(i); } else { i += 1; } } if pending.len() == prev_len { // No progress: upstream source is missing (should be caught by // UiPage::validate, but handled defensively here). tracing::warn!( "plugin transform dependency unresolvable; processing remaining in \ iteration order" ); for (name, _, expression) in pending { let ctx = serde_json::Value::Object( results .iter() .map(|(k, v)| (k.clone(), v.clone())) .collect(), ); results.insert(name.clone(), evaluate_expression(expression, &ctx)); } break; } } Ok(results) } /// Hook to fetch and cache plugin page data /// /// Returns a signal containing the data state. If any data source has a /// non-zero `poll_interval`, a background loop re-fetches automatically at the /// minimum interval. The `refresh` counter can be incremented to trigger an /// immediate re-fetch outside of the polling interval. pub fn use_plugin_data( client: Signal, data_sources: HashMap, refresh: Signal, allowed_endpoints: Vec, ) -> Signal { let mut data = use_signal(PluginPageData::default); let mut poll_task: Signal> = use_signal(|| None); use_effect(move || { // Subscribe to the refresh counter; incrementing it triggers a re-run. let _rev = refresh.read(); let sources = data_sources.clone(); let allowed = allowed_endpoints.clone(); // Cancel the previous polling task before spawning a new one. Use // write() rather than read() so the effect does not subscribe to // poll_task and trigger an infinite re-run loop. if let Some(t) = poll_task.write().take() { t.cancel(); } // Determine minimum poll interval (0 = no polling) let min_poll_secs: u64 = sources .values() .filter_map(|s| { if let DataSource::Endpoint { poll_interval, .. } = s { if *poll_interval > 0 { Some(*poll_interval) } else { None } } else { None } }) .min() .unwrap_or(0); let handle = spawn(async move { // Clear previous data data.write().clear(); // Mark all sources as loading for name in sources.keys() { data.write().set_loading(name, true); } // Initial fetch; clone to release the signal read borrow before await. let cl = client.peek().clone(); match fetch_page_data(&cl, &sources, &allowed).await { Ok(results) => { for (name, value) in results { data.write().set_loading(&name, false); data.write().set_data(name, value); } }, Err(e) => { for name in sources.keys() { data.write().set_loading(name, false); data.write().set_error(name.clone(), e.clone()); } }, } // Polling loop; only runs if at least one source has poll_interval > 0 if min_poll_secs > 0 { loop { tokio::time::sleep(Duration::from_secs(min_poll_secs)).await; let cl = client.peek().clone(); match fetch_page_data(&cl, &sources, &allowed).await { Ok(results) => { for (name, value) in results { // Only write if data is new or has changed to avoid spurious // signal updates that would force a re-render let changed = !data.read().has_data(&name) || data.read().get(&name) != Some(&value); if changed { data.write().set_data(name, value); } } }, Err(e) => { tracing::warn!("Poll fetch failed: {e}"); }, } } } }); *poll_task.write() = Some(handle); }); data } #[cfg(test)] mod tests { use super::*; #[test] fn test_plugin_page_data() { let mut data = PluginPageData::default(); // Test empty state assert!(!data.has_data("test")); assert!(!data.is_loading("test")); assert!(data.error("test").is_none()); // Test setting data data.set_data("test".to_string(), serde_json::json!({"key": "value"})); assert!(data.has_data("test")); assert_eq!(data.get("test"), Some(&serde_json::json!({"key": "value"}))); // Test loading state data.set_loading("loading", true); assert!(data.is_loading("loading")); data.set_loading("loading", false); assert!(!data.is_loading("loading")); // Test error state data.set_error("error".to_string(), "oops".to_string()); assert_eq!(data.error("error"), Some("oops")); } #[test] fn test_as_json_empty() { let data = PluginPageData::default(); assert_eq!(data.as_json(), serde_json::json!({})); } #[test] fn test_as_json_with_data() { let mut data = PluginPageData::default(); data.set_data("users".to_string(), serde_json::json!([{"id": 1}])); data.set_data("count".to_string(), serde_json::json!(42)); let json = data.as_json(); assert_eq!(json["users"], serde_json::json!([{"id": 1}])); assert_eq!(json["count"], serde_json::json!(42)); } #[test] fn test_set_loading_true_clears_error() { let mut data = PluginPageData::default(); data.set_error("src".to_string(), "oops".to_string()); assert!(data.error("src").is_some()); data.set_loading("src", true); assert!(data.error("src").is_none()); assert!(data.is_loading("src")); } #[test] fn test_set_loading_false_removes_flag() { let mut data = PluginPageData::default(); data.set_loading("src", true); assert!(data.is_loading("src")); data.set_loading("src", false); assert!(!data.is_loading("src")); } #[test] fn test_clear_resets_all_state() { let mut data = PluginPageData::default(); data.set_data("x".to_string(), serde_json::json!(1)); data.set_loading("x", true); data.set_error("y".to_string(), "err".to_string()); data.clear(); assert!(!data.has_data("x")); assert!(!data.is_loading("x")); assert!(data.error("y").is_none()); } #[test] fn test_partial_eq() { let mut a = PluginPageData::default(); let mut b = PluginPageData::default(); assert_eq!(a, b); a.set_data("k".to_string(), serde_json::json!(1)); assert_ne!(a, b); b.set_data("k".to_string(), serde_json::json!(1)); assert_eq!(a, b); } #[tokio::test] async fn test_fetch_page_data_static_only() { use pinakes_plugin_api::DataSource; use crate::client::ApiClient; let client = ApiClient::default(); let mut sources = HashMap::new(); sources.insert("nums".to_string(), DataSource::Static { value: serde_json::json!([1, 2, 3]), }); sources.insert("flag".to_string(), DataSource::Static { value: serde_json::json!(true), }); let results = super::fetch_page_data(&client, &sources, &[]) .await .unwrap(); assert_eq!(results["nums"], serde_json::json!([1, 2, 3])); assert_eq!(results["flag"], serde_json::json!(true)); } #[tokio::test] async fn test_fetch_page_data_transform_evaluates_expression() { use pinakes_plugin_api::{DataSource, Expression}; use crate::client::ApiClient; let client = ApiClient::default(); let mut sources = HashMap::new(); // The Transform expression accesses "raw" from the context sources.insert("derived".to_string(), DataSource::Transform { source_name: "raw".to_string(), expression: Expression::Path("raw".to_string()), }); sources.insert("raw".to_string(), DataSource::Static { value: serde_json::json!({"ok": true}), }); let results = super::fetch_page_data(&client, &sources, &[]) .await .unwrap(); assert_eq!(results["raw"], serde_json::json!({"ok": true})); // derived should return the value of "raw" from context assert_eq!(results["derived"], serde_json::json!({"ok": true})); } #[tokio::test] async fn test_fetch_page_data_transform_literal_expression() { use pinakes_plugin_api::{DataSource, Expression}; use crate::client::ApiClient; let client = ApiClient::default(); let mut sources = HashMap::new(); sources.insert("raw".to_string(), DataSource::Static { value: serde_json::json!(42), }); sources.insert("derived".to_string(), DataSource::Transform { source_name: "raw".to_string(), expression: Expression::Literal(serde_json::json!("constant")), }); let results = super::fetch_page_data(&client, &sources, &[]) .await .unwrap(); // A Literal expression returns the literal value, not the source data assert_eq!(results["derived"], serde_json::json!("constant")); } #[tokio::test] async fn test_fetch_page_data_deduplicates_identical_endpoints() { use pinakes_plugin_api::DataSource; use crate::client::ApiClient; let client = ApiClient::default(); let mut sources = HashMap::new(); // Two Static sources with the same payload; dedup is for Endpoint sources, // but both names must appear in the output regardless. sources.insert("a".to_string(), DataSource::Static { value: serde_json::json!(1), }); sources.insert("b".to_string(), DataSource::Static { value: serde_json::json!(1), }); let results = super::fetch_page_data(&client, &sources, &[]) .await .unwrap(); assert_eq!(results["a"], serde_json::json!(1)); assert_eq!(results["b"], serde_json::json!(1)); assert_eq!(results.len(), 2); } // Verifies that endpoint sources with identical (path, method, params) are // deduplicated correctly. Because there is no real server, the allowlist // rejection fires before any network call; both names seeing the same error // proves they were grouped and that the single rejection propagated to all. #[tokio::test] async fn test_dedup_groups_endpoint_sources_with_same_key() { use pinakes_plugin_api::{DataSource, Expression, HttpMethod}; use crate::client::ApiClient; let client = ApiClient::default(); let mut sources = HashMap::new(); // Two endpoints with identical (path, method, params=empty) but different // transforms. Both should produce the same error when the path is blocked. sources.insert("x".to_string(), DataSource::Endpoint { path: "/api/v1/media".to_string(), method: HttpMethod::Get, params: Default::default(), poll_interval: 0, transform: Some(Expression::Literal(serde_json::json!("from_x"))), }); sources.insert("y".to_string(), DataSource::Endpoint { path: "/api/v1/media".to_string(), method: HttpMethod::Get, params: Default::default(), poll_interval: 0, transform: Some(Expression::Literal(serde_json::json!("from_y"))), }); // Both sources point to the same blocked endpoint; expect an error. let allowed = vec!["/api/v1/tags".to_string()]; let result = super::fetch_page_data(&client, &sources, &allowed).await; assert!( result.is_err(), "fetch_page_data must return Err for blocked deduplicated endpoints" ); let msg = result.unwrap_err(); assert!( msg.contains("not in plugin's declared required_endpoints"), "unexpected error: {msg}" ); } // Verifies the transform fan-out behavior: each member of a dedup group // applies its own transform to the shared raw value independently. This // mirrors what Endpoint dedup does after a single shared HTTP request. // // Testing Endpoint dedup with real per-member transforms requires a mock HTTP // server and belongs in an integration test. #[tokio::test] async fn test_dedup_transform_applied_per_source() { use pinakes_plugin_api::{DataSource, Expression}; use crate::client::ApiClient; let client = ApiClient::default(); let mut sources = HashMap::new(); sources.insert("raw_data".to_string(), DataSource::Static { value: serde_json::json!({"count": 42, "name": "test"}), }); // Two Transform sources referencing "raw_data" with different expressions; // each must produce its own independently derived value. sources.insert("derived_count".to_string(), DataSource::Transform { source_name: "raw_data".to_string(), expression: Expression::Path("raw_data.count".to_string()), }); sources.insert("derived_name".to_string(), DataSource::Transform { source_name: "raw_data".to_string(), expression: Expression::Path("raw_data.name".to_string()), }); let results = super::fetch_page_data(&client, &sources, &[]) .await .unwrap(); assert_eq!( results["raw_data"], serde_json::json!({"count": 42, "name": "test"}) ); assert_eq!(results["derived_count"], serde_json::json!(42)); assert_eq!(results["derived_name"], serde_json::json!("test")); assert_eq!(results.len(), 3); } #[tokio::test] async fn test_endpoint_blocked_when_not_in_allowlist() { use pinakes_plugin_api::{DataSource, HttpMethod}; use crate::client::ApiClient; let client = ApiClient::default(); let mut sources = HashMap::new(); sources.insert("items".to_string(), DataSource::Endpoint { path: "/api/v1/media".to_string(), method: HttpMethod::Get, params: Default::default(), poll_interval: 0, transform: None, }); // Provide a non-empty allowlist that does NOT include the endpoint path. let allowed = vec!["/api/v1/tags".to_string()]; let result = super::fetch_page_data(&client, &sources, &allowed).await; assert!( result.is_err(), "fetch_page_data must return Err when endpoint is not in \ allowed_endpoints" ); let msg = result.unwrap_err(); assert!( msg.contains("not in plugin's declared required_endpoints"), "error must explain that the endpoint is not declared, got: {msg}" ); } }