From 2378ff6661ca2f1d9ba64e0f266d5ccc4b13adac Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Mon, 2 Feb 2026 01:23:45 +0300 Subject: [PATCH] crates/common: update repos for weighted scheduling and jobset inputs Signed-off-by: NotAShelf Change-Id: I0fe2ceb20dc6692f3315185765ea80756a6a6964 --- crates/common/src/repo/api_keys.rs | 18 +++++++++++++++++- crates/common/src/repo/builds.rs | 26 +++++++++++++++++++++++++- crates/common/src/repo/jobsets.rs | 24 +++++++++++++++++++----- crates/common/src/repo/projects.rs | 16 ++++++++++++++++ 4 files changed, 77 insertions(+), 7 deletions(-) diff --git a/crates/common/src/repo/api_keys.rs b/crates/common/src/repo/api_keys.rs index 175f7e1..b4614f0 100644 --- a/crates/common/src/repo/api_keys.rs +++ b/crates/common/src/repo/api_keys.rs @@ -15,12 +15,28 @@ pub async fn create(pool: &PgPool, name: &str, key_hash: &str, role: &str) -> Re .await .map_err(|e| match &e { sqlx::Error::Database(db_err) if db_err.is_unique_violation() => { - CiError::Conflict(format!("API key with this hash already exists")) + CiError::Conflict("API key with this hash already exists".to_string()) } _ => CiError::Database(e), }) } +pub async fn upsert(pool: &PgPool, name: &str, key_hash: &str, role: &str) -> Result { + sqlx::query_as::<_, ApiKey>( + "INSERT INTO api_keys (name, key_hash, role) VALUES ($1, $2, $3) \ + ON CONFLICT (key_hash) DO UPDATE SET \ + name = EXCLUDED.name, \ + role = EXCLUDED.role \ + RETURNING *", + ) + .bind(name) + .bind(key_hash) + .bind(role) + .fetch_one(pool) + .await + .map_err(CiError::Database) +} + pub async fn get_by_hash(pool: &PgPool, key_hash: &str) -> Result> { sqlx::query_as::<_, ApiKey>("SELECT * FROM api_keys WHERE key_hash = $1") .bind(key_hash) diff --git a/crates/common/src/repo/builds.rs b/crates/common/src/repo/builds.rs index d532bfb..d278747 100644 --- a/crates/common/src/repo/builds.rs +++ b/crates/common/src/repo/builds.rs @@ -60,7 +60,12 @@ pub async fn list_for_evaluation(pool: &PgPool, evaluation_id: Uuid) -> Result Result> { sqlx::query_as::<_, Build>( - "SELECT * FROM builds WHERE status = 'pending' ORDER BY priority DESC, created_at ASC LIMIT $1", + "SELECT b.* FROM builds b \ + JOIN evaluations e ON b.evaluation_id = e.id \ + JOIN jobsets j ON e.jobset_id = j.id \ + WHERE b.status = 'pending' \ + ORDER BY b.priority DESC, j.scheduling_shares DESC, b.created_at ASC \ + LIMIT $1", ) .bind(limit) .fetch_all(pool) @@ -245,6 +250,25 @@ pub async fn cancel_cascade(pool: &PgPool, id: Uuid) -> Result> { Ok(cancelled) } +/// Restart a build by resetting it to pending state. +/// Only works for failed, completed, or cancelled builds. +pub async fn restart(pool: &PgPool, id: Uuid) -> Result { + sqlx::query_as::<_, Build>( + "UPDATE builds SET status = 'pending', started_at = NULL, completed_at = NULL, \ + log_path = NULL, build_output_path = NULL, error_message = NULL, \ + retry_count = retry_count + 1 \ + WHERE id = $1 AND status IN ('failed', 'completed', 'cancelled') RETURNING *", + ) + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| { + CiError::NotFound(format!( + "Build {id} not found or not in a restartable state" + )) + }) +} + /// Mark a build's outputs as signed. pub async fn mark_signed(pool: &PgPool, id: Uuid) -> Result<()> { sqlx::query("UPDATE builds SET signed = true WHERE id = $1") diff --git a/crates/common/src/repo/jobsets.rs b/crates/common/src/repo/jobsets.rs index deecd4b..a066e7f 100644 --- a/crates/common/src/repo/jobsets.rs +++ b/crates/common/src/repo/jobsets.rs @@ -8,9 +8,10 @@ pub async fn create(pool: &PgPool, input: CreateJobset) -> Result { let enabled = input.enabled.unwrap_or(true); let flake_mode = input.flake_mode.unwrap_or(true); let check_interval = input.check_interval.unwrap_or(60); + let scheduling_shares = input.scheduling_shares.unwrap_or(100); sqlx::query_as::<_, Jobset>( - "INSERT INTO jobsets (project_id, name, nix_expression, enabled, flake_mode, check_interval) VALUES ($1, $2, $3, $4, $5, $6) RETURNING *", + "INSERT INTO jobsets (project_id, name, nix_expression, enabled, flake_mode, check_interval, branch, scheduling_shares) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING *", ) .bind(input.project_id) .bind(&input.name) @@ -18,6 +19,8 @@ pub async fn create(pool: &PgPool, input: CreateJobset) -> Result { .bind(enabled) .bind(flake_mode) .bind(check_interval) + .bind(&input.branch) + .bind(scheduling_shares) .fetch_one(pool) .await .map_err(|e| match &e { @@ -70,15 +73,21 @@ pub async fn update(pool: &PgPool, id: Uuid, input: UpdateJobset) -> Result( - "UPDATE jobsets SET name = $1, nix_expression = $2, enabled = $3, flake_mode = $4, check_interval = $5 WHERE id = $6 RETURNING *", + "UPDATE jobsets SET name = $1, nix_expression = $2, enabled = $3, flake_mode = $4, check_interval = $5, branch = $6, scheduling_shares = $7 WHERE id = $8 RETURNING *", ) .bind(&name) .bind(&nix_expression) .bind(enabled) .bind(flake_mode) .bind(check_interval) + .bind(&branch) + .bind(scheduling_shares) .bind(id) .fetch_one(pool) .await @@ -107,15 +116,18 @@ pub async fn upsert(pool: &PgPool, input: CreateJobset) -> Result { let enabled = input.enabled.unwrap_or(true); let flake_mode = input.flake_mode.unwrap_or(true); let check_interval = input.check_interval.unwrap_or(60); + let scheduling_shares = input.scheduling_shares.unwrap_or(100); sqlx::query_as::<_, Jobset>( - "INSERT INTO jobsets (project_id, name, nix_expression, enabled, flake_mode, check_interval) \ - VALUES ($1, $2, $3, $4, $5, $6) \ + "INSERT INTO jobsets (project_id, name, nix_expression, enabled, flake_mode, check_interval, branch, scheduling_shares) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \ ON CONFLICT (project_id, name) DO UPDATE SET \ nix_expression = EXCLUDED.nix_expression, \ enabled = EXCLUDED.enabled, \ flake_mode = EXCLUDED.flake_mode, \ - check_interval = EXCLUDED.check_interval \ + check_interval = EXCLUDED.check_interval, \ + branch = EXCLUDED.branch, \ + scheduling_shares = EXCLUDED.scheduling_shares \ RETURNING *", ) .bind(input.project_id) @@ -124,6 +136,8 @@ pub async fn upsert(pool: &PgPool, input: CreateJobset) -> Result { .bind(enabled) .bind(flake_mode) .bind(check_interval) + .bind(&input.branch) + .bind(scheduling_shares) .fetch_one(pool) .await .map_err(CiError::Database) diff --git a/crates/common/src/repo/projects.rs b/crates/common/src/repo/projects.rs index 3aa7cec..ea1cfde 100644 --- a/crates/common/src/repo/projects.rs +++ b/crates/common/src/repo/projects.rs @@ -81,6 +81,22 @@ pub async fn update(pool: &PgPool, id: Uuid, input: UpdateProject) -> Result Result { + sqlx::query_as::<_, Project>( + "INSERT INTO projects (name, description, repository_url) VALUES ($1, $2, $3) \ + ON CONFLICT (name) DO UPDATE SET \ + description = EXCLUDED.description, \ + repository_url = EXCLUDED.repository_url \ + RETURNING *", + ) + .bind(&input.name) + .bind(&input.description) + .bind(&input.repository_url) + .fetch_one(pool) + .await + .map_err(CiError::Database) +} + pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> { let result = sqlx::query("DELETE FROM projects WHERE id = $1") .bind(id)