fc-common: add declarative sync for remote builders and channels

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I3dae89f04777f6d941824606aebe34446a6a6964
This commit is contained in:
raf 2026-02-08 21:16:32 +03:00
commit d4d9297d96
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
6 changed files with 455 additions and 27 deletions

View file

@ -4,16 +4,68 @@
//! Called once on server startup to reconcile declarative configuration
//! with database state. Uses upsert semantics so repeated runs are idempotent.
use std::collections::HashMap;
use sha2::{Digest, Sha256};
use sqlx::PgPool;
use uuid::Uuid;
use crate::{
config::DeclarativeConfig,
config::{DeclarativeConfig, DeclarativeWebhook},
error::Result,
models::{CreateJobset, CreateProject},
models::{CreateJobset, CreateProject, JobsetState},
repo,
};
/// Expand path with environment variables and home directory.
/// Supports ${VAR}, $VAR, and ~ for home directory.
fn expand_path(path: &str) -> String {
let expanded = if path.starts_with('~') {
if let Some(home) = std::env::var_os("HOME") {
path.replacen('~', &home.to_string_lossy(), 1)
} else {
path.to_string()
}
} else {
path.to_string()
};
// Expand ${VAR} and $VAR patterns
let mut result = expanded;
while let Some(start) = result.find("${") {
if let Some(end) = result[start..].find('}') {
let var_name = &result[start + 2..start + end];
let replacement = std::env::var(var_name).unwrap_or_default();
result = format!("{}{}{}", &result[..start], replacement, &result[start + end + 1..]);
} else {
break;
}
}
result
}
/// Resolve secret for a webhook from inline value or file.
fn resolve_webhook_secret(webhook: &DeclarativeWebhook) -> Option<String> {
if let Some(ref secret) = webhook.secret {
Some(secret.clone())
} else if let Some(ref file) = webhook.secret_file {
let expanded = expand_path(file);
match std::fs::read_to_string(&expanded) {
Ok(s) => Some(s.trim().to_string()),
Err(e) => {
tracing::warn!(
forge_type = %webhook.forge_type,
file = %expanded,
"Failed to read webhook secret file: {e}"
);
None
},
}
} else {
None
}
}
/// Bootstrap declarative configuration into the database.
///
/// This function is idempotent: running it multiple times with the same config
@ -23,6 +75,7 @@ pub async fn run(pool: &PgPool, config: &DeclarativeConfig) -> Result<()> {
if config.projects.is_empty()
&& config.api_keys.is_empty()
&& config.users.is_empty()
&& config.remote_builders.is_empty()
{
return Ok(());
}
@ -31,12 +84,14 @@ pub async fn run(pool: &PgPool, config: &DeclarativeConfig) -> Result<()> {
let n_jobsets: usize = config.projects.iter().map(|p| p.jobsets.len()).sum();
let n_keys = config.api_keys.len();
let n_users = config.users.len();
let n_builders = config.remote_builders.len();
tracing::info!(
projects = n_projects,
jobsets = n_jobsets,
api_keys = n_keys,
users = n_users,
remote_builders = n_builders,
"Bootstrapping declarative configuration"
);
@ -56,6 +111,15 @@ pub async fn run(pool: &PgPool, config: &DeclarativeConfig) -> Result<()> {
);
for decl_jobset in &decl_project.jobsets {
// Parse state string to JobsetState enum
let state = decl_jobset.state.as_ref().map(|s| match s.as_str() {
"disabled" => JobsetState::Disabled,
"enabled" => JobsetState::Enabled,
"one_shot" => JobsetState::OneShot,
"one_at_a_time" => JobsetState::OneAtATime,
_ => JobsetState::Enabled, // Default to enabled for unknown values
});
let jobset = repo::jobsets::upsert(pool, CreateJobset {
project_id: project.id,
name: decl_jobset.name.clone(),
@ -63,9 +127,9 @@ pub async fn run(pool: &PgPool, config: &DeclarativeConfig) -> Result<()> {
enabled: Some(decl_jobset.enabled),
flake_mode: Some(decl_jobset.flake_mode),
check_interval: Some(decl_jobset.check_interval),
branch: None,
scheduling_shares: None,
state: None,
branch: decl_jobset.branch.clone(),
scheduling_shares: Some(decl_jobset.scheduling_shares),
state,
})
.await?;
@ -74,7 +138,71 @@ pub async fn run(pool: &PgPool, config: &DeclarativeConfig) -> Result<()> {
jobset = %jobset.name,
"Upserted declarative jobset"
);
// Sync jobset inputs
if !decl_jobset.inputs.is_empty() {
repo::jobset_inputs::sync_for_jobset(pool, jobset.id, &decl_jobset.inputs)
.await?;
tracing::info!(
project = %project.name,
jobset = %jobset.name,
inputs = decl_jobset.inputs.len(),
"Synced declarative jobset inputs"
);
}
}
// Build jobset name -> ID map for channel resolution
let jobset_map: HashMap<String, Uuid> = {
let jobsets =
repo::jobsets::list_for_project(pool, project.id, 1000, 0).await?;
jobsets.into_iter().map(|j| (j.name, j.id)).collect()
};
// Sync notifications
if !decl_project.notifications.is_empty() {
repo::notification_configs::sync_for_project(
pool,
project.id,
&decl_project.notifications,
)
.await?;
tracing::info!(
project = %project.name,
notifications = decl_project.notifications.len(),
"Synced declarative notifications"
);
}
// Sync webhooks
if !decl_project.webhooks.is_empty() {
repo::webhook_configs::sync_for_project(
pool,
project.id,
&decl_project.webhooks,
resolve_webhook_secret,
)
.await?;
tracing::info!(
project = %project.name,
webhooks = decl_project.webhooks.len(),
"Synced declarative webhooks"
);
}
// Sync channels
if !decl_project.channels.is_empty() {
repo::channels::sync_for_project(pool, project.id, &decl_project.channels, |name| {
jobset_map.get(name).copied()
})
.await?;
tracing::info!(
project = %project.name,
channels = decl_project.channels.len(),
"Synced declarative channels"
);
}
}
// Upsert API keys
@ -100,12 +228,13 @@ pub async fn run(pool: &PgPool, config: &DeclarativeConfig) -> Result<()> {
let password = if let Some(ref p) = decl_user.password {
Some(p.clone())
} else if let Some(ref file) = decl_user.password_file {
match std::fs::read_to_string(file) {
let expanded = expand_path(file);
match std::fs::read_to_string(&expanded) {
Ok(p) => Some(p.trim().to_string()),
Err(e) => {
tracing::warn!(
username = %decl_user.username,
file = %file,
file = %expanded,
"Failed to read password file: {e}"
);
None
@ -180,6 +309,45 @@ pub async fn run(pool: &PgPool, config: &DeclarativeConfig) -> Result<()> {
}
}
// Sync remote builders
if !config.remote_builders.is_empty() {
repo::remote_builders::sync_all(pool, &config.remote_builders).await?;
tracing::info!(
builders = config.remote_builders.len(),
"Synced declarative remote builders"
);
}
// Build username -> user ID map for project member resolution
let user_map: HashMap<String, Uuid> = {
// Get all users (use large limit to get all)
let users = repo::users::list(pool, 10000, 0).await?;
users.into_iter().map(|u| (u.username, u.id)).collect()
};
// Sync project members (now that users exist)
for decl_project in &config.projects {
if decl_project.members.is_empty() {
continue;
}
// Get project by name (already exists from earlier upsert)
if let Ok(project) = repo::projects::get_by_name(pool, &decl_project.name).await {
repo::project_members::sync_for_project(
pool,
project.id,
&decl_project.members,
|username| user_map.get(username).copied(),
)
.await?;
tracing::info!(
project = %project.name,
members = decl_project.members.len(),
"Synced declarative project members"
);
}
}
tracing::info!("Declarative bootstrap complete");
Ok(())
}

View file

@ -159,9 +159,41 @@ pub struct CacheUploadConfig {
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct DeclarativeConfig {
pub projects: Vec<DeclarativeProject>,
pub api_keys: Vec<DeclarativeApiKey>,
pub users: Vec<DeclarativeUser>,
pub projects: Vec<DeclarativeProject>,
pub api_keys: Vec<DeclarativeApiKey>,
pub users: Vec<DeclarativeUser>,
/// Remote builder definitions for distributed builds
pub remote_builders: Vec<DeclarativeRemoteBuilder>,
}
/// Declarative remote builder configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeclarativeRemoteBuilder {
pub name: String,
pub ssh_uri: String,
pub systems: Vec<String>,
#[serde(default = "default_max_jobs")]
pub max_jobs: i32,
#[serde(default = "default_speed_factor")]
pub speed_factor: i32,
#[serde(default)]
pub supported_features: Vec<String>,
#[serde(default)]
pub mandatory_features: Vec<String>,
/// Path to SSH private key file (for production)
pub ssh_key_file: Option<String>,
/// SSH public host key for verification
pub public_host_key: Option<String>,
#[serde(default = "default_true")]
pub enabled: bool,
}
const fn default_max_jobs() -> i32 {
1
}
const fn default_speed_factor() -> i32 {
1
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -171,18 +203,97 @@ pub struct DeclarativeProject {
pub description: Option<String>,
#[serde(default)]
pub jobsets: Vec<DeclarativeJobset>,
/// Notification configurations for this project
#[serde(default)]
pub notifications: Vec<DeclarativeNotification>,
/// Webhook configurations for this project
#[serde(default)]
pub webhooks: Vec<DeclarativeWebhook>,
/// Release channels for this project
#[serde(default)]
pub channels: Vec<DeclarativeChannel>,
/// Project members with their roles
#[serde(default)]
pub members: Vec<DeclarativeProjectMember>,
}
/// Declarative notification configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeclarativeNotification {
/// Notification type: github_status, email, gitlab_status, gitea_status, run_command
pub notification_type: String,
/// Type-specific configuration (JSON object)
pub config: serde_json::Value,
#[serde(default = "default_true")]
pub enabled: bool,
}
/// Declarative webhook configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeclarativeWebhook {
/// Forge type: github, gitea, gitlab
pub forge_type: String,
/// Webhook secret (inline, for dev/testing only)
pub secret: Option<String>,
/// Path to a file containing the webhook secret (for production)
pub secret_file: Option<String>,
#[serde(default = "default_true")]
pub enabled: bool,
}
/// Declarative channel configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeclarativeChannel {
pub name: String,
/// Name of the jobset this channel tracks (resolved during bootstrap)
pub jobset_name: String,
}
/// Declarative project member configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeclarativeProjectMember {
/// Username of the member (must exist in users)
pub username: String,
/// Role: member, maintainer, or admin
#[serde(default = "default_member_role")]
pub role: String,
}
fn default_member_role() -> String {
"member".to_string()
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeclarativeJobset {
pub name: String,
pub nix_expression: String,
pub name: String,
pub nix_expression: String,
#[serde(default = "default_true")]
pub enabled: bool,
pub enabled: bool,
#[serde(default = "default_true")]
pub flake_mode: bool,
pub flake_mode: bool,
#[serde(default = "default_check_interval")]
pub check_interval: i32,
pub check_interval: i32,
/// Jobset state: disabled, enabled, one_shot, or one_at_a_time
pub state: Option<String>,
/// Git branch to track (defaults to repository default branch)
pub branch: Option<String>,
/// Scheduling priority shares (default 100, higher = more priority)
#[serde(default = "default_scheduling_shares")]
pub scheduling_shares: i32,
/// Jobset inputs for parameterized evaluations
#[serde(default)]
pub inputs: Vec<DeclarativeJobsetInput>,
}
/// Declarative jobset input for parameterized builds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeclarativeJobsetInput {
pub name: String,
/// Input type: git, string, boolean, path, or build
pub input_type: String,
pub value: String,
/// Git revision (for git inputs)
pub revision: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -221,6 +332,10 @@ const fn default_check_interval() -> i32 {
60
}
const fn default_scheduling_shares() -> i32 {
100
}
fn default_role() -> String {
"admin".to_string()
}
@ -535,24 +650,33 @@ mod tests {
#[test]
fn test_declarative_config_serialization_roundtrip() {
let config = DeclarativeConfig {
projects: vec![DeclarativeProject {
projects: vec![DeclarativeProject {
name: "test".to_string(),
repository_url: "https://example.com/repo".to_string(),
description: Some("desc".to_string()),
jobsets: vec![DeclarativeJobset {
name: "checks".to_string(),
nix_expression: "checks".to_string(),
enabled: true,
flake_mode: true,
check_interval: 300,
name: "checks".to_string(),
nix_expression: "checks".to_string(),
enabled: true,
flake_mode: true,
check_interval: 300,
state: None,
branch: None,
scheduling_shares: 100,
inputs: vec![],
}],
notifications: vec![],
webhooks: vec![],
channels: vec![],
members: vec![],
}],
api_keys: vec![DeclarativeApiKey {
api_keys: vec![DeclarativeApiKey {
name: "test-key".to_string(),
key: "fc_test".to_string(),
role: "admin".to_string(),
}],
users: vec![],
users: vec![],
remote_builders: vec![],
};
let json = serde_json::to_string(&config).unwrap();

View file

@ -65,7 +65,8 @@ pub enum EvaluationStatus {
#[derive(
Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, sqlx::Type, Default,
)]
#[sqlx(type_name = "text", rename_all = "snake_case")]
#[serde(rename_all = "snake_case")]
#[sqlx(type_name = "varchar", rename_all = "snake_case")]
pub enum JobsetState {
Disabled,
#[default]
@ -290,7 +291,7 @@ pub struct User {
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, sqlx::Type)]
#[sqlx(type_name = "text", rename_all = "lowercase")]
#[sqlx(type_name = "varchar", rename_all = "lowercase")]
pub enum UserType {
Local,
Github,

View file

@ -2,6 +2,7 @@ use sqlx::PgPool;
use uuid::Uuid;
use crate::{
config::DeclarativeChannel,
error::{CiError, Result},
models::{Channel, CreateChannel},
};
@ -86,6 +87,61 @@ pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> {
Ok(())
}
/// Upsert a channel (insert or update on conflict).
pub async fn upsert(
pool: &PgPool,
project_id: Uuid,
name: &str,
jobset_id: Uuid,
) -> Result<Channel> {
sqlx::query_as::<_, Channel>(
"INSERT INTO channels (project_id, name, jobset_id) VALUES ($1, $2, $3) \
ON CONFLICT (project_id, name) DO UPDATE SET jobset_id = EXCLUDED.jobset_id \
RETURNING *",
)
.bind(project_id)
.bind(name)
.bind(jobset_id)
.fetch_one(pool)
.await
.map_err(CiError::Database)
}
/// Sync channels from declarative config.
/// Deletes channels not in the declarative list and upserts those that are.
pub async fn sync_for_project(
pool: &PgPool,
project_id: Uuid,
channels: &[DeclarativeChannel],
resolve_jobset: impl Fn(&str) -> Option<Uuid>,
) -> Result<()> {
// Get channel names from declarative config
let names: Vec<&str> = channels.iter().map(|c| c.name.as_str()).collect();
// Delete channels not in declarative config
sqlx::query("DELETE FROM channels WHERE project_id = $1 AND name != ALL($2::text[])")
.bind(project_id)
.bind(&names)
.execute(pool)
.await
.map_err(CiError::Database)?;
// Upsert each channel
for channel in channels {
if let Some(jobset_id) = resolve_jobset(&channel.jobset_name) {
upsert(pool, project_id, &channel.name, jobset_id).await?;
} else {
tracing::warn!(
channel = %channel.name,
jobset_name = %channel.jobset_name,
"Could not resolve jobset for declarative channel"
);
}
}
Ok(())
}
/// Find the channel for a jobset and auto-promote if all builds in the
/// evaluation succeeded.
pub async fn auto_promote_if_complete(

View file

@ -2,6 +2,7 @@ use sqlx::PgPool;
use uuid::Uuid;
use crate::{
config::DeclarativeRemoteBuilder,
error::{CiError, Result},
models::{CreateRemoteBuilder, RemoteBuilder},
};
@ -133,3 +134,81 @@ pub async fn count(pool: &PgPool) -> Result<i64> {
.map_err(CiError::Database)?;
Ok(row.0)
}
/// Upsert a remote builder (insert or update on conflict by name).
pub async fn upsert(
pool: &PgPool,
name: &str,
ssh_uri: &str,
systems: &[String],
max_jobs: i32,
speed_factor: i32,
supported_features: &[String],
mandatory_features: &[String],
enabled: bool,
public_host_key: Option<&str>,
ssh_key_file: Option<&str>,
) -> Result<RemoteBuilder> {
sqlx::query_as::<_, RemoteBuilder>(
"INSERT INTO remote_builders (name, ssh_uri, systems, max_jobs, \
speed_factor, supported_features, mandatory_features, enabled, \
public_host_key, ssh_key_file) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, \
$10) ON CONFLICT (name) DO UPDATE SET ssh_uri = EXCLUDED.ssh_uri, systems = \
EXCLUDED.systems, max_jobs = EXCLUDED.max_jobs, speed_factor = \
EXCLUDED.speed_factor, supported_features = EXCLUDED.supported_features, \
mandatory_features = EXCLUDED.mandatory_features, enabled = \
EXCLUDED.enabled, public_host_key = COALESCE(EXCLUDED.public_host_key, \
remote_builders.public_host_key), ssh_key_file = \
COALESCE(EXCLUDED.ssh_key_file, remote_builders.ssh_key_file) RETURNING *",
)
.bind(name)
.bind(ssh_uri)
.bind(systems)
.bind(max_jobs)
.bind(speed_factor)
.bind(supported_features)
.bind(mandatory_features)
.bind(enabled)
.bind(public_host_key)
.bind(ssh_key_file)
.fetch_one(pool)
.await
.map_err(CiError::Database)
}
/// Sync remote builders from declarative config.
/// Deletes builders not in the declarative list and upserts those that are.
pub async fn sync_all(
pool: &PgPool,
builders: &[DeclarativeRemoteBuilder],
) -> Result<()> {
// Get builder names from declarative config
let names: Vec<&str> = builders.iter().map(|b| b.name.as_str()).collect();
// Delete builders not in declarative config
sqlx::query("DELETE FROM remote_builders WHERE name != ALL($1::text[])")
.bind(&names)
.execute(pool)
.await
.map_err(CiError::Database)?;
// Upsert each builder
for builder in builders {
upsert(
pool,
&builder.name,
&builder.ssh_uri,
&builder.systems,
builder.max_jobs,
builder.speed_factor,
&builder.supported_features,
&builder.mandatory_features,
builder.enabled,
builder.public_host_key.as_deref(),
builder.ssh_key_file.as_deref(),
)
.await?;
}
Ok(())
}

View file

@ -33,7 +33,7 @@ static SYSTEM_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^\w+-\w+$").unwrap());
const VALID_REPO_PREFIXES: &[&str] =
&["https://", "http://", "git://", "ssh://"];
&["https://", "http://", "git://", "ssh://", "file://"];
const VALID_FORGE_TYPES: &[&str] = &["github", "gitea", "forgejo", "gitlab"];
/// Trait for validating request DTOs before persisting.
@ -62,7 +62,7 @@ fn validate_repository_url(url: &str) -> Result<(), String> {
}
if !VALID_REPO_PREFIXES.iter().any(|p| url.starts_with(p)) {
return Err(
"repository_url must start with https://, http://, git://, or ssh://"
"repository_url must start with https://, http://, git://, ssh://, or file://"
.to_string(),
);
}