From 21446c6dcb0d1969baf37a95da19ffcf194f5ac4 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Fri, 27 Feb 2026 21:19:32 +0300 Subject: [PATCH] fc-queue-runner: implement persistent notification retry queue with exponential backoff Adds a `notification_tasks` table and a background worker to (hopefully reliably) deliver webhooks, git status updates, and e-mail notifications with automatic retry on transient failures. This was one of the critical gaps, finally done. Signed-off-by: NotAShelf Change-Id: I794967c66958658c4d8aed40793d67f96a6a6964 --- crates/common/migrations/0001_schema.sql | 35 ++ crates/common/src/config.rs | 40 +- crates/common/src/models.rs | 27 ++ crates/common/src/notifications.rs | 476 +++++++++++++++++++ crates/common/src/repo/mod.rs | 1 + crates/common/src/repo/notification_tasks.rs | 179 +++++++ crates/queue-runner/src/main.rs | 98 ++++ crates/queue-runner/src/worker.rs | 1 + 8 files changed, 849 insertions(+), 8 deletions(-) create mode 100644 crates/common/src/repo/notification_tasks.rs diff --git a/crates/common/migrations/0001_schema.sql b/crates/common/migrations/0001_schema.sql index b327f56..c20f9b9 100644 --- a/crates/common/migrations/0001_schema.sql +++ b/crates/common/migrations/0001_schema.sql @@ -508,6 +508,41 @@ CREATE TRIGGER trg_jobsets_delete_notify AFTER DELETE ON jobsets FOR EACH ROW EXECUTE FUNCTION notify_jobsets_changed (); +-- notification_tasks: persistent notification retry queue +-- Stores notification delivery tasks with automatic retry and exponential backoff +CREATE TABLE notification_tasks ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4 (), + notification_type VARCHAR(50) NOT NULL CHECK ( + notification_type IN ( + 'webhook', + 'github_status', + 'gitea_status', + 'gitlab_status', + 'email' + ) + ), + payload JSONB NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'pending' CHECK ( + status IN ('pending', 'running', 'completed', 'failed') + ), + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 5, + next_retry_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + last_error TEXT, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + completed_at TIMESTAMP WITH TIME ZONE +); + +-- Indexes: notification_tasks +CREATE INDEX idx_notification_tasks_status_next_retry ON notification_tasks ( + status, + next_retry_at +) +WHERE + status IN ('pending', 'running'); + +CREATE INDEX idx_notification_tasks_created_at ON notification_tasks (created_at); + -- Views CREATE VIEW active_jobsets AS SELECT diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index 283348d..6f0296b 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -134,14 +134,26 @@ impl std::fmt::Debug for GitHubOAuthConfig { #[serde(default)] #[derive(Default)] pub struct NotificationsConfig { - pub webhook_url: Option, - pub github_token: Option, - pub gitea_url: Option, - pub gitea_token: Option, - pub gitlab_url: Option, - pub gitlab_token: Option, - pub email: Option, - pub alerts: Option, + pub webhook_url: Option, + pub github_token: Option, + pub gitea_url: Option, + pub gitea_token: Option, + pub gitlab_url: Option, + pub gitlab_token: Option, + pub email: Option, + pub alerts: Option, + /// Enable notification retry queue (persistent, with exponential backoff) + #[serde(default = "default_true")] + pub enable_retry_queue: bool, + /// Maximum retry attempts per notification (default 5) + #[serde(default = "default_notification_max_attempts")] + pub max_retry_attempts: i32, + /// Retention period for old completed/failed tasks in days (default 7) + #[serde(default = "default_notification_retention_days")] + pub retention_days: i64, + /// Polling interval for retry worker in seconds (default 5) + #[serde(default = "default_notification_poll_interval")] + pub retry_poll_interval: u64, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -434,6 +446,18 @@ fn default_role() -> String { "read-only".to_string() } +const fn default_notification_max_attempts() -> i32 { + 5 +} + +const fn default_notification_retention_days() -> i64 { + 7 +} + +const fn default_notification_poll_interval() -> u64 { + 5 +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct TracingConfig { diff --git a/crates/common/src/models.rs b/crates/common/src/models.rs index 742429d..fbda198 100644 --- a/crates/common/src/models.rs +++ b/crates/common/src/models.rs @@ -488,6 +488,33 @@ pub struct UserSession { pub last_used_at: Option>, } +/// Notification task for reliable delivery with retry +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct NotificationTask { + pub id: Uuid, + pub notification_type: String, + pub payload: serde_json::Value, + pub status: NotificationTaskStatus, + pub attempts: i32, + pub max_attempts: i32, + pub next_retry_at: DateTime, + pub last_error: Option, + pub created_at: DateTime, + pub completed_at: Option>, +} + +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, sqlx::Type, +)] +#[serde(rename_all = "lowercase")] +#[sqlx(type_name = "varchar", rename_all = "lowercase")] +pub enum NotificationTaskStatus { + Pending, + Running, + Completed, + Failed, +} + // Pagination #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/common/src/notifications.rs b/crates/common/src/notifications.rs index 2933132..8ae09bd 100644 --- a/crates/common/src/notifications.rs +++ b/crates/common/src/notifications.rs @@ -2,11 +2,13 @@ use std::sync::OnceLock; +use sqlx::PgPool; use tracing::{error, info, warn}; use crate::{ config::{EmailConfig, NotificationsConfig}, models::{Build, BuildStatus, Project}, + repo, }; /// Shared HTTP client for all notification dispatches. @@ -17,7 +19,162 @@ fn http_client() -> &'static reqwest::Client { } /// Dispatch all configured notifications for a completed build. +/// If retry queue is enabled, enqueues tasks; otherwise sends immediately. pub async fn dispatch_build_finished( + pool: Option<&PgPool>, + build: &Build, + project: &Project, + commit_hash: &str, + config: &NotificationsConfig, +) { + // If retry queue is enabled and pool is available, enqueue tasks + if config.enable_retry_queue + && let Some(pool) = pool + { + enqueue_notifications(pool, build, project, commit_hash, config).await; + return; + } + + // Otherwise, send immediately (legacy fire-and-forget behavior) + send_notifications_immediate(build, project, commit_hash, config).await; +} + +/// Enqueue notification tasks for reliable delivery with retry +async fn enqueue_notifications( + pool: &PgPool, + build: &Build, + project: &Project, + commit_hash: &str, + config: &NotificationsConfig, +) { + let max_attempts = config.max_retry_attempts; + + // 1. Generic webhook notification + if let Some(ref url) = config.webhook_url { + let payload = serde_json::json!({ + "type": "webhook", + "url": url, + "build_id": build.id, + "build_status": build.status, + "build_job": build.job_name, + "build_drv": build.drv_path, + "build_output": build.build_output_path, + "project_name": project.name, + "project_url": project.repository_url, + "commit_hash": commit_hash, + }); + + if let Err(e) = + repo::notification_tasks::create(pool, "webhook", payload, max_attempts) + .await + { + error!(build_id = %build.id, "Failed to enqueue webhook notification: {e}"); + } + } + + // 2. GitHub commit status + if let Some(ref token) = config.github_token + && project.repository_url.contains("github.com") + { + let payload = serde_json::json!({ + "type": "github_status", + "token": token, + "repository_url": project.repository_url, + "commit_hash": commit_hash, + "build_id": build.id, + "build_status": build.status, + "build_job": build.job_name, + }); + + if let Err(e) = repo::notification_tasks::create( + pool, + "github_status", + payload, + max_attempts, + ) + .await + { + error!(build_id = %build.id, "Failed to enqueue GitHub status notification: {e}"); + } + } + + // 3. Gitea/Forgejo commit status + if let (Some(url), Some(token)) = (&config.gitea_url, &config.gitea_token) { + let payload = serde_json::json!({ + "type": "gitea_status", + "base_url": url, + "token": token, + "repository_url": project.repository_url, + "commit_hash": commit_hash, + "build_id": build.id, + "build_status": build.status, + "build_job": build.job_name, + }); + + if let Err(e) = repo::notification_tasks::create( + pool, + "gitea_status", + payload, + max_attempts, + ) + .await + { + error!(build_id = %build.id, "Failed to enqueue Gitea status notification: {e}"); + } + } + + // 4. GitLab commit status + if let (Some(url), Some(token)) = (&config.gitlab_url, &config.gitlab_token) { + let payload = serde_json::json!({ + "type": "gitlab_status", + "base_url": url, + "token": token, + "repository_url": project.repository_url, + "commit_hash": commit_hash, + "build_id": build.id, + "build_status": build.status, + "build_job": build.job_name, + }); + + if let Err(e) = repo::notification_tasks::create( + pool, + "gitlab_status", + payload, + max_attempts, + ) + .await + { + error!(build_id = %build.id, "Failed to enqueue GitLab status notification: {e}"); + } + } + + // 5. Email notification + let is_failure = !build.status.is_success(); + if let Some(ref email_config) = config.email + && (!email_config.on_failure_only || is_failure) + { + let payload = serde_json::json!({ + "type": "email", + "config": email_config, + "build_id": build.id, + "build_status": build.status, + "build_job": build.job_name, + "build_drv": build.drv_path, + "build_output": build.build_output_path, + "project_name": project.name, + }); + + if let Err(e) = + repo::notification_tasks::create(pool, "email", payload, max_attempts) + .await + { + error!(build_id = %build.id, "Failed to enqueue email notification: {e}"); + } + } +} + +/// Send notifications immediately (legacy fire-and-forget behavior) +async fn send_notifications_immediate( build: &Build, project: &Project, commit_hash: &str, @@ -448,6 +605,325 @@ async fn send_email_notification( } } +/// Process a notification task from the retry queue +pub async fn process_notification_task( + task: &crate::models::NotificationTask, +) -> Result<(), String> { + let task_type = task.notification_type.as_str(); + let payload = &task.payload; + + match task_type { + "webhook" => { + let url = payload["url"] + .as_str() + .ok_or("Missing url in webhook payload")?; + let status_str = match payload["build_status"].as_str() { + Some("succeeded") | Some("cached_failure") => "success", + Some("failed") => "failure", + Some("cancelled") => "cancelled", + Some("aborted") => "aborted", + Some("unsupported_system") => "skipped", + _ => "pending", + }; + + let body = serde_json::json!({ + "build_id": payload["build_id"], + "build_status": status_str, + "build_job": payload["build_job"], + "build_drv": payload["build_drv"], + "build_output": payload["build_output"], + "project_name": payload["project_name"], + "project_url": payload["project_url"], + "commit_hash": payload["commit_hash"], + }); + + let resp = http_client() + .post(url) + .json(&body) + .send() + .await + .map_err(|e| format!("HTTP request failed: {e}"))?; + + if !resp.status().is_success() { + return Err(format!("Webhook returned status: {}", resp.status())); + } + + Ok(()) + }, + "github_status" => { + let token = payload["token"] + .as_str() + .ok_or("Missing token in github_status payload")?; + let repo_url = payload["repository_url"] + .as_str() + .ok_or("Missing repository_url")?; + let commit = payload["commit_hash"] + .as_str() + .ok_or("Missing commit_hash")?; + let job_name = + payload["build_job"].as_str().ok_or("Missing build_job")?; + + let (owner, repo) = parse_github_repo(repo_url) + .ok_or_else(|| format!("Cannot parse GitHub repo from {repo_url}"))?; + + let (state, description) = match payload["build_status"].as_str() { + Some("succeeded") | Some("cached_failure") => { + ("success", "Build succeeded") + }, + Some("failed") => ("failure", "Build failed"), + Some("running") => ("pending", "Build in progress"), + Some("cancelled") => ("error", "Build cancelled"), + _ => ("pending", "Build queued"), + }; + + let url = format!( + "https://api.github.com/repos/{owner}/{repo}/statuses/{commit}" + ); + let body = serde_json::json!({ + "state": state, + "description": description, + "context": format!("fc/{job_name}"), + }); + + let resp = http_client() + .post(&url) + .header("Authorization", format!("token {token}")) + .header("User-Agent", "fc-ci") + .header("Accept", "application/vnd.github+json") + .json(&body) + .send() + .await + .map_err(|e| format!("GitHub API request failed: {e}"))?; + + if !resp.status().is_success() { + let status = resp.status(); + let text = resp.text().await.unwrap_or_default(); + return Err(format!("GitHub API returned {status}: {text}")); + } + + Ok(()) + }, + "gitea_status" => { + let base_url = payload["base_url"] + .as_str() + .ok_or("Missing base_url in gitea_status payload")?; + let token = payload["token"].as_str().ok_or("Missing token")?; + let repo_url = payload["repository_url"] + .as_str() + .ok_or("Missing repository_url")?; + let commit = payload["commit_hash"] + .as_str() + .ok_or("Missing commit_hash")?; + let job_name = + payload["build_job"].as_str().ok_or("Missing build_job")?; + + let (owner, repo) = parse_gitea_repo(repo_url, base_url) + .ok_or_else(|| format!("Cannot parse Gitea repo from {repo_url}"))?; + + let (state, description) = match payload["build_status"].as_str() { + Some("succeeded") | Some("cached_failure") => { + ("success", "Build succeeded") + }, + Some("failed") => ("failure", "Build failed"), + Some("running") => ("pending", "Build in progress"), + Some("cancelled") => ("error", "Build cancelled"), + _ => ("pending", "Build queued"), + }; + + let url = + format!("{base_url}/api/v1/repos/{owner}/{repo}/statuses/{commit}"); + let body = serde_json::json!({ + "state": state, + "description": description, + "context": format!("fc/{job_name}"), + }); + + let resp = http_client() + .post(&url) + .header("Authorization", format!("token {token}")) + .json(&body) + .send() + .await + .map_err(|e| format!("Gitea API request failed: {e}"))?; + + if !resp.status().is_success() { + let status = resp.status(); + let text = resp.text().await.unwrap_or_default(); + return Err(format!("Gitea API returned {status}: {text}")); + } + + Ok(()) + }, + "gitlab_status" => { + let base_url = payload["base_url"] + .as_str() + .ok_or("Missing base_url in gitlab_status payload")?; + let token = payload["token"].as_str().ok_or("Missing token")?; + let repo_url = payload["repository_url"] + .as_str() + .ok_or("Missing repository_url")?; + let commit = payload["commit_hash"] + .as_str() + .ok_or("Missing commit_hash")?; + let job_name = + payload["build_job"].as_str().ok_or("Missing build_job")?; + + let project_path = + parse_gitlab_project(repo_url, base_url).ok_or_else(|| { + format!("Cannot parse GitLab project from {repo_url}") + })?; + + let (state, description) = match payload["build_status"].as_str() { + Some("succeeded") | Some("cached_failure") => { + ("success", "Build succeeded") + }, + Some("failed") => ("failed", "Build failed"), + Some("running") => ("running", "Build in progress"), + Some("cancelled") => ("canceled", "Build cancelled"), + _ => ("pending", "Build queued"), + }; + + let encoded_project = urlencoding::encode(&project_path); + let url = format!( + "{}/api/v4/projects/{}/statuses/{}", + base_url.trim_end_matches('/'), + encoded_project, + commit + ); + + let body = serde_json::json!({ + "state": state, + "description": description, + "name": format!("fc/{job_name}"), + }); + + let resp = http_client() + .post(&url) + .header("PRIVATE-TOKEN", token) + .json(&body) + .send() + .await + .map_err(|e| format!("GitLab API request failed: {e}"))?; + + if !resp.status().is_success() { + let status = resp.status(); + let text = resp.text().await.unwrap_or_default(); + return Err(format!("GitLab API returned {status}: {text}")); + } + + Ok(()) + }, + "email" => { + // Email sending is complex, so we'll reuse the existing function + // by deserializing the config from payload + let email_config: EmailConfig = + serde_json::from_value(payload["config"].clone()) + .map_err(|e| format!("Failed to deserialize email config: {e}"))?; + + // Create a minimal Build struct from payload + let build_id = payload["build_id"] + .as_str() + .and_then(|s| uuid::Uuid::parse_str(s).ok()) + .ok_or("Invalid build_id")?; + let job_name = payload["build_job"] + .as_str() + .ok_or("Missing build_job")? + .to_string(); + let drv_path = payload["build_drv"] + .as_str() + .ok_or("Missing build_drv")? + .to_string(); + let build_output_path = + payload["build_output"].as_str().map(String::from); + + let status_str = payload["build_status"] + .as_str() + .ok_or("Missing build_status")?; + let status = match status_str { + "succeeded" => BuildStatus::Succeeded, + "failed" => BuildStatus::Failed, + _ => BuildStatus::Failed, + }; + + let project_name = payload["project_name"] + .as_str() + .ok_or("Missing project_name")?; + + // Simplified email send (direct implementation to avoid complex struct + // creation) + use lettre::{ + AsyncSmtpTransport, + AsyncTransport, + Message, + Tokio1Executor, + transport::smtp::authentication::Credentials, + }; + + let status_display = match status { + BuildStatus::Succeeded => "SUCCESS", + _ => "FAILURE", + }; + + let subject = + format!("[FC] {} - {} ({})", status_display, job_name, project_name); + let body = format!( + "Build notification from FC CI\n\nProject: {}\nJob: {}\nStatus: \ + {}\nDerivation: {}\nOutput: {}\nBuild ID: {}\n", + project_name, + job_name, + status_display, + drv_path, + build_output_path.as_deref().unwrap_or("N/A"), + build_id, + ); + + for to_addr in &email_config.to_addresses { + let email = Message::builder() + .from( + email_config + .from_address + .parse() + .map_err(|e| format!("Invalid from address: {e}"))?, + ) + .to( + to_addr + .parse() + .map_err(|e| format!("Invalid to address: {e}"))?, + ) + .subject(&subject) + .body(body.clone()) + .map_err(|e| format!("Failed to build email: {e}"))?; + + let mut mailer_builder = if email_config.tls { + AsyncSmtpTransport::::relay(&email_config.smtp_host) + .map_err(|e| format!("Failed to create SMTP transport: {e}"))? + } else { + AsyncSmtpTransport::::builder_dangerous( + &email_config.smtp_host, + ) + } + .port(email_config.smtp_port); + + if let (Some(user), Some(pass)) = + (&email_config.smtp_user, &email_config.smtp_password) + { + mailer_builder = mailer_builder + .credentials(Credentials::new(user.clone(), pass.clone())); + } + + let mailer = mailer_builder.build(); + mailer + .send(email) + .await + .map_err(|e| format!("Failed to send email: {e}"))?; + } + + Ok(()) + }, + _ => Err(format!("Unknown notification type: {task_type}")), + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/common/src/repo/mod.rs b/crates/common/src/repo/mod.rs index 013db8e..df7da2a 100644 --- a/crates/common/src/repo/mod.rs +++ b/crates/common/src/repo/mod.rs @@ -10,6 +10,7 @@ pub mod failed_paths_cache; pub mod jobset_inputs; pub mod jobsets; pub mod notification_configs; +pub mod notification_tasks; pub mod project_members; pub mod projects; pub mod remote_builders; diff --git a/crates/common/src/repo/notification_tasks.rs b/crates/common/src/repo/notification_tasks.rs new file mode 100644 index 0000000..c2effdb --- /dev/null +++ b/crates/common/src/repo/notification_tasks.rs @@ -0,0 +1,179 @@ +//! Database operations for notification task retry queue + +use sqlx::PgPool; +use uuid::Uuid; + +use crate::{error::Result, models::NotificationTask}; + +/// Create a new notification task for later delivery +pub async fn create( + pool: &PgPool, + notification_type: &str, + payload: serde_json::Value, + max_attempts: i32, +) -> Result { + let task = sqlx::query_as::<_, NotificationTask>( + r#" + INSERT INTO notification_tasks (notification_type, payload, max_attempts) + VALUES ($1, $2, $3) + RETURNING * + "#, + ) + .bind(notification_type) + .bind(payload) + .bind(max_attempts) + .fetch_one(pool) + .await?; + + Ok(task) +} + +/// Fetch pending tasks that are ready for retry +pub async fn list_pending( + pool: &PgPool, + limit: i32, +) -> Result> { + let tasks = sqlx::query_as::<_, NotificationTask>( + r#" + SELECT * + FROM notification_tasks + WHERE status = 'pending' + AND next_retry_at <= NOW() + ORDER BY next_retry_at ASC + LIMIT $1 + "#, + ) + .bind(limit) + .fetch_all(pool) + .await?; + + Ok(tasks) +} + +/// Mark a task as running (claimed by worker) +pub async fn mark_running(pool: &PgPool, task_id: Uuid) -> Result<()> { + sqlx::query( + r#" + UPDATE notification_tasks + SET status = 'running', + attempts = attempts + 1 + WHERE id = $1 + "#, + ) + .bind(task_id) + .execute(pool) + .await?; + + Ok(()) +} + +/// Mark a task as completed successfully +pub async fn mark_completed(pool: &PgPool, task_id: Uuid) -> Result<()> { + sqlx::query( + r#" + UPDATE notification_tasks + SET status = 'completed', + completed_at = NOW() + WHERE id = $1 + "#, + ) + .bind(task_id) + .execute(pool) + .await?; + + Ok(()) +} + +/// Mark a task as failed and schedule retry with exponential backoff +/// Backoff formula: 1s, 2s, 4s, 8s, 16s... +pub async fn mark_failed_and_retry( + pool: &PgPool, + task_id: Uuid, + error: &str, +) -> Result<()> { + sqlx::query( + r#" + UPDATE notification_tasks + SET status = CASE + WHEN attempts >= max_attempts THEN 'failed'::varchar + ELSE 'pending'::varchar + END, + last_error = $2, + next_retry_at = CASE + WHEN attempts >= max_attempts THEN NOW() + ELSE NOW() + (POWER(2, attempts) || ' seconds')::interval + END, + completed_at = CASE + WHEN attempts >= max_attempts THEN NOW() + ELSE NULL + END + WHERE id = $1 + "#, + ) + .bind(task_id) + .bind(error) + .execute(pool) + .await?; + + Ok(()) +} + +/// Get task by ID +pub async fn get(pool: &PgPool, task_id: Uuid) -> Result { + let task = sqlx::query_as::<_, NotificationTask>( + r#" + SELECT * FROM notification_tasks WHERE id = $1 + "#, + ) + .bind(task_id) + .fetch_one(pool) + .await?; + + Ok(task) +} + +/// Clean up old completed/failed tasks (older than retention days) +pub async fn cleanup_old_tasks( + pool: &PgPool, + retention_days: i64, +) -> Result { + let result = sqlx::query( + r#" + DELETE FROM notification_tasks + WHERE status IN ('completed', 'failed') + AND (completed_at < NOW() - ($1 || ' days')::interval + OR created_at < NOW() - ($1 || ' days')::interval) + "#, + ) + .bind(retention_days) + .execute(pool) + .await?; + + Ok(result.rows_affected()) +} + +/// Count pending tasks (for monitoring) +pub async fn count_pending(pool: &PgPool) -> Result { + let count: (i64,) = sqlx::query_as( + r#" + SELECT COUNT(*) FROM notification_tasks WHERE status = 'pending' + "#, + ) + .fetch_one(pool) + .await?; + + Ok(count.0) +} + +/// Count failed tasks (for monitoring) +pub async fn count_failed(pool: &PgPool) -> Result { + let count: (i64,) = sqlx::query_as( + r#" + SELECT COUNT(*) FROM notification_tasks WHERE status = 'failed' + "#, + ) + .fetch_one(pool) + .await?; + + Ok(count.0) +} diff --git a/crates/queue-runner/src/main.rs b/crates/queue-runner/src/main.rs index 4cd817e..70a8b58 100644 --- a/crates/queue-runner/src/main.rs +++ b/crates/queue-runner/src/main.rs @@ -90,6 +90,7 @@ async fn main() -> anyhow::Result<()> { () = gc_loop(gc_config_for_loop, db.pool().clone()) => {} () = failed_paths_cleanup_loop(db.pool().clone(), failed_paths_ttl, failed_paths_cache) => {} () = cancel_checker_loop(db.pool().clone(), active_builds) => {} + () = notification_retry_loop(db.pool().clone(), notifications_config.clone()) => {} () = shutdown_signal() => { tracing::info!("Shutdown signal received, draining in-flight builds..."); worker_pool_for_drain.drain(); @@ -218,6 +219,103 @@ async fn cancel_checker_loop(pool: sqlx::PgPool, active_builds: ActiveBuilds) { } } +async fn notification_retry_loop( + pool: sqlx::PgPool, + config: fc_common::config::NotificationsConfig, +) { + if !config.enable_retry_queue { + return std::future::pending().await; + } + + let poll_interval = + std::time::Duration::from_secs(config.retry_poll_interval); + let retention_days = config.retention_days; + + let cleanup_pool = pool.clone(); + tokio::spawn(async move { + let cleanup_interval = std::time::Duration::from_secs(3600); + loop { + tokio::time::sleep(cleanup_interval).await; + match repo::notification_tasks::cleanup_old_tasks( + &cleanup_pool, + retention_days, + ) + .await + { + Ok(count) if count > 0 => { + tracing::info!(count, "Cleaned up old notification tasks"); + }, + Ok(_) => {}, + Err(e) => { + tracing::error!("Notification task cleanup failed: {e}"); + }, + } + } + }); + + loop { + tokio::time::sleep(poll_interval).await; + + let tasks = match repo::notification_tasks::list_pending(&pool, 10).await { + Ok(t) => t, + Err(e) => { + tracing::warn!("Failed to fetch pending notification tasks: {e}"); + continue; + }, + }; + + for task in tasks { + if let Err(e) = + repo::notification_tasks::mark_running(&pool, task.id).await + { + tracing::warn!(task_id = %task.id, "Failed to mark task as running: {e}"); + continue; + } + + match fc_common::notifications::process_notification_task(&task).await { + Ok(()) => { + if let Err(e) = + repo::notification_tasks::mark_completed(&pool, task.id).await + { + tracing::error!(task_id = %task.id, "Failed to mark task as completed: {e}"); + } else { + tracing::info!( + task_id = %task.id, + notification_type = %task.notification_type, + attempts = task.attempts + 1, + "Notification task completed" + ); + } + }, + Err(err) => { + if let Err(e) = repo::notification_tasks::mark_failed_and_retry( + &pool, task.id, &err, + ) + .await + { + tracing::error!(task_id = %task.id, "Failed to update task status: {e}"); + } else { + let status_after = if task.attempts + 1 >= task.max_attempts { + "failed permanently" + } else { + "scheduled for retry" + }; + tracing::warn!( + task_id = %task.id, + notification_type = %task.notification_type, + attempts = task.attempts + 1, + max_attempts = task.max_attempts, + error = %err, + status = status_after, + "Notification task failed" + ); + } + }, + } + } + } +} + async fn shutdown_signal() { let ctrl_c = async { tokio::signal::ctrl_c() diff --git a/crates/queue-runner/src/worker.rs b/crates/queue-runner/src/worker.rs index 6d26c4b..094c58e 100644 --- a/crates/queue-runner/src/worker.rs +++ b/crates/queue-runner/src/worker.rs @@ -834,6 +834,7 @@ async fn run_build( get_project_for_build(pool, build).await { fc_common::notifications::dispatch_build_finished( + Some(pool), &updated_build, &project, &commit_hash,