diff --git a/crates/common/src/nix_probe.rs b/crates/common/src/nix_probe.rs index 478c39d..14316cb 100644 --- a/crates/common/src/nix_probe.rs +++ b/crates/common/src/nix_probe.rs @@ -65,12 +65,12 @@ fn to_flake_ref(url: &str) -> String { .unwrap_or(url_trimmed); let without_dotgit = without_scheme.trim_end_matches(".git"); - // github.com/owner/repo → github:owner/repo + // github.com/owner/repo -> github:owner/repo if let Some(path) = without_dotgit.strip_prefix("github.com/") { return format!("github:{path}"); } - // gitlab.com/owner/repo → gitlab:owner/repo + // gitlab.com/owner/repo -> gitlab:owner/repo if let Some(path) = without_dotgit.strip_prefix("gitlab.com/") { return format!("gitlab:{path}"); } diff --git a/crates/common/src/notifications.rs b/crates/common/src/notifications.rs index d6eda62..b97b766 100644 --- a/crates/common/src/notifications.rs +++ b/crates/common/src/notifications.rs @@ -173,7 +173,151 @@ async fn enqueue_notifications( } } -/// Send notifications immediately (legacy fire-and-forget behavior) +/// Enqueue commit status notifications for GitHub/GitLab/Gitea/Forgejo. +/// +/// # Errors +/// +/// Logs database errors if task creation fails. +async fn enqueue_commit_status_notification( + pool: &PgPool, + build: &Build, + project: &Project, + commit_hash: &str, + config: &NotificationsConfig, +) { + let max_attempts = config.max_retry_attempts; + + // GitHub commit status + if let Some(ref token) = config.github_token + && project.repository_url.contains("github.com") + { + let payload = serde_json::json!({ + "type": "github_status", + "token": token, + "repository_url": project.repository_url, + "commit_hash": commit_hash, + "build_id": build.id, + "build_status": build.status, + "build_job": build.job_name, + }); + + if let Err(e) = repo::notification_tasks::create( + pool, + "github_status", + payload, + max_attempts, + ) + .await + { + error!(build_id = %build.id, "Failed to enqueue GitHub status notification: {e}"); + } + } + + // Gitea/Forgejo commit status + if let (Some(url), Some(token)) = (&config.gitea_url, &config.gitea_token) { + let payload = serde_json::json!({ + "type": "gitea_status", + "base_url": url, + "token": token, + "repository_url": project.repository_url, + "commit_hash": commit_hash, + "build_id": build.id, + "build_status": build.status, + "build_job": build.job_name, + }); + + if let Err(e) = repo::notification_tasks::create( + pool, + "gitea_status", + payload, + max_attempts, + ) + .await + { + error!(build_id = %build.id, "Failed to enqueue Gitea status notification: {e}"); + } + } + + // GitLab commit status + if let (Some(url), Some(token)) = (&config.gitlab_url, &config.gitlab_token) { + let payload = serde_json::json!({ + "type": "gitlab_status", + "base_url": url, + "token": token, + "repository_url": project.repository_url, + "commit_hash": commit_hash, + "build_id": build.id, + "build_status": build.status, + "build_job": build.job_name, + }); + + if let Err(e) = repo::notification_tasks::create( + pool, + "gitlab_status", + payload, + max_attempts, + ) + .await + { + error!(build_id = %build.id, "Failed to enqueue GitLab status notification: {e}"); + } + } +} + +/// Dispatch commit status notification when a build is created (pending state). +/// +/// # Errors +/// +/// Logs database errors if task creation fails. +pub async fn dispatch_build_created( + pool: &PgPool, + build: &Build, + project: &Project, + commit_hash: &str, + config: &NotificationsConfig, +) { + if !config.enable_retry_queue { + return; + } + + enqueue_commit_status_notification(pool, build, project, commit_hash, config) + .await; + info!( + build_id = %build.id, + job = %build.job_name, + status = %build.status, + "Enqueued commit status notification for build creation" + ); +} + +/// Dispatch commit status notification when a build starts (running state). +/// +/// # Errors +/// +/// Logs database errors if task creation fails. +pub async fn dispatch_build_started( + pool: &PgPool, + build: &Build, + project: &Project, + commit_hash: &str, + config: &NotificationsConfig, +) { + if !config.enable_retry_queue { + return; + } + + enqueue_commit_status_notification(pool, build, project, commit_hash, config) + .await; + info!( + build_id = %build.id, + job = %build.job_name, + status = %build.status, + "Enqueued commit status notification for build start" + ); +} + +/// Send notifications immediately. +/// This is the "legacy" fire-and-forget behavior. async fn send_notifications_immediate( build: &Build, project: &Project, diff --git a/crates/evaluator/src/eval_loop.rs b/crates/evaluator/src/eval_loop.rs index 3847520..ba5fe6d 100644 --- a/crates/evaluator/src/eval_loop.rs +++ b/crates/evaluator/src/eval_loop.rs @@ -28,6 +28,7 @@ use uuid::Uuid; pub async fn run( pool: PgPool, config: EvaluatorConfig, + notifications_config: fc_common::config::NotificationsConfig, wakeup: Arc, ) -> anyhow::Result<()> { let poll_interval = Duration::from_secs(config.poll_interval); @@ -37,7 +38,15 @@ pub async fn run( let strict = config.strict_errors; loop { - if let Err(e) = run_cycle(&pool, &config, nix_timeout, git_timeout).await { + if let Err(e) = run_cycle( + &pool, + &config, + ¬ifications_config, + nix_timeout, + git_timeout, + ) + .await + { if strict { return Err(e); } @@ -51,6 +60,7 @@ pub async fn run( async fn run_cycle( pool: &PgPool, config: &EvaluatorConfig, + notifications_config: &fc_common::config::NotificationsConfig, nix_timeout: Duration, git_timeout: Duration, ) -> anyhow::Result<()> { @@ -76,8 +86,15 @@ async fn run_cycle( stream::iter(ready) .for_each_concurrent(max_concurrent, |jobset| { async move { - if let Err(e) = - evaluate_jobset(pool, &jobset, config, nix_timeout, git_timeout).await + if let Err(e) = evaluate_jobset( + pool, + &jobset, + config, + notifications_config, + nix_timeout, + git_timeout, + ) + .await { tracing::error!( jobset_id = %jobset.id, @@ -111,6 +128,7 @@ async fn evaluate_jobset( pool: &PgPool, jobset: &fc_common::models::ActiveJobset, config: &EvaluatorConfig, + notifications_config: &fc_common::config::NotificationsConfig, nix_timeout: Duration, git_timeout: Duration, ) -> anyhow::Result<()> { @@ -326,6 +344,41 @@ async fn evaluate_jobset( create_builds_from_eval(pool, eval.id, &eval_result).await?; + // Dispatch pending notifications for created builds + if notifications_config.enable_retry_queue { + if let Ok(project) = repo::projects::get(pool, jobset.project_id).await + { + if let Ok(builds) = + repo::builds::list_for_evaluation(pool, eval.id).await + { + for build in builds { + // Skip aggregate builds (they complete later when constituents + // finish) + if !build.is_aggregate { + fc_common::notifications::dispatch_build_created( + pool, + &build, + &project, + &eval.commit_hash, + notifications_config, + ) + .await; + } + } + } else { + tracing::warn!( + eval_id = %eval.id, + "Failed to fetch builds for pending notifications" + ); + } + } else { + tracing::warn!( + project_id = %jobset.project_id, + "Failed to fetch project for pending notifications" + ); + } + } + repo::evaluations::update_status( pool, eval.id, diff --git a/crates/evaluator/src/main.rs b/crates/evaluator/src/main.rs index 8d5771b..736dc4e 100644 --- a/crates/evaluator/src/main.rs +++ b/crates/evaluator/src/main.rs @@ -30,6 +30,7 @@ async fn main() -> anyhow::Result<()> { let pool = db.pool().clone(); let eval_config = config.evaluator; + let notifications_config = config.notifications; let wakeup = Arc::new(tokio::sync::Notify::new()); let listener_handle = fc_common::pg_notify::spawn_listener( @@ -39,7 +40,7 @@ async fn main() -> anyhow::Result<()> { ); tokio::select! { - result = fc_evaluator::eval_loop::run(pool, eval_config, wakeup) => { + result = fc_evaluator::eval_loop::run(pool, eval_config, notifications_config, wakeup) => { if let Err(e) = result { tracing::error!("Evaluator loop failed: {e}"); } diff --git a/crates/queue-runner/src/main.rs b/crates/queue-runner/src/main.rs index 8fe4a73..f109d55 100644 --- a/crates/queue-runner/src/main.rs +++ b/crates/queue-runner/src/main.rs @@ -82,7 +82,7 @@ async fn main() -> anyhow::Result<()> { let active_builds = worker_pool.active_builds().clone(); tokio::select! { - result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache) => { + result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache, notifications_config.clone()) => { if let Err(e) = result { tracing::error!("Runner loop failed: {e}"); } diff --git a/crates/queue-runner/src/runner_loop.rs b/crates/queue-runner/src/runner_loop.rs index 8494b0b..451f032 100644 --- a/crates/queue-runner/src/runner_loop.rs +++ b/crates/queue-runner/src/runner_loop.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; use fc_common::{ - models::{BuildStatus, JobsetState}, + models::{Build, BuildStatus, JobsetState}, repo, }; use sqlx::PgPool; @@ -9,6 +9,21 @@ use tokio::sync::Notify; use crate::worker::WorkerPool; +/// Fetch project and commit hash for a build by traversing: +/// +/// Build -> Evaluation -> Jobset -> Project. +async fn get_project_for_build( + pool: &PgPool, + build: &Build, +) -> Option<(fc_common::models::Project, String)> { + let eval = repo::evaluations::get(pool, build.evaluation_id) + .await + .ok()?; + let jobset = repo::jobsets::get(pool, eval.jobset_id).await.ok()?; + let project = repo::projects::get(pool, jobset.project_id).await.ok()?; + Some((project, eval.commit_hash)) +} + /// Main queue runner loop. Polls for pending builds and dispatches them to /// workers. /// @@ -22,6 +37,7 @@ pub async fn run( wakeup: Arc, strict_errors: bool, failed_paths_cache: bool, + notifications_config: fc_common::config::NotificationsConfig, ) -> anyhow::Result<()> { // Reset orphaned builds from previous crashes (older than 5 minutes) match repo::builds::reset_orphaned(&pool, 300).await { @@ -68,6 +84,23 @@ pub async fn run( .await { tracing::warn!(build_id = %build.id, "Failed to complete aggregate build: {e}"); + continue; + } + + // Dispatch completion notification for aggregate build + if let Ok(updated_build) = + repo::builds::get(&pool, build.id).await + && let Some((project, commit_hash)) = + get_project_for_build(&pool, &updated_build).await + { + fc_common::notifications::dispatch_build_finished( + Some(&pool), + &updated_build, + &project, + &commit_hash, + ¬ifications_config, + ) + .await; } continue; }, diff --git a/crates/queue-runner/src/worker.rs b/crates/queue-runner/src/worker.rs index b9a79b7..4c0817a 100644 --- a/crates/queue-runner/src/worker.rs +++ b/crates/queue-runner/src/worker.rs @@ -474,6 +474,22 @@ async fn run_build( return Ok(()); } + let claimed_build = claimed.unwrap(); // Safe: we checked is_some() + + // Dispatch build started notification + if let Some((project, commit_hash)) = + get_project_for_build(pool, &claimed_build).await + { + fc_common::notifications::dispatch_build_started( + pool, + &claimed_build, + &project, + &commit_hash, + notifications_config, + ) + .await; + } + tracing::info!(build_id = %build.id, job = %build.job_name, "Starting build"); // Create a build step record