diff --git a/crates/common/src/repo/notification_configs.rs b/crates/common/src/repo/notification_configs.rs index bbe01db..57d766f 100644 --- a/crates/common/src/repo/notification_configs.rs +++ b/crates/common/src/repo/notification_configs.rs @@ -2,6 +2,7 @@ use sqlx::PgPool; use uuid::Uuid; use crate::{ + config::DeclarativeNotification, error::{CiError, Result}, models::{CreateNotificationConfig, NotificationConfig}, }; @@ -58,3 +59,65 @@ pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> { } Ok(()) } + +/// Upsert a notification config (insert or update on conflict). +pub async fn upsert( + pool: &PgPool, + project_id: Uuid, + notification_type: &str, + config: &serde_json::Value, + enabled: bool, +) -> Result { + sqlx::query_as::<_, NotificationConfig>( + "INSERT INTO notification_configs (project_id, notification_type, config, \ + enabled) VALUES ($1, $2, $3, $4) ON CONFLICT (project_id, notification_type) \ + DO UPDATE SET config = EXCLUDED.config, enabled = EXCLUDED.enabled \ + RETURNING *", + ) + .bind(project_id) + .bind(notification_type) + .bind(config) + .bind(enabled) + .fetch_one(pool) + .await + .map_err(CiError::Database) +} + +/// Sync notification configs from declarative config. +/// Deletes configs not in the declarative list and upserts those that are. +pub async fn sync_for_project( + pool: &PgPool, + project_id: Uuid, + notifications: &[DeclarativeNotification], +) -> Result<()> { + // Get notification types from declarative config + let types: Vec<&str> = notifications + .iter() + .map(|n| n.notification_type.as_str()) + .collect(); + + // Delete notification configs not in declarative config + sqlx::query( + "DELETE FROM notification_configs WHERE project_id = $1 AND \ + notification_type != ALL($2::text[])", + ) + .bind(project_id) + .bind(&types) + .execute(pool) + .await + .map_err(CiError::Database)?; + + // Upsert each notification config + for notification in notifications { + upsert( + pool, + project_id, + ¬ification.notification_type, + ¬ification.config, + notification.enabled, + ) + .await?; + } + + Ok(()) +} diff --git a/crates/common/src/repo/webhook_configs.rs b/crates/common/src/repo/webhook_configs.rs index 115dac0..dadaa48 100644 --- a/crates/common/src/repo/webhook_configs.rs +++ b/crates/common/src/repo/webhook_configs.rs @@ -2,6 +2,7 @@ use sqlx::PgPool; use uuid::Uuid; use crate::{ + config::DeclarativeWebhook, error::{CiError, Result}, models::{CreateWebhookConfig, WebhookConfig}, }; @@ -83,3 +84,71 @@ pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> { } Ok(()) } + +/// Upsert a webhook config (insert or update on conflict). +pub async fn upsert( + pool: &PgPool, + project_id: Uuid, + forge_type: &str, + secret_hash: Option<&str>, + enabled: bool, +) -> Result { + sqlx::query_as::<_, WebhookConfig>( + "INSERT INTO webhook_configs (project_id, forge_type, secret_hash, enabled) \ + VALUES ($1, $2, $3, $4) ON CONFLICT (project_id, forge_type) DO UPDATE SET \ + secret_hash = COALESCE(EXCLUDED.secret_hash, webhook_configs.secret_hash), \ + enabled = EXCLUDED.enabled RETURNING *", + ) + .bind(project_id) + .bind(forge_type) + .bind(secret_hash) + .bind(enabled) + .fetch_one(pool) + .await + .map_err(CiError::Database) +} + +/// Sync webhook configs from declarative config. +/// Deletes configs not in the declarative list and upserts those that are. +pub async fn sync_for_project( + pool: &PgPool, + project_id: Uuid, + webhooks: &[DeclarativeWebhook], + resolve_secret: impl Fn(&DeclarativeWebhook) -> Option, +) -> Result<()> { + // Get forge types from declarative config + let types: Vec<&str> = webhooks.iter().map(|w| w.forge_type.as_str()).collect(); + + // Delete webhook configs not in declarative config + sqlx::query( + "DELETE FROM webhook_configs WHERE project_id = $1 AND forge_type != \ + ALL($2::text[])", + ) + .bind(project_id) + .bind(&types) + .execute(pool) + .await + .map_err(CiError::Database)?; + + // Upsert each webhook config + for webhook in webhooks { + let secret = resolve_secret(webhook); + let secret_hash = secret.as_ref().map(|s| { + use sha2::{Digest, Sha256}; + let mut hasher = Sha256::new(); + hasher.update(s.as_bytes()); + hex::encode(hasher.finalize()) + }); + + upsert( + pool, + project_id, + &webhook.forge_type, + secret_hash.as_deref(), + webhook.enabled, + ) + .await?; + } + + Ok(()) +}