From 44d1ee1d6b59466a22c06486d9460d86777e3f24 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 1 Feb 2026 15:13:19 +0300 Subject: [PATCH] queue-runner: semaphore-based worker pool with atomic build claiming Signed-off-by: NotAShelf Change-Id: Ie3a8d343c3705200f0cac566227db54f6a6a6964 --- crates/queue-runner/Cargo.toml | 8 +- crates/queue-runner/src/builder.rs | 305 +++++++++++++ crates/queue-runner/src/lib.rs | 3 + crates/queue-runner/src/main.rs | 146 ++++++- crates/queue-runner/src/runner_loop.rs | 125 ++++++ crates/queue-runner/src/worker.rs | 500 ++++++++++++++++++++++ crates/queue-runner/tests/runner_tests.rs | 285 ++++++++++++ 7 files changed, 1367 insertions(+), 5 deletions(-) create mode 100644 crates/queue-runner/src/builder.rs create mode 100644 crates/queue-runner/src/lib.rs create mode 100644 crates/queue-runner/src/runner_loop.rs create mode 100644 crates/queue-runner/src/worker.rs create mode 100644 crates/queue-runner/tests/runner_tests.rs diff --git a/crates/queue-runner/Cargo.toml b/crates/queue-runner/Cargo.toml index d31b7ec..b8fae01 100644 --- a/crates/queue-runner/Cargo.toml +++ b/crates/queue-runner/Cargo.toml @@ -19,4 +19,10 @@ anyhow.workspace = true thiserror.workspace = true clap.workspace = true config.workspace = true -fc-common = { path = "../common" } \ No newline at end of file +tokio-util.workspace = true + +# Our crates +fc-common.workspace = true + +[dev-dependencies] +tempfile.workspace = true diff --git a/crates/queue-runner/src/builder.rs b/crates/queue-runner/src/builder.rs new file mode 100644 index 0000000..eff5963 --- /dev/null +++ b/crates/queue-runner/src/builder.rs @@ -0,0 +1,305 @@ +use std::path::Path; +use std::time::Duration; + +use fc_common::CiError; +use fc_common::error::Result; +use tokio::io::{AsyncBufReadExt, BufReader}; + +const MAX_LOG_SIZE: usize = 100 * 1024 * 1024; // 100MB + +/// Run a build on a remote machine via `nix build --store ssh://...`. 
+pub async fn run_nix_build_remote( + drv_path: &str, + work_dir: &Path, + timeout: Duration, + store_uri: &str, + ssh_key_file: Option<&str>, + live_log_path: Option<&Path>, +) -> Result { + let result = tokio::time::timeout(timeout, async { + let mut cmd = tokio::process::Command::new("nix"); + cmd.args([ + "build", + "--no-link", + "--print-out-paths", + "--log-format", + "internal-json", + "--option", + "sandbox", + "true", + "--max-build-log-size", + "104857600", + "--store", + store_uri, + drv_path, + ]) + .current_dir(work_dir) + .kill_on_drop(true) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()); + + if let Some(key_file) = ssh_key_file { + cmd.env( + "NIX_SSHOPTS", + format!("-i {key_file} -o StrictHostKeyChecking=accept-new"), + ); + } + + let mut child = cmd + .spawn() + .map_err(|e| CiError::Build(format!("Failed to run remote nix build: {e}")))?; + + let stdout_handle = child.stdout.take(); + let stderr_handle = child.stderr.take(); + + let stdout_task = tokio::spawn(async move { + let mut buf = String::new(); + if let Some(stdout) = stdout_handle { + let mut reader = BufReader::new(stdout); + let mut line = String::new(); + while reader.read_line(&mut line).await.unwrap_or(0) > 0 { + buf.push_str(&line); + line.clear(); + } + } + buf + }); + + let live_log_path_owned = live_log_path.map(|p| p.to_path_buf()); + let stderr_task = tokio::spawn(async move { + let mut buf = String::new(); + let steps: Vec = Vec::new(); + let mut log_file = if let Some(ref path) = live_log_path_owned { + tokio::fs::File::create(path).await.ok() + } else { + None + }; + + if let Some(stderr) = stderr_handle { + let mut reader = BufReader::new(stderr); + let mut line = String::new(); + while reader.read_line(&mut line).await.unwrap_or(0) > 0 { + if let Some(ref mut f) = log_file { + use tokio::io::AsyncWriteExt; + let _ = f.write_all(line.as_bytes()).await; + let _ = f.flush().await; + } + if buf.len() < MAX_LOG_SIZE { + buf.push_str(&line); + 
} + line.clear(); + } + } + (buf, steps) + }); + + let stdout_buf = stdout_task.await.unwrap_or_default(); + let (stderr_buf, sub_steps) = stderr_task.await.unwrap_or_default(); + + let status = child + .wait() + .await + .map_err(|e| CiError::Build(format!("Failed to wait for remote nix build: {e}")))?; + + let output_paths: Vec = stdout_buf + .lines() + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + + Ok::<_, CiError>(BuildResult { + success: status.success(), + stdout: stdout_buf, + stderr: stderr_buf, + output_paths, + sub_steps, + }) + }) + .await; + + match result { + Ok(inner) => inner, + Err(_) => Err(CiError::Timeout(format!( + "Remote build timed out after {timeout:?}" + ))), + } +} + +pub struct BuildResult { + pub success: bool, + pub stdout: String, + pub stderr: String, + pub output_paths: Vec, + pub sub_steps: Vec, +} + +/// A sub-step parsed from nix's internal JSON log format. +pub struct SubStep { + pub drv_path: String, + pub completed_at: Option>, + pub success: bool, +} + +/// Parse a single nix internal JSON log line (`@nix {...}`). +/// Returns `Some(action, drv_path)` if the line contains a derivation action. +pub fn parse_nix_log_line(line: &str) -> Option<(&'static str, String)> { + let json_str = line.strip_prefix("@nix ")?.trim(); + let parsed: serde_json::Value = serde_json::from_str(json_str).ok()?; + let action = parsed.get("action")?.as_str()?; + let drv = parsed.get("derivation")?.as_str()?.to_string(); + + match action { + "start" => Some(("start", drv)), + "stop" => Some(("stop", drv)), + _ => None, + } +} + +/// Run `nix build` for a derivation path. +/// If `live_log_path` is provided, build output is streamed to that file incrementally. 
+pub async fn run_nix_build( + drv_path: &str, + work_dir: &Path, + timeout: Duration, + live_log_path: Option<&Path>, +) -> Result { + let result = tokio::time::timeout(timeout, async { + let mut child = tokio::process::Command::new("nix") + .args([ + "build", + "--no-link", + "--print-out-paths", + "--log-format", + "internal-json", + "--option", + "sandbox", + "true", + "--max-build-log-size", + "104857600", + drv_path, + ]) + .current_dir(work_dir) + .kill_on_drop(true) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .map_err(|e| CiError::Build(format!("Failed to run nix build: {e}")))?; + + let stdout_handle = child.stdout.take(); + let stderr_handle = child.stderr.take(); + + // Read stdout (output paths) + let stdout_task = tokio::spawn(async move { + let mut buf = String::new(); + if let Some(stdout) = stdout_handle { + let mut reader = BufReader::new(stdout); + let mut line = String::new(); + while reader.read_line(&mut line).await.unwrap_or(0) > 0 { + buf.push_str(&line); + line.clear(); + } + } + buf + }); + + // Read stderr (logs + internal JSON) + let live_log_path_owned = live_log_path.map(|p| p.to_path_buf()); + let stderr_task = tokio::spawn(async move { + let mut buf = String::new(); + let mut steps: Vec = Vec::new(); + let mut log_file = if let Some(ref path) = live_log_path_owned { + tokio::fs::File::create(path).await.ok() + } else { + None + }; + + if let Some(stderr) = stderr_handle { + let mut reader = BufReader::new(stderr); + let mut line = String::new(); + while reader.read_line(&mut line).await.unwrap_or(0) > 0 { + // Write to live log file if available + if let Some(ref mut f) = log_file { + use tokio::io::AsyncWriteExt; + let _ = f.write_all(line.as_bytes()).await; + let _ = f.flush().await; + } + + // Parse nix internal JSON log lines + if line.starts_with("@nix ") { + if let Some(json_str) = line.strip_prefix("@nix ") { + if let Ok(parsed) = + serde_json::from_str::(json_str.trim()) + { + 
if let Some(action) = parsed.get("action").and_then(|a| a.as_str()) + { + match action { + "start" => { + if let Some(drv) = + parsed.get("derivation").and_then(|d| d.as_str()) + { + steps.push(SubStep { + drv_path: drv.to_string(), + completed_at: None, + success: false, + }); + } + } + "stop" => { + if let Some(drv) = + parsed.get("derivation").and_then(|d| d.as_str()) + { + if let Some(step) = + steps.iter_mut().rfind(|s| s.drv_path == drv) + { + step.completed_at = Some(chrono::Utc::now()); + step.success = true; + } + } + } + _ => {} + } + } + } + } + } + + if buf.len() < MAX_LOG_SIZE { + buf.push_str(&line); + } + line.clear(); + } + } + (buf, steps) + }); + + let stdout_buf = stdout_task.await.unwrap_or_default(); + let (stderr_buf, sub_steps) = stderr_task.await.unwrap_or_default(); + + let status = child + .wait() + .await + .map_err(|e| CiError::Build(format!("Failed to wait for nix build: {e}")))?; + + let output_paths: Vec = stdout_buf + .lines() + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + + Ok::<_, CiError>(BuildResult { + success: status.success(), + stdout: stdout_buf, + stderr: stderr_buf, + output_paths, + sub_steps, + }) + }) + .await; + + match result { + Ok(inner) => inner, + Err(_) => Err(CiError::Timeout(format!( + "Build timed out after {timeout:?}" + ))), + } +} diff --git a/crates/queue-runner/src/lib.rs b/crates/queue-runner/src/lib.rs new file mode 100644 index 0000000..158c376 --- /dev/null +++ b/crates/queue-runner/src/lib.rs @@ -0,0 +1,3 @@ +pub mod builder; +pub mod runner_loop; +pub mod worker; diff --git a/crates/queue-runner/src/main.rs b/crates/queue-runner/src/main.rs index 519fc36..fd91aec 100644 --- a/crates/queue-runner/src/main.rs +++ b/crates/queue-runner/src/main.rs @@ -1,5 +1,13 @@ +use std::time::Duration; + use clap::Parser; -use tracing_subscriber::fmt::init; + +use fc_common::config::{Config, GcConfig}; +use fc_common::database::Database; +use fc_common::gc_roots; +use std::sync::Arc; 
+ +use fc_queue_runner::worker::WorkerPool; #[derive(Parser)] #[command(name = "fc-queue-runner")] @@ -11,13 +19,143 @@ struct Cli { #[tokio::main] async fn main() -> anyhow::Result<()> { - #[allow(unused_variables, reason = "Main application logic is TODO")] + tracing_subscriber::fmt::init(); + let cli = Cli::parse(); tracing::info!("Starting CI Queue Runner"); - init(); - // TODO: Implement queue runner logic + let config = Config::load()?; + let log_config = config.logs; + let gc_config = config.gc; + let gc_config_for_loop = gc_config.clone(); + let notifications_config = config.notifications; + let signing_config = config.signing; + let qr_config = config.queue_runner; + + let workers = cli.workers.unwrap_or(qr_config.workers); + let poll_interval = Duration::from_secs(qr_config.poll_interval); + let build_timeout = Duration::from_secs(qr_config.build_timeout); + let work_dir = qr_config.work_dir; + + // Ensure the work directory exists + tokio::fs::create_dir_all(&work_dir).await?; + + // Clean up orphaned active logs from previous crashes + cleanup_stale_logs(&log_config.log_dir).await; + + let db = Database::new(config.database).await?; + + let worker_pool = Arc::new(WorkerPool::new( + db.pool().clone(), + workers, + work_dir.clone(), + build_timeout, + log_config, + gc_config, + notifications_config, + signing_config, + )); + + tracing::info!( + workers = workers, + poll_interval = ?poll_interval, + build_timeout = ?build_timeout, + work_dir = %work_dir.display(), + "Queue runner configured" + ); + + let worker_pool_for_drain = worker_pool.clone(); + + tokio::select! 
{ + result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval) => { + if let Err(e) = result { + tracing::error!("Runner loop failed: {e}"); + } + } + () = gc_loop(gc_config_for_loop) => {} + () = shutdown_signal() => { + tracing::info!("Shutdown signal received, draining in-flight builds..."); + worker_pool_for_drain.drain(); + worker_pool_for_drain.wait_for_drain().await; + tracing::info!("All in-flight builds completed"); + } + } + + tracing::info!("Queue runner shutting down, closing database pool"); + db.close().await; Ok(()) } + +async fn cleanup_stale_logs(log_dir: &std::path::Path) { + if let Ok(mut entries) = tokio::fs::read_dir(log_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + if entry.file_name().to_string_lossy().ends_with(".active.log") { + let _ = tokio::fs::remove_file(entry.path()).await; + tracing::info!("Removed stale active log: {}", entry.path().display()); + } + } + } +} + +async fn gc_loop(gc_config: GcConfig) { + if !gc_config.enabled { + return std::future::pending().await; + } + let interval = std::time::Duration::from_secs(gc_config.cleanup_interval); + let max_age = std::time::Duration::from_secs(gc_config.max_age_days * 86400); + + loop { + tokio::time::sleep(interval).await; + match gc_roots::cleanup_old_roots(&gc_config.gc_roots_dir, max_age) { + Ok(count) if count > 0 => { + tracing::info!(count, "Cleaned up old GC roots"); + // Optionally run nix-collect-garbage + match tokio::process::Command::new("nix-collect-garbage") + .output() + .await + { + Ok(output) if output.status.success() => { + tracing::info!("nix-collect-garbage completed"); + } + Ok(output) => { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::warn!("nix-collect-garbage failed: {stderr}"); + } + Err(e) => { + tracing::warn!("Failed to run nix-collect-garbage: {e}"); + } + } + } + Ok(_) => {} + Err(e) => { + tracing::error!("GC cleanup failed: {e}"); + } + } + } +} + +async fn 
shutdown_signal() { + let ctrl_c = async { + tokio::signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("failed to install SIGTERM handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + () = ctrl_c => {}, + () = terminate => {}, + } +} diff --git a/crates/queue-runner/src/runner_loop.rs b/crates/queue-runner/src/runner_loop.rs new file mode 100644 index 0000000..dca8ba8 --- /dev/null +++ b/crates/queue-runner/src/runner_loop.rs @@ -0,0 +1,125 @@ +use std::sync::Arc; +use std::time::Duration; + +use sqlx::PgPool; + +use fc_common::models::BuildStatus; +use fc_common::repo; + +use crate::worker::WorkerPool; + +pub async fn run( + pool: PgPool, + worker_pool: Arc, + poll_interval: Duration, +) -> anyhow::Result<()> { + // Reset orphaned builds from previous crashes (older than 5 minutes) + match repo::builds::reset_orphaned(&pool, 300).await { + Ok(count) if count > 0 => { + tracing::warn!(count, "Reset orphaned builds back to pending"); + } + Ok(_) => {} + Err(e) => { + tracing::error!("Failed to reset orphaned builds: {e}"); + } + } + + loop { + match repo::builds::list_pending(&pool, 10).await { + Ok(builds) => { + if !builds.is_empty() { + tracing::info!("Found {} pending builds", builds.len()); + } + for build in builds { + // Aggregate builds: check if all constituents are done + if build.is_aggregate { + match repo::build_dependencies::all_deps_completed(&pool, build.id).await { + Ok(true) => { + // All constituents done — mark aggregate as completed + tracing::info!( + build_id = %build.id, + job = %build.job_name, + "Aggregate build: all constituents completed" + ); + let _ = repo::builds::start(&pool, build.id).await; + let _ = repo::builds::complete( + &pool, + build.id, + BuildStatus::Completed, + None, + None, + None, + ) + .await; 
+ continue; + } + Ok(false) => { + tracing::debug!( + build_id = %build.id, + "Aggregate build waiting for constituents" + ); + continue; + } + Err(e) => { + tracing::error!( + build_id = %build.id, + "Failed to check aggregate deps: {e}" + ); + continue; + } + } + } + + // Derivation deduplication: reuse result if same drv was already built + match repo::builds::get_completed_by_drv_path(&pool, &build.drv_path).await { + Ok(Some(existing)) if existing.id != build.id => { + tracing::info!( + build_id = %build.id, + existing_id = %existing.id, + drv = %build.drv_path, + "Dedup: reusing result from existing build" + ); + let _ = repo::builds::start(&pool, build.id).await; + let _ = repo::builds::complete( + &pool, + build.id, + BuildStatus::Completed, + existing.log_path.as_deref(), + existing.build_output_path.as_deref(), + None, + ) + .await; + continue; + } + _ => {} + } + + // Dependency-aware scheduling: skip if deps not met + match repo::build_dependencies::all_deps_completed(&pool, build.id).await { + Ok(true) => {} + Ok(false) => { + tracing::debug!( + build_id = %build.id, + "Build waiting for dependencies" + ); + continue; + } + Err(e) => { + tracing::error!( + build_id = %build.id, + "Failed to check build deps: {e}" + ); + continue; + } + } + + worker_pool.dispatch(build); + } + } + Err(e) => { + tracing::error!("Failed to fetch pending builds: {e}"); + } + } + tokio::time::sleep(poll_interval).await; + } +} diff --git a/crates/queue-runner/src/worker.rs b/crates/queue-runner/src/worker.rs new file mode 100644 index 0000000..29e35e6 --- /dev/null +++ b/crates/queue-runner/src/worker.rs @@ -0,0 +1,500 @@ +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use sqlx::PgPool; +use tokio::sync::Semaphore; + +use fc_common::config::{GcConfig, LogConfig, NotificationsConfig, SigningConfig}; +use fc_common::gc_roots::GcRoots; +use fc_common::log_storage::LogStorage; +use fc_common::models::{Build, BuildStatus, CreateBuildProduct, 
CreateBuildStep}; +use fc_common::repo; + +pub struct WorkerPool { + semaphore: Arc, + pool: PgPool, + work_dir: Arc, + build_timeout: Duration, + log_config: Arc, + gc_config: Arc, + notifications_config: Arc, + signing_config: Arc, + drain_token: tokio_util::sync::CancellationToken, +} + +impl WorkerPool { + pub fn new( + db_pool: PgPool, + workers: usize, + work_dir: PathBuf, + build_timeout: Duration, + log_config: LogConfig, + gc_config: GcConfig, + notifications_config: NotificationsConfig, + signing_config: SigningConfig, + ) -> Self { + Self { + semaphore: Arc::new(Semaphore::new(workers)), + pool: db_pool, + work_dir: Arc::new(work_dir), + build_timeout, + log_config: Arc::new(log_config), + gc_config: Arc::new(gc_config), + notifications_config: Arc::new(notifications_config), + signing_config: Arc::new(signing_config), + drain_token: tokio_util::sync::CancellationToken::new(), + } + } + + /// Signal all workers to stop accepting new builds. In-flight builds will finish. + pub fn drain(&self) { + self.drain_token.cancel(); + } + + /// Wait until all in-flight builds complete (semaphore fully available). 
+ pub async fn wait_for_drain(&self) { + // Acquire all permits = all workers idle + let workers = self.semaphore.available_permits() + 1; // at least 1 + let _ = tokio::time::timeout( + Duration::from_secs(self.build_timeout.as_secs() + 60), + async { + for _ in 0..workers { + if let Ok(permit) = self.semaphore.acquire().await { + permit.forget(); // don't release back + } + } + }, + ) + .await; + } + + pub fn dispatch(&self, build: Build) { + if self.drain_token.is_cancelled() { + tracing::info!(build_id = %build.id, "Drain in progress, not dispatching"); + return; + } + + let semaphore = self.semaphore.clone(); + let pool = self.pool.clone(); + let work_dir = self.work_dir.clone(); + let timeout = self.build_timeout; + let log_config = self.log_config.clone(); + let gc_config = self.gc_config.clone(); + let notifications_config = self.notifications_config.clone(); + let signing_config = self.signing_config.clone(); + + tokio::spawn(async move { + let _permit = match semaphore.acquire().await { + Ok(p) => p, + Err(_) => return, + }; + + if let Err(e) = run_build( + &pool, + &build, + &work_dir, + timeout, + &log_config, + &gc_config, + ¬ifications_config, + &signing_config, + ) + .await + { + tracing::error!(build_id = %build.id, "Build dispatch failed: {e}"); + } + }); + } +} + +/// Query nix path-info for narHash and narSize of an output path. 
+async fn get_path_info(output_path: &str) -> Option<(String, i64)> { + let output = tokio::process::Command::new("nix") + .args(["path-info", "--json", output_path]) + .output() + .await + .ok()?; + + if !output.status.success() { + return None; + } + + let stdout = String::from_utf8_lossy(&output.stdout); + let parsed: serde_json::Value = serde_json::from_str(&stdout).ok()?; + + let entry = parsed.as_array()?.first()?; + let nar_hash = entry.get("narHash")?.as_str()?.to_string(); + let nar_size = entry.get("narSize")?.as_i64()?; + + Some((nar_hash, nar_size)) +} + +/// Look up the project that owns a build (build -> evaluation -> jobset -> project). +async fn get_project_for_build( + pool: &PgPool, + build: &Build, +) -> Option<(fc_common::models::Project, String)> { + let eval = repo::evaluations::get(pool, build.evaluation_id) + .await + .ok()?; + let jobset = repo::jobsets::get(pool, eval.jobset_id).await.ok()?; + let project = repo::projects::get(pool, jobset.project_id).await.ok()?; + Some((project, eval.commit_hash)) +} + +/// Sign nix store outputs using the configured signing key. 
+async fn sign_outputs(output_paths: &[String], signing_config: &SigningConfig) -> bool { + let key_file = match &signing_config.key_file { + Some(kf) if signing_config.enabled && kf.exists() => kf, + _ => return false, + }; + + for output_path in output_paths { + let result = tokio::process::Command::new("nix") + .args([ + "store", + "sign", + "--key-file", + &key_file.to_string_lossy(), + output_path, + ]) + .output() + .await; + + match result { + Ok(o) if o.status.success() => { + tracing::debug!(output = output_path, "Signed store path"); + } + Ok(o) => { + let stderr = String::from_utf8_lossy(&o.stderr); + tracing::warn!(output = output_path, "Failed to sign: {stderr}"); + } + Err(e) => { + tracing::warn!(output = output_path, "Failed to run nix store sign: {e}"); + } + } + } + true +} + +/// Try to run the build on a remote builder if one is available for the build's system. +async fn try_remote_build( + pool: &PgPool, + build: &Build, + work_dir: &std::path::Path, + timeout: Duration, + live_log_path: Option<&std::path::Path>, +) -> Option { + let system = build.system.as_deref()?; + + let builders = repo::remote_builders::find_for_system(pool, system) + .await + .ok()?; + + for builder in &builders { + tracing::info!( + build_id = %build.id, + builder = %builder.name, + "Attempting remote build on {}", + builder.ssh_uri, + ); + + // Set builder_id + let _ = repo::builds::set_builder(pool, build.id, builder.id).await; + + // Build remotely via --store + let store_uri = format!("ssh://{}", builder.ssh_uri); + let result = crate::builder::run_nix_build_remote( + &build.drv_path, + work_dir, + timeout, + &store_uri, + builder.ssh_key_file.as_deref(), + live_log_path, + ) + .await; + + match result { + Ok(r) => return Some(r), + Err(e) => { + tracing::warn!( + build_id = %build.id, + builder = %builder.name, + "Remote build failed: {e}, trying next builder" + ); + } + } + } + + None +} + +async fn run_build( + pool: &PgPool, + build: &Build, + work_dir: 
&std::path::Path, + timeout: Duration, + log_config: &LogConfig, + gc_config: &GcConfig, + notifications_config: &NotificationsConfig, + signing_config: &SigningConfig, +) -> anyhow::Result<()> { + // Atomically claim the build + let claimed = repo::builds::start(pool, build.id).await?; + if claimed.is_none() { + tracing::debug!(build_id = %build.id, "Build already claimed, skipping"); + return Ok(()); + } + + tracing::info!(build_id = %build.id, job = %build.job_name, "Starting build"); + + // Create a build step record + let step = repo::build_steps::create( + pool, + CreateBuildStep { + build_id: build.id, + step_number: 1, + command: format!("nix build --no-link --print-out-paths {}", build.drv_path), + }, + ) + .await?; + + // Set up live log path + let live_log_path = log_config.log_dir.join(format!("{}.active.log", build.id)); + let _ = tokio::fs::create_dir_all(&log_config.log_dir).await; + + // Try remote build first, then fall back to local + let result = if build.system.is_some() { + match try_remote_build(pool, build, work_dir, timeout, Some(&live_log_path)).await { + Some(r) => Ok(r), + None => { + // No remote builder available or all failed — build locally + crate::builder::run_nix_build( + &build.drv_path, + work_dir, + timeout, + Some(&live_log_path), + ) + .await + } + } + } else { + crate::builder::run_nix_build(&build.drv_path, work_dir, timeout, Some(&live_log_path)) + .await + }; + + // Initialize log storage + let log_storage = LogStorage::new(log_config.log_dir.clone()).ok(); + + match result { + Ok(build_result) => { + // Complete the build step + let exit_code = if build_result.success { 0 } else { 1 }; + repo::build_steps::complete( + pool, + step.id, + exit_code, + Some(&build_result.stdout), + Some(&build_result.stderr), + ) + .await?; + + // Create sub-step records from parsed nix log + for (i, sub_step) in build_result.sub_steps.iter().enumerate() { + let sub = repo::build_steps::create( + pool, + CreateBuildStep { + build_id: 
build.id, + step_number: (i as i32) + 2, + command: format!("nix build {}", sub_step.drv_path), + }, + ) + .await?; + let sub_exit = if sub_step.success { 0 } else { 1 }; + repo::build_steps::complete(pool, sub.id, sub_exit, None, None).await?; + } + + // Write build log (rename active log to final) + let log_path = if let Some(ref storage) = log_storage { + let final_path = storage.log_path(&build.id); + if live_log_path.exists() { + let _ = tokio::fs::rename(&live_log_path, &final_path).await; + } else { + match storage.write_log(&build.id, &build_result.stdout, &build_result.stderr) { + Ok(_) => {} + Err(e) => { + tracing::warn!(build_id = %build.id, "Failed to write build log: {e}"); + } + } + } + Some(final_path.to_string_lossy().to_string()) + } else { + None + }; + + if build_result.success { + // Parse output names from build's outputs JSON + let output_names: Vec = build + .outputs + .as_ref() + .and_then(|v| v.as_object()) + .map(|obj| obj.keys().cloned().collect()) + .unwrap_or_default(); + + // Register GC roots and create build products for each output + for (i, output_path) in build_result.output_paths.iter().enumerate() { + let output_name = output_names.get(i).cloned().unwrap_or_else(|| { + if i == 0 { + build.job_name.clone() + } else { + format!("{}-{i}", build.job_name) + } + }); + + // Register GC root + let mut gc_root_path = None; + if let Ok(gc_roots) = + GcRoots::new(gc_config.gc_roots_dir.clone(), gc_config.enabled) + { + let gc_id = if i == 0 { + build.id + } else { + uuid::Uuid::new_v4() + }; + match gc_roots.register(&gc_id, output_path) { + Ok(Some(link_path)) => { + gc_root_path = Some(link_path.to_string_lossy().to_string()); + } + Ok(None) => {} + Err(e) => { + tracing::warn!(build_id = %build.id, "Failed to register GC root: {e}"); + } + } + } + + // Get metadata from nix path-info + let (sha256_hash, file_size) = match get_path_info(output_path).await { + Some((hash, size)) => (Some(hash), Some(size)), + None => (None, None), + }; 
+ + let product = repo::build_products::create( + pool, + CreateBuildProduct { + build_id: build.id, + name: output_name, + path: output_path.clone(), + sha256_hash, + file_size, + content_type: None, + is_directory: true, + }, + ) + .await?; + + // Update the build product with GC root path if registered + if gc_root_path.is_some() { + sqlx::query("UPDATE build_products SET gc_root_path = $1 WHERE id = $2") + .bind(&gc_root_path) + .bind(product.id) + .execute(pool) + .await?; + } + } + + // Sign outputs at build time + if sign_outputs(&build_result.output_paths, signing_config).await { + let _ = repo::builds::mark_signed(pool, build.id).await; + } + + let primary_output = build_result.output_paths.first().map(|s| s.as_str()); + + repo::builds::complete( + pool, + build.id, + BuildStatus::Completed, + log_path.as_deref(), + primary_output, + None, + ) + .await?; + + tracing::info!(build_id = %build.id, "Build completed successfully"); + } else { + // Check if we should retry + if build.retry_count < build.max_retries { + tracing::info!( + build_id = %build.id, + retry = build.retry_count + 1, + max = build.max_retries, + "Build failed, scheduling retry" + ); + sqlx::query( + "UPDATE builds SET status = 'pending', started_at = NULL, \ + retry_count = retry_count + 1, completed_at = NULL \ + WHERE id = $1", + ) + .bind(build.id) + .execute(pool) + .await?; + // Clean up live log + let _ = tokio::fs::remove_file(&live_log_path).await; + return Ok(()); + } + + repo::builds::complete( + pool, + build.id, + BuildStatus::Failed, + log_path.as_deref(), + None, + Some(&build_result.stderr), + ) + .await?; + + tracing::warn!(build_id = %build.id, "Build failed"); + } + } + Err(e) => { + let msg = e.to_string(); + + // Write error log + if let Some(ref storage) = log_storage { + let _ = storage.write_log(&build.id, "", &msg); + } + // Clean up live log + let _ = tokio::fs::remove_file(&live_log_path).await; + + repo::build_steps::complete(pool, step.id, 1, None, 
Some(&msg)).await?; + repo::builds::complete(pool, build.id, BuildStatus::Failed, None, None, Some(&msg)) + .await?; + tracing::error!(build_id = %build.id, "Build error: {msg}"); + } + } + + // Dispatch notifications after build completion + let updated_build = repo::builds::get(pool, build.id).await?; + if updated_build.status == BuildStatus::Completed || updated_build.status == BuildStatus::Failed + { + if let Some((project, commit_hash)) = get_project_for_build(pool, build).await { + fc_common::notifications::dispatch_build_finished( + &updated_build, + &project, + &commit_hash, + notifications_config, + ) + .await; + } + + // Auto-promote channels if all builds in the evaluation are done + if updated_build.status == BuildStatus::Completed { + if let Ok(eval) = repo::evaluations::get(pool, build.evaluation_id).await { + let _ = + repo::channels::auto_promote_if_complete(pool, eval.jobset_id, eval.id).await; + } + } + } + + Ok(()) +} diff --git a/crates/queue-runner/tests/runner_tests.rs b/crates/queue-runner/tests/runner_tests.rs new file mode 100644 index 0000000..eec8332 --- /dev/null +++ b/crates/queue-runner/tests/runner_tests.rs @@ -0,0 +1,285 @@ +//! Tests for the queue runner. +//! Nix log parsing tests require no external binaries. +//! Database tests require TEST_DATABASE_URL. 
// --- Nix log line parsing ---

// Happy path: an `@nix`-prefixed JSON line with action "start" parses into
// the (action, derivation-path) pair.
#[test]
fn test_parse_nix_log_start() {
    let line = r#"@nix {"action":"start","derivation":"/nix/store/abc-hello.drv"}"#;
    let result = fc_queue_runner::builder::parse_nix_log_line(line);
    assert!(result.is_some());
    let (action, drv) = result.unwrap();
    assert_eq!(action, "start");
    assert_eq!(drv, "/nix/store/abc-hello.drv");
}

// Same shape as the "start" case but for action "stop".
#[test]
fn test_parse_nix_log_stop() {
    let line = r#"@nix {"action":"stop","derivation":"/nix/store/abc-hello.drv"}"#;
    let result = fc_queue_runner::builder::parse_nix_log_line(line);
    assert!(result.is_some());
    let (action, drv) = result.unwrap();
    assert_eq!(action, "stop");
    assert_eq!(drv, "/nix/store/abc-hello.drv");
}

// Actions other than start/stop (e.g. "msg") are ignored by the parser.
#[test]
fn test_parse_nix_log_unknown_action() {
    let line = r#"@nix {"action":"msg","msg":"building..."}"#;
    let result = fc_queue_runner::builder::parse_nix_log_line(line);
    assert!(result.is_none());
}

