From b1a7233a05408b2e6d82eaa8e5cef47ed67dc035 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sat, 28 Feb 2026 21:33:09 +0300 Subject: [PATCH 01/10] fc-common: add build_outputs table to base schema Signed-off-by: NotAShelf Change-Id: I1a9c6cb82967fe73aa403e3656c7cab96a6a6964 --- crates/common/migrations/0001_schema.sql | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/crates/common/migrations/0001_schema.sql b/crates/common/migrations/0001_schema.sql index c20f9b9..39b96ca 100644 --- a/crates/common/migrations/0001_schema.sql +++ b/crates/common/migrations/0001_schema.sql @@ -158,6 +158,16 @@ CREATE TABLE builds ( UNIQUE (evaluation_id, job_name) ); +-- build_outputs: normalized output storage +CREATE TABLE build_outputs ( + build UUID NOT NULL REFERENCES builds (id) ON DELETE CASCADE, + name TEXT NOT NULL, + path TEXT, + PRIMARY KEY (build, name) +); + +CREATE INDEX idx_build_outputs_path ON build_outputs USING hash (path); + -- build_products: output artifacts and metadata CREATE TABLE build_products ( id UUID PRIMARY KEY DEFAULT uuid_generate_v4 (), @@ -534,10 +544,7 @@ CREATE TABLE notification_tasks ( ); -- Indexes: notification_tasks -CREATE INDEX idx_notification_tasks_status_next_retry ON notification_tasks ( - status, - next_retry_at -) +CREATE INDEX idx_notification_tasks_status_next_retry ON notification_tasks (status, next_retry_at) WHERE status IN ('pending', 'running'); From 01cd4439aa9beb4e5c38d447eb1fd36ea5d5bdc1 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sat, 28 Feb 2026 21:33:11 +0300 Subject: [PATCH 02/10] fc-common: add `BuildOutput` model; implement CRUD operations Signed-off-by: NotAShelf Change-Id: Iecbe7a5561caf7bf7f770a277b11f3816a6a6964 --- Cargo.toml | 8 +-- crates/common/src/models.rs | 8 +++ crates/common/src/repo/build_outputs.rs | 92 ++++++++++++++++++++++++ crates/common/src/repo/builds.rs | 18 +++++ crates/common/src/repo/mod.rs | 1 + crates/common/tests/repo_tests.rs | 95 
+++++++++++++++++++++++++ 6 files changed, 218 insertions(+), 4 deletions(-) create mode 100644 crates/common/src/repo/build_outputs.rs diff --git a/Cargo.toml b/Cargo.toml index 04f5c88..17a2197 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,10 @@ style = { level = "warn", priority = -1 } # enable those to keep our sanity. absolute_paths = "allow" arbitrary_source_item_ordering = "allow" +cast_possible_truncation = "allow" +cast_possible_wrap = "allow" +cast_precision_loss = "allow" +cast_sign_loss = "allow" clone_on_ref_ptr = "warn" dbg_macro = "warn" empty_drop = "warn" @@ -106,10 +110,6 @@ single_call_fn = "allow" std_instead_of_core = "allow" too_long_first_doc_paragraph = "allow" too_many_lines = "allow" -cast_possible_truncation = "allow" -cast_possible_wrap = "allow" -cast_precision_loss = "allow" -cast_sign_loss = "allow" undocumented_unsafe_blocks = "warn" unnecessary_safety_comment = "warn" unused_result_ok = "warn" diff --git a/crates/common/src/models.rs b/crates/common/src/models.rs index a228fa9..1d1a0a0 100644 --- a/crates/common/src/models.rs +++ b/crates/common/src/models.rs @@ -487,6 +487,14 @@ pub struct StarredJob { pub created_at: DateTime, } +/// Normalized build output (Hydra-compatible) +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct BuildOutput { + pub build: Uuid, + pub name: String, + pub path: Option, +} + /// Project membership for per-project permissions #[derive(Debug, Clone, Serialize, Deserialize, FromRow)] pub struct ProjectMember { diff --git a/crates/common/src/repo/build_outputs.rs b/crates/common/src/repo/build_outputs.rs new file mode 100644 index 0000000..558cbdc --- /dev/null +++ b/crates/common/src/repo/build_outputs.rs @@ -0,0 +1,92 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::{ + error::{CiError, Result}, + models::BuildOutput, +}; + +/// Create a build output record. +/// +/// # Errors +/// +/// Returns error if database insert fails or if a duplicate (build, name) pair +/// exists. 
+pub async fn create( + pool: &PgPool, + build: Uuid, + name: &str, + path: Option<&str>, +) -> Result { + sqlx::query_as::<_, BuildOutput>( + "INSERT INTO build_outputs (build, name, path) VALUES ($1, $2, $3) \ + RETURNING *", + ) + .bind(build) + .bind(name) + .bind(path) + .fetch_one(pool) + .await + .map_err(|e| { + if let sqlx::Error::Database(db_err) = &e { + if db_err.is_unique_violation() { + return CiError::Conflict(format!( + "Build output with name '{name}' already exists for build {build}" + )); + } + } + CiError::Database(e) + }) +} + +/// List all build outputs for a build, ordered by name. +/// +/// # Errors +/// +/// Returns error if database query fails. +pub async fn list_for_build( + pool: &PgPool, + build: Uuid, +) -> Result> { + sqlx::query_as::<_, BuildOutput>( + "SELECT * FROM build_outputs WHERE build = $1 ORDER BY name ASC", + ) + .bind(build) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +/// Find build outputs by path. +/// +/// # Errors +/// +/// Returns error if database query fails. +pub async fn find_by_path( + pool: &PgPool, + path: &str, +) -> Result> { + sqlx::query_as::<_, BuildOutput>( + "SELECT * FROM build_outputs WHERE path = $1 ORDER BY build, name", + ) + .bind(path) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +/// Delete all build outputs for a build. +/// +/// # Errors +/// +/// Returns error if database query fails. +pub async fn delete_for_build(pool: &PgPool, build: Uuid) -> Result { + let result = + sqlx::query("DELETE FROM build_outputs WHERE build = $1") + .bind(build) + .execute(pool) + .await + .map_err(CiError::Database)?; + + Ok(result.rows_affected()) +} diff --git a/crates/common/src/repo/builds.rs b/crates/common/src/repo/builds.rs index d3619cf..86e6d0b 100644 --- a/crates/common/src/repo/builds.rs +++ b/crates/common/src/repo/builds.rs @@ -512,3 +512,21 @@ pub async fn set_builder( .map_err(CiError::Database)?; Ok(()) } + +/// Delete a build by ID. 
+/// +/// # Errors +/// +/// Returns error if database query fails or build not found. +pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> { + let result = sqlx::query("DELETE FROM builds WHERE id = $1") + .bind(id) + .execute(pool) + .await?; + + if result.rows_affected() == 0 { + return Err(CiError::NotFound(format!("Build {id} not found"))); + } + + Ok(()) +} diff --git a/crates/common/src/repo/mod.rs b/crates/common/src/repo/mod.rs index df7da2a..335b5df 100644 --- a/crates/common/src/repo/mod.rs +++ b/crates/common/src/repo/mod.rs @@ -1,6 +1,7 @@ pub mod api_keys; pub mod build_dependencies; pub mod build_metrics; +pub mod build_outputs; pub mod build_products; pub mod build_steps; pub mod builds; diff --git a/crates/common/tests/repo_tests.rs b/crates/common/tests/repo_tests.rs index 959f897..50f6a0d 100644 --- a/crates/common/tests/repo_tests.rs +++ b/crates/common/tests/repo_tests.rs @@ -814,3 +814,98 @@ async fn test_dedup_by_drv_path() { // Cleanup repo::projects::delete(&pool, project.id).await.ok(); } + +#[tokio::test] +async fn test_build_outputs_crud() { + let Some(pool) = get_pool().await else { + return; + }; + + // Create project, jobset, evaluation, build + let project = create_test_project(&pool, "test-project").await; + let jobset = create_test_jobset(&pool, project.id).await; + let eval = create_test_eval(&pool, jobset.id).await; + let build = create_test_build( + &pool, + eval.id, + "test-job", + "/nix/store/test.drv", + None, + ) + .await; + + // Create outputs + let _out1 = repo::build_outputs::create( + &pool, + build.id, + "out", + Some("/nix/store/abc-result"), + ) + .await + .expect("create output 1"); + + let _out2 = repo::build_outputs::create( + &pool, + build.id, + "dev", + Some("/nix/store/def-result-dev"), + ) + .await + .expect("create output 2"); + + // List outputs for build + let outputs = repo::build_outputs::list_for_build(&pool, build.id) + .await + .expect("list outputs"); + assert_eq!(outputs.len(), 2); + 
assert_eq!(outputs[0].name, "dev"); // Alphabetical order + assert_eq!(outputs[1].name, "out"); + + // Find by path + let found = repo::build_outputs::find_by_path(&pool, "/nix/store/abc-result") + .await + .expect("find by path"); + assert_eq!(found.len(), 1); + assert_eq!(found[0].build, build.id); + assert_eq!(found[0].name, "out"); + + // Cleanup + repo::projects::delete(&pool, project.id).await.ok(); +} + +#[tokio::test] +async fn test_build_outputs_cascade_delete() { + let Some(pool) = get_pool().await else { + return; + }; + + let project = create_test_project(&pool, "test-project").await; + let jobset = create_test_jobset(&pool, project.id).await; + let eval = create_test_eval(&pool, jobset.id).await; + let build = create_test_build( + &pool, + eval.id, + "test-job", + "/nix/store/test.drv", + None, + ) + .await; + + repo::build_outputs::create(&pool, build.id, "out", Some("/nix/store/abc")) + .await + .expect("create output"); + + // Delete build + repo::builds::delete(&pool, build.id) + .await + .expect("delete build"); + + // Verify outputs cascade deleted + let outputs = repo::build_outputs::list_for_build(&pool, build.id) + .await + .expect("list outputs after delete"); + assert_eq!(outputs.len(), 0); + + // Cleanup + repo::projects::delete(&pool, project.id).await.ok(); +} From 959aba0933c14720d4c927d84ac096fe90645699 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sat, 28 Feb 2026 21:42:27 +0300 Subject: [PATCH 03/10] fc-common: use btree index for `build_outputs.path` in db schema Btree supports NULL values and provides better flexibility than hash index for nullable columns. 
Signed-off-by: NotAShelf Change-Id: Ida28ef6f88683c360e6f405efca435af6a6a6964 --- crates/common/migrations/0001_schema.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/common/migrations/0001_schema.sql b/crates/common/migrations/0001_schema.sql index 39b96ca..a94f5da 100644 --- a/crates/common/migrations/0001_schema.sql +++ b/crates/common/migrations/0001_schema.sql @@ -166,7 +166,7 @@ CREATE TABLE build_outputs ( PRIMARY KEY (build, name) ); -CREATE INDEX idx_build_outputs_path ON build_outputs USING hash (path); +CREATE INDEX idx_build_outputs_path ON build_outputs USING btree (path); -- build_products: output artifacts and metadata CREATE TABLE build_products ( From ec28069b69a5635cc6a78fcde7b53c728956cd9e Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sat, 28 Feb 2026 21:46:59 +0300 Subject: [PATCH 04/10] fc-queue-runner: store build outputs in normalized table Replaces JSON blob storage with BuildOutput records and parse derivation outputs after successful build, then INSERT into `build_outputs` per output. Warnings are logged on storage failures, but it's not fatal. 
- Parse derivation outputs after successful build - INSERT into build_outputs table per output - Log warnings on storage failures (non-fatal) Signed-off-by: NotAShelf Change-Id: I30120a5ee4aea1bdb170987b22ddc2df6a6a6964 --- crates/queue-runner/src/worker.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/crates/queue-runner/src/worker.rs b/crates/queue-runner/src/worker.rs index 4c0817a..ed852ad 100644 --- a/crates/queue-runner/src/worker.rs +++ b/crates/queue-runner/src/worker.rs @@ -596,6 +596,32 @@ async fn run_build( .map(|obj| obj.keys().cloned().collect()) .unwrap_or_default(); + // Store build outputs in normalized table + for (i, output_path) in build_result.output_paths.iter().enumerate() { + let output_name = output_names.get(i).cloned().unwrap_or_else(|| { + if i == 0 { + "out".to_string() + } else { + format!("out{i}") + } + }); + + if let Err(e) = repo::build_outputs::create( + pool, + build.id, + &output_name, + Some(output_path), + ) + .await + { + tracing::warn!( + build_id = %build.id, + output_name = %output_name, + "Failed to store build output: {e}" + ); + } + } + // Register GC roots and create build products for each output for (i, output_path) in build_result.output_paths.iter().enumerate() { let output_name = output_names.get(i).cloned().unwrap_or_else(|| { From 609ac53c3fd7d588f6fdebb519b21bc52c498f7c Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sat, 28 Feb 2026 21:50:45 +0300 Subject: [PATCH 05/10] fc-queue-runner: match output names by path instead of index Now creates a path-to-name lookup `HashMap` from the outputs JSON and matches each output path to its correct name. Both the `build_outputs` table insert and GC root registration loops now use the same lookup to ensure we're consistent. 
Signed-off-by: NotAShelf Change-Id: Id9f4ce0be6131a7c1a8ce6775ab249db6a6a6964 --- crates/queue-runner/src/worker.rs | 49 ++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/crates/queue-runner/src/worker.rs b/crates/queue-runner/src/worker.rs index ed852ad..d1bc6b9 100644 --- a/crates/queue-runner/src/worker.rs +++ b/crates/queue-runner/src/worker.rs @@ -588,23 +588,35 @@ async fn run_build( }; if build_result.success { - // Parse output names from build's outputs JSON - let output_names: Vec = build + // Build a reverse lookup map: path -> output_name + // The outputs JSON is a HashMap where keys are output names + // and values are store paths. We need to match paths to names correctly. + let path_to_name: std::collections::HashMap = build .outputs .as_ref() .and_then(|v| v.as_object()) - .map(|obj| obj.keys().cloned().collect()) + .map(|obj| { + obj + .iter() + .filter_map(|(name, path)| { + path.as_str().map(|p| (p.to_string(), name.clone())) + }) + .collect() + }) .unwrap_or_default(); // Store build outputs in normalized table for (i, output_path) in build_result.output_paths.iter().enumerate() { - let output_name = output_names.get(i).cloned().unwrap_or_else(|| { - if i == 0 { - "out".to_string() - } else { - format!("out{i}") - } - }); + let output_name = path_to_name + .get(output_path) + .cloned() + .unwrap_or_else(|| { + if i == 0 { + "out".to_string() + } else { + format!("out{i}") + } + }); if let Err(e) = repo::build_outputs::create( pool, @@ -624,13 +636,16 @@ async fn run_build( // Register GC roots and create build products for each output for (i, output_path) in build_result.output_paths.iter().enumerate() { - let output_name = output_names.get(i).cloned().unwrap_or_else(|| { - if i == 0 { - build.job_name.clone() - } else { - format!("{}-{i}", build.job_name) - } - }); + let output_name = path_to_name + .get(output_path) + .cloned() + .unwrap_or_else(|| { + if i == 0 { + build.job_name.clone() + } else { 
+ format!("{}-{i}", build.job_name) + } + }); // Register GC root let mut gc_root_path = None; From b43a11756aa8756ba1aed0b64dad3c77e018dc42 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sat, 28 Feb 2026 22:06:53 +0300 Subject: [PATCH 06/10] fc-common: add `unsupported_timeout` for queue runner Hydra maxUnsupportedTime compatibility. Duration is optional, defaults to `None` and is configurable through `fc.toml` under `queue_runner.unsupported_timeout` key. E.g., "1h". Signed-off-by: NotAShelf Change-Id: I299722e2a3865c873e212a7615b97b806a6a6964 --- Cargo.lock | 17 ++++++++ Cargo.toml | 2 +- crates/common/Cargo.toml | 1 + crates/common/src/config.rs | 83 +++++++++++++++++++++++++++++++++---- 4 files changed, 95 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e00df57..356c56d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -812,6 +812,7 @@ dependencies = [ "config", "git2", "hex", + "humantime-serde", "lettre", "libc", "regex", @@ -1288,6 +1289,22 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" + +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "1.8.1" diff --git a/Cargo.toml b/Cargo.toml index 17a2197..e1d01b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,6 @@ fc-evaluator = { path = "./crates/evaluator" } fc-queue-runner = { path = "./crates/queue-runner" } fc-server = { path = "./crates/server" } - anyhow = "1.0.101" argon2 = "0.5.3" askama = "0.15.4" @@ -38,6 +37,7 @@ futures = "0.3.31" git2 
= "0.20.4" hex = "0.4.3" hmac = "0.12.1" +humantime-serde = "1.1.1" lettre = { version = "0.11.19", default-features = false, features = [ "tokio1-rustls-tls", "smtp-transport", diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 1a55cfc..923db5e 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -14,6 +14,7 @@ clap.workspace = true config.workspace = true git2.workspace = true hex.workspace = true +humantime-serde.workspace = true lettre.workspace = true libc.workspace = true regex.workspace = true diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index c7bedbf..66f65f1 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -1,6 +1,7 @@ //! Configuration management for FC CI use std::path::PathBuf; +use std::time::Duration; use config as config_crate; use serde::{Deserialize, Serialize}; @@ -90,6 +91,12 @@ pub struct QueueRunnerConfig { /// TTL in seconds for failed paths cache entries (default 24h). #[serde(default = "default_failed_paths_ttl")] pub failed_paths_ttl: u64, + + /// Timeout after which builds for unsupported systems are aborted. + /// None or 0 = disabled (Hydra maxUnsupportedTime compatibility). 
+ #[serde(default)] + #[serde(with = "humantime_serde")] + pub unsupported_timeout: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -546,13 +553,14 @@ impl Default for EvaluatorConfig { impl Default for QueueRunnerConfig { fn default() -> Self { Self { - workers: 4, - poll_interval: 5, - build_timeout: 3600, - work_dir: PathBuf::from("/tmp/fc-queue-runner"), - strict_errors: false, - failed_paths_cache: true, - failed_paths_ttl: 86400, + workers: 4, + poll_interval: 5, + build_timeout: 3600, + work_dir: PathBuf::from("/tmp/fc-queue-runner"), + strict_errors: false, + failed_paths_cache: true, + failed_paths_ttl: 86400, + unsupported_timeout: None, } } } @@ -853,4 +861,65 @@ mod tests { env::remove_var("FC_SERVER__PORT"); } } + + #[test] + fn test_unsupported_timeout_config() { + let mut config = Config::default(); + config.queue_runner.unsupported_timeout = Some(Duration::from_secs(3600)); + + // Serialize and deserialize to verify serde works + let toml_str = toml::to_string(&config).unwrap(); + let parsed: Config = toml::from_str(&toml_str).unwrap(); + assert_eq!( + parsed.queue_runner.unsupported_timeout, + Some(Duration::from_secs(3600)) + ); + } + + #[test] + fn test_unsupported_timeout_default() { + let config = Config::default(); + assert_eq!(config.queue_runner.unsupported_timeout, None); + } + + #[test] + fn test_unsupported_timeout_various_formats() { + // Test 30 minutes + let mut config = Config::default(); + config.queue_runner.unsupported_timeout = Some(Duration::from_secs(1800)); + let toml_str = toml::to_string(&config).unwrap(); + let parsed: Config = toml::from_str(&toml_str).unwrap(); + assert_eq!( + parsed.queue_runner.unsupported_timeout, + Some(Duration::from_secs(1800)) + ); + + // Test disabled (0s) + let mut config = Config::default(); + config.queue_runner.unsupported_timeout = Some(Duration::from_secs(0)); + let toml_str = toml::to_string(&config).unwrap(); + let parsed: Config = toml::from_str(&toml_str).unwrap(); + 
assert_eq!( + parsed.queue_runner.unsupported_timeout, + Some(Duration::from_secs(0)) + ); + } + + #[test] + fn test_humantime_serde_parsing() { + // Test that we can directly parse a QueueRunnerConfig with humantime format + let toml = r#" +workers = 4 +poll_interval = 5 +build_timeout = 3600 +work_dir = "/tmp/fc" +unsupported_timeout = "2h 30m" + "#; + + let qr_config: QueueRunnerConfig = toml::from_str(toml).unwrap(); + assert_eq!( + qr_config.unsupported_timeout, + Some(Duration::from_secs(9000)) // 2.5 hours + ); + } } From f5c16aef83812db867dcc7ab65c7327f90290cc8 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sat, 28 Feb 2026 22:54:18 +0300 Subject: [PATCH 07/10] fc-common: add `unsupported_timeout` for queue runner Signed-off-by: NotAShelf Change-Id: I76805c31bbfc11e0a596c6b3b88c52c06a6a6964 --- crates/common/src/config.rs | 17 ++++++++++ crates/queue-runner/src/main.rs | 3 +- crates/queue-runner/src/runner_loop.rs | 47 ++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 1 deletion(-) diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index 66f65f1..4de4587 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -923,3 +923,20 @@ unsupported_timeout = "2h 30m" ); } } + +#[cfg(test)] +mod humantime_option_test { + use super::*; + + #[test] + fn test_option_humantime_missing() { + let toml = r#" +workers = 4 +poll_interval = 5 +build_timeout = 3600 +work_dir = "/tmp/fc" + "#; + let config: QueueRunnerConfig = toml::from_str(toml).unwrap(); + assert_eq!(config.unsupported_timeout, None); + } +} diff --git a/crates/queue-runner/src/main.rs b/crates/queue-runner/src/main.rs index f109d55..f1ab427 100644 --- a/crates/queue-runner/src/main.rs +++ b/crates/queue-runner/src/main.rs @@ -40,6 +40,7 @@ async fn main() -> anyhow::Result<()> { let failed_paths_cache = qr_config.failed_paths_cache; let failed_paths_ttl = qr_config.failed_paths_ttl; let work_dir = qr_config.work_dir; + let unsupported_timeout = 
qr_config.unsupported_timeout; // Ensure the work directory exists tokio::fs::create_dir_all(&work_dir).await?; @@ -82,7 +83,7 @@ async fn main() -> anyhow::Result<()> { let active_builds = worker_pool.active_builds().clone(); tokio::select! { - result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache, notifications_config.clone()) => { + result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache, notifications_config.clone(), unsupported_timeout) => { if let Err(e) = result { tracing::error!("Runner loop failed: {e}"); } diff --git a/crates/queue-runner/src/runner_loop.rs b/crates/queue-runner/src/runner_loop.rs index 451f032..4f3f24f 100644 --- a/crates/queue-runner/src/runner_loop.rs +++ b/crates/queue-runner/src/runner_loop.rs @@ -38,6 +38,7 @@ pub async fn run( strict_errors: bool, failed_paths_cache: bool, notifications_config: fc_common::config::NotificationsConfig, + unsupported_timeout: Option, ) -> anyhow::Result<()> { // Reset orphaned builds from previous crashes (older than 5 minutes) match repo::builds::reset_orphaned(&pool, 300).await { @@ -207,6 +208,52 @@ pub async fn run( }, } + // Unsupported system timeout: abort builds with no available builders + if let Some(timeout) = unsupported_timeout { + if let Some(system) = &build.system { + match repo::remote_builders::find_for_system(&pool, system).await { + Ok(builders) if builders.is_empty() => { + let timeout_at = build.created_at + timeout; + if chrono::Utc::now() > timeout_at { + tracing::info!( + build_id = %build.id, + system = %system, + timeout = ?timeout, + "Aborting build: no builder available for system type" + ); + + if let Err(e) = repo::builds::start(&pool, build.id).await { + tracing::warn!(build_id = %build.id, "Failed to start unsupported build: {e}"); + } + + if let Err(e) = repo::builds::complete( + &pool, + build.id, + 
BuildStatus::UnsupportedSystem, + None, + None, + Some("No builder available for system type"), + ) + .await + { + tracing::warn!(build_id = %build.id, "Failed to complete unsupported build: {e}"); + } + + continue; + } + }, + Ok(_) => {}, // Builders available, proceed normally + Err(e) => { + tracing::error!( + build_id = %build.id, + "Failed to check builders for unsupported system: {e}" + ); + continue; + }, + } + } + } + // One-at-a-time scheduling: check if jobset allows concurrent builds // First, get the evaluation to find the jobset let eval = From 4050de5b4eb1a6c660ae9233d5ef9bc02dd25b1a Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sat, 28 Feb 2026 23:17:58 +0300 Subject: [PATCH 08/10] fc-common: GitHub rate limit state extraction Extract `X-RateLimit-*` headers from responses and calculate an adaptive delay. Minimum delay is 1 second to prevent division by 0. Signed-off-by: NotAShelf Change-Id: Ib35b0d0e720098e2c68ced88a8821c7b6a6a6964 --- crates/common/src/notifications.rs | 23 +++++++++ crates/common/tests/mod.rs | 2 + crates/common/tests/notifications_tests.rs | 59 ++++++++++++++++++++++ 3 files changed, 84 insertions(+) create mode 100644 crates/common/tests/notifications_tests.rs diff --git a/crates/common/src/notifications.rs b/crates/common/src/notifications.rs index b97b766..4bc5088 100644 --- a/crates/common/src/notifications.rs +++ b/crates/common/src/notifications.rs @@ -18,6 +18,29 @@ fn http_client() -> &'static reqwest::Client { CLIENT.get_or_init(reqwest::Client::new) } +#[derive(Debug, Clone, Copy)] +pub struct RateLimitState { + pub limit: u64, + pub remaining: u64, + pub reset_at: u64, +} + +pub fn extract_rate_limit_from_headers( + headers: &reqwest::header::HeaderMap, +) -> Option { + let limit = headers.get("X-RateLimit-Limit")?.to_str().ok()?.parse().ok()?; + let remaining = headers.get("X-RateLimit-Remaining")?.to_str().ok()?.parse().ok()?; + let reset_at = headers.get("X-RateLimit-Reset")?.to_str().ok()?.parse().ok()?; + 
Some(RateLimitState { limit, remaining, reset_at }) +} + +pub fn calculate_delay(state: &RateLimitState, now: u64) -> u64 { + let seconds_until_reset = state.reset_at.saturating_sub(now).max(1); + let consumed = state.limit.saturating_sub(state.remaining); + let delay = (consumed * 5) / seconds_until_reset; + delay.max(1) +} + /// Dispatch all configured notifications for a completed build. /// If retry queue is enabled, enqueues tasks; otherwise sends immediately. pub async fn dispatch_build_finished( diff --git a/crates/common/tests/mod.rs b/crates/common/tests/mod.rs index 9f396e4..715df32 100644 --- a/crates/common/tests/mod.rs +++ b/crates/common/tests/mod.rs @@ -1,5 +1,7 @@ //! Integration tests for database and configuration +mod notifications_tests; + use fc_common::{ Database, config::{Config, DatabaseConfig}, diff --git a/crates/common/tests/notifications_tests.rs b/crates/common/tests/notifications_tests.rs new file mode 100644 index 0000000..66c7141 --- /dev/null +++ b/crates/common/tests/notifications_tests.rs @@ -0,0 +1,59 @@ +use fc_common::notifications::*; +use std::time::{SystemTime, UNIX_EPOCH}; + +#[test] +fn test_rate_limit_extraction() { + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert("X-RateLimit-Limit", "5000".parse().unwrap()); + headers.insert("X-RateLimit-Remaining", "1234".parse().unwrap()); + headers.insert("X-RateLimit-Reset", "1735689600".parse().unwrap()); + + let state = extract_rate_limit_from_headers(&headers); + assert!(state.is_some()); + + let state = state.unwrap(); + assert_eq!(state.limit, 5000); + assert_eq!(state.remaining, 1234); + assert_eq!(state.reset_at, 1735689600); +} + +#[test] +fn test_rate_limit_missing_headers() { + let headers = reqwest::header::HeaderMap::new(); + let state = extract_rate_limit_from_headers(&headers); + assert!(state.is_none()); +} + +#[test] +fn test_sleep_duration_calculation() { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + 
let state = RateLimitState { + limit: 5000, + remaining: 500, + reset_at: now + 3600, + }; + + let delay = calculate_delay(&state, now); + assert!(delay >= 6 && delay <= 7); +} + +#[test] +fn test_sleep_duration_minimum() { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + let state = RateLimitState { + limit: 5000, + remaining: 4999, + reset_at: now + 10000, + }; + + let delay = calculate_delay(&state, now); + assert_eq!(delay, 1); +} From 92844d302e683c08a93d11c08d9e696bbb47ddc6 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sat, 28 Feb 2026 23:28:57 +0300 Subject: [PATCH 09/10] fc-common: properly implement GitHub API rate limiting; cleanup More or less mimics Hydra's `GithubStatus.pm` adaptive throttling: - Log rate limit status when remaining <= 2000 - Sleep when remaining <= 1000 Adaptive delay spreads requests over reset window. Rate limits are extracted from *every response* for better accuracy. Accuracy is *critical* because the primary failure is to get rate limited more often due to miscalculation. Signed-off-by: NotAShelf Change-Id: I3b1fa444c937715c604002d71510bcf76a6a6964 --- crates/common/src/notifications.rs | 41 +++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/crates/common/src/notifications.rs b/crates/common/src/notifications.rs index 4bc5088..2e57c2b 100644 --- a/crates/common/src/notifications.rs +++ b/crates/common/src/notifications.rs @@ -1,6 +1,9 @@ //! 
Notification dispatch for build events -use std::sync::OnceLock; +use std::{ + sync::OnceLock, + time::{SystemTime, UNIX_EPOCH}, +}; use sqlx::PgPool; use tracing::{error, info, warn}; @@ -475,13 +478,45 @@ async fn set_github_status( .await { Ok(resp) => { - if resp.status().is_success() { + let is_success = resp.status().is_success(); + let status = resp.status(); + + // Extract rate limit state from response headers before consuming body + let rate_limit = extract_rate_limit_from_headers(resp.headers()); + + if is_success { info!(build_id = %build.id, "Set GitHub commit status: {state}"); } else { - let status = resp.status(); let text = resp.text().await.unwrap_or_default(); warn!("GitHub status API returned {status}: {text}"); } + + // Handle rate limiting based on extracted state + if let Some(rate_limit) = rate_limit { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + // Log when approaching limit (Hydra threshold: 2000) + if rate_limit.remaining <= 2000 { + let seconds_until_reset = rate_limit.reset_at.saturating_sub(now); + info!( + "GitHub rate limit: {}/{}, resets in {}s", + rate_limit.remaining, rate_limit.limit, seconds_until_reset + ); + } + + // Sleep when critical (Hydra threshold: 1000) + if rate_limit.remaining <= 1000 { + let delay = calculate_delay(&rate_limit, now); + warn!( + "GitHub rate limit critical: {}/{}, sleeping {}s", + rate_limit.remaining, rate_limit.limit, delay + ); + tokio::time::sleep(std::time::Duration::from_secs(delay)).await; + } + } }, Err(e) => error!("GitHub status API request failed: {e}"), } From eb8b23134096de269fb927014c44776fd30ec8ba Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sat, 28 Feb 2026 23:33:16 +0300 Subject: [PATCH 10/10] chore: collapse if statements; autofix Clippy warnings Signed-off-by: NotAShelf Change-Id: I2765a6864b5435d803472b0fba313d7e6a6a6964 --- Cargo.toml | 1 + crates/common/src/notifications.rs | 2 ++ crates/common/src/repo/build_outputs.rs | 5 ++--- 
crates/queue-runner/src/runner_loop.rs | 5 ++--- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e1d01b5..3d5bf5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -109,6 +109,7 @@ similar_names = "allow" single_call_fn = "allow" std_instead_of_core = "allow" too_long_first_doc_paragraph = "allow" +too_many_arguments = "allow" # I don't care too_many_lines = "allow" undocumented_unsafe_blocks = "warn" unnecessary_safety_comment = "warn" diff --git a/crates/common/src/notifications.rs b/crates/common/src/notifications.rs index 2e57c2b..ae7e817 100644 --- a/crates/common/src/notifications.rs +++ b/crates/common/src/notifications.rs @@ -28,6 +28,7 @@ pub struct RateLimitState { pub reset_at: u64, } +#[must_use] pub fn extract_rate_limit_from_headers( headers: &reqwest::header::HeaderMap, ) -> Option { @@ -37,6 +38,7 @@ pub fn extract_rate_limit_from_headers( Some(RateLimitState { limit, remaining, reset_at }) } +#[must_use] pub fn calculate_delay(state: &RateLimitState, now: u64) -> u64 { let seconds_until_reset = state.reset_at.saturating_sub(now).max(1); let consumed = state.limit.saturating_sub(state.remaining); diff --git a/crates/common/src/repo/build_outputs.rs b/crates/common/src/repo/build_outputs.rs index 558cbdc..66ed5b5 100644 --- a/crates/common/src/repo/build_outputs.rs +++ b/crates/common/src/repo/build_outputs.rs @@ -28,13 +28,12 @@ pub async fn create( .fetch_one(pool) .await .map_err(|e| { - if let sqlx::Error::Database(db_err) = &e { - if db_err.is_unique_violation() { + if let sqlx::Error::Database(db_err) = &e + && db_err.is_unique_violation() { return CiError::Conflict(format!( "Build output with name '{name}' already exists for build {build}" )); } - } CiError::Database(e) }) } diff --git a/crates/queue-runner/src/runner_loop.rs b/crates/queue-runner/src/runner_loop.rs index 4f3f24f..aa3655b 100644 --- a/crates/queue-runner/src/runner_loop.rs +++ b/crates/queue-runner/src/runner_loop.rs @@ -209,8 +209,8 @@ 
pub async fn run( } // Unsupported system timeout: abort builds with no available builders - if let Some(timeout) = unsupported_timeout { - if let Some(system) = &build.system { + if let Some(timeout) = unsupported_timeout + && let Some(system) = &build.system { match repo::remote_builders::find_for_system(&pool, system).await { Ok(builders) if builders.is_empty() => { let timeout_at = build.created_at + timeout; @@ -252,7 +252,6 @@ pub async fn run( }, } } - } // One-at-a-time scheduling: check if jobset allows concurrent builds // First, get the evaluation to find the jobset