treewide: address all clippy lints

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I5cf55cc4cb558c3f9f764c71224e87176a6a6964
This commit is contained in:
raf 2026-02-27 21:50:35 +03:00
commit 0ca92f2710
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
63 changed files with 1788 additions and 1087 deletions

View file

@@ -10,6 +10,11 @@ const MAX_LOG_SIZE: usize = 100 * 1024 * 1024; // 100MB
skip(work_dir, live_log_path),
fields(drv_path, store_uri)
)]
/// Run a nix build on a remote builder via SSH.
///
/// # Errors
///
/// Returns error if nix build command fails or times out.
pub async fn run_nix_build_remote(
drv_path: &str,
work_dir: &Path,
@@ -120,14 +125,11 @@ pub async fn run_nix_build_remote(
})
.await;
match result {
Ok(inner) => inner,
Err(_) => {
Err(CiError::Timeout(format!(
"Remote build timed out after {timeout:?}"
)))
},
}
result.unwrap_or_else(|_| {
Err(CiError::Timeout(format!(
"Remote build timed out after {timeout:?}"
)))
})
}
pub struct BuildResult {
@@ -165,6 +167,10 @@ pub fn parse_nix_log_line(line: &str) -> Option<(&'static str, String)> {
/// Run `nix build` for a derivation path.
/// If `live_log_path` is provided, build output is streamed to that file
/// incrementally.
///
/// # Errors
///
/// Returns error if nix build command fails or times out.
#[tracing::instrument(skip(work_dir, live_log_path), fields(drv_path))]
pub async fn run_nix_build(
drv_path: &str,
@@ -299,12 +305,9 @@ pub async fn run_nix_build(
})
.await;
match result {
Ok(inner) => inner,
Err(_) => {
Err(CiError::Timeout(format!(
"Build timed out after {timeout:?}"
)))
},
}
result.unwrap_or_else(|_| {
Err(CiError::Timeout(format!(
"Build timed out after {timeout:?}"
)))
})
}

View file

@@ -175,7 +175,7 @@ async fn failed_paths_cleanup_loop(
return std::future::pending().await;
}
let interval = std::time::Duration::from_secs(3600);
let interval = std::time::Duration::from_hours(1);
loop {
tokio::time::sleep(interval).await;
match fc_common::repo::failed_paths_cache::cleanup_expired(&pool, ttl).await
@@ -233,7 +233,7 @@ async fn notification_retry_loop(
let cleanup_pool = pool.clone();
tokio::spawn(async move {
let cleanup_interval = std::time::Duration::from_secs(3600);
let cleanup_interval = std::time::Duration::from_hours(1);
loop {
tokio::time::sleep(cleanup_interval).await;
match repo::notification_tasks::cleanup_old_tasks(

View file

@@ -9,6 +9,12 @@ use tokio::sync::Notify;
use crate::worker::WorkerPool;
/// Main queue runner loop. Polls for pending builds and dispatches them to
/// workers.
///
/// # Errors
///
/// Returns error if database operations fail and `strict_errors` is enabled.
pub async fn run(
pool: PgPool,
worker_pool: Arc<WorkerPool>,
@@ -42,7 +48,7 @@ pub async fn run(
.await
{
Ok(true) => {
// All constituents done mark aggregate as completed
// All constituents done, mark aggregate as completed
tracing::info!(
build_id = %build.id,
job = %build.job_name,
@@ -115,34 +121,36 @@ pub async fn run(
}
// Failed paths cache: skip known-failing derivations
if failed_paths_cache {
if let Ok(true) = repo::failed_paths_cache::is_cached_failure(
if failed_paths_cache
&& matches!(
repo::failed_paths_cache::is_cached_failure(
&pool,
&build.drv_path,
)
.await,
Ok(true)
)
{
tracing::info!(
build_id = %build.id, drv = %build.drv_path,
"Cached failure: skipping known-failing derivation"
);
if let Err(e) = repo::builds::start(&pool, build.id).await {
tracing::warn!(build_id = %build.id, "Failed to start cached-failure build: {e}");
}
if let Err(e) = repo::builds::complete(
&pool,
&build.drv_path,
build.id,
BuildStatus::CachedFailure,
None,
None,
Some("Build skipped: derivation is in failed paths cache"),
)
.await
{
tracing::info!(
build_id = %build.id, drv = %build.drv_path,
"Cached failure: skipping known-failing derivation"
);
if let Err(e) = repo::builds::start(&pool, build.id).await {
tracing::warn!(build_id = %build.id, "Failed to start cached-failure build: {e}");
}
if let Err(e) = repo::builds::complete(
&pool,
build.id,
BuildStatus::CachedFailure,
None,
None,
Some("Build skipped: derivation is in failed paths cache"),
)
.await
{
tracing::warn!(build_id = %build.id, "Failed to complete cached-failure build: {e}");
}
continue;
tracing::warn!(build_id = %build.id, "Failed to complete cached-failure build: {e}");
}
continue;
}
// Dependency-aware scheduling: skip if deps not met

View file

@@ -102,11 +102,13 @@ impl WorkerPool {
.await;
}
pub fn worker_count(&self) -> usize {
#[must_use]
pub const fn worker_count(&self) -> usize {
self.worker_count
}
pub fn active_builds(&self) -> &ActiveBuilds {
#[must_use]
pub const fn active_builds(&self) -> &ActiveBuilds {
&self.active_builds
}
@@ -135,9 +137,8 @@ impl WorkerPool {
tokio::spawn(async move {
let result = async {
let _permit = match semaphore.acquire().await {
Ok(p) => p,
Err(_) => return,
let Ok(_permit) = semaphore.acquire().await else {
return;
};
if let Err(e) = run_build(
@@ -287,7 +288,7 @@ async fn push_to_cache(
/// Build S3 store URI with configuration options.
/// Nix S3 URIs support query parameters for configuration:
/// s3://bucket?region=us-east-1&endpoint=https://minio.example.com
/// <s3://bucket?region=us-east-1&endpoint=https://minio.example.com>
fn build_s3_store_uri(
base_uri: &str,
config: Option<&fc_common::config::S3CacheConfig>,
@@ -325,66 +326,6 @@ fn build_s3_store_uri(
format!("{base_uri}?{query}")
}
#[cfg(test)]
mod tests {
use fc_common::config::S3CacheConfig;
use super::*;
#[test]
fn test_build_s3_store_uri_no_config() {
let result = build_s3_store_uri("s3://my-bucket", None);
assert_eq!(result, "s3://my-bucket");
}
#[test]
fn test_build_s3_store_uri_empty_config() {
let cfg = S3CacheConfig::default();
let result = build_s3_store_uri("s3://my-bucket", Some(&cfg));
assert_eq!(result, "s3://my-bucket");
}
#[test]
fn test_build_s3_store_uri_with_region() {
let cfg = S3CacheConfig {
region: Some("us-east-1".to_string()),
..Default::default()
};
let result = build_s3_store_uri("s3://my-bucket", Some(&cfg));
assert_eq!(result, "s3://my-bucket?region=us-east-1");
}
#[test]
fn test_build_s3_store_uri_with_endpoint_and_path_style() {
let cfg = S3CacheConfig {
endpoint_url: Some("https://minio.example.com".to_string()),
use_path_style: true,
..Default::default()
};
let result = build_s3_store_uri("s3://my-bucket", Some(&cfg));
assert!(result.starts_with("s3://my-bucket?"));
assert!(result.contains("endpoint=https%3A%2F%2Fminio.example.com"));
assert!(result.contains("use-path-style=true"));
}
#[test]
fn test_build_s3_store_uri_all_params() {
let cfg = S3CacheConfig {
region: Some("eu-west-1".to_string()),
endpoint_url: Some("https://s3.example.com".to_string()),
use_path_style: true,
..Default::default()
};
let result = build_s3_store_uri("s3://cache-bucket", Some(&cfg));
assert!(result.starts_with("s3://cache-bucket?"));
assert!(result.contains("region=eu-west-1"));
assert!(result.contains("endpoint=https%3A%2F%2Fs3.example.com"));
assert!(result.contains("use-path-style=true"));
// Verify params are joined with &
assert_eq!(result.matches('&').count(), 2);
}
}
/// Try to run the build on a remote builder if one is available for the build's
/// system.
async fn try_remote_build(
@@ -478,7 +419,7 @@ async fn collect_metrics_and_alert(
}
}
for path in output_paths.iter() {
for path in output_paths {
if let Ok(meta) = tokio::fs::metadata(path).await {
let size = meta.len();
if let Err(e) = repo::build_metrics::upsert(
@@ -497,21 +438,18 @@ async fn collect_metrics_and_alert(
}
}
let manager = match alert_manager {
Some(m) => m,
None => return,
let Some(manager) = alert_manager else {
return;
};
if manager.is_enabled() {
if let Ok(evaluation) =
if manager.is_enabled()
&& let Ok(evaluation) =
repo::evaluations::get(pool, build.evaluation_id).await
{
if let Ok(jobset) = repo::jobsets::get(pool, evaluation.jobset_id).await {
manager
.check_and_alert(pool, Some(jobset.project_id), Some(jobset.id))
.await;
}
}
&& let Ok(jobset) = repo::jobsets::get(pool, evaluation.jobset_id).await
{
manager
.check_and_alert(pool, Some(jobset.project_id), Some(jobset.id))
.await;
}
}
@@ -561,7 +499,7 @@ async fn run_build(
{
Some(r) => Ok(r),
None => {
// No remote builder available or all failed build locally
// No remote builder available or all failed, build locally
crate::builder::run_nix_build(
&build.drv_path,
work_dir,
@@ -705,10 +643,10 @@ async fn run_build(
}
// Sign outputs at build time
if sign_outputs(&build_result.output_paths, signing_config).await {
if let Err(e) = repo::builds::mark_signed(pool, build.id).await {
tracing::warn!(build_id = %build.id, "Failed to mark build as signed: {e}");
}
if sign_outputs(&build_result.output_paths, signing_config).await
&& let Err(e) = repo::builds::mark_signed(pool, build.id).await
{
tracing::warn!(build_id = %build.id, "Failed to mark build as signed: {e}");
}
// Push to external binary cache if configured
@@ -740,9 +678,9 @@ async fn run_build(
collect_metrics_and_alert(
pool,
&build,
build,
&build_result.output_paths,
&alert_manager,
alert_manager,
)
.await;
@@ -775,8 +713,7 @@ async fn run_build(
let failure_status = build_result
.exit_code
.map(BuildStatus::from_exit_code)
.unwrap_or(BuildStatus::Failed);
.map_or(BuildStatus::Failed, BuildStatus::from_exit_code);
repo::builds::complete(
pool,
build.id,
@@ -805,10 +742,10 @@ async fn run_build(
let msg = e.to_string();
// Write error log
if let Some(ref storage) = log_storage {
if let Err(e) = storage.write_log(&build.id, "", &msg) {
tracing::warn!(build_id = %build.id, "Failed to write error log: {e}");
}
if let Some(ref storage) = log_storage
&& let Err(e) = storage.write_log(&build.id, "", &msg)
{
tracing::warn!(build_id = %build.id, "Failed to write error log: {e}");
}
// Clean up live log
let _ = tokio::fs::remove_file(&live_log_path).await;
@@ -846,15 +783,73 @@ async fn run_build(
// Auto-promote channels if all builds in the evaluation are done
if updated_build.status.is_success()
&& let Ok(eval) = repo::evaluations::get(pool, build.evaluation_id).await
{
if let Err(e) =
&& let Err(e) =
repo::channels::auto_promote_if_complete(pool, eval.jobset_id, eval.id)
.await
{
tracing::warn!(build_id = %build.id, "Failed to auto-promote channels: {e}");
}
{
tracing::warn!(build_id = %build.id, "Failed to auto-promote channels: {e}");
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use fc_common::config::S3CacheConfig;
use super::*;
#[test]
fn test_build_s3_store_uri_no_config() {
let result = build_s3_store_uri("s3://my-bucket", None);
assert_eq!(result, "s3://my-bucket");
}
#[test]
fn test_build_s3_store_uri_empty_config() {
let cfg = S3CacheConfig::default();
let result = build_s3_store_uri("s3://my-bucket", Some(&cfg));
assert_eq!(result, "s3://my-bucket");
}
#[test]
fn test_build_s3_store_uri_with_region() {
let cfg = S3CacheConfig {
region: Some("us-east-1".to_string()),
..Default::default()
};
let result = build_s3_store_uri("s3://my-bucket", Some(&cfg));
assert_eq!(result, "s3://my-bucket?region=us-east-1");
}
#[test]
fn test_build_s3_store_uri_with_endpoint_and_path_style() {
let cfg = S3CacheConfig {
endpoint_url: Some("https://minio.example.com".to_string()),
use_path_style: true,
..Default::default()
};
let result = build_s3_store_uri("s3://my-bucket", Some(&cfg));
assert!(result.starts_with("s3://my-bucket?"));
assert!(result.contains("endpoint=https%3A%2F%2Fminio.example.com"));
assert!(result.contains("use-path-style=true"));
}
#[test]
fn test_build_s3_store_uri_all_params() {
let cfg = S3CacheConfig {
region: Some("eu-west-1".to_string()),
endpoint_url: Some("https://s3.example.com".to_string()),
use_path_style: true,
..Default::default()
};
let result = build_s3_store_uri("s3://cache-bucket", Some(&cfg));
assert!(result.starts_with("s3://cache-bucket?"));
assert!(result.contains("region=eu-west-1"));
assert!(result.contains("endpoint=https%3A%2F%2Fs3.example.com"));
assert!(result.contains("use-path-style=true"));
// Verify params are joined with &
assert_eq!(result.matches('&').count(), 2);
}
}

View file

@@ -1,6 +1,6 @@
//! Tests for the queue runner.
//! Nix log parsing tests require no external binaries.
//! Database tests require TEST_DATABASE_URL.
//! Database tests require `TEST_DATABASE_URL`.
// Nix log line parsing
@@ -65,12 +65,9 @@ fn test_parse_nix_log_empty_line() {
#[tokio::test]
async fn test_worker_pool_drain_stops_dispatch() {
// Create a minimal worker pool
let url = match std::env::var("TEST_DATABASE_URL") {
Ok(url) => url,
Err(_) => {
println!("Skipping: TEST_DATABASE_URL not set");
return;
},
let Ok(url) = std::env::var("TEST_DATABASE_URL") else {
println!("Skipping: TEST_DATABASE_URL not set");
return;
};
let pool = sqlx::postgres::PgPoolOptions::new()
@@ -83,7 +80,7 @@ async fn test_worker_pool_drain_stops_dispatch() {
pool,
2,
std::env::temp_dir(),
std::time::Duration::from_secs(60),
std::time::Duration::from_mins(1),
fc_common::config::LogConfig::default(),
fc_common::config::GcConfig::default(),
fc_common::config::NotificationsConfig::default(),
@@ -153,7 +150,7 @@ async fn test_cancellation_token_aborts_select() {
// Simulate a long-running build
let build_future = async {
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
tokio::time::sleep(std::time::Duration::from_mins(1)).await;
"completed"
};
@@ -176,12 +173,9 @@ async fn test_cancellation_token_aborts_select() {
#[tokio::test]
async fn test_worker_pool_active_builds_cancel() {
let url = match std::env::var("TEST_DATABASE_URL") {
Ok(url) => url,
Err(_) => {
println!("Skipping: TEST_DATABASE_URL not set");
return;
},
let Ok(url) = std::env::var("TEST_DATABASE_URL") else {
println!("Skipping: TEST_DATABASE_URL not set");
return;
};
let pool = sqlx::postgres::PgPoolOptions::new()
@@ -194,7 +188,7 @@ async fn test_worker_pool_active_builds_cancel() {
pool,
2,
std::env::temp_dir(),
std::time::Duration::from_secs(60),
std::time::Duration::from_mins(1),
fc_common::config::LogConfig::default(),
fc_common::config::GcConfig::default(),
fc_common::config::NotificationsConfig::default(),
@@ -228,12 +222,9 @@ async fn test_fair_share_scheduling() {
#[tokio::test]
async fn test_fair_share_scheduling() {
let url = match std::env::var("TEST_DATABASE_URL") {
Ok(url) => url,
Err(_) => {
println!("Skipping: TEST_DATABASE_URL not set");
return;
},
let Ok(url) = std::env::var("TEST_DATABASE_URL") else {
println!("Skipping: TEST_DATABASE_URL not set");
return;
};
let pool = sqlx::postgres::PgPoolOptions::new()
@@ -447,12 +438,9 @@ async fn test_fair_share_scheduling() {
#[tokio::test]
async fn test_atomic_build_claiming() {
let url = match std::env::var("TEST_DATABASE_URL") {
Ok(url) => url,
Err(_) => {
println!("Skipping: TEST_DATABASE_URL not set");
return;
},
let Ok(url) = std::env::var("TEST_DATABASE_URL") else {
println!("Skipping: TEST_DATABASE_URL not set");
return;
};
let pool = sqlx::postgres::PgPoolOptions::new()
@@ -541,12 +529,9 @@ async fn test_atomic_build_claiming() {
#[tokio::test]
async fn test_orphan_build_reset() {
let url = match std::env::var("TEST_DATABASE_URL") {
Ok(url) => url,
Err(_) => {
println!("Skipping: TEST_DATABASE_URL not set");
return;
},
let Ok(url) = std::env::var("TEST_DATABASE_URL") else {
println!("Skipping: TEST_DATABASE_URL not set");
return;
};
let pool = sqlx::postgres::PgPoolOptions::new()
@@ -647,12 +632,9 @@ async fn test_orphan_build_reset() {
#[tokio::test]
async fn test_get_cancelled_among() {
let url = match std::env::var("TEST_DATABASE_URL") {
Ok(url) => url,
Err(_) => {
println!("Skipping: TEST_DATABASE_URL not set");
return;
},
let Ok(url) = std::env::var("TEST_DATABASE_URL") else {
println!("Skipping: TEST_DATABASE_URL not set");
return;
};
let pool = sqlx::postgres::PgPoolOptions::new()