From fcb32aa9be55d49c22abed651c7f1986cd26e04c Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sat, 14 Feb 2026 01:37:52 +0300 Subject: [PATCH] fc-queue-runner: collect metrics and trigger alerts on threshold Signed-off-by: NotAShelf Change-Id: If1d682abdc6a932bdf5b9bfe23737c3e6a6a6964 --- crates/queue-runner/src/main.rs | 3 +- crates/queue-runner/src/worker.rs | 88 ++++++++++++++++++++++- crates/queue-runner/tests/runner_tests.rs | 8 ++- 3 files changed, 94 insertions(+), 5 deletions(-) diff --git a/crates/queue-runner/src/main.rs b/crates/queue-runner/src/main.rs index 881d70c..3793c76 100644 --- a/crates/queue-runner/src/main.rs +++ b/crates/queue-runner/src/main.rs @@ -52,9 +52,10 @@ async fn main() -> anyhow::Result<()> { build_timeout, log_config, gc_config, - notifications_config, + notifications_config.clone(), signing_config, cache_upload_config, + notifications_config.alerts.clone(), )); tracing::info!( diff --git a/crates/queue-runner/src/worker.rs b/crates/queue-runner/src/worker.rs index 847b2d8..34972e3 100644 --- a/crates/queue-runner/src/worker.rs +++ b/crates/queue-runner/src/worker.rs @@ -1,7 +1,9 @@ use std::{path::PathBuf, sync::Arc, time::Duration}; use fc_common::{ + alerts::AlertManager, config::{ + AlertConfig, CacheUploadConfig, GcConfig, LogConfig, @@ -10,7 +12,14 @@ use fc_common::{ }, gc_roots::GcRoots, log_storage::LogStorage, - models::{Build, BuildStatus, CreateBuildProduct, CreateBuildStep}, + models::{ + Build, + BuildStatus, + CreateBuildProduct, + CreateBuildStep, + metric_names, + metric_units, + }, repo, }; use sqlx::PgPool; @@ -26,6 +35,7 @@ pub struct WorkerPool { notifications_config: Arc<NotificationsConfig>, signing_config: Arc<SigningConfig>, cache_upload_config: Arc<CacheUploadConfig>, + alert_manager: Arc<Option<AlertManager>>, drain_token: tokio_util::sync::CancellationToken, } @@ -42,7 +52,9 @@ impl WorkerPool { notifications_config: NotificationsConfig, signing_config: SigningConfig, cache_upload_config: CacheUploadConfig, + alert_config: Option<AlertConfig>, ) -> Self { + let alert_manager = 
alert_config.map(AlertManager::new); Self { semaphore: Arc::new(Semaphore::new(workers)), pool: db_pool, @@ -53,6 +65,7 @@ impl WorkerPool { notifications_config: Arc::new(notifications_config), signing_config: Arc::new(signing_config), cache_upload_config: Arc::new(cache_upload_config), + alert_manager: Arc::new(alert_manager), drain_token: tokio_util::sync::CancellationToken::new(), } } @@ -96,6 +109,7 @@ impl WorkerPool { let notifications_config = self.notifications_config.clone(); let signing_config = self.signing_config.clone(); let cache_upload_config = self.cache_upload_config.clone(); + let alert_manager = self.alert_manager.clone(); tokio::spawn(async move { let _permit = match semaphore.acquire().await { @@ -113,6 +127,7 @@ impl WorkerPool { &notifications_config, &signing_config, &cache_upload_config, + &alert_manager, ) .await { @@ -278,6 +293,68 @@ async fn try_remote_build( None } +async fn collect_metrics_and_alert( + pool: &PgPool, + build: &Build, + output_paths: &[String], + alert_manager: &Option<AlertManager>, +) { + if let (Some(started), Some(completed)) = + (build.started_at, build.completed_at) + { + let duration = completed.signed_duration_since(started); + let duration_secs = duration.num_seconds() as f64; + + if let Err(e) = repo::build_metrics::upsert( + pool, + build.id, + metric_names::BUILD_DURATION_SECONDS, + duration_secs, + metric_units::SECONDS, + ) + .await + { + tracing::warn!("Failed to save build duration metric: {}", e); + } + } + + for path in output_paths.iter() { + if let Ok(meta) = tokio::fs::metadata(path).await { + let size = meta.len(); + if let Err(e) = repo::build_metrics::upsert( + pool, + build.id, + metric_names::OUTPUT_SIZE_BYTES, + size as f64, + metric_units::BYTES, + ) + .await + { + tracing::warn!("Failed to save output size metric: {}", e); + continue; + } + break; + } + } + + let manager = match alert_manager { + Some(m) => m, + None => return, + }; + + if manager.is_enabled() { + if let Ok(evaluation) = + 
repo::evaluations::get(pool, build.evaluation_id).await + { + if let Ok(jobset) = repo::jobsets::get(pool, evaluation.jobset_id).await { + let _ = manager + .check_and_alert(pool, Some(jobset.project_id), Some(jobset.id)) + .await; + } + } + } +} + #[tracing::instrument(skip(pool, build, work_dir, log_config, gc_config, notifications_config, signing_config, cache_upload_config), fields(build_id = %build.id, job = %build.job_name))] #[allow(clippy::too_many_arguments)] async fn run_build( @@ -290,6 +367,7 @@ async fn run_build( notifications_config: &NotificationsConfig, signing_config: &SigningConfig, cache_upload_config: &CacheUploadConfig, + alert_manager: &Option<AlertManager>, ) -> anyhow::Result<()> { // Atomically claim the build let claimed = repo::builds::start(pool, build.id).await?; @@ -491,6 +569,14 @@ async fn run_build( ) .await?; + collect_metrics_and_alert( + pool, + &build, + &build_result.output_paths, + &alert_manager, + ) + .await; + tracing::info!(build_id = %build.id, "Build completed successfully"); } else { // Check if we should retry diff --git a/crates/queue-runner/tests/runner_tests.rs b/crates/queue-runner/tests/runner_tests.rs index 1959f00..3a03c7f 100644 --- a/crates/queue-runner/tests/runner_tests.rs +++ b/crates/queue-runner/tests/runner_tests.rs @@ -2,7 +2,7 @@ //! Nix log parsing tests require no external binaries. //! Database tests require TEST_DATABASE_URL. 
-// --- Nix log line parsing --- +// Nix log line parsing #[test] fn test_parse_nix_log_start() { @@ -60,7 +60,7 @@ fn test_parse_nix_log_empty_line() { assert!(result.is_none()); } -// --- WorkerPool drain --- +// WorkerPool drain #[tokio::test] async fn test_worker_pool_drain_stops_dispatch() { @@ -89,6 +89,7 @@ async fn test_worker_pool_drain_stops_dispatch() { fc_common::config::NotificationsConfig::default(), fc_common::config::SigningConfig::default(), fc_common::config::CacheUploadConfig::default(), + None, ); // Drain should not panic @@ -99,7 +100,7 @@ async fn test_worker_pool_drain_stops_dispatch() { // doesn't crash } -// --- Database-dependent tests --- +// Database-dependent tests #[tokio::test] async fn test_atomic_build_claiming() { @@ -273,6 +274,7 @@ async fn test_orphan_build_reset() { // Simulate the build being stuck for a while by manually backdating // started_at + // Truly a genius way to test. sqlx::query( "UPDATE builds SET started_at = NOW() - INTERVAL '10 minutes' WHERE id = \ $1",