Compare commits
No commits in common. "eb8b23134096de269fb927014c44776fd30ec8ba" and "1336f998bf6385dd564424c6ebd5bfd2ccf077dc" have entirely different histories.
eb8b231340
...
1336f998bf
16 changed files with 30 additions and 564 deletions
17
Cargo.lock
generated
17
Cargo.lock
generated
|
|
@ -812,7 +812,6 @@ dependencies = [
|
|||
"config",
|
||||
"git2",
|
||||
"hex",
|
||||
"humantime-serde",
|
||||
"lettre",
|
||||
"libc",
|
||||
"regex",
|
||||
|
|
@ -1289,22 +1288,6 @@ version = "1.0.3"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
|
||||
|
||||
[[package]]
|
||||
name = "humantime"
|
||||
version = "2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
|
||||
|
||||
[[package]]
|
||||
name = "humantime-serde"
|
||||
version = "1.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c"
|
||||
dependencies = [
|
||||
"humantime",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
version = "1.8.1"
|
||||
|
|
|
|||
11
Cargo.toml
11
Cargo.toml
|
|
@ -23,6 +23,7 @@ fc-evaluator = { path = "./crates/evaluator" }
|
|||
fc-queue-runner = { path = "./crates/queue-runner" }
|
||||
fc-server = { path = "./crates/server" }
|
||||
|
||||
|
||||
anyhow = "1.0.101"
|
||||
argon2 = "0.5.3"
|
||||
askama = "0.15.4"
|
||||
|
|
@ -37,7 +38,6 @@ futures = "0.3.31"
|
|||
git2 = "0.20.4"
|
||||
hex = "0.4.3"
|
||||
hmac = "0.12.1"
|
||||
humantime-serde = "1.1.1"
|
||||
lettre = { version = "0.11.19", default-features = false, features = [
|
||||
"tokio1-rustls-tls",
|
||||
"smtp-transport",
|
||||
|
|
@ -81,10 +81,6 @@ style = { level = "warn", priority = -1 }
|
|||
# enable those to keep our sanity.
|
||||
absolute_paths = "allow"
|
||||
arbitrary_source_item_ordering = "allow"
|
||||
cast_possible_truncation = "allow"
|
||||
cast_possible_wrap = "allow"
|
||||
cast_precision_loss = "allow"
|
||||
cast_sign_loss = "allow"
|
||||
clone_on_ref_ptr = "warn"
|
||||
dbg_macro = "warn"
|
||||
empty_drop = "warn"
|
||||
|
|
@ -109,8 +105,11 @@ similar_names = "allow"
|
|||
single_call_fn = "allow"
|
||||
std_instead_of_core = "allow"
|
||||
too_long_first_doc_paragraph = "allow"
|
||||
too_many_arguments = "allow" # I don't care
|
||||
too_many_lines = "allow"
|
||||
cast_possible_truncation = "allow"
|
||||
cast_possible_wrap = "allow"
|
||||
cast_precision_loss = "allow"
|
||||
cast_sign_loss = "allow"
|
||||
undocumented_unsafe_blocks = "warn"
|
||||
unnecessary_safety_comment = "warn"
|
||||
unused_result_ok = "warn"
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ clap.workspace = true
|
|||
config.workspace = true
|
||||
git2.workspace = true
|
||||
hex.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
lettre.workspace = true
|
||||
libc.workspace = true
|
||||
regex.workspace = true
|
||||
|
|
|
|||
|
|
@ -158,16 +158,6 @@ CREATE TABLE builds (
|
|||
UNIQUE (evaluation_id, job_name)
|
||||
);
|
||||
|
||||
-- build_outputs: normalized output storage
|
||||
CREATE TABLE build_outputs (
|
||||
build UUID NOT NULL REFERENCES builds (id) ON DELETE CASCADE,
|
||||
name TEXT NOT NULL,
|
||||
path TEXT,
|
||||
PRIMARY KEY (build, name)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_build_outputs_path ON build_outputs USING btree (path);
|
||||
|
||||
-- build_products: output artifacts and metadata
|
||||
CREATE TABLE build_products (
|
||||
id UUID PRIMARY KEY DEFAULT uuid_generate_v4 (),
|
||||
|
|
@ -544,7 +534,10 @@ CREATE TABLE notification_tasks (
|
|||
);
|
||||
|
||||
-- Indexes: notification_tasks
|
||||
CREATE INDEX idx_notification_tasks_status_next_retry ON notification_tasks (status, next_retry_at)
|
||||
CREATE INDEX idx_notification_tasks_status_next_retry ON notification_tasks (
|
||||
status,
|
||||
next_retry_at
|
||||
)
|
||||
WHERE
|
||||
status IN ('pending', 'running');
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
//! Configuration management for FC CI
|
||||
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use config as config_crate;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
|
@ -91,12 +90,6 @@ pub struct QueueRunnerConfig {
|
|||
/// TTL in seconds for failed paths cache entries (default 24h).
|
||||
#[serde(default = "default_failed_paths_ttl")]
|
||||
pub failed_paths_ttl: u64,
|
||||
|
||||
/// Timeout after which builds for unsupported systems are aborted.
|
||||
/// None or 0 = disabled (Hydra maxUnsupportedTime compatibility).
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub unsupported_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
|
@ -560,7 +553,6 @@ impl Default for QueueRunnerConfig {
|
|||
strict_errors: false,
|
||||
failed_paths_cache: true,
|
||||
failed_paths_ttl: 86400,
|
||||
unsupported_timeout: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -861,82 +853,4 @@ mod tests {
|
|||
env::remove_var("FC_SERVER__PORT");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unsupported_timeout_config() {
|
||||
let mut config = Config::default();
|
||||
config.queue_runner.unsupported_timeout = Some(Duration::from_secs(3600));
|
||||
|
||||
// Serialize and deserialize to verify serde works
|
||||
let toml_str = toml::to_string(&config).unwrap();
|
||||
let parsed: Config = toml::from_str(&toml_str).unwrap();
|
||||
assert_eq!(
|
||||
parsed.queue_runner.unsupported_timeout,
|
||||
Some(Duration::from_secs(3600))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unsupported_timeout_default() {
|
||||
let config = Config::default();
|
||||
assert_eq!(config.queue_runner.unsupported_timeout, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unsupported_timeout_various_formats() {
|
||||
// Test 30 minutes
|
||||
let mut config = Config::default();
|
||||
config.queue_runner.unsupported_timeout = Some(Duration::from_secs(1800));
|
||||
let toml_str = toml::to_string(&config).unwrap();
|
||||
let parsed: Config = toml::from_str(&toml_str).unwrap();
|
||||
assert_eq!(
|
||||
parsed.queue_runner.unsupported_timeout,
|
||||
Some(Duration::from_secs(1800))
|
||||
);
|
||||
|
||||
// Test disabled (0s)
|
||||
let mut config = Config::default();
|
||||
config.queue_runner.unsupported_timeout = Some(Duration::from_secs(0));
|
||||
let toml_str = toml::to_string(&config).unwrap();
|
||||
let parsed: Config = toml::from_str(&toml_str).unwrap();
|
||||
assert_eq!(
|
||||
parsed.queue_runner.unsupported_timeout,
|
||||
Some(Duration::from_secs(0))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_humantime_serde_parsing() {
|
||||
// Test that we can directly parse a QueueRunnerConfig with humantime format
|
||||
let toml = r#"
|
||||
workers = 4
|
||||
poll_interval = 5
|
||||
build_timeout = 3600
|
||||
work_dir = "/tmp/fc"
|
||||
unsupported_timeout = "2h 30m"
|
||||
"#;
|
||||
|
||||
let qr_config: QueueRunnerConfig = toml::from_str(toml).unwrap();
|
||||
assert_eq!(
|
||||
qr_config.unsupported_timeout,
|
||||
Some(Duration::from_secs(9000)) // 2.5 hours
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod humantime_option_test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_option_humantime_missing() {
|
||||
let toml = r#"
|
||||
workers = 4
|
||||
poll_interval = 5
|
||||
build_timeout = 3600
|
||||
work_dir = "/tmp/fc"
|
||||
"#;
|
||||
let config: QueueRunnerConfig = toml::from_str(toml).unwrap();
|
||||
assert_eq!(config.unsupported_timeout, None);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -487,14 +487,6 @@ pub struct StarredJob {
|
|||
pub created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
/// Normalized build output (Hydra-compatible)
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
|
||||
pub struct BuildOutput {
|
||||
pub build: Uuid,
|
||||
pub name: String,
|
||||
pub path: Option<String>,
|
||||
}
|
||||
|
||||
/// Project membership for per-project permissions
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
|
||||
pub struct ProjectMember {
|
||||
|
|
|
|||
|
|
@ -1,9 +1,6 @@
|
|||
//! Notification dispatch for build events
|
||||
|
||||
use std::{
|
||||
sync::OnceLock,
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
use std::sync::OnceLock;
|
||||
|
||||
use sqlx::PgPool;
|
||||
use tracing::{error, info, warn};
|
||||
|
|
@ -21,31 +18,6 @@ fn http_client() -> &'static reqwest::Client {
|
|||
CLIENT.get_or_init(reqwest::Client::new)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct RateLimitState {
|
||||
pub limit: u64,
|
||||
pub remaining: u64,
|
||||
pub reset_at: u64,
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn extract_rate_limit_from_headers(
|
||||
headers: &reqwest::header::HeaderMap,
|
||||
) -> Option<RateLimitState> {
|
||||
let limit = headers.get("X-RateLimit-Limit")?.to_str().ok()?.parse().ok()?;
|
||||
let remaining = headers.get("X-RateLimit-Remaining")?.to_str().ok()?.parse().ok()?;
|
||||
let reset_at = headers.get("X-RateLimit-Reset")?.to_str().ok()?.parse().ok()?;
|
||||
Some(RateLimitState { limit, remaining, reset_at })
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn calculate_delay(state: &RateLimitState, now: u64) -> u64 {
|
||||
let seconds_until_reset = state.reset_at.saturating_sub(now).max(1);
|
||||
let consumed = state.limit.saturating_sub(state.remaining);
|
||||
let delay = (consumed * 5) / seconds_until_reset;
|
||||
delay.max(1)
|
||||
}
|
||||
|
||||
/// Dispatch all configured notifications for a completed build.
|
||||
/// If retry queue is enabled, enqueues tasks; otherwise sends immediately.
|
||||
pub async fn dispatch_build_finished(
|
||||
|
|
@ -480,45 +452,13 @@ async fn set_github_status(
|
|||
.await
|
||||
{
|
||||
Ok(resp) => {
|
||||
let is_success = resp.status().is_success();
|
||||
let status = resp.status();
|
||||
|
||||
// Extract rate limit state from response headers before consuming body
|
||||
let rate_limit = extract_rate_limit_from_headers(resp.headers());
|
||||
|
||||
if is_success {
|
||||
if resp.status().is_success() {
|
||||
info!(build_id = %build.id, "Set GitHub commit status: {state}");
|
||||
} else {
|
||||
let status = resp.status();
|
||||
let text = resp.text().await.unwrap_or_default();
|
||||
warn!("GitHub status API returned {status}: {text}");
|
||||
}
|
||||
|
||||
// Handle rate limiting based on extracted state
|
||||
if let Some(rate_limit) = rate_limit {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
|
||||
// Log when approaching limit (Hydra threshold: 2000)
|
||||
if rate_limit.remaining <= 2000 {
|
||||
let seconds_until_reset = rate_limit.reset_at.saturating_sub(now);
|
||||
info!(
|
||||
"GitHub rate limit: {}/{}, resets in {}s",
|
||||
rate_limit.remaining, rate_limit.limit, seconds_until_reset
|
||||
);
|
||||
}
|
||||
|
||||
// Sleep when critical (Hydra threshold: 1000)
|
||||
if rate_limit.remaining <= 1000 {
|
||||
let delay = calculate_delay(&rate_limit, now);
|
||||
warn!(
|
||||
"GitHub rate limit critical: {}/{}, sleeping {}s",
|
||||
rate_limit.remaining, rate_limit.limit, delay
|
||||
);
|
||||
tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => error!("GitHub status API request failed: {e}"),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,91 +0,0 @@
|
|||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
error::{CiError, Result},
|
||||
models::BuildOutput,
|
||||
};
|
||||
|
||||
/// Create a build output record.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns error if database insert fails or if a duplicate (build, name) pair
|
||||
/// exists.
|
||||
pub async fn create(
|
||||
pool: &PgPool,
|
||||
build: Uuid,
|
||||
name: &str,
|
||||
path: Option<&str>,
|
||||
) -> Result<BuildOutput> {
|
||||
sqlx::query_as::<_, BuildOutput>(
|
||||
"INSERT INTO build_outputs (build, name, path) VALUES ($1, $2, $3) \
|
||||
RETURNING *",
|
||||
)
|
||||
.bind(build)
|
||||
.bind(name)
|
||||
.bind(path)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if let sqlx::Error::Database(db_err) = &e
|
||||
&& db_err.is_unique_violation() {
|
||||
return CiError::Conflict(format!(
|
||||
"Build output with name '{name}' already exists for build {build}"
|
||||
));
|
||||
}
|
||||
CiError::Database(e)
|
||||
})
|
||||
}
|
||||
|
||||
/// List all build outputs for a build, ordered by name.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns error if database query fails.
|
||||
pub async fn list_for_build(
|
||||
pool: &PgPool,
|
||||
build: Uuid,
|
||||
) -> Result<Vec<BuildOutput>> {
|
||||
sqlx::query_as::<_, BuildOutput>(
|
||||
"SELECT * FROM build_outputs WHERE build = $1 ORDER BY name ASC",
|
||||
)
|
||||
.bind(build)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)
|
||||
}
|
||||
|
||||
/// Find build outputs by path.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns error if database query fails.
|
||||
pub async fn find_by_path(
|
||||
pool: &PgPool,
|
||||
path: &str,
|
||||
) -> Result<Vec<BuildOutput>> {
|
||||
sqlx::query_as::<_, BuildOutput>(
|
||||
"SELECT * FROM build_outputs WHERE path = $1 ORDER BY build, name",
|
||||
)
|
||||
.bind(path)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)
|
||||
}
|
||||
|
||||
/// Delete all build outputs for a build.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns error if database query fails.
|
||||
pub async fn delete_for_build(pool: &PgPool, build: Uuid) -> Result<u64> {
|
||||
let result =
|
||||
sqlx::query("DELETE FROM build_outputs WHERE build = $1")
|
||||
.bind(build)
|
||||
.execute(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)?;
|
||||
|
||||
Ok(result.rows_affected())
|
||||
}
|
||||
|
|
@ -512,21 +512,3 @@ pub async fn set_builder(
|
|||
.map_err(CiError::Database)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete a build by ID.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns error if database query fails or build not found.
|
||||
pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> {
|
||||
let result = sqlx::query("DELETE FROM builds WHERE id = $1")
|
||||
.bind(id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
if result.rows_affected() == 0 {
|
||||
return Err(CiError::NotFound(format!("Build {id} not found")));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
pub mod api_keys;
|
||||
pub mod build_dependencies;
|
||||
pub mod build_metrics;
|
||||
pub mod build_outputs;
|
||||
pub mod build_products;
|
||||
pub mod build_steps;
|
||||
pub mod builds;
|
||||
|
|
|
|||
|
|
@ -1,7 +1,5 @@
|
|||
//! Integration tests for database and configuration
|
||||
|
||||
mod notifications_tests;
|
||||
|
||||
use fc_common::{
|
||||
Database,
|
||||
config::{Config, DatabaseConfig},
|
||||
|
|
|
|||
|
|
@ -1,59 +0,0 @@
|
|||
use fc_common::notifications::*;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
#[test]
|
||||
fn test_rate_limit_extraction() {
|
||||
let mut headers = reqwest::header::HeaderMap::new();
|
||||
headers.insert("X-RateLimit-Limit", "5000".parse().unwrap());
|
||||
headers.insert("X-RateLimit-Remaining", "1234".parse().unwrap());
|
||||
headers.insert("X-RateLimit-Reset", "1735689600".parse().unwrap());
|
||||
|
||||
let state = extract_rate_limit_from_headers(&headers);
|
||||
assert!(state.is_some());
|
||||
|
||||
let state = state.unwrap();
|
||||
assert_eq!(state.limit, 5000);
|
||||
assert_eq!(state.remaining, 1234);
|
||||
assert_eq!(state.reset_at, 1735689600);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rate_limit_missing_headers() {
|
||||
let headers = reqwest::header::HeaderMap::new();
|
||||
let state = extract_rate_limit_from_headers(&headers);
|
||||
assert!(state.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sleep_duration_calculation() {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
|
||||
let state = RateLimitState {
|
||||
limit: 5000,
|
||||
remaining: 500,
|
||||
reset_at: now + 3600,
|
||||
};
|
||||
|
||||
let delay = calculate_delay(&state, now);
|
||||
assert!(delay >= 6 && delay <= 7);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sleep_duration_minimum() {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
|
||||
let state = RateLimitState {
|
||||
limit: 5000,
|
||||
remaining: 4999,
|
||||
reset_at: now + 10000,
|
||||
};
|
||||
|
||||
let delay = calculate_delay(&state, now);
|
||||
assert_eq!(delay, 1);
|
||||
}
|
||||
|
|
@ -814,98 +814,3 @@ async fn test_dedup_by_drv_path() {
|
|||
// Cleanup
|
||||
repo::projects::delete(&pool, project.id).await.ok();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_build_outputs_crud() {
|
||||
let Some(pool) = get_pool().await else {
|
||||
return;
|
||||
};
|
||||
|
||||
// Create project, jobset, evaluation, build
|
||||
let project = create_test_project(&pool, "test-project").await;
|
||||
let jobset = create_test_jobset(&pool, project.id).await;
|
||||
let eval = create_test_eval(&pool, jobset.id).await;
|
||||
let build = create_test_build(
|
||||
&pool,
|
||||
eval.id,
|
||||
"test-job",
|
||||
"/nix/store/test.drv",
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Create outputs
|
||||
let _out1 = repo::build_outputs::create(
|
||||
&pool,
|
||||
build.id,
|
||||
"out",
|
||||
Some("/nix/store/abc-result"),
|
||||
)
|
||||
.await
|
||||
.expect("create output 1");
|
||||
|
||||
let _out2 = repo::build_outputs::create(
|
||||
&pool,
|
||||
build.id,
|
||||
"dev",
|
||||
Some("/nix/store/def-result-dev"),
|
||||
)
|
||||
.await
|
||||
.expect("create output 2");
|
||||
|
||||
// List outputs for build
|
||||
let outputs = repo::build_outputs::list_for_build(&pool, build.id)
|
||||
.await
|
||||
.expect("list outputs");
|
||||
assert_eq!(outputs.len(), 2);
|
||||
assert_eq!(outputs[0].name, "dev"); // Alphabetical order
|
||||
assert_eq!(outputs[1].name, "out");
|
||||
|
||||
// Find by path
|
||||
let found = repo::build_outputs::find_by_path(&pool, "/nix/store/abc-result")
|
||||
.await
|
||||
.expect("find by path");
|
||||
assert_eq!(found.len(), 1);
|
||||
assert_eq!(found[0].build, build.id);
|
||||
assert_eq!(found[0].name, "out");
|
||||
|
||||
// Cleanup
|
||||
repo::projects::delete(&pool, project.id).await.ok();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_build_outputs_cascade_delete() {
|
||||
let Some(pool) = get_pool().await else {
|
||||
return;
|
||||
};
|
||||
|
||||
let project = create_test_project(&pool, "test-project").await;
|
||||
let jobset = create_test_jobset(&pool, project.id).await;
|
||||
let eval = create_test_eval(&pool, jobset.id).await;
|
||||
let build = create_test_build(
|
||||
&pool,
|
||||
eval.id,
|
||||
"test-job",
|
||||
"/nix/store/test.drv",
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
repo::build_outputs::create(&pool, build.id, "out", Some("/nix/store/abc"))
|
||||
.await
|
||||
.expect("create output");
|
||||
|
||||
// Delete build
|
||||
repo::builds::delete(&pool, build.id)
|
||||
.await
|
||||
.expect("delete build");
|
||||
|
||||
// Verify outputs cascade deleted
|
||||
let outputs = repo::build_outputs::list_for_build(&pool, build.id)
|
||||
.await
|
||||
.expect("list outputs after delete");
|
||||
assert_eq!(outputs.len(), 0);
|
||||
|
||||
// Cleanup
|
||||
repo::projects::delete(&pool, project.id).await.ok();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,7 +40,6 @@ async fn main() -> anyhow::Result<()> {
|
|||
let failed_paths_cache = qr_config.failed_paths_cache;
|
||||
let failed_paths_ttl = qr_config.failed_paths_ttl;
|
||||
let work_dir = qr_config.work_dir;
|
||||
let unsupported_timeout = qr_config.unsupported_timeout;
|
||||
|
||||
// Ensure the work directory exists
|
||||
tokio::fs::create_dir_all(&work_dir).await?;
|
||||
|
|
@ -83,7 +82,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
let active_builds = worker_pool.active_builds().clone();
|
||||
|
||||
tokio::select! {
|
||||
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache, notifications_config.clone(), unsupported_timeout) => {
|
||||
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache, notifications_config.clone()) => {
|
||||
if let Err(e) = result {
|
||||
tracing::error!("Runner loop failed: {e}");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,7 +38,6 @@ pub async fn run(
|
|||
strict_errors: bool,
|
||||
failed_paths_cache: bool,
|
||||
notifications_config: fc_common::config::NotificationsConfig,
|
||||
unsupported_timeout: Option<Duration>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Reset orphaned builds from previous crashes (older than 5 minutes)
|
||||
match repo::builds::reset_orphaned(&pool, 300).await {
|
||||
|
|
@ -208,51 +207,6 @@ pub async fn run(
|
|||
},
|
||||
}
|
||||
|
||||
// Unsupported system timeout: abort builds with no available builders
|
||||
if let Some(timeout) = unsupported_timeout
|
||||
&& let Some(system) = &build.system {
|
||||
match repo::remote_builders::find_for_system(&pool, system).await {
|
||||
Ok(builders) if builders.is_empty() => {
|
||||
let timeout_at = build.created_at + timeout;
|
||||
if chrono::Utc::now() > timeout_at {
|
||||
tracing::info!(
|
||||
build_id = %build.id,
|
||||
system = %system,
|
||||
timeout = ?timeout,
|
||||
"Aborting build: no builder available for system type"
|
||||
);
|
||||
|
||||
if let Err(e) = repo::builds::start(&pool, build.id).await {
|
||||
tracing::warn!(build_id = %build.id, "Failed to start unsupported build: {e}");
|
||||
}
|
||||
|
||||
if let Err(e) = repo::builds::complete(
|
||||
&pool,
|
||||
build.id,
|
||||
BuildStatus::UnsupportedSystem,
|
||||
None,
|
||||
None,
|
||||
Some("No builder available for system type"),
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(build_id = %build.id, "Failed to complete unsupported build: {e}");
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Ok(_) => {}, // Builders available, proceed normally
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
build_id = %build.id,
|
||||
"Failed to check builders for unsupported system: {e}"
|
||||
);
|
||||
continue;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// One-at-a-time scheduling: check if jobset allows concurrent builds
|
||||
// First, get the evaluation to find the jobset
|
||||
let eval =
|
||||
|
|
|
|||
|
|
@ -588,58 +588,17 @@ async fn run_build(
|
|||
};
|
||||
|
||||
if build_result.success {
|
||||
// Build a reverse lookup map: path -> output_name
|
||||
// The outputs JSON is a HashMap<String, String> where keys are output names
|
||||
// and values are store paths. We need to match paths to names correctly.
|
||||
let path_to_name: std::collections::HashMap<String, String> = build
|
||||
// Parse output names from build's outputs JSON
|
||||
let output_names: Vec<String> = build
|
||||
.outputs
|
||||
.as_ref()
|
||||
.and_then(|v| v.as_object())
|
||||
.map(|obj| {
|
||||
obj
|
||||
.iter()
|
||||
.filter_map(|(name, path)| {
|
||||
path.as_str().map(|p| (p.to_string(), name.clone()))
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.map(|obj| obj.keys().cloned().collect())
|
||||
.unwrap_or_default();
|
||||
|
||||
// Store build outputs in normalized table
|
||||
for (i, output_path) in build_result.output_paths.iter().enumerate() {
|
||||
let output_name = path_to_name
|
||||
.get(output_path)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| {
|
||||
if i == 0 {
|
||||
"out".to_string()
|
||||
} else {
|
||||
format!("out{i}")
|
||||
}
|
||||
});
|
||||
|
||||
if let Err(e) = repo::build_outputs::create(
|
||||
pool,
|
||||
build.id,
|
||||
&output_name,
|
||||
Some(output_path),
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(
|
||||
build_id = %build.id,
|
||||
output_name = %output_name,
|
||||
"Failed to store build output: {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Register GC roots and create build products for each output
|
||||
for (i, output_path) in build_result.output_paths.iter().enumerate() {
|
||||
let output_name = path_to_name
|
||||
.get(output_path)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| {
|
||||
let output_name = output_names.get(i).cloned().unwrap_or_else(|| {
|
||||
if i == 0 {
|
||||
build.job_name.clone()
|
||||
} else {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue