From c0df24c6e1ab0f407716dc02654166e812e4ef72 Mon Sep 17 00:00:00 2001
From: NotAShelf
Date: Mon, 2 Feb 2026 01:25:10 +0300
Subject: [PATCH] crates/queue-runner: add cache upload config and worker
 improvements

Signed-off-by: NotAShelf
Change-Id: I781a843b88a9b62b929a8d0407274bc86a6a6964
---
 crates/queue-runner/src/builder.rs        | 18 ++++-----
 crates/queue-runner/src/main.rs           |  9 +++--
 crates/queue-runner/src/worker.rs         | 45 +++++++++++++++++++++--
 crates/queue-runner/tests/runner_tests.rs |  5 +++
 4 files changed, 58 insertions(+), 19 deletions(-)

diff --git a/crates/queue-runner/src/builder.rs b/crates/queue-runner/src/builder.rs
index eff5963..9aaca40 100644
--- a/crates/queue-runner/src/builder.rs
+++ b/crates/queue-runner/src/builder.rs
@@ -8,6 +8,7 @@ use tokio::io::{AsyncBufReadExt, BufReader};
 const MAX_LOG_SIZE: usize = 100 * 1024 * 1024; // 100MB
 
 /// Run a build on a remote machine via `nix build --store ssh://...`.
+#[tracing::instrument(skip(work_dir, live_log_path), fields(drv_path, store_uri))]
 pub async fn run_nix_build_remote(
     drv_path: &str,
     work_dir: &Path,
@@ -157,6 +158,7 @@ pub fn parse_nix_log_line(line: &str) -> Option<(&'static str, String)> {
 
 /// Run `nix build` for a derivation path.
 /// If `live_log_path` is provided, build output is streamed to that file incrementally.
+#[tracing::instrument(skip(work_dir, live_log_path), fields(drv_path))]
 pub async fn run_nix_build(
     drv_path: &str,
     work_dir: &Path,
@@ -225,12 +227,11 @@ pub async fn run_nix_build(
             }
 
             // Parse nix internal JSON log lines
-            if line.starts_with("@nix ") {
-                if let Some(json_str) = line.strip_prefix("@nix ") {
-                    if let Ok(parsed) =
+            if line.starts_with("@nix ")
+                && let Some(json_str) = line.strip_prefix("@nix ")
+                && let Ok(parsed) =
                         serde_json::from_str::<serde_json::Value>(json_str.trim())
-                    {
-                        if let Some(action) = parsed.get("action").and_then(|a| a.as_str())
+                && let Some(action) = parsed.get("action").and_then(|a| a.as_str())
                 {
                     match action {
                         "start" => {
@@ -247,21 +248,16 @@ pub async fn run_nix_build(
                         "stop" => {
                             if let Some(drv) =
                                 parsed.get("derivation").and_then(|d| d.as_str())
-                            {
-                                if let Some(step) =
+                                && let Some(step) =
                                     steps.iter_mut().rfind(|s| s.drv_path == drv)
                                 {
                                     step.completed_at = Some(chrono::Utc::now());
                                     step.success = true;
                                 }
-                            }
                         }
                         _ => {}
                     }
                 }
-            }
-        }
-    }
 
             if buf.len() < MAX_LOG_SIZE {
                 buf.push_str(&line);
diff --git a/crates/queue-runner/src/main.rs b/crates/queue-runner/src/main.rs
index fd91aec..720955e 100644
--- a/crates/queue-runner/src/main.rs
+++ b/crates/queue-runner/src/main.rs
@@ -19,18 +19,18 @@ struct Cli {
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
-    tracing_subscriber::fmt::init();
-
     let cli = Cli::parse();
-    tracing::info!("Starting CI Queue Runner");
-
     let config = Config::load()?;
+    fc_common::init_tracing(&config.tracing);
+
+    tracing::info!("Starting CI Queue Runner");
 
     let log_config = config.logs;
     let gc_config = config.gc;
     let gc_config_for_loop = gc_config.clone();
     let notifications_config = config.notifications;
     let signing_config = config.signing;
+    let cache_upload_config = config.cache_upload;
 
     let qr_config = config.queue_runner;
     let workers = cli.workers.unwrap_or(qr_config.workers);
 
@@ -55,6 +55,7 @@ async fn main() -> anyhow::Result<()> {
         gc_config,
         notifications_config,
         signing_config,
+        cache_upload_config,
     ));
 
     tracing::info!(
diff --git a/crates/queue-runner/src/worker.rs b/crates/queue-runner/src/worker.rs
index 29e35e6..4365762 100644
--- a/crates/queue-runner/src/worker.rs
+++ b/crates/queue-runner/src/worker.rs
@@ -5,7 +5,9 @@ use std::time::Duration;
 use sqlx::PgPool;
 use tokio::sync::Semaphore;
 
-use fc_common::config::{GcConfig, LogConfig, NotificationsConfig, SigningConfig};
+use fc_common::config::{
+    CacheUploadConfig, GcConfig, LogConfig, NotificationsConfig, SigningConfig,
+};
 use fc_common::gc_roots::GcRoots;
 use fc_common::log_storage::LogStorage;
 use fc_common::models::{Build, BuildStatus, CreateBuildProduct, CreateBuildStep};
@@ -20,6 +22,7 @@ pub struct WorkerPool {
     gc_config: Arc<GcConfig>,
     notifications_config: Arc<NotificationsConfig>,
     signing_config: Arc<SigningConfig>,
+    cache_upload_config: Arc<CacheUploadConfig>,
     drain_token: tokio_util::sync::CancellationToken,
 }
 
@@ -33,6 +36,7 @@ impl WorkerPool {
         gc_config: GcConfig,
         notifications_config: NotificationsConfig,
         signing_config: SigningConfig,
+        cache_upload_config: CacheUploadConfig,
     ) -> Self {
         Self {
             semaphore: Arc::new(Semaphore::new(workers)),
@@ -43,6 +47,7 @@ impl WorkerPool {
             gc_config: Arc::new(gc_config),
             notifications_config: Arc::new(notifications_config),
             signing_config: Arc::new(signing_config),
+            cache_upload_config: Arc::new(cache_upload_config),
             drain_token: tokio_util::sync::CancellationToken::new(),
         }
     }
@@ -69,6 +74,7 @@ impl WorkerPool {
             .await;
     }
 
+    #[tracing::instrument(skip(self, build), fields(build_id = %build.id, job = %build.job_name))]
     pub fn dispatch(&self, build: Build) {
         if self.drain_token.is_cancelled() {
             tracing::info!(build_id = %build.id, "Drain in progress, not dispatching");
@@ -83,6 +89,7 @@ impl WorkerPool {
         let gc_config = self.gc_config.clone();
         let notifications_config = self.notifications_config.clone();
         let signing_config = self.signing_config.clone();
+        let cache_upload_config = self.cache_upload_config.clone();
 
         tokio::spawn(async move {
             let _permit = match semaphore.acquire().await {
@@ -99,6 +106,7 @@ impl WorkerPool {
                 &gc_config,
                 &notifications_config,
                 &signing_config,
+                &cache_upload_config,
             )
             .await
             {
@@ -178,6 +186,28 @@ async fn sign_outputs(output_paths: &[String], signing_config: &SigningConfig) -> bool {
     true
 }
 
+/// Push output paths to an external binary cache via `nix copy`.
+async fn push_to_cache(output_paths: &[String], store_uri: &str) {
+    for path in output_paths {
+        let result = tokio::process::Command::new("nix")
+            .args(["copy", "--to", store_uri, path])
+            .output()
+            .await;
+        match result {
+            Ok(o) if o.status.success() => {
+                tracing::debug!(output = path, store = store_uri, "Pushed to binary cache");
+            }
+            Ok(o) => {
+                let stderr = String::from_utf8_lossy(&o.stderr);
+                tracing::warn!(output = path, "Failed to push to cache: {stderr}");
+            }
+            Err(e) => {
+                tracing::warn!(output = path, "Failed to run nix copy: {e}");
+            }
+        }
+    }
+}
+
 /// Try to run the build on a remote builder if one is available for the build's system.
 async fn try_remote_build(
     pool: &PgPool,
@@ -230,6 +260,7 @@ async fn try_remote_build(
     None
 }
 
+#[tracing::instrument(skip(pool, build, work_dir, log_config, gc_config, notifications_config, signing_config, cache_upload_config), fields(build_id = %build.id, job = %build.job_name))]
 async fn run_build(
     pool: &PgPool,
     build: &Build,
@@ -239,6 +270,7 @@ async fn run_build(
     gc_config: &GcConfig,
     notifications_config: &NotificationsConfig,
     signing_config: &SigningConfig,
+    cache_upload_config: &CacheUploadConfig,
 ) -> anyhow::Result<()> {
     // Atomically claim the build
     let claimed = repo::builds::start(pool, build.id).await?;
@@ -408,6 +440,12 @@ async fn run_build(
         let _ = repo::builds::mark_signed(pool, build.id).await;
     }
 
+    // Push to external binary cache if configured
+    if cache_upload_config.enabled
+        && let Some(ref store_uri) = cache_upload_config.store_uri {
+        push_to_cache(&build_result.output_paths, store_uri).await;
+    }
+
     let primary_output = build_result.output_paths.first().map(|s| s.as_str());
 
     repo::builds::complete(
@@ -488,12 +526,11 @@ async fn run_build(
         }
     }
 
     // Auto-promote channels if all builds in the evaluation are done
-    if updated_build.status == BuildStatus::Completed {
-        if let Ok(eval) = repo::evaluations::get(pool, build.evaluation_id).await {
+    if updated_build.status == BuildStatus::Completed
+        && let Ok(eval) = repo::evaluations::get(pool, build.evaluation_id).await {
         let _ = repo::channels::auto_promote_if_complete(pool, eval.jobset_id, eval.id).await;
     }
-    }
 
     Ok(())
 }
diff --git a/crates/queue-runner/tests/runner_tests.rs b/crates/queue-runner/tests/runner_tests.rs
index eec8332..6ed89f7 100644
--- a/crates/queue-runner/tests/runner_tests.rs
+++ b/crates/queue-runner/tests/runner_tests.rs
@@ -86,6 +86,7 @@ async fn test_worker_pool_drain_stops_dispatch() {
         fc_common::config::GcConfig::default(),
         fc_common::config::NotificationsConfig::default(),
         fc_common::config::SigningConfig::default(),
+        fc_common::config::CacheUploadConfig::default(),
     );
 
     // Drain should not panic
@@ -139,6 +140,8 @@ async fn test_atomic_build_claiming() {
             enabled: None,
             flake_mode: None,
             check_interval: None,
+            branch: None,
+            scheduling_shares: None,
         },
     )
     .await
@@ -228,6 +231,8 @@ async fn test_orphan_build_reset() {
             enabled: None,
             flake_mode: None,
             check_interval: None,
+            branch: None,
+            scheduling_shares: None,
         },
     )
     .await