Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I4a6b498153eccd5407510dd541b7f4816a6a6964
106 lines
3.1 KiB
Rust
106 lines
3.1 KiB
Rust
use std::sync::Arc;
|
|
|
|
use serde::{Deserialize, Serialize};
|
|
use tokio::sync::broadcast;
|
|
use tracing::warn;
|
|
|
|
use crate::config::WebhookConfig;
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum PinakesEvent {
|
|
MediaImported {
|
|
media_id: String,
|
|
},
|
|
MediaUpdated {
|
|
media_id: String,
|
|
},
|
|
MediaDeleted {
|
|
media_id: String,
|
|
},
|
|
ScanCompleted {
|
|
files_found: usize,
|
|
files_processed: usize,
|
|
},
|
|
IntegrityMismatch {
|
|
media_id: String,
|
|
expected: String,
|
|
actual: String,
|
|
},
|
|
}
|
|
|
|
impl PinakesEvent {
|
|
pub fn event_name(&self) -> &'static str {
|
|
match self {
|
|
Self::MediaImported { .. } => "media_imported",
|
|
Self::MediaUpdated { .. } => "media_updated",
|
|
Self::MediaDeleted { .. } => "media_deleted",
|
|
Self::ScanCompleted { .. } => "scan_completed",
|
|
Self::IntegrityMismatch { .. } => "integrity_mismatch",
|
|
}
|
|
}
|
|
}
|
|
|
|
pub struct EventBus {
|
|
tx: broadcast::Sender<PinakesEvent>,
|
|
}
|
|
|
|
impl EventBus {
|
|
pub fn new(webhooks: Vec<WebhookConfig>) -> Arc<Self> {
|
|
let (tx, _) = broadcast::channel(256);
|
|
|
|
// Spawn webhook delivery task
|
|
if !webhooks.is_empty() {
|
|
let mut rx: broadcast::Receiver<PinakesEvent> = tx.subscribe();
|
|
let webhooks = Arc::new(webhooks);
|
|
tokio::spawn(async move {
|
|
while let Ok(event) = rx.recv().await {
|
|
let event_name = event.event_name();
|
|
for hook in webhooks.iter() {
|
|
if hook.events.iter().any(|e| e == event_name || e == "*") {
|
|
let url = hook.url.clone();
|
|
let event_clone = event.clone();
|
|
let secret = hook.secret.clone();
|
|
tokio::spawn(async move {
|
|
deliver_webhook(&url, &event_clone, secret.as_deref()).await;
|
|
});
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
Arc::new(Self { tx })
|
|
}
|
|
|
|
pub fn emit(&self, event: PinakesEvent) {
|
|
// Ignore send errors (no receivers)
|
|
let _ = self.tx.send(event);
|
|
}
|
|
}
|
|
|
|
async fn deliver_webhook(url: &str, event: &PinakesEvent, _secret: Option<&str>) {
|
|
let client = reqwest::Client::new();
|
|
let body = serde_json::to_string(event).unwrap_or_default();
|
|
|
|
for attempt in 0..3 {
|
|
match client
|
|
.post(url)
|
|
.header("Content-Type", "application/json")
|
|
.body(body.clone())
|
|
.send()
|
|
.await
|
|
{
|
|
Ok(resp) if resp.status().is_success() => return,
|
|
Ok(resp) => {
|
|
warn!(url, status = %resp.status(), attempt, "webhook delivery failed");
|
|
}
|
|
Err(e) => {
|
|
warn!(url, error = %e, attempt, "webhook delivery error");
|
|
}
|
|
}
|
|
|
|
// Exponential backoff
|
|
tokio::time::sleep(std::time::Duration::from_secs(1 << attempt)).await;
|
|
}
|
|
}
|