fc-evaluator: allow fail-fast behaviour
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I1da41766f1c499347279c41f2316f4376a6a6964
This commit is contained in:
parent
49638d5d14
commit
235c9834b7
5 changed files with 30 additions and 134 deletions
|
|
@ -62,6 +62,10 @@ pub struct EvaluatorConfig {
|
||||||
pub work_dir: PathBuf,
|
pub work_dir: PathBuf,
|
||||||
pub restrict_eval: bool,
|
pub restrict_eval: bool,
|
||||||
pub allow_ifd: bool,
|
pub allow_ifd: bool,
|
||||||
|
|
||||||
|
/// Whether to abort on the first evaluation cycle error instead of logging
|
||||||
|
/// and retrying.
|
||||||
|
pub strict_errors: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
|
@ -70,6 +74,11 @@ pub struct QueueRunnerConfig {
|
||||||
pub poll_interval: u64,
|
pub poll_interval: u64,
|
||||||
pub build_timeout: u64,
|
pub build_timeout: u64,
|
||||||
pub work_dir: PathBuf,
|
pub work_dir: PathBuf,
|
||||||
|
|
||||||
|
/// When true, abort on the first runner loop error instead of logging and
|
||||||
|
/// retrying.
|
||||||
|
#[serde(default)]
|
||||||
|
pub strict_errors: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
|
@ -504,6 +513,7 @@ impl Default for EvaluatorConfig {
|
||||||
work_dir: PathBuf::from("/tmp/fc-evaluator"),
|
work_dir: PathBuf::from("/tmp/fc-evaluator"),
|
||||||
restrict_eval: true,
|
restrict_eval: true,
|
||||||
allow_ifd: false,
|
allow_ifd: false,
|
||||||
|
strict_errors: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -515,6 +525,7 @@ impl Default for QueueRunnerConfig {
|
||||||
poll_interval: 5,
|
poll_interval: 5,
|
||||||
build_timeout: 3600,
|
build_timeout: 3600,
|
||||||
work_dir: PathBuf::from("/tmp/fc-queue-runner"),
|
work_dir: PathBuf::from("/tmp/fc-queue-runner"),
|
||||||
|
strict_errors: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,8 +29,13 @@ pub async fn run(
|
||||||
let nix_timeout = Duration::from_secs(config.nix_timeout);
|
let nix_timeout = Duration::from_secs(config.nix_timeout);
|
||||||
let git_timeout = Duration::from_secs(config.git_timeout);
|
let git_timeout = Duration::from_secs(config.git_timeout);
|
||||||
|
|
||||||
|
let strict = config.strict_errors;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Err(e) = run_cycle(&pool, &config, nix_timeout, git_timeout).await {
|
if let Err(e) = run_cycle(&pool, &config, nix_timeout, git_timeout).await {
|
||||||
|
if strict {
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
tracing::error!("Evaluation cycle failed: {e}");
|
tracing::error!("Evaluation cycle failed: {e}");
|
||||||
}
|
}
|
||||||
// Wake on NOTIFY or fall back to regular poll interval
|
// Wake on NOTIFY or fall back to regular poll interval
|
||||||
|
|
@ -180,36 +185,12 @@ async fn evaluate_jobset(
|
||||||
"Inputs unchanged (hash: {}), skipping evaluation",
|
"Inputs unchanged (hash: {}), skipping evaluation",
|
||||||
&inputs_hash[..16],
|
&inputs_hash[..16],
|
||||||
);
|
);
|
||||||
// Create evaluation record even when skipped so system tracks this check
|
repo::jobsets::update_last_checked(pool, jobset.id).await?;
|
||||||
// Handle duplicate key conflict gracefully (another evaluator may have
|
return Ok(());
|
||||||
// created it) - fall through to process existing evaluation instead of
|
|
||||||
// skipping
|
|
||||||
if let Err(e) = repo::evaluations::create(pool, CreateEvaluation {
|
|
||||||
jobset_id: jobset.id,
|
|
||||||
commit_hash: commit_hash.clone(),
|
|
||||||
pr_number: None,
|
|
||||||
pr_head_branch: None,
|
|
||||||
pr_base_branch: None,
|
|
||||||
pr_action: None,
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
if !matches!(e, CiError::Conflict(_)) {
|
|
||||||
return Err(e.into());
|
|
||||||
}
|
|
||||||
tracing::info!(
|
|
||||||
jobset = %jobset.name,
|
|
||||||
commit = %commit_hash,
|
|
||||||
"Evaluation already exists (concurrent creation in inputs_hash path), will process"
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
// Successfully created new evaluation, can skip
|
|
||||||
repo::jobsets::update_last_checked(pool, jobset.id).await?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Also skip if commit hasn't changed (backward compat)
|
// Also skip if commit hasn't changed and inputs_hash matches (backward
|
||||||
|
// compat for evaluations created before inputs_hash was indexed)
|
||||||
if let Some(latest) = repo::evaluations::get_latest(pool, jobset.id).await?
|
if let Some(latest) = repo::evaluations::get_latest(pool, jobset.id).await?
|
||||||
&& latest.commit_hash == commit_hash
|
&& latest.commit_hash == commit_hash
|
||||||
&& latest.inputs_hash.as_deref() == Some(&inputs_hash)
|
&& latest.inputs_hash.as_deref() == Some(&inputs_hash)
|
||||||
|
|
@ -220,111 +201,8 @@ async fn evaluate_jobset(
|
||||||
"Inputs unchanged (hash: {}), skipping evaluation",
|
"Inputs unchanged (hash: {}), skipping evaluation",
|
||||||
&inputs_hash[..16],
|
&inputs_hash[..16],
|
||||||
);
|
);
|
||||||
// Create evaluation record even when skipped so system tracks this check
|
repo::jobsets::update_last_checked(pool, jobset.id).await?;
|
||||||
// Handle duplicate key conflict gracefully (another evaluator may have
|
return Ok(());
|
||||||
// created it) - fall through to process existing evaluation instead of
|
|
||||||
// skipping
|
|
||||||
if let Err(e) = repo::evaluations::create(pool, CreateEvaluation {
|
|
||||||
jobset_id: jobset.id,
|
|
||||||
commit_hash: commit_hash.clone(),
|
|
||||||
pr_number: None,
|
|
||||||
pr_head_branch: None,
|
|
||||||
pr_base_branch: None,
|
|
||||||
pr_action: None,
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
if !matches!(e, CiError::Conflict(_)) {
|
|
||||||
return Err(e.into());
|
|
||||||
}
|
|
||||||
tracing::info!(
|
|
||||||
jobset = %jobset.name,
|
|
||||||
commit = %commit_hash,
|
|
||||||
"Evaluation already exists (concurrent creation in commit path), will process"
|
|
||||||
);
|
|
||||||
let existing = repo::evaluations::get_by_jobset_and_commit(
|
|
||||||
pool,
|
|
||||||
jobset.id,
|
|
||||||
&commit_hash,
|
|
||||||
)
|
|
||||||
.await?
|
|
||||||
.ok_or_else(|| {
|
|
||||||
anyhow::anyhow!(
|
|
||||||
"Evaluation conflict but not found: {}/{}",
|
|
||||||
jobset.id,
|
|
||||||
commit_hash
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
if existing.status == EvaluationStatus::Completed {
|
|
||||||
// Check if we need to re-evaluate due to no builds
|
|
||||||
let builds =
|
|
||||||
repo::builds::list_for_evaluation(pool, existing.id).await?;
|
|
||||||
if builds.is_empty() {
|
|
||||||
info!(
|
|
||||||
"Evaluation completed with 0 builds, re-running nix evaluation \
|
|
||||||
jobset={} commit={}",
|
|
||||||
jobset.name, commit_hash
|
|
||||||
);
|
|
||||||
// Update existing evaluation status to Running
|
|
||||||
repo::evaluations::update_status(
|
|
||||||
pool,
|
|
||||||
existing.id,
|
|
||||||
EvaluationStatus::Running,
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
// Use existing evaluation instead of creating new one
|
|
||||||
let eval = existing;
|
|
||||||
// Run nix evaluation and create builds from the result
|
|
||||||
let eval_result = crate::nix::evaluate(
|
|
||||||
&repo_path,
|
|
||||||
&jobset.nix_expression,
|
|
||||||
jobset.flake_mode,
|
|
||||||
nix_timeout,
|
|
||||||
config,
|
|
||||||
&inputs,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
create_builds_from_eval(pool, eval.id, &eval_result).await?;
|
|
||||||
|
|
||||||
repo::evaluations::update_status(
|
|
||||||
pool,
|
|
||||||
eval.id,
|
|
||||||
EvaluationStatus::Completed,
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
repo::jobsets::update_last_checked(pool, jobset.id).await?;
|
|
||||||
return Ok(());
|
|
||||||
} else {
|
|
||||||
info!(
|
|
||||||
"Evaluation already completed with {} builds, skipping nix \
|
|
||||||
evaluation jobset={} commit={}",
|
|
||||||
builds.len(),
|
|
||||||
jobset.name,
|
|
||||||
commit_hash
|
|
||||||
);
|
|
||||||
repo::jobsets::update_last_checked(pool, jobset.id).await?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Existing evaluation is pending or running, update status and continue
|
|
||||||
repo::evaluations::update_status(
|
|
||||||
pool,
|
|
||||||
existing.id,
|
|
||||||
EvaluationStatus::Running,
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
} else {
|
|
||||||
// Successfully created new evaluation, can skip
|
|
||||||
repo::jobsets::update_last_checked(pool, jobset.id).await?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let workers = cli.workers.unwrap_or(qr_config.workers);
|
let workers = cli.workers.unwrap_or(qr_config.workers);
|
||||||
let poll_interval = Duration::from_secs(qr_config.poll_interval);
|
let poll_interval = Duration::from_secs(qr_config.poll_interval);
|
||||||
let build_timeout = Duration::from_secs(qr_config.build_timeout);
|
let build_timeout = Duration::from_secs(qr_config.build_timeout);
|
||||||
|
let strict_errors = qr_config.strict_errors;
|
||||||
let work_dir = qr_config.work_dir;
|
let work_dir = qr_config.work_dir;
|
||||||
|
|
||||||
// Ensure the work directory exists
|
// Ensure the work directory exists
|
||||||
|
|
@ -76,7 +77,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
);
|
);
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup) => {
|
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors) => {
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
tracing::error!("Runner loop failed: {e}");
|
tracing::error!("Runner loop failed: {e}");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ pub async fn run(
|
||||||
worker_pool: Arc<WorkerPool>,
|
worker_pool: Arc<WorkerPool>,
|
||||||
poll_interval: Duration,
|
poll_interval: Duration,
|
||||||
wakeup: Arc<Notify>,
|
wakeup: Arc<Notify>,
|
||||||
|
strict_errors: bool,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// Reset orphaned builds from previous crashes (older than 5 minutes)
|
// Reset orphaned builds from previous crashes (older than 5 minutes)
|
||||||
match repo::builds::reset_orphaned(&pool, 300).await {
|
match repo::builds::reset_orphaned(&pool, 300).await {
|
||||||
|
|
@ -184,6 +185,9 @@ pub async fn run(
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
if strict_errors {
|
||||||
|
return Err(anyhow::anyhow!("Failed to fetch pending builds: {e}"));
|
||||||
|
}
|
||||||
tracing::error!("Failed to fetch pending builds: {e}");
|
tracing::error!("Failed to fetch pending builds: {e}");
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -82,11 +82,13 @@ in {
|
||||||
poll_interval = 5;
|
poll_interval = 5;
|
||||||
work_dir = "/var/lib/fc/evaluator";
|
work_dir = "/var/lib/fc/evaluator";
|
||||||
nix_timeout = 60;
|
nix_timeout = 60;
|
||||||
|
strict_errors = true;
|
||||||
};
|
};
|
||||||
|
|
||||||
queue_runner = {
|
queue_runner = {
|
||||||
poll_interval = 3;
|
poll_interval = 3;
|
||||||
work_dir = "/var/lib/fc/queue-runner";
|
work_dir = "/var/lib/fc/queue-runner";
|
||||||
|
strict_errors = true;
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue