crates/common: update repos for weighted scheduling and jobset inputs

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I0fe2ceb20dc6692f3315185765ea80756a6a6964
This commit is contained in:
raf 2026-02-02 01:23:45 +03:00
commit 2378ff6661
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
4 changed files with 77 additions and 7 deletions

View file

@ -15,12 +15,28 @@ pub async fn create(pool: &PgPool, name: &str, key_hash: &str, role: &str) -> Re
.await
.map_err(|e| match &e {
sqlx::Error::Database(db_err) if db_err.is_unique_violation() => {
CiError::Conflict(format!("API key with this hash already exists"))
CiError::Conflict("API key with this hash already exists".to_string())
}
_ => CiError::Database(e),
})
}
/// Insert a new API key, or refresh the `name` and `role` of an existing
/// key that has the same hash. The hash is the conflict target, so it is
/// never modified by the update arm.
///
/// # Errors
///
/// Any database failure is surfaced as [`CiError::Database`].
pub async fn upsert(pool: &PgPool, name: &str, key_hash: &str, role: &str) -> Result<ApiKey> {
    let query = "INSERT INTO api_keys (name, key_hash, role) VALUES ($1, $2, $3) \
         ON CONFLICT (key_hash) DO UPDATE SET \
         name = EXCLUDED.name, \
         role = EXCLUDED.role \
         RETURNING *";
    let outcome = sqlx::query_as::<_, ApiKey>(query)
        .bind(name)
        .bind(key_hash)
        .bind(role)
        .fetch_one(pool)
        .await;
    outcome.map_err(CiError::Database)
}
pub async fn get_by_hash(pool: &PgPool, key_hash: &str) -> Result<Option<ApiKey>> {
sqlx::query_as::<_, ApiKey>("SELECT * FROM api_keys WHERE key_hash = $1")
.bind(key_hash)

View file

@ -60,7 +60,12 @@ pub async fn list_for_evaluation(pool: &PgPool, evaluation_id: Uuid) -> Result<V
pub async fn list_pending(pool: &PgPool, limit: i64) -> Result<Vec<Build>> {
sqlx::query_as::<_, Build>(
"SELECT * FROM builds WHERE status = 'pending' ORDER BY priority DESC, created_at ASC LIMIT $1",
"SELECT b.* FROM builds b \
JOIN evaluations e ON b.evaluation_id = e.id \
JOIN jobsets j ON e.jobset_id = j.id \
WHERE b.status = 'pending' \
ORDER BY b.priority DESC, j.scheduling_shares DESC, b.created_at ASC \
LIMIT $1",
)
.bind(limit)
.fetch_all(pool)
@ -245,6 +250,25 @@ pub async fn cancel_cascade(pool: &PgPool, id: Uuid) -> Result<Vec<Build>> {
Ok(cancelled)
}
/// Restart a build by resetting it to pending state.
/// Only works for failed, completed, or cancelled builds.
///
/// All per-run fields are cleared (timestamps, log path, output path,
/// error message) and `retry_count` is bumped so the number of attempts
/// is preserved. The `signed` flag is also reset: `mark_signed` may have
/// flagged the previous outputs, and a restarted build produces new,
/// as-yet-unsigned artifacts.
///
/// # Errors
///
/// Returns [`CiError::NotFound`] when no row matched — either the build
/// does not exist or it is not in a restartable state. Other database
/// failures propagate through `?`.
pub async fn restart(pool: &PgPool, id: Uuid) -> Result<Build> {
    sqlx::query_as::<_, Build>(
        "UPDATE builds SET status = 'pending', started_at = NULL, completed_at = NULL, \
         log_path = NULL, build_output_path = NULL, error_message = NULL, \
         signed = false, \
         retry_count = retry_count + 1 \
         WHERE id = $1 AND status IN ('failed', 'completed', 'cancelled') RETURNING *",
    )
    .bind(id)
    .fetch_optional(pool)
    .await?
    // A single NotFound message covers both "no such build" and "wrong
    // state" because the filtered UPDATE cannot distinguish the two.
    .ok_or_else(|| {
        CiError::NotFound(format!(
            "Build {id} not found or not in a restartable state"
        ))
    })
}
/// Mark a build's outputs as signed.
pub async fn mark_signed(pool: &PgPool, id: Uuid) -> Result<()> {
sqlx::query("UPDATE builds SET signed = true WHERE id = $1")

View file

@ -8,9 +8,10 @@ pub async fn create(pool: &PgPool, input: CreateJobset) -> Result<Jobset> {
let enabled = input.enabled.unwrap_or(true);
let flake_mode = input.flake_mode.unwrap_or(true);
let check_interval = input.check_interval.unwrap_or(60);
let scheduling_shares = input.scheduling_shares.unwrap_or(100);
sqlx::query_as::<_, Jobset>(
"INSERT INTO jobsets (project_id, name, nix_expression, enabled, flake_mode, check_interval) VALUES ($1, $2, $3, $4, $5, $6) RETURNING *",
"INSERT INTO jobsets (project_id, name, nix_expression, enabled, flake_mode, check_interval, branch, scheduling_shares) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING *",
)
.bind(input.project_id)
.bind(&input.name)
@ -18,6 +19,8 @@ pub async fn create(pool: &PgPool, input: CreateJobset) -> Result<Jobset> {
.bind(enabled)
.bind(flake_mode)
.bind(check_interval)
.bind(&input.branch)
.bind(scheduling_shares)
.fetch_one(pool)
.await
.map_err(|e| match &e {
@ -70,15 +73,21 @@ pub async fn update(pool: &PgPool, id: Uuid, input: UpdateJobset) -> Result<Jobs
let enabled = input.enabled.unwrap_or(existing.enabled);
let flake_mode = input.flake_mode.unwrap_or(existing.flake_mode);
let check_interval = input.check_interval.unwrap_or(existing.check_interval);
let branch = input.branch.or(existing.branch);
let scheduling_shares = input
.scheduling_shares
.unwrap_or(existing.scheduling_shares);
sqlx::query_as::<_, Jobset>(
"UPDATE jobsets SET name = $1, nix_expression = $2, enabled = $3, flake_mode = $4, check_interval = $5 WHERE id = $6 RETURNING *",
"UPDATE jobsets SET name = $1, nix_expression = $2, enabled = $3, flake_mode = $4, check_interval = $5, branch = $6, scheduling_shares = $7 WHERE id = $8 RETURNING *",
)
.bind(&name)
.bind(&nix_expression)
.bind(enabled)
.bind(flake_mode)
.bind(check_interval)
.bind(&branch)
.bind(scheduling_shares)
.bind(id)
.fetch_one(pool)
.await
@ -107,15 +116,18 @@ pub async fn upsert(pool: &PgPool, input: CreateJobset) -> Result<Jobset> {
let enabled = input.enabled.unwrap_or(true);
let flake_mode = input.flake_mode.unwrap_or(true);
let check_interval = input.check_interval.unwrap_or(60);
let scheduling_shares = input.scheduling_shares.unwrap_or(100);
sqlx::query_as::<_, Jobset>(
"INSERT INTO jobsets (project_id, name, nix_expression, enabled, flake_mode, check_interval) \
VALUES ($1, $2, $3, $4, $5, $6) \
"INSERT INTO jobsets (project_id, name, nix_expression, enabled, flake_mode, check_interval, branch, scheduling_shares) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
ON CONFLICT (project_id, name) DO UPDATE SET \
nix_expression = EXCLUDED.nix_expression, \
enabled = EXCLUDED.enabled, \
flake_mode = EXCLUDED.flake_mode, \
check_interval = EXCLUDED.check_interval \
check_interval = EXCLUDED.check_interval, \
branch = EXCLUDED.branch, \
scheduling_shares = EXCLUDED.scheduling_shares \
RETURNING *",
)
.bind(input.project_id)
@ -124,6 +136,8 @@ pub async fn upsert(pool: &PgPool, input: CreateJobset) -> Result<Jobset> {
.bind(enabled)
.bind(flake_mode)
.bind(check_interval)
.bind(&input.branch)
.bind(scheduling_shares)
.fetch_one(pool)
.await
.map_err(CiError::Database)

View file

@ -81,6 +81,22 @@ pub async fn update(pool: &PgPool, id: Uuid, input: UpdateProject) -> Result<Pro
})
}
/// Create a project, or update the description and repository URL of an
/// existing project that has the same name (the conflict target).
///
/// # Errors
///
/// Any database failure is surfaced as [`CiError::Database`].
pub async fn upsert(pool: &PgPool, input: CreateProject) -> Result<Project> {
    let query = "INSERT INTO projects (name, description, repository_url) VALUES ($1, $2, $3) \
         ON CONFLICT (name) DO UPDATE SET \
         description = EXCLUDED.description, \
         repository_url = EXCLUDED.repository_url \
         RETURNING *";
    sqlx::query_as::<_, Project>(query)
        .bind(&input.name)
        .bind(&input.description)
        .bind(&input.repository_url)
        .fetch_one(pool)
        .await
        .map_err(CiError::Database)
}
pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> {
let result = sqlx::query("DELETE FROM projects WHERE id = $1")
.bind(id)