fc-queue-runner: use LISTEN/NOTIFY for reactive wakeups
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I3b6f0f5eff05caf7a04a9da7de8b558f6a6a6964
This commit is contained in:
parent
edaf4313e9
commit
49638d5d14
2 changed files with 15 additions and 2 deletions
|
|
@ -68,8 +68,15 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
let worker_pool_for_drain = worker_pool.clone();
|
let worker_pool_for_drain = worker_pool.clone();
|
||||||
|
|
||||||
|
let wakeup = Arc::new(tokio::sync::Notify::new());
|
||||||
|
let listener_handle = fc_common::pg_notify::spawn_listener(
|
||||||
|
db.pool(),
|
||||||
|
&[fc_common::pg_notify::CHANNEL_BUILDS_CHANGED],
|
||||||
|
wakeup.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval) => {
|
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup) => {
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
tracing::error!("Runner loop failed: {e}");
|
tracing::error!("Runner loop failed: {e}");
|
||||||
}
|
}
|
||||||
|
|
@ -83,6 +90,9 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
listener_handle.abort();
|
||||||
|
let _ = listener_handle.await;
|
||||||
|
|
||||||
tracing::info!("Queue runner shutting down, closing database pool");
|
tracing::info!("Queue runner shutting down, closing database pool");
|
||||||
db.close().await;
|
db.close().await;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ use fc_common::{
|
||||||
repo,
|
repo,
|
||||||
};
|
};
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
|
use tokio::sync::Notify;
|
||||||
|
|
||||||
use crate::worker::WorkerPool;
|
use crate::worker::WorkerPool;
|
||||||
|
|
||||||
|
|
@ -12,6 +13,7 @@ pub async fn run(
|
||||||
pool: PgPool,
|
pool: PgPool,
|
||||||
worker_pool: Arc<WorkerPool>,
|
worker_pool: Arc<WorkerPool>,
|
||||||
poll_interval: Duration,
|
poll_interval: Duration,
|
||||||
|
wakeup: Arc<Notify>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// Reset orphaned builds from previous crashes (older than 5 minutes)
|
// Reset orphaned builds from previous crashes (older than 5 minutes)
|
||||||
match repo::builds::reset_orphaned(&pool, 300).await {
|
match repo::builds::reset_orphaned(&pool, 300).await {
|
||||||
|
|
@ -185,6 +187,7 @@ pub async fn run(
|
||||||
tracing::error!("Failed to fetch pending builds: {e}");
|
tracing::error!("Failed to fetch pending builds: {e}");
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
tokio::time::sleep(poll_interval).await;
|
// Wake on NOTIFY or fall back to regular poll interval
|
||||||
|
let _ = tokio::time::timeout(poll_interval, wakeup.notified()).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue