Compare commits

..

5 commits

Author SHA1 Message Date
70ca754fa5
docs: clarify hydra components in design document
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I56a1dee4df5e3898aa7ef6e24ed7342c6a6a6964
2026-02-28 13:05:55 +03:00
0bc3bba704
docs: clarify remote machine setup in README
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I9a8e8e1e7ceb85296d9291f797ba1e3e6a6a6964
2026-02-28 13:05:54 +03:00
bffc6f4ec5
docs: update README with new config keys; link security document
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I8820dfb50f2b236a66d1e57aa36b6b0d6a6a6964
2026-02-28 13:05:54 +03:00
f812ca50b3
chore: bump dependencies; add clippy config
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I7e4d1b531e6d9f1fa824707a95fb3f2e6a6a6964
2026-02-28 12:18:22 +03:00
0ca92f2710
treewide: address all clippy lints
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I5cf55cc4cb558c3f9f764c71224e87176a6a6964
2026-02-28 12:18:21 +03:00
7 changed files with 9 additions and 256 deletions

View file

@ -65,12 +65,12 @@ fn to_flake_ref(url: &str) -> String {
.unwrap_or(url_trimmed); .unwrap_or(url_trimmed);
let without_dotgit = without_scheme.trim_end_matches(".git"); let without_dotgit = without_scheme.trim_end_matches(".git");
// github.com/owner/repo -> github:owner/repo // github.com/owner/repo github:owner/repo
if let Some(path) = without_dotgit.strip_prefix("github.com/") { if let Some(path) = without_dotgit.strip_prefix("github.com/") {
return format!("github:{path}"); return format!("github:{path}");
} }
// gitlab.com/owner/repo -> gitlab:owner/repo // gitlab.com/owner/repo gitlab:owner/repo
if let Some(path) = without_dotgit.strip_prefix("gitlab.com/") { if let Some(path) = without_dotgit.strip_prefix("gitlab.com/") {
return format!("gitlab:{path}"); return format!("gitlab:{path}");
} }

View file

@ -173,151 +173,7 @@ async fn enqueue_notifications(
} }
} }
/// Enqueue commit status notifications for GitHub/GitLab/Gitea/Forgejo. /// Send notifications immediately (legacy fire-and-forget behavior)
///
/// # Errors
///
/// Logs database errors if task creation fails.
async fn enqueue_commit_status_notification(
pool: &PgPool,
build: &Build,
project: &Project,
commit_hash: &str,
config: &NotificationsConfig,
) {
let max_attempts = config.max_retry_attempts;
// GitHub commit status
if let Some(ref token) = config.github_token
&& project.repository_url.contains("github.com")
{
let payload = serde_json::json!({
"type": "github_status",
"token": token,
"repository_url": project.repository_url,
"commit_hash": commit_hash,
"build_id": build.id,
"build_status": build.status,
"build_job": build.job_name,
});
if let Err(e) = repo::notification_tasks::create(
pool,
"github_status",
payload,
max_attempts,
)
.await
{
error!(build_id = %build.id, "Failed to enqueue GitHub status notification: {e}");
}
}
// Gitea/Forgejo commit status
if let (Some(url), Some(token)) = (&config.gitea_url, &config.gitea_token) {
let payload = serde_json::json!({
"type": "gitea_status",
"base_url": url,
"token": token,
"repository_url": project.repository_url,
"commit_hash": commit_hash,
"build_id": build.id,
"build_status": build.status,
"build_job": build.job_name,
});
if let Err(e) = repo::notification_tasks::create(
pool,
"gitea_status",
payload,
max_attempts,
)
.await
{
error!(build_id = %build.id, "Failed to enqueue Gitea status notification: {e}");
}
}
// GitLab commit status
if let (Some(url), Some(token)) = (&config.gitlab_url, &config.gitlab_token) {
let payload = serde_json::json!({
"type": "gitlab_status",
"base_url": url,
"token": token,
"repository_url": project.repository_url,
"commit_hash": commit_hash,
"build_id": build.id,
"build_status": build.status,
"build_job": build.job_name,
});
if let Err(e) = repo::notification_tasks::create(
pool,
"gitlab_status",
payload,
max_attempts,
)
.await
{
error!(build_id = %build.id, "Failed to enqueue GitLab status notification: {e}");
}
}
}
/// Dispatch commit status notification when a build is created (pending state).
///
/// # Errors
///
/// Logs database errors if task creation fails.
pub async fn dispatch_build_created(
pool: &PgPool,
build: &Build,
project: &Project,
commit_hash: &str,
config: &NotificationsConfig,
) {
if !config.enable_retry_queue {
return;
}
enqueue_commit_status_notification(pool, build, project, commit_hash, config)
.await;
info!(
build_id = %build.id,
job = %build.job_name,
status = %build.status,
"Enqueued commit status notification for build creation"
);
}
/// Dispatch commit status notification when a build starts (running state).
///
/// # Errors
///
/// Logs database errors if task creation fails.
pub async fn dispatch_build_started(
pool: &PgPool,
build: &Build,
project: &Project,
commit_hash: &str,
config: &NotificationsConfig,
) {
if !config.enable_retry_queue {
return;
}
enqueue_commit_status_notification(pool, build, project, commit_hash, config)
.await;
info!(
build_id = %build.id,
job = %build.job_name,
status = %build.status,
"Enqueued commit status notification for build start"
);
}
/// Send notifications immediately.
/// This is the "legacy" fire-and-forget behavior.
async fn send_notifications_immediate( async fn send_notifications_immediate(
build: &Build, build: &Build,
project: &Project, project: &Project,

View file

@ -28,7 +28,6 @@ use uuid::Uuid;
pub async fn run( pub async fn run(
pool: PgPool, pool: PgPool,
config: EvaluatorConfig, config: EvaluatorConfig,
notifications_config: fc_common::config::NotificationsConfig,
wakeup: Arc<Notify>, wakeup: Arc<Notify>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let poll_interval = Duration::from_secs(config.poll_interval); let poll_interval = Duration::from_secs(config.poll_interval);
@ -38,15 +37,7 @@ pub async fn run(
let strict = config.strict_errors; let strict = config.strict_errors;
loop { loop {
if let Err(e) = run_cycle( if let Err(e) = run_cycle(&pool, &config, nix_timeout, git_timeout).await {
&pool,
&config,
&notifications_config,
nix_timeout,
git_timeout,
)
.await
{
if strict { if strict {
return Err(e); return Err(e);
} }
@ -60,7 +51,6 @@ pub async fn run(
async fn run_cycle( async fn run_cycle(
pool: &PgPool, pool: &PgPool,
config: &EvaluatorConfig, config: &EvaluatorConfig,
notifications_config: &fc_common::config::NotificationsConfig,
nix_timeout: Duration, nix_timeout: Duration,
git_timeout: Duration, git_timeout: Duration,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
@ -86,15 +76,8 @@ async fn run_cycle(
stream::iter(ready) stream::iter(ready)
.for_each_concurrent(max_concurrent, |jobset| { .for_each_concurrent(max_concurrent, |jobset| {
async move { async move {
if let Err(e) = evaluate_jobset( if let Err(e) =
pool, evaluate_jobset(pool, &jobset, config, nix_timeout, git_timeout).await
&jobset,
config,
notifications_config,
nix_timeout,
git_timeout,
)
.await
{ {
tracing::error!( tracing::error!(
jobset_id = %jobset.id, jobset_id = %jobset.id,
@ -128,7 +111,6 @@ async fn evaluate_jobset(
pool: &PgPool, pool: &PgPool,
jobset: &fc_common::models::ActiveJobset, jobset: &fc_common::models::ActiveJobset,
config: &EvaluatorConfig, config: &EvaluatorConfig,
notifications_config: &fc_common::config::NotificationsConfig,
nix_timeout: Duration, nix_timeout: Duration,
git_timeout: Duration, git_timeout: Duration,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
@ -344,41 +326,6 @@ async fn evaluate_jobset(
create_builds_from_eval(pool, eval.id, &eval_result).await?; create_builds_from_eval(pool, eval.id, &eval_result).await?;
// Dispatch pending notifications for created builds
if notifications_config.enable_retry_queue {
if let Ok(project) = repo::projects::get(pool, jobset.project_id).await
{
if let Ok(builds) =
repo::builds::list_for_evaluation(pool, eval.id).await
{
for build in builds {
// Skip aggregate builds (they complete later when constituents
// finish)
if !build.is_aggregate {
fc_common::notifications::dispatch_build_created(
pool,
&build,
&project,
&eval.commit_hash,
notifications_config,
)
.await;
}
}
} else {
tracing::warn!(
eval_id = %eval.id,
"Failed to fetch builds for pending notifications"
);
}
} else {
tracing::warn!(
project_id = %jobset.project_id,
"Failed to fetch project for pending notifications"
);
}
}
repo::evaluations::update_status( repo::evaluations::update_status(
pool, pool,
eval.id, eval.id,

View file

@ -30,7 +30,6 @@ async fn main() -> anyhow::Result<()> {
let pool = db.pool().clone(); let pool = db.pool().clone();
let eval_config = config.evaluator; let eval_config = config.evaluator;
let notifications_config = config.notifications;
let wakeup = Arc::new(tokio::sync::Notify::new()); let wakeup = Arc::new(tokio::sync::Notify::new());
let listener_handle = fc_common::pg_notify::spawn_listener( let listener_handle = fc_common::pg_notify::spawn_listener(
@ -40,7 +39,7 @@ async fn main() -> anyhow::Result<()> {
); );
tokio::select! { tokio::select! {
result = fc_evaluator::eval_loop::run(pool, eval_config, notifications_config, wakeup) => { result = fc_evaluator::eval_loop::run(pool, eval_config, wakeup) => {
if let Err(e) = result { if let Err(e) = result {
tracing::error!("Evaluator loop failed: {e}"); tracing::error!("Evaluator loop failed: {e}");
} }

View file

@ -82,7 +82,7 @@ async fn main() -> anyhow::Result<()> {
let active_builds = worker_pool.active_builds().clone(); let active_builds = worker_pool.active_builds().clone();
tokio::select! { tokio::select! {
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache, notifications_config.clone()) => { result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache) => {
if let Err(e) = result { if let Err(e) = result {
tracing::error!("Runner loop failed: {e}"); tracing::error!("Runner loop failed: {e}");
} }

View file

@ -1,7 +1,7 @@
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use fc_common::{ use fc_common::{
models::{Build, BuildStatus, JobsetState}, models::{BuildStatus, JobsetState},
repo, repo,
}; };
use sqlx::PgPool; use sqlx::PgPool;
@ -9,21 +9,6 @@ use tokio::sync::Notify;
use crate::worker::WorkerPool; use crate::worker::WorkerPool;
/// Fetch project and commit hash for a build by traversing:
///
/// Build -> Evaluation -> Jobset -> Project.
async fn get_project_for_build(
pool: &PgPool,
build: &Build,
) -> Option<(fc_common::models::Project, String)> {
let eval = repo::evaluations::get(pool, build.evaluation_id)
.await
.ok()?;
let jobset = repo::jobsets::get(pool, eval.jobset_id).await.ok()?;
let project = repo::projects::get(pool, jobset.project_id).await.ok()?;
Some((project, eval.commit_hash))
}
/// Main queue runner loop. Polls for pending builds and dispatches them to /// Main queue runner loop. Polls for pending builds and dispatches them to
/// workers. /// workers.
/// ///
@ -37,7 +22,6 @@ pub async fn run(
wakeup: Arc<Notify>, wakeup: Arc<Notify>,
strict_errors: bool, strict_errors: bool,
failed_paths_cache: bool, failed_paths_cache: bool,
notifications_config: fc_common::config::NotificationsConfig,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Reset orphaned builds from previous crashes (older than 5 minutes) // Reset orphaned builds from previous crashes (older than 5 minutes)
match repo::builds::reset_orphaned(&pool, 300).await { match repo::builds::reset_orphaned(&pool, 300).await {
@ -84,23 +68,6 @@ pub async fn run(
.await .await
{ {
tracing::warn!(build_id = %build.id, "Failed to complete aggregate build: {e}"); tracing::warn!(build_id = %build.id, "Failed to complete aggregate build: {e}");
continue;
}
// Dispatch completion notification for aggregate build
if let Ok(updated_build) =
repo::builds::get(&pool, build.id).await
&& let Some((project, commit_hash)) =
get_project_for_build(&pool, &updated_build).await
{
fc_common::notifications::dispatch_build_finished(
Some(&pool),
&updated_build,
&project,
&commit_hash,
&notifications_config,
)
.await;
} }
continue; continue;
}, },

View file

@ -474,22 +474,6 @@ async fn run_build(
return Ok(()); return Ok(());
} }
let claimed_build = claimed.unwrap(); // Safe: we checked is_some()
// Dispatch build started notification
if let Some((project, commit_hash)) =
get_project_for_build(pool, &claimed_build).await
{
fc_common::notifications::dispatch_build_started(
pool,
&claimed_build,
&project,
&commit_hash,
notifications_config,
)
.await;
}
tracing::info!(build_id = %build.id, job = %build.job_name, "Starting build"); tracing::info!(build_id = %build.id, job = %build.job_name, "Starting build");
// Create a build step record // Create a build step record