fc-evaluator: use LISTEN/NOTIFY for reactive wakeups
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: Ibee7506a9a5ffa008c41fae8e9758db66a6a6964
This commit is contained in:
parent
e274389d12
commit
edaf4313e9
2 changed files with 22 additions and 4 deletions
|
|
@ -1,4 +1,4 @@
|
||||||
use std::{collections::HashMap, time::Duration};
|
use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
|
|
@ -16,10 +16,15 @@ use fc_common::{
|
||||||
};
|
};
|
||||||
use futures::stream::{self, StreamExt};
|
use futures::stream::{self, StreamExt};
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
|
use tokio::sync::Notify;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
pub async fn run(pool: PgPool, config: EvaluatorConfig) -> anyhow::Result<()> {
|
pub async fn run(
|
||||||
|
pool: PgPool,
|
||||||
|
config: EvaluatorConfig,
|
||||||
|
wakeup: Arc<Notify>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
let poll_interval = Duration::from_secs(config.poll_interval);
|
let poll_interval = Duration::from_secs(config.poll_interval);
|
||||||
let nix_timeout = Duration::from_secs(config.nix_timeout);
|
let nix_timeout = Duration::from_secs(config.nix_timeout);
|
||||||
let git_timeout = Duration::from_secs(config.git_timeout);
|
let git_timeout = Duration::from_secs(config.git_timeout);
|
||||||
|
|
@ -28,7 +33,8 @@ pub async fn run(pool: PgPool, config: EvaluatorConfig) -> anyhow::Result<()> {
|
||||||
if let Err(e) = run_cycle(&pool, &config, nix_timeout, git_timeout).await {
|
if let Err(e) = run_cycle(&pool, &config, nix_timeout, git_timeout).await {
|
||||||
tracing::error!("Evaluation cycle failed: {e}");
|
tracing::error!("Evaluation cycle failed: {e}");
|
||||||
}
|
}
|
||||||
tokio::time::sleep(poll_interval).await;
|
// Wake on NOTIFY or fall back to regular poll interval
|
||||||
|
let _ = tokio::time::timeout(poll_interval, wakeup.notified()).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,5 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use fc_common::{Config, Database};
|
use fc_common::{Config, Database};
|
||||||
|
|
||||||
|
|
@ -29,8 +31,15 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let pool = db.pool().clone();
|
let pool = db.pool().clone();
|
||||||
let eval_config = config.evaluator;
|
let eval_config = config.evaluator;
|
||||||
|
|
||||||
|
let wakeup = Arc::new(tokio::sync::Notify::new());
|
||||||
|
let listener_handle = fc_common::pg_notify::spawn_listener(
|
||||||
|
db.pool(),
|
||||||
|
&[fc_common::pg_notify::CHANNEL_JOBSETS_CHANGED],
|
||||||
|
wakeup.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
result = fc_evaluator::eval_loop::run(pool, eval_config) => {
|
result = fc_evaluator::eval_loop::run(pool, eval_config, wakeup) => {
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
tracing::error!("Evaluator loop failed: {e}");
|
tracing::error!("Evaluator loop failed: {e}");
|
||||||
}
|
}
|
||||||
|
|
@ -40,6 +49,9 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
listener_handle.abort();
|
||||||
|
let _ = listener_handle.await;
|
||||||
|
|
||||||
tracing::info!("Evaluator shutting down, closing database pool");
|
tracing::info!("Evaluator shutting down, closing database pool");
|
||||||
db.close().await;
|
db.close().await;
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue