fc-common: implement deficit-based fair-share scheduling in list_pending
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: Ic1345cfdf712aa6ee6f0eeae45b3e62b6a6a6964
This commit is contained in:
parent
8cfc4c30ca
commit
4100ac54c2
2 changed files with 21 additions and 5 deletions
|
|
@ -69,13 +69,29 @@ pub async fn list_for_evaluation(
|
||||||
.map_err(CiError::Database)
|
.map_err(CiError::Database)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn list_pending(pool: &PgPool, limit: i64) -> Result<Vec<Build>> {
|
pub async fn list_pending(
|
||||||
|
pool: &PgPool,
|
||||||
|
limit: i64,
|
||||||
|
worker_count: i32,
|
||||||
|
) -> Result<Vec<Build>> {
|
||||||
sqlx::query_as::<_, Build>(
|
sqlx::query_as::<_, Build>(
|
||||||
"SELECT b.* FROM builds b JOIN evaluations e ON b.evaluation_id = e.id \
|
"WITH running_counts AS ( SELECT e.jobset_id, COUNT(*) AS running FROM \
|
||||||
JOIN jobsets j ON e.jobset_id = j.id WHERE b.status = 'pending' ORDER BY \
|
builds b JOIN evaluations e ON b.evaluation_id = e.id WHERE b.status = \
|
||||||
b.priority DESC, j.scheduling_shares DESC, b.created_at ASC LIMIT $1",
|
'running' GROUP BY e.jobset_id ), active_shares AS ( SELECT j.id AS \
|
||||||
|
jobset_id, j.scheduling_shares, COALESCE(rc.running, 0) AS running, \
|
||||||
|
SUM(j.scheduling_shares) OVER () AS total_shares FROM jobsets j JOIN \
|
||||||
|
evaluations e2 ON e2.jobset_id = j.id JOIN builds b2 ON b2.evaluation_id \
|
||||||
|
= e2.id AND b2.status = 'pending' LEFT JOIN running_counts rc ON \
|
||||||
|
rc.jobset_id = j.id WHERE j.scheduling_shares > 0 GROUP BY j.id, \
|
||||||
|
j.scheduling_shares, rc.running ) SELECT b.* FROM builds b JOIN \
|
||||||
|
evaluations e ON b.evaluation_id = e.id JOIN active_shares ash ON \
|
||||||
|
ash.jobset_id = e.jobset_id WHERE b.status = 'pending' ORDER BY \
|
||||||
|
b.priority DESC, (ash.scheduling_shares::float / \
|
||||||
|
GREATEST(ash.total_shares, 1) - ash.running::float / GREATEST($2, 1)) \
|
||||||
|
DESC, b.created_at ASC LIMIT $1",
|
||||||
)
|
)
|
||||||
.bind(limit)
|
.bind(limit)
|
||||||
|
.bind(worker_count)
|
||||||
.fetch_all(pool)
|
.fetch_all(pool)
|
||||||
.await
|
.await
|
||||||
.map_err(CiError::Database)
|
.map_err(CiError::Database)
|
||||||
|
|
|
||||||
|
|
@ -293,7 +293,7 @@ async fn test_evaluation_and_build_lifecycle() {
|
||||||
assert_eq!(build.system.as_deref(), Some("x86_64-linux"));
|
assert_eq!(build.system.as_deref(), Some("x86_64-linux"));
|
||||||
|
|
||||||
// List pending
|
// List pending
|
||||||
let pending = repo::builds::list_pending(&pool, 10)
|
let pending = repo::builds::list_pending(&pool, 10, 4)
|
||||||
.await
|
.await
|
||||||
.expect("list pending");
|
.expect("list pending");
|
||||||
assert!(pending.iter().any(|b| b.id == build.id));
|
assert!(pending.iter().any(|b| b.id == build.id));
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue