fc-evaluator: dispatch pending status on build creation
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I534a8dde536b6e0537d7c3c44c3ba0146a6a6964
This commit is contained in:
parent
85a43c8ca0
commit
1e28c31077
2 changed files with 58 additions and 4 deletions
|
|
@ -28,6 +28,7 @@ use uuid::Uuid;
|
||||||
pub async fn run(
|
pub async fn run(
|
||||||
pool: PgPool,
|
pool: PgPool,
|
||||||
config: EvaluatorConfig,
|
config: EvaluatorConfig,
|
||||||
|
notifications_config: fc_common::config::NotificationsConfig,
|
||||||
wakeup: Arc<Notify>,
|
wakeup: Arc<Notify>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let poll_interval = Duration::from_secs(config.poll_interval);
|
let poll_interval = Duration::from_secs(config.poll_interval);
|
||||||
|
|
@ -37,7 +38,15 @@ pub async fn run(
|
||||||
let strict = config.strict_errors;
|
let strict = config.strict_errors;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Err(e) = run_cycle(&pool, &config, nix_timeout, git_timeout).await {
|
if let Err(e) = run_cycle(
|
||||||
|
&pool,
|
||||||
|
&config,
|
||||||
|
¬ifications_config,
|
||||||
|
nix_timeout,
|
||||||
|
git_timeout,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
if strict {
|
if strict {
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
|
|
@ -51,6 +60,7 @@ pub async fn run(
|
||||||
async fn run_cycle(
|
async fn run_cycle(
|
||||||
pool: &PgPool,
|
pool: &PgPool,
|
||||||
config: &EvaluatorConfig,
|
config: &EvaluatorConfig,
|
||||||
|
notifications_config: &fc_common::config::NotificationsConfig,
|
||||||
nix_timeout: Duration,
|
nix_timeout: Duration,
|
||||||
git_timeout: Duration,
|
git_timeout: Duration,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
|
@ -76,8 +86,15 @@ async fn run_cycle(
|
||||||
stream::iter(ready)
|
stream::iter(ready)
|
||||||
.for_each_concurrent(max_concurrent, |jobset| {
|
.for_each_concurrent(max_concurrent, |jobset| {
|
||||||
async move {
|
async move {
|
||||||
if let Err(e) =
|
if let Err(e) = evaluate_jobset(
|
||||||
evaluate_jobset(pool, &jobset, config, nix_timeout, git_timeout).await
|
pool,
|
||||||
|
&jobset,
|
||||||
|
config,
|
||||||
|
notifications_config,
|
||||||
|
nix_timeout,
|
||||||
|
git_timeout,
|
||||||
|
)
|
||||||
|
.await
|
||||||
{
|
{
|
||||||
tracing::error!(
|
tracing::error!(
|
||||||
jobset_id = %jobset.id,
|
jobset_id = %jobset.id,
|
||||||
|
|
@ -111,6 +128,7 @@ async fn evaluate_jobset(
|
||||||
pool: &PgPool,
|
pool: &PgPool,
|
||||||
jobset: &fc_common::models::ActiveJobset,
|
jobset: &fc_common::models::ActiveJobset,
|
||||||
config: &EvaluatorConfig,
|
config: &EvaluatorConfig,
|
||||||
|
notifications_config: &fc_common::config::NotificationsConfig,
|
||||||
nix_timeout: Duration,
|
nix_timeout: Duration,
|
||||||
git_timeout: Duration,
|
git_timeout: Duration,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
|
@ -326,6 +344,41 @@ async fn evaluate_jobset(
|
||||||
|
|
||||||
create_builds_from_eval(pool, eval.id, &eval_result).await?;
|
create_builds_from_eval(pool, eval.id, &eval_result).await?;
|
||||||
|
|
||||||
|
// Dispatch pending notifications for created builds
|
||||||
|
if notifications_config.enable_retry_queue {
|
||||||
|
if let Ok(project) = repo::projects::get(pool, jobset.project_id).await
|
||||||
|
{
|
||||||
|
if let Ok(builds) =
|
||||||
|
repo::builds::list_for_evaluation(pool, eval.id).await
|
||||||
|
{
|
||||||
|
for build in builds {
|
||||||
|
// Skip aggregate builds (they complete later when constituents
|
||||||
|
// finish)
|
||||||
|
if !build.is_aggregate {
|
||||||
|
fc_common::notifications::dispatch_build_created(
|
||||||
|
pool,
|
||||||
|
&build,
|
||||||
|
&project,
|
||||||
|
&eval.commit_hash,
|
||||||
|
notifications_config,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tracing::warn!(
|
||||||
|
eval_id = %eval.id,
|
||||||
|
"Failed to fetch builds for pending notifications"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tracing::warn!(
|
||||||
|
project_id = %jobset.project_id,
|
||||||
|
"Failed to fetch project for pending notifications"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
repo::evaluations::update_status(
|
repo::evaluations::update_status(
|
||||||
pool,
|
pool,
|
||||||
eval.id,
|
eval.id,
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
let pool = db.pool().clone();
|
let pool = db.pool().clone();
|
||||||
let eval_config = config.evaluator;
|
let eval_config = config.evaluator;
|
||||||
|
let notifications_config = config.notifications;
|
||||||
|
|
||||||
let wakeup = Arc::new(tokio::sync::Notify::new());
|
let wakeup = Arc::new(tokio::sync::Notify::new());
|
||||||
let listener_handle = fc_common::pg_notify::spawn_listener(
|
let listener_handle = fc_common::pg_notify::spawn_listener(
|
||||||
|
|
@ -39,7 +40,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
);
|
);
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
result = fc_evaluator::eval_loop::run(pool, eval_config, wakeup) => {
|
result = fc_evaluator::eval_loop::run(pool, eval_config, notifications_config, wakeup) => {
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
tracing::error!("Evaluator loop failed: {e}");
|
tracing::error!("Evaluator loop failed: {e}");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue