fc-queue-runner: implement persistent notification retry queue with exponential backoff
Adds a `notification_tasks` table and a background worker to reliably deliver webhooks, git status updates, and e-mail notifications, with automatic retry and exponential backoff on transient failures. This closes one of the remaining critical reliability gaps: previously, notifications were fire-and-forget and lost on any delivery error. Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I794967c66958658c4d8aed40793d67f96a6a6964
This commit is contained in:
parent
3807293eb7
commit
21446c6dcb
8 changed files with 849 additions and 8 deletions
|
|
@ -508,6 +508,41 @@ CREATE TRIGGER trg_jobsets_delete_notify
|
|||
AFTER DELETE ON jobsets FOR EACH ROW
|
||||
EXECUTE FUNCTION notify_jobsets_changed ();
|
||||
|
||||
-- notification_tasks: persistent notification retry queue
-- Stores notification delivery tasks with automatic retry and exponential backoff
CREATE TABLE notification_tasks (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4 (),
    -- Delivery channel; must stay in sync with the task types handled by the
    -- retry worker (process_notification_task).
    notification_type VARCHAR(50) NOT NULL CHECK (
        notification_type IN (
            'webhook',
            'github_status',
            'gitea_status',
            'gitlab_status',
            'email'
        )
    ),
    -- Channel-specific delivery data captured at enqueue time
    -- (endpoint URL, credentials, build metadata).
    payload JSONB NOT NULL,
    -- Lifecycle: pending -> running -> completed | failed (failed is terminal
    -- once attempts >= max_attempts; otherwise the row returns to pending).
    status VARCHAR(20) NOT NULL DEFAULT 'pending' CHECK (
        status IN ('pending', 'running', 'completed', 'failed')
    ),
    -- Number of delivery attempts made so far.
    attempts INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 5,
    -- Earliest time the worker may (re)try this task; pushed out with
    -- exponential backoff on each failure.
    next_retry_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    -- Error message from the most recent failed attempt, if any.
    last_error TEXT,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    -- Set when the task reaches a terminal state (completed or permanently
    -- failed); used by retention cleanup.
    completed_at TIMESTAMP WITH TIME ZONE
);

-- Indexes: notification_tasks
-- Partial index covering the worker's polling query
-- (WHERE status = 'pending' AND next_retry_at <= NOW() ORDER BY next_retry_at);
-- terminal rows are excluded to keep the index small.
CREATE INDEX idx_notification_tasks_status_next_retry ON notification_tasks (
    status,
    next_retry_at
)
WHERE
    status IN ('pending', 'running');

-- Supports retention cleanup scans by age.
CREATE INDEX idx_notification_tasks_created_at ON notification_tasks (created_at);
|
||||
|
||||
-- Views
|
||||
CREATE VIEW active_jobsets AS
|
||||
SELECT
|
||||
|
|
|
|||
|
|
@ -134,14 +134,26 @@ impl std::fmt::Debug for GitHubOAuthConfig {
|
|||
#[serde(default)]
|
||||
#[derive(Default)]
|
||||
pub struct NotificationsConfig {
|
||||
pub webhook_url: Option<String>,
|
||||
pub github_token: Option<String>,
|
||||
pub gitea_url: Option<String>,
|
||||
pub gitea_token: Option<String>,
|
||||
pub gitlab_url: Option<String>,
|
||||
pub gitlab_token: Option<String>,
|
||||
pub email: Option<EmailConfig>,
|
||||
pub alerts: Option<AlertConfig>,
|
||||
pub webhook_url: Option<String>,
|
||||
pub github_token: Option<String>,
|
||||
pub gitea_url: Option<String>,
|
||||
pub gitea_token: Option<String>,
|
||||
pub gitlab_url: Option<String>,
|
||||
pub gitlab_token: Option<String>,
|
||||
pub email: Option<EmailConfig>,
|
||||
pub alerts: Option<AlertConfig>,
|
||||
/// Enable notification retry queue (persistent, with exponential backoff)
|
||||
#[serde(default = "default_true")]
|
||||
pub enable_retry_queue: bool,
|
||||
/// Maximum retry attempts per notification (default 5)
|
||||
#[serde(default = "default_notification_max_attempts")]
|
||||
pub max_retry_attempts: i32,
|
||||
/// Retention period for old completed/failed tasks in days (default 7)
|
||||
#[serde(default = "default_notification_retention_days")]
|
||||
pub retention_days: i64,
|
||||
/// Polling interval for retry worker in seconds (default 5)
|
||||
#[serde(default = "default_notification_poll_interval")]
|
||||
pub retry_poll_interval: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
|
@ -434,6 +446,18 @@ fn default_role() -> String {
|
|||
"read-only".to_string()
|
||||
}
|
||||
|
||||
/// Default number of delivery attempts for a notification task (serde
/// default for `NotificationsConfig::max_retry_attempts`).
const fn default_notification_max_attempts() -> i32 {
  5
}

/// Default retention period, in days, for completed/failed notification
/// tasks (serde default for `NotificationsConfig::retention_days`).
const fn default_notification_retention_days() -> i64 {
  7
}

/// Default retry-worker polling interval in seconds (serde default for
/// `NotificationsConfig::retry_poll_interval`).
const fn default_notification_poll_interval() -> u64 {
  5
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct TracingConfig {
|
||||
|
|
|
|||
|
|
@ -488,6 +488,33 @@ pub struct UserSession {
|
|||
pub last_used_at: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
/// Notification task for reliable delivery with retry
///
/// Maps one row of the `notification_tasks` table (via `FromRow`). The
/// `payload` column carries the channel-specific delivery data captured at
/// enqueue time.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct NotificationTask {
  pub id: Uuid,
  /// Delivery channel discriminator, e.g. "webhook", "github_status",
  /// "gitea_status", "gitlab_status", or "email" (enforced by a table
  /// CHECK constraint).
  pub notification_type: String,
  /// Channel-specific delivery data (endpoint, credentials, build metadata).
  pub payload: serde_json::Value,
  pub status: NotificationTaskStatus,
  /// Number of delivery attempts made so far.
  pub attempts: i32,
  /// Attempt ceiling after which the task is marked permanently failed.
  pub max_attempts: i32,
  /// Earliest time the retry worker may pick this task up (again).
  pub next_retry_at: DateTime<Utc>,
  /// Error message from the most recent failed attempt, if any.
  pub last_error: Option<String>,
  pub created_at: DateTime<Utc>,
  /// Set when the task reaches a terminal state (completed, or failed after
  /// exhausting `max_attempts`).
  pub completed_at: Option<DateTime<Utc>>,
}

/// Lifecycle state of a [`NotificationTask`].
///
/// Stored as a lowercase VARCHAR in the database (see the `#[sqlx(...)]`
/// attribute) to match the table's CHECK constraint values.
#[derive(
  Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, sqlx::Type,
)]
#[serde(rename_all = "lowercase")]
#[sqlx(type_name = "varchar", rename_all = "lowercase")]
pub enum NotificationTaskStatus {
  Pending,
  Running,
  Completed,
  Failed,
}
|
||||
|
||||
// Pagination
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
|
|
|||
|
|
@ -2,11 +2,13 @@
|
|||
|
||||
use std::sync::OnceLock;
|
||||
|
||||
use sqlx::PgPool;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::{
|
||||
config::{EmailConfig, NotificationsConfig},
|
||||
models::{Build, BuildStatus, Project},
|
||||
repo,
|
||||
};
|
||||
|
||||
/// Shared HTTP client for all notification dispatches.
|
||||
|
|
@ -17,7 +19,162 @@ fn http_client() -> &'static reqwest::Client {
|
|||
}
|
||||
|
||||
/// Dispatch all configured notifications for a completed build.
|
||||
/// If retry queue is enabled, enqueues tasks; otherwise sends immediately.
|
||||
pub async fn dispatch_build_finished(
|
||||
pool: Option<&PgPool>,
|
||||
build: &Build,
|
||||
project: &Project,
|
||||
commit_hash: &str,
|
||||
config: &NotificationsConfig,
|
||||
) {
|
||||
// If retry queue is enabled and pool is available, enqueue tasks
|
||||
if config.enable_retry_queue
|
||||
&& let Some(pool) = pool
|
||||
{
|
||||
enqueue_notifications(pool, build, project, commit_hash, config).await;
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise, send immediately (legacy fire-and-forget behavior)
|
||||
send_notifications_immediate(build, project, commit_hash, config).await;
|
||||
}
|
||||
|
||||
/// Enqueue notification tasks for reliable delivery with retry
///
/// Persists one `notification_tasks` row per configured channel (generic
/// webhook, GitHub/Gitea/GitLab commit status, e-mail). An enqueue failure
/// for one channel is logged and does not abort the remaining channels.
///
/// NOTE(review): API tokens and the full e-mail config (including SMTP
/// credentials) are stored in plaintext inside the JSONB payload — confirm
/// this is acceptable for the deployment's threat model.
async fn enqueue_notifications(
  pool: &PgPool,
  build: &Build,
  project: &Project,
  commit_hash: &str,
  config: &NotificationsConfig,
) {
  let max_attempts = config.max_retry_attempts;

  // 1. Generic webhook notification
  if let Some(ref url) = config.webhook_url {
    let payload = serde_json::json!({
      "type": "webhook",
      "url": url,
      "build_id": build.id,
      "build_status": build.status,
      "build_job": build.job_name,
      "build_drv": build.drv_path,
      "build_output": build.build_output_path,
      "project_name": project.name,
      "project_url": project.repository_url,
      "commit_hash": commit_hash,
    });

    if let Err(e) =
      repo::notification_tasks::create(pool, "webhook", payload, max_attempts)
        .await
    {
      error!(build_id = %build.id, "Failed to enqueue webhook notification: {e}");
    }
  }

  // 2. GitHub commit status
  // Only enqueued for repositories whose URL contains "github.com".
  if let Some(ref token) = config.github_token
    && project.repository_url.contains("github.com")
  {
    let payload = serde_json::json!({
      "type": "github_status",
      "token": token,
      "repository_url": project.repository_url,
      "commit_hash": commit_hash,
      "build_id": build.id,
      "build_status": build.status,
      "build_job": build.job_name,
    });

    if let Err(e) = repo::notification_tasks::create(
      pool,
      "github_status",
      payload,
      max_attempts,
    )
    .await
    {
      error!(build_id = %build.id, "Failed to enqueue GitHub status notification: {e}");
    }
  }

  // 3. Gitea/Forgejo commit status
  // Requires both a base URL and a token to be configured.
  if let (Some(url), Some(token)) = (&config.gitea_url, &config.gitea_token) {
    let payload = serde_json::json!({
      "type": "gitea_status",
      "base_url": url,
      "token": token,
      "repository_url": project.repository_url,
      "commit_hash": commit_hash,
      "build_id": build.id,
      "build_status": build.status,
      "build_job": build.job_name,
    });

    if let Err(e) = repo::notification_tasks::create(
      pool,
      "gitea_status",
      payload,
      max_attempts,
    )
    .await
    {
      error!(build_id = %build.id, "Failed to enqueue Gitea status notification: {e}");
    }
  }

  // 4. GitLab commit status
  // Requires both a base URL and a token to be configured.
  if let (Some(url), Some(token)) = (&config.gitlab_url, &config.gitlab_token) {
    let payload = serde_json::json!({
      "type": "gitlab_status",
      "base_url": url,
      "token": token,
      "repository_url": project.repository_url,
      "commit_hash": commit_hash,
      "build_id": build.id,
      "build_status": build.status,
      "build_job": build.job_name,
    });

    if let Err(e) = repo::notification_tasks::create(
      pool,
      "gitlab_status",
      payload,
      max_attempts,
    )
    .await
    {
      error!(build_id = %build.id, "Failed to enqueue GitLab status notification: {e}");
    }
  }

  // 5. Email notification
  // Honors `on_failure_only`: skipped for successful builds when set.
  let is_failure = !build.status.is_success();
  if let Some(ref email_config) = config.email
    && (!email_config.on_failure_only || is_failure)
  {
    let payload = serde_json::json!({
      "type": "email",
      "config": email_config,
      "build_id": build.id,
      "build_status": build.status,
      "build_job": build.job_name,
      "build_drv": build.drv_path,
      "build_output": build.build_output_path,
      "project_name": project.name,
    });

    if let Err(e) =
      repo::notification_tasks::create(pool, "email", payload, max_attempts)
        .await
    {
      error!(build_id = %build.id, "Failed to enqueue email notification: {e}");
    }
  }
}
|
||||
|
||||
/// Send notifications immediately (legacy fire-and-forget behavior)
|
||||
async fn send_notifications_immediate(
|
||||
build: &Build,
|
||||
project: &Project,
|
||||
commit_hash: &str,
|
||||
|
|
@ -448,6 +605,325 @@ async fn send_email_notification(
|
|||
}
|
||||
}
|
||||
|
||||
/// Process a notification task from the retry queue
///
/// Dispatches on `task.notification_type` and performs the actual delivery
/// using the data captured in `task.payload` at enqueue time. Returns
/// `Err(String)` on any failure (the message is persisted as `last_error`
/// by the caller, which then schedules a retry or marks the task failed).
pub async fn process_notification_task(
  task: &crate::models::NotificationTask,
) -> Result<(), String> {
  let task_type = task.notification_type.as_str();
  let payload = &task.payload;

  match task_type {
    "webhook" => {
      let url = payload["url"]
        .as_str()
        .ok_or("Missing url in webhook payload")?;
      // Map internal build status to the externally-reported string.
      // NOTE(review): "cached_failure" maps to "success" here and in every
      // status-reporting arm below — confirm this is intentional.
      let status_str = match payload["build_status"].as_str() {
        Some("succeeded") | Some("cached_failure") => "success",
        Some("failed") => "failure",
        Some("cancelled") => "cancelled",
        Some("aborted") => "aborted",
        Some("unsupported_system") => "skipped",
        _ => "pending",
      };

      let body = serde_json::json!({
        "build_id": payload["build_id"],
        "build_status": status_str,
        "build_job": payload["build_job"],
        "build_drv": payload["build_drv"],
        "build_output": payload["build_output"],
        "project_name": payload["project_name"],
        "project_url": payload["project_url"],
        "commit_hash": payload["commit_hash"],
      });

      let resp = http_client()
        .post(url)
        .json(&body)
        .send()
        .await
        .map_err(|e| format!("HTTP request failed: {e}"))?;

      // Any non-2xx response counts as a delivery failure (retryable).
      if !resp.status().is_success() {
        return Err(format!("Webhook returned status: {}", resp.status()));
      }

      Ok(())
    },
    "github_status" => {
      let token = payload["token"]
        .as_str()
        .ok_or("Missing token in github_status payload")?;
      let repo_url = payload["repository_url"]
        .as_str()
        .ok_or("Missing repository_url")?;
      let commit = payload["commit_hash"]
        .as_str()
        .ok_or("Missing commit_hash")?;
      let job_name =
        payload["build_job"].as_str().ok_or("Missing build_job")?;

      let (owner, repo) = parse_github_repo(repo_url)
        .ok_or_else(|| format!("Cannot parse GitHub repo from {repo_url}"))?;

      // GitHub commit-status API accepts: error, failure, pending, success.
      let (state, description) = match payload["build_status"].as_str() {
        Some("succeeded") | Some("cached_failure") => {
          ("success", "Build succeeded")
        },
        Some("failed") => ("failure", "Build failed"),
        Some("running") => ("pending", "Build in progress"),
        Some("cancelled") => ("error", "Build cancelled"),
        _ => ("pending", "Build queued"),
      };

      let url = format!(
        "https://api.github.com/repos/{owner}/{repo}/statuses/{commit}"
      );
      let body = serde_json::json!({
        "state": state,
        "description": description,
        "context": format!("fc/{job_name}"),
      });

      let resp = http_client()
        .post(&url)
        .header("Authorization", format!("token {token}"))
        .header("User-Agent", "fc-ci")
        .header("Accept", "application/vnd.github+json")
        .json(&body)
        .send()
        .await
        .map_err(|e| format!("GitHub API request failed: {e}"))?;

      if !resp.status().is_success() {
        // Include the response body in the error for diagnosis.
        let status = resp.status();
        let text = resp.text().await.unwrap_or_default();
        return Err(format!("GitHub API returned {status}: {text}"));
      }

      Ok(())
    },
    "gitea_status" => {
      let base_url = payload["base_url"]
        .as_str()
        .ok_or("Missing base_url in gitea_status payload")?;
      let token = payload["token"].as_str().ok_or("Missing token")?;
      let repo_url = payload["repository_url"]
        .as_str()
        .ok_or("Missing repository_url")?;
      let commit = payload["commit_hash"]
        .as_str()
        .ok_or("Missing commit_hash")?;
      let job_name =
        payload["build_job"].as_str().ok_or("Missing build_job")?;

      let (owner, repo) = parse_gitea_repo(repo_url, base_url)
        .ok_or_else(|| format!("Cannot parse Gitea repo from {repo_url}"))?;

      // Gitea uses the same state vocabulary as GitHub here.
      let (state, description) = match payload["build_status"].as_str() {
        Some("succeeded") | Some("cached_failure") => {
          ("success", "Build succeeded")
        },
        Some("failed") => ("failure", "Build failed"),
        Some("running") => ("pending", "Build in progress"),
        Some("cancelled") => ("error", "Build cancelled"),
        _ => ("pending", "Build queued"),
      };

      let url =
        format!("{base_url}/api/v1/repos/{owner}/{repo}/statuses/{commit}");
      let body = serde_json::json!({
        "state": state,
        "description": description,
        "context": format!("fc/{job_name}"),
      });

      let resp = http_client()
        .post(&url)
        .header("Authorization", format!("token {token}"))
        .json(&body)
        .send()
        .await
        .map_err(|e| format!("Gitea API request failed: {e}"))?;

      if !resp.status().is_success() {
        let status = resp.status();
        let text = resp.text().await.unwrap_or_default();
        return Err(format!("Gitea API returned {status}: {text}"));
      }

      Ok(())
    },
    "gitlab_status" => {
      let base_url = payload["base_url"]
        .as_str()
        .ok_or("Missing base_url in gitlab_status payload")?;
      let token = payload["token"].as_str().ok_or("Missing token")?;
      let repo_url = payload["repository_url"]
        .as_str()
        .ok_or("Missing repository_url")?;
      let commit = payload["commit_hash"]
        .as_str()
        .ok_or("Missing commit_hash")?;
      let job_name =
        payload["build_job"].as_str().ok_or("Missing build_job")?;

      let project_path =
        parse_gitlab_project(repo_url, base_url).ok_or_else(|| {
          format!("Cannot parse GitLab project from {repo_url}")
        })?;

      // GitLab uses its own state names ("failed", "running", "canceled").
      let (state, description) = match payload["build_status"].as_str() {
        Some("succeeded") | Some("cached_failure") => {
          ("success", "Build succeeded")
        },
        Some("failed") => ("failed", "Build failed"),
        Some("running") => ("running", "Build in progress"),
        Some("cancelled") => ("canceled", "Build cancelled"),
        _ => ("pending", "Build queued"),
      };

      // The project path must be URL-encoded for the /projects/:id API form.
      let encoded_project = urlencoding::encode(&project_path);
      let url = format!(
        "{}/api/v4/projects/{}/statuses/{}",
        base_url.trim_end_matches('/'),
        encoded_project,
        commit
      );

      let body = serde_json::json!({
        "state": state,
        "description": description,
        "name": format!("fc/{job_name}"),
      });

      let resp = http_client()
        .post(&url)
        .header("PRIVATE-TOKEN", token)
        .json(&body)
        .send()
        .await
        .map_err(|e| format!("GitLab API request failed: {e}"))?;

      if !resp.status().is_success() {
        let status = resp.status();
        let text = resp.text().await.unwrap_or_default();
        return Err(format!("GitLab API returned {status}: {text}"));
      }

      Ok(())
    },
    "email" => {
      // Email sending is complex, so we'll reuse the existing function
      // by deserializing the config from payload
      let email_config: EmailConfig =
        serde_json::from_value(payload["config"].clone())
          .map_err(|e| format!("Failed to deserialize email config: {e}"))?;

      // Create a minimal Build struct from payload
      let build_id = payload["build_id"]
        .as_str()
        .and_then(|s| uuid::Uuid::parse_str(s).ok())
        .ok_or("Invalid build_id")?;
      let job_name = payload["build_job"]
        .as_str()
        .ok_or("Missing build_job")?
        .to_string();
      let drv_path = payload["build_drv"]
        .as_str()
        .ok_or("Missing build_drv")?
        .to_string();
      let build_output_path =
        payload["build_output"].as_str().map(String::from);

      let status_str = payload["build_status"]
        .as_str()
        .ok_or("Missing build_status")?;
      // Only Succeeded survives the round-trip; every other status string is
      // collapsed to Failed for the purposes of the e-mail subject line.
      let status = match status_str {
        "succeeded" => BuildStatus::Succeeded,
        "failed" => BuildStatus::Failed,
        _ => BuildStatus::Failed,
      };

      let project_name = payload["project_name"]
        .as_str()
        .ok_or("Missing project_name")?;

      // Simplified email send (direct implementation to avoid complex struct
      // creation)
      use lettre::{
        AsyncSmtpTransport,
        AsyncTransport,
        Message,
        Tokio1Executor,
        transport::smtp::authentication::Credentials,
      };

      let status_display = match status {
        BuildStatus::Succeeded => "SUCCESS",
        _ => "FAILURE",
      };

      let subject =
        format!("[FC] {} - {} ({})", status_display, job_name, project_name);
      let body = format!(
        "Build notification from FC CI\n\nProject: {}\nJob: {}\nStatus: \
         {}\nDerivation: {}\nOutput: {}\nBuild ID: {}\n",
        project_name,
        job_name,
        status_display,
        drv_path,
        build_output_path.as_deref().unwrap_or("N/A"),
        build_id,
      );

      // One message per recipient; the first failing recipient aborts the
      // task (it will be retried from the start, re-sending to earlier
      // recipients).
      for to_addr in &email_config.to_addresses {
        let email = Message::builder()
          .from(
            email_config
              .from_address
              .parse()
              .map_err(|e| format!("Invalid from address: {e}"))?,
          )
          .to(
            to_addr
              .parse()
              .map_err(|e| format!("Invalid to address: {e}"))?,
          )
          .subject(&subject)
          .body(body.clone())
          .map_err(|e| format!("Failed to build email: {e}"))?;

        // TLS relay when configured; otherwise a plaintext SMTP connection.
        let mut mailer_builder = if email_config.tls {
          AsyncSmtpTransport::<Tokio1Executor>::relay(&email_config.smtp_host)
            .map_err(|e| format!("Failed to create SMTP transport: {e}"))?
        } else {
          AsyncSmtpTransport::<Tokio1Executor>::builder_dangerous(
            &email_config.smtp_host,
          )
        }
        .port(email_config.smtp_port);

        if let (Some(user), Some(pass)) =
          (&email_config.smtp_user, &email_config.smtp_password)
        {
          mailer_builder = mailer_builder
            .credentials(Credentials::new(user.clone(), pass.clone()));
        }

        let mailer = mailer_builder.build();
        mailer
          .send(email)
          .await
          .map_err(|e| format!("Failed to send email: {e}"))?;
      }

      Ok(())
    },
    _ => Err(format!("Unknown notification type: {task_type}")),
  }
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ pub mod failed_paths_cache;
|
|||
pub mod jobset_inputs;
|
||||
pub mod jobsets;
|
||||
pub mod notification_configs;
|
||||
pub mod notification_tasks;
|
||||
pub mod project_members;
|
||||
pub mod projects;
|
||||
pub mod remote_builders;
|
||||
|
|
|
|||
179
crates/common/src/repo/notification_tasks.rs
Normal file
179
crates/common/src/repo/notification_tasks.rs
Normal file
|
|
@ -0,0 +1,179 @@
|
|||
//! Database operations for notification task retry queue
|
||||
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{error::Result, models::NotificationTask};
|
||||
|
||||
/// Create a new notification task for later delivery
|
||||
pub async fn create(
|
||||
pool: &PgPool,
|
||||
notification_type: &str,
|
||||
payload: serde_json::Value,
|
||||
max_attempts: i32,
|
||||
) -> Result<NotificationTask> {
|
||||
let task = sqlx::query_as::<_, NotificationTask>(
|
||||
r#"
|
||||
INSERT INTO notification_tasks (notification_type, payload, max_attempts)
|
||||
VALUES ($1, $2, $3)
|
||||
RETURNING *
|
||||
"#,
|
||||
)
|
||||
.bind(notification_type)
|
||||
.bind(payload)
|
||||
.bind(max_attempts)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
|
||||
Ok(task)
|
||||
}
|
||||
|
||||
/// Fetch pending tasks that are ready for retry
///
/// Returns at most `limit` tasks whose `next_retry_at` deadline has passed,
/// oldest deadline first. Only `status = 'pending'` rows are considered, so
/// a task left in 'running' by a crashed worker is never re-selected —
/// NOTE(review): consider recovering stale 'running' claims.
pub async fn list_pending(
  pool: &PgPool,
  limit: i32,
) -> Result<Vec<NotificationTask>> {
  let tasks = sqlx::query_as::<_, NotificationTask>(
    r#"
SELECT *
FROM notification_tasks
WHERE status = 'pending'
  AND next_retry_at <= NOW()
ORDER BY next_retry_at ASC
LIMIT $1
"#,
  )
  .bind(limit)
  .fetch_all(pool)
  .await?;

  Ok(tasks)
}
|
||||
|
||||
/// Mark a task as running (claimed by worker)
///
/// Also increments `attempts`, so the counter already reflects the attempt
/// that is about to execute (the backoff in `mark_failed_and_retry` relies
/// on this). NOTE(review): the claim is an unconditional UPDATE with no
/// guard on the current status — with more than one worker polling the
/// queue, two workers could claim and process the same task; confirm a
/// single retry worker is assumed.
pub async fn mark_running(pool: &PgPool, task_id: Uuid) -> Result<()> {
  sqlx::query(
    r#"
UPDATE notification_tasks
SET status = 'running',
    attempts = attempts + 1
WHERE id = $1
"#,
  )
  .bind(task_id)
  .execute(pool)
  .await?;

  Ok(())
}
|
||||
|
||||
/// Mark a task as completed successfully
|
||||
pub async fn mark_completed(pool: &PgPool, task_id: Uuid) -> Result<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE notification_tasks
|
||||
SET status = 'completed',
|
||||
completed_at = NOW()
|
||||
WHERE id = $1
|
||||
"#,
|
||||
)
|
||||
.bind(task_id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Mark a task as failed and schedule retry with exponential backoff
///
/// `attempts` has already been incremented by `mark_running`, so after the
/// Nth failure the delay is `2^N` seconds: 2s, 4s, 8s, 16s, ...
/// Once `attempts >= max_attempts`, the task is marked terminally 'failed'
/// and `completed_at` is stamped so retention cleanup can remove it;
/// otherwise it returns to 'pending' with the pushed-out `next_retry_at`.
/// The error message is stored in `last_error` either way.
pub async fn mark_failed_and_retry(
  pool: &PgPool,
  task_id: Uuid,
  error: &str,
) -> Result<()> {
  sqlx::query(
    r#"
UPDATE notification_tasks
SET status = CASE
      WHEN attempts >= max_attempts THEN 'failed'::varchar
      ELSE 'pending'::varchar
    END,
    last_error = $2,
    next_retry_at = CASE
      WHEN attempts >= max_attempts THEN NOW()
      ELSE NOW() + (POWER(2, attempts) || ' seconds')::interval
    END,
    completed_at = CASE
      WHEN attempts >= max_attempts THEN NOW()
      ELSE NULL
    END
WHERE id = $1
"#,
  )
  .bind(task_id)
  .bind(error)
  .execute(pool)
  .await?;

  Ok(())
}
|
||||
|
||||
/// Get task by ID
|
||||
pub async fn get(pool: &PgPool, task_id: Uuid) -> Result<NotificationTask> {
|
||||
let task = sqlx::query_as::<_, NotificationTask>(
|
||||
r#"
|
||||
SELECT * FROM notification_tasks WHERE id = $1
|
||||
"#,
|
||||
)
|
||||
.bind(task_id)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
|
||||
Ok(task)
|
||||
}
|
||||
|
||||
/// Clean up old completed/failed tasks (older than retention days)
///
/// Deletes terminal-state rows whose `completed_at` OR `created_at` is older
/// than `retention_days` days, and returns the number of rows removed.
/// The interval is built as `($1 || ' days')::interval` from the bound
/// bigint; the `created_at` clause also catches terminal rows that somehow
/// lack a `completed_at` timestamp.
pub async fn cleanup_old_tasks(
  pool: &PgPool,
  retention_days: i64,
) -> Result<u64> {
  let result = sqlx::query(
    r#"
DELETE FROM notification_tasks
WHERE status IN ('completed', 'failed')
  AND (completed_at < NOW() - ($1 || ' days')::interval
    OR created_at < NOW() - ($1 || ' days')::interval)
"#,
  )
  .bind(retention_days)
  .execute(pool)
  .await?;

  Ok(result.rows_affected())
}
|
||||
|
||||
/// Count pending tasks (for monitoring)
|
||||
pub async fn count_pending(pool: &PgPool) -> Result<i64> {
|
||||
let count: (i64,) = sqlx::query_as(
|
||||
r#"
|
||||
SELECT COUNT(*) FROM notification_tasks WHERE status = 'pending'
|
||||
"#,
|
||||
)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
|
||||
Ok(count.0)
|
||||
}
|
||||
|
||||
/// Count failed tasks (for monitoring)
|
||||
pub async fn count_failed(pool: &PgPool) -> Result<i64> {
|
||||
let count: (i64,) = sqlx::query_as(
|
||||
r#"
|
||||
SELECT COUNT(*) FROM notification_tasks WHERE status = 'failed'
|
||||
"#,
|
||||
)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
|
||||
Ok(count.0)
|
||||
}
|
||||
|
|
@ -90,6 +90,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
() = gc_loop(gc_config_for_loop, db.pool().clone()) => {}
|
||||
() = failed_paths_cleanup_loop(db.pool().clone(), failed_paths_ttl, failed_paths_cache) => {}
|
||||
() = cancel_checker_loop(db.pool().clone(), active_builds) => {}
|
||||
() = notification_retry_loop(db.pool().clone(), notifications_config.clone()) => {}
|
||||
() = shutdown_signal() => {
|
||||
tracing::info!("Shutdown signal received, draining in-flight builds...");
|
||||
worker_pool_for_drain.drain();
|
||||
|
|
@ -218,6 +219,103 @@ async fn cancel_checker_loop(pool: sqlx::PgPool, active_builds: ActiveBuilds) {
|
|||
}
|
||||
}
|
||||
|
||||
/// Background worker driving the persistent notification retry queue.
///
/// Spawns an hourly retention-cleanup task, then polls `notification_tasks`
/// every `retry_poll_interval` seconds, claiming up to 10 due tasks per tick
/// and delivering each via `process_notification_task`. When the retry queue
/// is disabled in config, parks forever so the enclosing `select!` never
/// resolves on this branch.
///
/// NOTE(review): a task is marked 'running' before delivery; if this process
/// dies mid-delivery the row stays 'running' and is never re-selected by
/// `list_pending` — confirm whether stale-claim recovery is needed.
async fn notification_retry_loop(
  pool: sqlx::PgPool,
  config: fc_common::config::NotificationsConfig,
) {
  if !config.enable_retry_queue {
    // Never resolve: keeps the caller's select! semantics intact.
    return std::future::pending().await;
  }

  let poll_interval =
    std::time::Duration::from_secs(config.retry_poll_interval);
  let retention_days = config.retention_days;

  // Detached hourly cleanup of old terminal-state tasks.
  let cleanup_pool = pool.clone();
  tokio::spawn(async move {
    let cleanup_interval = std::time::Duration::from_secs(3600);
    loop {
      tokio::time::sleep(cleanup_interval).await;
      match repo::notification_tasks::cleanup_old_tasks(
        &cleanup_pool,
        retention_days,
      )
      .await
      {
        Ok(count) if count > 0 => {
          tracing::info!(count, "Cleaned up old notification tasks");
        },
        Ok(_) => {},
        Err(e) => {
          tracing::error!("Notification task cleanup failed: {e}");
        },
      }
    }
  });

  // Main delivery loop: claim due tasks, attempt delivery, record outcome.
  loop {
    tokio::time::sleep(poll_interval).await;

    let tasks = match repo::notification_tasks::list_pending(&pool, 10).await {
      Ok(t) => t,
      Err(e) => {
        // Transient DB error: skip this tick and try again next poll.
        tracing::warn!("Failed to fetch pending notification tasks: {e}");
        continue;
      },
    };

    for task in tasks {
      // Claim the task (also increments its attempts counter).
      if let Err(e) =
        repo::notification_tasks::mark_running(&pool, task.id).await
      {
        tracing::warn!(task_id = %task.id, "Failed to mark task as running: {e}");
        continue;
      }

      match fc_common::notifications::process_notification_task(&task).await {
        Ok(()) => {
          if let Err(e) =
            repo::notification_tasks::mark_completed(&pool, task.id).await
          {
            tracing::error!(task_id = %task.id, "Failed to mark task as completed: {e}");
          } else {
            // task.attempts is the pre-claim value; +1 reflects this run.
            tracing::info!(
              task_id = %task.id,
              notification_type = %task.notification_type,
              attempts = task.attempts + 1,
              "Notification task completed"
            );
          }
        },
        Err(err) => {
          // Persist the error and either reschedule with backoff or mark
          // the task permanently failed (decided inside the UPDATE).
          if let Err(e) = repo::notification_tasks::mark_failed_and_retry(
            &pool, task.id, &err,
          )
          .await
          {
            tracing::error!(task_id = %task.id, "Failed to update task status: {e}");
          } else {
            // Mirror the DB's terminal-vs-retry decision for the log line.
            let status_after = if task.attempts + 1 >= task.max_attempts {
              "failed permanently"
            } else {
              "scheduled for retry"
            };
            tracing::warn!(
              task_id = %task.id,
              notification_type = %task.notification_type,
              attempts = task.attempts + 1,
              max_attempts = task.max_attempts,
              error = %err,
              status = status_after,
              "Notification task failed"
            );
          }
        },
      }
    }
  }
}
|
||||
|
||||
async fn shutdown_signal() {
|
||||
let ctrl_c = async {
|
||||
tokio::signal::ctrl_c()
|
||||
|
|
|
|||
|
|
@ -834,6 +834,7 @@ async fn run_build(
|
|||
get_project_for_build(pool, build).await
|
||||
{
|
||||
fc_common::notifications::dispatch_build_finished(
|
||||
Some(pool),
|
||||
&updated_build,
|
||||
&project,
|
||||
&commit_hash,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue