queue-runner: semaphore-based worker pool with atomic build claiming

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Ie3a8d343c3705200f0cac566227db54f6a6a6964
This commit is contained in:
raf 2026-02-01 15:13:19 +03:00
commit 44d1ee1d6b
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
7 changed files with 1367 additions and 5 deletions

View file

@ -19,4 +19,10 @@ anyhow.workspace = true
thiserror.workspace = true thiserror.workspace = true
clap.workspace = true clap.workspace = true
config.workspace = true config.workspace = true
fc-common = { path = "../common" } tokio-util.workspace = true
# Our crates
fc-common.workspace = true
[dev-dependencies]
tempfile.workspace = true

View file

@ -0,0 +1,305 @@
use std::path::Path;
use std::time::Duration;
use fc_common::CiError;
use fc_common::error::Result;
use tokio::io::{AsyncBufReadExt, BufReader};
const MAX_LOG_SIZE: usize = 100 * 1024 * 1024; // 100MB
/// Run a build on a remote machine via `nix build --store ssh://...`.
///
/// stdout (built output paths) and stderr (the build log) are drained by
/// two spawned tasks so neither pipe can fill and block the child. When
/// `live_log_path` is given, stderr is also streamed line-by-line to that
/// file so the log can be tailed while the build runs. The in-memory
/// stderr copy stops growing once it reaches `MAX_LOG_SIZE`.
///
/// NOTE(review): unlike `run_nix_build`, this stderr reader never parses
/// internal-json events, so `BuildResult::sub_steps` is always empty for
/// remote builds — confirm that is intentional.
///
/// # Errors
/// * `CiError::Build` — the process could not be spawned or waited on.
/// * `CiError::Timeout` — the build exceeded `timeout`; the child is
///   killed on drop (`kill_on_drop(true)`).
pub async fn run_nix_build_remote(
    drv_path: &str,
    work_dir: &Path,
    timeout: Duration,
    store_uri: &str,
    ssh_key_file: Option<&str>,
    live_log_path: Option<&Path>,
) -> Result<BuildResult> {
    let result = tokio::time::timeout(timeout, async {
        let mut cmd = tokio::process::Command::new("nix");
        cmd.args([
            "build",
            "--no-link",
            "--print-out-paths",
            "--log-format",
            "internal-json",
            "--option",
            "sandbox",
            "true",
            "--max-build-log-size",
            "104857600",
            "--store",
            store_uri,
            drv_path,
        ])
        .current_dir(work_dir)
        .kill_on_drop(true)
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped());
        // Pass the SSH identity to nix's ssh invocation via NIX_SSHOPTS.
        if let Some(key_file) = ssh_key_file {
            cmd.env(
                "NIX_SSHOPTS",
                format!("-i {key_file} -o StrictHostKeyChecking=accept-new"),
            );
        }
        let mut child = cmd
            .spawn()
            .map_err(|e| CiError::Build(format!("Failed to run remote nix build: {e}")))?;
        let stdout_handle = child.stdout.take();
        let stderr_handle = child.stderr.take();
        // Drain stdout: one store path per line.
        let stdout_task = tokio::spawn(async move {
            let mut buf = String::new();
            if let Some(stdout) = stdout_handle {
                let mut reader = BufReader::new(stdout);
                let mut line = String::new();
                while reader.read_line(&mut line).await.unwrap_or(0) > 0 {
                    buf.push_str(&line);
                    line.clear();
                }
            }
            buf
        });
        // Drain stderr, optionally mirroring each line into the live log file.
        let live_log_path_owned = live_log_path.map(|p| p.to_path_buf());
        let stderr_task = tokio::spawn(async move {
            let mut buf = String::new();
            // Never populated here; kept so both build paths share BuildResult.
            let steps: Vec<SubStep> = Vec::new();
            let mut log_file = if let Some(ref path) = live_log_path_owned {
                tokio::fs::File::create(path).await.ok()
            } else {
                None
            };
            if let Some(stderr) = stderr_handle {
                let mut reader = BufReader::new(stderr);
                let mut line = String::new();
                while reader.read_line(&mut line).await.unwrap_or(0) > 0 {
                    if let Some(ref mut f) = log_file {
                        use tokio::io::AsyncWriteExt;
                        // Flush per line so tailing readers see output promptly.
                        let _ = f.write_all(line.as_bytes()).await;
                        let _ = f.flush().await;
                    }
                    // Cap the in-memory copy; the live log file keeps everything.
                    if buf.len() < MAX_LOG_SIZE {
                        buf.push_str(&line);
                    }
                    line.clear();
                }
            }
            (buf, steps)
        });
        // Join the readers before waiting so the pipes are fully drained.
        let stdout_buf = stdout_task.await.unwrap_or_default();
        let (stderr_buf, sub_steps) = stderr_task.await.unwrap_or_default();
        let status = child
            .wait()
            .await
            .map_err(|e| CiError::Build(format!("Failed to wait for remote nix build: {e}")))?;
        let output_paths: Vec<String> = stdout_buf
            .lines()
            .map(|s| s.trim().to_string())
            .filter(|s| !s.is_empty())
            .collect();
        Ok::<_, CiError>(BuildResult {
            success: status.success(),
            stdout: stdout_buf,
            stderr: stderr_buf,
            output_paths,
            sub_steps,
        })
    })
    .await;
    match result {
        Ok(inner) => inner,
        Err(_) => Err(CiError::Timeout(format!(
            "Remote build timed out after {timeout:?}"
        ))),
    }
}
/// Outcome of a `nix build` invocation (local or remote).
pub struct BuildResult {
    /// True when the nix process exited with status 0.
    pub success: bool,
    /// Captured stdout — the built store paths, one per line.
    pub stdout: String,
    /// Captured stderr (build log); stops growing at `MAX_LOG_SIZE`.
    pub stderr: String,
    /// Trimmed, non-empty store paths parsed from `stdout`.
    pub output_paths: Vec<String>,
    /// Per-derivation steps parsed from internal-json log events
    /// (only populated by local builds; remote builds leave it empty).
    pub sub_steps: Vec<SubStep>,
}
/// A sub-step parsed from nix's internal JSON log format.
pub struct SubStep {
    /// Derivation path this step builds (from the `start` event).
    pub drv_path: String,
    /// Set when a matching `stop` event is seen; `None` if the step
    /// never reported completion.
    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
    /// True once a `stop` event is observed for this derivation.
    pub success: bool,
}
/// Parse a single nix internal JSON log line (`@nix {...}`).
///
/// Returns `Some((action, drv_path))` when the line is a derivation
/// `start` or `stop` event; `None` for any other line (no `@nix ` prefix,
/// malformed JSON, missing fields, or an unrelated action).
pub fn parse_nix_log_line(line: &str) -> Option<(&'static str, String)> {
    let payload = line.strip_prefix("@nix ")?.trim();
    let value: serde_json::Value = serde_json::from_str(payload).ok()?;
    let drv = value.get("derivation")?.as_str()?.to_string();
    match value.get("action")?.as_str()? {
        "start" => Some(("start", drv)),
        "stop" => Some(("stop", drv)),
        _ => None,
    }
}
/// Run `nix build` for a derivation path.
/// If `live_log_path` is provided, build output is streamed to that file incrementally.
pub async fn run_nix_build(
drv_path: &str,
work_dir: &Path,
timeout: Duration,
live_log_path: Option<&Path>,
) -> Result<BuildResult> {
let result = tokio::time::timeout(timeout, async {
let mut child = tokio::process::Command::new("nix")
.args([
"build",
"--no-link",
"--print-out-paths",
"--log-format",
"internal-json",
"--option",
"sandbox",
"true",
"--max-build-log-size",
"104857600",
drv_path,
])
.current_dir(work_dir)
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.map_err(|e| CiError::Build(format!("Failed to run nix build: {e}")))?;
let stdout_handle = child.stdout.take();
let stderr_handle = child.stderr.take();
// Read stdout (output paths)
let stdout_task = tokio::spawn(async move {
let mut buf = String::new();
if let Some(stdout) = stdout_handle {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
while reader.read_line(&mut line).await.unwrap_or(0) > 0 {
buf.push_str(&line);
line.clear();
}
}
buf
});
// Read stderr (logs + internal JSON)
let live_log_path_owned = live_log_path.map(|p| p.to_path_buf());
let stderr_task = tokio::spawn(async move {
let mut buf = String::new();
let mut steps: Vec<SubStep> = Vec::new();
let mut log_file = if let Some(ref path) = live_log_path_owned {
tokio::fs::File::create(path).await.ok()
} else {
None
};
if let Some(stderr) = stderr_handle {
let mut reader = BufReader::new(stderr);
let mut line = String::new();
while reader.read_line(&mut line).await.unwrap_or(0) > 0 {
// Write to live log file if available
if let Some(ref mut f) = log_file {
use tokio::io::AsyncWriteExt;
let _ = f.write_all(line.as_bytes()).await;
let _ = f.flush().await;
}
// Parse nix internal JSON log lines
if line.starts_with("@nix ") {
if let Some(json_str) = line.strip_prefix("@nix ") {
if let Ok(parsed) =
serde_json::from_str::<serde_json::Value>(json_str.trim())
{
if let Some(action) = parsed.get("action").and_then(|a| a.as_str())
{
match action {
"start" => {
if let Some(drv) =
parsed.get("derivation").and_then(|d| d.as_str())
{
steps.push(SubStep {
drv_path: drv.to_string(),
completed_at: None,
success: false,
});
}
}
"stop" => {
if let Some(drv) =
parsed.get("derivation").and_then(|d| d.as_str())
{
if let Some(step) =
steps.iter_mut().rfind(|s| s.drv_path == drv)
{
step.completed_at = Some(chrono::Utc::now());
step.success = true;
}
}
}
_ => {}
}
}
}
}
}
if buf.len() < MAX_LOG_SIZE {
buf.push_str(&line);
}
line.clear();
}
}
(buf, steps)
});
let stdout_buf = stdout_task.await.unwrap_or_default();
let (stderr_buf, sub_steps) = stderr_task.await.unwrap_or_default();
let status = child
.wait()
.await
.map_err(|e| CiError::Build(format!("Failed to wait for nix build: {e}")))?;
let output_paths: Vec<String> = stdout_buf
.lines()
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
Ok::<_, CiError>(BuildResult {
success: status.success(),
stdout: stdout_buf,
stderr: stderr_buf,
output_paths,
sub_steps,
})
})
.await;
match result {
Ok(inner) => inner,
Err(_) => Err(CiError::Timeout(format!(
"Build timed out after {timeout:?}"
))),
}
}

View file

@ -0,0 +1,3 @@
/// Local/remote `nix build` execution and internal-json log parsing.
pub mod builder;
/// Polling loop that fetches pending builds and schedules them.
pub mod runner_loop;
/// Semaphore-bounded worker pool running builds as tokio tasks.
pub mod worker;

View file

@ -1,5 +1,13 @@
use std::time::Duration;
use clap::Parser; use clap::Parser;
use tracing_subscriber::fmt::init;
use fc_common::config::{Config, GcConfig};
use fc_common::database::Database;
use fc_common::gc_roots;
use std::sync::Arc;
use fc_queue_runner::worker::WorkerPool;
#[derive(Parser)] #[derive(Parser)]
#[command(name = "fc-queue-runner")] #[command(name = "fc-queue-runner")]
@ -11,13 +19,143 @@ struct Cli {
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
#[allow(unused_variables, reason = "Main application logic is TODO")] tracing_subscriber::fmt::init();
let cli = Cli::parse(); let cli = Cli::parse();
tracing::info!("Starting CI Queue Runner"); tracing::info!("Starting CI Queue Runner");
init();
// TODO: Implement queue runner logic let config = Config::load()?;
let log_config = config.logs;
let gc_config = config.gc;
let gc_config_for_loop = gc_config.clone();
let notifications_config = config.notifications;
let signing_config = config.signing;
let qr_config = config.queue_runner;
let workers = cli.workers.unwrap_or(qr_config.workers);
let poll_interval = Duration::from_secs(qr_config.poll_interval);
let build_timeout = Duration::from_secs(qr_config.build_timeout);
let work_dir = qr_config.work_dir;
// Ensure the work directory exists
tokio::fs::create_dir_all(&work_dir).await?;
// Clean up orphaned active logs from previous crashes
cleanup_stale_logs(&log_config.log_dir).await;
let db = Database::new(config.database).await?;
let worker_pool = Arc::new(WorkerPool::new(
db.pool().clone(),
workers,
work_dir.clone(),
build_timeout,
log_config,
gc_config,
notifications_config,
signing_config,
));
tracing::info!(
workers = workers,
poll_interval = ?poll_interval,
build_timeout = ?build_timeout,
work_dir = %work_dir.display(),
"Queue runner configured"
);
let worker_pool_for_drain = worker_pool.clone();
tokio::select! {
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval) => {
if let Err(e) = result {
tracing::error!("Runner loop failed: {e}");
}
}
() = gc_loop(gc_config_for_loop) => {}
() = shutdown_signal() => {
tracing::info!("Shutdown signal received, draining in-flight builds...");
worker_pool_for_drain.drain();
worker_pool_for_drain.wait_for_drain().await;
tracing::info!("All in-flight builds completed");
}
}
tracing::info!("Queue runner shutting down, closing database pool");
db.close().await;
Ok(()) Ok(())
} }
/// Remove orphaned `*.active.log` files left behind by a previous crash.
///
/// Live logs are renamed to their final path when a build finishes, so any
/// `.active.log` present at startup belongs to a build that died mid-flight.
/// Errors are non-fatal: an unreadable directory is ignored, and a failed
/// removal is logged (the original logged "Removed" even when removal
/// failed, which this fixes).
async fn cleanup_stale_logs(log_dir: &std::path::Path) {
    let Ok(mut entries) = tokio::fs::read_dir(log_dir).await else {
        return;
    };
    while let Ok(Some(entry)) = entries.next_entry().await {
        if !entry.file_name().to_string_lossy().ends_with(".active.log") {
            continue;
        }
        match tokio::fs::remove_file(entry.path()).await {
            Ok(()) => {
                tracing::info!("Removed stale active log: {}", entry.path().display());
            }
            Err(e) => {
                tracing::warn!(
                    "Failed to remove stale active log {}: {e}",
                    entry.path().display()
                );
            }
        }
    }
}
/// Periodic GC-root cleanup loop.
///
/// If GC is disabled in config, this future never resolves (it pends
/// forever) so it can still sit inside the caller's `tokio::select!`
/// without completing it. Otherwise it wakes every `cleanup_interval`
/// seconds, removes GC roots older than `max_age_days`, and — only when
/// at least one root was removed — runs `nix-collect-garbage` to free
/// the now-unrooted store paths. All failures are logged and retried on
/// the next tick; the loop itself never exits.
async fn gc_loop(gc_config: GcConfig) {
    if !gc_config.enabled {
        // Park forever instead of returning, so select! never picks this arm.
        return std::future::pending().await;
    }
    let interval = std::time::Duration::from_secs(gc_config.cleanup_interval);
    let max_age = std::time::Duration::from_secs(gc_config.max_age_days * 86400);
    loop {
        tokio::time::sleep(interval).await;
        match gc_roots::cleanup_old_roots(&gc_config.gc_roots_dir, max_age) {
            Ok(count) if count > 0 => {
                tracing::info!(count, "Cleaned up old GC roots");
                // Optionally run nix-collect-garbage
                match tokio::process::Command::new("nix-collect-garbage")
                    .output()
                    .await
                {
                    Ok(output) if output.status.success() => {
                        tracing::info!("nix-collect-garbage completed");
                    }
                    Ok(output) => {
                        let stderr = String::from_utf8_lossy(&output.stderr);
                        tracing::warn!("nix-collect-garbage failed: {stderr}");
                    }
                    Err(e) => {
                        tracing::warn!("Failed to run nix-collect-garbage: {e}");
                    }
                }
            }
            Ok(_) => {}
            Err(e) => {
                // Non-fatal: log and try again on the next tick.
                tracing::error!("GC cleanup failed: {e}");
            }
        }
    }
}
/// Resolve once the process receives Ctrl+C, or SIGTERM on Unix.
///
/// On non-Unix platforms only Ctrl+C is watched; the SIGTERM arm is a
/// future that never completes. Panics if a signal handler cannot be
/// installed, which indicates a broken runtime environment.
async fn shutdown_signal() {
    #[cfg(unix)]
    let terminate = async {
        let mut sigterm =
            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
                .expect("failed to install SIGTERM handler");
        sigterm.recv().await;
    };
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    // Whichever signal arrives first wins; both mean "shut down".
    tokio::select! {
        () = ctrl_c => {},
        () = terminate => {},
    }
}

View file

@ -0,0 +1,125 @@
use std::sync::Arc;
use std::time::Duration;
use sqlx::PgPool;
use fc_common::models::BuildStatus;
use fc_common::repo;
use crate::worker::WorkerPool;
/// Main scheduling loop for the queue runner. The loop is infinite — the
/// `Result` return type exists for the caller's error plumbing.
///
/// On startup, builds stuck in a started state longer than 5 minutes
/// (left over from a crash) are reset to pending. Then, each tick,
/// up to 10 pending builds are fetched and each is either:
/// * completed immediately — an aggregate whose constituents are all
///   done, or a drv-path duplicate of an already-completed build;
/// * skipped for now — aggregate or dependency prerequisites unmet; or
/// * dispatched to the worker pool.
/// Sleeps `poll_interval` between ticks; DB errors are logged and retried.
pub async fn run(
    pool: PgPool,
    worker_pool: Arc<WorkerPool>,
    poll_interval: Duration,
) -> anyhow::Result<()> {
    // Reset orphaned builds from previous crashes (older than 5 minutes)
    match repo::builds::reset_orphaned(&pool, 300).await {
        Ok(count) if count > 0 => {
            tracing::warn!(count, "Reset orphaned builds back to pending");
        }
        Ok(_) => {}
        Err(e) => {
            tracing::error!("Failed to reset orphaned builds: {e}");
        }
    }
    loop {
        match repo::builds::list_pending(&pool, 10).await {
            Ok(builds) => {
                if !builds.is_empty() {
                    tracing::info!("Found {} pending builds", builds.len());
                }
                for build in builds {
                    // Aggregate builds: check if all constituents are done
                    if build.is_aggregate {
                        match repo::build_dependencies::all_deps_completed(&pool, build.id).await {
                            Ok(true) => {
                                // All constituents done — mark aggregate as completed
                                tracing::info!(
                                    build_id = %build.id,
                                    job = %build.job_name,
                                    "Aggregate build: all constituents completed"
                                );
                                // NOTE(review): start/complete errors are
                                // discarded here — confirm best-effort is
                                // intended rather than retry-on-next-tick.
                                let _ = repo::builds::start(&pool, build.id).await;
                                let _ = repo::builds::complete(
                                    &pool,
                                    build.id,
                                    BuildStatus::Completed,
                                    None,
                                    None,
                                    None,
                                )
                                .await;
                                continue;
                            }
                            Ok(false) => {
                                tracing::debug!(
                                    build_id = %build.id,
                                    "Aggregate build waiting for constituents"
                                );
                                continue;
                            }
                            Err(e) => {
                                tracing::error!(
                                    build_id = %build.id,
                                    "Failed to check aggregate deps: {e}"
                                );
                                continue;
                            }
                        }
                    }
                    // Derivation deduplication: reuse result if same drv was already built
                    match repo::builds::get_completed_by_drv_path(&pool, &build.drv_path).await {
                        Ok(Some(existing)) if existing.id != build.id => {
                            tracing::info!(
                                build_id = %build.id,
                                existing_id = %existing.id,
                                drv = %build.drv_path,
                                "Dedup: reusing result from existing build"
                            );
                            let _ = repo::builds::start(&pool, build.id).await;
                            let _ = repo::builds::complete(
                                &pool,
                                build.id,
                                BuildStatus::Completed,
                                existing.log_path.as_deref(),
                                existing.build_output_path.as_deref(),
                                None,
                            )
                            .await;
                            continue;
                        }
                        // Lookup errors fall through to a normal build attempt.
                        _ => {}
                    }
                    // Dependency-aware scheduling: skip if deps not met
                    match repo::build_dependencies::all_deps_completed(&pool, build.id).await {
                        Ok(true) => {}
                        Ok(false) => {
                            tracing::debug!(
                                build_id = %build.id,
                                "Build waiting for dependencies"
                            );
                            continue;
                        }
                        Err(e) => {
                            tracing::error!(
                                build_id = %build.id,
                                "Failed to check build deps: {e}"
                            );
                            continue;
                        }
                    }
                    worker_pool.dispatch(build);
                }
            }
            Err(e) => {
                tracing::error!("Failed to fetch pending builds: {e}");
            }
        }
        tokio::time::sleep(poll_interval).await;
    }
}

View file

@ -0,0 +1,500 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sqlx::PgPool;
use tokio::sync::Semaphore;
use fc_common::config::{GcConfig, LogConfig, NotificationsConfig, SigningConfig};
use fc_common::gc_roots::GcRoots;
use fc_common::log_storage::LogStorage;
use fc_common::models::{Build, BuildStatus, CreateBuildProduct, CreateBuildStep};
use fc_common::repo;
/// Bounded pool of build workers backed by a semaphore.
///
/// Each configured worker corresponds to one semaphore permit; an
/// in-flight build holds a permit for its whole duration. Builds run as
/// detached tokio tasks. Graceful shutdown is two steps: [`Self::drain`]
/// stops new dispatches, then [`Self::wait_for_drain`] blocks until all
/// in-flight builds have finished.
pub struct WorkerPool {
    /// One permit per worker; available permits == idle workers.
    semaphore: Arc<Semaphore>,
    /// Total permits issued at construction. `wait_for_drain` must
    /// reacquire *all* of them to know the pool is fully idle.
    total_workers: usize,
    /// Database handle cloned into each build task.
    pool: PgPool,
    /// Directory nix builds run in.
    work_dir: Arc<PathBuf>,
    /// Per-build wall-clock timeout.
    build_timeout: Duration,
    log_config: Arc<LogConfig>,
    gc_config: Arc<GcConfig>,
    notifications_config: Arc<NotificationsConfig>,
    signing_config: Arc<SigningConfig>,
    /// Cancelled when draining; `dispatch` becomes a no-op afterwards.
    drain_token: tokio_util::sync::CancellationToken,
}

impl WorkerPool {
    /// Create a pool with `workers` concurrent build slots.
    pub fn new(
        db_pool: PgPool,
        workers: usize,
        work_dir: PathBuf,
        build_timeout: Duration,
        log_config: LogConfig,
        gc_config: GcConfig,
        notifications_config: NotificationsConfig,
        signing_config: SigningConfig,
    ) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(workers)),
            total_workers: workers,
            pool: db_pool,
            work_dir: Arc::new(work_dir),
            build_timeout,
            log_config: Arc::new(log_config),
            gc_config: Arc::new(gc_config),
            notifications_config: Arc::new(notifications_config),
            signing_config: Arc::new(signing_config),
            drain_token: tokio_util::sync::CancellationToken::new(),
        }
    }
    /// Signal all workers to stop accepting new builds. In-flight builds will finish.
    pub fn drain(&self) {
        self.drain_token.cancel();
    }
    /// Wait until all in-flight builds complete (every worker idle).
    ///
    /// Reacquires — and forgets — each of the `total_workers` permits
    /// issued at construction, so this only returns once no build holds
    /// one. (The previous `available_permits() + 1` count under-waited
    /// whenever more than one build was in flight.) Bounded by the build
    /// timeout plus a 60-second grace period; after that, gives up.
    pub async fn wait_for_drain(&self) {
        let grace = Duration::from_secs(self.build_timeout.as_secs() + 60);
        let _ = tokio::time::timeout(grace, async {
            for _ in 0..self.total_workers {
                if let Ok(permit) = self.semaphore.acquire().await {
                    // Forget rather than drop so the permit is never
                    // released back — the pool stays quiesced.
                    permit.forget();
                }
            }
        })
        .await;
    }
    /// Spawn a detached task that claims a worker slot and runs `build`.
    /// No-op while draining. Errors from the build are logged, not returned.
    pub fn dispatch(&self, build: Build) {
        if self.drain_token.is_cancelled() {
            tracing::info!(build_id = %build.id, "Drain in progress, not dispatching");
            return;
        }
        let semaphore = self.semaphore.clone();
        let pool = self.pool.clone();
        let work_dir = self.work_dir.clone();
        let timeout = self.build_timeout;
        let log_config = self.log_config.clone();
        let gc_config = self.gc_config.clone();
        let notifications_config = self.notifications_config.clone();
        let signing_config = self.signing_config.clone();
        tokio::spawn(async move {
            // Block here until a worker slot frees up; the permit is held
            // for the duration of the build via `_permit`'s lifetime.
            let _permit = match semaphore.acquire().await {
                Ok(p) => p,
                Err(_) => return,
            };
            if let Err(e) = run_build(
                &pool,
                &build,
                &work_dir,
                timeout,
                &log_config,
                &gc_config,
                &notifications_config,
                &signing_config,
            )
            .await
            {
                tracing::error!(build_id = %build.id, "Build dispatch failed: {e}");
            }
        });
    }
}
/// Query `nix path-info --json` for the NAR hash and size of a store path.
///
/// Returns `None` on any failure: spawn error, non-zero exit, or output
/// that doesn't match the expected JSON shape.
/// NOTE(review): assumes the JSON output is an array of entries — confirm
/// against the nix version in use.
async fn get_path_info(output_path: &str) -> Option<(String, i64)> {
    let out = tokio::process::Command::new("nix")
        .args(["path-info", "--json", output_path])
        .output()
        .await
        .ok()?;
    if !out.status.success() {
        return None;
    }
    let body = String::from_utf8_lossy(&out.stdout);
    let json: serde_json::Value = serde_json::from_str(&body).ok()?;
    let first_entry = json.as_array()?.first()?;
    let nar_hash = first_entry.get("narHash")?.as_str()?.to_string();
    let nar_size = first_entry.get("narSize")?.as_i64()?;
    Some((nar_hash, nar_size))
}
/// Look up the project that owns a build, walking
/// build -> evaluation -> jobset -> project.
///
/// Returns the project together with the evaluation's commit hash, or
/// `None` if any lookup along the chain fails.
async fn get_project_for_build(
    pool: &PgPool,
    build: &Build,
) -> Option<(fc_common::models::Project, String)> {
    let evaluation = repo::evaluations::get(pool, build.evaluation_id)
        .await
        .ok()?;
    let owning_jobset = repo::jobsets::get(pool, evaluation.jobset_id).await.ok()?;
    let owning_project = repo::projects::get(pool, owning_jobset.project_id)
        .await
        .ok()?;
    Some((owning_project, evaluation.commit_hash))
}
/// Sign nix store outputs using the configured signing key.
///
/// Returns `false` when signing is disabled or no key file exists on
/// disk. Otherwise signs each path best-effort — individual failures are
/// logged but do not change the `true` return value.
async fn sign_outputs(output_paths: &[String], signing_config: &SigningConfig) -> bool {
    if !signing_config.enabled {
        return false;
    }
    let Some(key_file) = signing_config.key_file.as_ref().filter(|kf| kf.exists()) else {
        return false;
    };
    let key_arg = key_file.to_string_lossy();
    for output_path in output_paths {
        let outcome = tokio::process::Command::new("nix")
            .args(["store", "sign", "--key-file", &key_arg, output_path])
            .output()
            .await;
        match outcome {
            Ok(o) if o.status.success() => {
                tracing::debug!(output = output_path, "Signed store path");
            }
            Ok(o) => {
                let stderr = String::from_utf8_lossy(&o.stderr);
                tracing::warn!(output = output_path, "Failed to sign: {stderr}");
            }
            Err(e) => {
                tracing::warn!(output = output_path, "Failed to run nix store sign: {e}");
            }
        }
    }
    true
}
/// Try to run the build on a remote builder if one is available for the build's system.
///
/// Builders registered for `build.system` are tried in order; the first
/// successful result is returned. `None` means "fall back to a local
/// build": the build has no system, no builders match, the builder
/// lookup failed, or every remote attempt errored.
///
/// NOTE(review): `set_builder` records the *last attempted* builder even
/// if that builder's attempt subsequently fails — confirm that's the
/// intended attribution.
async fn try_remote_build(
    pool: &PgPool,
    build: &Build,
    work_dir: &std::path::Path,
    timeout: Duration,
    live_log_path: Option<&std::path::Path>,
) -> Option<crate::builder::BuildResult> {
    let system = build.system.as_deref()?;
    let builders = repo::remote_builders::find_for_system(pool, system)
        .await
        .ok()?;
    for builder in &builders {
        tracing::info!(
            build_id = %build.id,
            builder = %builder.name,
            "Attempting remote build on {}",
            builder.ssh_uri,
        );
        // Set builder_id
        let _ = repo::builds::set_builder(pool, build.id, builder.id).await;
        // Build remotely via --store
        let store_uri = format!("ssh://{}", builder.ssh_uri);
        let result = crate::builder::run_nix_build_remote(
            &build.drv_path,
            work_dir,
            timeout,
            &store_uri,
            builder.ssh_key_file.as_deref(),
            live_log_path,
        )
        .await;
        match result {
            Ok(r) => return Some(r),
            Err(e) => {
                tracing::warn!(
                    build_id = %build.id,
                    builder = %builder.name,
                    "Remote build failed: {e}, trying next builder"
                );
            }
        }
    }
    None
}
/// Execute one build end-to-end: claim, run, record, notify.
///
/// Flow:
/// 1. Atomically claim the build row (returns early if another worker won).
/// 2. Create a build-step record and a live log file for streaming.
/// 3. Build remotely when the build has a system, falling back to a local
///    build when no remote builder is available or all remotes fail.
/// 4. On success: record sub-steps, finalize the log, register GC roots,
///    create build products (with nar hash/size), sign outputs, and mark
///    the build completed. On failure: re-queue for retry (up to
///    `max_retries`) or mark failed.
/// 5. Dispatch notifications and auto-promote channels when the
///    evaluation is fully done.
async fn run_build(
    pool: &PgPool,
    build: &Build,
    work_dir: &std::path::Path,
    timeout: Duration,
    log_config: &LogConfig,
    gc_config: &GcConfig,
    notifications_config: &NotificationsConfig,
    signing_config: &SigningConfig,
) -> anyhow::Result<()> {
    // Atomically claim the build
    let claimed = repo::builds::start(pool, build.id).await?;
    if claimed.is_none() {
        tracing::debug!(build_id = %build.id, "Build already claimed, skipping");
        return Ok(());
    }
    tracing::info!(build_id = %build.id, job = %build.job_name, "Starting build");
    // Create a build step record
    let step = repo::build_steps::create(
        pool,
        CreateBuildStep {
            build_id: build.id,
            step_number: 1,
            command: format!("nix build --no-link --print-out-paths {}", build.drv_path),
        },
    )
    .await?;
    // Set up live log path
    let live_log_path = log_config.log_dir.join(format!("{}.active.log", build.id));
    let _ = tokio::fs::create_dir_all(&log_config.log_dir).await;
    // Try remote build first, then fall back to local
    let result = if build.system.is_some() {
        match try_remote_build(pool, build, work_dir, timeout, Some(&live_log_path)).await {
            Some(r) => Ok(r),
            None => {
                // No remote builder available or all failed — build locally
                crate::builder::run_nix_build(
                    &build.drv_path,
                    work_dir,
                    timeout,
                    Some(&live_log_path),
                )
                .await
            }
        }
    } else {
        crate::builder::run_nix_build(&build.drv_path, work_dir, timeout, Some(&live_log_path))
            .await
    };
    // Initialize log storage
    let log_storage = LogStorage::new(log_config.log_dir.clone()).ok();
    match result {
        Ok(build_result) => {
            // Complete the build step
            let exit_code = if build_result.success { 0 } else { 1 };
            repo::build_steps::complete(
                pool,
                step.id,
                exit_code,
                Some(&build_result.stdout),
                Some(&build_result.stderr),
            )
            .await?;
            // Create sub-step records from parsed nix log
            for (i, sub_step) in build_result.sub_steps.iter().enumerate() {
                let sub = repo::build_steps::create(
                    pool,
                    CreateBuildStep {
                        build_id: build.id,
                        // Step 1 is the top-level nix invocation; sub-steps start at 2.
                        step_number: (i as i32) + 2,
                        command: format!("nix build {}", sub_step.drv_path),
                    },
                )
                .await?;
                let sub_exit = if sub_step.success { 0 } else { 1 };
                repo::build_steps::complete(pool, sub.id, sub_exit, None, None).await?;
            }
            // Write build log (rename active log to final)
            // NOTE(review): `log_path` is reported as Some(final_path) even
            // when the rename or fallback write fails — confirm acceptable.
            let log_path = if let Some(ref storage) = log_storage {
                let final_path = storage.log_path(&build.id);
                if live_log_path.exists() {
                    let _ = tokio::fs::rename(&live_log_path, &final_path).await;
                } else {
                    match storage.write_log(&build.id, &build_result.stdout, &build_result.stderr) {
                        Ok(_) => {}
                        Err(e) => {
                            tracing::warn!(build_id = %build.id, "Failed to write build log: {e}");
                        }
                    }
                }
                Some(final_path.to_string_lossy().to_string())
            } else {
                None
            };
            if build_result.success {
                // Parse output names from build's outputs JSON
                let output_names: Vec<String> = build
                    .outputs
                    .as_ref()
                    .and_then(|v| v.as_object())
                    .map(|obj| obj.keys().cloned().collect())
                    .unwrap_or_default();
                // Register GC roots and create build products for each output
                for (i, output_path) in build_result.output_paths.iter().enumerate() {
                    // Fall back to job-name-derived names when the outputs
                    // JSON has fewer entries than actual output paths.
                    let output_name = output_names.get(i).cloned().unwrap_or_else(|| {
                        if i == 0 {
                            build.job_name.clone()
                        } else {
                            format!("{}-{i}", build.job_name)
                        }
                    });
                    // Register GC root
                    let mut gc_root_path = None;
                    if let Ok(gc_roots) =
                        GcRoots::new(gc_config.gc_roots_dir.clone(), gc_config.enabled)
                    {
                        // First output is keyed by the build id; extra outputs
                        // get fresh UUIDs so their root links don't collide.
                        let gc_id = if i == 0 {
                            build.id
                        } else {
                            uuid::Uuid::new_v4()
                        };
                        match gc_roots.register(&gc_id, output_path) {
                            Ok(Some(link_path)) => {
                                gc_root_path = Some(link_path.to_string_lossy().to_string());
                            }
                            Ok(None) => {}
                            Err(e) => {
                                tracing::warn!(build_id = %build.id, "Failed to register GC root: {e}");
                            }
                        }
                    }
                    // Get metadata from nix path-info
                    let (sha256_hash, file_size) = match get_path_info(output_path).await {
                        Some((hash, size)) => (Some(hash), Some(size)),
                        None => (None, None),
                    };
                    let product = repo::build_products::create(
                        pool,
                        CreateBuildProduct {
                            build_id: build.id,
                            name: output_name,
                            path: output_path.clone(),
                            sha256_hash,
                            file_size,
                            content_type: None,
                            // NOTE(review): hardcoded — every output is
                            // recorded as a directory; confirm single-file
                            // outputs don't need distinguishing.
                            is_directory: true,
                        },
                    )
                    .await?;
                    // Update the build product with GC root path if registered
                    if gc_root_path.is_some() {
                        sqlx::query("UPDATE build_products SET gc_root_path = $1 WHERE id = $2")
                            .bind(&gc_root_path)
                            .bind(product.id)
                            .execute(pool)
                            .await?;
                    }
                }
                // Sign outputs at build time
                if sign_outputs(&build_result.output_paths, signing_config).await {
                    let _ = repo::builds::mark_signed(pool, build.id).await;
                }
                let primary_output = build_result.output_paths.first().map(|s| s.as_str());
                repo::builds::complete(
                    pool,
                    build.id,
                    BuildStatus::Completed,
                    log_path.as_deref(),
                    primary_output,
                    None,
                )
                .await?;
                tracing::info!(build_id = %build.id, "Build completed successfully");
            } else {
                // Check if we should retry
                if build.retry_count < build.max_retries {
                    tracing::info!(
                        build_id = %build.id,
                        retry = build.retry_count + 1,
                        max = build.max_retries,
                        "Build failed, scheduling retry"
                    );
                    // Re-queue: back to pending with the retry counter bumped.
                    sqlx::query(
                        "UPDATE builds SET status = 'pending', started_at = NULL, \
                         retry_count = retry_count + 1, completed_at = NULL \
                         WHERE id = $1",
                    )
                    .bind(build.id)
                    .execute(pool)
                    .await?;
                    // Clean up live log
                    let _ = tokio::fs::remove_file(&live_log_path).await;
                    return Ok(());
                }
                repo::builds::complete(
                    pool,
                    build.id,
                    BuildStatus::Failed,
                    log_path.as_deref(),
                    None,
                    Some(&build_result.stderr),
                )
                .await?;
                tracing::warn!(build_id = %build.id, "Build failed");
            }
        }
        Err(e) => {
            // Spawn failure or timeout: record the error as the build log.
            let msg = e.to_string();
            // Write error log
            if let Some(ref storage) = log_storage {
                let _ = storage.write_log(&build.id, "", &msg);
            }
            // Clean up live log
            let _ = tokio::fs::remove_file(&live_log_path).await;
            repo::build_steps::complete(pool, step.id, 1, None, Some(&msg)).await?;
            repo::builds::complete(pool, build.id, BuildStatus::Failed, None, None, Some(&msg))
                .await?;
            tracing::error!(build_id = %build.id, "Build error: {msg}");
        }
    }
    // Dispatch notifications after build completion
    let updated_build = repo::builds::get(pool, build.id).await?;
    if updated_build.status == BuildStatus::Completed || updated_build.status == BuildStatus::Failed
    {
        if let Some((project, commit_hash)) = get_project_for_build(pool, build).await {
            fc_common::notifications::dispatch_build_finished(
                &updated_build,
                &project,
                &commit_hash,
                notifications_config,
            )
            .await;
        }
        // Auto-promote channels if all builds in the evaluation are done
        if updated_build.status == BuildStatus::Completed {
            if let Ok(eval) = repo::evaluations::get(pool, build.evaluation_id).await {
                let _ =
                    repo::channels::auto_promote_if_complete(pool, eval.jobset_id, eval.id).await;
            }
        }
    }
    Ok(())
}

View file

@ -0,0 +1,285 @@
//! Tests for the queue runner.
//! Nix log parsing tests require no external binaries.
//! Database tests require TEST_DATABASE_URL.
// --- Nix log line parsing ---
#[test]
fn test_parse_nix_log_start() {
    let line = r#"@nix {"action":"start","derivation":"/nix/store/abc-hello.drv"}"#;
    let parsed = fc_queue_runner::builder::parse_nix_log_line(line);
    assert_eq!(
        parsed,
        Some(("start", "/nix/store/abc-hello.drv".to_string()))
    );
}
#[test]
fn test_parse_nix_log_stop() {
    let line = r#"@nix {"action":"stop","derivation":"/nix/store/abc-hello.drv"}"#;
    let parsed = fc_queue_runner::builder::parse_nix_log_line(line);
    assert_eq!(
        parsed,
        Some(("stop", "/nix/store/abc-hello.drv".to_string()))
    );
}
#[test]
fn test_parse_nix_log_unknown_action() {
    // Non start/stop actions are ignored.
    let line = r#"@nix {"action":"msg","msg":"building..."}"#;
    assert_eq!(fc_queue_runner::builder::parse_nix_log_line(line), None);
}
#[test]
fn test_parse_nix_log_not_nix_prefix() {
    // Plain log text without the @nix marker is not an event.
    let line = "building '/nix/store/abc-hello.drv'...";
    assert_eq!(fc_queue_runner::builder::parse_nix_log_line(line), None);
}
#[test]
fn test_parse_nix_log_invalid_json() {
    assert_eq!(
        fc_queue_runner::builder::parse_nix_log_line("@nix {invalid json}"),
        None
    );
}
#[test]
fn test_parse_nix_log_no_derivation_field() {
    let line = r#"@nix {"action":"start","type":"build"}"#;
    assert_eq!(fc_queue_runner::builder::parse_nix_log_line(line), None);
}
#[test]
fn test_parse_nix_log_empty_line() {
    assert_eq!(fc_queue_runner::builder::parse_nix_log_line(""), None);
}
// --- WorkerPool drain ---
/// Smoke test: constructing a pool and calling `drain()` must not panic.
/// Requires TEST_DATABASE_URL only to build the PgPool; no queries run.
#[tokio::test]
async fn test_worker_pool_drain_stops_dispatch() {
    // Create a minimal worker pool
    let url = match std::env::var("TEST_DATABASE_URL") {
        Ok(url) => url,
        Err(_) => {
            println!("Skipping: TEST_DATABASE_URL not set");
            return;
        }
    };
    let pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(1)
        .connect(&url)
        .await
        .expect("failed to connect");
    let worker_pool = fc_queue_runner::worker::WorkerPool::new(
        pool,
        2,
        std::env::temp_dir(),
        std::time::Duration::from_secs(60),
        fc_common::config::LogConfig::default(),
        fc_common::config::GcConfig::default(),
        fc_common::config::NotificationsConfig::default(),
        fc_common::config::SigningConfig::default(),
    );
    // Drain should not panic
    worker_pool.drain();
    // After drain, dispatching should be a no-op (build won't start)
    // We can't easily test this without a real build, but at least verify drain doesn't crash
}
// --- Database-dependent tests ---
/// Verify `builds::start` is an atomic claim: the first call wins, a
/// second call on the same build returns None. Requires TEST_DATABASE_URL.
#[tokio::test]
async fn test_atomic_build_claiming() {
    let url = match std::env::var("TEST_DATABASE_URL") {
        Ok(url) => url,
        Err(_) => {
            println!("Skipping: TEST_DATABASE_URL not set");
            return;
        }
    };
    let pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(5)
        .connect(&url)
        .await
        .expect("failed to connect");
    sqlx::migrate!("../common/migrations")
        .run(&pool)
        .await
        .expect("migration failed");
    // Create a project -> jobset -> evaluation -> build chain
    // (project name is uniquified so parallel test runs don't collide)
    let project = fc_common::repo::projects::create(
        &pool,
        fc_common::models::CreateProject {
            name: format!("runner-test-{}", uuid::Uuid::new_v4()),
            description: None,
            repository_url: "https://github.com/test/repo".to_string(),
        },
    )
    .await
    .expect("create project");
    let jobset = fc_common::repo::jobsets::create(
        &pool,
        fc_common::models::CreateJobset {
            project_id: project.id,
            name: "main".to_string(),
            nix_expression: "packages".to_string(),
            enabled: None,
            flake_mode: None,
            check_interval: None,
        },
    )
    .await
    .expect("create jobset");
    let eval = fc_common::repo::evaluations::create(
        &pool,
        fc_common::models::CreateEvaluation {
            jobset_id: jobset.id,
            commit_hash: "abcdef1234567890abcdef1234567890abcdef12".to_string(),
        },
    )
    .await
    .expect("create eval");
    let build = fc_common::repo::builds::create(
        &pool,
        fc_common::models::CreateBuild {
            evaluation_id: eval.id,
            job_name: "test-build".to_string(),
            drv_path: "/nix/store/test-runner-test.drv".to_string(),
            system: Some("x86_64-linux".to_string()),
            outputs: None,
            is_aggregate: None,
            constituents: None,
        },
    )
    .await
    .expect("create build");
    assert_eq!(build.status, fc_common::models::BuildStatus::Pending);
    // First claim should succeed
    let claimed = fc_common::repo::builds::start(&pool, build.id)
        .await
        .expect("start build");
    assert!(claimed.is_some());
    // Second claim should return None (already claimed)
    let claimed2 = fc_common::repo::builds::start(&pool, build.id)
        .await
        .expect("start build again");
    assert!(claimed2.is_none());
    // Clean up
    let _ = fc_common::repo::projects::delete(&pool, project.id).await;
}
/// Verify `builds::reset_orphaned` returns a build stuck in the started
/// state (started_at backdated past the threshold) to pending.
/// Requires TEST_DATABASE_URL.
#[tokio::test]
async fn test_orphan_build_reset() {
    let url = match std::env::var("TEST_DATABASE_URL") {
        Ok(url) => url,
        Err(_) => {
            println!("Skipping: TEST_DATABASE_URL not set");
            return;
        }
    };
    let pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(5)
        .connect(&url)
        .await
        .expect("failed to connect");
    sqlx::migrate!("../common/migrations")
        .run(&pool)
        .await
        .expect("migration failed");
    let project = fc_common::repo::projects::create(
        &pool,
        fc_common::models::CreateProject {
            name: format!("orphan-test-{}", uuid::Uuid::new_v4()),
            description: None,
            repository_url: "https://github.com/test/repo".to_string(),
        },
    )
    .await
    .expect("create project");
    let jobset = fc_common::repo::jobsets::create(
        &pool,
        fc_common::models::CreateJobset {
            project_id: project.id,
            name: "main".to_string(),
            nix_expression: "packages".to_string(),
            enabled: None,
            flake_mode: None,
            check_interval: None,
        },
    )
    .await
    .expect("create jobset");
    let eval = fc_common::repo::evaluations::create(
        &pool,
        fc_common::models::CreateEvaluation {
            jobset_id: jobset.id,
            commit_hash: "1234567890abcdef1234567890abcdef12345678".to_string(),
        },
    )
    .await
    .expect("create eval");
    // Create a build and mark it running
    let build = fc_common::repo::builds::create(
        &pool,
        fc_common::models::CreateBuild {
            evaluation_id: eval.id,
            job_name: "orphan-build".to_string(),
            drv_path: "/nix/store/test-orphan.drv".to_string(),
            system: None,
            outputs: None,
            is_aggregate: None,
            constituents: None,
        },
    )
    .await
    .expect("create build");
    let _ = fc_common::repo::builds::start(&pool, build.id).await;
    // Simulate the build being stuck for a while by manually backdating started_at
    sqlx::query("UPDATE builds SET started_at = NOW() - INTERVAL '10 minutes' WHERE id = $1")
        .bind(build.id)
        .execute(&pool)
        .await
        .expect("backdate build");
    // Reset orphaned builds (older than 5 minutes)
    // NOTE: count may exceed 1 if other stale builds exist in the DB.
    let count = fc_common::repo::builds::reset_orphaned(&pool, 300)
        .await
        .expect("reset orphaned");
    assert!(count >= 1, "should have reset at least 1 orphaned build");
    // Verify build is pending again
    let reset_build = fc_common::repo::builds::get(&pool, build.id)
        .await
        .expect("get build");
    assert_eq!(reset_build.status, fc_common::models::BuildStatus::Pending);
    // Clean up
    let _ = fc_common::repo::projects::delete(&pool, project.id).await;
}