crates: production models and repo layer
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: Iceb76724c09eaca7ca5d823010db76776a6a6964
This commit is contained in:
parent
17fb0bbe80
commit
1b12be3f8a
31 changed files with 3841 additions and 12 deletions
146
crates/common/src/repo/evaluations.rs
Normal file
146
crates/common/src/repo/evaluations.rs
Normal file
|
|
@ -0,0 +1,146 @@
|
|||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::error::{CiError, Result};
|
||||
use crate::models::{CreateEvaluation, Evaluation, EvaluationStatus};
|
||||
|
||||
pub async fn create(pool: &PgPool, input: CreateEvaluation) -> Result<Evaluation> {
|
||||
sqlx::query_as::<_, Evaluation>(
|
||||
"INSERT INTO evaluations (jobset_id, commit_hash, status) VALUES ($1, $2, 'pending') RETURNING *",
|
||||
)
|
||||
.bind(input.jobset_id)
|
||||
.bind(&input.commit_hash)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.map_err(|e| match &e {
|
||||
sqlx::Error::Database(db_err) if db_err.is_unique_violation() => {
|
||||
CiError::Conflict(format!(
|
||||
"Evaluation for commit '{}' already exists in this jobset",
|
||||
input.commit_hash
|
||||
))
|
||||
}
|
||||
_ => CiError::Database(e),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get(pool: &PgPool, id: Uuid) -> Result<Evaluation> {
|
||||
sqlx::query_as::<_, Evaluation>("SELECT * FROM evaluations WHERE id = $1")
|
||||
.bind(id)
|
||||
.fetch_optional(pool)
|
||||
.await?
|
||||
.ok_or_else(|| CiError::NotFound(format!("Evaluation {id} not found")))
|
||||
}
|
||||
|
||||
pub async fn list_for_jobset(pool: &PgPool, jobset_id: Uuid) -> Result<Vec<Evaluation>> {
|
||||
sqlx::query_as::<_, Evaluation>(
|
||||
"SELECT * FROM evaluations WHERE jobset_id = $1 ORDER BY evaluation_time DESC",
|
||||
)
|
||||
.bind(jobset_id)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)
|
||||
}
|
||||
|
||||
/// List evaluations with optional jobset_id and status filters, with pagination.
|
||||
pub async fn list_filtered(
|
||||
pool: &PgPool,
|
||||
jobset_id: Option<Uuid>,
|
||||
status: Option<&str>,
|
||||
limit: i64,
|
||||
offset: i64,
|
||||
) -> Result<Vec<Evaluation>> {
|
||||
sqlx::query_as::<_, Evaluation>(
|
||||
"SELECT * FROM evaluations \
|
||||
WHERE ($1::uuid IS NULL OR jobset_id = $1) \
|
||||
AND ($2::text IS NULL OR status = $2) \
|
||||
ORDER BY evaluation_time DESC LIMIT $3 OFFSET $4",
|
||||
)
|
||||
.bind(jobset_id)
|
||||
.bind(status)
|
||||
.bind(limit)
|
||||
.bind(offset)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)
|
||||
}
|
||||
|
||||
pub async fn count_filtered(
|
||||
pool: &PgPool,
|
||||
jobset_id: Option<Uuid>,
|
||||
status: Option<&str>,
|
||||
) -> Result<i64> {
|
||||
let row: (i64,) = sqlx::query_as(
|
||||
"SELECT COUNT(*) FROM evaluations \
|
||||
WHERE ($1::uuid IS NULL OR jobset_id = $1) \
|
||||
AND ($2::text IS NULL OR status = $2)",
|
||||
)
|
||||
.bind(jobset_id)
|
||||
.bind(status)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)?;
|
||||
Ok(row.0)
|
||||
}
|
||||
|
||||
pub async fn update_status(
|
||||
pool: &PgPool,
|
||||
id: Uuid,
|
||||
status: EvaluationStatus,
|
||||
error_message: Option<&str>,
|
||||
) -> Result<Evaluation> {
|
||||
sqlx::query_as::<_, Evaluation>(
|
||||
"UPDATE evaluations SET status = $1, error_message = $2 WHERE id = $3 RETURNING *",
|
||||
)
|
||||
.bind(status)
|
||||
.bind(error_message)
|
||||
.bind(id)
|
||||
.fetch_optional(pool)
|
||||
.await?
|
||||
.ok_or_else(|| CiError::NotFound(format!("Evaluation {id} not found")))
|
||||
}
|
||||
|
||||
pub async fn get_latest(pool: &PgPool, jobset_id: Uuid) -> Result<Option<Evaluation>> {
|
||||
sqlx::query_as::<_, Evaluation>(
|
||||
"SELECT * FROM evaluations WHERE jobset_id = $1 ORDER BY evaluation_time DESC LIMIT 1",
|
||||
)
|
||||
.bind(jobset_id)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)
|
||||
}
|
||||
|
||||
/// Set the inputs hash for an evaluation (used for eval caching).
|
||||
pub async fn set_inputs_hash(pool: &PgPool, id: Uuid, hash: &str) -> Result<()> {
|
||||
sqlx::query("UPDATE evaluations SET inputs_hash = $1 WHERE id = $2")
|
||||
.bind(hash)
|
||||
.bind(id)
|
||||
.execute(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if an evaluation with the same inputs_hash already exists for this jobset.
|
||||
pub async fn get_by_inputs_hash(
|
||||
pool: &PgPool,
|
||||
jobset_id: Uuid,
|
||||
inputs_hash: &str,
|
||||
) -> Result<Option<Evaluation>> {
|
||||
sqlx::query_as::<_, Evaluation>(
|
||||
"SELECT * FROM evaluations WHERE jobset_id = $1 AND inputs_hash = $2 \
|
||||
AND status = 'completed' ORDER BY evaluation_time DESC LIMIT 1",
|
||||
)
|
||||
.bind(jobset_id)
|
||||
.bind(inputs_hash)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)
|
||||
}
|
||||
|
||||
pub async fn count(pool: &PgPool) -> Result<i64> {
|
||||
let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM evaluations")
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)?;
|
||||
Ok(row.0)
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue