From 49638d5d14d0e98942e5d34d690818e4a902096e Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Mon, 16 Feb 2026 21:04:45 +0300 Subject: [PATCH] fc-queue-runner: use LISTEN/NOTIFY for reactive wakeups Signed-off-by: NotAShelf Change-Id: I3b6f0f5eff05caf7a04a9da7de8b558f6a6a6964 --- crates/queue-runner/src/main.rs | 12 +++++++++++- crates/queue-runner/src/runner_loop.rs | 5 ++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/crates/queue-runner/src/main.rs b/crates/queue-runner/src/main.rs index 3793c76..bb39ee1 100644 --- a/crates/queue-runner/src/main.rs +++ b/crates/queue-runner/src/main.rs @@ -68,8 +68,15 @@ async fn main() -> anyhow::Result<()> { let worker_pool_for_drain = worker_pool.clone(); + let wakeup = Arc::new(tokio::sync::Notify::new()); + let listener_handle = fc_common::pg_notify::spawn_listener( + db.pool(), + &[fc_common::pg_notify::CHANNEL_BUILDS_CHANGED], + wakeup.clone(), + ); + tokio::select! { - result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval) => { + result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup) => { if let Err(e) = result { tracing::error!("Runner loop failed: {e}"); } @@ -83,6 +90,9 @@ async fn main() -> anyhow::Result<()> { } } + listener_handle.abort(); + let _ = listener_handle.await; + tracing::info!("Queue runner shutting down, closing database pool"); db.close().await; diff --git a/crates/queue-runner/src/runner_loop.rs b/crates/queue-runner/src/runner_loop.rs index 618a31b..620043f 100644 --- a/crates/queue-runner/src/runner_loop.rs +++ b/crates/queue-runner/src/runner_loop.rs @@ -5,6 +5,7 @@ use fc_common::{ repo, }; use sqlx::PgPool; +use tokio::sync::Notify; use crate::worker::WorkerPool; @@ -12,6 +13,7 @@ pub async fn run( pool: PgPool, worker_pool: Arc, poll_interval: Duration, + wakeup: Arc, ) -> anyhow::Result<()> { // Reset orphaned builds from previous crashes (older than 5 minutes) match repo::builds::reset_orphaned(&pool, 300).await { @@ -185,6 +187,7 @@ pub async fn run( tracing::error!("Failed to fetch pending builds: {e}"); }, } - tokio::time::sleep(poll_interval).await; + // Wake on NOTIFY or fall back to regular poll interval + let _ = tokio::time::timeout(poll_interval, wakeup.notified()).await; } }