diff --git a/crates/common/migrations/017_gc_pinning_and_machine_health.sql b/crates/common/migrations/017_gc_pinning_and_machine_health.sql new file mode 100644 index 0000000..9a9b090 --- /dev/null +++ b/crates/common/migrations/017_gc_pinning_and_machine_health.sql @@ -0,0 +1,32 @@ +-- GC pinning (#11) +ALTER TABLE builds ADD COLUMN IF NOT EXISTS keep BOOLEAN NOT NULL DEFAULT false; +ALTER TABLE jobsets ADD COLUMN IF NOT EXISTS keep_nr INTEGER NOT NULL DEFAULT 3; + +-- Recreate active_jobsets view to include keep_nr +DROP VIEW IF EXISTS active_jobsets; +CREATE VIEW active_jobsets AS +SELECT + j.id, + j.project_id, + j.name, + j.nix_expression, + j.enabled, + j.flake_mode, + j.check_interval, + j.branch, + j.scheduling_shares, + j.created_at, + j.updated_at, + j.state, + j.last_checked_at, + j.keep_nr, + p.name as project_name, + p.repository_url +FROM jobsets j +JOIN projects p ON j.project_id = p.id +WHERE j.state IN ('enabled', 'one_shot', 'one_at_a_time'); + +-- Machine health tracking (#5) +ALTER TABLE remote_builders ADD COLUMN IF NOT EXISTS consecutive_failures INTEGER NOT NULL DEFAULT 0; +ALTER TABLE remote_builders ADD COLUMN IF NOT EXISTS disabled_until TIMESTAMP WITH TIME ZONE; +ALTER TABLE remote_builders ADD COLUMN IF NOT EXISTS last_failure TIMESTAMP WITH TIME ZONE; diff --git a/crates/common/src/bootstrap.rs b/crates/common/src/bootstrap.rs index c092f84..3bf369f 100644 --- a/crates/common/src/bootstrap.rs +++ b/crates/common/src/bootstrap.rs @@ -137,6 +137,7 @@ pub async fn run(pool: &PgPool, config: &DeclarativeConfig) -> Result<()> { branch: decl_jobset.branch.clone(), scheduling_shares: Some(decl_jobset.scheduling_shares), state, + keep_nr: decl_jobset.keep_nr, }) .await?; diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index 607db6e..516b365 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -365,6 +365,8 @@ pub struct DeclarativeJobset { /// Scheduling priority shares (default 100, higher = 
more priority) #[serde(default = "default_scheduling_shares")] pub scheduling_shares: i32, + /// Number of recent successful evaluations to retain (default 3) + pub keep_nr: Option<i32>, /// Jobset inputs for parameterized evaluations #[serde(default)] pub inputs: Vec, @@ -762,6 +764,7 @@ mod tests { state: None, branch: None, scheduling_shares: 100, + keep_nr: None, inputs: vec![], }], notifications: vec![], diff --git a/crates/common/src/gc_roots.rs b/crates/common/src/gc_roots.rs index 9b843f2..f5f318e 100644 --- a/crates/common/src/gc_roots.rs +++ b/crates/common/src/gc_roots.rs @@ -1,18 +1,22 @@ //! GC root management - prevents nix-store --gc from deleting build outputs use std::{ + collections::HashSet, os::unix::fs::symlink, path::{Path, PathBuf}, time::Duration, }; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; +use uuid::Uuid; /// Remove GC root symlinks with mtime older than `max_age`. Returns count -/// removed. +/// removed. Symlinks whose filename matches a UUID in `pinned_build_ids` are +/// skipped regardless of age. pub fn cleanup_old_roots( roots_dir: &Path, max_age: Duration, + pinned_build_ids: &HashSet<Uuid>, ) -> std::io::Result<usize> { if !roots_dir.exists() { return Ok(0); @@ -23,6 +27,17 @@ pub fn cleanup_old_roots( for entry in std::fs::read_dir(roots_dir)?
{ let entry = entry?; + + // Check if this root is pinned (filename is a build UUID with keep=true) + if let Some(name) = entry.file_name().to_str() { + if let Ok(build_id) = name.parse::<Uuid>() { + if pinned_build_ids.contains(&build_id) { + debug!(build_id = %build_id, "Skipping pinned GC root"); + continue; + } + } + } + let metadata = match entry.metadata() { Ok(m) => m, Err(_) => continue, diff --git a/crates/common/src/models.rs b/crates/common/src/models.rs index cab2d0b..64968e2 100644 --- a/crates/common/src/models.rs +++ b/crates/common/src/models.rs @@ -30,6 +30,7 @@ pub struct Jobset { pub updated_at: DateTime<Utc>, pub state: JobsetState, pub last_checked_at: Option<DateTime<Utc>>, + pub keep_nr: i32, } #[derive(Debug, Clone, Serialize, Deserialize, FromRow)] @@ -119,6 +120,7 @@ pub struct Build { pub constituents: Option, pub builder_id: Option<Uuid>, pub signed: bool, + pub keep: bool, } #[derive( @@ -334,6 +336,7 @@ pub struct ActiveJobset { pub updated_at: DateTime<Utc>, pub state: JobsetState, pub last_checked_at: Option<DateTime<Utc>>, + pub keep_nr: i32, pub project_name: String, pub repository_url: String, } @@ -410,18 +413,21 @@ pub struct Channel { /// Remote builder for multi-machine / multi-arch builds.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] pub struct RemoteBuilder { - pub id: Uuid, - pub name: String, - pub ssh_uri: String, - pub systems: Vec<String>, - pub max_jobs: i32, - pub speed_factor: i32, - pub supported_features: Vec<String>, - pub mandatory_features: Vec<String>, - pub enabled: bool, - pub public_host_key: Option<String>, - pub ssh_key_file: Option<String>, - pub created_at: DateTime<Utc>, + pub id: Uuid, + pub name: String, + pub ssh_uri: String, + pub systems: Vec<String>, + pub max_jobs: i32, + pub speed_factor: i32, + pub supported_features: Vec<String>, + pub mandatory_features: Vec<String>, + pub enabled: bool, + pub public_host_key: Option<String>, + pub ssh_key_file: Option<String>, + pub created_at: DateTime<Utc>, + pub consecutive_failures: i32, + pub disabled_until: Option<DateTime<Utc>>, + pub last_failure: Option<DateTime<Utc>>, } /// User account for authentication and personalization @@ -546,6 +552,7 @@ pub struct CreateJobset { pub branch: Option<String>, pub scheduling_shares: Option<i32>, pub state: Option<JobsetState>, + pub keep_nr: Option<i32>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -558,6 +565,7 @@ pub struct UpdateJobset { pub branch: Option<String>, pub scheduling_shares: Option<i32>, pub state: Option<JobsetState>, + pub keep_nr: Option<i32>, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/common/src/repo/builds.rs b/crates/common/src/repo/builds.rs index 23ccb70..f826ef3 100644 --- a/crates/common/src/repo/builds.rs +++ b/crates/common/src/repo/builds.rs @@ -374,6 +374,34 @@ pub async fn get_completed_by_drv_paths( ) } +/// Return the set of build IDs that have `keep = true` (GC-pinned). +pub async fn list_pinned_ids( + pool: &PgPool, +) -> Result<HashSet<Uuid>> { + let rows: Vec<(Uuid,)> = + sqlx::query_as("SELECT id FROM builds WHERE keep = true") + .fetch_all(pool) + .await + .map_err(CiError::Database)?; + Ok(rows.into_iter().map(|(id,)| id).collect()) +} + +/// Set the `keep` (GC pin) flag on a build.
+pub async fn set_keep( + pool: &PgPool, + id: Uuid, + keep: bool, +) -> Result<Build> { + sqlx::query_as::<_, Build>( + "UPDATE builds SET keep = $1 WHERE id = $2 RETURNING *", + ) + .bind(keep) + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Build {id} not found"))) +} + /// Set the `builder_id` for a build. pub async fn set_builder( pool: &PgPool, diff --git a/crates/common/src/repo/jobsets.rs b/crates/common/src/repo/jobsets.rs index bd83e01..0f2d323 100644 --- a/crates/common/src/repo/jobsets.rs +++ b/crates/common/src/repo/jobsets.rs @@ -18,11 +18,12 @@ pub async fn create(pool: &PgPool, input: CreateJobset) -> Result<Jobset> { let flake_mode = input.flake_mode.unwrap_or(true); let check_interval = input.check_interval.unwrap_or(60); let scheduling_shares = input.scheduling_shares.unwrap_or(100); + let keep_nr = input.keep_nr.unwrap_or(3); sqlx::query_as::<_, Jobset>( "INSERT INTO jobsets (project_id, name, nix_expression, enabled, \ - flake_mode, check_interval, branch, scheduling_shares, state) VALUES \ - ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING *", + flake_mode, check_interval, branch, scheduling_shares, state, keep_nr) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING *", ) .bind(input.project_id) .bind(&input.name) @@ -33,6 +34,7 @@ pub async fn create(pool: &PgPool, input: CreateJobset) -> Result<Jobset> { .bind(&input.branch) .bind(scheduling_shares) .bind(state.as_str()) + .bind(keep_nr) .fetch_one(pool) .await .map_err(|e| { @@ -106,11 +108,12 @@ pub async fn update( let scheduling_shares = input .scheduling_shares .unwrap_or(existing.scheduling_shares); + let keep_nr = input.keep_nr.unwrap_or(existing.keep_nr); sqlx::query_as::<_, Jobset>( "UPDATE jobsets SET name = $1, nix_expression = $2, enabled = $3, \ flake_mode = $4, check_interval = $5, branch = $6, scheduling_shares = \ - $7, state = $8 WHERE id = $9 RETURNING *", + $7, state = $8, keep_nr = $9 WHERE id = $10 RETURNING *", ) .bind(&name)
.bind(&nix_expression) @@ -120,6 +123,7 @@ .bind(&branch) .bind(scheduling_shares) .bind(state.as_str()) + .bind(keep_nr) .bind(id) .fetch_one(pool) .await @@ -160,15 +164,17 @@ pub async fn upsert(pool: &PgPool, input: CreateJobset) -> Result<Jobset> { let flake_mode = input.flake_mode.unwrap_or(true); let check_interval = input.check_interval.unwrap_or(60); let scheduling_shares = input.scheduling_shares.unwrap_or(100); + let keep_nr = input.keep_nr.unwrap_or(3); sqlx::query_as::<_, Jobset>( "INSERT INTO jobsets (project_id, name, nix_expression, enabled, \ - flake_mode, check_interval, branch, scheduling_shares, state) VALUES \ - ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (project_id, name) DO \ - UPDATE SET nix_expression = EXCLUDED.nix_expression, enabled = \ - EXCLUDED.enabled, flake_mode = EXCLUDED.flake_mode, check_interval = \ - EXCLUDED.check_interval, branch = EXCLUDED.branch, scheduling_shares = \ - EXCLUDED.scheduling_shares, state = EXCLUDED.state RETURNING *", + flake_mode, check_interval, branch, scheduling_shares, state, keep_nr) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT \ + (project_id, name) DO UPDATE SET nix_expression = \ + EXCLUDED.nix_expression, enabled = EXCLUDED.enabled, flake_mode = \ + EXCLUDED.flake_mode, check_interval = EXCLUDED.check_interval, branch = \ + EXCLUDED.branch, scheduling_shares = EXCLUDED.scheduling_shares, state = \ + EXCLUDED.state, keep_nr = EXCLUDED.keep_nr RETURNING *", ) .bind(input.project_id) .bind(&input.name) @@ -179,6 +185,7 @@ pub async fn upsert(pool: &PgPool, input: CreateJobset) -> Result<Jobset> { .bind(&input.branch) .bind(scheduling_shares) .bind(state.as_str()) + .bind(keep_nr) .fetch_one(pool) .await .map_err(CiError::Database) diff --git a/crates/common/src/repo/remote_builders.rs b/crates/common/src/repo/remote_builders.rs index a276539..dd3e315 100644 --- a/crates/common/src/repo/remote_builders.rs +++ b/crates/common/src/repo/remote_builders.rs @@ -70,12 +70,14 @@
pub async fn list_enabled(pool: &PgPool) -> Result<Vec<RemoteBuilder>> { } /// Find a suitable builder for the given system. +/// Excludes builders that are temporarily disabled due to consecutive failures. pub async fn find_for_system( pool: &PgPool, system: &str, ) -> Result<Option<RemoteBuilder>> { sqlx::query_as::<_, RemoteBuilder>( "SELECT * FROM remote_builders WHERE enabled = true AND $1 = ANY(systems) \ + AND (disabled_until IS NULL OR disabled_until < NOW()) \ ORDER BY speed_factor DESC", ) .bind(system) @@ -84,6 +86,41 @@ pub async fn find_for_system( .map_err(CiError::Database) } +/// Record a build failure for a remote builder. +/// Increments consecutive_failures (capped at 4), sets last_failure, +/// and computes disabled_until with exponential backoff. +/// Backoff formula (from Hydra): delta = 60 * 3^(min(failures, 4) - 1) seconds. +pub async fn record_failure(pool: &PgPool, id: Uuid) -> Result<RemoteBuilder> { + sqlx::query_as::<_, RemoteBuilder>( + "UPDATE remote_builders SET \ + consecutive_failures = LEAST(consecutive_failures + 1, 4), \ + last_failure = NOW(), \ + disabled_until = NOW() + make_interval(secs => \ + 60.0 * power(3, LEAST(consecutive_failures + 1, 4) - 1) + (random() * 30)::int \ + ) \ + WHERE id = $1 RETURNING *", + ) + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Remote builder {id} not found"))) +} + +/// Record a build success for a remote builder. +/// Resets consecutive_failures and clears disabled_until. +pub async fn record_success(pool: &PgPool, id: Uuid) -> Result<RemoteBuilder> { + sqlx::query_as::<_, RemoteBuilder>( + "UPDATE remote_builders SET \ + consecutive_failures = 0, \ + disabled_until = NULL \ + WHERE id = $1 RETURNING *", + ) + .bind(id) + .fetch_optional(pool) + .await?
+ .ok_or_else(|| CiError::NotFound(format!("Remote builder {id} not found"))) +} + pub async fn update( pool: &PgPool, id: Uuid, diff --git a/crates/common/src/validate.rs b/crates/common/src/validate.rs index cacc5fa..8fe3010 100644 --- a/crates/common/src/validate.rs +++ b/crates/common/src/validate.rs @@ -608,6 +608,7 @@ mod tests { branch: None, scheduling_shares: None, state: None, + keep_nr: None, }; assert!(j.validate().is_ok()); } @@ -624,6 +625,7 @@ mod tests { branch: None, scheduling_shares: None, state: None, + keep_nr: None, }; assert!(j.validate().is_err()); } diff --git a/crates/common/tests/repo_tests.rs b/crates/common/tests/repo_tests.rs index 0cc08ce..5244da2 100644 --- a/crates/common/tests/repo_tests.rs +++ b/crates/common/tests/repo_tests.rs @@ -50,6 +50,7 @@ async fn create_test_jobset( branch: None, scheduling_shares: None, state: None, + keep_nr: None, }) .await .expect("create jobset") @@ -193,6 +194,7 @@ async fn test_jobset_crud() { branch: None, scheduling_shares: None, state: None, + keep_nr: None, }) .await .expect("create jobset"); @@ -222,6 +224,7 @@ async fn test_jobset_crud() { branch: None, scheduling_shares: None, state: None, + keep_nr: None, }) .await .expect("update jobset"); diff --git a/crates/common/tests/search_tests.rs b/crates/common/tests/search_tests.rs index 98d0bf2..a97ae29 100644 --- a/crates/common/tests/search_tests.rs +++ b/crates/common/tests/search_tests.rs @@ -117,6 +117,7 @@ async fn test_build_search_with_filters() { branch: None, scheduling_shares: None, state: None, + keep_nr: None, }) .await .expect("create jobset"); @@ -263,6 +264,7 @@ async fn test_multi_entity_search() { branch: None, scheduling_shares: None, state: None, + keep_nr: None, }) .await .expect("create jobset"); @@ -481,6 +483,7 @@ async fn test_quick_search() { branch: None, scheduling_shares: None, state: None, + keep_nr: None, }) .await .expect("create jobset"); diff --git a/crates/common/tests/user_management_tests.rs 
b/crates/common/tests/user_management_tests.rs index e0f8378..680ae82 100644 --- a/crates/common/tests/user_management_tests.rs +++ b/crates/common/tests/user_management_tests.rs @@ -364,6 +364,7 @@ async fn test_starred_jobs_crud() { branch: None, scheduling_shares: None, state: None, + keep_nr: None, }) .await .expect("create jobset"); @@ -475,6 +476,7 @@ async fn test_starred_jobs_delete_by_job() { branch: None, scheduling_shares: None, state: None, + keep_nr: None, }) .await .expect("create jobset"); diff --git a/crates/evaluator/src/eval_loop.rs b/crates/evaluator/src/eval_loop.rs index 6274ca9..2d21d5f 100644 --- a/crates/evaluator/src/eval_loop.rs +++ b/crates/evaluator/src/eval_loop.rs @@ -528,6 +528,7 @@ async fn check_declarative_config( branch: None, scheduling_shares: None, state: None, + keep_nr: None, }; if let Err(e) = repo::jobsets::upsert(pool, input).await { tracing::warn!("Failed to upsert declarative jobset: {e}"); diff --git a/crates/queue-runner/tests/runner_tests.rs b/crates/queue-runner/tests/runner_tests.rs index 7ea6312..ff4f4a9 100644 --- a/crates/queue-runner/tests/runner_tests.rs +++ b/crates/queue-runner/tests/runner_tests.rs @@ -282,6 +282,7 @@ async fn test_fair_share_scheduling() { branch: None, scheduling_shares: Some(200), state: None, + keep_nr: None, }) .await .expect("create jobset hi"); @@ -297,6 +298,7 @@ async fn test_fair_share_scheduling() { branch: None, scheduling_shares: Some(100), state: None, + keep_nr: None, }) .await .expect("create jobset lo"); @@ -487,6 +489,7 @@ async fn test_atomic_build_claiming() { branch: None, scheduling_shares: None, state: None, + keep_nr: None, }) .await .expect("create jobset"); @@ -579,6 +582,7 @@ async fn test_orphan_build_reset() { branch: None, scheduling_shares: None, state: None, + keep_nr: None, }) .await .expect("create jobset"); @@ -684,6 +688,7 @@ async fn test_get_cancelled_among() { branch: None, scheduling_shares: None, state: None, + keep_nr: None, }) .await .expect("create 
jobset"); diff --git a/crates/server/src/routes/projects.rs b/crates/server/src/routes/projects.rs index 04b27e7..7dfacb8 100644 --- a/crates/server/src/routes/projects.rs +++ b/crates/server/src/routes/projects.rs @@ -173,6 +173,7 @@ async fn create_project_jobset( branch: body.branch, scheduling_shares: body.scheduling_shares, state: body.state, + keep_nr: None, }; input .validate() @@ -265,6 +266,7 @@ async fn setup_project( branch: None, scheduling_shares: None, state: None, + keep_nr: None, }; input .validate() diff --git a/crates/server/tests/e2e_test.rs b/crates/server/tests/e2e_test.rs index 81f2fac..32c62a7 100644 --- a/crates/server/tests/e2e_test.rs +++ b/crates/server/tests/e2e_test.rs @@ -64,6 +64,7 @@ async fn test_e2e_project_eval_build_flow() { branch: None, scheduling_shares: None, state: None, + keep_nr: None, }) .await .expect("create jobset");