diff --git a/crates/queue-runner/src/runner_loop.rs b/crates/queue-runner/src/runner_loop.rs index 02e4651..00129e0 100644 --- a/crates/queue-runner/src/runner_loop.rs +++ b/crates/queue-runner/src/runner_loop.rs @@ -29,7 +29,8 @@ pub async fn run( } loop { - match repo::builds::list_pending(&pool, 10).await { + let wc = worker_pool.worker_count() as i32; + match repo::builds::list_pending(&pool, 10, wc).await { Ok(builds) => { if !builds.is_empty() { tracing::info!("Found {} pending builds", builds.len()); diff --git a/crates/queue-runner/src/worker.rs b/crates/queue-runner/src/worker.rs index aaa9488..51b2bde 100644 --- a/crates/queue-runner/src/worker.rs +++ b/crates/queue-runner/src/worker.rs @@ -102,6 +102,10 @@ impl WorkerPool { .await; } + pub fn worker_count(&self) -> usize { + self.worker_count + } + pub fn active_builds(&self) -> &ActiveBuilds { &self.active_builds } diff --git a/crates/queue-runner/tests/runner_tests.rs b/crates/queue-runner/tests/runner_tests.rs index 18ffc67..102967e 100644 --- a/crates/queue-runner/tests/runner_tests.rs +++ b/crates/queue-runner/tests/runner_tests.rs @@ -209,9 +209,7 @@ async fn test_worker_pool_active_builds_cancel() { // Manually insert a token (simulating what dispatch does internally) let build_id = uuid::Uuid::new_v4(); let token = tokio_util::sync::CancellationToken::new(); - worker_pool - .active_builds() - .insert(build_id, token.clone()); + worker_pool.active_builds().insert(build_id, token.clone()); assert_eq!(worker_pool.active_builds().len(), 1); assert!(worker_pool.active_builds().contains_key(&build_id)); @@ -226,6 +224,221 @@ async fn test_worker_pool_active_builds_cancel() { assert!(worker_pool.active_builds().is_empty()); } +// Fair-share scheduling + +#[tokio::test] +async fn test_fair_share_scheduling() { + let url = match std::env::var("TEST_DATABASE_URL") { + Ok(url) => url, + Err(_) => { + println!("Skipping: TEST_DATABASE_URL not set"); + return; + }, + }; + + let pool = sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&url) + .await + .expect("failed to connect"); + + sqlx::migrate!("../common/migrations") + .run(&pool) + .await + .expect("migration failed"); + + // Create two projects with different scheduling shares + let project_hi = fc_common::repo::projects::create( + &pool, + fc_common::models::CreateProject { + name: format!("fair-hi-{}", uuid::Uuid::new_v4()), + description: None, + repository_url: "https://github.com/test/repo".to_string(), + }, + ) + .await + .expect("create project hi"); + + let project_lo = fc_common::repo::projects::create( + &pool, + fc_common::models::CreateProject { + name: format!("fair-lo-{}", uuid::Uuid::new_v4()), + description: None, + repository_url: "https://github.com/test/repo".to_string(), + }, + ) + .await + .expect("create project lo"); + + // High-share jobset (200 shares) and low-share jobset (100 shares) + let jobset_hi = + fc_common::repo::jobsets::create(&pool, fc_common::models::CreateJobset { + project_id: project_hi.id, + name: "main".to_string(), + nix_expression: "packages".to_string(), + enabled: None, + flake_mode: None, + check_interval: None, + branch: None, + scheduling_shares: Some(200), + state: None, + }) + .await + .expect("create jobset hi"); + + let jobset_lo = + fc_common::repo::jobsets::create(&pool, fc_common::models::CreateJobset { + project_id: project_lo.id, + name: "main".to_string(), + nix_expression: "packages".to_string(), + enabled: None, + flake_mode: None, + check_interval: None, + branch: None, + scheduling_shares: Some(100), + state: None, + }) + .await + .expect("create jobset lo"); + + let eval_hi = fc_common::repo::evaluations::create( + &pool, + fc_common::models::CreateEvaluation { + jobset_id: jobset_hi.id, + commit_hash: format!("hi{}", uuid::Uuid::new_v4().simple()), + pr_number: None, + pr_head_branch: None, + pr_base_branch: None, + pr_action: None, + }, + ) + .await + .expect("create eval hi"); + + let eval_lo = fc_common::repo::evaluations::create( + &pool, + fc_common::models::CreateEvaluation { + jobset_id: jobset_lo.id, + commit_hash: format!("lo{}", uuid::Uuid::new_v4().simple()), + pr_number: None, + pr_head_branch: None, + pr_base_branch: None, + pr_action: None, + }, + ) + .await + .expect("create eval lo"); + + // Create pending builds: 2 for hi-share, 2 for lo-share + let drv_hi_1 = + format!("/nix/store/{}-hi1.drv", uuid::Uuid::new_v4().simple()); + let drv_hi_2 = + format!("/nix/store/{}-hi2.drv", uuid::Uuid::new_v4().simple()); + let drv_lo_1 = + format!("/nix/store/{}-lo1.drv", uuid::Uuid::new_v4().simple()); + let drv_lo_2 = + format!("/nix/store/{}-lo2.drv", uuid::Uuid::new_v4().simple()); + + fc_common::repo::builds::create(&pool, fc_common::models::CreateBuild { + evaluation_id: eval_hi.id, + job_name: "hi-build-1".to_string(), + drv_path: drv_hi_1, + system: Some("x86_64-linux".to_string()), + outputs: None, + is_aggregate: None, + constituents: None, + }) + .await + .expect("create hi build 1"); + + fc_common::repo::builds::create(&pool, fc_common::models::CreateBuild { + evaluation_id: eval_hi.id, + job_name: "hi-build-2".to_string(), + drv_path: drv_hi_2, + system: Some("x86_64-linux".to_string()), + outputs: None, + is_aggregate: None, + constituents: None, + }) + .await + .expect("create hi build 2"); + + fc_common::repo::builds::create(&pool, fc_common::models::CreateBuild { + evaluation_id: eval_lo.id, + job_name: "lo-build-1".to_string(), + drv_path: drv_lo_1, + system: Some("x86_64-linux".to_string()), + outputs: None, + is_aggregate: None, + constituents: None, + }) + .await + .expect("create lo build 1"); + + fc_common::repo::builds::create(&pool, fc_common::models::CreateBuild { + evaluation_id: eval_lo.id, + job_name: "lo-build-2".to_string(), + drv_path: drv_lo_2, + system: Some("x86_64-linux".to_string()), + outputs: None, + is_aggregate: None, + constituents: None, + }) + .await + .expect("create lo build 2"); + + // With no running builds, hi-share jobset (200) should come first due to + // higher share fraction (200/300 > 100/300) + let pending = fc_common::repo::builds::list_pending(&pool, 10, 4) + .await + .expect("list pending cold start"); + assert!( + pending.len() >= 4, + "expected at least 4 pending builds, got {}", + pending.len() + ); + + // The first builds should belong to the high-share jobset + let first_eval = pending[0].evaluation_id; + assert_eq!( + first_eval, eval_hi.id, + "high-share jobset should be scheduled first on cold start" + ); + + // Now simulate: start 2 builds from the high-share jobset to make it + // over-served (2 running out of 4 workers, but only 66% of shares) + let hi_build_ids: Vec<_> = pending + .iter() + .filter(|b| b.evaluation_id == eval_hi.id) + .map(|b| b.id) + .take(2) + .collect(); + + for &id in &hi_build_ids { + fc_common::repo::builds::start(&pool, id) + .await + .expect("start hi build"); + } + + // Re-query: lo-share jobset should now be prioritized because it is + // underserved (0 running vs its fair share) + let pending2 = fc_common::repo::builds::list_pending(&pool, 10, 4) + .await + .expect("list pending after running"); + + if !pending2.is_empty() { + assert_eq!( + pending2[0].evaluation_id, eval_lo.id, + "underserved lo-share jobset should be scheduled first when hi-share is \ + over-served" + ); + } + + // Clean up + let _ = fc_common::repo::projects::delete(&pool, project_hi.id).await; + let _ = fc_common::repo::projects::delete(&pool, project_lo.id).await; +} + // Database-dependent tests #[tokio::test] @@ -492,10 +705,7 @@ async fn test_get_cancelled_among() { fc_common::repo::builds::create(&pool, fc_common::models::CreateBuild { evaluation_id: eval.id, job_name: "pending-job".to_string(), - drv_path: format!( - "/nix/store/{}-pending.drv", - uuid::Uuid::new_v4() - ), + drv_path: format!("/nix/store/{}-pending.drv", uuid::Uuid::new_v4()), system: None, outputs: None, is_aggregate: None, @@ -509,10 +719,7 @@ async fn test_get_cancelled_among() { fc_common::repo::builds::create(&pool, fc_common::models::CreateBuild { evaluation_id: eval.id, job_name: "running-job".to_string(), - drv_path: format!( - "/nix/store/{}-running.drv", - uuid::Uuid::new_v4() - ), + drv_path: format!("/nix/store/{}-running.drv", uuid::Uuid::new_v4()), system: None, outputs: None, is_aggregate: None, @@ -548,12 +755,10 @@ async fn test_get_cancelled_among() { .expect("cancel build"); // Query for cancelled among all three - let all_ids = - vec![build_pending.id, build_running.id, build_cancelled.id]; - let cancelled = - fc_common::repo::builds::get_cancelled_among(&pool, &all_ids) - .await - .expect("get cancelled among"); + let all_ids = vec![build_pending.id, build_running.id, build_cancelled.id]; + let cancelled = fc_common::repo::builds::get_cancelled_among(&pool, &all_ids) + .await + .expect("get cancelled among"); assert_eq!(cancelled.len(), 1); assert_eq!(cancelled[0], build_cancelled.id); @@ -564,10 +769,10 @@ async fn test_get_cancelled_among() { assert!(empty.is_empty()); // Query with only non-cancelled builds returns empty - let none_cancelled = fc_common::repo::builds::get_cancelled_among( - &pool, - &[build_pending.id, build_running.id], - ) + let none_cancelled = fc_common::repo::builds::get_cancelled_among(&pool, &[ + build_pending.id, + build_running.id, + ]) .await .expect("no cancelled"); assert!(none_cancelled.is_empty());