// Plain human-readable nix output (no `@nix ` prefix) must not parse.
#[test]
fn test_parse_nix_log_not_nix_prefix() {
    let line = "building '/nix/store/abc-hello.drv'...";
    let result = fc_queue_runner::builder::parse_nix_log_line(line);
    assert!(result.is_none());
}

// Malformed JSON after the `@nix ` prefix is rejected rather than panicking.
#[test]
fn test_parse_nix_log_invalid_json() {
    let line = "@nix {invalid json}";
    let result = fc_queue_runner::builder::parse_nix_log_line(line);
    assert!(result.is_none());
}

// A recognized action without a "derivation" field yields None — both parts
// of the returned pair are required.
#[test]
fn test_parse_nix_log_no_derivation_field() {
    let line = r#"@nix {"action":"start","type":"build"}"#;
    let result = fc_queue_runner::builder::parse_nix_log_line(line);
    assert!(result.is_none());
}

// Degenerate input: the empty string parses to None.
#[test]
fn test_parse_nix_log_empty_line() {
    let result = fc_queue_runner::builder::parse_nix_log_line("");
    assert!(result.is_none());
}

// --- WorkerPool drain ---

// Smoke test: constructing a WorkerPool and calling drain() must not panic.
// Requires TEST_DATABASE_URL; silently skips otherwise.
#[tokio::test]
async fn test_worker_pool_drain_stops_dispatch() {
    // Create a minimal worker pool
    let url = match std::env::var("TEST_DATABASE_URL") {
        Ok(url) => url,
        Err(_) => {
            println!("Skipping: TEST_DATABASE_URL not set");
            return;
        }
    };

    let pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(1)
        .connect(&url)
        .await
        .expect("failed to connect");

    // NOTE(review): second argument is presumably the worker-slot count and the
    // Duration a per-build timeout — confirm against WorkerPool::new's signature.
    let worker_pool = fc_queue_runner::worker::WorkerPool::new(
        pool,
        2,
        std::env::temp_dir(),
        std::time::Duration::from_secs(60),
        fc_common::config::LogConfig::default(),
        fc_common::config::GcConfig::default(),
        fc_common::config::NotificationsConfig::default(),
        fc_common::config::SigningConfig::default(),
    );

    // Drain should not panic
    worker_pool.drain();

    // After drain, dispatching should be a no-op (build won't start)
    // We can't easily test this without a real build, but at least verify drain doesn't crash
}

// --- Database-dependent tests ---

// Verifies that builds::start is an atomic claim: the first caller gets the
// build, a second concurrent-style claim on the same id gets None.
// Requires TEST_DATABASE_URL; silently skips otherwise.
#[tokio::test]
async fn test_atomic_build_claiming() {
    let url = match std::env::var("TEST_DATABASE_URL") {
        Ok(url) => url,
        Err(_) => {
            println!("Skipping: TEST_DATABASE_URL not set");
            return;
        }
    };

    let pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(5)
        .connect(&url)
        .await
        .expect("failed to connect");

    // Apply the shared schema migrations before touching the repo layer.
    sqlx::migrate!("../common/migrations")
        .run(&pool)
        .await
        .expect("migration failed");

    // Create a project -> jobset -> evaluation -> build chain
    // Random suffix keeps repeated test runs from colliding on project name.
    let project = fc_common::repo::projects::create(
        &pool,
        fc_common::models::CreateProject {
            name: format!("runner-test-{}", uuid::Uuid::new_v4()),
            description: None,
            repository_url: "https://github.com/test/repo".to_string(),
        },
    )
    .await
    .expect("create project");

    let jobset = fc_common::repo::jobsets::create(
        &pool,
        fc_common::models::CreateJobset {
            project_id: project.id,
            name: "main".to_string(),
            nix_expression: "packages".to_string(),
            enabled: None,
            flake_mode: None,
            check_interval: None,
        },
    )
    .await
    .expect("create jobset");

    let eval = fc_common::repo::evaluations::create(
        &pool,
        fc_common::models::CreateEvaluation {
            jobset_id: jobset.id,
            commit_hash: "abcdef1234567890abcdef1234567890abcdef12".to_string(),
        },
    )
    .await
    .expect("create eval");

    let build = fc_common::repo::builds::create(
        &pool,
        fc_common::models::CreateBuild {
            evaluation_id: eval.id,
            job_name: "test-build".to_string(),
            drv_path: "/nix/store/test-runner-test.drv".to_string(),
            system: Some("x86_64-linux".to_string()),
            outputs: None,
            is_aggregate: None,
            constituents: None,
        },
    )
    .await
    .expect("create build");

    // A freshly created build starts out Pending.
    assert_eq!(build.status, fc_common::models::BuildStatus::Pending);

    // First claim should succeed
    let claimed = fc_common::repo::builds::start(&pool, build.id)
        .await
        .expect("start build");
    assert!(claimed.is_some());

    // Second claim should return None (already claimed)
    let claimed2 = fc_common::repo::builds::start(&pool, build.id)
        .await
        .expect("start build again");
    assert!(claimed2.is_none());

    // Clean up
    // NOTE(review): best-effort only, and skipped if an assert above fails —
    // presumably project deletion cascades to jobsets/evals/builds; verify.
    let _ = fc_common::repo::projects::delete(&pool, project.id).await;
}

