diff --git a/crates/evaluator/src/eval_loop.rs b/crates/evaluator/src/eval_loop.rs index 2b238b9..4a5ff9e 100644 --- a/crates/evaluator/src/eval_loop.rs +++ b/crates/evaluator/src/eval_loop.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use anyhow::Context; use chrono::Utc; @@ -16,10 +16,15 @@ use fc_common::{ }; use futures::stream::{self, StreamExt}; use sqlx::PgPool; +use tokio::sync::Notify; use tracing::info; use uuid::Uuid; -pub async fn run(pool: PgPool, config: EvaluatorConfig) -> anyhow::Result<()> { +pub async fn run( + pool: PgPool, + config: EvaluatorConfig, + wakeup: Arc, +) -> anyhow::Result<()> { let poll_interval = Duration::from_secs(config.poll_interval); let nix_timeout = Duration::from_secs(config.nix_timeout); let git_timeout = Duration::from_secs(config.git_timeout); @@ -28,7 +33,8 @@ pub async fn run(pool: PgPool, config: EvaluatorConfig) -> anyhow::Result<()> { if let Err(e) = run_cycle(&pool, &config, nix_timeout, git_timeout).await { tracing::error!("Evaluation cycle failed: {e}"); } - tokio::time::sleep(poll_interval).await; + // Wake on NOTIFY or fall back to regular poll interval + let _ = tokio::time::timeout(poll_interval, wakeup.notified()).await; } } diff --git a/crates/evaluator/src/main.rs b/crates/evaluator/src/main.rs index f76b02e..8d5771b 100644 --- a/crates/evaluator/src/main.rs +++ b/crates/evaluator/src/main.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use clap::Parser; use fc_common::{Config, Database}; @@ -29,8 +31,15 @@ async fn main() -> anyhow::Result<()> { let pool = db.pool().clone(); let eval_config = config.evaluator; + let wakeup = Arc::new(tokio::sync::Notify::new()); + let listener_handle = fc_common::pg_notify::spawn_listener( + db.pool(), + &[fc_common::pg_notify::CHANNEL_JOBSETS_CHANGED], + wakeup.clone(), + ); + tokio::select! { - result = fc_evaluator::eval_loop::run(pool, eval_config) => { + result = fc_evaluator::eval_loop::run(pool, eval_config, wakeup) => { if let Err(e) = result { tracing::error!("Evaluator loop failed: {e}"); } @@ -40,6 +49,9 @@ async fn main() -> anyhow::Result<()> { } } + listener_handle.abort(); + let _ = listener_handle.await; + tracing::info!("Evaluator shutting down, closing database pool"); db.close().await;