pinakes/crates/pinakes-core/src/webhooks.rs
NotAShelf 672e11b592
pinakes-core: add configurable rate limits and cors; add webhook dispatcher; bound job history
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Ib0d34cd7878eb9e8d019497234a092466a6a6964
2026-03-08 00:43:24 +03:00

142 lines
4 KiB
Rust

use std::sync::Arc;
use chrono::Utc;
use serde::Serialize;
use tracing::{error, info, warn};
use crate::config::WebhookConfig;
/// Events that can trigger webhook delivery.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "event", content = "data")]
pub enum WebhookEvent {
#[serde(rename = "media.created")]
MediaCreated { media_id: String },
#[serde(rename = "media.updated")]
MediaUpdated { media_id: String },
#[serde(rename = "media.deleted")]
MediaDeleted { media_id: String },
#[serde(rename = "scan.completed")]
ScanCompleted {
files_found: usize,
files_processed: usize,
},
#[serde(rename = "import.completed")]
ImportCompleted { media_id: String },
#[serde(rename = "test")]
Test,
}
impl WebhookEvent {
/// Returns the event type string for matching against webhook config filters.
#[must_use]
pub const fn event_type(&self) -> &str {
match self {
Self::MediaCreated { .. } => "media.created",
Self::MediaUpdated { .. } => "media.updated",
Self::MediaDeleted { .. } => "media.deleted",
Self::ScanCompleted { .. } => "scan.completed",
Self::ImportCompleted { .. } => "import.completed",
Self::Test => "test",
}
}
}
/// Payload sent to webhook endpoints.
#[derive(Debug, Serialize)]
struct WebhookPayload<'a> {
event: &'a WebhookEvent,
timestamp: String,
}
/// Dispatches webhook events to configured endpoints.
pub struct WebhookDispatcher {
webhooks: Vec<WebhookConfig>,
client: reqwest::Client,
}
impl WebhookDispatcher {
/// Create a new dispatcher with the given webhook configurations.
#[must_use]
pub fn new(webhooks: Vec<WebhookConfig>) -> Arc<Self> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
.unwrap_or_default();
Arc::new(Self { webhooks, client })
}
/// Dispatch an event to all matching webhooks.
/// This is fire-and-forget, errors are logged but not propagated.
pub fn dispatch(self: &Arc<Self>, event: WebhookEvent) {
let this = self.clone();
tokio::spawn(async move {
this.dispatch_inner(&event).await;
});
}
async fn dispatch_inner(&self, event: &WebhookEvent) {
let event_type = event.event_type();
let payload = WebhookPayload {
event,
timestamp: Utc::now().to_rfc3339(),
};
let body = match serde_json::to_vec(&payload) {
Ok(b) => b,
Err(e) => {
error!(error = %e, "failed to serialize webhook payload");
return;
},
};
for webhook in &self.webhooks {
// Check if this webhook is interested in this event type
if !webhook.events.is_empty()
&& !webhook.events.iter().any(|e| e == event_type || e == "*")
{
continue;
}
let mut req = self
.client
.post(&webhook.url)
.header("Content-Type", "application/json")
.header("X-Pinakes-Event", event_type);
// Add keyed BLAKE3 signature if secret is configured
if let Some(ref secret) = webhook.secret {
// Derive a 32-byte key from the secret using BLAKE3
let key =
blake3::derive_key("pinakes webhook signature", secret.as_bytes());
let mut hasher = blake3::Hasher::new_keyed(&key);
hasher.update(&body);
let signature = hasher.finalize().to_hex();
req = req.header("X-Pinakes-Signature", format!("blake3={signature}"));
}
match req.body(body.clone()).send().await {
Ok(resp) => {
if resp.status().is_success() {
info!(url = %webhook.url, event = event_type, "webhook delivered");
} else {
warn!(
url = %webhook.url,
event = event_type,
status = %resp.status(),
"webhook delivery returned non-success status"
);
}
},
Err(e) => {
warn!(
url = %webhook.url,
event = event_type,
error = %e,
"webhook delivery failed"
);
},
}
}
}
}