nix: attempt to fix VM tests; general cleanup

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I65f6909ef02ab4599f5b0bbc0930367e6a6a6964
This commit is contained in:
raf 2026-02-14 13:55:07 +03:00
commit a2b638d4db
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
26 changed files with 2320 additions and 2939 deletions

View file

@ -1,9 +1,10 @@
use std::{collections::HashMap, time::Duration};
use anyhow::Context;
use chrono::Utc;
use fc_common::{
config::EvaluatorConfig,
error::check_disk_space,
error::{CiError, check_disk_space},
models::{
CreateBuild,
CreateEvaluation,
@ -15,6 +16,7 @@ use fc_common::{
};
use futures::stream::{self, StreamExt};
use sqlx::PgPool;
use tracing::info;
use uuid::Uuid;
pub async fn run(pool: PgPool, config: EvaluatorConfig) -> anyhow::Result<()> {
@ -172,7 +174,33 @@ async fn evaluate_jobset(
"Inputs unchanged (hash: {}), skipping evaluation",
&inputs_hash[..16],
);
return Ok(());
// Create evaluation record even when skipped so system tracks this check
// Handle duplicate key conflict gracefully (another evaluator may have
// created it) - fall through to process existing evaluation instead of
// skipping
if let Err(e) = repo::evaluations::create(pool, CreateEvaluation {
jobset_id: jobset.id,
commit_hash: commit_hash.clone(),
pr_number: None,
pr_head_branch: None,
pr_base_branch: None,
pr_action: None,
})
.await
{
if !matches!(e, CiError::Conflict(_)) {
return Err(e.into());
}
tracing::info!(
jobset = %jobset.name,
commit = %commit_hash,
"Evaluation already exists (concurrent creation in inputs_hash path), will process"
);
} else {
// Successfully created new evaluation, can skip
repo::jobsets::update_last_checked(pool, jobset.id).await?;
return Ok(());
}
}
// Also skip if commit hasn't changed (backward compat)
@ -183,9 +211,114 @@ async fn evaluate_jobset(
tracing::debug!(
jobset = %jobset.name,
commit = %commit_hash,
"Already evaluated, skipping"
"Inputs unchanged (hash: {}), skipping evaluation",
&inputs_hash[..16],
);
return Ok(());
// Create evaluation record even when skipped so system tracks this check
// Handle duplicate key conflict gracefully (another evaluator may have
// created it) - fall through to process existing evaluation instead of
// skipping
if let Err(e) = repo::evaluations::create(pool, CreateEvaluation {
jobset_id: jobset.id,
commit_hash: commit_hash.clone(),
pr_number: None,
pr_head_branch: None,
pr_base_branch: None,
pr_action: None,
})
.await
{
if !matches!(e, CiError::Conflict(_)) {
return Err(e.into());
}
tracing::info!(
jobset = %jobset.name,
commit = %commit_hash,
"Evaluation already exists (concurrent creation in commit path), will process"
);
let existing = repo::evaluations::get_by_jobset_and_commit(
pool,
jobset.id,
&commit_hash,
)
.await?
.ok_or_else(|| {
anyhow::anyhow!(
"Evaluation conflict but not found: {}/{}",
jobset.id,
commit_hash
)
})?;
if existing.status == EvaluationStatus::Completed {
// Check if we need to re-evaluate due to no builds
let builds =
repo::builds::list_for_evaluation(pool, existing.id).await?;
if builds.is_empty() {
info!(
"Evaluation completed with 0 builds, re-running nix evaluation \
jobset={} commit={}",
jobset.name, commit_hash
);
// Update existing evaluation status to Running
repo::evaluations::update_status(
pool,
existing.id,
EvaluationStatus::Running,
None,
)
.await?;
// Use existing evaluation instead of creating new one
let eval = existing;
// Run nix evaluation and create builds from the result
let eval_result = crate::nix::evaluate(
&repo_path,
&jobset.nix_expression,
jobset.flake_mode,
nix_timeout,
config,
&inputs,
)
.await?;
create_builds_from_eval(pool, eval.id, &eval_result).await?;
repo::evaluations::update_status(
pool,
eval.id,
EvaluationStatus::Completed,
None,
)
.await?;
repo::jobsets::update_last_checked(pool, jobset.id).await?;
return Ok(());
} else {
info!(
"Evaluation already completed with {} builds, skipping nix \
evaluation jobset={} commit={}",
builds.len(),
jobset.name,
commit_hash
);
repo::jobsets::update_last_checked(pool, jobset.id).await?;
return Ok(());
}
}
// Existing evaluation is pending or running, update status and continue
repo::evaluations::update_status(
pool,
existing.id,
EvaluationStatus::Running,
None,
)
.await?;
} else {
// Successfully created new evaluation, can skip
repo::jobsets::update_last_checked(pool, jobset.id).await?;
return Ok(());
}
}
tracing::info!(
@ -194,8 +327,9 @@ async fn evaluate_jobset(
"Starting evaluation"
);
// Create evaluation record
let eval = repo::evaluations::create(pool, CreateEvaluation {
// Create evaluation record. If it already exists (race condition), fetch the
// existing one and continue. Only update status if it's still pending.
let eval = match repo::evaluations::create(pool, CreateEvaluation {
jobset_id: jobset.id,
commit_hash: commit_hash.clone(),
pr_number: None,
@ -203,16 +337,72 @@ async fn evaluate_jobset(
pr_base_branch: None,
pr_action: None,
})
.await?;
.await
{
Ok(eval) => eval,
Err(CiError::Conflict(_)) => {
tracing::info!(
jobset = %jobset.name,
commit = %commit_hash,
"Evaluation already exists (conflict), fetching existing record"
);
let existing = repo::evaluations::get_by_jobset_and_commit(
pool,
jobset.id,
&commit_hash,
)
.await?
.ok_or_else(|| {
anyhow::anyhow!(
"Evaluation conflict but not found: {}/{}",
jobset.id,
commit_hash
)
})?;
// Mark as running and set inputs hash
repo::evaluations::update_status(
pool,
eval.id,
EvaluationStatus::Running,
None,
)
.await?;
if existing.status == EvaluationStatus::Pending {
repo::evaluations::update_status(
pool,
existing.id,
EvaluationStatus::Running,
None,
)
.await?;
} else if existing.status == EvaluationStatus::Completed {
let build_count = repo::builds::count_filtered(
pool,
Some(existing.id),
None,
None,
None,
)
.await?;
if build_count > 0 {
info!(
"Evaluation already completed with {} builds, skipping nix \
evaluation jobset={} commit={}",
build_count, jobset.name, commit_hash
);
return Ok(());
} else {
info!(
"Evaluation completed but has 0 builds, re-running nix evaluation \
jobset={} commit={}",
jobset.name, commit_hash
);
}
}
existing
},
Err(e) => {
return Err(anyhow::anyhow!(e)).with_context(|| {
format!("failed to create evaluation for jobset {}", jobset.name)
});
},
};
// Set inputs hash (only needed for new evaluations, not existing ones)
let _ = repo::evaluations::set_inputs_hash(pool, eval.id, &inputs_hash).await;
// Check for declarative config in repo
@ -230,6 +420,7 @@ async fn evaluate_jobset(
.await
{
Ok(eval_result) => {
tracing::debug!(jobset = %jobset.name, job_count = eval_result.jobs.len(), "Nix evaluation returned");
tracing::info!(
jobset = %jobset.name,
count = eval_result.jobs.len(),
@ -237,70 +428,7 @@ async fn evaluate_jobset(
"Evaluation discovered jobs"
);
// Create build records, tracking drv_path -> build_id for dependency
// resolution
let mut drv_to_build: HashMap<String, Uuid> = HashMap::new();
let mut name_to_build: HashMap<String, Uuid> = HashMap::new();
for job in &eval_result.jobs {
let outputs_json = job
.outputs
.as_ref()
.map(|o| serde_json::to_value(o).unwrap_or_default());
let constituents_json = job
.constituents
.as_ref()
.map(|c| serde_json::to_value(c).unwrap_or_default());
let is_aggregate = job.constituents.is_some();
let build = repo::builds::create(pool, CreateBuild {
evaluation_id: eval.id,
job_name: job.name.clone(),
drv_path: job.drv_path.clone(),
system: job.system.clone(),
outputs: outputs_json,
is_aggregate: Some(is_aggregate),
constituents: constituents_json,
})
.await?;
drv_to_build.insert(job.drv_path.clone(), build.id);
name_to_build.insert(job.name.clone(), build.id);
}
// Resolve dependencies
for job in &eval_result.jobs {
let build_id = match drv_to_build.get(&job.drv_path) {
Some(id) => *id,
None => continue,
};
// Input derivation dependencies
if let Some(ref input_drvs) = job.input_drvs {
for dep_drv in input_drvs.keys() {
if let Some(&dep_build_id) = drv_to_build.get(dep_drv)
&& dep_build_id != build_id
{
let _ =
repo::build_dependencies::create(pool, build_id, dep_build_id)
.await;
}
}
}
// Aggregate constituent dependencies
if let Some(ref constituents) = job.constituents {
for constituent_name in constituents {
if let Some(&dep_build_id) = name_to_build.get(constituent_name)
&& dep_build_id != build_id
{
let _ =
repo::build_dependencies::create(pool, build_id, dep_build_id)
.await;
}
}
}
}
create_builds_from_eval(pool, eval.id, &eval_result).await?;
repo::evaluations::update_status(
pool,
@ -349,6 +477,78 @@ async fn evaluate_jobset(
Ok(())
}
/// Create build records from evaluation results, resolving dependencies.
async fn create_builds_from_eval(
pool: &PgPool,
eval_id: Uuid,
eval_result: &crate::nix::EvalResult,
) -> anyhow::Result<()> {
let mut drv_to_build: HashMap<String, Uuid> = HashMap::new();
let mut name_to_build: HashMap<String, Uuid> = HashMap::new();
for job in &eval_result.jobs {
let outputs_json = job
.outputs
.as_ref()
.map(|o| serde_json::to_value(o).unwrap_or_default());
let constituents_json = job
.constituents
.as_ref()
.map(|c| serde_json::to_value(c).unwrap_or_default());
let is_aggregate = job.constituents.is_some();
let build = repo::builds::create(pool, CreateBuild {
evaluation_id: eval_id,
job_name: job.name.clone(),
drv_path: job.drv_path.clone(),
system: job.system.clone(),
outputs: outputs_json,
is_aggregate: Some(is_aggregate),
constituents: constituents_json,
})
.await?;
drv_to_build.insert(job.drv_path.clone(), build.id);
name_to_build.insert(job.name.clone(), build.id);
}
// Resolve dependencies
for job in &eval_result.jobs {
let build_id = match drv_to_build.get(&job.drv_path) {
Some(id) => *id,
None => continue,
};
// Input derivation dependencies
if let Some(ref input_drvs) = job.input_drvs {
for dep_drv in input_drvs.keys() {
if let Some(&dep_build_id) = drv_to_build.get(dep_drv)
&& dep_build_id != build_id
{
let _ =
repo::build_dependencies::create(pool, build_id, dep_build_id)
.await;
}
}
}
// Aggregate constituent dependencies
if let Some(ref constituents) = job.constituents {
for constituent_name in constituents {
if let Some(&dep_build_id) = name_to_build.get(constituent_name)
&& dep_build_id != build_id
{
let _ =
repo::build_dependencies::create(pool, build_id, dep_build_id)
.await;
}
}
}
}
Ok(())
}
/// Compute a deterministic hash over the commit and all jobset inputs.
/// Used for evaluation caching — skip re-eval when inputs haven't changed.
fn compute_inputs_hash(commit_hash: &str, inputs: &[JobsetInput]) -> String {

View file

@ -16,7 +16,9 @@ pub fn clone_or_fetch(
) -> Result<(PathBuf, String)> {
let repo_path = work_dir.join(project_name);
let repo = if repo_path.exists() {
let is_fetch = repo_path.exists();
let repo = if is_fetch {
let repo = Repository::open(&repo_path)?;
// Fetch origin — scope the borrow so `remote` is dropped before we move
// `repo`
@ -29,21 +31,35 @@ pub fn clone_or_fetch(
Repository::clone(url, &repo_path)?
};
// Resolve commit: use specific branch ref or fall back to HEAD
let hash = if let Some(branch_name) = branch {
let refname = format!("refs/remotes/origin/{branch_name}");
let reference = repo.find_reference(&refname).map_err(|e| {
fc_common::error::CiError::NotFound(format!(
"Branch '{branch_name}' not found ({refname}): {e}"
))
})?;
let commit = reference.peel_to_commit()?;
commit.id().to_string()
} else {
let head = repo.head()?;
let commit = head.peel_to_commit()?;
commit.id().to_string()
// Resolve commit from remote refs (which are always up-to-date after fetch).
// When no branch is specified, detect the default branch from local HEAD's
// tracking target.
let branch_name = match branch {
Some(b) => b.to_string(),
None => {
let head = repo.head()?;
head.shorthand().unwrap_or("master").to_string()
},
};
let remote_ref = format!("refs/remotes/origin/{branch_name}");
let reference = repo.find_reference(&remote_ref).map_err(|e| {
fc_common::error::CiError::NotFound(format!(
"Branch '{branch_name}' not found ({remote_ref}): {e}"
))
})?;
let commit = reference.peel_to_commit()?;
let hash = commit.id().to_string();
// After fetch, update the working tree so nix evaluation sees the latest
// files. Skip on fresh clone since the checkout is already current.
if is_fetch {
repo.checkout_tree(
commit.as_object(),
Some(git2::build::CheckoutBuilder::new().force()),
)?;
repo.set_head_detached(commit.id())?;
}
Ok((repo_path, hash))
}

View file

@ -8,22 +8,37 @@ use fc_common::{
};
use serde::Deserialize;
#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone)]
pub struct NixJob {
pub name: String,
#[serde(alias = "drvPath")]
pub drv_path: String,
pub system: Option<String>,
pub outputs: Option<HashMap<String, String>>,
#[serde(alias = "inputDrvs")]
pub input_drvs: Option<HashMap<String, serde_json::Value>>,
pub constituents: Option<Vec<String>>,
}
/// Raw deserialization target for nix-eval-jobs output.
/// nix-eval-jobs emits both `attr` (attribute path) and `name` (derivation
/// name) in the same JSON object. We deserialize them separately and prefer
/// `attr` as the job identifier.
#[derive(Deserialize)]
struct RawNixJob {
    // Derivation name as reported by nix (e.g. "fc-test-hello").
    name: Option<String>,
    // Attribute path within the evaluated set (e.g. "x86_64-linux.hello");
    // preferred over `name` when both are present.
    attr: Option<String>,
    // Store path of the .drv file; entries without one are dropped by the
    // parser since a valid job requires a derivation path.
    #[serde(alias = "drvPath")]
    drv_path: Option<String>,
    // Target system, e.g. "x86_64-linux".
    system: Option<String>,
    // Map of output name -> store path.
    outputs: Option<HashMap<String, String>>,
    // Input derivations keyed by .drv path; values are kept opaque JSON.
    #[serde(alias = "inputDrvs")]
    input_drvs: Option<HashMap<String, serde_json::Value>>,
    // For aggregate jobs: the names of their constituent jobs.
    constituents: Option<Vec<String>>,
}
/// An error reported by nix-eval-jobs for a single job.
#[derive(Debug, Clone, Deserialize)]
struct NixEvalError {
#[serde(alias = "attr")]
attr: Option<String>,
name: Option<String>,
error: String,
}
@ -49,7 +64,11 @@ pub fn parse_eval_output(stdout: &str) -> EvalResult {
&& parsed.get("error").is_some()
{
if let Ok(eval_err) = serde_json::from_str::<NixEvalError>(line) {
let name = eval_err.name.as_deref().unwrap_or("<unknown>");
let name = eval_err
.attr
.as_deref()
.or(eval_err.name.as_deref())
.unwrap_or("<unknown>");
tracing::warn!(
job = name,
"nix-eval-jobs reported error: {}",
@ -60,8 +79,20 @@ pub fn parse_eval_output(stdout: &str) -> EvalResult {
continue;
}
match serde_json::from_str::<NixJob>(line) {
Ok(job) => jobs.push(job),
match serde_json::from_str::<RawNixJob>(line) {
Ok(raw) => {
// drv_path is required for a valid job
if let Some(drv_path) = raw.drv_path {
jobs.push(NixJob {
name: raw.attr.or(raw.name).unwrap_or_default(),
drv_path,
system: raw.system,
outputs: raw.outputs,
input_drvs: raw.input_drvs,
constituents: raw.constituents,
});
}
},
Err(e) => {
tracing::warn!("Failed to parse nix-eval-jobs line: {e}");
},
@ -100,9 +131,11 @@ async fn evaluate_flake(
) -> Result<EvalResult> {
let flake_ref = format!("{}#{}", repo_path.display(), nix_expression);
tracing::debug!(flake_ref = %flake_ref, "Running nix-eval-jobs");
tokio::time::timeout(timeout, async {
let mut cmd = tokio::process::Command::new("nix-eval-jobs");
cmd.arg("--flake").arg(&flake_ref);
cmd.arg("--flake").arg(&flake_ref).arg("--force-recurse");
if config.restrict_eval {
cmd.args(["--option", "restrict-eval", "true"]);
@ -130,6 +163,16 @@ async fn evaluate_flake(
);
}
if result.jobs.is_empty() && result.error_count == 0 {
let stderr = String::from_utf8_lossy(&out.stderr);
if !stderr.trim().is_empty() {
tracing::warn!(
stderr = %stderr,
"nix-eval-jobs returned no jobs, stderr output present"
);
}
}
Ok(result)
},
_ => {
@ -163,7 +206,7 @@ async fn evaluate_legacy(
tokio::time::timeout(timeout, async {
// Try nix-eval-jobs without --flake for legacy expressions
let mut cmd = tokio::process::Command::new("nix-eval-jobs");
cmd.arg(&expr_path);
cmd.arg(&expr_path).arg("--force-recurse");
if config.restrict_eval {
cmd.args(["--option", "restrict-eval", "true"]);

View file

@ -87,6 +87,31 @@ fn test_parse_error_without_name() {
assert_eq!(result.error_count, 1);
}
#[test]
fn test_parse_nix_eval_jobs_attr_field() {
    // nix-eval-jobs emits the job identifier under "attr" rather than
    // "name"; the parser must surface it as the job's name.
    let input = r#"{"attr":"x86_64-linux.hello","drvPath":"/nix/store/abc123-hello.drv","system":"x86_64-linux"}"#;
    let parsed = fc_evaluator::nix::parse_eval_output(input);
    assert_eq!(parsed.jobs.len(), 1);
    let job = &parsed.jobs[0];
    assert_eq!(job.name, "x86_64-linux.hello");
    assert_eq!(job.drv_path, "/nix/store/abc123-hello.drv");
}
#[test]
fn test_parse_nix_eval_jobs_both_attr_and_name() {
    // With --force-recurse, nix-eval-jobs emits both "attr" (attribute
    // path) and "name" (derivation name) on the same line; "attr" wins as
    // the job identifier.
    let input = r#"{"attr":"x86_64-linux.hello","attrPath":["x86_64-linux","hello"],"drvPath":"/nix/store/abc123-hello.drv","name":"fc-test-hello","outputs":{"out":"/nix/store/abc123-hello"},"system":"x86_64-linux"}"#;
    let parsed = fc_evaluator::nix::parse_eval_output(input);
    assert_eq!(parsed.jobs.len(), 1);
    let job = &parsed.jobs[0];
    assert_eq!(job.name, "x86_64-linux.hello");
    assert_eq!(job.drv_path, "/nix/store/abc123-hello.drv");
    assert_eq!(job.system.as_deref(), Some("x86_64-linux"));
    let outputs = job.outputs.as_ref().unwrap();
    assert_eq!(outputs.get("out").unwrap(), "/nix/store/abc123-hello");
}
// --- Inputs hash computation ---
#[test]