Compare commits

..

No commits in common. "eb8b23134096de269fb927014c44776fd30ec8ba" and "1336f998bf6385dd564424c6ebd5bfd2ccf077dc" have entirely different histories.

16 changed files with 30 additions and 564 deletions

17
Cargo.lock generated
View file

@ -812,7 +812,6 @@ dependencies = [
"config",
"git2",
"hex",
"humantime-serde",
"lettre",
"libc",
"regex",
@ -1289,22 +1288,6 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "humantime"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
[[package]]
name = "humantime-serde"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c"
dependencies = [
"humantime",
"serde",
]
[[package]]
name = "hyper"
version = "1.8.1"

View file

@ -23,6 +23,7 @@ fc-evaluator = { path = "./crates/evaluator" }
fc-queue-runner = { path = "./crates/queue-runner" }
fc-server = { path = "./crates/server" }
anyhow = "1.0.101"
argon2 = "0.5.3"
askama = "0.15.4"
@ -37,7 +38,6 @@ futures = "0.3.31"
git2 = "0.20.4"
hex = "0.4.3"
hmac = "0.12.1"
humantime-serde = "1.1.1"
lettre = { version = "0.11.19", default-features = false, features = [
"tokio1-rustls-tls",
"smtp-transport",
@ -81,10 +81,6 @@ style = { level = "warn", priority = -1 }
# enable those to keep our sanity.
absolute_paths = "allow"
arbitrary_source_item_ordering = "allow"
cast_possible_truncation = "allow"
cast_possible_wrap = "allow"
cast_precision_loss = "allow"
cast_sign_loss = "allow"
clone_on_ref_ptr = "warn"
dbg_macro = "warn"
empty_drop = "warn"
@ -109,8 +105,11 @@ similar_names = "allow"
single_call_fn = "allow"
std_instead_of_core = "allow"
too_long_first_doc_paragraph = "allow"
too_many_arguments = "allow" # I don't care
too_many_lines = "allow"
cast_possible_truncation = "allow"
cast_possible_wrap = "allow"
cast_precision_loss = "allow"
cast_sign_loss = "allow"
undocumented_unsafe_blocks = "warn"
unnecessary_safety_comment = "warn"
unused_result_ok = "warn"

View file

@ -14,7 +14,6 @@ clap.workspace = true
config.workspace = true
git2.workspace = true
hex.workspace = true
humantime-serde.workspace = true
lettre.workspace = true
libc.workspace = true
regex.workspace = true

View file

@ -158,16 +158,6 @@ CREATE TABLE builds (
UNIQUE (evaluation_id, job_name)
);
-- build_outputs: normalized output storage
CREATE TABLE build_outputs (
build UUID NOT NULL REFERENCES builds (id) ON DELETE CASCADE,
name TEXT NOT NULL,
path TEXT,
PRIMARY KEY (build, name)
);
CREATE INDEX idx_build_outputs_path ON build_outputs USING btree (path);
-- build_products: output artifacts and metadata
CREATE TABLE build_products (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4 (),
@ -544,7 +534,10 @@ CREATE TABLE notification_tasks (
);
-- Indexes: notification_tasks
CREATE INDEX idx_notification_tasks_status_next_retry ON notification_tasks (status, next_retry_at)
CREATE INDEX idx_notification_tasks_status_next_retry ON notification_tasks (
status,
next_retry_at
)
WHERE
status IN ('pending', 'running');

View file

@ -1,7 +1,6 @@
//! Configuration management for FC CI
use std::path::PathBuf;
use std::time::Duration;
use config as config_crate;
use serde::{Deserialize, Serialize};
@ -91,12 +90,6 @@ pub struct QueueRunnerConfig {
/// TTL in seconds for failed paths cache entries (default 24h).
#[serde(default = "default_failed_paths_ttl")]
pub failed_paths_ttl: u64,
/// Timeout after which builds for unsupported systems are aborted.
/// None or 0 = disabled (Hydra maxUnsupportedTime compatibility).
#[serde(default)]
#[serde(with = "humantime_serde")]
pub unsupported_timeout: Option<Duration>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -553,14 +546,13 @@ impl Default for EvaluatorConfig {
impl Default for QueueRunnerConfig {
fn default() -> Self {
Self {
workers: 4,
poll_interval: 5,
build_timeout: 3600,
work_dir: PathBuf::from("/tmp/fc-queue-runner"),
strict_errors: false,
failed_paths_cache: true,
failed_paths_ttl: 86400,
unsupported_timeout: None,
workers: 4,
poll_interval: 5,
build_timeout: 3600,
work_dir: PathBuf::from("/tmp/fc-queue-runner"),
strict_errors: false,
failed_paths_cache: true,
failed_paths_ttl: 86400,
}
}
}
@ -861,82 +853,4 @@ mod tests {
env::remove_var("FC_SERVER__PORT");
}
}
// Round-trip: a configured unsupported_timeout must survive TOML
// serialize -> deserialize (field uses humantime-serde on the wire).
#[test]
fn test_unsupported_timeout_config() {
let mut config = Config::default();
config.queue_runner.unsupported_timeout = Some(Duration::from_secs(3600));
// Serialize and deserialize to verify serde works
let toml_str = toml::to_string(&config).unwrap();
let parsed: Config = toml::from_str(&toml_str).unwrap();
assert_eq!(
parsed.queue_runner.unsupported_timeout,
Some(Duration::from_secs(3600))
);
}
// The default Config must leave the unsupported-system timeout disabled.
#[test]
fn test_unsupported_timeout_default() {
let config = Config::default();
assert_eq!(config.queue_runner.unsupported_timeout, None);
}
// Round-trips two edge values through TOML: a 30-minute timeout, and an
// explicit zero duration (documented on the field as "disabled").
#[test]
fn test_unsupported_timeout_various_formats() {
// Test 30 minutes
let mut config = Config::default();
config.queue_runner.unsupported_timeout = Some(Duration::from_secs(1800));
let toml_str = toml::to_string(&config).unwrap();
let parsed: Config = toml::from_str(&toml_str).unwrap();
assert_eq!(
parsed.queue_runner.unsupported_timeout,
Some(Duration::from_secs(1800))
);
// Test disabled (0s)
let mut config = Config::default();
config.queue_runner.unsupported_timeout = Some(Duration::from_secs(0));
let toml_str = toml::to_string(&config).unwrap();
let parsed: Config = toml::from_str(&toml_str).unwrap();
assert_eq!(
parsed.queue_runner.unsupported_timeout,
Some(Duration::from_secs(0))
);
}
// Parses QueueRunnerConfig straight from TOML text to confirm the
// humantime duration format ("2h 30m") is accepted for unsupported_timeout.
#[test]
fn test_humantime_serde_parsing() {
// Test that we can directly parse a QueueRunnerConfig with humantime format
let toml = r#"
workers = 4
poll_interval = 5
build_timeout = 3600
work_dir = "/tmp/fc"
unsupported_timeout = "2h 30m"
"#;
let qr_config: QueueRunnerConfig = toml::from_str(toml).unwrap();
assert_eq!(
qr_config.unsupported_timeout,
Some(Duration::from_secs(9000)) // 2.5 hours
);
}
}
// Separate module verifying that a missing unsupported_timeout key
// deserializes to None (the field carries #[serde(default)]).
#[cfg(test)]
mod humantime_option_test {
use super::*;
#[test]
fn test_option_humantime_missing() {
let toml = r#"
workers = 4
poll_interval = 5
build_timeout = 3600
work_dir = "/tmp/fc"
"#;
let config: QueueRunnerConfig = toml::from_str(toml).unwrap();
assert_eq!(config.unsupported_timeout, None);
}
}

View file

@ -487,14 +487,6 @@ pub struct StarredJob {
pub created_at: DateTime<Utc>,
}
/// Normalized build output (Hydra-compatible)
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct BuildOutput {
pub build: Uuid,
pub name: String,
pub path: Option<String>,
}
/// Project membership for per-project permissions
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct ProjectMember {

View file

@ -1,9 +1,6 @@
//! Notification dispatch for build events
use std::{
sync::OnceLock,
time::{SystemTime, UNIX_EPOCH},
};
use std::sync::OnceLock;
use sqlx::PgPool;
use tracing::{error, info, warn};
@ -21,31 +18,6 @@ fn http_client() -> &'static reqwest::Client {
CLIENT.get_or_init(reqwest::Client::new)
}
/// Snapshot of GitHub's `X-RateLimit-*` response headers.
#[derive(Debug, Clone, Copy)]
pub struct RateLimitState {
/// Total request quota for the current window (`X-RateLimit-Limit`).
pub limit: u64,
/// Requests still available in the window (`X-RateLimit-Remaining`).
pub remaining: u64,
/// Unix timestamp in seconds at which the window resets (`X-RateLimit-Reset`).
pub reset_at: u64,
}
/// Pull GitHub-style rate-limit counters out of a response header map.
///
/// Returns `None` when any of the three `X-RateLimit-*` headers is absent,
/// contains non-ASCII bytes, or does not parse as an integer.
#[must_use]
pub fn extract_rate_limit_from_headers(
    headers: &reqwest::header::HeaderMap,
) -> Option<RateLimitState> {
    // Shared lookup pipeline: header -> &str -> u64, failing soft at each step.
    let field = |name: &str| -> Option<u64> {
        headers.get(name)?.to_str().ok()?.parse().ok()
    };
    Some(RateLimitState {
        limit: field("X-RateLimit-Limit")?,
        remaining: field("X-RateLimit-Remaining")?,
        reset_at: field("X-RateLimit-Reset")?,
    })
}
/// Compute a back-off delay (seconds) from the current rate-limit state.
///
/// Scales with how much of the quota has already been consumed relative to
/// the seconds left in the window; the result is never less than 1.
#[must_use]
pub fn calculate_delay(state: &RateLimitState, now: u64) -> u64 {
    // Clamp the remaining window to >= 1s so the division is well-defined.
    let window = state.reset_at.saturating_sub(now).max(1);
    let used = state.limit.saturating_sub(state.remaining);
    ((used * 5) / window).max(1)
}
/// Dispatch all configured notifications for a completed build.
/// If retry queue is enabled, enqueues tasks; otherwise sends immediately.
pub async fn dispatch_build_finished(
@ -480,45 +452,13 @@ async fn set_github_status(
.await
{
Ok(resp) => {
let is_success = resp.status().is_success();
let status = resp.status();
// Extract rate limit state from response headers before consuming body
let rate_limit = extract_rate_limit_from_headers(resp.headers());
if is_success {
if resp.status().is_success() {
info!(build_id = %build.id, "Set GitHub commit status: {state}");
} else {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
warn!("GitHub status API returned {status}: {text}");
}
// Handle rate limiting based on extracted state
if let Some(rate_limit) = rate_limit {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
// Log when approaching limit (Hydra threshold: 2000)
if rate_limit.remaining <= 2000 {
let seconds_until_reset = rate_limit.reset_at.saturating_sub(now);
info!(
"GitHub rate limit: {}/{}, resets in {}s",
rate_limit.remaining, rate_limit.limit, seconds_until_reset
);
}
// Sleep when critical (Hydra threshold: 1000)
if rate_limit.remaining <= 1000 {
let delay = calculate_delay(&rate_limit, now);
warn!(
"GitHub rate limit critical: {}/{}, sleeping {}s",
rate_limit.remaining, rate_limit.limit, delay
);
tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
}
}
},
Err(e) => error!("GitHub status API request failed: {e}"),
}

View file

@ -1,91 +0,0 @@
use sqlx::PgPool;
use uuid::Uuid;
use crate::{
error::{CiError, Result},
models::BuildOutput,
};
/// Create a build output record.
///
/// # Errors
///
/// Returns error if database insert fails or if a duplicate (build, name) pair
/// exists.
pub async fn create(
pool: &PgPool,
build: Uuid,
name: &str,
path: Option<&str>,
) -> Result<BuildOutput> {
sqlx::query_as::<_, BuildOutput>(
"INSERT INTO build_outputs (build, name, path) VALUES ($1, $2, $3) \
RETURNING *",
)
.bind(build)
.bind(name)
.bind(path)
.fetch_one(pool)
.await
.map_err(|e| {
if let sqlx::Error::Database(db_err) = &e
&& db_err.is_unique_violation() {
return CiError::Conflict(format!(
"Build output with name '{name}' already exists for build {build}"
));
}
CiError::Database(e)
})
}
/// List all build outputs for a build, ordered by name.
///
/// # Errors
///
/// Returns error if database query fails.
pub async fn list_for_build(
    pool: &PgPool,
    build: Uuid,
) -> Result<Vec<BuildOutput>> {
    let query = sqlx::query_as::<_, BuildOutput>(
        "SELECT * FROM build_outputs WHERE build = $1 ORDER BY name ASC",
    );
    query
        .bind(build)
        .fetch_all(pool)
        .await
        .map_err(CiError::Database)
}
/// Find build outputs by path.
///
/// # Errors
///
/// Returns error if database query fails.
pub async fn find_by_path(
    pool: &PgPool,
    path: &str,
) -> Result<Vec<BuildOutput>> {
    let query = sqlx::query_as::<_, BuildOutput>(
        "SELECT * FROM build_outputs WHERE path = $1 ORDER BY build, name",
    );
    query
        .bind(path)
        .fetch_all(pool)
        .await
        .map_err(CiError::Database)
}
/// Delete all build outputs for a build.
///
/// # Errors
///
/// Returns error if database query fails.
pub async fn delete_for_build(pool: &PgPool, build: Uuid) -> Result<u64> {
    sqlx::query("DELETE FROM build_outputs WHERE build = $1")
        .bind(build)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected())
        .map_err(CiError::Database)
}

View file

@ -512,21 +512,3 @@ pub async fn set_builder(
.map_err(CiError::Database)?;
Ok(())
}
/// Delete a build by ID.
///
/// # Errors
///
/// Returns error if database query fails or build not found.
pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> {
    let affected = sqlx::query("DELETE FROM builds WHERE id = $1")
        .bind(id)
        .execute(pool)
        .await?
        .rows_affected();
    // Zero rows deleted means the id never existed — surface that as NotFound
    // instead of silently succeeding.
    match affected {
        0 => Err(CiError::NotFound(format!("Build {id} not found"))),
        _ => Ok(()),
    }
}

View file

@ -1,7 +1,6 @@
pub mod api_keys;
pub mod build_dependencies;
pub mod build_metrics;
pub mod build_outputs;
pub mod build_products;
pub mod build_steps;
pub mod builds;

View file

@ -1,7 +1,5 @@
//! Integration tests for database and configuration
mod notifications_tests;
use fc_common::{
Database,
config::{Config, DatabaseConfig},

View file

@ -1,59 +0,0 @@
use fc_common::notifications::*;
use std::time::{SystemTime, UNIX_EPOCH};
// All three X-RateLimit-* headers present and numeric: extraction succeeds
// and each value lands in the matching RateLimitState field.
#[test]
fn test_rate_limit_extraction() {
let mut headers = reqwest::header::HeaderMap::new();
headers.insert("X-RateLimit-Limit", "5000".parse().unwrap());
headers.insert("X-RateLimit-Remaining", "1234".parse().unwrap());
headers.insert("X-RateLimit-Reset", "1735689600".parse().unwrap());
let state = extract_rate_limit_from_headers(&headers);
assert!(state.is_some());
let state = state.unwrap();
assert_eq!(state.limit, 5000);
assert_eq!(state.remaining, 1234);
assert_eq!(state.reset_at, 1735689600);
}
// With no rate-limit headers at all, extraction must return None rather
// than a partially-filled state.
#[test]
fn test_rate_limit_missing_headers() {
let headers = reqwest::header::HeaderMap::new();
let state = extract_rate_limit_from_headers(&headers);
assert!(state.is_none());
}
// 4500 of 5000 requests consumed with 3600s left in the window:
// (4500 * 5) / 3600 = 6, so the delay should be ~6s. The 6..=7 range
// tolerates the clock moving between computing `now` and the call.
#[test]
fn test_sleep_duration_calculation() {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let state = RateLimitState {
limit: 5000,
remaining: 500,
reset_at: now + 3600,
};
let delay = calculate_delay(&state, now);
assert!(delay >= 6 && delay <= 7);
}
// Barely any quota consumed: the raw computed delay rounds down to zero,
// but calculate_delay must still floor the result at 1 second.
#[test]
fn test_sleep_duration_minimum() {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let state = RateLimitState {
limit: 5000,
remaining: 4999,
reset_at: now + 10000,
};
let delay = calculate_delay(&state, now);
assert_eq!(delay, 1);
}

View file

@ -814,98 +814,3 @@ async fn test_dedup_by_drv_path() {
// Cleanup
repo::projects::delete(&pool, project.id).await.ok();
}
// Integration test (returns early when no DB pool is available): exercises
// build_outputs create / list_for_build / find_by_path against a real
// project -> jobset -> evaluation -> build chain.
#[tokio::test]
async fn test_build_outputs_crud() {
let Some(pool) = get_pool().await else {
return;
};
// Create project, jobset, evaluation, build
let project = create_test_project(&pool, "test-project").await;
let jobset = create_test_jobset(&pool, project.id).await;
let eval = create_test_eval(&pool, jobset.id).await;
let build = create_test_build(
&pool,
eval.id,
"test-job",
"/nix/store/test.drv",
None,
)
.await;
// Create outputs
let _out1 = repo::build_outputs::create(
&pool,
build.id,
"out",
Some("/nix/store/abc-result"),
)
.await
.expect("create output 1");
let _out2 = repo::build_outputs::create(
&pool,
build.id,
"dev",
Some("/nix/store/def-result-dev"),
)
.await
.expect("create output 2");
// List outputs for build
let outputs = repo::build_outputs::list_for_build(&pool, build.id)
.await
.expect("list outputs");
assert_eq!(outputs.len(), 2);
assert_eq!(outputs[0].name, "dev"); // Alphabetical order
assert_eq!(outputs[1].name, "out");
// Find by path
let found = repo::build_outputs::find_by_path(&pool, "/nix/store/abc-result")
.await
.expect("find by path");
assert_eq!(found.len(), 1);
assert_eq!(found[0].build, build.id);
assert_eq!(found[0].name, "out");
// Cleanup
repo::projects::delete(&pool, project.id).await.ok();
}
// Integration test: deleting a build must cascade-delete its rows in
// build_outputs (the FK is declared ON DELETE CASCADE in the schema).
#[tokio::test]
async fn test_build_outputs_cascade_delete() {
let Some(pool) = get_pool().await else {
return;
};
let project = create_test_project(&pool, "test-project").await;
let jobset = create_test_jobset(&pool, project.id).await;
let eval = create_test_eval(&pool, jobset.id).await;
let build = create_test_build(
&pool,
eval.id,
"test-job",
"/nix/store/test.drv",
None,
)
.await;
repo::build_outputs::create(&pool, build.id, "out", Some("/nix/store/abc"))
.await
.expect("create output");
// Delete build
repo::builds::delete(&pool, build.id)
.await
.expect("delete build");
// Verify outputs cascade deleted
let outputs = repo::build_outputs::list_for_build(&pool, build.id)
.await
.expect("list outputs after delete");
assert_eq!(outputs.len(), 0);
// Cleanup
repo::projects::delete(&pool, project.id).await.ok();
}

View file

@ -40,7 +40,6 @@ async fn main() -> anyhow::Result<()> {
let failed_paths_cache = qr_config.failed_paths_cache;
let failed_paths_ttl = qr_config.failed_paths_ttl;
let work_dir = qr_config.work_dir;
let unsupported_timeout = qr_config.unsupported_timeout;
// Ensure the work directory exists
tokio::fs::create_dir_all(&work_dir).await?;
@ -83,7 +82,7 @@ async fn main() -> anyhow::Result<()> {
let active_builds = worker_pool.active_builds().clone();
tokio::select! {
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache, notifications_config.clone(), unsupported_timeout) => {
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache, notifications_config.clone()) => {
if let Err(e) = result {
tracing::error!("Runner loop failed: {e}");
}

View file

@ -38,7 +38,6 @@ pub async fn run(
strict_errors: bool,
failed_paths_cache: bool,
notifications_config: fc_common::config::NotificationsConfig,
unsupported_timeout: Option<Duration>,
) -> anyhow::Result<()> {
// Reset orphaned builds from previous crashes (older than 5 minutes)
match repo::builds::reset_orphaned(&pool, 300).await {
@ -208,51 +207,6 @@ pub async fn run(
},
}
// Unsupported system timeout: abort builds with no available builders
if let Some(timeout) = unsupported_timeout
&& let Some(system) = &build.system {
match repo::remote_builders::find_for_system(&pool, system).await {
Ok(builders) if builders.is_empty() => {
let timeout_at = build.created_at + timeout;
if chrono::Utc::now() > timeout_at {
tracing::info!(
build_id = %build.id,
system = %system,
timeout = ?timeout,
"Aborting build: no builder available for system type"
);
if let Err(e) = repo::builds::start(&pool, build.id).await {
tracing::warn!(build_id = %build.id, "Failed to start unsupported build: {e}");
}
if let Err(e) = repo::builds::complete(
&pool,
build.id,
BuildStatus::UnsupportedSystem,
None,
None,
Some("No builder available for system type"),
)
.await
{
tracing::warn!(build_id = %build.id, "Failed to complete unsupported build: {e}");
}
continue;
}
},
Ok(_) => {}, // Builders available, proceed normally
Err(e) => {
tracing::error!(
build_id = %build.id,
"Failed to check builders for unsupported system: {e}"
);
continue;
},
}
}
// One-at-a-time scheduling: check if jobset allows concurrent builds
// First, get the evaluation to find the jobset
let eval =

View file

@ -588,64 +588,23 @@ async fn run_build(
};
if build_result.success {
// Build a reverse lookup map: path -> output_name
// The outputs JSON is a HashMap<String, String> where keys are output names
// and values are store paths. We need to match paths to names correctly.
let path_to_name: std::collections::HashMap<String, String> = build
// Parse output names from build's outputs JSON
let output_names: Vec<String> = build
.outputs
.as_ref()
.and_then(|v| v.as_object())
.map(|obj| {
obj
.iter()
.filter_map(|(name, path)| {
path.as_str().map(|p| (p.to_string(), name.clone()))
})
.collect()
})
.map(|obj| obj.keys().cloned().collect())
.unwrap_or_default();
// Store build outputs in normalized table
for (i, output_path) in build_result.output_paths.iter().enumerate() {
let output_name = path_to_name
.get(output_path)
.cloned()
.unwrap_or_else(|| {
if i == 0 {
"out".to_string()
} else {
format!("out{i}")
}
});
if let Err(e) = repo::build_outputs::create(
pool,
build.id,
&output_name,
Some(output_path),
)
.await
{
tracing::warn!(
build_id = %build.id,
output_name = %output_name,
"Failed to store build output: {e}"
);
}
}
// Register GC roots and create build products for each output
for (i, output_path) in build_result.output_paths.iter().enumerate() {
let output_name = path_to_name
.get(output_path)
.cloned()
.unwrap_or_else(|| {
if i == 0 {
build.job_name.clone()
} else {
format!("{}-{i}", build.job_name)
}
});
let output_name = output_names.get(i).cloned().unwrap_or_else(|| {
if i == 0 {
build.job_name.clone()
} else {
format!("{}-{i}", build.job_name)
}
});
// Register GC root
let mut gc_root_path = None;