From 1df28f60490f5b7a32d4aab515fb83c69834c92d Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sat, 7 Feb 2026 23:32:25 +0300 Subject: [PATCH] fc-common: add jobset state enum; add db migration" I'm going to delete all migrations one of those days... Signed-off-by: NotAShelf Change-Id: I8e2e32118c2d85438a8b343614038eda6a6a6964 --- .../common/migrations/011_jobset_states.sql | 39 +++++++ crates/common/src/models.rs | 53 ++++++++- crates/common/src/repo/jobsets.rs | 104 ++++++++++++++++-- 3 files changed, 180 insertions(+), 16 deletions(-) create mode 100644 crates/common/migrations/011_jobset_states.sql diff --git a/crates/common/migrations/011_jobset_states.sql b/crates/common/migrations/011_jobset_states.sql new file mode 100644 index 0000000..0a5f8c4 --- /dev/null +++ b/crates/common/migrations/011_jobset_states.sql @@ -0,0 +1,39 @@ +-- Migration: Add jobset states for Hydra-compatible scheduling +-- Supports 4 states: disabled, enabled, one_shot, one_at_a_time + +-- Add state column with CHECK constraint +ALTER TABLE jobsets ADD COLUMN state VARCHAR(50) NOT NULL DEFAULT 'enabled' + CHECK (state IN ('disabled', 'enabled', 'one_shot', 'one_at_a_time')); + +-- Migrate existing data based on enabled column +UPDATE jobsets SET state = CASE WHEN enabled THEN 'enabled' ELSE 'disabled' END; + +-- Add last_checked_at for per-jobset interval tracking +ALTER TABLE jobsets ADD COLUMN last_checked_at TIMESTAMP WITH TIME ZONE; + +-- Drop and recreate active_jobsets view to include new columns +DROP VIEW IF EXISTS active_jobsets; +CREATE VIEW active_jobsets AS +SELECT + j.id, + j.project_id, + j.name, + j.nix_expression, + j.enabled, + j.flake_mode, + j.check_interval, + j.branch, + j.scheduling_shares, + j.created_at, + j.updated_at, + j.state, + j.last_checked_at, + p.name as project_name, + p.repository_url +FROM jobsets j +JOIN projects p ON j.project_id = p.id +WHERE j.state IN ('enabled', 'one_shot', 'one_at_a_time'); + +-- Indexes for efficient queries +CREATE INDEX idx_jobsets_state ON jobsets(state); +CREATE INDEX idx_jobsets_last_checked_at ON jobsets(last_checked_at); diff --git a/crates/common/src/models.rs b/crates/common/src/models.rs index 7b48808..86e9365 100644 --- a/crates/common/src/models.rs +++ b/crates/common/src/models.rs @@ -28,6 +28,8 @@ pub struct Jobset { pub scheduling_shares: i32, pub created_at: DateTime, pub updated_at: DateTime, + pub state: JobsetState, + pub last_checked_at: Option>, } #[derive(Debug, Clone, Serialize, Deserialize, FromRow)] @@ -45,7 +47,7 @@ pub struct Evaluation { pub pr_action: Option, } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, sqlx::Type)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, sqlx::Type)] #[sqlx(type_name = "text", rename_all = "lowercase")] pub enum EvaluationStatus { Pending, @@ -54,6 +56,43 @@ pub enum EvaluationStatus { Failed, } +/// Jobset scheduling state (Hydra-compatible). +/// +/// - `Disabled`: Jobset will not be evaluated +/// - `Enabled`: Normal operation, evaluated according to `check_interval` +/// - `OneShot`: Evaluated once, then automatically set to Disabled +/// - `OneAtATime`: Only one build can run at a time for this jobset +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, sqlx::Type, Default, +)] +#[sqlx(type_name = "text", rename_all = "snake_case")] +pub enum JobsetState { + Disabled, + #[default] + Enabled, + OneShot, + OneAtATime, +} + +impl JobsetState { + /// Returns true if this jobset state allows evaluation. + #[must_use] + pub const fn is_evaluable(&self) -> bool { + matches!(self, Self::Enabled | Self::OneShot | Self::OneAtATime) + } + + /// Returns the database string representation of this state. + #[must_use] + pub const fn as_str(&self) -> &'static str { + match self { + Self::Disabled => "disabled", + Self::Enabled => "enabled", + Self::OneShot => "one_shot", + Self::OneAtATime => "one_at_a_time", + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, FromRow)] pub struct Build { pub id: Uuid, @@ -80,7 +119,7 @@ pub struct Build { pub signed: bool, } -#[derive(Debug, Clone, Serialize, Deserialize, sqlx::Type, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::Type, PartialEq, Eq)] #[sqlx(type_name = "text", rename_all = "lowercase")] pub enum BuildStatus { Pending, @@ -138,11 +177,13 @@ pub struct ActiveJobset { pub scheduling_shares: i32, pub created_at: DateTime, pub updated_at: DateTime, + pub state: JobsetState, + pub last_checked_at: Option>, pub project_name: String, pub repository_url: String, } -/// Build statistics from the build_stats view. +/// Build statistics from the `build_stats` view. #[derive(Debug, Clone, Serialize, Deserialize, FromRow, Default)] pub struct BuildStats { pub total_builds: Option, @@ -248,7 +289,7 @@ pub struct User { pub last_login_at: Option>, } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, sqlx::Type)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, sqlx::Type)] #[sqlx(type_name = "text", rename_all = "lowercase")] pub enum UserType { Local, @@ -297,10 +338,12 @@ pub struct PaginationParams { } impl PaginationParams { + #[must_use] pub fn limit(&self) -> i64 { self.limit.unwrap_or(50).clamp(1, 200) } + #[must_use] pub fn offset(&self) -> i64 { self.offset.unwrap_or(0).max(0) } @@ -349,6 +392,7 @@ pub struct CreateJobset { pub check_interval: Option, pub branch: Option, pub scheduling_shares: Option, + pub state: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -360,6 +404,7 @@ pub struct UpdateJobset { pub check_interval: Option, pub branch: Option, pub scheduling_shares: Option, + pub state: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/common/src/repo/jobsets.rs b/crates/common/src/repo/jobsets.rs index cbe0885..12abab1 100644 --- a/crates/common/src/repo/jobsets.rs +++ b/crates/common/src/repo/jobsets.rs @@ -3,19 +3,26 @@ use uuid::Uuid; use crate::{ error::{CiError, Result}, - models::{ActiveJobset, CreateJobset, Jobset, UpdateJobset}, + models::{ActiveJobset, CreateJobset, Jobset, JobsetState, UpdateJobset}, }; pub async fn create(pool: &PgPool, input: CreateJobset) -> Result { - let enabled = input.enabled.unwrap_or(true); + let state = input.state.unwrap_or(JobsetState::Enabled); + // Sync enabled with state if state was explicitly set, otherwise use + // input.enabled + let enabled = if input.state.is_some() { + state.is_evaluable() + } else { + input.enabled.unwrap_or_else(|| state.is_evaluable()) + }; let flake_mode = input.flake_mode.unwrap_or(true); let check_interval = input.check_interval.unwrap_or(60); let scheduling_shares = input.scheduling_shares.unwrap_or(100); sqlx::query_as::<_, Jobset>( "INSERT INTO jobsets (project_id, name, nix_expression, enabled, \ - flake_mode, check_interval, branch, scheduling_shares) VALUES ($1, $2, \ - $3, $4, $5, $6, $7, $8) RETURNING *", + flake_mode, check_interval, branch, scheduling_shares, state) VALUES \ + ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING *", ) .bind(input.project_id) .bind(&input.name) @@ -25,6 +32,7 @@ pub async fn create(pool: &PgPool, input: CreateJobset) -> Result { .bind(check_interval) .bind(&input.branch) .bind(scheduling_shares) + .bind(state.as_str()) .fetch_one(pool) .await .map_err(|e| { @@ -85,7 +93,13 @@ pub async fn update( let name = input.name.unwrap_or(existing.name); let nix_expression = input.nix_expression.unwrap_or(existing.nix_expression); - let enabled = input.enabled.unwrap_or(existing.enabled); + let state = input.state.unwrap_or(existing.state); + // Sync enabled with state if state was explicitly set + let enabled = if input.state.is_some() { + state.is_evaluable() + } else { + input.enabled.unwrap_or(existing.enabled) + }; let flake_mode = input.flake_mode.unwrap_or(existing.flake_mode); let check_interval = input.check_interval.unwrap_or(existing.check_interval); let branch = input.branch.or(existing.branch); @@ -96,7 +110,7 @@ pub async fn update( sqlx::query_as::<_, Jobset>( "UPDATE jobsets SET name = $1, nix_expression = $2, enabled = $3, \ flake_mode = $4, check_interval = $5, branch = $6, scheduling_shares = \ - $7 WHERE id = $8 RETURNING *", + $7, state = $8 WHERE id = $9 RETURNING *", ) .bind(&name) .bind(&nix_expression) @@ -105,6 +119,7 @@ pub async fn update( .bind(check_interval) .bind(&branch) .bind(scheduling_shares) + .bind(state.as_str()) .bind(id) .fetch_one(pool) .await @@ -134,19 +149,26 @@ pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> { } pub async fn upsert(pool: &PgPool, input: CreateJobset) -> Result { - let enabled = input.enabled.unwrap_or(true); + let state = input.state.unwrap_or(JobsetState::Enabled); + // Sync enabled with state if state was explicitly set, otherwise use + // input.enabled + let enabled = if input.state.is_some() { + state.is_evaluable() + } else { + input.enabled.unwrap_or_else(|| state.is_evaluable()) + }; let flake_mode = input.flake_mode.unwrap_or(true); let check_interval = input.check_interval.unwrap_or(60); let scheduling_shares = input.scheduling_shares.unwrap_or(100); sqlx::query_as::<_, Jobset>( "INSERT INTO jobsets (project_id, name, nix_expression, enabled, \ - flake_mode, check_interval, branch, scheduling_shares) VALUES ($1, $2, \ - $3, $4, $5, $6, $7, $8) ON CONFLICT (project_id, name) DO UPDATE SET \ - nix_expression = EXCLUDED.nix_expression, enabled = EXCLUDED.enabled, \ - flake_mode = EXCLUDED.flake_mode, check_interval = \ + flake_mode, check_interval, branch, scheduling_shares, state) VALUES \ + ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (project_id, name) DO \ + UPDATE SET nix_expression = EXCLUDED.nix_expression, enabled = \ + EXCLUDED.enabled, flake_mode = EXCLUDED.flake_mode, check_interval = \ EXCLUDED.check_interval, branch = EXCLUDED.branch, scheduling_shares = \ - EXCLUDED.scheduling_shares RETURNING *", + EXCLUDED.scheduling_shares, state = EXCLUDED.state RETURNING *", ) .bind(input.project_id) .bind(&input.name) @@ -156,6 +178,7 @@ pub async fn upsert(pool: &PgPool, input: CreateJobset) -> Result { .bind(check_interval) .bind(&input.branch) .bind(scheduling_shares) + .bind(state.as_str()) .fetch_one(pool) .await .map_err(CiError::Database) @@ -167,3 +190,60 @@ pub async fn list_active(pool: &PgPool) -> Result> { .await .map_err(CiError::Database) } + +/// Mark a one-shot jobset as complete (set state to disabled). +pub async fn mark_one_shot_complete(pool: &PgPool, id: Uuid) -> Result<()> { + sqlx::query( + "UPDATE jobsets SET state = 'disabled', enabled = false WHERE id = $1 AND \ + state = 'one_shot'", + ) + .bind(id) + .execute(pool) + .await + .map_err(CiError::Database)?; + Ok(()) +} + +/// Update the `last_checked_at` timestamp for a jobset. +pub async fn update_last_checked(pool: &PgPool, id: Uuid) -> Result<()> { + sqlx::query("UPDATE jobsets SET last_checked_at = NOW() WHERE id = $1") + .bind(id) + .execute(pool) + .await + .map_err(CiError::Database)?; + Ok(()) +} + +/// Check if a jobset has any running builds. +pub async fn has_running_builds( + pool: &PgPool, + jobset_id: Uuid, +) -> Result { + let (count,): (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM builds b JOIN evaluations e ON b.evaluation_id = \ + e.id WHERE e.jobset_id = $1 AND b.status = 'running'", + ) + .bind(jobset_id) + .fetch_one(pool) + .await + .map_err(CiError::Database)?; + Ok(count > 0) +} + +/// List jobsets that are due for evaluation based on their `check_interval`. +/// Returns jobsets where `last_checked_at` is NULL or older than `check_interval` +/// seconds. +pub async fn list_due_for_eval( + pool: &PgPool, + limit: i64, +) -> Result> { + sqlx::query_as::<_, ActiveJobset>( + "SELECT * FROM active_jobsets WHERE last_checked_at IS NULL OR \ + last_checked_at < NOW() - (check_interval || ' seconds')::interval ORDER \ + BY last_checked_at NULLS FIRST LIMIT $1", + ) + .bind(limit) + .fetch_all(pool) + .await + .map_err(CiError::Database) +}