fc-common: add declarative sync for webhooks and notifications
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I0b7c46feba776837158507bfe883cbfa6a6a6964
This commit is contained in:
parent
dead111dfb
commit
7e6fc22ba2
2 changed files with 132 additions and 0 deletions
|
|
@ -2,6 +2,7 @@ use sqlx::PgPool;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
config::DeclarativeNotification,
|
||||||
error::{CiError, Result},
|
error::{CiError, Result},
|
||||||
models::{CreateNotificationConfig, NotificationConfig},
|
models::{CreateNotificationConfig, NotificationConfig},
|
||||||
};
|
};
|
||||||
|
|
@ -58,3 +59,65 @@ pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> {
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Upsert a notification config (insert or update on conflict).
|
||||||
|
pub async fn upsert(
|
||||||
|
pool: &PgPool,
|
||||||
|
project_id: Uuid,
|
||||||
|
notification_type: &str,
|
||||||
|
config: &serde_json::Value,
|
||||||
|
enabled: bool,
|
||||||
|
) -> Result<NotificationConfig> {
|
||||||
|
sqlx::query_as::<_, NotificationConfig>(
|
||||||
|
"INSERT INTO notification_configs (project_id, notification_type, config, \
|
||||||
|
enabled) VALUES ($1, $2, $3, $4) ON CONFLICT (project_id, notification_type) \
|
||||||
|
DO UPDATE SET config = EXCLUDED.config, enabled = EXCLUDED.enabled \
|
||||||
|
RETURNING *",
|
||||||
|
)
|
||||||
|
.bind(project_id)
|
||||||
|
.bind(notification_type)
|
||||||
|
.bind(config)
|
||||||
|
.bind(enabled)
|
||||||
|
.fetch_one(pool)
|
||||||
|
.await
|
||||||
|
.map_err(CiError::Database)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sync notification configs from declarative config.
|
||||||
|
/// Deletes configs not in the declarative list and upserts those that are.
|
||||||
|
pub async fn sync_for_project(
|
||||||
|
pool: &PgPool,
|
||||||
|
project_id: Uuid,
|
||||||
|
notifications: &[DeclarativeNotification],
|
||||||
|
) -> Result<()> {
|
||||||
|
// Get notification types from declarative config
|
||||||
|
let types: Vec<&str> = notifications
|
||||||
|
.iter()
|
||||||
|
.map(|n| n.notification_type.as_str())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Delete notification configs not in declarative config
|
||||||
|
sqlx::query(
|
||||||
|
"DELETE FROM notification_configs WHERE project_id = $1 AND \
|
||||||
|
notification_type != ALL($2::text[])",
|
||||||
|
)
|
||||||
|
.bind(project_id)
|
||||||
|
.bind(&types)
|
||||||
|
.execute(pool)
|
||||||
|
.await
|
||||||
|
.map_err(CiError::Database)?;
|
||||||
|
|
||||||
|
// Upsert each notification config
|
||||||
|
for notification in notifications {
|
||||||
|
upsert(
|
||||||
|
pool,
|
||||||
|
project_id,
|
||||||
|
¬ification.notification_type,
|
||||||
|
¬ification.config,
|
||||||
|
notification.enabled,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ use sqlx::PgPool;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
config::DeclarativeWebhook,
|
||||||
error::{CiError, Result},
|
error::{CiError, Result},
|
||||||
models::{CreateWebhookConfig, WebhookConfig},
|
models::{CreateWebhookConfig, WebhookConfig},
|
||||||
};
|
};
|
||||||
|
|
@ -83,3 +84,71 @@ pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> {
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Upsert a webhook config (insert or update on conflict).
|
||||||
|
pub async fn upsert(
|
||||||
|
pool: &PgPool,
|
||||||
|
project_id: Uuid,
|
||||||
|
forge_type: &str,
|
||||||
|
secret_hash: Option<&str>,
|
||||||
|
enabled: bool,
|
||||||
|
) -> Result<WebhookConfig> {
|
||||||
|
sqlx::query_as::<_, WebhookConfig>(
|
||||||
|
"INSERT INTO webhook_configs (project_id, forge_type, secret_hash, enabled) \
|
||||||
|
VALUES ($1, $2, $3, $4) ON CONFLICT (project_id, forge_type) DO UPDATE SET \
|
||||||
|
secret_hash = COALESCE(EXCLUDED.secret_hash, webhook_configs.secret_hash), \
|
||||||
|
enabled = EXCLUDED.enabled RETURNING *",
|
||||||
|
)
|
||||||
|
.bind(project_id)
|
||||||
|
.bind(forge_type)
|
||||||
|
.bind(secret_hash)
|
||||||
|
.bind(enabled)
|
||||||
|
.fetch_one(pool)
|
||||||
|
.await
|
||||||
|
.map_err(CiError::Database)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sync webhook configs from declarative config.
|
||||||
|
/// Deletes configs not in the declarative list and upserts those that are.
|
||||||
|
pub async fn sync_for_project(
|
||||||
|
pool: &PgPool,
|
||||||
|
project_id: Uuid,
|
||||||
|
webhooks: &[DeclarativeWebhook],
|
||||||
|
resolve_secret: impl Fn(&DeclarativeWebhook) -> Option<String>,
|
||||||
|
) -> Result<()> {
|
||||||
|
// Get forge types from declarative config
|
||||||
|
let types: Vec<&str> = webhooks.iter().map(|w| w.forge_type.as_str()).collect();
|
||||||
|
|
||||||
|
// Delete webhook configs not in declarative config
|
||||||
|
sqlx::query(
|
||||||
|
"DELETE FROM webhook_configs WHERE project_id = $1 AND forge_type != \
|
||||||
|
ALL($2::text[])",
|
||||||
|
)
|
||||||
|
.bind(project_id)
|
||||||
|
.bind(&types)
|
||||||
|
.execute(pool)
|
||||||
|
.await
|
||||||
|
.map_err(CiError::Database)?;
|
||||||
|
|
||||||
|
// Upsert each webhook config
|
||||||
|
for webhook in webhooks {
|
||||||
|
let secret = resolve_secret(webhook);
|
||||||
|
let secret_hash = secret.as_ref().map(|s| {
|
||||||
|
use sha2::{Digest, Sha256};
|
||||||
|
let mut hasher = Sha256::new();
|
||||||
|
hasher.update(s.as_bytes());
|
||||||
|
hex::encode(hasher.finalize())
|
||||||
|
});
|
||||||
|
|
||||||
|
upsert(
|
||||||
|
pool,
|
||||||
|
project_id,
|
||||||
|
&webhook.forge_type,
|
||||||
|
secret_hash.as_deref(),
|
||||||
|
webhook.enabled,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue