diff --git a/crates/common/src/bootstrap.rs b/crates/common/src/bootstrap.rs index 9607c62..c49e92f 100644 --- a/crates/common/src/bootstrap.rs +++ b/crates/common/src/bootstrap.rs @@ -4,16 +4,68 @@ //! Called once on server startup to reconcile declarative configuration //! with database state. Uses upsert semantics so repeated runs are idempotent. +use std::collections::HashMap; + use sha2::{Digest, Sha256}; use sqlx::PgPool; +use uuid::Uuid; use crate::{ - config::DeclarativeConfig, + config::{DeclarativeConfig, DeclarativeWebhook}, error::Result, - models::{CreateJobset, CreateProject}, + models::{CreateJobset, CreateProject, JobsetState}, repo, }; +/// Expand path with environment variables and home directory. +/// Supports ${VAR}, $VAR, and ~ for home directory. +fn expand_path(path: &str) -> String { + let expanded = if path.starts_with('~') { + if let Some(home) = std::env::var_os("HOME") { + path.replacen('~', &home.to_string_lossy(), 1) + } else { + path.to_string() + } + } else { + path.to_string() + }; + + // Expand ${VAR} and $VAR patterns + let mut result = expanded; + while let Some(start) = result.find("${") { + if let Some(end) = result[start..].find('}') { + let var_name = &result[start + 2..start + end]; + let replacement = std::env::var(var_name).unwrap_or_default(); + result = format!("{}{}{}", &result[..start], replacement, &result[start + end + 1..]); + } else { + break; + } + } + result +} + +/// Resolve secret for a webhook from inline value or file. 
+fn resolve_webhook_secret(webhook: &DeclarativeWebhook) -> Option<String> { + if let Some(ref secret) = webhook.secret { + Some(secret.clone()) + } else if let Some(ref file) = webhook.secret_file { + let expanded = expand_path(file); + match std::fs::read_to_string(&expanded) { + Ok(s) => Some(s.trim().to_string()), + Err(e) => { + tracing::warn!( + forge_type = %webhook.forge_type, + file = %expanded, + "Failed to read webhook secret file: {e}" + ); + None + }, + } + } else { + None + } +} + /// Bootstrap declarative configuration into the database. /// /// This function is idempotent: running it multiple times with the same config @@ -23,6 +75,7 @@ pub async fn run(pool: &PgPool, config: &DeclarativeConfig) -> Result<()> { if config.projects.is_empty() && config.api_keys.is_empty() && config.users.is_empty() + && config.remote_builders.is_empty() { return Ok(()); } @@ -31,12 +84,14 @@ pub async fn run(pool: &PgPool, config: &DeclarativeConfig) -> Result<()> { let n_jobsets: usize = config.projects.iter().map(|p| p.jobsets.len()).sum(); let n_keys = config.api_keys.len(); let n_users = config.users.len(); + let n_builders = config.remote_builders.len(); tracing::info!( projects = n_projects, jobsets = n_jobsets, api_keys = n_keys, users = n_users, + remote_builders = n_builders, "Bootstrapping declarative configuration" ); @@ -56,6 +111,15 @@ pub async fn run(pool: &PgPool, config: &DeclarativeConfig) -> Result<()> { for decl_jobset in &decl_project.jobsets { + // Parse state string to JobsetState enum + let state = decl_jobset.state.as_ref().map(|s| match s.as_str() { + "disabled" => JobsetState::Disabled, + "enabled" => JobsetState::Enabled, + "one_shot" => JobsetState::OneShot, + "one_at_a_time" => JobsetState::OneAtATime, + _ => JobsetState::Enabled, // Default to enabled for unknown values + }); + let jobset = repo::jobsets::upsert(pool, CreateJobset { project_id: project.id, name: decl_jobset.name.clone(), @@ -63,9 +127,9 @@ pub async fn run(pool: &PgPool, 
config: &DeclarativeConfig) -> Result<()> { enabled: Some(decl_jobset.enabled), flake_mode: Some(decl_jobset.flake_mode), check_interval: Some(decl_jobset.check_interval), - branch: None, - scheduling_shares: None, - state: None, + branch: decl_jobset.branch.clone(), + scheduling_shares: Some(decl_jobset.scheduling_shares), + state, }) .await?; @@ -74,7 +138,71 @@ pub async fn run(pool: &PgPool, config: &DeclarativeConfig) -> Result<()> { jobset = %jobset.name, "Upserted declarative jobset" ); + + // Sync jobset inputs + if !decl_jobset.inputs.is_empty() { + repo::jobset_inputs::sync_for_jobset(pool, jobset.id, &decl_jobset.inputs) + .await?; + tracing::info!( + project = %project.name, + jobset = %jobset.name, + inputs = decl_jobset.inputs.len(), + "Synced declarative jobset inputs" + ); + } + } + + // Build jobset name -> ID map for channel resolution + let jobset_map: HashMap<String, Uuid> = { + let jobsets = + repo::jobsets::list_for_project(pool, project.id, 1000, 0).await?; + jobsets.into_iter().map(|j| (j.name, j.id)).collect() + }; + + // Sync notifications + if !decl_project.notifications.is_empty() { + repo::notification_configs::sync_for_project( + pool, + project.id, + &decl_project.notifications, + ) + .await?; + tracing::info!( + project = %project.name, + notifications = decl_project.notifications.len(), + "Synced declarative notifications" + ); + } + + // Sync webhooks + if !decl_project.webhooks.is_empty() { + repo::webhook_configs::sync_for_project( + pool, + project.id, + &decl_project.webhooks, + resolve_webhook_secret, + ) + .await?; + tracing::info!( + project = %project.name, + webhooks = decl_project.webhooks.len(), + "Synced declarative webhooks" + ); + } + + // Sync channels + if !decl_project.channels.is_empty() { + repo::channels::sync_for_project(pool, project.id, &decl_project.channels, |name| { + jobset_map.get(name).copied() + }) + .await?; + tracing::info!( + project = %project.name, + channels = decl_project.channels.len(), + "Synced declarative 
channels" + ); + } + } // Upsert API keys @@ -100,12 +228,13 @@ pub async fn run(pool: &PgPool, config: &DeclarativeConfig) -> Result<()> { let password = if let Some(ref p) = decl_user.password { Some(p.clone()) } else if let Some(ref file) = decl_user.password_file { - match std::fs::read_to_string(file) { + let expanded = expand_path(file); + match std::fs::read_to_string(&expanded) { Ok(p) => Some(p.trim().to_string()), Err(e) => { tracing::warn!( username = %decl_user.username, - file = %file, + file = %expanded, "Failed to read password file: {e}" ); None @@ -180,6 +309,45 @@ pub async fn run(pool: &PgPool, config: &DeclarativeConfig) -> Result<()> { } } + // Sync remote builders + if !config.remote_builders.is_empty() { + repo::remote_builders::sync_all(pool, &config.remote_builders).await?; + tracing::info!( + builders = config.remote_builders.len(), + "Synced declarative remote builders" + ); + } + + // Build username -> user ID map for project member resolution + let user_map: HashMap<String, Uuid> = { + // Get all users (use large limit to get all) + let users = repo::users::list(pool, 10000, 0).await?; + users.into_iter().map(|u| (u.username, u.id)).collect() + }; + + // Sync project members (now that users exist) + for decl_project in &config.projects { + if decl_project.members.is_empty() { + continue; + } + + // Get project by name (already exists from earlier upsert) + if let Ok(project) = repo::projects::get_by_name(pool, &decl_project.name).await { + repo::project_members::sync_for_project( + pool, + project.id, + &decl_project.members, + |username| user_map.get(username).copied(), + ) + .await?; + tracing::info!( + project = %project.name, + members = decl_project.members.len(), + "Synced declarative project members" + ); + } + } + tracing::info!("Declarative bootstrap complete"); Ok(()) } diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index 46e302b..acb3f8a 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ 
-159,9 +159,41 @@ pub struct CacheUploadConfig { #[derive(Debug, Clone, Serialize, Deserialize, Default)] #[serde(default)] pub struct DeclarativeConfig { - pub projects: Vec<DeclarativeProject>, - pub api_keys: Vec<DeclarativeApiKey>, - pub users: Vec<DeclarativeUser>, + pub projects: Vec<DeclarativeProject>, + pub api_keys: Vec<DeclarativeApiKey>, + pub users: Vec<DeclarativeUser>, + /// Remote builder definitions for distributed builds + pub remote_builders: Vec<DeclarativeRemoteBuilder>, +} + +/// Declarative remote builder configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeclarativeRemoteBuilder { + pub name: String, + pub ssh_uri: String, + pub systems: Vec<String>, + #[serde(default = "default_max_jobs")] + pub max_jobs: i32, + #[serde(default = "default_speed_factor")] + pub speed_factor: i32, + #[serde(default)] + pub supported_features: Vec<String>, + #[serde(default)] + pub mandatory_features: Vec<String>, + /// Path to SSH private key file (for production) + pub ssh_key_file: Option<String>, + /// SSH public host key for verification + pub public_host_key: Option<String>, + #[serde(default = "default_true")] + pub enabled: bool, +} + +const fn default_max_jobs() -> i32 { + 1 +} + +const fn default_speed_factor() -> i32 { + 1 } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -171,18 +203,97 @@ pub struct DeclarativeProject { pub description: Option<String>, #[serde(default)] pub jobsets: Vec<DeclarativeJobset>, + /// Notification configurations for this project + #[serde(default)] + pub notifications: Vec<DeclarativeNotification>, + /// Webhook configurations for this project + #[serde(default)] + pub webhooks: Vec<DeclarativeWebhook>, + /// Release channels for this project + #[serde(default)] + pub channels: Vec<DeclarativeChannel>, + /// Project members with their roles + #[serde(default)] + pub members: Vec<DeclarativeProjectMember>, +} + +/// Declarative notification configuration. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeclarativeNotification { + /// Notification type: github_status, email, gitlab_status, gitea_status, run_command + pub notification_type: String, + /// Type-specific configuration (JSON object) + pub config: serde_json::Value, + #[serde(default = "default_true")] + pub enabled: bool, +} + +/// Declarative webhook configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeclarativeWebhook { + /// Forge type: github, gitea, gitlab + pub forge_type: String, + /// Webhook secret (inline, for dev/testing only) + pub secret: Option<String>, + /// Path to a file containing the webhook secret (for production) + pub secret_file: Option<String>, + #[serde(default = "default_true")] + pub enabled: bool, +} + +/// Declarative channel configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeclarativeChannel { + pub name: String, + /// Name of the jobset this channel tracks (resolved during bootstrap) + pub jobset_name: String, +} + +/// Declarative project member configuration. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeclarativeProjectMember { + /// Username of the member (must exist in users) + pub username: String, + /// Role: member, maintainer, or admin + #[serde(default = "default_member_role")] + pub role: String, +} + +fn default_member_role() -> String { + "member".to_string() } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DeclarativeJobset { - pub name: String, - pub nix_expression: String, + pub name: String, + pub nix_expression: String, #[serde(default = "default_true")] - pub enabled: bool, + pub enabled: bool, #[serde(default = "default_true")] - pub flake_mode: bool, + pub flake_mode: bool, #[serde(default = "default_check_interval")] - pub check_interval: i32, + pub check_interval: i32, + /// Jobset state: disabled, enabled, one_shot, or one_at_a_time + pub state: Option<String>, + /// Git branch to track (defaults to repository default branch) + pub branch: Option<String>, + /// Scheduling priority shares (default 100, higher = more priority) + #[serde(default = "default_scheduling_shares")] + pub scheduling_shares: i32, + /// Jobset inputs for parameterized evaluations + #[serde(default)] + pub inputs: Vec<DeclarativeJobsetInput>, +} + +/// Declarative jobset input for parameterized builds. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeclarativeJobsetInput { + pub name: String, + /// Input type: git, string, boolean, path, or build + pub input_type: String, + pub value: String, + /// Git revision (for git inputs) + pub revision: Option<String>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -221,6 +332,10 @@ const fn default_check_interval() -> i32 { 60 } +const fn default_scheduling_shares() -> i32 { + 100 +} + fn default_role() -> String { "admin".to_string() } @@ -535,24 +650,33 @@ mod tests { #[test] fn test_declarative_config_serialization_roundtrip() { let config = DeclarativeConfig { - projects: vec![DeclarativeProject { + projects: vec![DeclarativeProject { name: "test".to_string(), repository_url: "https://example.com/repo".to_string(), description: Some("desc".to_string()), jobsets: vec![DeclarativeJobset { - name: "checks".to_string(), - nix_expression: "checks".to_string(), - enabled: true, - flake_mode: true, - check_interval: 300, + name: "checks".to_string(), + nix_expression: "checks".to_string(), + enabled: true, + flake_mode: true, + check_interval: 300, + state: None, + branch: None, + scheduling_shares: 100, + inputs: vec![], }], + notifications: vec![], + webhooks: vec![], + channels: vec![], + members: vec![], }], - api_keys: vec![DeclarativeApiKey { + api_keys: vec![DeclarativeApiKey { name: "test-key".to_string(), key: "fc_test".to_string(), role: "admin".to_string(), }], - users: vec![], + users: vec![], + remote_builders: vec![], }; let json = serde_json::to_string(&config).unwrap(); diff --git a/crates/common/src/models.rs b/crates/common/src/models.rs index 86e9365..9960a52 100644 --- a/crates/common/src/models.rs +++ b/crates/common/src/models.rs @@ -65,7 +65,8 @@ pub enum EvaluationStatus { #[derive( Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, sqlx::Type, Default, )] -#[sqlx(type_name = "text", rename_all = "snake_case")] +#[serde(rename_all = "snake_case")] +#[sqlx(type_name = "varchar", 
rename_all = "snake_case")] pub enum JobsetState { Disabled, #[default] @@ -290,7 +291,7 @@ pub struct User { } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, sqlx::Type)] -#[sqlx(type_name = "text", rename_all = "lowercase")] +#[sqlx(type_name = "varchar", rename_all = "lowercase")] pub enum UserType { Local, Github, diff --git a/crates/common/src/repo/channels.rs b/crates/common/src/repo/channels.rs index cf48476..e61635a 100644 --- a/crates/common/src/repo/channels.rs +++ b/crates/common/src/repo/channels.rs @@ -2,6 +2,7 @@ use sqlx::PgPool; use uuid::Uuid; use crate::{ + config::DeclarativeChannel, error::{CiError, Result}, models::{Channel, CreateChannel}, }; @@ -86,6 +87,61 @@ pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> { Ok(()) } +/// Upsert a channel (insert or update on conflict). +pub async fn upsert( + pool: &PgPool, + project_id: Uuid, + name: &str, + jobset_id: Uuid, +) -> Result<Channel> { + sqlx::query_as::<_, Channel>( + "INSERT INTO channels (project_id, name, jobset_id) VALUES ($1, $2, $3) \ + ON CONFLICT (project_id, name) DO UPDATE SET jobset_id = EXCLUDED.jobset_id \ + RETURNING *", + ) + .bind(project_id) + .bind(name) + .bind(jobset_id) + .fetch_one(pool) + .await + .map_err(CiError::Database) +} + +/// Sync channels from declarative config. +/// Deletes channels not in the declarative list and upserts those that are. 
+pub async fn sync_for_project( + pool: &PgPool, + project_id: Uuid, + channels: &[DeclarativeChannel], + resolve_jobset: impl Fn(&str) -> Option<Uuid>, +) -> Result<()> { + // Get channel names from declarative config + let names: Vec<&str> = channels.iter().map(|c| c.name.as_str()).collect(); + + // Delete channels not in declarative config + sqlx::query("DELETE FROM channels WHERE project_id = $1 AND name != ALL($2::text[])") + .bind(project_id) + .bind(&names) + .execute(pool) + .await + .map_err(CiError::Database)?; + + // Upsert each channel + for channel in channels { + if let Some(jobset_id) = resolve_jobset(&channel.jobset_name) { + upsert(pool, project_id, &channel.name, jobset_id).await?; + } else { + tracing::warn!( + channel = %channel.name, + jobset_name = %channel.jobset_name, + "Could not resolve jobset for declarative channel" + ); + } + } + + Ok(()) +} + /// Find the channel for a jobset and auto-promote if all builds in the /// evaluation succeeded. pub async fn auto_promote_if_complete( diff --git a/crates/common/src/repo/remote_builders.rs b/crates/common/src/repo/remote_builders.rs index 22b6bc4..0ba6cc3 100644 --- a/crates/common/src/repo/remote_builders.rs +++ b/crates/common/src/repo/remote_builders.rs @@ -2,6 +2,7 @@ use sqlx::PgPool; use uuid::Uuid; use crate::{ + config::DeclarativeRemoteBuilder, error::{CiError, Result}, models::{CreateRemoteBuilder, RemoteBuilder}, }; @@ -133,3 +134,81 @@ pub async fn count(pool: &PgPool) -> Result<i64> { .map_err(CiError::Database)?; Ok(row.0) } + +/// Upsert a remote builder (insert or update on conflict by name). 
+pub async fn upsert( + pool: &PgPool, + name: &str, + ssh_uri: &str, + systems: &[String], + max_jobs: i32, + speed_factor: i32, + supported_features: &[String], + mandatory_features: &[String], + enabled: bool, + public_host_key: Option<&str>, + ssh_key_file: Option<&str>, +) -> Result<RemoteBuilder> { + sqlx::query_as::<_, RemoteBuilder>( + "INSERT INTO remote_builders (name, ssh_uri, systems, max_jobs, \ + speed_factor, supported_features, mandatory_features, enabled, \ + public_host_key, ssh_key_file) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, \ + $10) ON CONFLICT (name) DO UPDATE SET ssh_uri = EXCLUDED.ssh_uri, systems = \ + EXCLUDED.systems, max_jobs = EXCLUDED.max_jobs, speed_factor = \ + EXCLUDED.speed_factor, supported_features = EXCLUDED.supported_features, \ + mandatory_features = EXCLUDED.mandatory_features, enabled = \ + EXCLUDED.enabled, public_host_key = COALESCE(EXCLUDED.public_host_key, \ + remote_builders.public_host_key), ssh_key_file = \ + COALESCE(EXCLUDED.ssh_key_file, remote_builders.ssh_key_file) RETURNING *", + ) + .bind(name) + .bind(ssh_uri) + .bind(systems) + .bind(max_jobs) + .bind(speed_factor) + .bind(supported_features) + .bind(mandatory_features) + .bind(enabled) + .bind(public_host_key) + .bind(ssh_key_file) + .fetch_one(pool) + .await + .map_err(CiError::Database) +} + +/// Sync remote builders from declarative config. +/// Deletes builders not in the declarative list and upserts those that are. 
+pub async fn sync_all( + pool: &PgPool, + builders: &[DeclarativeRemoteBuilder], +) -> Result<()> { + // Get builder names from declarative config + let names: Vec<&str> = builders.iter().map(|b| b.name.as_str()).collect(); + + // Delete builders not in declarative config + sqlx::query("DELETE FROM remote_builders WHERE name != ALL($1::text[])") + .bind(&names) + .execute(pool) + .await + .map_err(CiError::Database)?; + + // Upsert each builder + for builder in builders { + upsert( + pool, + &builder.name, + &builder.ssh_uri, + &builder.systems, + builder.max_jobs, + builder.speed_factor, + &builder.supported_features, + &builder.mandatory_features, + builder.enabled, + builder.public_host_key.as_deref(), + builder.ssh_key_file.as_deref(), + ) + .await?; + } + + Ok(()) +} diff --git a/crates/common/src/validate.rs b/crates/common/src/validate.rs index fc56de4..060d2d1 100644 --- a/crates/common/src/validate.rs +++ b/crates/common/src/validate.rs @@ -33,7 +33,7 @@ static SYSTEM_RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^\w+-\w+$").unwrap()); const VALID_REPO_PREFIXES: &[&str] = - &["https://", "http://", "git://", "ssh://"]; + &["https://", "http://", "git://", "ssh://", "file://"]; const VALID_FORGE_TYPES: &[&str] = &["github", "gitea", "forgejo", "gitlab"]; /// Trait for validating request DTOs before persisting. @@ -62,7 +62,7 @@ fn validate_repository_url(url: &str) -> Result<(), String> { } if !VALID_REPO_PREFIXES.iter().any(|p| url.starts_with(p)) { return Err( - "repository_url must start with https://, http://, git://, or ssh://" + "repository_url must start with https://, http://, git://, ssh://, or file://" .to_string(), ); }