// Verifies that a build stuck in the running state long enough is reset back
// to Pending by builds::reset_orphaned. Requires TEST_DATABASE_URL.
#[tokio::test]
async fn test_orphan_build_reset() {
    let url = match std::env::var("TEST_DATABASE_URL") {
        Ok(url) => url,
        Err(_) => {
            println!("Skipping: TEST_DATABASE_URL not set");
            return;
        }
    };

    let pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(5)
        .connect(&url)
        .await
        .expect("failed to connect");

    sqlx::migrate!("../common/migrations")
        .run(&pool)
        .await
        .expect("migration failed");

    // Same project -> jobset -> evaluation -> build fixture chain as above,
    // with its own uniquely named project.
    let project = fc_common::repo::projects::create(
        &pool,
        fc_common::models::CreateProject {
            name: format!("orphan-test-{}", uuid::Uuid::new_v4()),
            description: None,
            repository_url: "https://github.com/test/repo".to_string(),
        },
    )
    .await
    .expect("create project");

    let jobset = fc_common::repo::jobsets::create(
        &pool,
        fc_common::models::CreateJobset {
            project_id: project.id,
            name: "main".to_string(),
            nix_expression: "packages".to_string(),
            enabled: None,
            flake_mode: None,
            check_interval: None,
        },
    )
    .await
    .expect("create jobset");

    let eval = fc_common::repo::evaluations::create(
        &pool,
        fc_common::models::CreateEvaluation {
            jobset_id: jobset.id,
            commit_hash: "1234567890abcdef1234567890abcdef12345678".to_string(),
        },
    )
    .await
    .expect("create eval");

    // Create a build and mark it running
    let build = fc_common::repo::builds::create(
        &pool,
        fc_common::models::CreateBuild {
            evaluation_id: eval.id,
            job_name: "orphan-build".to_string(),
            drv_path: "/nix/store/test-orphan.drv".to_string(),
            system: None,
            outputs: None,
            is_aggregate: None,
            constituents: None,
        },
    )
    .await
    .expect("create build");

    let _ = fc_common::repo::builds::start(&pool, build.id).await;

    // Simulate the build being stuck for a while by manually backdating started_at
    sqlx::query("UPDATE builds SET started_at = NOW() - INTERVAL '10 minutes' WHERE id = $1")
        .bind(build.id)
        .execute(&pool)
        .await
        .expect("backdate build");

    // Reset orphaned builds (older than 5 minutes)
    // NOTE(review): count may exceed 1 if prior failed runs left orphans —
    // hence the `>= 1` assertion rather than an exact match.
    let count = fc_common::repo::builds::reset_orphaned(&pool, 300)
        .await
        .expect("reset orphaned");
    assert!(count >= 1, "should have reset at least 1 orphaned build");

    // Verify build is pending again
    let reset_build = fc_common::repo::builds::get(&pool, build.id)
        .await
        .expect("get build");
    assert_eq!(reset_build.status, fc_common::models::BuildStatus::Pending);

    // Clean up
    let _ = fc_common::repo::projects::delete(&pool, project.id).await;
}