Compare commits
8 commits
70ca754fa5
...
1336f998bf
| Author | SHA1 | Date | |
|---|---|---|---|
|
1336f998bf |
|||
|
1e28c31077 |
|||
|
85a43c8ca0 |
|||
|
5bf755960d |
|||
|
569da83b78 |
|||
|
a1c0142fb0 |
|||
|
a132591228 |
|||
|
a127f3f62c |
7 changed files with 256 additions and 9 deletions
|
|
@ -65,12 +65,12 @@ fn to_flake_ref(url: &str) -> String {
|
||||||
.unwrap_or(url_trimmed);
|
.unwrap_or(url_trimmed);
|
||||||
let without_dotgit = without_scheme.trim_end_matches(".git");
|
let without_dotgit = without_scheme.trim_end_matches(".git");
|
||||||
|
|
||||||
// github.com/owner/repo → github:owner/repo
|
// github.com/owner/repo -> github:owner/repo
|
||||||
if let Some(path) = without_dotgit.strip_prefix("github.com/") {
|
if let Some(path) = without_dotgit.strip_prefix("github.com/") {
|
||||||
return format!("github:{path}");
|
return format!("github:{path}");
|
||||||
}
|
}
|
||||||
|
|
||||||
// gitlab.com/owner/repo → gitlab:owner/repo
|
// gitlab.com/owner/repo -> gitlab:owner/repo
|
||||||
if let Some(path) = without_dotgit.strip_prefix("gitlab.com/") {
|
if let Some(path) = without_dotgit.strip_prefix("gitlab.com/") {
|
||||||
return format!("gitlab:{path}");
|
return format!("gitlab:{path}");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -173,7 +173,151 @@ async fn enqueue_notifications(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send notifications immediately (legacy fire-and-forget behavior)
|
/// Enqueue commit status notifications for GitHub/GitLab/Gitea/Forgejo.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Logs database errors if task creation fails.
|
||||||
|
async fn enqueue_commit_status_notification(
|
||||||
|
pool: &PgPool,
|
||||||
|
build: &Build,
|
||||||
|
project: &Project,
|
||||||
|
commit_hash: &str,
|
||||||
|
config: &NotificationsConfig,
|
||||||
|
) {
|
||||||
|
let max_attempts = config.max_retry_attempts;
|
||||||
|
|
||||||
|
// GitHub commit status
|
||||||
|
if let Some(ref token) = config.github_token
|
||||||
|
&& project.repository_url.contains("github.com")
|
||||||
|
{
|
||||||
|
let payload = serde_json::json!({
|
||||||
|
"type": "github_status",
|
||||||
|
"token": token,
|
||||||
|
"repository_url": project.repository_url,
|
||||||
|
"commit_hash": commit_hash,
|
||||||
|
"build_id": build.id,
|
||||||
|
"build_status": build.status,
|
||||||
|
"build_job": build.job_name,
|
||||||
|
});
|
||||||
|
|
||||||
|
if let Err(e) = repo::notification_tasks::create(
|
||||||
|
pool,
|
||||||
|
"github_status",
|
||||||
|
payload,
|
||||||
|
max_attempts,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
error!(build_id = %build.id, "Failed to enqueue GitHub status notification: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gitea/Forgejo commit status
|
||||||
|
if let (Some(url), Some(token)) = (&config.gitea_url, &config.gitea_token) {
|
||||||
|
let payload = serde_json::json!({
|
||||||
|
"type": "gitea_status",
|
||||||
|
"base_url": url,
|
||||||
|
"token": token,
|
||||||
|
"repository_url": project.repository_url,
|
||||||
|
"commit_hash": commit_hash,
|
||||||
|
"build_id": build.id,
|
||||||
|
"build_status": build.status,
|
||||||
|
"build_job": build.job_name,
|
||||||
|
});
|
||||||
|
|
||||||
|
if let Err(e) = repo::notification_tasks::create(
|
||||||
|
pool,
|
||||||
|
"gitea_status",
|
||||||
|
payload,
|
||||||
|
max_attempts,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
error!(build_id = %build.id, "Failed to enqueue Gitea status notification: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GitLab commit status
|
||||||
|
if let (Some(url), Some(token)) = (&config.gitlab_url, &config.gitlab_token) {
|
||||||
|
let payload = serde_json::json!({
|
||||||
|
"type": "gitlab_status",
|
||||||
|
"base_url": url,
|
||||||
|
"token": token,
|
||||||
|
"repository_url": project.repository_url,
|
||||||
|
"commit_hash": commit_hash,
|
||||||
|
"build_id": build.id,
|
||||||
|
"build_status": build.status,
|
||||||
|
"build_job": build.job_name,
|
||||||
|
});
|
||||||
|
|
||||||
|
if let Err(e) = repo::notification_tasks::create(
|
||||||
|
pool,
|
||||||
|
"gitlab_status",
|
||||||
|
payload,
|
||||||
|
max_attempts,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
error!(build_id = %build.id, "Failed to enqueue GitLab status notification: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Dispatch commit status notification when a build is created (pending state).
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Logs database errors if task creation fails.
|
||||||
|
pub async fn dispatch_build_created(
|
||||||
|
pool: &PgPool,
|
||||||
|
build: &Build,
|
||||||
|
project: &Project,
|
||||||
|
commit_hash: &str,
|
||||||
|
config: &NotificationsConfig,
|
||||||
|
) {
|
||||||
|
if !config.enable_retry_queue {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
enqueue_commit_status_notification(pool, build, project, commit_hash, config)
|
||||||
|
.await;
|
||||||
|
info!(
|
||||||
|
build_id = %build.id,
|
||||||
|
job = %build.job_name,
|
||||||
|
status = %build.status,
|
||||||
|
"Enqueued commit status notification for build creation"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Dispatch commit status notification when a build starts (running state).
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Logs database errors if task creation fails.
|
||||||
|
pub async fn dispatch_build_started(
|
||||||
|
pool: &PgPool,
|
||||||
|
build: &Build,
|
||||||
|
project: &Project,
|
||||||
|
commit_hash: &str,
|
||||||
|
config: &NotificationsConfig,
|
||||||
|
) {
|
||||||
|
if !config.enable_retry_queue {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
enqueue_commit_status_notification(pool, build, project, commit_hash, config)
|
||||||
|
.await;
|
||||||
|
info!(
|
||||||
|
build_id = %build.id,
|
||||||
|
job = %build.job_name,
|
||||||
|
status = %build.status,
|
||||||
|
"Enqueued commit status notification for build start"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send notifications immediately.
|
||||||
|
/// This is the "legacy" fire-and-forget behavior.
|
||||||
async fn send_notifications_immediate(
|
async fn send_notifications_immediate(
|
||||||
build: &Build,
|
build: &Build,
|
||||||
project: &Project,
|
project: &Project,
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ use uuid::Uuid;
|
||||||
pub async fn run(
|
pub async fn run(
|
||||||
pool: PgPool,
|
pool: PgPool,
|
||||||
config: EvaluatorConfig,
|
config: EvaluatorConfig,
|
||||||
|
notifications_config: fc_common::config::NotificationsConfig,
|
||||||
wakeup: Arc<Notify>,
|
wakeup: Arc<Notify>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let poll_interval = Duration::from_secs(config.poll_interval);
|
let poll_interval = Duration::from_secs(config.poll_interval);
|
||||||
|
|
@ -37,7 +38,15 @@ pub async fn run(
|
||||||
let strict = config.strict_errors;
|
let strict = config.strict_errors;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Err(e) = run_cycle(&pool, &config, nix_timeout, git_timeout).await {
|
if let Err(e) = run_cycle(
|
||||||
|
&pool,
|
||||||
|
&config,
|
||||||
|
¬ifications_config,
|
||||||
|
nix_timeout,
|
||||||
|
git_timeout,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
if strict {
|
if strict {
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
|
|
@ -51,6 +60,7 @@ pub async fn run(
|
||||||
async fn run_cycle(
|
async fn run_cycle(
|
||||||
pool: &PgPool,
|
pool: &PgPool,
|
||||||
config: &EvaluatorConfig,
|
config: &EvaluatorConfig,
|
||||||
|
notifications_config: &fc_common::config::NotificationsConfig,
|
||||||
nix_timeout: Duration,
|
nix_timeout: Duration,
|
||||||
git_timeout: Duration,
|
git_timeout: Duration,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
|
@ -76,8 +86,15 @@ async fn run_cycle(
|
||||||
stream::iter(ready)
|
stream::iter(ready)
|
||||||
.for_each_concurrent(max_concurrent, |jobset| {
|
.for_each_concurrent(max_concurrent, |jobset| {
|
||||||
async move {
|
async move {
|
||||||
if let Err(e) =
|
if let Err(e) = evaluate_jobset(
|
||||||
evaluate_jobset(pool, &jobset, config, nix_timeout, git_timeout).await
|
pool,
|
||||||
|
&jobset,
|
||||||
|
config,
|
||||||
|
notifications_config,
|
||||||
|
nix_timeout,
|
||||||
|
git_timeout,
|
||||||
|
)
|
||||||
|
.await
|
||||||
{
|
{
|
||||||
tracing::error!(
|
tracing::error!(
|
||||||
jobset_id = %jobset.id,
|
jobset_id = %jobset.id,
|
||||||
|
|
@ -111,6 +128,7 @@ async fn evaluate_jobset(
|
||||||
pool: &PgPool,
|
pool: &PgPool,
|
||||||
jobset: &fc_common::models::ActiveJobset,
|
jobset: &fc_common::models::ActiveJobset,
|
||||||
config: &EvaluatorConfig,
|
config: &EvaluatorConfig,
|
||||||
|
notifications_config: &fc_common::config::NotificationsConfig,
|
||||||
nix_timeout: Duration,
|
nix_timeout: Duration,
|
||||||
git_timeout: Duration,
|
git_timeout: Duration,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
|
@ -326,6 +344,41 @@ async fn evaluate_jobset(
|
||||||
|
|
||||||
create_builds_from_eval(pool, eval.id, &eval_result).await?;
|
create_builds_from_eval(pool, eval.id, &eval_result).await?;
|
||||||
|
|
||||||
|
// Dispatch pending notifications for created builds
|
||||||
|
if notifications_config.enable_retry_queue {
|
||||||
|
if let Ok(project) = repo::projects::get(pool, jobset.project_id).await
|
||||||
|
{
|
||||||
|
if let Ok(builds) =
|
||||||
|
repo::builds::list_for_evaluation(pool, eval.id).await
|
||||||
|
{
|
||||||
|
for build in builds {
|
||||||
|
// Skip aggregate builds (they complete later when constituents
|
||||||
|
// finish)
|
||||||
|
if !build.is_aggregate {
|
||||||
|
fc_common::notifications::dispatch_build_created(
|
||||||
|
pool,
|
||||||
|
&build,
|
||||||
|
&project,
|
||||||
|
&eval.commit_hash,
|
||||||
|
notifications_config,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tracing::warn!(
|
||||||
|
eval_id = %eval.id,
|
||||||
|
"Failed to fetch builds for pending notifications"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tracing::warn!(
|
||||||
|
project_id = %jobset.project_id,
|
||||||
|
"Failed to fetch project for pending notifications"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
repo::evaluations::update_status(
|
repo::evaluations::update_status(
|
||||||
pool,
|
pool,
|
||||||
eval.id,
|
eval.id,
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
let pool = db.pool().clone();
|
let pool = db.pool().clone();
|
||||||
let eval_config = config.evaluator;
|
let eval_config = config.evaluator;
|
||||||
|
let notifications_config = config.notifications;
|
||||||
|
|
||||||
let wakeup = Arc::new(tokio::sync::Notify::new());
|
let wakeup = Arc::new(tokio::sync::Notify::new());
|
||||||
let listener_handle = fc_common::pg_notify::spawn_listener(
|
let listener_handle = fc_common::pg_notify::spawn_listener(
|
||||||
|
|
@ -39,7 +40,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
);
|
);
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
result = fc_evaluator::eval_loop::run(pool, eval_config, wakeup) => {
|
result = fc_evaluator::eval_loop::run(pool, eval_config, notifications_config, wakeup) => {
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
tracing::error!("Evaluator loop failed: {e}");
|
tracing::error!("Evaluator loop failed: {e}");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -82,7 +82,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let active_builds = worker_pool.active_builds().clone();
|
let active_builds = worker_pool.active_builds().clone();
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache) => {
|
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache, notifications_config.clone()) => {
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
tracing::error!("Runner loop failed: {e}");
|
tracing::error!("Runner loop failed: {e}");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use fc_common::{
|
use fc_common::{
|
||||||
models::{BuildStatus, JobsetState},
|
models::{Build, BuildStatus, JobsetState},
|
||||||
repo,
|
repo,
|
||||||
};
|
};
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
|
|
@ -9,6 +9,21 @@ use tokio::sync::Notify;
|
||||||
|
|
||||||
use crate::worker::WorkerPool;
|
use crate::worker::WorkerPool;
|
||||||
|
|
||||||
|
/// Fetch project and commit hash for a build by traversing:
|
||||||
|
///
|
||||||
|
/// Build -> Evaluation -> Jobset -> Project.
|
||||||
|
async fn get_project_for_build(
|
||||||
|
pool: &PgPool,
|
||||||
|
build: &Build,
|
||||||
|
) -> Option<(fc_common::models::Project, String)> {
|
||||||
|
let eval = repo::evaluations::get(pool, build.evaluation_id)
|
||||||
|
.await
|
||||||
|
.ok()?;
|
||||||
|
let jobset = repo::jobsets::get(pool, eval.jobset_id).await.ok()?;
|
||||||
|
let project = repo::projects::get(pool, jobset.project_id).await.ok()?;
|
||||||
|
Some((project, eval.commit_hash))
|
||||||
|
}
|
||||||
|
|
||||||
/// Main queue runner loop. Polls for pending builds and dispatches them to
|
/// Main queue runner loop. Polls for pending builds and dispatches them to
|
||||||
/// workers.
|
/// workers.
|
||||||
///
|
///
|
||||||
|
|
@ -22,6 +37,7 @@ pub async fn run(
|
||||||
wakeup: Arc<Notify>,
|
wakeup: Arc<Notify>,
|
||||||
strict_errors: bool,
|
strict_errors: bool,
|
||||||
failed_paths_cache: bool,
|
failed_paths_cache: bool,
|
||||||
|
notifications_config: fc_common::config::NotificationsConfig,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// Reset orphaned builds from previous crashes (older than 5 minutes)
|
// Reset orphaned builds from previous crashes (older than 5 minutes)
|
||||||
match repo::builds::reset_orphaned(&pool, 300).await {
|
match repo::builds::reset_orphaned(&pool, 300).await {
|
||||||
|
|
@ -68,6 +84,23 @@ pub async fn run(
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
tracing::warn!(build_id = %build.id, "Failed to complete aggregate build: {e}");
|
tracing::warn!(build_id = %build.id, "Failed to complete aggregate build: {e}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dispatch completion notification for aggregate build
|
||||||
|
if let Ok(updated_build) =
|
||||||
|
repo::builds::get(&pool, build.id).await
|
||||||
|
&& let Some((project, commit_hash)) =
|
||||||
|
get_project_for_build(&pool, &updated_build).await
|
||||||
|
{
|
||||||
|
fc_common::notifications::dispatch_build_finished(
|
||||||
|
Some(&pool),
|
||||||
|
&updated_build,
|
||||||
|
&project,
|
||||||
|
&commit_hash,
|
||||||
|
¬ifications_config,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -474,6 +474,22 @@ async fn run_build(
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let claimed_build = claimed.unwrap(); // Safe: we checked is_some()
|
||||||
|
|
||||||
|
// Dispatch build started notification
|
||||||
|
if let Some((project, commit_hash)) =
|
||||||
|
get_project_for_build(pool, &claimed_build).await
|
||||||
|
{
|
||||||
|
fc_common::notifications::dispatch_build_started(
|
||||||
|
pool,
|
||||||
|
&claimed_build,
|
||||||
|
&project,
|
||||||
|
&commit_hash,
|
||||||
|
notifications_config,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
tracing::info!(build_id = %build.id, job = %build.job_name, "Starting build");
|
tracing::info!(build_id = %build.id, job = %build.job_name, "Starting build");
|
||||||
|
|
||||||
// Create a build step record
|
// Create a build step record
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue