fc-common: add GC pinning and machine health infrastructure
Migration 017 adds `builds.keep`, `jobsets.keep_nr`, and health tracking columns to `remote_builders`. Repo layer implements `set_keep`, `list_pinned_ids`, `record_failure` with exponential backoff, `record_success`, and `find_for_system` filtering of disabled builders. GC root cleanup now skips pinned builds. Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: Ibba121de3dc42f71204e3a8f5776aa8b6a6a6964
This commit is contained in:
parent
25699e5e97
commit
5b472a2f57
16 changed files with 173 additions and 23 deletions
|
|
@ -0,0 +1,32 @@
|
|||
-- GC pinning (#11)
ALTER TABLE builds ADD COLUMN IF NOT EXISTS keep BOOLEAN NOT NULL DEFAULT false;
ALTER TABLE jobsets ADD COLUMN IF NOT EXISTS keep_nr INTEGER NOT NULL DEFAULT 3;

-- Recreate active_jobsets view to include keep_nr.
-- DROP + CREATE (rather than CREATE OR REPLACE) is required here: keep_nr is
-- added in the middle of the column list, and OR REPLACE only permits
-- appending new columns at the end.
DROP VIEW IF EXISTS active_jobsets;
CREATE VIEW active_jobsets AS
SELECT
    j.id,
    j.project_id,
    j.name,
    j.nix_expression,
    j.enabled,
    j.flake_mode,
    j.check_interval,
    j.branch,
    j.scheduling_shares,
    j.created_at,
    j.updated_at,
    j.state,
    j.last_checked_at,
    j.keep_nr,
    p.name as project_name,
    p.repository_url
FROM jobsets j
JOIN projects p ON j.project_id = p.id
WHERE j.state IN ('enabled', 'one_shot', 'one_at_a_time');

-- Machine health tracking (#5)
ALTER TABLE remote_builders ADD COLUMN IF NOT EXISTS consecutive_failures INTEGER NOT NULL DEFAULT 0;
ALTER TABLE remote_builders ADD COLUMN IF NOT EXISTS disabled_until TIMESTAMP WITH TIME ZONE;
ALTER TABLE remote_builders ADD COLUMN IF NOT EXISTS last_failure TIMESTAMP WITH TIME ZONE;
|
||||
|
|
@ -137,6 +137,7 @@ pub async fn run(pool: &PgPool, config: &DeclarativeConfig) -> Result<()> {
|
|||
branch: decl_jobset.branch.clone(),
|
||||
scheduling_shares: Some(decl_jobset.scheduling_shares),
|
||||
state,
|
||||
keep_nr: decl_jobset.keep_nr,
|
||||
})
|
||||
.await?;
|
||||
|
||||
|
|
|
|||
|
|
@ -365,6 +365,8 @@ pub struct DeclarativeJobset {
|
|||
/// Scheduling priority shares (default 100, higher = more priority)
|
||||
#[serde(default = "default_scheduling_shares")]
|
||||
pub scheduling_shares: i32,
|
||||
/// Number of recent successful evaluations to retain (default 3)
|
||||
pub keep_nr: Option<i32>,
|
||||
/// Jobset inputs for parameterized evaluations
|
||||
#[serde(default)]
|
||||
pub inputs: Vec<DeclarativeJobsetInput>,
|
||||
|
|
@ -762,6 +764,7 @@ mod tests {
|
|||
state: None,
|
||||
branch: None,
|
||||
scheduling_shares: 100,
|
||||
keep_nr: None,
|
||||
inputs: vec![],
|
||||
}],
|
||||
notifications: vec![],
|
||||
|
|
|
|||
|
|
@ -1,18 +1,22 @@
|
|||
//! GC root management - prevents nix-store --gc from deleting build outputs
|
||||
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
os::unix::fs::symlink,
|
||||
path::{Path, PathBuf},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use tracing::{info, warn};
|
||||
use tracing::{debug, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Remove GC root symlinks with mtime older than `max_age`. Returns count
|
||||
/// removed.
|
||||
/// removed. Symlinks whose filename matches a UUID in `pinned_build_ids` are
|
||||
/// skipped regardless of age.
|
||||
pub fn cleanup_old_roots(
|
||||
roots_dir: &Path,
|
||||
max_age: Duration,
|
||||
pinned_build_ids: &HashSet<Uuid>,
|
||||
) -> std::io::Result<u64> {
|
||||
if !roots_dir.exists() {
|
||||
return Ok(0);
|
||||
|
|
@ -23,6 +27,17 @@ pub fn cleanup_old_roots(
|
|||
|
||||
for entry in std::fs::read_dir(roots_dir)? {
|
||||
let entry = entry?;
|
||||
|
||||
// Check if this root is pinned (filename is a build UUID with keep=true)
|
||||
if let Some(name) = entry.file_name().to_str() {
|
||||
if let Ok(build_id) = name.parse::<Uuid>() {
|
||||
if pinned_build_ids.contains(&build_id) {
|
||||
debug!(build_id = %build_id, "Skipping pinned GC root");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let metadata = match entry.metadata() {
|
||||
Ok(m) => m,
|
||||
Err(_) => continue,
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ pub struct Jobset {
|
|||
pub updated_at: DateTime<Utc>,
|
||||
pub state: JobsetState,
|
||||
pub last_checked_at: Option<DateTime<Utc>>,
|
||||
pub keep_nr: i32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
|
||||
|
|
@ -119,6 +120,7 @@ pub struct Build {
|
|||
pub constituents: Option<serde_json::Value>,
|
||||
pub builder_id: Option<Uuid>,
|
||||
pub signed: bool,
|
||||
pub keep: bool,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
|
|
@ -334,6 +336,7 @@ pub struct ActiveJobset {
|
|||
pub updated_at: DateTime<Utc>,
|
||||
pub state: JobsetState,
|
||||
pub last_checked_at: Option<DateTime<Utc>>,
|
||||
pub keep_nr: i32,
|
||||
pub project_name: String,
|
||||
pub repository_url: String,
|
||||
}
|
||||
|
|
@ -422,6 +425,9 @@ pub struct RemoteBuilder {
|
|||
pub public_host_key: Option<String>,
|
||||
pub ssh_key_file: Option<String>,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub consecutive_failures: i32,
|
||||
pub disabled_until: Option<DateTime<Utc>>,
|
||||
pub last_failure: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
/// User account for authentication and personalization
|
||||
|
|
@ -546,6 +552,7 @@ pub struct CreateJobset {
|
|||
pub branch: Option<String>,
|
||||
pub scheduling_shares: Option<i32>,
|
||||
pub state: Option<JobsetState>,
|
||||
pub keep_nr: Option<i32>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
|
@ -558,6 +565,7 @@ pub struct UpdateJobset {
|
|||
pub branch: Option<String>,
|
||||
pub scheduling_shares: Option<i32>,
|
||||
pub state: Option<JobsetState>,
|
||||
pub keep_nr: Option<i32>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
|
|
|||
|
|
@ -374,6 +374,34 @@ pub async fn get_completed_by_drv_paths(
|
|||
)
|
||||
}
|
||||
|
||||
/// Return the set of build IDs that have `keep = true` (GC-pinned).
|
||||
pub async fn list_pinned_ids(
|
||||
pool: &PgPool,
|
||||
) -> Result<std::collections::HashSet<Uuid>> {
|
||||
let rows: Vec<(Uuid,)> =
|
||||
sqlx::query_as("SELECT id FROM builds WHERE keep = true")
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)?;
|
||||
Ok(rows.into_iter().map(|(id,)| id).collect())
|
||||
}
|
||||
|
||||
/// Set the `keep` (GC pin) flag on a build.
|
||||
pub async fn set_keep(
|
||||
pool: &PgPool,
|
||||
id: Uuid,
|
||||
keep: bool,
|
||||
) -> Result<Build> {
|
||||
sqlx::query_as::<_, Build>(
|
||||
"UPDATE builds SET keep = $1 WHERE id = $2 RETURNING *",
|
||||
)
|
||||
.bind(keep)
|
||||
.bind(id)
|
||||
.fetch_optional(pool)
|
||||
.await?
|
||||
.ok_or_else(|| CiError::NotFound(format!("Build {id} not found")))
|
||||
}
|
||||
|
||||
/// Set the `builder_id` for a build.
|
||||
pub async fn set_builder(
|
||||
pool: &PgPool,
|
||||
|
|
|
|||
|
|
@ -18,11 +18,12 @@ pub async fn create(pool: &PgPool, input: CreateJobset) -> Result<Jobset> {
|
|||
let flake_mode = input.flake_mode.unwrap_or(true);
|
||||
let check_interval = input.check_interval.unwrap_or(60);
|
||||
let scheduling_shares = input.scheduling_shares.unwrap_or(100);
|
||||
let keep_nr = input.keep_nr.unwrap_or(3);
|
||||
|
||||
sqlx::query_as::<_, Jobset>(
|
||||
"INSERT INTO jobsets (project_id, name, nix_expression, enabled, \
|
||||
flake_mode, check_interval, branch, scheduling_shares, state) VALUES \
|
||||
($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING *",
|
||||
flake_mode, check_interval, branch, scheduling_shares, state, keep_nr) \
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING *",
|
||||
)
|
||||
.bind(input.project_id)
|
||||
.bind(&input.name)
|
||||
|
|
@ -33,6 +34,7 @@ pub async fn create(pool: &PgPool, input: CreateJobset) -> Result<Jobset> {
|
|||
.bind(&input.branch)
|
||||
.bind(scheduling_shares)
|
||||
.bind(state.as_str())
|
||||
.bind(keep_nr)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
|
|
@ -106,11 +108,12 @@ pub async fn update(
|
|||
let scheduling_shares = input
|
||||
.scheduling_shares
|
||||
.unwrap_or(existing.scheduling_shares);
|
||||
let keep_nr = input.keep_nr.unwrap_or(existing.keep_nr);
|
||||
|
||||
sqlx::query_as::<_, Jobset>(
|
||||
"UPDATE jobsets SET name = $1, nix_expression = $2, enabled = $3, \
|
||||
flake_mode = $4, check_interval = $5, branch = $6, scheduling_shares = \
|
||||
$7, state = $8 WHERE id = $9 RETURNING *",
|
||||
$7, state = $8, keep_nr = $9 WHERE id = $10 RETURNING *",
|
||||
)
|
||||
.bind(&name)
|
||||
.bind(&nix_expression)
|
||||
|
|
@ -120,6 +123,7 @@ pub async fn update(
|
|||
.bind(&branch)
|
||||
.bind(scheduling_shares)
|
||||
.bind(state.as_str())
|
||||
.bind(keep_nr)
|
||||
.bind(id)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
|
|
@ -160,15 +164,17 @@ pub async fn upsert(pool: &PgPool, input: CreateJobset) -> Result<Jobset> {
|
|||
let flake_mode = input.flake_mode.unwrap_or(true);
|
||||
let check_interval = input.check_interval.unwrap_or(60);
|
||||
let scheduling_shares = input.scheduling_shares.unwrap_or(100);
|
||||
let keep_nr = input.keep_nr.unwrap_or(3);
|
||||
|
||||
sqlx::query_as::<_, Jobset>(
|
||||
"INSERT INTO jobsets (project_id, name, nix_expression, enabled, \
|
||||
flake_mode, check_interval, branch, scheduling_shares, state) VALUES \
|
||||
($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (project_id, name) DO \
|
||||
UPDATE SET nix_expression = EXCLUDED.nix_expression, enabled = \
|
||||
EXCLUDED.enabled, flake_mode = EXCLUDED.flake_mode, check_interval = \
|
||||
EXCLUDED.check_interval, branch = EXCLUDED.branch, scheduling_shares = \
|
||||
EXCLUDED.scheduling_shares, state = EXCLUDED.state RETURNING *",
|
||||
flake_mode, check_interval, branch, scheduling_shares, state, keep_nr) \
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT \
|
||||
(project_id, name) DO UPDATE SET nix_expression = \
|
||||
EXCLUDED.nix_expression, enabled = EXCLUDED.enabled, flake_mode = \
|
||||
EXCLUDED.flake_mode, check_interval = EXCLUDED.check_interval, branch = \
|
||||
EXCLUDED.branch, scheduling_shares = EXCLUDED.scheduling_shares, state = \
|
||||
EXCLUDED.state, keep_nr = EXCLUDED.keep_nr RETURNING *",
|
||||
)
|
||||
.bind(input.project_id)
|
||||
.bind(&input.name)
|
||||
|
|
@ -179,6 +185,7 @@ pub async fn upsert(pool: &PgPool, input: CreateJobset) -> Result<Jobset> {
|
|||
.bind(&input.branch)
|
||||
.bind(scheduling_shares)
|
||||
.bind(state.as_str())
|
||||
.bind(keep_nr)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.map_err(CiError::Database)
|
||||
|
|
|
|||
|
|
@ -70,12 +70,14 @@ pub async fn list_enabled(pool: &PgPool) -> Result<Vec<RemoteBuilder>> {
|
|||
}
|
||||
|
||||
/// Find a suitable builder for the given system.
|
||||
/// Excludes builders that are temporarily disabled due to consecutive failures.
|
||||
pub async fn find_for_system(
|
||||
pool: &PgPool,
|
||||
system: &str,
|
||||
) -> Result<Vec<RemoteBuilder>> {
|
||||
sqlx::query_as::<_, RemoteBuilder>(
|
||||
"SELECT * FROM remote_builders WHERE enabled = true AND $1 = ANY(systems) \
|
||||
AND (disabled_until IS NULL OR disabled_until < NOW()) \
|
||||
ORDER BY speed_factor DESC",
|
||||
)
|
||||
.bind(system)
|
||||
|
|
@ -84,6 +86,41 @@ pub async fn find_for_system(
|
|||
.map_err(CiError::Database)
|
||||
}
|
||||
|
||||
/// Record a build failure for a remote builder.
|
||||
/// Increments consecutive_failures (capped at 4), sets last_failure,
|
||||
/// and computes disabled_until with exponential backoff.
|
||||
/// Backoff formula (from Hydra): delta = 60 * 3^(min(failures, 4) - 1) seconds.
|
||||
pub async fn record_failure(pool: &PgPool, id: Uuid) -> Result<RemoteBuilder> {
|
||||
sqlx::query_as::<_, RemoteBuilder>(
|
||||
"UPDATE remote_builders SET \
|
||||
consecutive_failures = LEAST(consecutive_failures + 1, 4), \
|
||||
last_failure = NOW(), \
|
||||
disabled_until = NOW() + make_interval(secs => \
|
||||
60.0 * power(3, LEAST(consecutive_failures + 1, 4) - 1) + (random() * 30)::int \
|
||||
) \
|
||||
WHERE id = $1 RETURNING *",
|
||||
)
|
||||
.bind(id)
|
||||
.fetch_optional(pool)
|
||||
.await?
|
||||
.ok_or_else(|| CiError::NotFound(format!("Remote builder {id} not found")))
|
||||
}
|
||||
|
||||
/// Record a build success for a remote builder.
|
||||
/// Resets consecutive_failures and clears disabled_until.
|
||||
pub async fn record_success(pool: &PgPool, id: Uuid) -> Result<RemoteBuilder> {
|
||||
sqlx::query_as::<_, RemoteBuilder>(
|
||||
"UPDATE remote_builders SET \
|
||||
consecutive_failures = 0, \
|
||||
disabled_until = NULL \
|
||||
WHERE id = $1 RETURNING *",
|
||||
)
|
||||
.bind(id)
|
||||
.fetch_optional(pool)
|
||||
.await?
|
||||
.ok_or_else(|| CiError::NotFound(format!("Remote builder {id} not found")))
|
||||
}
|
||||
|
||||
pub async fn update(
|
||||
pool: &PgPool,
|
||||
id: Uuid,
|
||||
|
|
|
|||
|
|
@ -608,6 +608,7 @@ mod tests {
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
};
|
||||
assert!(j.validate().is_ok());
|
||||
}
|
||||
|
|
@ -624,6 +625,7 @@ mod tests {
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
};
|
||||
assert!(j.validate().is_err());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,6 +50,7 @@ async fn create_test_jobset(
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
})
|
||||
.await
|
||||
.expect("create jobset")
|
||||
|
|
@ -193,6 +194,7 @@ async fn test_jobset_crud() {
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
})
|
||||
.await
|
||||
.expect("create jobset");
|
||||
|
|
@ -222,6 +224,7 @@ async fn test_jobset_crud() {
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
})
|
||||
.await
|
||||
.expect("update jobset");
|
||||
|
|
|
|||
|
|
@ -117,6 +117,7 @@ async fn test_build_search_with_filters() {
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
})
|
||||
.await
|
||||
.expect("create jobset");
|
||||
|
|
@ -263,6 +264,7 @@ async fn test_multi_entity_search() {
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
})
|
||||
.await
|
||||
.expect("create jobset");
|
||||
|
|
@ -481,6 +483,7 @@ async fn test_quick_search() {
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
})
|
||||
.await
|
||||
.expect("create jobset");
|
||||
|
|
|
|||
|
|
@ -364,6 +364,7 @@ async fn test_starred_jobs_crud() {
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
})
|
||||
.await
|
||||
.expect("create jobset");
|
||||
|
|
@ -475,6 +476,7 @@ async fn test_starred_jobs_delete_by_job() {
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
})
|
||||
.await
|
||||
.expect("create jobset");
|
||||
|
|
|
|||
|
|
@ -528,6 +528,7 @@ async fn check_declarative_config(
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
};
|
||||
if let Err(e) = repo::jobsets::upsert(pool, input).await {
|
||||
tracing::warn!("Failed to upsert declarative jobset: {e}");
|
||||
|
|
|
|||
|
|
@ -282,6 +282,7 @@ async fn test_fair_share_scheduling() {
|
|||
branch: None,
|
||||
scheduling_shares: Some(200),
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
})
|
||||
.await
|
||||
.expect("create jobset hi");
|
||||
|
|
@ -297,6 +298,7 @@ async fn test_fair_share_scheduling() {
|
|||
branch: None,
|
||||
scheduling_shares: Some(100),
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
})
|
||||
.await
|
||||
.expect("create jobset lo");
|
||||
|
|
@ -487,6 +489,7 @@ async fn test_atomic_build_claiming() {
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
})
|
||||
.await
|
||||
.expect("create jobset");
|
||||
|
|
@ -579,6 +582,7 @@ async fn test_orphan_build_reset() {
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
})
|
||||
.await
|
||||
.expect("create jobset");
|
||||
|
|
@ -684,6 +688,7 @@ async fn test_get_cancelled_among() {
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
})
|
||||
.await
|
||||
.expect("create jobset");
|
||||
|
|
|
|||
|
|
@ -173,6 +173,7 @@ async fn create_project_jobset(
|
|||
branch: body.branch,
|
||||
scheduling_shares: body.scheduling_shares,
|
||||
state: body.state,
|
||||
keep_nr: None,
|
||||
};
|
||||
input
|
||||
.validate()
|
||||
|
|
@ -265,6 +266,7 @@ async fn setup_project(
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
};
|
||||
input
|
||||
.validate()
|
||||
|
|
|
|||
|
|
@ -64,6 +64,7 @@ async fn test_e2e_project_eval_build_flow() {
|
|||
branch: None,
|
||||
scheduling_shares: None,
|
||||
state: None,
|
||||
keep_nr: None,
|
||||
})
|
||||
.await
|
||||
.expect("create jobset");
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue