evaluator: per-jobset check intervals; fix one-shot handling
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: Ib36e82fad6f1c3b2ad87651d680fc1406a6a6964
This commit is contained in:
parent
1df28f6049
commit
8472e3f1d8
1 changed files with 51 additions and 3 deletions
|
|
@ -1,9 +1,16 @@
|
||||||
use std::{collections::HashMap, time::Duration};
|
use std::{collections::HashMap, time::Duration};
|
||||||
|
|
||||||
|
use chrono::Utc;
|
||||||
use fc_common::{
|
use fc_common::{
|
||||||
config::EvaluatorConfig,
|
config::EvaluatorConfig,
|
||||||
error::check_disk_space,
|
error::check_disk_space,
|
||||||
models::{CreateBuild, CreateEvaluation, EvaluationStatus, JobsetInput},
|
models::{
|
||||||
|
CreateBuild,
|
||||||
|
CreateEvaluation,
|
||||||
|
EvaluationStatus,
|
||||||
|
JobsetInput,
|
||||||
|
JobsetState,
|
||||||
|
},
|
||||||
repo,
|
repo,
|
||||||
};
|
};
|
||||||
use futures::stream::{self, StreamExt};
|
use futures::stream::{self, StreamExt};
|
||||||
|
|
@ -30,11 +37,28 @@ async fn run_cycle(
|
||||||
git_timeout: Duration,
|
git_timeout: Duration,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let active = repo::jobsets::list_active(pool).await?;
|
let active = repo::jobsets::list_active(pool).await?;
|
||||||
tracing::info!("Found {} active jobsets", active.len());
|
|
||||||
|
// Filter to jobsets that are due for evaluation based on their
|
||||||
|
// check_interval and last_checked_at
|
||||||
|
let now = Utc::now();
|
||||||
|
let ready: Vec<_> = active
|
||||||
|
.into_iter()
|
||||||
|
.filter(|js| {
|
||||||
|
match js.last_checked_at {
|
||||||
|
Some(last) => {
|
||||||
|
let elapsed = (now - last).num_seconds();
|
||||||
|
elapsed >= i64::from(js.check_interval)
|
||||||
|
},
|
||||||
|
None => true, // Never checked, evaluate now
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
tracing::info!("Found {} jobsets due for evaluation", ready.len());
|
||||||
|
|
||||||
let max_concurrent = config.max_concurrent_evals;
|
let max_concurrent = config.max_concurrent_evals;
|
||||||
|
|
||||||
stream::iter(active)
|
stream::iter(ready)
|
||||||
.for_each_concurrent(max_concurrent, |jobset| {
|
.for_each_concurrent(max_concurrent, |jobset| {
|
||||||
async move {
|
async move {
|
||||||
if let Err(e) =
|
if let Err(e) =
|
||||||
|
|
@ -299,6 +323,29 @@ async fn evaluate_jobset(
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update last_checked_at timestamp for per-jobset interval tracking
|
||||||
|
if let Err(e) = repo::jobsets::update_last_checked(pool, jobset.id).await {
|
||||||
|
tracing::warn!(
|
||||||
|
jobset = %jobset.name,
|
||||||
|
"Failed to update last_checked_at: {e}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark one-shot jobsets as complete (disabled) after evaluation
|
||||||
|
if jobset.state == JobsetState::OneShot {
|
||||||
|
tracing::info!(
|
||||||
|
jobset = %jobset.name,
|
||||||
|
"One-shot evaluation complete, disabling jobset"
|
||||||
|
);
|
||||||
|
if let Err(e) = repo::jobsets::mark_one_shot_complete(pool, jobset.id).await
|
||||||
|
{
|
||||||
|
tracing::error!(
|
||||||
|
jobset = %jobset.name,
|
||||||
|
"Failed to mark one-shot complete: {e}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -388,6 +435,7 @@ async fn check_declarative_config(
|
||||||
check_interval: js.check_interval,
|
check_interval: js.check_interval,
|
||||||
branch: None,
|
branch: None,
|
||||||
scheduling_shares: None,
|
scheduling_shares: None,
|
||||||
|
state: None,
|
||||||
};
|
};
|
||||||
if let Err(e) = repo::jobsets::upsert(pool, input).await {
|
if let Err(e) = repo::jobsets::upsert(pool, input).await {
|
||||||
tracing::warn!("Failed to upsert declarative jobset: {e}");
|
tracing::warn!("Failed to upsert declarative jobset: {e}");
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue