fc-queue-runner: collect metrics and trigger alerts on threshold

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: If1d682abdc6a932bdf5b9bfe23737c3e6a6a6964
This commit is contained in:
raf 2026-02-14 01:37:52 +03:00
commit fcb32aa9be
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
3 changed files with 94 additions and 5 deletions

View file

@ -52,9 +52,10 @@ async fn main() -> anyhow::Result<()> {
build_timeout, build_timeout,
log_config, log_config,
gc_config, gc_config,
notifications_config, notifications_config.clone(),
signing_config, signing_config,
cache_upload_config, cache_upload_config,
notifications_config.alerts.clone(),
)); ));
tracing::info!( tracing::info!(

View file

@ -1,7 +1,9 @@
use std::{path::PathBuf, sync::Arc, time::Duration}; use std::{path::PathBuf, sync::Arc, time::Duration};
use fc_common::{ use fc_common::{
alerts::AlertManager,
config::{ config::{
AlertConfig,
CacheUploadConfig, CacheUploadConfig,
GcConfig, GcConfig,
LogConfig, LogConfig,
@ -10,7 +12,14 @@ use fc_common::{
}, },
gc_roots::GcRoots, gc_roots::GcRoots,
log_storage::LogStorage, log_storage::LogStorage,
models::{Build, BuildStatus, CreateBuildProduct, CreateBuildStep}, models::{
Build,
BuildStatus,
CreateBuildProduct,
CreateBuildStep,
metric_names,
metric_units,
},
repo, repo,
}; };
use sqlx::PgPool; use sqlx::PgPool;
@ -26,6 +35,7 @@ pub struct WorkerPool {
notifications_config: Arc<NotificationsConfig>, notifications_config: Arc<NotificationsConfig>,
signing_config: Arc<SigningConfig>, signing_config: Arc<SigningConfig>,
cache_upload_config: Arc<CacheUploadConfig>, cache_upload_config: Arc<CacheUploadConfig>,
alert_manager: Arc<Option<AlertManager>>,
drain_token: tokio_util::sync::CancellationToken, drain_token: tokio_util::sync::CancellationToken,
} }
@ -42,7 +52,9 @@ impl WorkerPool {
notifications_config: NotificationsConfig, notifications_config: NotificationsConfig,
signing_config: SigningConfig, signing_config: SigningConfig,
cache_upload_config: CacheUploadConfig, cache_upload_config: CacheUploadConfig,
alert_config: Option<AlertConfig>,
) -> Self { ) -> Self {
let alert_manager = alert_config.map(AlertManager::new);
Self { Self {
semaphore: Arc::new(Semaphore::new(workers)), semaphore: Arc::new(Semaphore::new(workers)),
pool: db_pool, pool: db_pool,
@ -53,6 +65,7 @@ impl WorkerPool {
notifications_config: Arc::new(notifications_config), notifications_config: Arc::new(notifications_config),
signing_config: Arc::new(signing_config), signing_config: Arc::new(signing_config),
cache_upload_config: Arc::new(cache_upload_config), cache_upload_config: Arc::new(cache_upload_config),
alert_manager: Arc::new(alert_manager),
drain_token: tokio_util::sync::CancellationToken::new(), drain_token: tokio_util::sync::CancellationToken::new(),
} }
} }
@ -96,6 +109,7 @@ impl WorkerPool {
let notifications_config = self.notifications_config.clone(); let notifications_config = self.notifications_config.clone();
let signing_config = self.signing_config.clone(); let signing_config = self.signing_config.clone();
let cache_upload_config = self.cache_upload_config.clone(); let cache_upload_config = self.cache_upload_config.clone();
let alert_manager = self.alert_manager.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _permit = match semaphore.acquire().await { let _permit = match semaphore.acquire().await {
@ -113,6 +127,7 @@ impl WorkerPool {
&notifications_config, &notifications_config,
&signing_config, &signing_config,
&cache_upload_config, &cache_upload_config,
&alert_manager,
) )
.await .await
{ {
@ -278,6 +293,68 @@ async fn try_remote_build(
None None
} }
/// Records post-build metrics for `build` and, when an alert manager is
/// configured and enabled, asks it to run its alert check.
///
/// Everything here is best-effort: failures are logged and swallowed, so a
/// metrics or alerting problem never changes the outcome of the build.
///
/// * `pool` - database pool used for the metric upserts and the lookups.
/// * `build` - the finished build; the duration metric is only written when
///   both `started_at` and `completed_at` are set.
/// * `output_paths` - build outputs; they are probed in order and the size of
///   the first one that can be both stat'ed and stored is recorded.
/// * `alert_manager` - `None` skips the alert check entirely.
async fn collect_metrics_and_alert(
    pool: &PgPool,
    build: &Build,
    output_paths: &[String],
    alert_manager: &Option<AlertManager>,
) {
    if let (Some(started), Some(completed)) =
        (build.started_at, build.completed_at)
    {
        // Millisecond resolution: whole-second truncation would report every
        // sub-second build as 0 even though the metric is stored as a float.
        let elapsed = completed.signed_duration_since(started);
        let duration_secs = elapsed.num_milliseconds() as f64 / 1000.0;

        if let Err(e) = repo::build_metrics::upsert(
            pool,
            build.id,
            metric_names::BUILD_DURATION_SECONDS,
            duration_secs,
            metric_units::SECONDS,
        )
        .await
        {
            tracing::warn!("Failed to save build duration metric: {}", e);
        }
    }

    // Only one size value is stored per build: stop at the first output whose
    // size was saved, and fall through to the next output on any failure.
    // NOTE(review): for a directory `len()` is the size of the directory
    // entry itself, not of its contents - confirm outputs are plain files.
    for path in output_paths {
        let meta = match tokio::fs::metadata(path).await {
            Ok(meta) => meta,
            Err(e) => {
                tracing::debug!("Failed to stat build output {}: {}", path, e);
                continue;
            },
        };

        match repo::build_metrics::upsert(
            pool,
            build.id,
            metric_names::OUTPUT_SIZE_BYTES,
            meta.len() as f64,
            metric_units::BYTES,
        )
        .await
        {
            Ok(_) => break,
            Err(e) => {
                tracing::warn!("Failed to save output size metric: {}", e);
            },
        }
    }

    // Alerting is optional: nothing to do without an enabled manager.
    let manager = match alert_manager {
        Some(m) if m.is_enabled() => m,
        _ => return,
    };

    // The alert check is scoped to the project and jobset that own this
    // build, which are reached through the build's evaluation.
    let evaluation =
        match repo::evaluations::get(pool, build.evaluation_id).await {
            Ok(evaluation) => evaluation,
            Err(e) => {
                tracing::warn!(
                    "Failed to load evaluation for alert check: {}",
                    e
                );
                return;
            },
        };

    let jobset = match repo::jobsets::get(pool, evaluation.jobset_id).await {
        Ok(jobset) => jobset,
        Err(e) => {
            tracing::warn!("Failed to load jobset for alert check: {}", e);
            return;
        },
    };

    // The result is deliberately ignored, as before; the return type of
    // `check_and_alert` is not visible from this file.
    let _ = manager
        .check_and_alert(pool, Some(jobset.project_id), Some(jobset.id))
        .await;
}
#[tracing::instrument(skip(pool, build, work_dir, log_config, gc_config, notifications_config, signing_config, cache_upload_config), fields(build_id = %build.id, job = %build.job_name))] #[tracing::instrument(skip(pool, build, work_dir, log_config, gc_config, notifications_config, signing_config, cache_upload_config), fields(build_id = %build.id, job = %build.job_name))]
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn run_build( async fn run_build(
@ -290,6 +367,7 @@ async fn run_build(
notifications_config: &NotificationsConfig, notifications_config: &NotificationsConfig,
signing_config: &SigningConfig, signing_config: &SigningConfig,
cache_upload_config: &CacheUploadConfig, cache_upload_config: &CacheUploadConfig,
alert_manager: &Option<AlertManager>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Atomically claim the build // Atomically claim the build
let claimed = repo::builds::start(pool, build.id).await?; let claimed = repo::builds::start(pool, build.id).await?;
@ -491,6 +569,14 @@ async fn run_build(
) )
.await?; .await?;
collect_metrics_and_alert(
pool,
&build,
&build_result.output_paths,
&alert_manager,
)
.await;
tracing::info!(build_id = %build.id, "Build completed successfully"); tracing::info!(build_id = %build.id, "Build completed successfully");
} else { } else {
// Check if we should retry // Check if we should retry

View file

@ -2,7 +2,7 @@
//! Nix log parsing tests require no external binaries. //! Nix log parsing tests require no external binaries.
//! Database tests require TEST_DATABASE_URL. //! Database tests require TEST_DATABASE_URL.
// --- Nix log line parsing --- // Nix log line parsing
#[test] #[test]
fn test_parse_nix_log_start() { fn test_parse_nix_log_start() {
@ -60,7 +60,7 @@ fn test_parse_nix_log_empty_line() {
assert!(result.is_none()); assert!(result.is_none());
} }
// --- WorkerPool drain --- // WorkerPool drain
#[tokio::test] #[tokio::test]
async fn test_worker_pool_drain_stops_dispatch() { async fn test_worker_pool_drain_stops_dispatch() {
@ -89,6 +89,7 @@ async fn test_worker_pool_drain_stops_dispatch() {
fc_common::config::NotificationsConfig::default(), fc_common::config::NotificationsConfig::default(),
fc_common::config::SigningConfig::default(), fc_common::config::SigningConfig::default(),
fc_common::config::CacheUploadConfig::default(), fc_common::config::CacheUploadConfig::default(),
None,
); );
// Drain should not panic // Drain should not panic
@ -99,7 +100,7 @@ async fn test_worker_pool_drain_stops_dispatch() {
// doesn't crash // doesn't crash
} }
// --- Database-dependent tests --- // Database-dependent tests
#[tokio::test] #[tokio::test]
async fn test_atomic_build_claiming() { async fn test_atomic_build_claiming() {
@ -273,6 +274,7 @@ async fn test_orphan_build_reset() {
// Simulate the build being stuck for a while by manually backdating // Simulate the build being stuck for a while by manually backdating
// started_at // started_at
// Truly a genius way to test.
sqlx::query( sqlx::query(
"UPDATE builds SET started_at = NOW() - INTERVAL '10 minutes' WHERE id = \ "UPDATE builds SET started_at = NOW() - INTERVAL '10 minutes' WHERE id = \
$1", $1",