Compare commits
10 commits
1336f998bf
...
eb8b231340
| Author | SHA1 | Date | |
|---|---|---|---|
|
eb8b231340 |
|||
|
92844d302e |
|||
|
4050de5b4e |
|||
|
f5c16aef83 |
|||
|
b43a11756a |
|||
|
609ac53c3f |
|||
|
ec28069b69 |
|||
|
959aba0933 |
|||
|
01cd4439aa |
|||
|
b1a7233a05 |
16 changed files with 564 additions and 30 deletions
17
Cargo.lock
generated
17
Cargo.lock
generated
|
|
@ -812,6 +812,7 @@ dependencies = [
|
|||
"config",
|
||||
"git2",
|
||||
"hex",
|
||||
"humantime-serde",
|
||||
"lettre",
|
||||
"libc",
|
||||
"regex",
|
||||
|
|
@ -1288,6 +1289,22 @@ version = "1.0.3"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
|
||||
|
||||
[[package]]
|
||||
name = "humantime"
|
||||
version = "2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
|
||||
|
||||
[[package]]
|
||||
name = "humantime-serde"
|
||||
version = "1.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c"
|
||||
dependencies = [
|
||||
"humantime",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
version = "1.8.1"
|
||||
|
|
|
|||
11
Cargo.toml
11
Cargo.toml
|
|
@ -23,7 +23,6 @@ fc-evaluator = { path = "./crates/evaluator" }
|
|||
fc-queue-runner = { path = "./crates/queue-runner" }
|
||||
fc-server = { path = "./crates/server" }
|
||||
|
||||
|
||||
anyhow = "1.0.101"
|
||||
argon2 = "0.5.3"
|
||||
askama = "0.15.4"
|
||||
|
|
@ -38,6 +37,7 @@ futures = "0.3.31"
|
|||
git2 = "0.20.4"
|
||||
hex = "0.4.3"
|
||||
hmac = "0.12.1"
|
||||
humantime-serde = "1.1.1"
|
||||
lettre = { version = "0.11.19", default-features = false, features = [
|
||||
"tokio1-rustls-tls",
|
||||
"smtp-transport",
|
||||
|
|
@ -81,6 +81,10 @@ style = { level = "warn", priority = -1 }
|
|||
# enable those to keep our sanity.
|
||||
absolute_paths = "allow"
|
||||
arbitrary_source_item_ordering = "allow"
|
||||
cast_possible_truncation = "allow"
|
||||
cast_possible_wrap = "allow"
|
||||
cast_precision_loss = "allow"
|
||||
cast_sign_loss = "allow"
|
||||
clone_on_ref_ptr = "warn"
|
||||
dbg_macro = "warn"
|
||||
empty_drop = "warn"
|
||||
|
|
@ -105,11 +109,8 @@ similar_names = "allow"
|
|||
single_call_fn = "allow"
|
||||
std_instead_of_core = "allow"
|
||||
too_long_first_doc_paragraph = "allow"
|
||||
too_many_arguments = "allow" # I don't care
|
||||
too_many_lines = "allow"
|
||||
cast_possible_truncation = "allow"
|
||||
cast_possible_wrap = "allow"
|
||||
cast_precision_loss = "allow"
|
||||
cast_sign_loss = "allow"
|
||||
undocumented_unsafe_blocks = "warn"
|
||||
unnecessary_safety_comment = "warn"
|
||||
unused_result_ok = "warn"
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ clap.workspace = true
|
|||
config.workspace = true
|
||||
git2.workspace = true
|
||||
hex.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
lettre.workspace = true
|
||||
libc.workspace = true
|
||||
regex.workspace = true
|
||||
|
|
|
|||
|
|
@ -158,6 +158,16 @@ CREATE TABLE builds (
|
|||
UNIQUE (evaluation_id, job_name)
|
||||
);
|
||||
|
||||
-- build_outputs: normalized output storage
|
||||
CREATE TABLE build_outputs (
|
||||
build UUID NOT NULL REFERENCES builds (id) ON DELETE CASCADE,
|
||||
name TEXT NOT NULL,
|
||||
path TEXT,
|
||||
PRIMARY KEY (build, name)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_build_outputs_path ON build_outputs USING btree (path);
|
||||
|
||||
-- build_products: output artifacts and metadata
|
||||
CREATE TABLE build_products (
|
||||
id UUID PRIMARY KEY DEFAULT uuid_generate_v4 (),
|
||||
|
|
@ -534,10 +544,7 @@ CREATE TABLE notification_tasks (
|
|||
);
|
||||
|
||||
-- Indexes: notification_tasks
|
||||
CREATE INDEX idx_notification_tasks_status_next_retry ON notification_tasks (
|
||||
status,
|
||||
next_retry_at
|
||||
)
|
||||
CREATE INDEX idx_notification_tasks_status_next_retry ON notification_tasks (status, next_retry_at)
|
||||
WHERE
|
||||
status IN ('pending', 'running');
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
//! Configuration management for FC CI
|
||||
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use config as config_crate;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
|
@ -90,6 +91,12 @@ pub struct QueueRunnerConfig {
|
|||
/// TTL in seconds for failed paths cache entries (default 24h).
|
||||
#[serde(default = "default_failed_paths_ttl")]
|
||||
pub failed_paths_ttl: u64,
|
||||
|
||||
/// Timeout after which builds for unsupported systems are aborted.
|
||||
/// None or 0 = disabled (Hydra maxUnsupportedTime compatibility).
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub unsupported_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
|
@ -546,13 +553,14 @@ impl Default for EvaluatorConfig {
|
|||
impl Default for QueueRunnerConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
workers: 4,
|
||||
poll_interval: 5,
|
||||
build_timeout: 3600,
|
||||
work_dir: PathBuf::from("/tmp/fc-queue-runner"),
|
||||
strict_errors: false,
|
||||
failed_paths_cache: true,
|
||||
failed_paths_ttl: 86400,
|
||||
workers: 4,
|
||||
poll_interval: 5,
|
||||
build_timeout: 3600,
|
||||
work_dir: PathBuf::from("/tmp/fc-queue-runner"),
|
||||
strict_errors: false,
|
||||
failed_paths_cache: true,
|
||||
failed_paths_ttl: 86400,
|
||||
unsupported_timeout: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -853,4 +861,82 @@ mod tests {
|
|||
env::remove_var("FC_SERVER__PORT");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unsupported_timeout_config() {
|
||||
let mut config = Config::default();
|
||||
config.queue_runner.unsupported_timeout = Some(Duration::from_secs(3600));
|
||||
|
||||
// Serialize and deserialize to verify serde works
|
||||
let toml_str = toml::to_string(&config).unwrap();
|
||||
let parsed: Config = toml::from_str(&toml_str).unwrap();
|
||||
assert_eq!(
|
||||
parsed.queue_runner.unsupported_timeout,
|
||||
Some(Duration::from_secs(3600))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unsupported_timeout_default() {
|
||||
let config = Config::default();
|
||||
assert_eq!(config.queue_runner.unsupported_timeout, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unsupported_timeout_various_formats() {
|
||||
// Test 30 minutes
|
||||
let mut config = Config::default();
|
||||
config.queue_runner.unsupported_timeout = Some(Duration::from_secs(1800));
|
||||
let toml_str = toml::to_string(&config).unwrap();
|
||||
let parsed: Config = toml::from_str(&toml_str).unwrap();
|
||||
assert_eq!(
|
||||
parsed.queue_runner.unsupported_timeout,
|
||||
Some(Duration::from_secs(1800))
|
||||
);
|
||||
|
||||
// Test disabled (0s)
|
||||
let mut config = Config::default();
|
||||
config.queue_runner.unsupported_timeout = Some(Duration::from_secs(0));
|
||||
let toml_str = toml::to_string(&config).unwrap();
|
||||
let parsed: Config = toml::from_str(&toml_str).unwrap();
|
||||
assert_eq!(
|
||||
parsed.queue_runner.unsupported_timeout,
|
||||
Some(Duration::from_secs(0))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_humantime_serde_parsing() {
|
||||
// Test that we can directly parse a QueueRunnerConfig with humantime format
|
||||
let toml = r#"
|
||||
workers = 4
|
||||
poll_interval = 5
|
||||
build_timeout = 3600
|
||||
work_dir = "/tmp/fc"
|
||||
unsupported_timeout = "2h 30m"
|
||||
"#;
|
||||
|
||||
let qr_config: QueueRunnerConfig = toml::from_str(toml).unwrap();
|
||||
assert_eq!(
|
||||
qr_config.unsupported_timeout,
|
||||
Some(Duration::from_secs(9000)) // 2.5 hours
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod humantime_option_test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_option_humantime_missing() {
|
||||
let toml = r#"
|
||||
workers = 4
|
||||
poll_interval = 5
|
||||
build_timeout = 3600
|
||||
work_dir = "/tmp/fc"
|
||||
"#;
|
||||
let config: QueueRunnerConfig = toml::from_str(toml).unwrap();
|
||||
assert_eq!(config.unsupported_timeout, None);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -487,6 +487,14 @@ pub struct StarredJob {
|
|||
pub created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
/// Normalized build output (Hydra-compatible)
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
|
||||
pub struct BuildOutput {
|
||||
pub build: Uuid,
|
||||
pub name: String,
|
||||
pub path: Option<String>,
|
||||
}
|
||||
|
||||
/// Project membership for per-project permissions
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
|
||||
pub struct ProjectMember {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,9 @@
|
|||
//! Notification dispatch for build events
|
||||
|
||||
use std::sync::OnceLock;
|
||||
use std::{
|
||||
sync::OnceLock,
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use sqlx::PgPool;
|
||||
use tracing::{error, info, warn};
|
||||
|
|
@ -18,6 +21,31 @@ fn http_client() -> &'static reqwest::Client {
|
|||
CLIENT.get_or_init(reqwest::Client::new)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct RateLimitState {
|
||||
pub limit: u64,
|
||||
pub remaining: u64,
|
||||
pub reset_at: u64,
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn extract_rate_limit_from_headers(
|
||||
headers: &reqwest::header::HeaderMap,
|
||||
) -> Option<RateLimitState> {
|
||||
let limit = headers.get("X-RateLimit-Limit")?.to_str().ok()?.parse().ok()?;
|
||||
let remaining = headers.get("X-RateLimit-Remaining")?.to_str().ok()?.parse().ok()?;
|
||||
let reset_at = headers.get("X-RateLimit-Reset")?.to_str().ok()?.parse().ok()?;
|
||||
Some(RateLimitState { limit, remaining, reset_at })
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn calculate_delay(state: &RateLimitState, now: u64) -> u64 {
|
||||
let seconds_until_reset = state.reset_at.saturating_sub(now).max(1);
|
||||
let consumed = state.limit.saturating_sub(state.remaining);
|
||||
let delay = (consumed * 5) / seconds_until_reset;
|
||||
delay.max(1)
|
||||
}
|
||||
|
||||
/// Dispatch all configured notifications for a completed build.
|
||||
/// If retry queue is enabled, enqueues tasks; otherwise sends immediately.
|
||||
pub async fn dispatch_build_finished(
|
||||
|
|
@ -452,13 +480,45 @@ async fn set_github_status(
|
|||
.await
|
||||
{
|
||||
Ok(resp) => {
|
||||
if resp.status().is_success() {
|
||||
let is_success = resp.status().is_success();
|
||||
let status = resp.status();
|
||||
|
||||
// Extract rate limit state from response headers before consuming body
|
||||
let rate_limit = extract_rate_limit_from_headers(resp.headers());
|
||||
|
||||
if is_success {
|
||||
info!(build_id = %build.id, "Set GitHub commit status: {state}");
|
||||
} else {
|
||||
let status = resp.status();
|
||||
let text = resp.text().await.unwrap_or_default();
|
||||
warn!("GitHub status API returned {status}: {text}");
|
||||
}
|
||||
|
||||
// Handle rate limiting based on extracted state
|
||||
if let Some(rate_limit) = rate_limit {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
|
||||
// Log when approaching limit (Hydra threshold: 2000)
|
||||
if rate_limit.remaining <= 2000 {
|
||||
let seconds_until_reset = rate_limit.reset_at.saturating_sub(now);
|
||||
info!(
|
||||
"GitHub rate limit: {}/{}, resets in {}s",
|
||||
rate_limit.remaining, rate_limit.limit, seconds_until_reset
|
||||
);
|
||||
}
|
||||
|
||||
// Sleep when critical (Hydra threshold: 1000)
|
||||
if rate_limit.remaining <= 1000 {
|
||||
let delay = calculate_delay(&rate_limit, now);
|
||||
warn!(
|
||||
"GitHub rate limit critical: {}/{}, sleeping {}s",
|
||||
rate_limit.remaining, rate_limit.limit, delay
|
||||
);
|
||||
tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => error!("GitHub status API request failed: {e}"),
|
||||
}
|
||||
|
|
|
|||
91
crates/common/src/repo/build_outputs.rs
Normal file
91
crates/common/src/repo/build_outputs.rs
Normal file
|
|
@ -0,0 +1,91 @@
|
|||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
error::{CiError, Result},
|
||||
models::BuildOutput,
|
||||
};
|
||||
|
||||
/// Create a build output record.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns error if database insert fails or if a duplicate (build, name) pair
|
||||
/// exists.
|
||||
pub async fn create(
|
||||
pool: &PgPool,
|
||||
build: Uuid,
|
||||
name: &str,
|
||||
path: Option<&str>,
|
||||
) -> Result<BuildOutput> {
|
||||
sqlx::query_as::<_, BuildOutput>(
|
||||
"INSERT INTO build_outputs (build, name, path) VALUES ($1, $2, $3) \
|
||||
RETURNING *",
|
||||
)
|
||||
.bind(build)
|
||||
.bind(name)
|
||||
.bind(path)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if let sqlx::Error::Database(db_err) = &e
|
||||
&& db_err.is_unique_violation() {
|
||||
return CiError::Conflict(format!(
|
||||
"Build output with name '{name}' already exists for build {build}"
|
||||
));
|
||||
}
|
||||
CiError::Database(e)
|
||||
})
|
||||
}
|
||||
|
||||
/// List all build outputs for a build, ordered by name.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns error if database query fails.
|
||||
pub async fn list_for_build(
|
||||
pool: &PgPool,
|
||||
build: Uuid,
|
||||
) -> Result<Vec<BuildOutput>> {
|
||||
sqlx::query_as::<_, BuildOutput>(
|
||||
"SELECT * FROM build_outputs WHERE build = $1 ORDER BY name ASC",
|
||||
)
|
||||
.bind(build)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)
|
||||
}
|
||||
|
||||
/// Find build outputs by path.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns error if database query fails.
|
||||
pub async fn find_by_path(
|
||||
pool: &PgPool,
|
||||
path: &str,
|
||||
) -> Result<Vec<BuildOutput>> {
|
||||
sqlx::query_as::<_, BuildOutput>(
|
||||
"SELECT * FROM build_outputs WHERE path = $1 ORDER BY build, name",
|
||||
)
|
||||
.bind(path)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)
|
||||
}
|
||||
|
||||
/// Delete all build outputs for a build.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns error if database query fails.
|
||||
pub async fn delete_for_build(pool: &PgPool, build: Uuid) -> Result<u64> {
|
||||
let result =
|
||||
sqlx::query("DELETE FROM build_outputs WHERE build = $1")
|
||||
.bind(build)
|
||||
.execute(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)?;
|
||||
|
||||
Ok(result.rows_affected())
|
||||
}
|
||||
|
|
@ -512,3 +512,21 @@ pub async fn set_builder(
|
|||
.map_err(CiError::Database)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete a build by ID.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns error if database query fails or build not found.
|
||||
pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> {
|
||||
let result = sqlx::query("DELETE FROM builds WHERE id = $1")
|
||||
.bind(id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
if result.rows_affected() == 0 {
|
||||
return Err(CiError::NotFound(format!("Build {id} not found")));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
pub mod api_keys;
|
||||
pub mod build_dependencies;
|
||||
pub mod build_metrics;
|
||||
pub mod build_outputs;
|
||||
pub mod build_products;
|
||||
pub mod build_steps;
|
||||
pub mod builds;
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
//! Integration tests for database and configuration
|
||||
|
||||
mod notifications_tests;
|
||||
|
||||
use fc_common::{
|
||||
Database,
|
||||
config::{Config, DatabaseConfig},
|
||||
|
|
|
|||
59
crates/common/tests/notifications_tests.rs
Normal file
59
crates/common/tests/notifications_tests.rs
Normal file
|
|
@ -0,0 +1,59 @@
|
|||
use fc_common::notifications::*;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
#[test]
|
||||
fn test_rate_limit_extraction() {
|
||||
let mut headers = reqwest::header::HeaderMap::new();
|
||||
headers.insert("X-RateLimit-Limit", "5000".parse().unwrap());
|
||||
headers.insert("X-RateLimit-Remaining", "1234".parse().unwrap());
|
||||
headers.insert("X-RateLimit-Reset", "1735689600".parse().unwrap());
|
||||
|
||||
let state = extract_rate_limit_from_headers(&headers);
|
||||
assert!(state.is_some());
|
||||
|
||||
let state = state.unwrap();
|
||||
assert_eq!(state.limit, 5000);
|
||||
assert_eq!(state.remaining, 1234);
|
||||
assert_eq!(state.reset_at, 1735689600);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rate_limit_missing_headers() {
|
||||
let headers = reqwest::header::HeaderMap::new();
|
||||
let state = extract_rate_limit_from_headers(&headers);
|
||||
assert!(state.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sleep_duration_calculation() {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
|
||||
let state = RateLimitState {
|
||||
limit: 5000,
|
||||
remaining: 500,
|
||||
reset_at: now + 3600,
|
||||
};
|
||||
|
||||
let delay = calculate_delay(&state, now);
|
||||
assert!(delay >= 6 && delay <= 7);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sleep_duration_minimum() {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
|
||||
let state = RateLimitState {
|
||||
limit: 5000,
|
||||
remaining: 4999,
|
||||
reset_at: now + 10000,
|
||||
};
|
||||
|
||||
let delay = calculate_delay(&state, now);
|
||||
assert_eq!(delay, 1);
|
||||
}
|
||||
|
|
@ -814,3 +814,98 @@ async fn test_dedup_by_drv_path() {
|
|||
// Cleanup
|
||||
repo::projects::delete(&pool, project.id).await.ok();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_build_outputs_crud() {
|
||||
let Some(pool) = get_pool().await else {
|
||||
return;
|
||||
};
|
||||
|
||||
// Create project, jobset, evaluation, build
|
||||
let project = create_test_project(&pool, "test-project").await;
|
||||
let jobset = create_test_jobset(&pool, project.id).await;
|
||||
let eval = create_test_eval(&pool, jobset.id).await;
|
||||
let build = create_test_build(
|
||||
&pool,
|
||||
eval.id,
|
||||
"test-job",
|
||||
"/nix/store/test.drv",
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Create outputs
|
||||
let _out1 = repo::build_outputs::create(
|
||||
&pool,
|
||||
build.id,
|
||||
"out",
|
||||
Some("/nix/store/abc-result"),
|
||||
)
|
||||
.await
|
||||
.expect("create output 1");
|
||||
|
||||
let _out2 = repo::build_outputs::create(
|
||||
&pool,
|
||||
build.id,
|
||||
"dev",
|
||||
Some("/nix/store/def-result-dev"),
|
||||
)
|
||||
.await
|
||||
.expect("create output 2");
|
||||
|
||||
// List outputs for build
|
||||
let outputs = repo::build_outputs::list_for_build(&pool, build.id)
|
||||
.await
|
||||
.expect("list outputs");
|
||||
assert_eq!(outputs.len(), 2);
|
||||
assert_eq!(outputs[0].name, "dev"); // Alphabetical order
|
||||
assert_eq!(outputs[1].name, "out");
|
||||
|
||||
// Find by path
|
||||
let found = repo::build_outputs::find_by_path(&pool, "/nix/store/abc-result")
|
||||
.await
|
||||
.expect("find by path");
|
||||
assert_eq!(found.len(), 1);
|
||||
assert_eq!(found[0].build, build.id);
|
||||
assert_eq!(found[0].name, "out");
|
||||
|
||||
// Cleanup
|
||||
repo::projects::delete(&pool, project.id).await.ok();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_build_outputs_cascade_delete() {
|
||||
let Some(pool) = get_pool().await else {
|
||||
return;
|
||||
};
|
||||
|
||||
let project = create_test_project(&pool, "test-project").await;
|
||||
let jobset = create_test_jobset(&pool, project.id).await;
|
||||
let eval = create_test_eval(&pool, jobset.id).await;
|
||||
let build = create_test_build(
|
||||
&pool,
|
||||
eval.id,
|
||||
"test-job",
|
||||
"/nix/store/test.drv",
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
repo::build_outputs::create(&pool, build.id, "out", Some("/nix/store/abc"))
|
||||
.await
|
||||
.expect("create output");
|
||||
|
||||
// Delete build
|
||||
repo::builds::delete(&pool, build.id)
|
||||
.await
|
||||
.expect("delete build");
|
||||
|
||||
// Verify outputs cascade deleted
|
||||
let outputs = repo::build_outputs::list_for_build(&pool, build.id)
|
||||
.await
|
||||
.expect("list outputs after delete");
|
||||
assert_eq!(outputs.len(), 0);
|
||||
|
||||
// Cleanup
|
||||
repo::projects::delete(&pool, project.id).await.ok();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
let failed_paths_cache = qr_config.failed_paths_cache;
|
||||
let failed_paths_ttl = qr_config.failed_paths_ttl;
|
||||
let work_dir = qr_config.work_dir;
|
||||
let unsupported_timeout = qr_config.unsupported_timeout;
|
||||
|
||||
// Ensure the work directory exists
|
||||
tokio::fs::create_dir_all(&work_dir).await?;
|
||||
|
|
@ -82,7 +83,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
let active_builds = worker_pool.active_builds().clone();
|
||||
|
||||
tokio::select! {
|
||||
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache, notifications_config.clone()) => {
|
||||
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache, notifications_config.clone(), unsupported_timeout) => {
|
||||
if let Err(e) = result {
|
||||
tracing::error!("Runner loop failed: {e}");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ pub async fn run(
|
|||
strict_errors: bool,
|
||||
failed_paths_cache: bool,
|
||||
notifications_config: fc_common::config::NotificationsConfig,
|
||||
unsupported_timeout: Option<Duration>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Reset orphaned builds from previous crashes (older than 5 minutes)
|
||||
match repo::builds::reset_orphaned(&pool, 300).await {
|
||||
|
|
@ -207,6 +208,51 @@ pub async fn run(
|
|||
},
|
||||
}
|
||||
|
||||
// Unsupported system timeout: abort builds with no available builders
|
||||
if let Some(timeout) = unsupported_timeout
|
||||
&& let Some(system) = &build.system {
|
||||
match repo::remote_builders::find_for_system(&pool, system).await {
|
||||
Ok(builders) if builders.is_empty() => {
|
||||
let timeout_at = build.created_at + timeout;
|
||||
if chrono::Utc::now() > timeout_at {
|
||||
tracing::info!(
|
||||
build_id = %build.id,
|
||||
system = %system,
|
||||
timeout = ?timeout,
|
||||
"Aborting build: no builder available for system type"
|
||||
);
|
||||
|
||||
if let Err(e) = repo::builds::start(&pool, build.id).await {
|
||||
tracing::warn!(build_id = %build.id, "Failed to start unsupported build: {e}");
|
||||
}
|
||||
|
||||
if let Err(e) = repo::builds::complete(
|
||||
&pool,
|
||||
build.id,
|
||||
BuildStatus::UnsupportedSystem,
|
||||
None,
|
||||
None,
|
||||
Some("No builder available for system type"),
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(build_id = %build.id, "Failed to complete unsupported build: {e}");
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Ok(_) => {}, // Builders available, proceed normally
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
build_id = %build.id,
|
||||
"Failed to check builders for unsupported system: {e}"
|
||||
);
|
||||
continue;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// One-at-a-time scheduling: check if jobset allows concurrent builds
|
||||
// First, get the evaluation to find the jobset
|
||||
let eval =
|
||||
|
|
|
|||
|
|
@ -588,23 +588,64 @@ async fn run_build(
|
|||
};
|
||||
|
||||
if build_result.success {
|
||||
// Parse output names from build's outputs JSON
|
||||
let output_names: Vec<String> = build
|
||||
// Build a reverse lookup map: path -> output_name
|
||||
// The outputs JSON is a HashMap<String, String> where keys are output names
|
||||
// and values are store paths. We need to match paths to names correctly.
|
||||
let path_to_name: std::collections::HashMap<String, String> = build
|
||||
.outputs
|
||||
.as_ref()
|
||||
.and_then(|v| v.as_object())
|
||||
.map(|obj| obj.keys().cloned().collect())
|
||||
.map(|obj| {
|
||||
obj
|
||||
.iter()
|
||||
.filter_map(|(name, path)| {
|
||||
path.as_str().map(|p| (p.to_string(), name.clone()))
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
// Store build outputs in normalized table
|
||||
for (i, output_path) in build_result.output_paths.iter().enumerate() {
|
||||
let output_name = path_to_name
|
||||
.get(output_path)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| {
|
||||
if i == 0 {
|
||||
"out".to_string()
|
||||
} else {
|
||||
format!("out{i}")
|
||||
}
|
||||
});
|
||||
|
||||
if let Err(e) = repo::build_outputs::create(
|
||||
pool,
|
||||
build.id,
|
||||
&output_name,
|
||||
Some(output_path),
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(
|
||||
build_id = %build.id,
|
||||
output_name = %output_name,
|
||||
"Failed to store build output: {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Register GC roots and create build products for each output
|
||||
for (i, output_path) in build_result.output_paths.iter().enumerate() {
|
||||
let output_name = output_names.get(i).cloned().unwrap_or_else(|| {
|
||||
if i == 0 {
|
||||
build.job_name.clone()
|
||||
} else {
|
||||
format!("{}-{i}", build.job_name)
|
||||
}
|
||||
});
|
||||
let output_name = path_to_name
|
||||
.get(output_path)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| {
|
||||
if i == 0 {
|
||||
build.job_name.clone()
|
||||
} else {
|
||||
format!("{}-{i}", build.job_name)
|
||||
}
|
||||
});
|
||||
|
||||
// Register GC root
|
||||
let mut gc_root_path = None;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue