fc-common: add jobset state enum; add db migration

I'm going to delete all migrations one of these days...

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I8e2e32118c2d85438a8b343614038eda6a6a6964
This commit is contained in:
raf 2026-02-07 23:32:25 +03:00
commit 1df28f6049
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
3 changed files with 180 additions and 16 deletions

View file

@ -0,0 +1,39 @@
-- Migration: Add jobset states for Hydra-compatible scheduling
-- Supports 4 states: disabled, enabled, one_shot, one_at_a_time
-- Add state column with CHECK constraint
-- DEFAULT 'enabled' keeps existing rows valid against NOT NULL; the UPDATE
-- below then refines each row from the legacy boolean `enabled` column.
ALTER TABLE jobsets ADD COLUMN state VARCHAR(50) NOT NULL DEFAULT 'enabled'
CHECK (state IN ('disabled', 'enabled', 'one_shot', 'one_at_a_time'));
-- Migrate existing data based on enabled column
UPDATE jobsets SET state = CASE WHEN enabled THEN 'enabled' ELSE 'disabled' END;
-- Add last_checked_at for per-jobset interval tracking
-- NULL means "never checked"; schedulers treat NULL as immediately due.
ALTER TABLE jobsets ADD COLUMN last_checked_at TIMESTAMP WITH TIME ZONE;
-- Drop and recreate active_jobsets view to include new columns
-- NOTE(review): the view now filters on `state` rather than `enabled`;
-- `enabled` is still exposed and application code keeps it in sync with state.
DROP VIEW IF EXISTS active_jobsets;
CREATE VIEW active_jobsets AS
SELECT
j.id,
j.project_id,
j.name,
j.nix_expression,
j.enabled,
j.flake_mode,
j.check_interval,
j.branch,
j.scheduling_shares,
j.created_at,
j.updated_at,
j.state,
j.last_checked_at,
p.name as project_name,
p.repository_url
FROM jobsets j
JOIN projects p ON j.project_id = p.id
WHERE j.state IN ('enabled', 'one_shot', 'one_at_a_time');
-- Indexes for efficient queries
-- idx_jobsets_last_checked_at supports the "due for evaluation" ordering scan.
CREATE INDEX idx_jobsets_state ON jobsets(state);
CREATE INDEX idx_jobsets_last_checked_at ON jobsets(last_checked_at);

View file

@ -28,6 +28,8 @@ pub struct Jobset {
pub scheduling_shares: i32,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub state: JobsetState,
pub last_checked_at: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
@ -45,7 +47,7 @@ pub struct Evaluation {
pub pr_action: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, sqlx::Type)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, sqlx::Type)]
#[sqlx(type_name = "text", rename_all = "lowercase")]
pub enum EvaluationStatus {
Pending,
@ -54,6 +56,43 @@ pub enum EvaluationStatus {
Failed,
}
/// Jobset scheduling state (Hydra-compatible).
///
/// Persisted as text; `rename_all = "snake_case"` maps each variant onto the
/// database values allowed by the migration's CHECK constraint
/// (`disabled`, `enabled`, `one_shot`, `one_at_a_time`).
///
/// - `Disabled`: Jobset will not be evaluated
/// - `Enabled`: Normal operation, evaluated according to `check_interval`
/// - `OneShot`: Evaluated once, then automatically set to Disabled
/// - `OneAtATime`: Only one build can run at a time for this jobset
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, sqlx::Type, Default,
)]
#[sqlx(type_name = "text", rename_all = "snake_case")]
pub enum JobsetState {
    /// Jobset will not be evaluated.
    Disabled,
    /// Normal operation; evaluated according to its `check_interval`.
    #[default]
    Enabled,
    /// Evaluated once, then automatically set back to `Disabled`.
    OneShot,
    /// At most one build runs concurrently for this jobset.
    OneAtATime,
}
impl JobsetState {
    /// Returns true if this jobset state allows evaluation.
    ///
    /// Only `Disabled` blocks evaluation; the other three states are all
    /// eligible for the scheduler.
    #[must_use]
    pub const fn is_evaluable(&self) -> bool {
        match self {
            Self::Disabled => false,
            Self::Enabled | Self::OneShot | Self::OneAtATime => true,
        }
    }

    /// Returns the database string representation of this state.
    ///
    /// Must stay in sync with the `snake_case` sqlx rename and the
    /// migration's CHECK constraint values.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::Disabled => "disabled",
            Self::Enabled => "enabled",
            Self::OneShot => "one_shot",
            Self::OneAtATime => "one_at_a_time",
        }
    }
}
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct Build {
pub id: Uuid,
@ -80,7 +119,7 @@ pub struct Build {
pub signed: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::Type, PartialEq)]
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::Type, PartialEq, Eq)]
#[sqlx(type_name = "text", rename_all = "lowercase")]
pub enum BuildStatus {
Pending,
@ -138,11 +177,13 @@ pub struct ActiveJobset {
pub scheduling_shares: i32,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub state: JobsetState,
pub last_checked_at: Option<DateTime<Utc>>,
pub project_name: String,
pub repository_url: String,
}
/// Build statistics from the build_stats view.
/// Build statistics from the `build_stats` view.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow, Default)]
pub struct BuildStats {
pub total_builds: Option<i64>,
@ -248,7 +289,7 @@ pub struct User {
pub last_login_at: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, sqlx::Type)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, sqlx::Type)]
#[sqlx(type_name = "text", rename_all = "lowercase")]
pub enum UserType {
Local,
@ -297,10 +338,12 @@ pub struct PaginationParams {
}
impl PaginationParams {
#[must_use]
pub fn limit(&self) -> i64 {
    // Default page size is 50; requested values are clamped to [1, 200].
    let requested = self.limit.unwrap_or(50);
    requested.clamp(1, 200)
}
#[must_use]
pub fn offset(&self) -> i64 {
    // Missing offset starts at the beginning; negative offsets are floored to 0.
    self.offset.map_or(0, |requested| requested.max(0))
}
@ -349,6 +392,7 @@ pub struct CreateJobset {
pub check_interval: Option<i32>,
pub branch: Option<String>,
pub scheduling_shares: Option<i32>,
pub state: Option<JobsetState>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -360,6 +404,7 @@ pub struct UpdateJobset {
pub check_interval: Option<i32>,
pub branch: Option<String>,
pub scheduling_shares: Option<i32>,
pub state: Option<JobsetState>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View file

@ -3,19 +3,26 @@ use uuid::Uuid;
use crate::{
error::{CiError, Result},
models::{ActiveJobset, CreateJobset, Jobset, UpdateJobset},
models::{ActiveJobset, CreateJobset, Jobset, JobsetState, UpdateJobset},
};
pub async fn create(pool: &PgPool, input: CreateJobset) -> Result<Jobset> {
let enabled = input.enabled.unwrap_or(true);
let state = input.state.unwrap_or(JobsetState::Enabled);
// Sync enabled with state if state was explicitly set, otherwise use
// input.enabled
let enabled = if input.state.is_some() {
state.is_evaluable()
} else {
input.enabled.unwrap_or_else(|| state.is_evaluable())
};
let flake_mode = input.flake_mode.unwrap_or(true);
let check_interval = input.check_interval.unwrap_or(60);
let scheduling_shares = input.scheduling_shares.unwrap_or(100);
sqlx::query_as::<_, Jobset>(
"INSERT INTO jobsets (project_id, name, nix_expression, enabled, \
flake_mode, check_interval, branch, scheduling_shares) VALUES ($1, $2, \
$3, $4, $5, $6, $7, $8) RETURNING *",
flake_mode, check_interval, branch, scheduling_shares, state) VALUES \
($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING *",
)
.bind(input.project_id)
.bind(&input.name)
@ -25,6 +32,7 @@ pub async fn create(pool: &PgPool, input: CreateJobset) -> Result<Jobset> {
.bind(check_interval)
.bind(&input.branch)
.bind(scheduling_shares)
.bind(state.as_str())
.fetch_one(pool)
.await
.map_err(|e| {
@ -85,7 +93,13 @@ pub async fn update(
let name = input.name.unwrap_or(existing.name);
let nix_expression = input.nix_expression.unwrap_or(existing.nix_expression);
let enabled = input.enabled.unwrap_or(existing.enabled);
let state = input.state.unwrap_or(existing.state);
// Sync enabled with state if state was explicitly set
let enabled = if input.state.is_some() {
state.is_evaluable()
} else {
input.enabled.unwrap_or(existing.enabled)
};
let flake_mode = input.flake_mode.unwrap_or(existing.flake_mode);
let check_interval = input.check_interval.unwrap_or(existing.check_interval);
let branch = input.branch.or(existing.branch);
@ -96,7 +110,7 @@ pub async fn update(
sqlx::query_as::<_, Jobset>(
"UPDATE jobsets SET name = $1, nix_expression = $2, enabled = $3, \
flake_mode = $4, check_interval = $5, branch = $6, scheduling_shares = \
$7 WHERE id = $8 RETURNING *",
$7, state = $8 WHERE id = $9 RETURNING *",
)
.bind(&name)
.bind(&nix_expression)
@ -105,6 +119,7 @@ pub async fn update(
.bind(check_interval)
.bind(&branch)
.bind(scheduling_shares)
.bind(state.as_str())
.bind(id)
.fetch_one(pool)
.await
@ -134,19 +149,26 @@ pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> {
}
pub async fn upsert(pool: &PgPool, input: CreateJobset) -> Result<Jobset> {
let enabled = input.enabled.unwrap_or(true);
let state = input.state.unwrap_or(JobsetState::Enabled);
// Sync enabled with state if state was explicitly set, otherwise use
// input.enabled
let enabled = if input.state.is_some() {
state.is_evaluable()
} else {
input.enabled.unwrap_or_else(|| state.is_evaluable())
};
let flake_mode = input.flake_mode.unwrap_or(true);
let check_interval = input.check_interval.unwrap_or(60);
let scheduling_shares = input.scheduling_shares.unwrap_or(100);
sqlx::query_as::<_, Jobset>(
"INSERT INTO jobsets (project_id, name, nix_expression, enabled, \
flake_mode, check_interval, branch, scheduling_shares) VALUES ($1, $2, \
$3, $4, $5, $6, $7, $8) ON CONFLICT (project_id, name) DO UPDATE SET \
nix_expression = EXCLUDED.nix_expression, enabled = EXCLUDED.enabled, \
flake_mode = EXCLUDED.flake_mode, check_interval = \
flake_mode, check_interval, branch, scheduling_shares, state) VALUES \
($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (project_id, name) DO \
UPDATE SET nix_expression = EXCLUDED.nix_expression, enabled = \
EXCLUDED.enabled, flake_mode = EXCLUDED.flake_mode, check_interval = \
EXCLUDED.check_interval, branch = EXCLUDED.branch, scheduling_shares = \
EXCLUDED.scheduling_shares RETURNING *",
EXCLUDED.scheduling_shares, state = EXCLUDED.state RETURNING *",
)
.bind(input.project_id)
.bind(&input.name)
@ -156,6 +178,7 @@ pub async fn upsert(pool: &PgPool, input: CreateJobset) -> Result<Jobset> {
.bind(check_interval)
.bind(&input.branch)
.bind(scheduling_shares)
.bind(state.as_str())
.fetch_one(pool)
.await
.map_err(CiError::Database)
@ -167,3 +190,60 @@ pub async fn list_active(pool: &PgPool) -> Result<Vec<ActiveJobset>> {
.await
.map_err(CiError::Database)
}
/// Mark a one-shot jobset as complete (set state to disabled).
///
/// The WHERE clause restricts this to jobsets currently in `one_shot`, so
/// calling it on any other state is a harmless no-op.
pub async fn mark_one_shot_complete(pool: &PgPool, id: Uuid) -> Result<()> {
    let query = "UPDATE jobsets SET state = 'disabled', enabled = false WHERE id = $1 AND state = 'one_shot'";
    match sqlx::query(query).bind(id).execute(pool).await {
        Ok(_) => Ok(()),
        Err(e) => Err(CiError::Database(e)),
    }
}
/// Update the `last_checked_at` timestamp for a jobset.
///
/// Uses the database clock (`NOW()`) so all schedulers agree on the time base.
pub async fn update_last_checked(pool: &PgPool, id: Uuid) -> Result<()> {
    let result = sqlx::query("UPDATE jobsets SET last_checked_at = NOW() WHERE id = $1")
        .bind(id)
        .execute(pool)
        .await;
    result.map(|_| ()).map_err(CiError::Database)
}
/// Check if a jobset has any running builds.
///
/// Counts builds joined through the jobset's evaluations whose status is
/// 'running'; used to gate `one_at_a_time` scheduling.
pub async fn has_running_builds(
    pool: &PgPool,
    jobset_id: Uuid,
) -> Result<bool> {
    let row: (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM builds b JOIN evaluations e ON b.evaluation_id = e.id WHERE e.jobset_id = $1 AND b.status = 'running'",
    )
    .bind(jobset_id)
    .fetch_one(pool)
    .await
    .map_err(CiError::Database)?;
    Ok(row.0 > 0)
}
/// List jobsets that are due for evaluation based on their `check_interval`.
/// Returns jobsets where `last_checked_at` is NULL or older than `check_interval`
/// seconds.
///
/// Never-checked jobsets (NULL) sort first so they are picked up immediately.
pub async fn list_due_for_eval(
    pool: &PgPool,
    limit: i64,
) -> Result<Vec<ActiveJobset>> {
    // `check_interval` is stored as an integer and cast to a Postgres interval
    // via text concatenation with ' seconds'.
    let due_query = "SELECT * FROM active_jobsets WHERE last_checked_at IS NULL OR last_checked_at < NOW() - (check_interval || ' seconds')::interval ORDER BY last_checked_at NULLS FIRST LIMIT $1";
    sqlx::query_as::<_, ActiveJobset>(due_query)
        .bind(limit)
        .fetch_all(pool)
        .await
        .map_err(CiError::Database)
}