diff --git a/crates/common/migrations/015_listen_notify_triggers.sql b/crates/common/migrations/015_listen_notify_triggers.sql new file mode 100644 index 0000000..26b35b9 --- /dev/null +++ b/crates/common/migrations/015_listen_notify_triggers.sql @@ -0,0 +1,61 @@ +-- PostgreSQL LISTEN/NOTIFY triggers for event-driven reactivity +-- Emits notifications on builds/jobsets mutations so daemons can wake immediately + +-- Trigger function: notify on builds changes +CREATE OR REPLACE FUNCTION notify_builds_changed() RETURNS trigger AS $$ +BEGIN + PERFORM pg_notify('fc_builds_changed', json_build_object( + 'op', TG_OP, + 'table', TG_TABLE_NAME + )::text); + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +-- Trigger function: notify on jobsets changes +CREATE OR REPLACE FUNCTION notify_jobsets_changed() RETURNS trigger AS $$ +BEGIN + PERFORM pg_notify('fc_jobsets_changed', json_build_object( + 'op', TG_OP, + 'table', TG_TABLE_NAME + )::text); + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +-- Builds: new build inserted (queue-runner should wake) +CREATE TRIGGER trg_builds_insert_notify + AFTER INSERT ON builds + FOR EACH ROW + EXECUTE FUNCTION notify_builds_changed(); + +-- Builds: status changed (queue-runner should re-check, e.g. deps resolved) +CREATE TRIGGER trg_builds_status_notify + AFTER UPDATE ON builds + FOR EACH ROW + WHEN (OLD.status IS DISTINCT FROM NEW.status) + EXECUTE FUNCTION notify_builds_changed(); + +-- Jobsets: new jobset created (evaluator should wake) +CREATE TRIGGER trg_jobsets_insert_notify + AFTER INSERT ON jobsets + FOR EACH ROW + EXECUTE FUNCTION notify_jobsets_changed(); + +-- Jobsets: relevant fields changed (evaluator should re-check) +CREATE TRIGGER trg_jobsets_update_notify + AFTER UPDATE ON jobsets + FOR EACH ROW + WHEN ( + OLD.enabled IS DISTINCT FROM NEW.enabled + OR OLD.state IS DISTINCT FROM NEW.state + OR OLD.nix_expression IS DISTINCT FROM NEW.nix_expression + OR OLD.check_interval IS DISTINCT FROM NEW.check_interval + ) + EXECUTE FUNCTION notify_jobsets_changed(); + +-- Jobsets: deleted (evaluator should wake to stop tracking) +CREATE TRIGGER trg_jobsets_delete_notify + AFTER DELETE ON jobsets + FOR EACH ROW + EXECUTE FUNCTION notify_jobsets_changed(); diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 6303886..0fe56d5 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -10,6 +10,7 @@ pub mod migrate; pub mod migrate_cli; pub mod models; pub mod notifications; +pub mod pg_notify; pub mod repo; pub mod bootstrap; diff --git a/crates/common/src/pg_notify.rs b/crates/common/src/pg_notify.rs new file mode 100644 index 0000000..f4c7b7c --- /dev/null +++ b/crates/common/src/pg_notify.rs @@ -0,0 +1,78 @@ +// Wraps `sqlx::postgres::PgListener` to subscribe to notification channels +// and signal an `Arc` when events arrive. Daemons use +// this to wake immediately instead of waiting for the next poll interval. +use std::sync::Arc; + +use sqlx::PgPool; +use tokio::{sync::Notify, task::JoinHandle}; + +/// Channel emitted on `builds` INSERT or status UPDATE. +pub const CHANNEL_BUILDS_CHANGED: &str = "fc_builds_changed"; + +/// Channel emitted on `jobsets` INSERT, UPDATE (relevant fields), or DELETE. +pub const CHANNEL_JOBSETS_CHANGED: &str = "fc_jobsets_changed"; + +/// Spawns a background task that listens on the given PG channels and calls +/// `wakeup.notify_waiters()` on each notification. Reconnects with 5s backoff +/// on connection loss. +pub fn spawn_listener( + pool: &PgPool, + channels: &[&str], + wakeup: Arc, +) -> JoinHandle<()> { + let pool = pool.clone(); + let channels: Vec = + channels.iter().map(|s| (*s).to_owned()).collect(); + + tokio::spawn(async move { + loop { + if let Err(e) = listen_loop(&pool, &channels, &wakeup).await { + tracing::warn!("PG LISTEN connection lost: {e}, reconnecting in 5s"); + } + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + }) +} + +/// Core listen loop: connects, subscribes, and dispatches notifications. +async fn listen_loop( + pool: &PgPool, + channels: &[String], + wakeup: &Notify, +) -> Result<(), sqlx::Error> { + let mut listener = sqlx::postgres::PgListener::connect_with(pool).await?; + + let channel_refs: Vec<&str> = channels.iter().map(String::as_str).collect(); + listener.listen_all(channel_refs).await?; + + tracing::info!(channels = ?channels, "PG LISTEN subscribed"); + + loop { + listener.recv().await?; + wakeup.notify_waiters(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn channel_names_are_valid_pg_identifiers() { + for name in [CHANNEL_BUILDS_CHANGED, CHANNEL_JOBSETS_CHANGED] { + assert!(name.len() < 64, "channel name too long: {name}"); + assert!(!name.contains(' '), "channel name has spaces: {name}"); + assert!( + name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_'), + "channel name has invalid chars: {name}" + ); + } + } + + #[test] + fn channel_names_match_migration_triggers() { + // These must match the pg_notify() calls in migration 015 + assert_eq!(CHANNEL_BUILDS_CHANGED, "fc_builds_changed"); + assert_eq!(CHANNEL_JOBSETS_CHANGED, "fc_jobsets_changed"); + } +}