From dead111dfbaac09cc0a0daac821d32f9976f26d2 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 8 Feb 2026 02:15:34 +0300 Subject: [PATCH] fc-common: add `upsert` and `sync_for_jobset` for declarative jobset inputs Signed-off-by: NotAShelf Change-Id: I40f2724aeb9615e8c37a397187064fa66a6a6964 --- crates/common/src/repo/jobset_inputs.rs | 65 +++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/crates/common/src/repo/jobset_inputs.rs b/crates/common/src/repo/jobset_inputs.rs index cc9385a..48f4ee7 100644 --- a/crates/common/src/repo/jobset_inputs.rs +++ b/crates/common/src/repo/jobset_inputs.rs @@ -2,6 +2,7 @@ use sqlx::PgPool; use uuid::Uuid; use crate::{ + config::DeclarativeJobsetInput, error::{CiError, Result}, models::JobsetInput, }; @@ -60,3 +61,67 @@ pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> { } Ok(()) } + +/// Upsert a jobset input (insert or update on conflict). +pub async fn upsert( + pool: &PgPool, + jobset_id: Uuid, + name: &str, + input_type: &str, + value: &str, + revision: Option<&str>, +) -> Result { + sqlx::query_as::<_, JobsetInput>( + "INSERT INTO jobset_inputs (jobset_id, name, input_type, value, revision) \ + VALUES ($1, $2, $3, $4, $5) \ + ON CONFLICT (jobset_id, name) DO UPDATE SET \ + input_type = EXCLUDED.input_type, \ + value = EXCLUDED.value, \ + revision = EXCLUDED.revision \ + RETURNING *", + ) + .bind(jobset_id) + .bind(name) + .bind(input_type) + .bind(value) + .bind(revision) + .fetch_one(pool) + .await + .map_err(CiError::Database) +} + +/// Sync jobset inputs from declarative config. +/// Deletes inputs not in the config and upserts those that are. +pub async fn sync_for_jobset( + pool: &PgPool, + jobset_id: Uuid, + inputs: &[DeclarativeJobsetInput], +) -> Result<()> { + // Get names from declarative config + let names: Vec<&str> = inputs.iter().map(|i| i.name.as_str()).collect(); + + // Delete inputs not in declarative config + sqlx::query( + "DELETE FROM jobset_inputs WHERE jobset_id = $1 AND name != ALL($2::text[])", + ) + .bind(jobset_id) + .bind(&names) + .execute(pool) + .await + .map_err(CiError::Database)?; + + // Upsert each input + for input in inputs { + upsert( + pool, + jobset_id, + &input.name, + &input.input_type, + &input.value, + input.revision.as_deref(), + ) + .await?; + } + + Ok(()) +}