diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index 721130d..0303ac0 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -62,6 +62,10 @@ pub struct EvaluatorConfig { pub work_dir: PathBuf, pub restrict_eval: bool, pub allow_ifd: bool, + + /// Whether to abort on the first evaluation cycle error instead of logging + /// and retrying. + pub strict_errors: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -70,6 +74,11 @@ pub struct QueueRunnerConfig { pub poll_interval: u64, pub build_timeout: u64, pub work_dir: PathBuf, + + /// When true, abort on the first runner loop error instead of logging and + /// retrying. + #[serde(default)] + pub strict_errors: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -504,6 +513,7 @@ impl Default for EvaluatorConfig { work_dir: PathBuf::from("/tmp/fc-evaluator"), restrict_eval: true, allow_ifd: false, + strict_errors: false, } } } @@ -515,6 +525,7 @@ impl Default for QueueRunnerConfig { poll_interval: 5, build_timeout: 3600, work_dir: PathBuf::from("/tmp/fc-queue-runner"), + strict_errors: false, } } } diff --git a/crates/evaluator/src/eval_loop.rs b/crates/evaluator/src/eval_loop.rs index 4a5ff9e..6274ca9 100644 --- a/crates/evaluator/src/eval_loop.rs +++ b/crates/evaluator/src/eval_loop.rs @@ -29,8 +29,13 @@ pub async fn run( let nix_timeout = Duration::from_secs(config.nix_timeout); let git_timeout = Duration::from_secs(config.git_timeout); + let strict = config.strict_errors; + loop { if let Err(e) = run_cycle(&pool, &config, nix_timeout, git_timeout).await { + if strict { + return Err(e); + } tracing::error!("Evaluation cycle failed: {e}"); } // Wake on NOTIFY or fall back to regular poll interval @@ -180,36 +185,12 @@ async fn evaluate_jobset( "Inputs unchanged (hash: {}), skipping evaluation", &inputs_hash[..16], ); - // Create evaluation record even when skipped so system tracks this check - // Handle duplicate key conflict gracefully (another evaluator may have - // created it) - fall through to process existing evaluation instead of - // skipping - if let Err(e) = repo::evaluations::create(pool, CreateEvaluation { - jobset_id: jobset.id, - commit_hash: commit_hash.clone(), - pr_number: None, - pr_head_branch: None, - pr_base_branch: None, - pr_action: None, - }) - .await - { - if !matches!(e, CiError::Conflict(_)) { - return Err(e.into()); - } - tracing::info!( - jobset = %jobset.name, - commit = %commit_hash, - "Evaluation already exists (concurrent creation in inputs_hash path), will process" - ); - } else { - // Successfully created new evaluation, can skip - repo::jobsets::update_last_checked(pool, jobset.id).await?; - return Ok(()); - } + repo::jobsets::update_last_checked(pool, jobset.id).await?; + return Ok(()); } - // Also skip if commit hasn't changed (backward compat) + // Also skip if commit hasn't changed and inputs_hash matches (backward + // compat for evaluations created before inputs_hash was indexed) if let Some(latest) = repo::evaluations::get_latest(pool, jobset.id).await? && latest.commit_hash == commit_hash && latest.inputs_hash.as_deref() == Some(&inputs_hash) @@ -220,111 +201,8 @@ async fn evaluate_jobset( "Inputs unchanged (hash: {}), skipping evaluation", &inputs_hash[..16], ); - // Create evaluation record even when skipped so system tracks this check - // Handle duplicate key conflict gracefully (another evaluator may have - // created it) - fall through to process existing evaluation instead of - // skipping - if let Err(e) = repo::evaluations::create(pool, CreateEvaluation { - jobset_id: jobset.id, - commit_hash: commit_hash.clone(), - pr_number: None, - pr_head_branch: None, - pr_base_branch: None, - pr_action: None, - }) - .await - { - if !matches!(e, CiError::Conflict(_)) { - return Err(e.into()); - } - tracing::info!( - jobset = %jobset.name, - commit = %commit_hash, - "Evaluation already exists (concurrent creation in commit path), will process" - ); - let existing = repo::evaluations::get_by_jobset_and_commit( - pool, - jobset.id, - &commit_hash, - ) - .await? - .ok_or_else(|| { - anyhow::anyhow!( - "Evaluation conflict but not found: {}/{}", - jobset.id, - commit_hash - ) - })?; - - if existing.status == EvaluationStatus::Completed { - // Check if we need to re-evaluate due to no builds - let builds = - repo::builds::list_for_evaluation(pool, existing.id).await?; - if builds.is_empty() { - info!( - "Evaluation completed with 0 builds, re-running nix evaluation \ - jobset={} commit={}", - jobset.name, commit_hash - ); - // Update existing evaluation status to Running - repo::evaluations::update_status( - pool, - existing.id, - EvaluationStatus::Running, - None, - ) - .await?; - // Use existing evaluation instead of creating new one - let eval = existing; - // Run nix evaluation and create builds from the result - let eval_result = crate::nix::evaluate( - &repo_path, - &jobset.nix_expression, - jobset.flake_mode, - nix_timeout, - config, - &inputs, - ) - .await?; - - create_builds_from_eval(pool, eval.id, &eval_result).await?; - - repo::evaluations::update_status( - pool, - eval.id, - EvaluationStatus::Completed, - None, - ) - .await?; - - repo::jobsets::update_last_checked(pool, jobset.id).await?; - return Ok(()); - } else { - info!( - "Evaluation already completed with {} builds, skipping nix \ - evaluation jobset={} commit={}", - builds.len(), - jobset.name, - commit_hash - ); - repo::jobsets::update_last_checked(pool, jobset.id).await?; - return Ok(()); - } - } - - // Existing evaluation is pending or running, update status and continue - repo::evaluations::update_status( - pool, - existing.id, - EvaluationStatus::Running, - None, - ) - .await?; - } else { - // Successfully created new evaluation, can skip - repo::jobsets::update_last_checked(pool, jobset.id).await?; - return Ok(()); - } + repo::jobsets::update_last_checked(pool, jobset.id).await?; + return Ok(()); } tracing::info!( diff --git a/crates/queue-runner/src/main.rs b/crates/queue-runner/src/main.rs index bb39ee1..aa85591 100644 --- a/crates/queue-runner/src/main.rs +++ b/crates/queue-runner/src/main.rs @@ -35,6 +35,7 @@ async fn main() -> anyhow::Result<()> { let workers = cli.workers.unwrap_or(qr_config.workers); let poll_interval = Duration::from_secs(qr_config.poll_interval); let build_timeout = Duration::from_secs(qr_config.build_timeout); + let strict_errors = qr_config.strict_errors; let work_dir = qr_config.work_dir; // Ensure the work directory exists @@ -76,7 +77,7 @@ async fn main() -> anyhow::Result<()> { ); tokio::select! { - result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup) => { + result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors) => { if let Err(e) = result { tracing::error!("Runner loop failed: {e}"); } diff --git a/crates/queue-runner/src/runner_loop.rs b/crates/queue-runner/src/runner_loop.rs index 620043f..57406e6 100644 --- a/crates/queue-runner/src/runner_loop.rs +++ b/crates/queue-runner/src/runner_loop.rs @@ -14,6 +14,7 @@ pub async fn run( worker_pool: Arc, poll_interval: Duration, wakeup: Arc, + strict_errors: bool, ) -> anyhow::Result<()> { // Reset orphaned builds from previous crashes (older than 5 minutes) match repo::builds::reset_orphaned(&pool, 300).await { @@ -184,6 +185,9 @@ pub async fn run( } }, Err(e) => { + if strict_errors { + return Err(anyhow::anyhow!("Failed to fetch pending builds: {e}")); + } tracing::error!("Failed to fetch pending builds: {e}"); }, } diff --git a/nix/vm-common.nix b/nix/vm-common.nix index 0965e2b..e0233c5 100644 --- a/nix/vm-common.nix +++ b/nix/vm-common.nix @@ -82,11 +82,13 @@ in { poll_interval = 5; work_dir = "/var/lib/fc/evaluator"; nix_timeout = 60; + strict_errors = true; }; queue_runner = { poll_interval = 3; work_dir = "/var/lib/fc/queue-runner"; + strict_errors = true; }; };