From 4100ac54c2a02a5a7468a5fd72d02a870cd2bf3a Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Mon, 16 Feb 2026 23:41:55 +0300 Subject: [PATCH] fc-common: implement deficit-based fair-share scheduling in `list_pending` Signed-off-by: NotAShelf Change-Id: Ic1345cfdf712aa6ee6f0eeae45b3e62b6a6a6964 --- crates/common/src/repo/builds.rs | 24 ++++++++++++++++++++---- crates/common/tests/repo_tests.rs | 2 +- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/crates/common/src/repo/builds.rs b/crates/common/src/repo/builds.rs index 57a56aa..23ccb70 100644 --- a/crates/common/src/repo/builds.rs +++ b/crates/common/src/repo/builds.rs @@ -69,13 +69,29 @@ pub async fn list_for_evaluation( .map_err(CiError::Database) } -pub async fn list_pending(pool: &PgPool, limit: i64) -> Result> { +pub async fn list_pending( + pool: &PgPool, + limit: i64, + worker_count: i32, +) -> Result> { sqlx::query_as::<_, Build>( - "SELECT b.* FROM builds b JOIN evaluations e ON b.evaluation_id = e.id \ - JOIN jobsets j ON e.jobset_id = j.id WHERE b.status = 'pending' ORDER BY \ - b.priority DESC, j.scheduling_shares DESC, b.created_at ASC LIMIT $1", + "WITH running_counts AS ( SELECT e.jobset_id, COUNT(*) AS running FROM \ + builds b JOIN evaluations e ON b.evaluation_id = e.id WHERE b.status = \ + 'running' GROUP BY e.jobset_id ), active_shares AS ( SELECT j.id AS \ + jobset_id, j.scheduling_shares, COALESCE(rc.running, 0) AS running, \ + SUM(j.scheduling_shares) OVER () AS total_shares FROM jobsets j JOIN \ + evaluations e2 ON e2.jobset_id = j.id JOIN builds b2 ON b2.evaluation_id \ + = e2.id AND b2.status = 'pending' LEFT JOIN running_counts rc ON \ + rc.jobset_id = j.id WHERE j.scheduling_shares > 0 GROUP BY j.id, \ + j.scheduling_shares, rc.running ) SELECT b.* FROM builds b JOIN \ + evaluations e ON b.evaluation_id = e.id JOIN active_shares ash ON \ + ash.jobset_id = e.jobset_id WHERE b.status = 'pending' ORDER BY \ + b.priority DESC, (ash.scheduling_shares::float / \ + GREATEST(ash.total_shares, 1) - ash.running::float / GREATEST($2, 1)) \ + DESC, b.created_at ASC LIMIT $1", ) .bind(limit) + .bind(worker_count) .fetch_all(pool) .await .map_err(CiError::Database) diff --git a/crates/common/tests/repo_tests.rs b/crates/common/tests/repo_tests.rs index 5766666..0cc08ce 100644 --- a/crates/common/tests/repo_tests.rs +++ b/crates/common/tests/repo_tests.rs @@ -293,7 +293,7 @@ async fn test_evaluation_and_build_lifecycle() { assert_eq!(build.system.as_deref(), Some("x86_64-linux")); // List pending - let pending = repo::builds::list_pending(&pool, 10) + let pending = repo::builds::list_pending(&pool, 10, 4) .await .expect("list pending"); assert!(pending.iter().any(|b| b.id == build.id));