diff --git a/crates/evaluator/src/eval_loop.rs b/crates/evaluator/src/eval_loop.rs index 3c83084..af22b2a 100644 --- a/crates/evaluator/src/eval_loop.rs +++ b/crates/evaluator/src/eval_loop.rs @@ -58,12 +58,13 @@ async fn evaluate_jobset( let url = jobset.repository_url.clone(); let work_dir = config.work_dir.clone(); let project_name = jobset.project_name.clone(); + let branch = jobset.branch.clone(); // Clone/fetch in a blocking task (git2 is sync) with timeout let (repo_path, commit_hash) = tokio::time::timeout( git_timeout, tokio::task::spawn_blocking(move || { - crate::git::clone_or_fetch(&url, &work_dir, &project_name) + crate::git::clone_or_fetch(&url, &work_dir, &project_name, branch.as_deref()) }), ) .await @@ -92,8 +93,8 @@ async fn evaluate_jobset( } // Also skip if commit hasn't changed (backward compat) - if let Some(latest) = repo::evaluations::get_latest(pool, jobset.id).await? { - if latest.commit_hash == commit_hash && latest.inputs_hash.as_deref() == Some(&inputs_hash) + if let Some(latest) = repo::evaluations::get_latest(pool, jobset.id).await? 
+ && latest.commit_hash == commit_hash && latest.inputs_hash.as_deref() == Some(&inputs_hash) { tracing::debug!( jobset = %jobset.name, @@ -102,7 +103,6 @@ async fn evaluate_jobset( ); return Ok(()); } - } tracing::info!( jobset = %jobset.name, @@ -189,26 +189,24 @@ async fn evaluate_jobset( // Input derivation dependencies if let Some(ref input_drvs) = job.input_drvs { for dep_drv in input_drvs.keys() { - if let Some(&dep_build_id) = drv_to_build.get(dep_drv) { - if dep_build_id != build_id { + if let Some(&dep_build_id) = drv_to_build.get(dep_drv) + && dep_build_id != build_id { let _ = repo::build_dependencies::create(pool, build_id, dep_build_id) .await; } - } } } // Aggregate constituent dependencies if let Some(ref constituents) = job.constituents { for constituent_name in constituents { - if let Some(&dep_build_id) = name_to_build.get(constituent_name) { - if dep_build_id != build_id { + if let Some(&dep_build_id) = name_to_build.get(constituent_name) + && dep_build_id != build_id { let _ = repo::build_dependencies::create(pool, build_id, dep_build_id) .await; } - } } } } @@ -303,6 +301,8 @@ async fn check_declarative_config(pool: &PgPool, repo_path: &std::path::Path, pr enabled: js.enabled, flake_mode: js.flake_mode, check_interval: js.check_interval, + branch: None, + scheduling_shares: None, }; if let Err(e) = repo::jobsets::upsert(pool, input).await { tracing::warn!("Failed to upsert declarative jobset: {e}"); diff --git a/crates/evaluator/src/git.rs b/crates/evaluator/src/git.rs index b300c6a..81fec17 100644 --- a/crates/evaluator/src/git.rs +++ b/crates/evaluator/src/git.rs @@ -3,8 +3,16 @@ use std::path::{Path, PathBuf}; use fc_common::error::Result; use git2::Repository; -/// Clone or fetch a repository. Returns (repo_path, head_commit_hash). -pub fn clone_or_fetch(url: &str, work_dir: &Path, project_name: &str) -> Result<(PathBuf, String)> { +/// Clone or fetch a repository. Returns (repo_path, commit_hash). 
+/// +/// If `branch` is `Some`, resolve `refs/remotes/origin/<branch>` instead of HEAD. +#[tracing::instrument(skip(work_dir))] +pub fn clone_or_fetch( + url: &str, + work_dir: &Path, + project_name: &str, + branch: Option<&str>, +) -> Result<(PathBuf, String)> { let repo_path = work_dir.join(project_name); let repo = if repo_path.exists() { @@ -19,10 +27,21 @@ pub fn clone_or_fetch(url: &str, work_dir: &Path, project_name: &str) -> Result< Repository::clone(url, &repo_path)? }; - // Get HEAD commit hash - let head = repo.head()?; - let commit = head.peel_to_commit()?; - let hash = commit.id().to_string(); + // Resolve commit: use specific branch ref or fall back to HEAD + let hash = if let Some(branch_name) = branch { + let refname = format!("refs/remotes/origin/{branch_name}"); + let reference = repo.find_reference(&refname).map_err(|e| { + fc_common::error::CiError::NotFound(format!( + "Branch '{branch_name}' not found ({refname}): {e}" + )) + })?; + let commit = reference.peel_to_commit()?; + commit.id().to_string() + } else { + let head = repo.head()?; + let commit = head.peel_to_commit()?; + commit.id().to_string() + }; Ok((repo_path, hash)) } diff --git a/crates/evaluator/src/main.rs b/crates/evaluator/src/main.rs index 41bfeb0..b85a55e 100644 --- a/crates/evaluator/src/main.rs +++ b/crates/evaluator/src/main.rs @@ -11,13 +11,12 @@ struct Cli { #[tokio::main] async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt::init(); - let _cli = Cli::parse(); - tracing::info!("Starting CI Evaluator"); - let config = Config::load()?; + fc_common::init_tracing(&config.tracing); + + tracing::info!("Starting CI Evaluator"); tracing::info!("Configuration loaded"); // Ensure work directory exists diff --git a/crates/evaluator/src/nix.rs b/crates/evaluator/src/nix.rs index 852896e..e837a4c 100644 --- a/crates/evaluator/src/nix.rs +++ b/crates/evaluator/src/nix.rs @@ -45,8 +45,8 @@ pub fn parse_eval_output(stdout: &str) -> EvalResult { continue; } - if let Ok(parsed) = 
serde_json::from_str::<serde_json::Value>(line) { - if parsed.get("error").is_some() { + if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(line) + && parsed.get("error").is_some() { if let Ok(eval_err) = serde_json::from_str::<EvalError>(line) { let name = eval_err.name.as_deref().unwrap_or(""); tracing::warn!( @@ -58,7 +58,6 @@ pub fn parse_eval_output(stdout: &str) -> EvalResult { } continue; } - } match serde_json::from_str::<EvalJob>(line) { Ok(job) => jobs.push(job), @@ -74,6 +73,7 @@ /// Evaluate nix expressions and return discovered jobs. /// If flake_mode is true, uses nix-eval-jobs with --flake flag. /// If flake_mode is false, evaluates a legacy expression file. +#[tracing::instrument(skip(config, inputs), fields(flake_mode, nix_expression))] pub async fn evaluate( repo_path: &Path, nix_expression: &str, @@ -89,6 +89,7 @@ } } +#[tracing::instrument(skip(config, inputs))] async fn evaluate_flake( repo_path: &Path, nix_expression: &str, @@ -98,7 +99,9 @@ ) -> Result<EvalResult> { let flake_ref = format!("{}#{}", repo_path.display(), nix_expression); - let result = tokio::time::timeout(timeout, async { + + + tokio::time::timeout(timeout, async { let mut cmd = tokio::process::Command::new("nix-eval-jobs"); cmd.arg("--flake").arg(&flake_ref); @@ -141,12 +144,11 @@ } }) .await - .map_err(|_| CiError::Timeout(format!("Nix evaluation timed out after {timeout:?}")))?; - - result + .map_err(|_| CiError::Timeout(format!("Nix evaluation timed out after {timeout:?}")))? } /// Legacy (non-flake) evaluation: import the nix expression file and evaluate it. 
+#[tracing::instrument(skip(config, inputs))] async fn evaluate_legacy( repo_path: &Path, nix_expression: &str, @@ -156,7 +158,9 @@ ) -> Result<EvalResult> { let expr_path = repo_path.join(nix_expression); - let result = tokio::time::timeout(timeout, async { + + + tokio::time::timeout(timeout, async { // Try nix-eval-jobs without --flake for legacy expressions let mut cmd = tokio::process::Command::new("nix-eval-jobs"); cmd.arg(&expr_path); @@ -222,9 +226,7 @@ } }) .await - .map_err(|_| CiError::Timeout(format!("Nix evaluation timed out after {timeout:?}")))?; - - result + .map_err(|_| CiError::Timeout(format!("Nix evaluation timed out after {timeout:?}")))? } async fn evaluate_with_nix_eval(repo_path: &Path, nix_expression: &str) -> Result<Vec<EvalJob>> { @@ -261,8 +263,8 @@ if drv_output.status.success() { let drv_stdout = String::from_utf8_lossy(&drv_output.stdout); - if let Ok(drv_json) = serde_json::from_str::<serde_json::Value>(&drv_stdout) { - if let Some((drv_path, drv_val)) = + if let Ok(drv_json) = serde_json::from_str::<serde_json::Value>(&drv_stdout) + && let Some((drv_path, drv_val)) = drv_json.as_object().and_then(|o| o.iter().next()) { let system = drv_val @@ -278,7 +280,6 @@ constituents: None, }); } - } } } } diff --git a/crates/evaluator/tests/git_tests.rs b/crates/evaluator/tests/git_tests.rs index 55c26f3..413f5fc 100644 --- a/crates/evaluator/tests/git_tests.rs +++ b/crates/evaluator/tests/git_tests.rs @@ -22,7 +22,7 @@ fn test_clone_or_fetch_clones_new_repo() { } let url = format!("file://{}", upstream_dir.path().display()); - let result = fc_evaluator::git::clone_or_fetch(&url, work_dir.path(), "test-project"); + let result = fc_evaluator::git::clone_or_fetch(&url, work_dir.path(), "test-project", None); assert!( result.is_ok(), @@ -54,7 +54,7 @@ fn test_clone_or_fetch_fetches_existing() { // 
First clone let (_, hash1): (std::path::PathBuf, String) = - fc_evaluator::git::clone_or_fetch(&url, work_dir.path(), "test-project") + fc_evaluator::git::clone_or_fetch(&url, work_dir.path(), "test-project", None) .expect("first clone failed"); // Make another commit upstream @@ -70,7 +70,7 @@ fn test_clone_or_fetch_fetches_existing() { // Second fetch let (_, hash2): (std::path::PathBuf, String) = - fc_evaluator::git::clone_or_fetch(&url, work_dir.path(), "test-project") + fc_evaluator::git::clone_or_fetch(&url, work_dir.path(), "test-project", None) .expect("second fetch failed"); assert!(!hash1.is_empty()); @@ -80,7 +80,11 @@ fn test_clone_or_fetch_fetches_existing() { #[test] fn test_clone_invalid_url_returns_error() { let work_dir = TempDir::new().unwrap(); - let result = - fc_evaluator::git::clone_or_fetch("file:///nonexistent/repo", work_dir.path(), "bad-proj"); + let result = fc_evaluator::git::clone_or_fetch( + "file:///nonexistent/repo", + work_dir.path(), + "bad-proj", + None, + ); assert!(result.is_err()); }