diff --git a/crates/queue-runner/tests/runner_tests.rs b/crates/queue-runner/tests/runner_tests.rs index 3a03c7f..18ffc67 100644 --- a/crates/queue-runner/tests/runner_tests.rs +++ b/crates/queue-runner/tests/runner_tests.rs @@ -100,6 +100,132 @@ async fn test_worker_pool_drain_stops_dispatch() { // doesn't crash } +// Per-build cancellation + +#[tokio::test] +async fn test_active_builds_registry_cancellation() { + use std::sync::Arc; + + use dashmap::DashMap; + use tokio_util::sync::CancellationToken; + + let active_builds: fc_queue_runner::worker::ActiveBuilds = + Arc::new(DashMap::new()); + + let id1 = uuid::Uuid::new_v4(); + let id2 = uuid::Uuid::new_v4(); + let id3 = uuid::Uuid::new_v4(); + + let token1 = CancellationToken::new(); + let token2 = CancellationToken::new(); + let token3 = CancellationToken::new(); + + active_builds.insert(id1, token1.clone()); + active_builds.insert(id2, token2.clone()); + active_builds.insert(id3, token3.clone()); + + assert_eq!(active_builds.len(), 3); + assert!(!token1.is_cancelled()); + assert!(!token2.is_cancelled()); + assert!(!token3.is_cancelled()); + + // Simulate cancel checker finding id1 and id2 cancelled in DB + if let Some((_, token)) = active_builds.remove(&id1) { + token.cancel(); + } + if let Some((_, token)) = active_builds.remove(&id2) { + token.cancel(); + } + + assert!(token1.is_cancelled()); + assert!(token2.is_cancelled()); + assert!(!token3.is_cancelled()); + assert_eq!(active_builds.len(), 1); + assert!(active_builds.contains_key(&id3)); +} + +#[tokio::test] +async fn test_cancellation_token_aborts_select() { + use tokio_util::sync::CancellationToken; + + let token = CancellationToken::new(); + let token_clone = token.clone(); + + // Simulate a long-running build + let build_future = async { + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + "completed" + }; + + // Cancel after 50ms + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + token_clone.cancel(); + }); + + let start = std::time::Instant::now(); + let result = tokio::select! { + val = build_future => val, + () = token.cancelled() => "cancelled", + }; + + assert_eq!(result, "cancelled"); + // Should complete in well under a second (the 60s "build" was aborted) + assert!(start.elapsed() < std::time::Duration::from_secs(1)); +} + +#[tokio::test] +async fn test_worker_pool_active_builds_cancel() { + let url = match std::env::var("TEST_DATABASE_URL") { + Ok(url) => url, + Err(_) => { + println!("Skipping: TEST_DATABASE_URL not set"); + return; + }, + }; + + let pool = sqlx::postgres::PgPoolOptions::new() + .max_connections(1) + .connect(&url) + .await + .expect("failed to connect"); + + let worker_pool = fc_queue_runner::worker::WorkerPool::new( + pool, + 2, + std::env::temp_dir(), + std::time::Duration::from_secs(60), + fc_common::config::LogConfig::default(), + fc_common::config::GcConfig::default(), + fc_common::config::NotificationsConfig::default(), + fc_common::config::SigningConfig::default(), + fc_common::config::CacheUploadConfig::default(), + None, + ); + + // Active builds map should start empty + assert!(worker_pool.active_builds().is_empty()); + + // Manually insert a token (simulating what dispatch does internally) + let build_id = uuid::Uuid::new_v4(); + let token = tokio_util::sync::CancellationToken::new(); + worker_pool + .active_builds() + .insert(build_id, token.clone()); + + assert_eq!(worker_pool.active_builds().len(), 1); + assert!(worker_pool.active_builds().contains_key(&build_id)); + assert!(!token.is_cancelled()); + + // Simulate cancel checker removing and triggering the token + if let Some((_, t)) = worker_pool.active_builds().remove(&build_id) { + t.cancel(); + } + + assert!(token.is_cancelled()); + assert!(worker_pool.active_builds().is_empty()); +} + // Database-dependent tests #[tokio::test] @@ -299,3 +425,153 @@ async fn test_orphan_build_reset() { // Clean up let _ = fc_common::repo::projects::delete(&pool, project.id).await; } + +#[tokio::test] +async fn test_get_cancelled_among() { + let url = match std::env::var("TEST_DATABASE_URL") { + Ok(url) => url, + Err(_) => { + println!("Skipping: TEST_DATABASE_URL not set"); + return; + }, + }; + + let pool = sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&url) + .await + .expect("failed to connect"); + + sqlx::migrate!("../common/migrations") + .run(&pool) + .await + .expect("migration failed"); + + let project = fc_common::repo::projects::create( + &pool, + fc_common::models::CreateProject { + name: format!("cancel-among-{}", uuid::Uuid::new_v4()), + description: None, + repository_url: "https://github.com/test/repo".to_string(), + }, + ) + .await + .expect("create project"); + + let jobset = + fc_common::repo::jobsets::create(&pool, fc_common::models::CreateJobset { + project_id: project.id, + name: "main".to_string(), + nix_expression: "packages".to_string(), + enabled: None, + flake_mode: None, + check_interval: None, + branch: None, + scheduling_shares: None, + state: None, + }) + .await + .expect("create jobset"); + + let eval = fc_common::repo::evaluations::create( + &pool, + fc_common::models::CreateEvaluation { + jobset_id: jobset.id, + commit_hash: "aabbccdd1234567890aabbccdd1234567890aabb".to_string(), + pr_number: None, + pr_head_branch: None, + pr_base_branch: None, + pr_action: None, + }, + ) + .await + .expect("create eval"); + + // Create a pending build + let build_pending = + fc_common::repo::builds::create(&pool, fc_common::models::CreateBuild { + evaluation_id: eval.id, + job_name: "pending-job".to_string(), + drv_path: format!( + "/nix/store/{}-pending.drv", + uuid::Uuid::new_v4() + ), + system: None, + outputs: None, + is_aggregate: None, + constituents: None, + }) + .await + .expect("create pending build"); + + // Create a running build + let build_running = + fc_common::repo::builds::create(&pool, fc_common::models::CreateBuild { + evaluation_id: eval.id, + job_name: "running-job".to_string(), + drv_path: format!( + "/nix/store/{}-running.drv", + uuid::Uuid::new_v4() + ), + system: None, + outputs: None, + is_aggregate: None, + constituents: None, + }) + .await + .expect("create running build"); + fc_common::repo::builds::start(&pool, build_running.id) + .await + .expect("start running build"); + + // Create a cancelled build (start then cancel) + let build_cancelled = + fc_common::repo::builds::create(&pool, fc_common::models::CreateBuild { + evaluation_id: eval.id, + job_name: "cancelled-job".to_string(), + drv_path: format!( + "/nix/store/{}-cancelled.drv", + uuid::Uuid::new_v4() + ), + system: None, + outputs: None, + is_aggregate: None, + constituents: None, + }) + .await + .expect("create cancelled build"); + fc_common::repo::builds::start(&pool, build_cancelled.id) + .await + .expect("start cancelled build"); + fc_common::repo::builds::cancel(&pool, build_cancelled.id) + .await + .expect("cancel build"); + + // Query for cancelled among all three + let all_ids = + vec![build_pending.id, build_running.id, build_cancelled.id]; + let cancelled = + fc_common::repo::builds::get_cancelled_among(&pool, &all_ids) + .await + .expect("get cancelled among"); + assert_eq!(cancelled.len(), 1); + assert_eq!(cancelled[0], build_cancelled.id); + + // Empty input returns empty + let empty = fc_common::repo::builds::get_cancelled_among(&pool, &[]) + .await + .expect("empty query"); + assert!(empty.is_empty()); + + // Query with only non-cancelled builds returns empty + let none_cancelled = fc_common::repo::builds::get_cancelled_among( + &pool, + &[build_pending.id, build_running.id], + ) + .await + .expect("no cancelled"); + assert!(none_cancelled.is_empty()); + + // Clean up + let _ = fc_common::repo::projects::delete(&pool, project.id).await; +}