fc-common: add upsert and sync_for_jobset for declarative jobset inputs
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I40f2724aeb9615e8c37a397187064fa66a6a6964
This commit is contained in:
parent
73919f2f9e
commit
dead111dfb
1 changed files with 65 additions and 0 deletions
|
|
@ -2,6 +2,7 @@ use sqlx::PgPool;
|
|||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
config::DeclarativeJobsetInput,
|
||||
error::{CiError, Result},
|
||||
models::JobsetInput,
|
||||
};
|
||||
|
|
@ -60,3 +61,67 @@ pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> {
|
|||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Upsert a jobset input (insert or update on conflict).
|
||||
pub async fn upsert(
|
||||
pool: &PgPool,
|
||||
jobset_id: Uuid,
|
||||
name: &str,
|
||||
input_type: &str,
|
||||
value: &str,
|
||||
revision: Option<&str>,
|
||||
) -> Result<JobsetInput> {
|
||||
sqlx::query_as::<_, JobsetInput>(
|
||||
"INSERT INTO jobset_inputs (jobset_id, name, input_type, value, revision) \
|
||||
VALUES ($1, $2, $3, $4, $5) \
|
||||
ON CONFLICT (jobset_id, name) DO UPDATE SET \
|
||||
input_type = EXCLUDED.input_type, \
|
||||
value = EXCLUDED.value, \
|
||||
revision = EXCLUDED.revision \
|
||||
RETURNING *",
|
||||
)
|
||||
.bind(jobset_id)
|
||||
.bind(name)
|
||||
.bind(input_type)
|
||||
.bind(value)
|
||||
.bind(revision)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)
|
||||
}
|
||||
|
||||
/// Sync jobset inputs from declarative config.
|
||||
/// Deletes inputs not in the config and upserts those that are.
|
||||
pub async fn sync_for_jobset(
|
||||
pool: &PgPool,
|
||||
jobset_id: Uuid,
|
||||
inputs: &[DeclarativeJobsetInput],
|
||||
) -> Result<()> {
|
||||
// Get names from declarative config
|
||||
let names: Vec<&str> = inputs.iter().map(|i| i.name.as_str()).collect();
|
||||
|
||||
// Delete inputs not in declarative config
|
||||
sqlx::query(
|
||||
"DELETE FROM jobset_inputs WHERE jobset_id = $1 AND name != ALL($2::text[])",
|
||||
)
|
||||
.bind(jobset_id)
|
||||
.bind(&names)
|
||||
.execute(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)?;
|
||||
|
||||
// Upsert each input
|
||||
for input in inputs {
|
||||
upsert(
|
||||
pool,
|
||||
jobset_id,
|
||||
&input.name,
|
||||
&input.input_type,
|
||||
&input.value,
|
||||
input.revision.as_deref(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue