From f8586a7f3c0f4adb577958f6bb1d9cb4ac373571 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Mon, 16 Feb 2026 23:32:40 +0300 Subject: [PATCH] fc-queue-runner: implement per-build cancellation via `CancellationToken` Adds an `ActiveBuild` registry (DashMap of ``) to `WorkerPool` and get `dispatch()` to create a per-build token to race `run_build` against it via Tokio's `select!`. The `cancel_checker_loop` then polls the DB every N seconds (currently 2) for builds cancelled while running, and triggers their tokens. Existing `kill_on_drop(true) on `nix build` processes handles subprocess cleanup when the future is dropped. Thank you past me for your insight. Signed-off-by: NotAShelf Change-Id: Ic8af58e92972c7d5d104d9c717e9217d6a6a6964 --- Cargo.lock | 1 + crates/queue-runner/Cargo.toml | 1 + crates/queue-runner/src/main.rs | 34 +++++++++++++++- crates/queue-runner/src/worker.rs | 67 ++++++++++++++++++++++--------- 4 files changed, 82 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6c1721f..35fbfbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -874,6 +874,7 @@ dependencies = [ "chrono", "clap", "config", + "dashmap", "fc-common", "serde", "serde_json", diff --git a/crates/queue-runner/Cargo.toml b/crates/queue-runner/Cargo.toml index f169876..61052fa 100644 --- a/crates/queue-runner/Cargo.toml +++ b/crates/queue-runner/Cargo.toml @@ -11,6 +11,7 @@ anyhow.workspace = true chrono.workspace = true clap.workspace = true config.workspace = true +dashmap.workspace = true serde.workspace = true serde_json.workspace = true sqlx.workspace = true diff --git a/crates/queue-runner/src/main.rs b/crates/queue-runner/src/main.rs index 7cf5567..2a8454d 100644 --- a/crates/queue-runner/src/main.rs +++ b/crates/queue-runner/src/main.rs @@ -5,8 +5,9 @@ use fc_common::{ config::{Config, GcConfig}, database::Database, gc_roots, + repo, }; -use fc_queue_runner::worker::WorkerPool; +use fc_queue_runner::worker::{ActiveBuilds, WorkerPool}; #[derive(Parser)] #[command(name = "fc-queue-runner")] @@ -78,6 +79,8 @@ async fn main() -> anyhow::Result<()> { wakeup.clone(), ); + let active_builds = worker_pool.active_builds().clone(); + tokio::select! { result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache) => { if let Err(e) = result { @@ -86,6 +89,7 @@ async fn main() -> anyhow::Result<()> { } () = gc_loop(gc_config_for_loop) => {} () = failed_paths_cleanup_loop(db.pool().clone(), failed_paths_ttl, failed_paths_cache) => {} + () = cancel_checker_loop(db.pool().clone(), active_builds) => {} () = shutdown_signal() => { tracing::info!("Shutdown signal received, draining in-flight builds..."); worker_pool_for_drain.drain(); @@ -176,6 +180,34 @@ async fn failed_paths_cleanup_loop( } } +async fn cancel_checker_loop(pool: sqlx::PgPool, active_builds: ActiveBuilds) { + let interval = Duration::from_secs(2); + loop { + tokio::time::sleep(interval).await; + + let build_ids: Vec = + active_builds.iter().map(|entry| *entry.key()).collect(); + + if build_ids.is_empty() { + continue; + } + + match repo::builds::get_cancelled_among(&pool, &build_ids).await { + Ok(cancelled_ids) => { + for id in cancelled_ids { + if let Some((_, token)) = active_builds.remove(&id) { + tracing::info!(build_id = %id, "Triggering cancellation for running build"); + token.cancel(); + } + } + }, + Err(e) => { + tracing::warn!("Failed to check for cancelled builds: {e}"); + }, + } + } +} + async fn shutdown_signal() { let ctrl_c = async { tokio::signal::ctrl_c() diff --git a/crates/queue-runner/src/worker.rs b/crates/queue-runner/src/worker.rs index 2edec45..aaa9488 100644 --- a/crates/queue-runner/src/worker.rs +++ b/crates/queue-runner/src/worker.rs @@ -1,5 +1,6 @@ use std::{path::PathBuf, sync::Arc, time::Duration}; +use dashmap::DashMap; use fc_common::{ alerts::AlertManager, config::{ @@ -24,6 +25,10 @@ use fc_common::{ }; use sqlx::PgPool; use tokio::sync::Semaphore; +use tokio_util::sync::CancellationToken; +use uuid::Uuid; + +pub type ActiveBuilds = Arc>; pub struct WorkerPool { semaphore: Arc, @@ -37,7 +42,8 @@ pub struct WorkerPool { signing_config: Arc, cache_upload_config: Arc, alert_manager: Arc>, - drain_token: tokio_util::sync::CancellationToken, + drain_token: CancellationToken, + active_builds: ActiveBuilds, } impl WorkerPool { @@ -68,7 +74,8 @@ impl WorkerPool { signing_config: Arc::new(signing_config), cache_upload_config: Arc::new(cache_upload_config), alert_manager: Arc::new(alert_manager), - drain_token: tokio_util::sync::CancellationToken::new(), + drain_token: CancellationToken::new(), + active_builds: Arc::new(DashMap::new()), } } @@ -95,6 +102,10 @@ impl WorkerPool { .await; } + pub fn active_builds(&self) -> &ActiveBuilds { + &self.active_builds + } + #[tracing::instrument(skip(self, build), fields(build_id = %build.id, job = %build.job_name))] pub fn dispatch(&self, build: Build) { if self.drain_token.is_cancelled() { @@ -112,29 +123,45 @@ impl WorkerPool { let signing_config = self.signing_config.clone(); let cache_upload_config = self.cache_upload_config.clone(); let alert_manager = self.alert_manager.clone(); + let active_builds = self.active_builds.clone(); + let cancel_token = CancellationToken::new(); + let build_id = build.id; + + active_builds.insert(build_id, cancel_token.clone()); tokio::spawn(async move { - let _permit = match semaphore.acquire().await { - Ok(p) => p, - Err(_) => return, + let result = async { + let _permit = match semaphore.acquire().await { + Ok(p) => p, + Err(_) => return, + }; + + if let Err(e) = run_build( + &pool, + &build, + &work_dir, + timeout, + &log_config, + &gc_config, + ¬ifications_config, + &signing_config, + &cache_upload_config, + &alert_manager, + ) + .await + { + tracing::error!(build_id = %build.id, "Build dispatch failed: {e}"); + } }; - if let Err(e) = run_build( - &pool, - &build, - &work_dir, - timeout, - &log_config, - &gc_config, - ¬ifications_config, - &signing_config, - &cache_upload_config, - &alert_manager, - ) - .await - { - tracing::error!(build_id = %build.id, "Build dispatch failed: {e}"); + tokio::select! { + () = result => {} + () = cancel_token.cancelled() => { + tracing::info!(build_id = %build_id, "Build cancelled, aborting"); + } } + + active_builds.remove(&build_id); }); } }