fc-common: add PostgreSQL LISTEN/NOTIFY infrastructure
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: Iffdb2fa758825e8c5d5791bf4fb15c8e6a6a6964
This commit is contained in:
parent
541cd7832f
commit
e274389d12
3 changed files with 140 additions and 0 deletions
61
crates/common/migrations/015_listen_notify_triggers.sql
Normal file
61
crates/common/migrations/015_listen_notify_triggers.sql
Normal file
|
|
@ -0,0 +1,61 @@
|
||||||
|
-- PostgreSQL LISTEN/NOTIFY triggers for event-driven reactivity
|
||||||
|
-- Emits notifications on builds/jobsets mutations so daemons can wake immediately
|
||||||
|
|
||||||
|
-- Trigger function: notify on builds changes
|
||||||
|
CREATE OR REPLACE FUNCTION notify_builds_changed() RETURNS trigger AS $$
|
||||||
|
BEGIN
|
||||||
|
PERFORM pg_notify('fc_builds_changed', json_build_object(
|
||||||
|
'op', TG_OP,
|
||||||
|
'table', TG_TABLE_NAME
|
||||||
|
)::text);
|
||||||
|
RETURN NULL;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
-- Trigger function: notify on jobsets changes
|
||||||
|
CREATE OR REPLACE FUNCTION notify_jobsets_changed() RETURNS trigger AS $$
|
||||||
|
BEGIN
|
||||||
|
PERFORM pg_notify('fc_jobsets_changed', json_build_object(
|
||||||
|
'op', TG_OP,
|
||||||
|
'table', TG_TABLE_NAME
|
||||||
|
)::text);
|
||||||
|
RETURN NULL;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
-- Builds: new build inserted (queue-runner should wake)
|
||||||
|
CREATE TRIGGER trg_builds_insert_notify
|
||||||
|
AFTER INSERT ON builds
|
||||||
|
FOR EACH ROW
|
||||||
|
EXECUTE FUNCTION notify_builds_changed();
|
||||||
|
|
||||||
|
-- Builds: status changed (queue-runner should re-check, e.g. deps resolved)
|
||||||
|
CREATE TRIGGER trg_builds_status_notify
|
||||||
|
AFTER UPDATE ON builds
|
||||||
|
FOR EACH ROW
|
||||||
|
WHEN (OLD.status IS DISTINCT FROM NEW.status)
|
||||||
|
EXECUTE FUNCTION notify_builds_changed();
|
||||||
|
|
||||||
|
-- Jobsets: new jobset created (evaluator should wake)
|
||||||
|
CREATE TRIGGER trg_jobsets_insert_notify
|
||||||
|
AFTER INSERT ON jobsets
|
||||||
|
FOR EACH ROW
|
||||||
|
EXECUTE FUNCTION notify_jobsets_changed();
|
||||||
|
|
||||||
|
-- Jobsets: relevant fields changed (evaluator should re-check)
|
||||||
|
CREATE TRIGGER trg_jobsets_update_notify
|
||||||
|
AFTER UPDATE ON jobsets
|
||||||
|
FOR EACH ROW
|
||||||
|
WHEN (
|
||||||
|
OLD.enabled IS DISTINCT FROM NEW.enabled
|
||||||
|
OR OLD.state IS DISTINCT FROM NEW.state
|
||||||
|
OR OLD.nix_expression IS DISTINCT FROM NEW.nix_expression
|
||||||
|
OR OLD.check_interval IS DISTINCT FROM NEW.check_interval
|
||||||
|
)
|
||||||
|
EXECUTE FUNCTION notify_jobsets_changed();
|
||||||
|
|
||||||
|
-- Jobsets: deleted (evaluator should wake to stop tracking)
|
||||||
|
CREATE TRIGGER trg_jobsets_delete_notify
|
||||||
|
AFTER DELETE ON jobsets
|
||||||
|
FOR EACH ROW
|
||||||
|
EXECUTE FUNCTION notify_jobsets_changed();
|
||||||
|
|
@ -10,6 +10,7 @@ pub mod migrate;
|
||||||
pub mod migrate_cli;
|
pub mod migrate_cli;
|
||||||
pub mod models;
|
pub mod models;
|
||||||
pub mod notifications;
|
pub mod notifications;
|
||||||
|
pub mod pg_notify;
|
||||||
pub mod repo;
|
pub mod repo;
|
||||||
|
|
||||||
pub mod bootstrap;
|
pub mod bootstrap;
|
||||||
|
|
|
||||||
78
crates/common/src/pg_notify.rs
Normal file
78
crates/common/src/pg_notify.rs
Normal file
|
|
@ -0,0 +1,78 @@
|
||||||
|
// Wraps `sqlx::postgres::PgListener` to subscribe to notification channels
|
||||||
|
// and signal an `Arc<tokio::sync::Notify>` when events arrive. Daemons use
|
||||||
|
// this to wake immediately instead of waiting for the next poll interval.
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use sqlx::PgPool;
|
||||||
|
use tokio::{sync::Notify, task::JoinHandle};
|
||||||
|
|
||||||
|
/// Channel emitted on `builds` INSERT or status UPDATE.
|
||||||
|
pub const CHANNEL_BUILDS_CHANGED: &str = "fc_builds_changed";
|
||||||
|
|
||||||
|
/// Channel emitted on `jobsets` INSERT, UPDATE (relevant fields), or DELETE.
|
||||||
|
pub const CHANNEL_JOBSETS_CHANGED: &str = "fc_jobsets_changed";
|
||||||
|
|
||||||
|
/// Spawns a background task that listens on the given PG channels and calls
|
||||||
|
/// `wakeup.notify_waiters()` on each notification. Reconnects with 5s backoff
|
||||||
|
/// on connection loss.
|
||||||
|
pub fn spawn_listener(
|
||||||
|
pool: &PgPool,
|
||||||
|
channels: &[&str],
|
||||||
|
wakeup: Arc<Notify>,
|
||||||
|
) -> JoinHandle<()> {
|
||||||
|
let pool = pool.clone();
|
||||||
|
let channels: Vec<String> =
|
||||||
|
channels.iter().map(|s| (*s).to_owned()).collect();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
if let Err(e) = listen_loop(&pool, &channels, &wakeup).await {
|
||||||
|
tracing::warn!("PG LISTEN connection lost: {e}, reconnecting in 5s");
|
||||||
|
}
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Core listen loop: connects, subscribes, and dispatches notifications.
|
||||||
|
async fn listen_loop(
|
||||||
|
pool: &PgPool,
|
||||||
|
channels: &[String],
|
||||||
|
wakeup: &Notify,
|
||||||
|
) -> Result<(), sqlx::Error> {
|
||||||
|
let mut listener = sqlx::postgres::PgListener::connect_with(pool).await?;
|
||||||
|
|
||||||
|
let channel_refs: Vec<&str> = channels.iter().map(String::as_str).collect();
|
||||||
|
listener.listen_all(channel_refs).await?;
|
||||||
|
|
||||||
|
tracing::info!(channels = ?channels, "PG LISTEN subscribed");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
listener.recv().await?;
|
||||||
|
wakeup.notify_waiters();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn channel_names_are_valid_pg_identifiers() {
|
||||||
|
for name in [CHANNEL_BUILDS_CHANGED, CHANNEL_JOBSETS_CHANGED] {
|
||||||
|
assert!(name.len() < 64, "channel name too long: {name}");
|
||||||
|
assert!(!name.contains(' '), "channel name has spaces: {name}");
|
||||||
|
assert!(
|
||||||
|
name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_'),
|
||||||
|
"channel name has invalid chars: {name}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn channel_names_match_migration_triggers() {
|
||||||
|
// These must match the pg_notify() calls in migration 015
|
||||||
|
assert_eq!(CHANNEL_BUILDS_CHANGED, "fc_builds_changed");
|
||||||
|
assert_eq!(CHANNEL_JOBSETS_CHANGED, "fc_jobsets_changed");
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue