Compare commits
10 commits
1336f998bf
...
eb8b231340
| Author | SHA1 | Date | |
|---|---|---|---|
|
eb8b231340 |
|||
|
92844d302e |
|||
|
4050de5b4e |
|||
|
f5c16aef83 |
|||
|
b43a11756a |
|||
|
609ac53c3f |
|||
|
ec28069b69 |
|||
|
959aba0933 |
|||
|
01cd4439aa |
|||
|
b1a7233a05 |
16 changed files with 564 additions and 30 deletions
17
Cargo.lock
generated
17
Cargo.lock
generated
|
|
@ -812,6 +812,7 @@ dependencies = [
|
||||||
"config",
|
"config",
|
||||||
"git2",
|
"git2",
|
||||||
"hex",
|
"hex",
|
||||||
|
"humantime-serde",
|
||||||
"lettre",
|
"lettre",
|
||||||
"libc",
|
"libc",
|
||||||
"regex",
|
"regex",
|
||||||
|
|
@ -1288,6 +1289,22 @@ version = "1.0.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
|
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "humantime"
|
||||||
|
version = "2.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "humantime-serde"
|
||||||
|
version = "1.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c"
|
||||||
|
dependencies = [
|
||||||
|
"humantime",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hyper"
|
name = "hyper"
|
||||||
version = "1.8.1"
|
version = "1.8.1"
|
||||||
|
|
|
||||||
11
Cargo.toml
11
Cargo.toml
|
|
@ -23,7 +23,6 @@ fc-evaluator = { path = "./crates/evaluator" }
|
||||||
fc-queue-runner = { path = "./crates/queue-runner" }
|
fc-queue-runner = { path = "./crates/queue-runner" }
|
||||||
fc-server = { path = "./crates/server" }
|
fc-server = { path = "./crates/server" }
|
||||||
|
|
||||||
|
|
||||||
anyhow = "1.0.101"
|
anyhow = "1.0.101"
|
||||||
argon2 = "0.5.3"
|
argon2 = "0.5.3"
|
||||||
askama = "0.15.4"
|
askama = "0.15.4"
|
||||||
|
|
@ -38,6 +37,7 @@ futures = "0.3.31"
|
||||||
git2 = "0.20.4"
|
git2 = "0.20.4"
|
||||||
hex = "0.4.3"
|
hex = "0.4.3"
|
||||||
hmac = "0.12.1"
|
hmac = "0.12.1"
|
||||||
|
humantime-serde = "1.1.1"
|
||||||
lettre = { version = "0.11.19", default-features = false, features = [
|
lettre = { version = "0.11.19", default-features = false, features = [
|
||||||
"tokio1-rustls-tls",
|
"tokio1-rustls-tls",
|
||||||
"smtp-transport",
|
"smtp-transport",
|
||||||
|
|
@ -81,6 +81,10 @@ style = { level = "warn", priority = -1 }
|
||||||
# enable those to keep our sanity.
|
# enable those to keep our sanity.
|
||||||
absolute_paths = "allow"
|
absolute_paths = "allow"
|
||||||
arbitrary_source_item_ordering = "allow"
|
arbitrary_source_item_ordering = "allow"
|
||||||
|
cast_possible_truncation = "allow"
|
||||||
|
cast_possible_wrap = "allow"
|
||||||
|
cast_precision_loss = "allow"
|
||||||
|
cast_sign_loss = "allow"
|
||||||
clone_on_ref_ptr = "warn"
|
clone_on_ref_ptr = "warn"
|
||||||
dbg_macro = "warn"
|
dbg_macro = "warn"
|
||||||
empty_drop = "warn"
|
empty_drop = "warn"
|
||||||
|
|
@ -105,11 +109,8 @@ similar_names = "allow"
|
||||||
single_call_fn = "allow"
|
single_call_fn = "allow"
|
||||||
std_instead_of_core = "allow"
|
std_instead_of_core = "allow"
|
||||||
too_long_first_doc_paragraph = "allow"
|
too_long_first_doc_paragraph = "allow"
|
||||||
|
too_many_arguments = "allow" # I don't care
|
||||||
too_many_lines = "allow"
|
too_many_lines = "allow"
|
||||||
cast_possible_truncation = "allow"
|
|
||||||
cast_possible_wrap = "allow"
|
|
||||||
cast_precision_loss = "allow"
|
|
||||||
cast_sign_loss = "allow"
|
|
||||||
undocumented_unsafe_blocks = "warn"
|
undocumented_unsafe_blocks = "warn"
|
||||||
unnecessary_safety_comment = "warn"
|
unnecessary_safety_comment = "warn"
|
||||||
unused_result_ok = "warn"
|
unused_result_ok = "warn"
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ clap.workspace = true
|
||||||
config.workspace = true
|
config.workspace = true
|
||||||
git2.workspace = true
|
git2.workspace = true
|
||||||
hex.workspace = true
|
hex.workspace = true
|
||||||
|
humantime-serde.workspace = true
|
||||||
lettre.workspace = true
|
lettre.workspace = true
|
||||||
libc.workspace = true
|
libc.workspace = true
|
||||||
regex.workspace = true
|
regex.workspace = true
|
||||||
|
|
|
||||||
|
|
@ -158,6 +158,16 @@ CREATE TABLE builds (
|
||||||
UNIQUE (evaluation_id, job_name)
|
UNIQUE (evaluation_id, job_name)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
-- build_outputs: normalized output storage
|
||||||
|
CREATE TABLE build_outputs (
|
||||||
|
build UUID NOT NULL REFERENCES builds (id) ON DELETE CASCADE,
|
||||||
|
name TEXT NOT NULL,
|
||||||
|
path TEXT,
|
||||||
|
PRIMARY KEY (build, name)
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX idx_build_outputs_path ON build_outputs USING btree (path);
|
||||||
|
|
||||||
-- build_products: output artifacts and metadata
|
-- build_products: output artifacts and metadata
|
||||||
CREATE TABLE build_products (
|
CREATE TABLE build_products (
|
||||||
id UUID PRIMARY KEY DEFAULT uuid_generate_v4 (),
|
id UUID PRIMARY KEY DEFAULT uuid_generate_v4 (),
|
||||||
|
|
@ -534,10 +544,7 @@ CREATE TABLE notification_tasks (
|
||||||
);
|
);
|
||||||
|
|
||||||
-- Indexes: notification_tasks
|
-- Indexes: notification_tasks
|
||||||
CREATE INDEX idx_notification_tasks_status_next_retry ON notification_tasks (
|
CREATE INDEX idx_notification_tasks_status_next_retry ON notification_tasks (status, next_retry_at)
|
||||||
status,
|
|
||||||
next_retry_at
|
|
||||||
)
|
|
||||||
WHERE
|
WHERE
|
||||||
status IN ('pending', 'running');
|
status IN ('pending', 'running');
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
//! Configuration management for FC CI
|
//! Configuration management for FC CI
|
||||||
|
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use config as config_crate;
|
use config as config_crate;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
@ -90,6 +91,12 @@ pub struct QueueRunnerConfig {
|
||||||
/// TTL in seconds for failed paths cache entries (default 24h).
|
/// TTL in seconds for failed paths cache entries (default 24h).
|
||||||
#[serde(default = "default_failed_paths_ttl")]
|
#[serde(default = "default_failed_paths_ttl")]
|
||||||
pub failed_paths_ttl: u64,
|
pub failed_paths_ttl: u64,
|
||||||
|
|
||||||
|
/// Timeout after which builds for unsupported systems are aborted.
|
||||||
|
/// None or 0 = disabled (Hydra maxUnsupportedTime compatibility).
|
||||||
|
#[serde(default)]
|
||||||
|
#[serde(with = "humantime_serde")]
|
||||||
|
pub unsupported_timeout: Option<Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
|
@ -546,13 +553,14 @@ impl Default for EvaluatorConfig {
|
||||||
impl Default for QueueRunnerConfig {
|
impl Default for QueueRunnerConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
workers: 4,
|
workers: 4,
|
||||||
poll_interval: 5,
|
poll_interval: 5,
|
||||||
build_timeout: 3600,
|
build_timeout: 3600,
|
||||||
work_dir: PathBuf::from("/tmp/fc-queue-runner"),
|
work_dir: PathBuf::from("/tmp/fc-queue-runner"),
|
||||||
strict_errors: false,
|
strict_errors: false,
|
||||||
failed_paths_cache: true,
|
failed_paths_cache: true,
|
||||||
failed_paths_ttl: 86400,
|
failed_paths_ttl: 86400,
|
||||||
|
unsupported_timeout: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -853,4 +861,82 @@ mod tests {
|
||||||
env::remove_var("FC_SERVER__PORT");
|
env::remove_var("FC_SERVER__PORT");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_unsupported_timeout_config() {
|
||||||
|
let mut config = Config::default();
|
||||||
|
config.queue_runner.unsupported_timeout = Some(Duration::from_secs(3600));
|
||||||
|
|
||||||
|
// Serialize and deserialize to verify serde works
|
||||||
|
let toml_str = toml::to_string(&config).unwrap();
|
||||||
|
let parsed: Config = toml::from_str(&toml_str).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
parsed.queue_runner.unsupported_timeout,
|
||||||
|
Some(Duration::from_secs(3600))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_unsupported_timeout_default() {
|
||||||
|
let config = Config::default();
|
||||||
|
assert_eq!(config.queue_runner.unsupported_timeout, None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_unsupported_timeout_various_formats() {
|
||||||
|
// Test 30 minutes
|
||||||
|
let mut config = Config::default();
|
||||||
|
config.queue_runner.unsupported_timeout = Some(Duration::from_secs(1800));
|
||||||
|
let toml_str = toml::to_string(&config).unwrap();
|
||||||
|
let parsed: Config = toml::from_str(&toml_str).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
parsed.queue_runner.unsupported_timeout,
|
||||||
|
Some(Duration::from_secs(1800))
|
||||||
|
);
|
||||||
|
|
||||||
|
// Test disabled (0s)
|
||||||
|
let mut config = Config::default();
|
||||||
|
config.queue_runner.unsupported_timeout = Some(Duration::from_secs(0));
|
||||||
|
let toml_str = toml::to_string(&config).unwrap();
|
||||||
|
let parsed: Config = toml::from_str(&toml_str).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
parsed.queue_runner.unsupported_timeout,
|
||||||
|
Some(Duration::from_secs(0))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_humantime_serde_parsing() {
|
||||||
|
// Test that we can directly parse a QueueRunnerConfig with humantime format
|
||||||
|
let toml = r#"
|
||||||
|
workers = 4
|
||||||
|
poll_interval = 5
|
||||||
|
build_timeout = 3600
|
||||||
|
work_dir = "/tmp/fc"
|
||||||
|
unsupported_timeout = "2h 30m"
|
||||||
|
"#;
|
||||||
|
|
||||||
|
let qr_config: QueueRunnerConfig = toml::from_str(toml).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
qr_config.unsupported_timeout,
|
||||||
|
Some(Duration::from_secs(9000)) // 2.5 hours
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod humantime_option_test {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_option_humantime_missing() {
|
||||||
|
let toml = r#"
|
||||||
|
workers = 4
|
||||||
|
poll_interval = 5
|
||||||
|
build_timeout = 3600
|
||||||
|
work_dir = "/tmp/fc"
|
||||||
|
"#;
|
||||||
|
let config: QueueRunnerConfig = toml::from_str(toml).unwrap();
|
||||||
|
assert_eq!(config.unsupported_timeout, None);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -487,6 +487,14 @@ pub struct StarredJob {
|
||||||
pub created_at: DateTime<Utc>,
|
pub created_at: DateTime<Utc>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Normalized build output (Hydra-compatible)
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
|
||||||
|
pub struct BuildOutput {
|
||||||
|
pub build: Uuid,
|
||||||
|
pub name: String,
|
||||||
|
pub path: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Project membership for per-project permissions
|
/// Project membership for per-project permissions
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
|
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
|
||||||
pub struct ProjectMember {
|
pub struct ProjectMember {
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,9 @@
|
||||||
//! Notification dispatch for build events
|
//! Notification dispatch for build events
|
||||||
|
|
||||||
use std::sync::OnceLock;
|
use std::{
|
||||||
|
sync::OnceLock,
|
||||||
|
time::{SystemTime, UNIX_EPOCH},
|
||||||
|
};
|
||||||
|
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
use tracing::{error, info, warn};
|
use tracing::{error, info, warn};
|
||||||
|
|
@ -18,6 +21,31 @@ fn http_client() -> &'static reqwest::Client {
|
||||||
CLIENT.get_or_init(reqwest::Client::new)
|
CLIENT.get_or_init(reqwest::Client::new)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub struct RateLimitState {
|
||||||
|
pub limit: u64,
|
||||||
|
pub remaining: u64,
|
||||||
|
pub reset_at: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn extract_rate_limit_from_headers(
|
||||||
|
headers: &reqwest::header::HeaderMap,
|
||||||
|
) -> Option<RateLimitState> {
|
||||||
|
let limit = headers.get("X-RateLimit-Limit")?.to_str().ok()?.parse().ok()?;
|
||||||
|
let remaining = headers.get("X-RateLimit-Remaining")?.to_str().ok()?.parse().ok()?;
|
||||||
|
let reset_at = headers.get("X-RateLimit-Reset")?.to_str().ok()?.parse().ok()?;
|
||||||
|
Some(RateLimitState { limit, remaining, reset_at })
|
||||||
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn calculate_delay(state: &RateLimitState, now: u64) -> u64 {
|
||||||
|
let seconds_until_reset = state.reset_at.saturating_sub(now).max(1);
|
||||||
|
let consumed = state.limit.saturating_sub(state.remaining);
|
||||||
|
let delay = (consumed * 5) / seconds_until_reset;
|
||||||
|
delay.max(1)
|
||||||
|
}
|
||||||
|
|
||||||
/// Dispatch all configured notifications for a completed build.
|
/// Dispatch all configured notifications for a completed build.
|
||||||
/// If retry queue is enabled, enqueues tasks; otherwise sends immediately.
|
/// If retry queue is enabled, enqueues tasks; otherwise sends immediately.
|
||||||
pub async fn dispatch_build_finished(
|
pub async fn dispatch_build_finished(
|
||||||
|
|
@ -452,13 +480,45 @@ async fn set_github_status(
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(resp) => {
|
Ok(resp) => {
|
||||||
if resp.status().is_success() {
|
let is_success = resp.status().is_success();
|
||||||
|
let status = resp.status();
|
||||||
|
|
||||||
|
// Extract rate limit state from response headers before consuming body
|
||||||
|
let rate_limit = extract_rate_limit_from_headers(resp.headers());
|
||||||
|
|
||||||
|
if is_success {
|
||||||
info!(build_id = %build.id, "Set GitHub commit status: {state}");
|
info!(build_id = %build.id, "Set GitHub commit status: {state}");
|
||||||
} else {
|
} else {
|
||||||
let status = resp.status();
|
|
||||||
let text = resp.text().await.unwrap_or_default();
|
let text = resp.text().await.unwrap_or_default();
|
||||||
warn!("GitHub status API returned {status}: {text}");
|
warn!("GitHub status API returned {status}: {text}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle rate limiting based on extracted state
|
||||||
|
if let Some(rate_limit) = rate_limit {
|
||||||
|
let now = SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.unwrap()
|
||||||
|
.as_secs();
|
||||||
|
|
||||||
|
// Log when approaching limit (Hydra threshold: 2000)
|
||||||
|
if rate_limit.remaining <= 2000 {
|
||||||
|
let seconds_until_reset = rate_limit.reset_at.saturating_sub(now);
|
||||||
|
info!(
|
||||||
|
"GitHub rate limit: {}/{}, resets in {}s",
|
||||||
|
rate_limit.remaining, rate_limit.limit, seconds_until_reset
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sleep when critical (Hydra threshold: 1000)
|
||||||
|
if rate_limit.remaining <= 1000 {
|
||||||
|
let delay = calculate_delay(&rate_limit, now);
|
||||||
|
warn!(
|
||||||
|
"GitHub rate limit critical: {}/{}, sleeping {}s",
|
||||||
|
rate_limit.remaining, rate_limit.limit, delay
|
||||||
|
);
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
Err(e) => error!("GitHub status API request failed: {e}"),
|
Err(e) => error!("GitHub status API request failed: {e}"),
|
||||||
}
|
}
|
||||||
|
|
|
||||||
91
crates/common/src/repo/build_outputs.rs
Normal file
91
crates/common/src/repo/build_outputs.rs
Normal file
|
|
@ -0,0 +1,91 @@
|
||||||
|
use sqlx::PgPool;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
error::{CiError, Result},
|
||||||
|
models::BuildOutput,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Create a build output record.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns error if database insert fails or if a duplicate (build, name) pair
|
||||||
|
/// exists.
|
||||||
|
pub async fn create(
|
||||||
|
pool: &PgPool,
|
||||||
|
build: Uuid,
|
||||||
|
name: &str,
|
||||||
|
path: Option<&str>,
|
||||||
|
) -> Result<BuildOutput> {
|
||||||
|
sqlx::query_as::<_, BuildOutput>(
|
||||||
|
"INSERT INTO build_outputs (build, name, path) VALUES ($1, $2, $3) \
|
||||||
|
RETURNING *",
|
||||||
|
)
|
||||||
|
.bind(build)
|
||||||
|
.bind(name)
|
||||||
|
.bind(path)
|
||||||
|
.fetch_one(pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
if let sqlx::Error::Database(db_err) = &e
|
||||||
|
&& db_err.is_unique_violation() {
|
||||||
|
return CiError::Conflict(format!(
|
||||||
|
"Build output with name '{name}' already exists for build {build}"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
CiError::Database(e)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List all build outputs for a build, ordered by name.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns error if database query fails.
|
||||||
|
pub async fn list_for_build(
|
||||||
|
pool: &PgPool,
|
||||||
|
build: Uuid,
|
||||||
|
) -> Result<Vec<BuildOutput>> {
|
||||||
|
sqlx::query_as::<_, BuildOutput>(
|
||||||
|
"SELECT * FROM build_outputs WHERE build = $1 ORDER BY name ASC",
|
||||||
|
)
|
||||||
|
.bind(build)
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await
|
||||||
|
.map_err(CiError::Database)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Find build outputs by path.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns error if database query fails.
|
||||||
|
pub async fn find_by_path(
|
||||||
|
pool: &PgPool,
|
||||||
|
path: &str,
|
||||||
|
) -> Result<Vec<BuildOutput>> {
|
||||||
|
sqlx::query_as::<_, BuildOutput>(
|
||||||
|
"SELECT * FROM build_outputs WHERE path = $1 ORDER BY build, name",
|
||||||
|
)
|
||||||
|
.bind(path)
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await
|
||||||
|
.map_err(CiError::Database)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete all build outputs for a build.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns error if database query fails.
|
||||||
|
pub async fn delete_for_build(pool: &PgPool, build: Uuid) -> Result<u64> {
|
||||||
|
let result =
|
||||||
|
sqlx::query("DELETE FROM build_outputs WHERE build = $1")
|
||||||
|
.bind(build)
|
||||||
|
.execute(pool)
|
||||||
|
.await
|
||||||
|
.map_err(CiError::Database)?;
|
||||||
|
|
||||||
|
Ok(result.rows_affected())
|
||||||
|
}
|
||||||
|
|
@ -512,3 +512,21 @@ pub async fn set_builder(
|
||||||
.map_err(CiError::Database)?;
|
.map_err(CiError::Database)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete a build by ID.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns error if database query fails or build not found.
|
||||||
|
pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> {
|
||||||
|
let result = sqlx::query("DELETE FROM builds WHERE id = $1")
|
||||||
|
.bind(id)
|
||||||
|
.execute(pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if result.rows_affected() == 0 {
|
||||||
|
return Err(CiError::NotFound(format!("Build {id} not found")));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
pub mod api_keys;
|
pub mod api_keys;
|
||||||
pub mod build_dependencies;
|
pub mod build_dependencies;
|
||||||
pub mod build_metrics;
|
pub mod build_metrics;
|
||||||
|
pub mod build_outputs;
|
||||||
pub mod build_products;
|
pub mod build_products;
|
||||||
pub mod build_steps;
|
pub mod build_steps;
|
||||||
pub mod builds;
|
pub mod builds;
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,7 @@
|
||||||
//! Integration tests for database and configuration
|
//! Integration tests for database and configuration
|
||||||
|
|
||||||
|
mod notifications_tests;
|
||||||
|
|
||||||
use fc_common::{
|
use fc_common::{
|
||||||
Database,
|
Database,
|
||||||
config::{Config, DatabaseConfig},
|
config::{Config, DatabaseConfig},
|
||||||
|
|
|
||||||
59
crates/common/tests/notifications_tests.rs
Normal file
59
crates/common/tests/notifications_tests.rs
Normal file
|
|
@ -0,0 +1,59 @@
|
||||||
|
use fc_common::notifications::*;
|
||||||
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_rate_limit_extraction() {
|
||||||
|
let mut headers = reqwest::header::HeaderMap::new();
|
||||||
|
headers.insert("X-RateLimit-Limit", "5000".parse().unwrap());
|
||||||
|
headers.insert("X-RateLimit-Remaining", "1234".parse().unwrap());
|
||||||
|
headers.insert("X-RateLimit-Reset", "1735689600".parse().unwrap());
|
||||||
|
|
||||||
|
let state = extract_rate_limit_from_headers(&headers);
|
||||||
|
assert!(state.is_some());
|
||||||
|
|
||||||
|
let state = state.unwrap();
|
||||||
|
assert_eq!(state.limit, 5000);
|
||||||
|
assert_eq!(state.remaining, 1234);
|
||||||
|
assert_eq!(state.reset_at, 1735689600);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_rate_limit_missing_headers() {
|
||||||
|
let headers = reqwest::header::HeaderMap::new();
|
||||||
|
let state = extract_rate_limit_from_headers(&headers);
|
||||||
|
assert!(state.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_sleep_duration_calculation() {
|
||||||
|
let now = SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.unwrap()
|
||||||
|
.as_secs();
|
||||||
|
|
||||||
|
let state = RateLimitState {
|
||||||
|
limit: 5000,
|
||||||
|
remaining: 500,
|
||||||
|
reset_at: now + 3600,
|
||||||
|
};
|
||||||
|
|
||||||
|
let delay = calculate_delay(&state, now);
|
||||||
|
assert!(delay >= 6 && delay <= 7);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_sleep_duration_minimum() {
|
||||||
|
let now = SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.unwrap()
|
||||||
|
.as_secs();
|
||||||
|
|
||||||
|
let state = RateLimitState {
|
||||||
|
limit: 5000,
|
||||||
|
remaining: 4999,
|
||||||
|
reset_at: now + 10000,
|
||||||
|
};
|
||||||
|
|
||||||
|
let delay = calculate_delay(&state, now);
|
||||||
|
assert_eq!(delay, 1);
|
||||||
|
}
|
||||||
|
|
@ -814,3 +814,98 @@ async fn test_dedup_by_drv_path() {
|
||||||
// Cleanup
|
// Cleanup
|
||||||
repo::projects::delete(&pool, project.id).await.ok();
|
repo::projects::delete(&pool, project.id).await.ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_build_outputs_crud() {
|
||||||
|
let Some(pool) = get_pool().await else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Create project, jobset, evaluation, build
|
||||||
|
let project = create_test_project(&pool, "test-project").await;
|
||||||
|
let jobset = create_test_jobset(&pool, project.id).await;
|
||||||
|
let eval = create_test_eval(&pool, jobset.id).await;
|
||||||
|
let build = create_test_build(
|
||||||
|
&pool,
|
||||||
|
eval.id,
|
||||||
|
"test-job",
|
||||||
|
"/nix/store/test.drv",
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Create outputs
|
||||||
|
let _out1 = repo::build_outputs::create(
|
||||||
|
&pool,
|
||||||
|
build.id,
|
||||||
|
"out",
|
||||||
|
Some("/nix/store/abc-result"),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("create output 1");
|
||||||
|
|
||||||
|
let _out2 = repo::build_outputs::create(
|
||||||
|
&pool,
|
||||||
|
build.id,
|
||||||
|
"dev",
|
||||||
|
Some("/nix/store/def-result-dev"),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("create output 2");
|
||||||
|
|
||||||
|
// List outputs for build
|
||||||
|
let outputs = repo::build_outputs::list_for_build(&pool, build.id)
|
||||||
|
.await
|
||||||
|
.expect("list outputs");
|
||||||
|
assert_eq!(outputs.len(), 2);
|
||||||
|
assert_eq!(outputs[0].name, "dev"); // Alphabetical order
|
||||||
|
assert_eq!(outputs[1].name, "out");
|
||||||
|
|
||||||
|
// Find by path
|
||||||
|
let found = repo::build_outputs::find_by_path(&pool, "/nix/store/abc-result")
|
||||||
|
.await
|
||||||
|
.expect("find by path");
|
||||||
|
assert_eq!(found.len(), 1);
|
||||||
|
assert_eq!(found[0].build, build.id);
|
||||||
|
assert_eq!(found[0].name, "out");
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
repo::projects::delete(&pool, project.id).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_build_outputs_cascade_delete() {
|
||||||
|
let Some(pool) = get_pool().await else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let project = create_test_project(&pool, "test-project").await;
|
||||||
|
let jobset = create_test_jobset(&pool, project.id).await;
|
||||||
|
let eval = create_test_eval(&pool, jobset.id).await;
|
||||||
|
let build = create_test_build(
|
||||||
|
&pool,
|
||||||
|
eval.id,
|
||||||
|
"test-job",
|
||||||
|
"/nix/store/test.drv",
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
repo::build_outputs::create(&pool, build.id, "out", Some("/nix/store/abc"))
|
||||||
|
.await
|
||||||
|
.expect("create output");
|
||||||
|
|
||||||
|
// Delete build
|
||||||
|
repo::builds::delete(&pool, build.id)
|
||||||
|
.await
|
||||||
|
.expect("delete build");
|
||||||
|
|
||||||
|
// Verify outputs cascade deleted
|
||||||
|
let outputs = repo::build_outputs::list_for_build(&pool, build.id)
|
||||||
|
.await
|
||||||
|
.expect("list outputs after delete");
|
||||||
|
assert_eq!(outputs.len(), 0);
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
repo::projects::delete(&pool, project.id).await.ok();
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -40,6 +40,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let failed_paths_cache = qr_config.failed_paths_cache;
|
let failed_paths_cache = qr_config.failed_paths_cache;
|
||||||
let failed_paths_ttl = qr_config.failed_paths_ttl;
|
let failed_paths_ttl = qr_config.failed_paths_ttl;
|
||||||
let work_dir = qr_config.work_dir;
|
let work_dir = qr_config.work_dir;
|
||||||
|
let unsupported_timeout = qr_config.unsupported_timeout;
|
||||||
|
|
||||||
// Ensure the work directory exists
|
// Ensure the work directory exists
|
||||||
tokio::fs::create_dir_all(&work_dir).await?;
|
tokio::fs::create_dir_all(&work_dir).await?;
|
||||||
|
|
@ -82,7 +83,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let active_builds = worker_pool.active_builds().clone();
|
let active_builds = worker_pool.active_builds().clone();
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache, notifications_config.clone()) => {
|
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache, notifications_config.clone(), unsupported_timeout) => {
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
tracing::error!("Runner loop failed: {e}");
|
tracing::error!("Runner loop failed: {e}");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,7 @@ pub async fn run(
|
||||||
strict_errors: bool,
|
strict_errors: bool,
|
||||||
failed_paths_cache: bool,
|
failed_paths_cache: bool,
|
||||||
notifications_config: fc_common::config::NotificationsConfig,
|
notifications_config: fc_common::config::NotificationsConfig,
|
||||||
|
unsupported_timeout: Option<Duration>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// Reset orphaned builds from previous crashes (older than 5 minutes)
|
// Reset orphaned builds from previous crashes (older than 5 minutes)
|
||||||
match repo::builds::reset_orphaned(&pool, 300).await {
|
match repo::builds::reset_orphaned(&pool, 300).await {
|
||||||
|
|
@ -207,6 +208,51 @@ pub async fn run(
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unsupported system timeout: abort builds with no available builders
|
||||||
|
if let Some(timeout) = unsupported_timeout
|
||||||
|
&& let Some(system) = &build.system {
|
||||||
|
match repo::remote_builders::find_for_system(&pool, system).await {
|
||||||
|
Ok(builders) if builders.is_empty() => {
|
||||||
|
let timeout_at = build.created_at + timeout;
|
||||||
|
if chrono::Utc::now() > timeout_at {
|
||||||
|
tracing::info!(
|
||||||
|
build_id = %build.id,
|
||||||
|
system = %system,
|
||||||
|
timeout = ?timeout,
|
||||||
|
"Aborting build: no builder available for system type"
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Err(e) = repo::builds::start(&pool, build.id).await {
|
||||||
|
tracing::warn!(build_id = %build.id, "Failed to start unsupported build: {e}");
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Err(e) = repo::builds::complete(
|
||||||
|
&pool,
|
||||||
|
build.id,
|
||||||
|
BuildStatus::UnsupportedSystem,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Some("No builder available for system type"),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!(build_id = %build.id, "Failed to complete unsupported build: {e}");
|
||||||
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Ok(_) => {}, // Builders available, proceed normally
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(
|
||||||
|
build_id = %build.id,
|
||||||
|
"Failed to check builders for unsupported system: {e}"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// One-at-a-time scheduling: check if jobset allows concurrent builds
|
// One-at-a-time scheduling: check if jobset allows concurrent builds
|
||||||
// First, get the evaluation to find the jobset
|
// First, get the evaluation to find the jobset
|
||||||
let eval =
|
let eval =
|
||||||
|
|
|
||||||
|
|
@ -588,23 +588,64 @@ async fn run_build(
|
||||||
};
|
};
|
||||||
|
|
||||||
if build_result.success {
|
if build_result.success {
|
||||||
// Parse output names from build's outputs JSON
|
// Build a reverse lookup map: path -> output_name
|
||||||
let output_names: Vec<String> = build
|
// The outputs JSON is a HashMap<String, String> where keys are output names
|
||||||
|
// and values are store paths. We need to match paths to names correctly.
|
||||||
|
let path_to_name: std::collections::HashMap<String, String> = build
|
||||||
.outputs
|
.outputs
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.and_then(|v| v.as_object())
|
.and_then(|v| v.as_object())
|
||||||
.map(|obj| obj.keys().cloned().collect())
|
.map(|obj| {
|
||||||
|
obj
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(name, path)| {
|
||||||
|
path.as_str().map(|p| (p.to_string(), name.clone()))
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
})
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
// Store build outputs in normalized table
|
||||||
|
for (i, output_path) in build_result.output_paths.iter().enumerate() {
|
||||||
|
let output_name = path_to_name
|
||||||
|
.get(output_path)
|
||||||
|
.cloned()
|
||||||
|
.unwrap_or_else(|| {
|
||||||
|
if i == 0 {
|
||||||
|
"out".to_string()
|
||||||
|
} else {
|
||||||
|
format!("out{i}")
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if let Err(e) = repo::build_outputs::create(
|
||||||
|
pool,
|
||||||
|
build.id,
|
||||||
|
&output_name,
|
||||||
|
Some(output_path),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!(
|
||||||
|
build_id = %build.id,
|
||||||
|
output_name = %output_name,
|
||||||
|
"Failed to store build output: {e}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Register GC roots and create build products for each output
|
// Register GC roots and create build products for each output
|
||||||
for (i, output_path) in build_result.output_paths.iter().enumerate() {
|
for (i, output_path) in build_result.output_paths.iter().enumerate() {
|
||||||
let output_name = output_names.get(i).cloned().unwrap_or_else(|| {
|
let output_name = path_to_name
|
||||||
if i == 0 {
|
.get(output_path)
|
||||||
build.job_name.clone()
|
.cloned()
|
||||||
} else {
|
.unwrap_or_else(|| {
|
||||||
format!("{}-{i}", build.job_name)
|
if i == 0 {
|
||||||
}
|
build.job_name.clone()
|
||||||
});
|
} else {
|
||||||
|
format!("{}-{i}", build.job_name)
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// Register GC root
|
// Register GC root
|
||||||
let mut gc_root_path = None;
|
let mut gc_root_path = None;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue