crates/evaluator: add multi-branch evaluation support

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Ida40acb8d093f7d7e387913681b767276a6a6964
This commit is contained in:
raf 2026-02-02 01:24:44 +03:00
commit 00a4dc8d37
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
5 changed files with 62 additions and 39 deletions

View file

@ -58,12 +58,13 @@ async fn evaluate_jobset(
let url = jobset.repository_url.clone(); let url = jobset.repository_url.clone();
let work_dir = config.work_dir.clone(); let work_dir = config.work_dir.clone();
let project_name = jobset.project_name.clone(); let project_name = jobset.project_name.clone();
let branch = jobset.branch.clone();
// Clone/fetch in a blocking task (git2 is sync) with timeout // Clone/fetch in a blocking task (git2 is sync) with timeout
let (repo_path, commit_hash) = tokio::time::timeout( let (repo_path, commit_hash) = tokio::time::timeout(
git_timeout, git_timeout,
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
crate::git::clone_or_fetch(&url, &work_dir, &project_name) crate::git::clone_or_fetch(&url, &work_dir, &project_name, branch.as_deref())
}), }),
) )
.await .await
@ -92,8 +93,8 @@ async fn evaluate_jobset(
} }
// Also skip if commit hasn't changed (backward compat) // Also skip if commit hasn't changed (backward compat)
if let Some(latest) = repo::evaluations::get_latest(pool, jobset.id).await? { if let Some(latest) = repo::evaluations::get_latest(pool, jobset.id).await?
if latest.commit_hash == commit_hash && latest.inputs_hash.as_deref() == Some(&inputs_hash) && latest.commit_hash == commit_hash && latest.inputs_hash.as_deref() == Some(&inputs_hash)
{ {
tracing::debug!( tracing::debug!(
jobset = %jobset.name, jobset = %jobset.name,
@ -102,7 +103,6 @@ async fn evaluate_jobset(
); );
return Ok(()); return Ok(());
} }
}
tracing::info!( tracing::info!(
jobset = %jobset.name, jobset = %jobset.name,
@ -189,26 +189,24 @@ async fn evaluate_jobset(
// Input derivation dependencies // Input derivation dependencies
if let Some(ref input_drvs) = job.input_drvs { if let Some(ref input_drvs) = job.input_drvs {
for dep_drv in input_drvs.keys() { for dep_drv in input_drvs.keys() {
if let Some(&dep_build_id) = drv_to_build.get(dep_drv) { if let Some(&dep_build_id) = drv_to_build.get(dep_drv)
if dep_build_id != build_id { && dep_build_id != build_id {
let _ = let _ =
repo::build_dependencies::create(pool, build_id, dep_build_id) repo::build_dependencies::create(pool, build_id, dep_build_id)
.await; .await;
} }
}
} }
} }
// Aggregate constituent dependencies // Aggregate constituent dependencies
if let Some(ref constituents) = job.constituents { if let Some(ref constituents) = job.constituents {
for constituent_name in constituents { for constituent_name in constituents {
if let Some(&dep_build_id) = name_to_build.get(constituent_name) { if let Some(&dep_build_id) = name_to_build.get(constituent_name)
if dep_build_id != build_id { && dep_build_id != build_id {
let _ = let _ =
repo::build_dependencies::create(pool, build_id, dep_build_id) repo::build_dependencies::create(pool, build_id, dep_build_id)
.await; .await;
} }
}
} }
} }
} }
@ -303,6 +301,8 @@ async fn check_declarative_config(pool: &PgPool, repo_path: &std::path::Path, pr
enabled: js.enabled, enabled: js.enabled,
flake_mode: js.flake_mode, flake_mode: js.flake_mode,
check_interval: js.check_interval, check_interval: js.check_interval,
branch: None,
scheduling_shares: None,
}; };
if let Err(e) = repo::jobsets::upsert(pool, input).await { if let Err(e) = repo::jobsets::upsert(pool, input).await {
tracing::warn!("Failed to upsert declarative jobset: {e}"); tracing::warn!("Failed to upsert declarative jobset: {e}");

View file

@ -3,8 +3,16 @@ use std::path::{Path, PathBuf};
use fc_common::error::Result; use fc_common::error::Result;
use git2::Repository; use git2::Repository;
/// Clone or fetch a repository. Returns (repo_path, head_commit_hash). /// Clone or fetch a repository. Returns (repo_path, commit_hash).
pub fn clone_or_fetch(url: &str, work_dir: &Path, project_name: &str) -> Result<(PathBuf, String)> { ///
/// If `branch` is `Some`, resolve `refs/remotes/origin/<branch>` instead of HEAD.
#[tracing::instrument(skip(work_dir))]
pub fn clone_or_fetch(
url: &str,
work_dir: &Path,
project_name: &str,
branch: Option<&str>,
) -> Result<(PathBuf, String)> {
let repo_path = work_dir.join(project_name); let repo_path = work_dir.join(project_name);
let repo = if repo_path.exists() { let repo = if repo_path.exists() {
@ -19,10 +27,21 @@ pub fn clone_or_fetch(url: &str, work_dir: &Path, project_name: &str) -> Result<
Repository::clone(url, &repo_path)? Repository::clone(url, &repo_path)?
}; };
// Get HEAD commit hash // Resolve commit: use specific branch ref or fall back to HEAD
let head = repo.head()?; let hash = if let Some(branch_name) = branch {
let commit = head.peel_to_commit()?; let refname = format!("refs/remotes/origin/{branch_name}");
let hash = commit.id().to_string(); let reference = repo.find_reference(&refname).map_err(|e| {
fc_common::error::CiError::NotFound(format!(
"Branch '{branch_name}' not found ({refname}): {e}"
))
})?;
let commit = reference.peel_to_commit()?;
commit.id().to_string()
} else {
let head = repo.head()?;
let commit = head.peel_to_commit()?;
commit.id().to_string()
};
Ok((repo_path, hash)) Ok((repo_path, hash))
} }

View file

@ -11,13 +11,12 @@ struct Cli {
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let _cli = Cli::parse(); let _cli = Cli::parse();
tracing::info!("Starting CI Evaluator");
let config = Config::load()?; let config = Config::load()?;
fc_common::init_tracing(&config.tracing);
tracing::info!("Starting CI Evaluator");
tracing::info!("Configuration loaded"); tracing::info!("Configuration loaded");
// Ensure work directory exists // Ensure work directory exists

View file

@ -45,8 +45,8 @@ pub fn parse_eval_output(stdout: &str) -> EvalResult {
continue; continue;
} }
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(line) { if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(line)
if parsed.get("error").is_some() { && parsed.get("error").is_some() {
if let Ok(eval_err) = serde_json::from_str::<NixEvalError>(line) { if let Ok(eval_err) = serde_json::from_str::<NixEvalError>(line) {
let name = eval_err.name.as_deref().unwrap_or("<unknown>"); let name = eval_err.name.as_deref().unwrap_or("<unknown>");
tracing::warn!( tracing::warn!(
@ -58,7 +58,6 @@ pub fn parse_eval_output(stdout: &str) -> EvalResult {
} }
continue; continue;
} }
}
match serde_json::from_str::<NixJob>(line) { match serde_json::from_str::<NixJob>(line) {
Ok(job) => jobs.push(job), Ok(job) => jobs.push(job),
@ -74,6 +73,7 @@ pub fn parse_eval_output(stdout: &str) -> EvalResult {
/// Evaluate nix expressions and return discovered jobs. /// Evaluate nix expressions and return discovered jobs.
/// If flake_mode is true, uses nix-eval-jobs with --flake flag. /// If flake_mode is true, uses nix-eval-jobs with --flake flag.
/// If flake_mode is false, evaluates a legacy expression file. /// If flake_mode is false, evaluates a legacy expression file.
#[tracing::instrument(skip(config, inputs), fields(flake_mode, nix_expression))]
pub async fn evaluate( pub async fn evaluate(
repo_path: &Path, repo_path: &Path,
nix_expression: &str, nix_expression: &str,
@ -89,6 +89,7 @@ pub async fn evaluate(
} }
} }
#[tracing::instrument(skip(config, inputs))]
async fn evaluate_flake( async fn evaluate_flake(
repo_path: &Path, repo_path: &Path,
nix_expression: &str, nix_expression: &str,
@ -98,7 +99,9 @@ async fn evaluate_flake(
) -> Result<EvalResult> { ) -> Result<EvalResult> {
let flake_ref = format!("{}#{}", repo_path.display(), nix_expression); let flake_ref = format!("{}#{}", repo_path.display(), nix_expression);
let result = tokio::time::timeout(timeout, async {
tokio::time::timeout(timeout, async {
let mut cmd = tokio::process::Command::new("nix-eval-jobs"); let mut cmd = tokio::process::Command::new("nix-eval-jobs");
cmd.arg("--flake").arg(&flake_ref); cmd.arg("--flake").arg(&flake_ref);
@ -141,12 +144,11 @@ async fn evaluate_flake(
} }
}) })
.await .await
.map_err(|_| CiError::Timeout(format!("Nix evaluation timed out after {timeout:?}")))?; .map_err(|_| CiError::Timeout(format!("Nix evaluation timed out after {timeout:?}")))?
result
} }
/// Legacy (non-flake) evaluation: import the nix expression file and evaluate it. /// Legacy (non-flake) evaluation: import the nix expression file and evaluate it.
#[tracing::instrument(skip(config, inputs))]
async fn evaluate_legacy( async fn evaluate_legacy(
repo_path: &Path, repo_path: &Path,
nix_expression: &str, nix_expression: &str,
@ -156,7 +158,9 @@ async fn evaluate_legacy(
) -> Result<EvalResult> { ) -> Result<EvalResult> {
let expr_path = repo_path.join(nix_expression); let expr_path = repo_path.join(nix_expression);
let result = tokio::time::timeout(timeout, async {
tokio::time::timeout(timeout, async {
// Try nix-eval-jobs without --flake for legacy expressions // Try nix-eval-jobs without --flake for legacy expressions
let mut cmd = tokio::process::Command::new("nix-eval-jobs"); let mut cmd = tokio::process::Command::new("nix-eval-jobs");
cmd.arg(&expr_path); cmd.arg(&expr_path);
@ -222,9 +226,7 @@ async fn evaluate_legacy(
} }
}) })
.await .await
.map_err(|_| CiError::Timeout(format!("Nix evaluation timed out after {timeout:?}")))?; .map_err(|_| CiError::Timeout(format!("Nix evaluation timed out after {timeout:?}")))?
result
} }
async fn evaluate_with_nix_eval(repo_path: &Path, nix_expression: &str) -> Result<Vec<NixJob>> { async fn evaluate_with_nix_eval(repo_path: &Path, nix_expression: &str) -> Result<Vec<NixJob>> {
@ -261,8 +263,8 @@ async fn evaluate_with_nix_eval(repo_path: &Path, nix_expression: &str) -> Resul
if drv_output.status.success() { if drv_output.status.success() {
let drv_stdout = String::from_utf8_lossy(&drv_output.stdout); let drv_stdout = String::from_utf8_lossy(&drv_output.stdout);
if let Ok(drv_json) = serde_json::from_str::<serde_json::Value>(&drv_stdout) { if let Ok(drv_json) = serde_json::from_str::<serde_json::Value>(&drv_stdout)
if let Some((drv_path, drv_val)) = && let Some((drv_path, drv_val)) =
drv_json.as_object().and_then(|o| o.iter().next()) drv_json.as_object().and_then(|o| o.iter().next())
{ {
let system = drv_val let system = drv_val
@ -278,7 +280,6 @@ async fn evaluate_with_nix_eval(repo_path: &Path, nix_expression: &str) -> Resul
constituents: None, constituents: None,
}); });
} }
}
} }
} }
} }

View file

@ -22,7 +22,7 @@ fn test_clone_or_fetch_clones_new_repo() {
} }
let url = format!("file://{}", upstream_dir.path().display()); let url = format!("file://{}", upstream_dir.path().display());
let result = fc_evaluator::git::clone_or_fetch(&url, work_dir.path(), "test-project"); let result = fc_evaluator::git::clone_or_fetch(&url, work_dir.path(), "test-project", None);
assert!( assert!(
result.is_ok(), result.is_ok(),
@ -54,7 +54,7 @@ fn test_clone_or_fetch_fetches_existing() {
// First clone // First clone
let (_, hash1): (std::path::PathBuf, String) = let (_, hash1): (std::path::PathBuf, String) =
fc_evaluator::git::clone_or_fetch(&url, work_dir.path(), "test-project") fc_evaluator::git::clone_or_fetch(&url, work_dir.path(), "test-project", None)
.expect("first clone failed"); .expect("first clone failed");
// Make another commit upstream // Make another commit upstream
@ -70,7 +70,7 @@ fn test_clone_or_fetch_fetches_existing() {
// Second fetch // Second fetch
let (_, hash2): (std::path::PathBuf, String) = let (_, hash2): (std::path::PathBuf, String) =
fc_evaluator::git::clone_or_fetch(&url, work_dir.path(), "test-project") fc_evaluator::git::clone_or_fetch(&url, work_dir.path(), "test-project", None)
.expect("second fetch failed"); .expect("second fetch failed");
assert!(!hash1.is_empty()); assert!(!hash1.is_empty());
@ -80,7 +80,11 @@ fn test_clone_or_fetch_fetches_existing() {
#[test] #[test]
fn test_clone_invalid_url_returns_error() { fn test_clone_invalid_url_returns_error() {
let work_dir = TempDir::new().unwrap(); let work_dir = TempDir::new().unwrap();
let result = let result = fc_evaluator::git::clone_or_fetch(
fc_evaluator::git::clone_or_fetch("file:///nonexistent/repo", work_dir.path(), "bad-proj"); "file:///nonexistent/repo",
work_dir.path(),
"bad-proj",
None,
);
assert!(result.is_err()); assert!(result.is_err());
} }