From 8472e3f1d81eed4b284eb963eddb076d9b0254c0 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Feb 2026 02:12:34 +0300 Subject: [PATCH] evaluator: per-jobset check intervals; fix one-shot handling Signed-off-by: NotAShelf Change-Id: Ib36e82fad6f1c3b2ad87651d680fc1406a6a6964 --- crates/evaluator/src/eval_loop.rs | 54 +++++++++++++++++++++++++++++-- 1 file changed, 51 insertions(+), 3 deletions(-) diff --git a/crates/evaluator/src/eval_loop.rs b/crates/evaluator/src/eval_loop.rs index 7d942f1..dd36548 100644 --- a/crates/evaluator/src/eval_loop.rs +++ b/crates/evaluator/src/eval_loop.rs @@ -1,9 +1,16 @@ use std::{collections::HashMap, time::Duration}; +use chrono::Utc; use fc_common::{ config::EvaluatorConfig, error::check_disk_space, - models::{CreateBuild, CreateEvaluation, EvaluationStatus, JobsetInput}, + models::{ + CreateBuild, + CreateEvaluation, + EvaluationStatus, + JobsetInput, + JobsetState, + }, repo, }; use futures::stream::{self, StreamExt}; @@ -30,11 +37,28 @@ async fn run_cycle( git_timeout: Duration, ) -> anyhow::Result<()> { let active = repo::jobsets::list_active(pool).await?; - tracing::info!("Found {} active jobsets", active.len()); + + // Filter to jobsets that are due for evaluation based on their + // check_interval and last_checked_at + let now = Utc::now(); + let ready: Vec<_> = active + .into_iter() + .filter(|js| { + match js.last_checked_at { + Some(last) => { + let elapsed = (now - last).num_seconds(); + elapsed >= i64::from(js.check_interval) + }, + None => true, // Never checked, evaluate now + } + }) + .collect(); + + tracing::info!("Found {} jobsets due for evaluation", ready.len()); let max_concurrent = config.max_concurrent_evals; - stream::iter(active) + stream::iter(ready) .for_each_concurrent(max_concurrent, |jobset| { async move { if let Err(e) = @@ -299,6 +323,29 @@ async fn evaluate_jobset( }, } + // Update last_checked_at timestamp for per-jobset interval tracking + if let Err(e) = repo::jobsets::update_last_checked(pool, jobset.id).await { + tracing::warn!( + jobset = %jobset.name, + "Failed to update last_checked_at: {e}" + ); + } + + // Mark one-shot jobsets as complete (disabled) after evaluation + if jobset.state == JobsetState::OneShot { + tracing::info!( + jobset = %jobset.name, + "One-shot evaluation complete, disabling jobset" + ); + if let Err(e) = repo::jobsets::mark_one_shot_complete(pool, jobset.id).await + { + tracing::error!( + jobset = %jobset.name, + "Failed to mark one-shot complete: {e}" + ); + } + } + Ok(()) } @@ -388,6 +435,7 @@ async fn check_declarative_config( check_interval: js.check_interval, branch: None, scheduling_shares: None, + state: None, }; if let Err(e) = repo::jobsets::upsert(pool, input).await { tracing::warn!("Failed to upsert declarative jobset: {e}");