From 1b12be3f8aecc66cef43592b34f6a4c3480c0715 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sun, 2 Nov 2025 23:33:33 +0300 Subject: [PATCH] crates: production models and repo layer Signed-off-by: NotAShelf Change-Id: Iceb76724c09eaca7ca5d823010db76776a6a6964 --- crates/common/Cargo.toml | 5 + .../migrations/002_add_build_system.sql | 2 + .../migrations/003_production_features.sql | 92 +++ .../migrations/004_build_outputs_and_deps.sql | 14 + .../005_channels_remote_builders.sql | 44 + crates/common/migrations/006_hardening.sql | 14 + crates/common/src/config.rs | 138 +++- crates/common/src/database.rs | 2 +- crates/common/src/error.rs | 18 + crates/common/src/gc_roots.rs | 104 +++ crates/common/src/lib.rs | 7 + crates/common/src/log_storage.rs | 68 ++ crates/common/src/models.rs | 330 +++++++- crates/common/src/notifications.rs | 296 +++++++ crates/common/src/repo/api_keys.rs | 57 ++ crates/common/src/repo/build_dependencies.rs | 79 ++ crates/common/src/repo/build_products.rs | 40 + crates/common/src/repo/build_steps.rs | 54 ++ crates/common/src/repo/builds.rs | 292 +++++++ crates/common/src/repo/channels.rs | 111 +++ crates/common/src/repo/evaluations.rs | 146 ++++ crates/common/src/repo/jobset_inputs.rs | 52 ++ crates/common/src/repo/jobsets.rs | 137 ++++ crates/common/src/repo/mod.rs | 13 + .../common/src/repo/notification_configs.rs | 48 ++ crates/common/src/repo/projects.rs | 95 +++ crates/common/src/repo/remote_builders.rs | 124 +++ crates/common/src/repo/webhook_configs.rs | 73 ++ crates/common/src/validate.rs | 596 ++++++++++++++ crates/common/tests/database_tests.rs | 32 +- crates/common/tests/repo_tests.rs | 770 ++++++++++++++++++ 31 files changed, 3841 insertions(+), 12 deletions(-) create mode 100644 crates/common/migrations/002_add_build_system.sql create mode 100644 crates/common/migrations/003_production_features.sql create mode 100644 crates/common/migrations/004_build_outputs_and_deps.sql create mode 100644 crates/common/migrations/005_channels_remote_builders.sql create mode 100644 crates/common/migrations/006_hardening.sql create mode 100644 crates/common/src/gc_roots.rs create mode 100644 crates/common/src/log_storage.rs create mode 100644 crates/common/src/notifications.rs create mode 100644 crates/common/src/repo/api_keys.rs create mode 100644 crates/common/src/repo/build_dependencies.rs create mode 100644 crates/common/src/repo/build_products.rs create mode 100644 crates/common/src/repo/build_steps.rs create mode 100644 crates/common/src/repo/builds.rs create mode 100644 crates/common/src/repo/channels.rs create mode 100644 crates/common/src/repo/evaluations.rs create mode 100644 crates/common/src/repo/jobset_inputs.rs create mode 100644 crates/common/src/repo/jobsets.rs create mode 100644 crates/common/src/repo/mod.rs create mode 100644 crates/common/src/repo/notification_configs.rs create mode 100644 crates/common/src/repo/projects.rs create mode 100644 crates/common/src/repo/remote_builders.rs create mode 100644 crates/common/src/repo/webhook_configs.rs create mode 100644 crates/common/src/validate.rs create mode 100644 crates/common/tests/repo_tests.rs diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 237b92e..042bb3e 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -22,3 +22,8 @@ config.workspace = true tempfile.workspace = true toml.workspace = true tokio.workspace = true +reqwest.workspace = true +sha2.workspace = true +hex.workspace = true +lettre.workspace = true +regex.workspace = true diff --git a/crates/common/migrations/002_add_build_system.sql b/crates/common/migrations/002_add_build_system.sql new file mode 100644 index 0000000..56421b0 --- /dev/null +++ b/crates/common/migrations/002_add_build_system.sql @@ -0,0 +1,2 @@ +-- Add system field to builds table +ALTER TABLE builds ADD COLUMN system VARCHAR(50); diff --git a/crates/common/migrations/003_production_features.sql b/crates/common/migrations/003_production_features.sql new file mode 100644 index 0000000..24c10fe --- /dev/null +++ b/crates/common/migrations/003_production_features.sql @@ -0,0 +1,92 @@ +-- Production features: auth, priority, retry, notifications, GC roots, log paths + +-- API key authentication +CREATE TABLE api_keys ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + name VARCHAR(255) NOT NULL, + key_hash VARCHAR(128) NOT NULL UNIQUE, + role VARCHAR(50) NOT NULL DEFAULT 'admin' + CHECK (role IN ('admin', 'create-projects', 'restart-jobs', 'cancel-build', 'bump-to-front', 'eval-jobset', 'read-only')), + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + last_used_at TIMESTAMP WITH TIME ZONE +); + +-- Build priority and retry support +ALTER TABLE builds ADD COLUMN priority INTEGER NOT NULL DEFAULT 0; +ALTER TABLE builds ADD COLUMN retry_count INTEGER NOT NULL DEFAULT 0; +ALTER TABLE builds ADD COLUMN max_retries INTEGER NOT NULL DEFAULT 3; +ALTER TABLE builds ADD COLUMN notification_pending_since TIMESTAMP WITH TIME ZONE; + +-- GC root tracking on build products +ALTER TABLE build_products ADD COLUMN gc_root_path TEXT; + +-- Build log file path (filesystem path to captured log) +ALTER TABLE builds ADD COLUMN log_url TEXT; + +-- Webhook configuration for incoming push events +CREATE TABLE webhook_configs ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE, + forge_type VARCHAR(50) NOT NULL CHECK (forge_type IN ('github', 'gitea', 'forgejo', 'gitlab')), + secret_hash VARCHAR(128), + enabled BOOLEAN NOT NULL DEFAULT true, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + UNIQUE(project_id, forge_type) +); + +-- Notification configuration per project +CREATE TABLE notification_configs ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE, + notification_type VARCHAR(50) NOT NULL + CHECK (notification_type IN ('github_status', 'gitea_status', 'forgejo_status', 'gitlab_status', 'run_command', 'email')), + config JSONB NOT NULL DEFAULT '{}', + enabled BOOLEAN NOT NULL DEFAULT true, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + UNIQUE(project_id, notification_type) +); + +-- Jobset inputs for multi-input support +CREATE TABLE jobset_inputs ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + jobset_id UUID NOT NULL REFERENCES jobsets(id) ON DELETE CASCADE, + name VARCHAR(255) NOT NULL, + input_type VARCHAR(50) NOT NULL + CHECK (input_type IN ('git', 'string', 'boolean', 'path', 'build')), + value TEXT NOT NULL, + revision TEXT, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + UNIQUE(jobset_id, name) +); + +-- Track flake mode per jobset +ALTER TABLE jobsets ADD COLUMN flake_mode BOOLEAN NOT NULL DEFAULT true; +ALTER TABLE jobsets ADD COLUMN check_interval INTEGER NOT NULL DEFAULT 60; + +-- Store the flake URI or legacy expression path in nix_expression (already exists) +-- For flake mode: nix_expression = "github:owner/repo" or "." +-- For legacy mode: nix_expression = "release.nix" + +-- Indexes for new columns +CREATE INDEX idx_builds_priority ON builds(priority DESC, created_at ASC); +CREATE INDEX idx_builds_notification_pending ON builds(notification_pending_since) WHERE notification_pending_since IS NOT NULL; +CREATE INDEX idx_api_keys_key_hash ON api_keys(key_hash); +CREATE INDEX idx_webhook_configs_project ON webhook_configs(project_id); +CREATE INDEX idx_notification_configs_project ON notification_configs(project_id); +CREATE INDEX idx_jobset_inputs_jobset ON jobset_inputs(jobset_id); + +-- Update active_jobsets view to include flake_mode +-- Must DROP first: adding columns to jobsets changes j.* expansion, +-- and CREATE OR REPLACE VIEW cannot rename existing columns. +DROP VIEW IF EXISTS active_jobsets; +CREATE VIEW active_jobsets AS +SELECT + j.*, + p.name as project_name, + p.repository_url +FROM jobsets j +JOIN projects p ON j.project_id = p.id +WHERE j.enabled = true; + +-- Update list_pending to respect priority ordering +-- (handled in application code, but index above supports it) diff --git a/crates/common/migrations/004_build_outputs_and_deps.sql b/crates/common/migrations/004_build_outputs_and_deps.sql new file mode 100644 index 0000000..e1b5587 --- /dev/null +++ b/crates/common/migrations/004_build_outputs_and_deps.sql @@ -0,0 +1,14 @@ +ALTER TABLE builds ADD COLUMN outputs JSONB; +ALTER TABLE builds ADD COLUMN is_aggregate BOOLEAN NOT NULL DEFAULT false; +ALTER TABLE builds ADD COLUMN constituents JSONB; + +CREATE TABLE build_dependencies ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + build_id UUID NOT NULL REFERENCES builds(id) ON DELETE CASCADE, + dependency_build_id UUID NOT NULL REFERENCES builds(id) ON DELETE CASCADE, + UNIQUE(build_id, dependency_build_id) +); + +CREATE INDEX idx_build_deps_build ON build_dependencies(build_id); +CREATE INDEX idx_build_deps_dep ON build_dependencies(dependency_build_id); +CREATE INDEX idx_builds_drv_path ON builds(drv_path); diff --git a/crates/common/migrations/005_channels_remote_builders.sql b/crates/common/migrations/005_channels_remote_builders.sql new file mode 100644 index 0000000..920de5e --- /dev/null +++ b/crates/common/migrations/005_channels_remote_builders.sql @@ -0,0 +1,44 @@ +-- Channels for release management (like Hydra channels) +-- A channel tracks the latest "good" evaluation for a jobset +CREATE TABLE channels ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE, + name VARCHAR(255) NOT NULL, + jobset_id UUID NOT NULL REFERENCES jobsets(id) ON DELETE CASCADE, + current_evaluation_id UUID REFERENCES evaluations(id), + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + UNIQUE(project_id, name) +); + +-- Remote builders for multi-machine / multi-arch builds +CREATE TABLE remote_builders ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name VARCHAR(255) NOT NULL UNIQUE, + ssh_uri TEXT NOT NULL, + systems TEXT[] NOT NULL DEFAULT '{}', + max_jobs INTEGER NOT NULL DEFAULT 1, + speed_factor INTEGER NOT NULL DEFAULT 1, + supported_features TEXT[] NOT NULL DEFAULT '{}', + mandatory_features TEXT[] NOT NULL DEFAULT '{}', + enabled BOOLEAN NOT NULL DEFAULT true, + public_host_key TEXT, + ssh_key_file TEXT, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() +); + +-- Track input hash for evaluation caching (skip re-eval when inputs unchanged) +ALTER TABLE evaluations ADD COLUMN inputs_hash VARCHAR(128); + +-- Track which remote builder was used for a build +ALTER TABLE builds ADD COLUMN builder_id UUID REFERENCES remote_builders(id); + +-- Track whether build outputs have been signed +ALTER TABLE builds ADD COLUMN signed BOOLEAN NOT NULL DEFAULT false; + +-- Indexes +CREATE INDEX idx_channels_project ON channels(project_id); +CREATE INDEX idx_channels_jobset ON channels(jobset_id); +CREATE INDEX idx_remote_builders_enabled ON remote_builders(enabled) WHERE enabled = true; +CREATE INDEX idx_evaluations_inputs_hash ON evaluations(jobset_id, inputs_hash); +CREATE INDEX idx_builds_builder ON builds(builder_id) WHERE builder_id IS NOT NULL; diff --git a/crates/common/migrations/006_hardening.sql b/crates/common/migrations/006_hardening.sql new file mode 100644 index 0000000..19f3106 --- /dev/null +++ b/crates/common/migrations/006_hardening.sql @@ -0,0 +1,14 @@ +-- Hardening: indexes for performance + +-- Cache lookup index (prefix match on path) +CREATE INDEX IF NOT EXISTS idx_build_products_path_prefix ON build_products (path text_pattern_ops); + +-- Composite index for pending builds query +CREATE INDEX IF NOT EXISTS idx_builds_pending_priority ON builds (status, priority DESC, created_at ASC) + WHERE status = 'pending'; + +-- System filtering index +CREATE INDEX IF NOT EXISTS idx_builds_system ON builds(system) WHERE system IS NOT NULL; + +-- Deduplication lookup by drv_path + status +CREATE INDEX IF NOT EXISTS idx_builds_drv_completed ON builds(drv_path) WHERE status = 'completed'; diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index 9583a2f..ba26b3d 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -1,8 +1,9 @@ //! Configuration management for FC CI +use std::path::PathBuf; + use config as config_crate; use serde::{Deserialize, Serialize}; -use std::path::PathBuf; #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct Config { @@ -10,6 +11,11 @@ pub struct Config { pub server: ServerConfig, pub evaluator: EvaluatorConfig, pub queue_runner: QueueRunnerConfig, + pub gc: GcConfig, + pub logs: LogConfig, + pub notifications: NotificationsConfig, + pub cache: CacheConfig, + pub signing: SigningConfig, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -23,19 +29,29 @@ pub struct DatabaseConfig { } #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] pub struct ServerConfig { pub host: String, pub port: u16, pub request_timeout: u64, pub max_body_size: usize, + pub api_key: Option, + pub allowed_origins: Vec, + pub cors_permissive: bool, + pub rate_limit_rps: Option, + pub rate_limit_burst: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] pub struct EvaluatorConfig { pub poll_interval: u64, pub git_timeout: u64, pub nix_timeout: u64, + pub max_concurrent_evals: usize, pub work_dir: PathBuf, + pub restrict_eval: bool, + pub allow_ifd: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -46,6 +62,56 @@ pub struct QueueRunnerConfig { pub work_dir: PathBuf, } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct GcConfig { + pub gc_roots_dir: PathBuf, + pub enabled: bool, + pub max_age_days: u64, + pub cleanup_interval: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LogConfig { + pub log_dir: PathBuf, + pub compress: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct NotificationsConfig { + pub run_command: Option, + pub github_token: Option, + pub gitea_url: Option, + pub gitea_token: Option, + pub email: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct EmailConfig { + pub smtp_host: String, + pub smtp_port: u16, + pub smtp_user: Option, + pub smtp_password: Option, + pub from_address: String, + pub to_addresses: Vec, + pub tls: bool, + pub on_failure_only: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CacheConfig { + pub enabled: bool, + pub secret_key_file: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct SigningConfig { + pub enabled: bool, + pub key_file: Option, +} + impl Default for DatabaseConfig { fn default() -> Self { Self { @@ -94,6 +160,11 @@ impl Default for ServerConfig { port: 3000, request_timeout: 30, max_body_size: 10 * 1024 * 1024, // 10MB + api_key: None, + allowed_origins: Vec::new(), + cors_permissive: false, + rate_limit_rps: None, + rate_limit_burst: None, } } } @@ -104,7 +175,10 @@ impl Default for EvaluatorConfig { poll_interval: 60, git_timeout: 600, nix_timeout: 1800, + max_concurrent_evals: 4, work_dir: PathBuf::from("/tmp/fc-evaluator"), + restrict_eval: true, + allow_ifd: false, } } } @@ -120,6 +194,56 @@ impl Default for QueueRunnerConfig { } } +impl Default for GcConfig { + fn default() -> Self { + Self { + gc_roots_dir: PathBuf::from("/nix/var/nix/gcroots/per-user/fc/fc-roots"), + enabled: true, + max_age_days: 30, + cleanup_interval: 3600, + } + } +} + +impl Default for LogConfig { + fn default() -> Self { + Self { + log_dir: PathBuf::from("/var/lib/fc/logs"), + compress: false, + } + } +} + +impl Default for NotificationsConfig { + fn default() -> Self { + Self { + run_command: None, + github_token: None, + gitea_url: None, + gitea_token: None, + email: None, + } + } +} + +impl Default for CacheConfig { + fn default() -> Self { + Self { + enabled: true, + secret_key_file: None, + } + } +} + +impl Default for SigningConfig { + fn default() -> Self { + Self { + enabled: false, + key_file: None, + } + } +} + impl Config { pub fn load() -> anyhow::Result { let mut settings = config_crate::Config::builder(); @@ -197,6 +321,18 @@ impl Config { )); } + // Validate GC config + if self.gc.enabled && self.gc.gc_roots_dir.as_os_str().is_empty() { + return Err(anyhow::anyhow!( + "GC roots directory cannot be empty when GC is enabled" + )); + } + + // Validate log config + if self.logs.log_dir.as_os_str().is_empty() { + return Err(anyhow::anyhow!("Log directory cannot be empty")); + } + Ok(()) } } diff --git a/crates/common/src/database.rs b/crates/common/src/database.rs index 1c1ce8c..4113883 100644 --- a/crates/common/src/database.rs +++ b/crates/common/src/database.rs @@ -38,7 +38,7 @@ impl Database { pub async fn health_check(pool: &PgPool) -> anyhow::Result<()> { debug!("Performing database health check"); - let result: i64 = sqlx::query_scalar("SELECT 1").fetch_one(pool).await?; + let result: i32 = sqlx::query_scalar("SELECT 1").fetch_one(pool).await?; if result != 1 { return Err(anyhow::anyhow!( diff --git a/crates/common/src/error.rs b/crates/common/src/error.rs index aafb181..8b3735a 100644 --- a/crates/common/src/error.rs +++ b/crates/common/src/error.rs @@ -24,6 +24,24 @@ pub enum CiError { #[error("Not found: {0}")] NotFound(String), + + #[error("Validation error: {0}")] + Validation(String), + + #[error("Conflict: {0}")] + Conflict(String), + + #[error("Timeout: {0}")] + Timeout(String), + + #[error("Nix evaluation error: {0}")] + NixEval(String), + + #[error("Unauthorized: {0}")] + Unauthorized(String), + + #[error("Forbidden: {0}")] + Forbidden(String), } pub type Result = std::result::Result; diff --git a/crates/common/src/gc_roots.rs b/crates/common/src/gc_roots.rs new file mode 100644 index 0000000..38e8770 --- /dev/null +++ b/crates/common/src/gc_roots.rs @@ -0,0 +1,104 @@ +//! GC root management - prevents nix-store --gc from deleting build outputs + +use std::os::unix::fs::symlink; +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use tracing::{info, warn}; + +/// Remove GC root symlinks with mtime older than max_age. Returns count removed. +pub fn cleanup_old_roots(roots_dir: &Path, max_age: Duration) -> std::io::Result { + if !roots_dir.exists() { + return Ok(0); + } + + let mut count = 0u64; + let now = std::time::SystemTime::now(); + + for entry in std::fs::read_dir(roots_dir)? { + let entry = entry?; + let metadata = match entry.metadata() { + Ok(m) => m, + Err(_) => continue, + }; + + let modified = match metadata.modified() { + Ok(t) => t, + Err(_) => continue, + }; + + if let Ok(age) = now.duration_since(modified) { + if age > max_age { + if let Err(e) = std::fs::remove_file(entry.path()) { + warn!( + "Failed to remove old GC root {}: {e}", + entry.path().display() + ); + } else { + count += 1; + } + } + } + } + + Ok(count) +} + +pub struct GcRoots { + roots_dir: PathBuf, + enabled: bool, +} + +impl GcRoots { + pub fn new(roots_dir: PathBuf, enabled: bool) -> std::io::Result { + if enabled { + std::fs::create_dir_all(&roots_dir)?; + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + std::fs::set_permissions(&roots_dir, std::fs::Permissions::from_mode(0o700))?; + } + } + Ok(Self { roots_dir, enabled }) + } + + /// Register a GC root for a build output. Returns the symlink path. + pub fn register( + &self, + build_id: &uuid::Uuid, + output_path: &str, + ) -> std::io::Result> { + if !self.enabled { + return Ok(None); + } + if !crate::validate::is_valid_store_path(output_path) { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Invalid store path: {output_path}"), + )); + } + let link_path = self.roots_dir.join(build_id.to_string()); + // Remove existing symlink if present + if link_path.exists() || link_path.symlink_metadata().is_ok() { + std::fs::remove_file(&link_path)?; + } + symlink(output_path, &link_path)?; + info!(build_id = %build_id, output = output_path, "Registered GC root"); + Ok(Some(link_path)) + } + + /// Remove a GC root for a build. + pub fn remove(&self, build_id: &uuid::Uuid) { + if !self.enabled { + return; + } + let link_path = self.roots_dir.join(build_id.to_string()); + if let Err(e) = std::fs::remove_file(&link_path) { + if e.kind() != std::io::ErrorKind::NotFound { + warn!(build_id = %build_id, "Failed to remove GC root: {e}"); + } + } else { + info!(build_id = %build_id, "Removed GC root"); + } + } +} diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 287ab4f..bae5b74 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -3,12 +3,19 @@ pub mod config; pub mod database; pub mod error; +pub mod gc_roots; +pub mod log_storage; pub mod migrate; pub mod migrate_cli; pub mod models; +pub mod notifications; +pub mod repo; + +pub mod validate; pub use config::*; pub use database::*; pub use error::*; pub use migrate::*; pub use models::*; +pub use validate::Validate; diff --git a/crates/common/src/log_storage.rs b/crates/common/src/log_storage.rs new file mode 100644 index 0000000..730b45b --- /dev/null +++ b/crates/common/src/log_storage.rs @@ -0,0 +1,68 @@ +//! Build log storage - captures and serves build logs + +use std::path::PathBuf; + +use uuid::Uuid; + +pub struct LogStorage { + log_dir: PathBuf, +} + +impl LogStorage { + pub fn new(log_dir: PathBuf) -> std::io::Result { + std::fs::create_dir_all(&log_dir)?; + Ok(Self { log_dir }) + } + + /// Returns the filesystem path where a build's log should be stored + pub fn log_path(&self, build_id: &Uuid) -> PathBuf { + self.log_dir.join(format!("{}.log", build_id)) + } + + /// Returns the filesystem path for an active (in-progress) build log + pub fn log_path_for_active(&self, build_id: &Uuid) -> PathBuf { + self.log_dir.join(format!("{}.active.log", build_id)) + } + + /// Write build log content to file + pub fn write_log( + &self, + build_id: &Uuid, + stdout: &str, + stderr: &str, + ) -> std::io::Result { + let path = self.log_path(build_id); + let mut content = String::new(); + if !stdout.is_empty() { + content.push_str(stdout); + } + if !stderr.is_empty() { + if !content.is_empty() { + content.push('\n'); + } + content.push_str(stderr); + } + std::fs::write(&path, &content)?; + tracing::debug!(build_id = %build_id, path = %path.display(), "Wrote build log"); + Ok(path) + } + + /// Read a build log from disk. Returns None if the file doesn't exist. + pub fn read_log(&self, build_id: &Uuid) -> std::io::Result> { + let path = self.log_path(build_id); + if !path.exists() { + return Ok(None); + } + let content = std::fs::read_to_string(&path)?; + Ok(Some(content)) + } + + /// Delete a build log + pub fn delete_log(&self, build_id: &Uuid) -> std::io::Result<()> { + let path = self.log_path(build_id); + if path.exists() { + std::fs::remove_file(&path)?; + } + Ok(()) + } +} diff --git a/crates/common/src/models.rs b/crates/common/src/models.rs index c4e4369..a73148c 100644 --- a/crates/common/src/models.rs +++ b/crates/common/src/models.rs @@ -22,6 +22,8 @@ pub struct Jobset { pub name: String, pub nix_expression: String, pub enabled: bool, + pub flake_mode: bool, + pub check_interval: i32, pub created_at: DateTime, pub updated_at: DateTime, } @@ -33,9 +35,11 @@ pub struct Evaluation { pub commit_hash: String, pub evaluation_time: DateTime, pub status: EvaluationStatus, + pub error_message: Option, + pub inputs_hash: Option, } -#[derive(Debug, Clone, Serialize, Deserialize, sqlx::Type)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, sqlx::Type)] #[sqlx(type_name = "text", rename_all = "lowercase")] pub enum EvaluationStatus { Pending, @@ -54,9 +58,23 @@ pub struct Build { pub started_at: Option>, pub completed_at: Option>, pub log_path: Option, + pub build_output_path: Option, + pub error_message: Option, + pub system: Option, + pub priority: i32, + pub retry_count: i32, + pub max_retries: i32, + pub notification_pending_since: Option>, + pub log_url: Option, + pub created_at: DateTime, + pub outputs: Option, + pub is_aggregate: bool, + pub constituents: Option, + pub builder_id: Option, + pub signed: bool, } -#[derive(Debug, Clone, Serialize, Deserialize, sqlx::Type)] +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::Type, PartialEq)] #[sqlx(type_name = "text", rename_all = "lowercase")] pub enum BuildStatus { Pending, @@ -65,3 +83,311 @@ pub enum BuildStatus { Failed, Cancelled, } + +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct BuildProduct { + pub id: Uuid, + pub build_id: Uuid, + pub name: String, + pub path: String, + pub sha256_hash: Option, + pub file_size: Option, + pub content_type: Option, + pub is_directory: bool, + pub gc_root_path: Option, + pub created_at: DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct BuildStep { + pub id: Uuid, + pub build_id: Uuid, + pub step_number: i32, + pub command: String, + pub output: Option, + pub error_output: Option, + pub started_at: DateTime, + pub completed_at: Option>, + pub exit_code: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct BuildDependency { + pub id: Uuid, + pub build_id: Uuid, + pub dependency_build_id: Uuid, +} + +/// Active jobset view — enabled jobsets joined with project info. +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct ActiveJobset { + pub id: Uuid, + pub project_id: Uuid, + pub name: String, + pub nix_expression: String, + pub enabled: bool, + pub flake_mode: bool, + pub check_interval: i32, + pub created_at: DateTime, + pub updated_at: DateTime, + pub project_name: String, + pub repository_url: String, +} + +/// Build statistics from the build_stats view. +#[derive(Debug, Clone, Serialize, Deserialize, FromRow, Default)] +pub struct BuildStats { + pub total_builds: Option, + pub completed_builds: Option, + pub failed_builds: Option, + pub running_builds: Option, + pub pending_builds: Option, + pub avg_duration_seconds: Option, +} + +/// API key for authentication. +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct ApiKey { + pub id: Uuid, + pub name: String, + pub key_hash: String, + pub role: String, + pub created_at: DateTime, + pub last_used_at: Option>, +} + +/// Webhook configuration for a project. +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct WebhookConfig { + pub id: Uuid, + pub project_id: Uuid, + pub forge_type: String, + pub secret_hash: Option, + pub enabled: bool, + pub created_at: DateTime, +} + +/// Notification configuration for a project. +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct NotificationConfig { + pub id: Uuid, + pub project_id: Uuid, + pub notification_type: String, + pub config: serde_json::Value, + pub enabled: bool, + pub created_at: DateTime, +} + +/// Jobset input definition. +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct JobsetInput { + pub id: Uuid, + pub jobset_id: Uuid, + pub name: String, + pub input_type: String, + pub value: String, + pub revision: Option, + pub created_at: DateTime, +} + +/// Release channel — tracks the latest "good" evaluation for a jobset. +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct Channel { + pub id: Uuid, + pub project_id: Uuid, + pub name: String, + pub jobset_id: Uuid, + pub current_evaluation_id: Option, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +/// Remote builder for multi-machine / multi-arch builds. +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct RemoteBuilder { + pub id: Uuid, + pub name: String, + pub ssh_uri: String, + pub systems: Vec, + pub max_jobs: i32, + pub speed_factor: i32, + pub supported_features: Vec, + pub mandatory_features: Vec, + pub enabled: bool, + pub public_host_key: Option, + pub ssh_key_file: Option, + pub created_at: DateTime, +} + +// --- Pagination --- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PaginationParams { + pub limit: Option, + pub offset: Option, +} + +impl PaginationParams { + pub fn limit(&self) -> i64 { + self.limit.unwrap_or(50).min(200).max(1) + } + + pub fn offset(&self) -> i64 { + self.offset.unwrap_or(0).max(0) + } +} + +impl Default for PaginationParams { + fn default() -> Self { + Self { + limit: Some(50), + offset: Some(0), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PaginatedResponse { + pub items: Vec, + pub total: i64, + pub limit: i64, + pub offset: i64, +} + +// --- DTO structs for creation and updates --- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateProject { + pub name: String, + pub description: Option, + pub repository_url: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdateProject { + pub name: Option, + pub description: Option, + pub repository_url: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateJobset { + pub project_id: Uuid, + pub name: String, + pub nix_expression: String, + pub enabled: Option, + pub flake_mode: Option, + pub check_interval: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdateJobset { + pub name: Option, + pub nix_expression: Option, + pub enabled: Option, + pub flake_mode: Option, + pub check_interval: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateEvaluation { + pub jobset_id: Uuid, + pub commit_hash: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateBuild { + pub evaluation_id: Uuid, + pub job_name: String, + pub drv_path: String, + pub system: Option, + pub outputs: Option, + pub is_aggregate: Option, + pub constituents: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateBuildProduct { + pub build_id: Uuid, + pub name: String, + pub path: String, + pub sha256_hash: Option, + pub file_size: Option, + pub content_type: Option, + pub is_directory: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateBuildStep { + pub build_id: Uuid, + pub step_number: i32, + pub command: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateWebhookConfig { + pub project_id: Uuid, + pub forge_type: String, + pub secret: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateNotificationConfig { + pub project_id: Uuid, + pub notification_type: String, + pub config: serde_json::Value, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateChannel { + pub project_id: Uuid, + pub name: String, + pub jobset_id: Uuid, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdateChannel { + pub name: Option, + pub jobset_id: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateRemoteBuilder { + pub name: String, + pub ssh_uri: String, + pub systems: Vec, + pub max_jobs: Option, + pub speed_factor: Option, + pub supported_features: Option>, + pub mandatory_features: Option>, + pub public_host_key: Option, + pub ssh_key_file: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdateRemoteBuilder { + pub name: Option, + pub ssh_uri: Option, + pub systems: Option>, + pub max_jobs: Option, + pub speed_factor: Option, + pub supported_features: Option>, + pub mandatory_features: Option>, + pub enabled: Option, + pub public_host_key: Option, + pub ssh_key_file: Option, +} + +/// Summary of system status for the admin API. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SystemStatus { + pub projects_count: i64, + pub jobsets_count: i64, + pub evaluations_count: i64, + pub builds_pending: i64, + pub builds_running: i64, + pub builds_completed: i64, + pub builds_failed: i64, + pub remote_builders: i64, + pub channels_count: i64, +} diff --git a/crates/common/src/notifications.rs b/crates/common/src/notifications.rs new file mode 100644 index 0000000..56a0a74 --- /dev/null +++ b/crates/common/src/notifications.rs @@ -0,0 +1,296 @@ +//! Notification dispatch for build events + +use crate::config::{EmailConfig, NotificationsConfig}; +use crate::models::{Build, BuildStatus, Project}; + +use tracing::{error, info, warn}; + +/// Dispatch all configured notifications for a completed build. +pub async fn dispatch_build_finished( + build: &Build, + project: &Project, + commit_hash: &str, + config: &NotificationsConfig, +) { + // 1. Run command notification + if let Some(ref cmd) = config.run_command { + run_command_notification(cmd, build, project).await; + } + + // 2. GitHub commit status + if let Some(ref token) = config.github_token { + if project.repository_url.contains("github.com") { + set_github_status(token, &project.repository_url, commit_hash, build).await; + } + } + + // 3. Gitea/Forgejo commit status + if let (Some(url), Some(token)) = (&config.gitea_url, &config.gitea_token) { + set_gitea_status(url, token, &project.repository_url, commit_hash, build).await; + } + + // 4. Email notification + if let Some(ref email_config) = config.email { + if !email_config.on_failure_only || build.status == BuildStatus::Failed { + send_email_notification(email_config, build, project).await; + } + } +} + +async fn run_command_notification(cmd: &str, build: &Build, project: &Project) { + let status_str = match build.status { + BuildStatus::Completed => "success", + BuildStatus::Failed => "failure", + BuildStatus::Cancelled => "cancelled", + _ => "unknown", + }; + + let result = tokio::process::Command::new("sh") + .arg("-c") + .arg(cmd) + .env("FC_BUILD_ID", build.id.to_string()) + .env("FC_BUILD_STATUS", status_str) + .env("FC_BUILD_JOB", &build.job_name) + .env("FC_BUILD_DRV", &build.drv_path) + .env("FC_PROJECT_NAME", &project.name) + .env("FC_PROJECT_URL", &project.repository_url) + .env( + "FC_BUILD_OUTPUT", + build.build_output_path.as_deref().unwrap_or(""), + ) + .output() + .await; + + match result { + Ok(output) => { + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + warn!(build_id = %build.id, "RunCommand failed: {stderr}"); + } else { + info!(build_id = %build.id, "RunCommand completed successfully"); + } + } + Err(e) => error!(build_id = %build.id, "RunCommand execution failed: {e}"), + } +} + +async fn set_github_status(token: &str, repo_url: &str, commit: &str, build: &Build) { + // Parse owner/repo from URL + let (owner, repo) = match parse_github_repo(repo_url) { + Some(v) => v, + None => { + warn!("Cannot parse GitHub owner/repo from {repo_url}"); + return; + } + }; + + let (state, description) = match build.status { + BuildStatus::Completed => ("success", "Build succeeded"), + BuildStatus::Failed => ("failure", "Build failed"), + BuildStatus::Running => ("pending", "Build in progress"), + BuildStatus::Pending => ("pending", "Build queued"), + BuildStatus::Cancelled => ("error", "Build cancelled"), + }; + + let url = format!("https://api.github.com/repos/{owner}/{repo}/statuses/{commit}"); + let body = serde_json::json!({ + "state": state, + "description": description, + "context": format!("fc/{}", build.job_name), + }); + + let client = reqwest::Client::new(); + match client + .post(&url) + .header("Authorization", format!("token {token}")) + .header("User-Agent", "fc-ci") + .header("Accept", "application/vnd.github+json") + .json(&body) + .send() + .await + { + Ok(resp) => { + if !resp.status().is_success() { + let status = resp.status(); + let text = resp.text().await.unwrap_or_default(); + warn!("GitHub status API returned {status}: {text}"); + } else { + info!(build_id = %build.id, "Set GitHub commit status: {state}"); + } + } + Err(e) => error!("GitHub status API request failed: {e}"), + } +} + +async fn set_gitea_status( + base_url: &str, + token: &str, + repo_url: &str, + commit: &str, + build: &Build, +) { + // Parse owner/repo from URL (try to extract from the gitea URL) + let (owner, repo) = match parse_gitea_repo(repo_url, base_url) { + Some(v) => v, + None => { + warn!("Cannot parse Gitea owner/repo from {repo_url}"); + return; + } + }; + + let (state, description) = match build.status { + BuildStatus::Completed => ("success", "Build succeeded"), + BuildStatus::Failed => ("failure", "Build failed"), + BuildStatus::Running => ("pending", "Build in progress"), + BuildStatus::Pending => ("pending", "Build queued"), + BuildStatus::Cancelled => ("error", "Build cancelled"), + }; + + let url = format!("{base_url}/api/v1/repos/{owner}/{repo}/statuses/{commit}"); + let body = serde_json::json!({ + "state": state, + "description": description, + "context": format!("fc/{}", build.job_name), + }); + + let client = reqwest::Client::new(); + match client + .post(&url) + .header("Authorization", format!("token {token}")) + .json(&body) + .send() + .await + { + Ok(resp) => { + if !resp.status().is_success() { + let status = resp.status(); + let text = resp.text().await.unwrap_or_default(); + warn!("Gitea status API returned {status}: {text}"); + } else { + info!(build_id = %build.id, "Set Gitea commit status: {state}"); + } + } + Err(e) => error!("Gitea status API request failed: {e}"), + } +} + +fn parse_github_repo(url: &str) -> Option<(String, String)> { + // Handle https://github.com/owner/repo.git or git@github.com:owner/repo.git + let url = url.trim_end_matches(".git"); + if let Some(rest) = url.strip_prefix("https://github.com/") { + let parts: Vec<&str> = rest.splitn(2, '/').collect(); + if parts.len() == 2 { + return Some((parts[0].to_string(), parts[1].to_string())); + } + } + if let Some(rest) = url.strip_prefix("git@github.com:") { + let parts: Vec<&str> = rest.splitn(2, '/').collect(); + if parts.len() == 2 { + return Some((parts[0].to_string(), parts[1].to_string())); + } + } + None +} + +fn parse_gitea_repo(repo_url: &str, base_url: &str) -> Option<(String, String)> { + let url = repo_url.trim_end_matches(".git"); + let base = base_url.trim_end_matches('/'); + if let Some(rest) = url.strip_prefix(&format!("{base}/")) { + let parts: Vec<&str> = rest.splitn(2, '/').collect(); + if parts.len() == 2 { + return Some((parts[0].to_string(), parts[1].to_string())); + } + } + None +} + +async fn send_email_notification(config: &EmailConfig, build: &Build, project: &Project) { + use lettre::message::header::ContentType; + use lettre::transport::smtp::authentication::Credentials; + use lettre::{AsyncSmtpTransport, AsyncTransport, Message, Tokio1Executor}; + + let status_str = match build.status { + BuildStatus::Completed => "SUCCESS", + BuildStatus::Failed => "FAILURE", + BuildStatus::Cancelled => "CANCELLED", + _ => "UNKNOWN", + }; + + let subject = format!( + "[FC] {} - {} ({})", + status_str, build.job_name, project.name + ); + + let body = format!( + "Build notification from FC CI\n\n\ + Project: {}\n\ + Job: {}\n\ + Status: {}\n\ + Derivation: {}\n\ + Output: {}\n\ + Build ID: {}\n", + project.name, + build.job_name, + status_str, + build.drv_path, + build.build_output_path.as_deref().unwrap_or("N/A"), + build.id, + ); + + for to_addr in &config.to_addresses { + let email = match Message::builder() + .from(match config.from_address.parse() { + Ok(addr) => addr, + Err(e) => { + error!("Invalid from address '{}': {e}", config.from_address); + return; + } + }) + .to(match to_addr.parse() { + Ok(addr) => addr, + Err(e) => { + warn!("Invalid to address '{to_addr}': {e}"); + continue; + } + }) + .subject(&subject) + .header(ContentType::TEXT_PLAIN) + .body(body.clone()) + { + Ok(e) => e, + Err(e) => { + error!("Failed to build email: {e}"); + continue; + } + }; + + let mut mailer_builder = if config.tls { + match AsyncSmtpTransport::::relay(&config.smtp_host) { + Ok(b) => b.port(config.smtp_port), + Err(e) => { + error!("Failed to create SMTP transport: {e}"); + return; + } + } + } else { + AsyncSmtpTransport::::builder_dangerous(&config.smtp_host) + .port(config.smtp_port) + }; + + if let (Some(user), Some(pass)) = (&config.smtp_user, &config.smtp_password) { + mailer_builder = + mailer_builder.credentials(Credentials::new(user.clone(), pass.clone())); + } + + let mailer = mailer_builder.build(); + + match mailer.send(email).await { + Ok(_) => { + info!(build_id = %build.id, to = to_addr, "Email notification sent"); + } + Err(e) => { + error!(build_id = %build.id, to = to_addr, "Failed to send email: {e}"); + } + } + } +} diff --git a/crates/common/src/repo/api_keys.rs b/crates/common/src/repo/api_keys.rs new file mode 100644 index 0000000..175f7e1 --- /dev/null +++ b/crates/common/src/repo/api_keys.rs @@ -0,0 +1,57 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::error::{CiError, Result}; +use crate::models::ApiKey; + +pub async fn create(pool: &PgPool, name: &str, key_hash: &str, role: &str) -> Result { + sqlx::query_as::<_, ApiKey>( + "INSERT INTO api_keys (name, key_hash, role) VALUES ($1, $2, $3) RETURNING *", + ) + .bind(name) + .bind(key_hash) + .bind(role) + .fetch_one(pool) + .await + .map_err(|e| match &e { + sqlx::Error::Database(db_err) if db_err.is_unique_violation() => { + CiError::Conflict(format!("API key with this hash already exists")) + } + _ => CiError::Database(e), + }) +} + +pub async fn get_by_hash(pool: &PgPool, key_hash: &str) -> Result> { + sqlx::query_as::<_, ApiKey>("SELECT * FROM api_keys WHERE key_hash = $1") + .bind(key_hash) + .fetch_optional(pool) + .await + .map_err(CiError::Database) +} + +pub async fn list(pool: &PgPool) -> Result> { + sqlx::query_as::<_, ApiKey>("SELECT * FROM api_keys ORDER BY created_at DESC") + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> { + let result = sqlx::query("DELETE FROM api_keys WHERE id = $1") + .bind(id) + .execute(pool) + .await?; + if result.rows_affected() == 0 { + return Err(CiError::NotFound(format!("API key {id} not found"))); + } + Ok(()) +} + +pub async fn touch_last_used(pool: &PgPool, id: Uuid) -> Result<()> { + sqlx::query("UPDATE api_keys SET last_used_at = NOW() WHERE id = $1") + .bind(id) + .execute(pool) + .await + .map_err(CiError::Database)?; + Ok(()) +} diff --git a/crates/common/src/repo/build_dependencies.rs b/crates/common/src/repo/build_dependencies.rs new file mode 100644 index 0000000..6f278d2 --- /dev/null +++ b/crates/common/src/repo/build_dependencies.rs @@ -0,0 +1,79 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::error::{CiError, Result}; +use crate::models::BuildDependency; + +pub async fn create( + pool: &PgPool, + build_id: Uuid, + dependency_build_id: Uuid, +) -> Result { + sqlx::query_as::<_, BuildDependency>( + "INSERT INTO build_dependencies (build_id, dependency_build_id) VALUES ($1, $2) RETURNING *", + ) + .bind(build_id) + .bind(dependency_build_id) + .fetch_one(pool) + .await + .map_err(|e| match &e { + sqlx::Error::Database(db_err) if db_err.is_unique_violation() => { + CiError::Conflict(format!( + "Dependency from {build_id} to {dependency_build_id} already exists" + )) + } + _ => CiError::Database(e), + }) +} + +pub async fn list_for_build(pool: &PgPool, build_id: Uuid) -> Result> { + sqlx::query_as::<_, BuildDependency>("SELECT * FROM build_dependencies WHERE build_id = $1") + .bind(build_id) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +/// Batch check if all dependency builds are completed for multiple builds at once. +/// Returns a map from build_id to whether all deps are completed. +pub async fn check_deps_for_builds( + pool: &PgPool, + build_ids: &[Uuid], +) -> Result> { + if build_ids.is_empty() { + return Ok(std::collections::HashMap::new()); + } + + // Find build_ids that have incomplete deps + let rows: Vec<(Uuid,)> = sqlx::query_as( + "SELECT DISTINCT bd.build_id FROM build_dependencies bd \ + JOIN builds b ON bd.dependency_build_id = b.id \ + WHERE bd.build_id = ANY($1) AND b.status != 'completed'", + ) + .bind(build_ids) + .fetch_all(pool) + .await + .map_err(CiError::Database)?; + + let incomplete: std::collections::HashSet = rows.into_iter().map(|(id,)| id).collect(); + + Ok(build_ids + .iter() + .map(|id| (*id, !incomplete.contains(id))) + .collect()) +} + +/// Check if all dependency builds for a given build are completed. +pub async fn all_deps_completed(pool: &PgPool, build_id: Uuid) -> Result { + let row: (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM build_dependencies bd \ + JOIN builds b ON bd.dependency_build_id = b.id \ + WHERE bd.build_id = $1 AND b.status != 'completed'", + ) + .bind(build_id) + .fetch_one(pool) + .await + .map_err(CiError::Database)?; + + Ok(row.0 == 0) +} diff --git a/crates/common/src/repo/build_products.rs b/crates/common/src/repo/build_products.rs new file mode 100644 index 0000000..58e6b6d --- /dev/null +++ b/crates/common/src/repo/build_products.rs @@ -0,0 +1,40 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::error::{CiError, Result}; +use crate::models::{BuildProduct, CreateBuildProduct}; + +pub async fn create(pool: &PgPool, input: CreateBuildProduct) -> Result { + sqlx::query_as::<_, BuildProduct>( + "INSERT INTO build_products (build_id, name, path, sha256_hash, file_size, content_type, is_directory) \ + VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING *", + ) + .bind(input.build_id) + .bind(&input.name) + .bind(&input.path) + .bind(&input.sha256_hash) + .bind(input.file_size) + .bind(&input.content_type) + .bind(input.is_directory) + .fetch_one(pool) + .await + .map_err(CiError::Database) +} + +pub async fn get(pool: &PgPool, id: Uuid) -> Result { + sqlx::query_as::<_, BuildProduct>("SELECT * FROM build_products WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Build product {id} not found"))) +} + +pub async fn list_for_build(pool: &PgPool, build_id: Uuid) -> Result> { + sqlx::query_as::<_, BuildProduct>( + "SELECT * FROM build_products WHERE build_id = $1 ORDER BY created_at ASC", + ) + .bind(build_id) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} diff --git a/crates/common/src/repo/build_steps.rs b/crates/common/src/repo/build_steps.rs new file mode 100644 index 0000000..69ff7a1 --- /dev/null +++ b/crates/common/src/repo/build_steps.rs @@ -0,0 +1,54 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::error::{CiError, Result}; +use crate::models::{BuildStep, CreateBuildStep}; + +pub async fn create(pool: &PgPool, input: CreateBuildStep) -> Result { + sqlx::query_as::<_, BuildStep>( + "INSERT INTO build_steps (build_id, step_number, command) VALUES ($1, $2, $3) RETURNING *", + ) + .bind(input.build_id) + .bind(input.step_number) + .bind(&input.command) + .fetch_one(pool) + .await + .map_err(|e| match &e { + sqlx::Error::Database(db_err) if db_err.is_unique_violation() => { + CiError::Conflict(format!( + "Build step {} already exists for this build", + input.step_number + )) + } + _ => CiError::Database(e), + }) +} + +pub async fn complete( + pool: &PgPool, + id: Uuid, + exit_code: i32, + output: Option<&str>, + error_output: Option<&str>, +) -> Result { + sqlx::query_as::<_, BuildStep>( + "UPDATE build_steps SET completed_at = NOW(), exit_code = $1, output = $2, error_output = $3 WHERE id = $4 RETURNING *", + ) + .bind(exit_code) + .bind(output) + .bind(error_output) + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Build step {id} not found"))) +} + +pub async fn list_for_build(pool: &PgPool, build_id: Uuid) -> Result> { + sqlx::query_as::<_, BuildStep>( + "SELECT * FROM build_steps WHERE build_id = $1 ORDER BY step_number ASC", + ) + .bind(build_id) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} diff --git a/crates/common/src/repo/builds.rs b/crates/common/src/repo/builds.rs new file mode 100644 index 0000000..d532bfb --- /dev/null +++ b/crates/common/src/repo/builds.rs @@ -0,0 +1,292 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::error::{CiError, Result}; +use crate::models::{Build, BuildStats, BuildStatus, CreateBuild}; + +pub async fn create(pool: &PgPool, input: CreateBuild) -> Result { + let is_aggregate = input.is_aggregate.unwrap_or(false); + sqlx::query_as::<_, Build>( + "INSERT INTO builds (evaluation_id, job_name, drv_path, status, system, outputs, is_aggregate, constituents) \ + VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7) RETURNING *", + ) + .bind(input.evaluation_id) + .bind(&input.job_name) + .bind(&input.drv_path) + .bind(&input.system) + .bind(&input.outputs) + .bind(is_aggregate) + .bind(&input.constituents) + .fetch_one(pool) + .await + .map_err(|e| match &e { + sqlx::Error::Database(db_err) if db_err.is_unique_violation() => { + CiError::Conflict(format!( + "Build for job '{}' already exists in this evaluation", + input.job_name + )) + } + _ => CiError::Database(e), + }) +} + +pub async fn get_completed_by_drv_path(pool: &PgPool, drv_path: &str) -> Result> { + sqlx::query_as::<_, Build>( + "SELECT * FROM builds WHERE drv_path = $1 AND status = 'completed' LIMIT 1", + ) + .bind(drv_path) + .fetch_optional(pool) + .await + .map_err(CiError::Database) +} + +pub async fn get(pool: &PgPool, id: Uuid) -> Result { + sqlx::query_as::<_, Build>("SELECT * FROM builds WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Build {id} not found"))) +} + +pub async fn list_for_evaluation(pool: &PgPool, evaluation_id: Uuid) -> Result> { + sqlx::query_as::<_, Build>( + "SELECT * FROM builds WHERE evaluation_id = $1 ORDER BY created_at DESC", + ) + .bind(evaluation_id) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +pub async fn list_pending(pool: &PgPool, limit: i64) -> Result> { + sqlx::query_as::<_, Build>( + "SELECT * FROM builds WHERE status = 'pending' ORDER BY priority DESC, created_at ASC LIMIT $1", + ) + .bind(limit) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +/// Atomically claim a pending build by setting it to running. +/// Returns `None` if the build was already claimed by another worker. +pub async fn start(pool: &PgPool, id: Uuid) -> Result> { + sqlx::query_as::<_, Build>( + "UPDATE builds SET status = 'running', started_at = NOW() WHERE id = $1 AND status = 'pending' RETURNING *", + ) + .bind(id) + .fetch_optional(pool) + .await + .map_err(CiError::Database) +} + +pub async fn complete( + pool: &PgPool, + id: Uuid, + status: BuildStatus, + log_path: Option<&str>, + build_output_path: Option<&str>, + error_message: Option<&str>, +) -> Result { + sqlx::query_as::<_, Build>( + "UPDATE builds SET status = $1, completed_at = NOW(), log_path = $2, build_output_path = $3, error_message = $4 WHERE id = $5 RETURNING *", + ) + .bind(status) + .bind(log_path) + .bind(build_output_path) + .bind(error_message) + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Build {id} not found"))) +} + +pub async fn list_recent(pool: &PgPool, limit: i64) -> Result> { + sqlx::query_as::<_, Build>("SELECT * FROM builds ORDER BY created_at DESC LIMIT $1") + .bind(limit) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +pub async fn list_for_project(pool: &PgPool, project_id: Uuid) -> Result> { + sqlx::query_as::<_, Build>( + "SELECT b.* FROM builds b \ + JOIN evaluations e ON b.evaluation_id = e.id \ + JOIN jobsets j ON e.jobset_id = j.id \ + WHERE j.project_id = $1 \ + ORDER BY b.created_at DESC", + ) + .bind(project_id) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +pub async fn get_stats(pool: &PgPool) -> Result { + sqlx::query_as::<_, BuildStats>("SELECT * FROM build_stats") + .fetch_optional(pool) + .await + .map_err(CiError::Database) + .map(|opt| opt.unwrap_or_default()) +} + +/// Reset builds that were left in 'running' state (orphaned by a crashed runner). +/// Limited to 50 builds per call to prevent thundering herd. +pub async fn reset_orphaned(pool: &PgPool, older_than_secs: i64) -> Result { + let result = sqlx::query( + "UPDATE builds SET status = 'pending', started_at = NULL \ + WHERE id IN (SELECT id FROM builds WHERE status = 'running' \ + AND started_at < NOW() - make_interval(secs => $1) LIMIT 50)", + ) + .bind(older_than_secs) + .execute(pool) + .await + .map_err(CiError::Database)?; + + Ok(result.rows_affected()) +} + +/// List builds with optional evaluation_id, status, system, and job_name filters, with pagination. +pub async fn list_filtered( + pool: &PgPool, + evaluation_id: Option, + status: Option<&str>, + system: Option<&str>, + job_name: Option<&str>, + limit: i64, + offset: i64, +) -> Result> { + sqlx::query_as::<_, Build>( + "SELECT * FROM builds \ + WHERE ($1::uuid IS NULL OR evaluation_id = $1) \ + AND ($2::text IS NULL OR status = $2) \ + AND ($3::text IS NULL OR system = $3) \ + AND ($4::text IS NULL OR job_name ILIKE '%' || $4 || '%') \ + ORDER BY created_at DESC LIMIT $5 OFFSET $6", + ) + .bind(evaluation_id) + .bind(status) + .bind(system) + .bind(job_name) + .bind(limit) + .bind(offset) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +pub async fn count_filtered( + pool: &PgPool, + evaluation_id: Option, + status: Option<&str>, + system: Option<&str>, + job_name: Option<&str>, +) -> Result { + let row: (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM builds \ + WHERE ($1::uuid IS NULL OR evaluation_id = $1) \ + AND ($2::text IS NULL OR status = $2) \ + AND ($3::text IS NULL OR system = $3) \ + AND ($4::text IS NULL OR job_name ILIKE '%' || $4 || '%')", + ) + .bind(evaluation_id) + .bind(status) + .bind(system) + .bind(job_name) + .fetch_one(pool) + .await + .map_err(CiError::Database)?; + Ok(row.0) +} + +pub async fn cancel(pool: &PgPool, id: Uuid) -> Result { + sqlx::query_as::<_, Build>( + "UPDATE builds SET status = 'cancelled', completed_at = NOW() WHERE id = $1 AND status IN ('pending', 'running') RETURNING *", + ) + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| { + CiError::NotFound(format!( + "Build {id} not found or not in a cancellable state" + )) + }) +} + +/// Cancel a build and all its transitive dependents. +pub async fn cancel_cascade(pool: &PgPool, id: Uuid) -> Result> { + let mut cancelled = Vec::new(); + + // Cancel the target build + if let Ok(build) = cancel(pool, id).await { + cancelled.push(build); + } + + // Find and cancel all dependents recursively + let mut to_cancel: Vec = vec![id]; + while let Some(build_id) = to_cancel.pop() { + let dependents: Vec<(Uuid,)> = sqlx::query_as( + "SELECT build_id FROM build_dependencies WHERE dependency_build_id = $1", + ) + .bind(build_id) + .fetch_all(pool) + .await + .map_err(CiError::Database)?; + + for (dep_id,) in dependents { + if let Ok(build) = cancel(pool, dep_id).await { + to_cancel.push(dep_id); + cancelled.push(build); + } + } + } + + Ok(cancelled) +} + +/// Mark a build's outputs as signed. +pub async fn mark_signed(pool: &PgPool, id: Uuid) -> Result<()> { + sqlx::query("UPDATE builds SET signed = true WHERE id = $1") + .bind(id) + .execute(pool) + .await + .map_err(CiError::Database)?; + Ok(()) +} + +/// Batch-fetch completed builds by derivation paths. +/// Returns a map from drv_path to Build for deduplication. +pub async fn get_completed_by_drv_paths( + pool: &PgPool, + drv_paths: &[String], +) -> Result> { + if drv_paths.is_empty() { + return Ok(std::collections::HashMap::new()); + } + let builds = sqlx::query_as::<_, Build>( + "SELECT DISTINCT ON (drv_path) * FROM builds \ + WHERE drv_path = ANY($1) AND status = 'completed' \ + ORDER BY drv_path, completed_at DESC", + ) + .bind(drv_paths) + .fetch_all(pool) + .await + .map_err(CiError::Database)?; + + Ok(builds + .into_iter() + .map(|b| (b.drv_path.clone(), b)) + .collect()) +} + +/// Set the builder_id for a build. +pub async fn set_builder(pool: &PgPool, id: Uuid, builder_id: Uuid) -> Result<()> { + sqlx::query("UPDATE builds SET builder_id = $1 WHERE id = $2") + .bind(builder_id) + .bind(id) + .execute(pool) + .await + .map_err(CiError::Database)?; + Ok(()) +} diff --git a/crates/common/src/repo/channels.rs b/crates/common/src/repo/channels.rs new file mode 100644 index 0000000..48ffde6 --- /dev/null +++ b/crates/common/src/repo/channels.rs @@ -0,0 +1,111 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::error::{CiError, Result}; +use crate::models::{Channel, CreateChannel}; + +pub async fn create(pool: &PgPool, input: CreateChannel) -> Result { + sqlx::query_as::<_, Channel>( + "INSERT INTO channels (project_id, name, jobset_id) \ + VALUES ($1, $2, $3) RETURNING *", + ) + .bind(input.project_id) + .bind(&input.name) + .bind(input.jobset_id) + .fetch_one(pool) + .await + .map_err(|e| match &e { + sqlx::Error::Database(db_err) if db_err.is_unique_violation() => CiError::Conflict( + format!("Channel '{}' already exists for this project", input.name), + ), + _ => CiError::Database(e), + }) +} + +pub async fn get(pool: &PgPool, id: Uuid) -> Result { + sqlx::query_as::<_, Channel>("SELECT * FROM channels WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Channel {id} not found"))) +} + +pub async fn list_for_project(pool: &PgPool, project_id: Uuid) -> Result> { + sqlx::query_as::<_, Channel>("SELECT * FROM channels WHERE project_id = $1 ORDER BY name") + .bind(project_id) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +pub async fn list_all(pool: &PgPool) -> Result> { + sqlx::query_as::<_, Channel>("SELECT * FROM channels ORDER BY name") + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +/// Promote an evaluation to a channel (set it as the current evaluation). +pub async fn promote(pool: &PgPool, channel_id: Uuid, evaluation_id: Uuid) -> Result { + sqlx::query_as::<_, Channel>( + "UPDATE channels SET current_evaluation_id = $1, updated_at = NOW() \ + WHERE id = $2 RETURNING *", + ) + .bind(evaluation_id) + .bind(channel_id) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Channel {channel_id} not found"))) +} + +pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> { + let result = sqlx::query("DELETE FROM channels WHERE id = $1") + .bind(id) + .execute(pool) + .await + .map_err(CiError::Database)?; + if result.rows_affected() == 0 { + return Err(CiError::NotFound(format!("Channel {id} not found"))); + } + Ok(()) +} + +/// Find the channel for a jobset and auto-promote if all builds in the evaluation succeeded. +pub async fn auto_promote_if_complete( + pool: &PgPool, + jobset_id: Uuid, + evaluation_id: Uuid, +) -> Result<()> { + // Check if all builds for this evaluation are completed + let row: (i64, i64) = sqlx::query_as( + "SELECT COUNT(*), COUNT(*) FILTER (WHERE status = 'completed') \ + FROM builds WHERE evaluation_id = $1", + ) + .bind(evaluation_id) + .fetch_one(pool) + .await + .map_err(CiError::Database)?; + + let (total, completed) = row; + if total == 0 || total != completed { + return Ok(()); + } + + // All builds completed — promote to any channels tracking this jobset + let channels = sqlx::query_as::<_, Channel>("SELECT * FROM channels WHERE jobset_id = $1") + .bind(jobset_id) + .fetch_all(pool) + .await + .map_err(CiError::Database)?; + + for channel in channels { + let _ = promote(pool, channel.id, evaluation_id).await; + tracing::info!( + channel = %channel.name, + evaluation_id = %evaluation_id, + "Auto-promoted evaluation to channel" + ); + } + + Ok(()) +} diff --git a/crates/common/src/repo/evaluations.rs b/crates/common/src/repo/evaluations.rs new file mode 100644 index 0000000..b5d7ab9 --- /dev/null +++ b/crates/common/src/repo/evaluations.rs @@ -0,0 +1,146 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::error::{CiError, Result}; +use crate::models::{CreateEvaluation, Evaluation, EvaluationStatus}; + +pub async fn create(pool: &PgPool, input: CreateEvaluation) -> Result { + sqlx::query_as::<_, Evaluation>( + "INSERT INTO evaluations (jobset_id, commit_hash, status) VALUES ($1, $2, 'pending') RETURNING *", + ) + .bind(input.jobset_id) + .bind(&input.commit_hash) + .fetch_one(pool) + .await + .map_err(|e| match &e { + sqlx::Error::Database(db_err) if db_err.is_unique_violation() => { + CiError::Conflict(format!( + "Evaluation for commit '{}' already exists in this jobset", + input.commit_hash + )) + } + _ => CiError::Database(e), + }) +} + +pub async fn get(pool: &PgPool, id: Uuid) -> Result { + sqlx::query_as::<_, Evaluation>("SELECT * FROM evaluations WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Evaluation {id} not found"))) +} + +pub async fn list_for_jobset(pool: &PgPool, jobset_id: Uuid) -> Result> { + sqlx::query_as::<_, Evaluation>( + "SELECT * FROM evaluations WHERE jobset_id = $1 ORDER BY evaluation_time DESC", + ) + .bind(jobset_id) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +/// List evaluations with optional jobset_id and status filters, with pagination. +pub async fn list_filtered( + pool: &PgPool, + jobset_id: Option, + status: Option<&str>, + limit: i64, + offset: i64, +) -> Result> { + sqlx::query_as::<_, Evaluation>( + "SELECT * FROM evaluations \ + WHERE ($1::uuid IS NULL OR jobset_id = $1) \ + AND ($2::text IS NULL OR status = $2) \ + ORDER BY evaluation_time DESC LIMIT $3 OFFSET $4", + ) + .bind(jobset_id) + .bind(status) + .bind(limit) + .bind(offset) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +pub async fn count_filtered( + pool: &PgPool, + jobset_id: Option, + status: Option<&str>, +) -> Result { + let row: (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM evaluations \ + WHERE ($1::uuid IS NULL OR jobset_id = $1) \ + AND ($2::text IS NULL OR status = $2)", + ) + .bind(jobset_id) + .bind(status) + .fetch_one(pool) + .await + .map_err(CiError::Database)?; + Ok(row.0) +} + +pub async fn update_status( + pool: &PgPool, + id: Uuid, + status: EvaluationStatus, + error_message: Option<&str>, +) -> Result { + sqlx::query_as::<_, Evaluation>( + "UPDATE evaluations SET status = $1, error_message = $2 WHERE id = $3 RETURNING *", + ) + .bind(status) + .bind(error_message) + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Evaluation {id} not found"))) +} + +pub async fn get_latest(pool: &PgPool, jobset_id: Uuid) -> Result> { + sqlx::query_as::<_, Evaluation>( + "SELECT * FROM evaluations WHERE jobset_id = $1 ORDER BY evaluation_time DESC LIMIT 1", + ) + .bind(jobset_id) + .fetch_optional(pool) + .await + .map_err(CiError::Database) +} + +/// Set the inputs hash for an evaluation (used for eval caching). +pub async fn set_inputs_hash(pool: &PgPool, id: Uuid, hash: &str) -> Result<()> { + sqlx::query("UPDATE evaluations SET inputs_hash = $1 WHERE id = $2") + .bind(hash) + .bind(id) + .execute(pool) + .await + .map_err(CiError::Database)?; + Ok(()) +} + +/// Check if an evaluation with the same inputs_hash already exists for this jobset. +pub async fn get_by_inputs_hash( + pool: &PgPool, + jobset_id: Uuid, + inputs_hash: &str, +) -> Result> { + sqlx::query_as::<_, Evaluation>( + "SELECT * FROM evaluations WHERE jobset_id = $1 AND inputs_hash = $2 \ + AND status = 'completed' ORDER BY evaluation_time DESC LIMIT 1", + ) + .bind(jobset_id) + .bind(inputs_hash) + .fetch_optional(pool) + .await + .map_err(CiError::Database) +} + +pub async fn count(pool: &PgPool) -> Result { + let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM evaluations") + .fetch_one(pool) + .await + .map_err(CiError::Database)?; + Ok(row.0) +} diff --git a/crates/common/src/repo/jobset_inputs.rs b/crates/common/src/repo/jobset_inputs.rs new file mode 100644 index 0000000..d2245ef --- /dev/null +++ b/crates/common/src/repo/jobset_inputs.rs @@ -0,0 +1,52 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::error::{CiError, Result}; +use crate::models::JobsetInput; + +pub async fn create( + pool: &PgPool, + jobset_id: Uuid, + name: &str, + input_type: &str, + value: &str, + revision: Option<&str>, +) -> Result { + sqlx::query_as::<_, JobsetInput>( + "INSERT INTO jobset_inputs (jobset_id, name, input_type, value, revision) VALUES ($1, $2, $3, $4, $5) RETURNING *", + ) + .bind(jobset_id) + .bind(name) + .bind(input_type) + .bind(value) + .bind(revision) + .fetch_one(pool) + .await + .map_err(|e| match &e { + sqlx::Error::Database(db_err) if db_err.is_unique_violation() => { + CiError::Conflict(format!("Input '{name}' already exists in this jobset")) + } + _ => CiError::Database(e), + }) +} + +pub async fn list_for_jobset(pool: &PgPool, jobset_id: Uuid) -> Result> { + sqlx::query_as::<_, JobsetInput>( + "SELECT * FROM jobset_inputs WHERE jobset_id = $1 ORDER BY name ASC", + ) + .bind(jobset_id) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> { + let result = sqlx::query("DELETE FROM jobset_inputs WHERE id = $1") + .bind(id) + .execute(pool) + .await?; + if result.rows_affected() == 0 { + return Err(CiError::NotFound(format!("Jobset input {id} not found"))); + } + Ok(()) +} diff --git a/crates/common/src/repo/jobsets.rs b/crates/common/src/repo/jobsets.rs new file mode 100644 index 0000000..deecd4b --- /dev/null +++ b/crates/common/src/repo/jobsets.rs @@ -0,0 +1,137 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::error::{CiError, Result}; +use crate::models::{ActiveJobset, CreateJobset, Jobset, UpdateJobset}; + +pub async fn create(pool: &PgPool, input: CreateJobset) -> Result { + let enabled = input.enabled.unwrap_or(true); + let flake_mode = input.flake_mode.unwrap_or(true); + let check_interval = input.check_interval.unwrap_or(60); + + sqlx::query_as::<_, Jobset>( + "INSERT INTO jobsets (project_id, name, nix_expression, enabled, flake_mode, check_interval) VALUES ($1, $2, $3, $4, $5, $6) RETURNING *", + ) + .bind(input.project_id) + .bind(&input.name) + .bind(&input.nix_expression) + .bind(enabled) + .bind(flake_mode) + .bind(check_interval) + .fetch_one(pool) + .await + .map_err(|e| match &e { + sqlx::Error::Database(db_err) if db_err.is_unique_violation() => { + CiError::Conflict(format!("Jobset '{}' already exists in this project", input.name)) + } + _ => CiError::Database(e), + }) +} + +pub async fn get(pool: &PgPool, id: Uuid) -> Result { + sqlx::query_as::<_, Jobset>("SELECT * FROM jobsets WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Jobset {id} not found"))) +} + +pub async fn list_for_project( + pool: &PgPool, + project_id: Uuid, + limit: i64, + offset: i64, +) -> Result> { + sqlx::query_as::<_, Jobset>( + "SELECT * FROM jobsets WHERE project_id = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3", + ) + .bind(project_id) + .bind(limit) + .bind(offset) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +pub async fn count_for_project(pool: &PgPool, project_id: Uuid) -> Result { + let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM jobsets WHERE project_id = $1") + .bind(project_id) + .fetch_one(pool) + .await + .map_err(CiError::Database)?; + Ok(row.0) +} + +pub async fn update(pool: &PgPool, id: Uuid, input: UpdateJobset) -> Result { + let existing = get(pool, id).await?; + + let name = input.name.unwrap_or(existing.name); + let nix_expression = input.nix_expression.unwrap_or(existing.nix_expression); + let enabled = input.enabled.unwrap_or(existing.enabled); + let flake_mode = input.flake_mode.unwrap_or(existing.flake_mode); + let check_interval = input.check_interval.unwrap_or(existing.check_interval); + + sqlx::query_as::<_, Jobset>( + "UPDATE jobsets SET name = $1, nix_expression = $2, enabled = $3, flake_mode = $4, check_interval = $5 WHERE id = $6 RETURNING *", + ) + .bind(&name) + .bind(&nix_expression) + .bind(enabled) + .bind(flake_mode) + .bind(check_interval) + .bind(id) + .fetch_one(pool) + .await + .map_err(|e| match &e { + sqlx::Error::Database(db_err) if db_err.is_unique_violation() => { + CiError::Conflict(format!("Jobset '{name}' already exists in this project")) + } + _ => CiError::Database(e), + }) +} + +pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> { + let result = sqlx::query("DELETE FROM jobsets WHERE id = $1") + .bind(id) + .execute(pool) + .await?; + + if result.rows_affected() == 0 { + return Err(CiError::NotFound(format!("Jobset {id} not found"))); + } + + Ok(()) +} + +pub async fn upsert(pool: &PgPool, input: CreateJobset) -> Result { + let enabled = input.enabled.unwrap_or(true); + let flake_mode = input.flake_mode.unwrap_or(true); + let check_interval = input.check_interval.unwrap_or(60); + + sqlx::query_as::<_, Jobset>( + "INSERT INTO jobsets (project_id, name, nix_expression, enabled, flake_mode, check_interval) \ + VALUES ($1, $2, $3, $4, $5, $6) \ + ON CONFLICT (project_id, name) DO UPDATE SET \ + nix_expression = EXCLUDED.nix_expression, \ + enabled = EXCLUDED.enabled, \ + flake_mode = EXCLUDED.flake_mode, \ + check_interval = EXCLUDED.check_interval \ + RETURNING *", + ) + .bind(input.project_id) + .bind(&input.name) + .bind(&input.nix_expression) + .bind(enabled) + .bind(flake_mode) + .bind(check_interval) + .fetch_one(pool) + .await + .map_err(CiError::Database) +} + +pub async fn list_active(pool: &PgPool) -> Result> { + sqlx::query_as::<_, ActiveJobset>("SELECT * FROM active_jobsets") + .fetch_all(pool) + .await + .map_err(CiError::Database) +} diff --git a/crates/common/src/repo/mod.rs b/crates/common/src/repo/mod.rs new file mode 100644 index 0000000..86b383f --- /dev/null +++ b/crates/common/src/repo/mod.rs @@ -0,0 +1,13 @@ +pub mod api_keys; +pub mod build_dependencies; +pub mod build_products; +pub mod build_steps; +pub mod builds; +pub mod channels; +pub mod evaluations; +pub mod jobset_inputs; +pub mod jobsets; +pub mod notification_configs; +pub mod projects; +pub mod remote_builders; +pub mod webhook_configs; diff --git a/crates/common/src/repo/notification_configs.rs b/crates/common/src/repo/notification_configs.rs new file mode 100644 index 0000000..035fe09 --- /dev/null +++ b/crates/common/src/repo/notification_configs.rs @@ -0,0 +1,48 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::error::{CiError, Result}; +use crate::models::{CreateNotificationConfig, NotificationConfig}; + +pub async fn create(pool: &PgPool, input: CreateNotificationConfig) -> Result { + sqlx::query_as::<_, NotificationConfig>( + "INSERT INTO notification_configs (project_id, notification_type, config) VALUES ($1, $2, $3) RETURNING *", + ) + .bind(input.project_id) + .bind(&input.notification_type) + .bind(&input.config) + .fetch_one(pool) + .await + .map_err(|e| match &e { + sqlx::Error::Database(db_err) if db_err.is_unique_violation() => { + CiError::Conflict(format!( + "Notification config '{}' already exists for this project", + input.notification_type + )) + } + _ => CiError::Database(e), + }) +} + +pub async fn list_for_project(pool: &PgPool, project_id: Uuid) -> Result> { + sqlx::query_as::<_, NotificationConfig>( + "SELECT * FROM notification_configs WHERE project_id = $1 AND enabled = true ORDER BY created_at DESC", + ) + .bind(project_id) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> { + let result = sqlx::query("DELETE FROM notification_configs WHERE id = $1") + .bind(id) + .execute(pool) + .await?; + if result.rows_affected() == 0 { + return Err(CiError::NotFound(format!( + "Notification config {id} not found" + ))); + } + Ok(()) +} diff --git a/crates/common/src/repo/projects.rs b/crates/common/src/repo/projects.rs new file mode 100644 index 0000000..3aa7cec --- /dev/null +++ b/crates/common/src/repo/projects.rs @@ -0,0 +1,95 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::error::{CiError, Result}; +use crate::models::{CreateProject, Project, UpdateProject}; + +pub async fn create(pool: &PgPool, input: CreateProject) -> Result { + sqlx::query_as::<_, Project>( + "INSERT INTO projects (name, description, repository_url) VALUES ($1, $2, $3) RETURNING *", + ) + .bind(&input.name) + .bind(&input.description) + .bind(&input.repository_url) + .fetch_one(pool) + .await + .map_err(|e| match &e { + sqlx::Error::Database(db_err) if db_err.is_unique_violation() => { + CiError::Conflict(format!("Project '{}' already exists", input.name)) + } + _ => CiError::Database(e), + }) +} + +pub async fn get(pool: &PgPool, id: Uuid) -> Result { + sqlx::query_as::<_, Project>("SELECT * FROM projects WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Project {id} not found"))) +} + +pub async fn get_by_name(pool: &PgPool, name: &str) -> Result { + sqlx::query_as::<_, Project>("SELECT * FROM projects WHERE name = $1") + .bind(name) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Project '{name}' not found"))) +} + +pub async fn list(pool: &PgPool, limit: i64, offset: i64) -> Result> { + sqlx::query_as::<_, Project>( + "SELECT * FROM projects ORDER BY created_at DESC LIMIT $1 OFFSET $2", + ) + .bind(limit) + .bind(offset) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +pub async fn count(pool: &PgPool) -> Result { + let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM projects") + .fetch_one(pool) + .await + .map_err(CiError::Database)?; + Ok(row.0) +} + +pub async fn update(pool: &PgPool, id: Uuid, input: UpdateProject) -> Result { + // Build dynamic update — only set provided fields + let existing = get(pool, id).await?; + + let name = input.name.unwrap_or(existing.name); + let description = input.description.or(existing.description); + let repository_url = input.repository_url.unwrap_or(existing.repository_url); + + sqlx::query_as::<_, Project>( + "UPDATE projects SET name = $1, description = $2, repository_url = $3 WHERE id = $4 RETURNING *", + ) + .bind(&name) + .bind(&description) + .bind(&repository_url) + .bind(id) + .fetch_one(pool) + .await + .map_err(|e| match &e { + sqlx::Error::Database(db_err) if db_err.is_unique_violation() => { + CiError::Conflict(format!("Project '{name}' already exists")) + } + _ => CiError::Database(e), + }) +} + +pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> { + let result = sqlx::query("DELETE FROM projects WHERE id = $1") + .bind(id) + .execute(pool) + .await?; + + if result.rows_affected() == 0 { + return Err(CiError::NotFound(format!("Project {id} not found"))); + } + + Ok(()) +} diff --git a/crates/common/src/repo/remote_builders.rs b/crates/common/src/repo/remote_builders.rs new file mode 100644 index 0000000..3baf68a --- /dev/null +++ b/crates/common/src/repo/remote_builders.rs @@ -0,0 +1,124 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::error::{CiError, Result}; +use crate::models::{CreateRemoteBuilder, RemoteBuilder}; + +pub async fn create(pool: &PgPool, input: CreateRemoteBuilder) -> Result { + sqlx::query_as::<_, RemoteBuilder>( + "INSERT INTO remote_builders (name, ssh_uri, systems, max_jobs, speed_factor, \ + supported_features, mandatory_features, public_host_key, ssh_key_file) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING *", + ) + .bind(&input.name) + .bind(&input.ssh_uri) + .bind(&input.systems) + .bind(input.max_jobs.unwrap_or(1)) + .bind(input.speed_factor.unwrap_or(1)) + .bind(input.supported_features.as_deref().unwrap_or(&[])) + .bind(input.mandatory_features.as_deref().unwrap_or(&[])) + .bind(&input.public_host_key) + .bind(&input.ssh_key_file) + .fetch_one(pool) + .await + .map_err(|e| match &e { + sqlx::Error::Database(db_err) if db_err.is_unique_violation() => { + CiError::Conflict(format!("Remote builder '{}' already exists", input.name)) + } + _ => CiError::Database(e), + }) +} + +pub async fn get(pool: &PgPool, id: Uuid) -> Result { + sqlx::query_as::<_, RemoteBuilder>("SELECT * FROM remote_builders WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Remote builder {id} not found"))) +} + +pub async fn list(pool: &PgPool) -> Result> { + sqlx::query_as::<_, RemoteBuilder>( + "SELECT * FROM remote_builders ORDER BY speed_factor DESC, name", + ) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +pub async fn list_enabled(pool: &PgPool) -> Result> { + sqlx::query_as::<_, RemoteBuilder>( + "SELECT * FROM remote_builders WHERE enabled = true ORDER BY speed_factor DESC, name", + ) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +/// Find a suitable builder for the given system. +pub async fn find_for_system(pool: &PgPool, system: &str) -> Result> { + sqlx::query_as::<_, RemoteBuilder>( + "SELECT * FROM remote_builders WHERE enabled = true AND $1 = ANY(systems) \ + ORDER BY speed_factor DESC", + ) + .bind(system) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +pub async fn update( + pool: &PgPool, + id: Uuid, + input: crate::models::UpdateRemoteBuilder, +) -> Result { + // Build dynamic update — use COALESCE pattern + sqlx::query_as::<_, RemoteBuilder>( + "UPDATE remote_builders SET \ + name = COALESCE($1, name), \ + ssh_uri = COALESCE($2, ssh_uri), \ + systems = COALESCE($3, systems), \ + max_jobs = COALESCE($4, max_jobs), \ + speed_factor = COALESCE($5, speed_factor), \ + supported_features = COALESCE($6, supported_features), \ + mandatory_features = COALESCE($7, mandatory_features), \ + enabled = COALESCE($8, enabled), \ + public_host_key = COALESCE($9, public_host_key), \ + ssh_key_file = COALESCE($10, ssh_key_file) \ + WHERE id = $11 RETURNING *", + ) + .bind(&input.name) + .bind(&input.ssh_uri) + .bind(&input.systems) + .bind(input.max_jobs) + .bind(input.speed_factor) + .bind(&input.supported_features) + .bind(&input.mandatory_features) + .bind(input.enabled) + .bind(&input.public_host_key) + .bind(&input.ssh_key_file) + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Remote builder {id} not found"))) +} + +pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> { + let result = sqlx::query("DELETE FROM remote_builders WHERE id = $1") + .bind(id) + .execute(pool) + .await + .map_err(CiError::Database)?; + if result.rows_affected() == 0 { + return Err(CiError::NotFound(format!("Remote builder {id} not found"))); + } + Ok(()) +} + +pub async fn count(pool: &PgPool) -> Result { + let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM remote_builders") + .fetch_one(pool) + .await + .map_err(CiError::Database)?; + Ok(row.0) +} diff --git a/crates/common/src/repo/webhook_configs.rs b/crates/common/src/repo/webhook_configs.rs new file mode 100644 index 0000000..5a90afb --- /dev/null +++ b/crates/common/src/repo/webhook_configs.rs @@ -0,0 +1,73 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::error::{CiError, Result}; +use crate::models::{CreateWebhookConfig, WebhookConfig}; + +pub async fn create( + pool: &PgPool, + input: CreateWebhookConfig, + secret_hash: Option<&str>, +) -> Result { + sqlx::query_as::<_, WebhookConfig>( + "INSERT INTO webhook_configs (project_id, forge_type, secret_hash) VALUES ($1, $2, $3) RETURNING *", + ) + .bind(input.project_id) + .bind(&input.forge_type) + .bind(secret_hash) + .fetch_one(pool) + .await + .map_err(|e| match &e { + sqlx::Error::Database(db_err) if db_err.is_unique_violation() => { + CiError::Conflict(format!( + "Webhook config for forge '{}' already exists for this project", + input.forge_type + )) + } + _ => CiError::Database(e), + }) +} + +pub async fn get(pool: &PgPool, id: Uuid) -> Result { + sqlx::query_as::<_, WebhookConfig>("SELECT * FROM webhook_configs WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| CiError::NotFound(format!("Webhook config {id} not found"))) +} + +pub async fn list_for_project(pool: &PgPool, project_id: Uuid) -> Result> { + sqlx::query_as::<_, WebhookConfig>( + "SELECT * FROM webhook_configs WHERE project_id = $1 ORDER BY created_at DESC", + ) + .bind(project_id) + .fetch_all(pool) + .await + .map_err(CiError::Database) +} + +pub async fn get_by_project_and_forge( + pool: &PgPool, + project_id: Uuid, + forge_type: &str, +) -> Result> { + sqlx::query_as::<_, WebhookConfig>( + "SELECT * FROM webhook_configs WHERE project_id = $1 AND forge_type = $2 AND enabled = true", + ) + .bind(project_id) + .bind(forge_type) + .fetch_optional(pool) + .await + .map_err(CiError::Database) +} + +pub async fn delete(pool: &PgPool, id: Uuid) -> Result<()> { + let result = sqlx::query("DELETE FROM webhook_configs WHERE id = $1") + .bind(id) + .execute(pool) + .await?; + if result.rows_affected() == 0 { + return Err(CiError::NotFound(format!("Webhook config {id} not found"))); + } + Ok(()) +} diff --git a/crates/common/src/validate.rs b/crates/common/src/validate.rs new file mode 100644 index 0000000..cc68592 --- /dev/null +++ b/crates/common/src/validate.rs @@ -0,0 +1,596 @@ +//! Input validation helpers + +use regex::Regex; +use std::sync::LazyLock; + +/// Validate that a path is a valid nix store path. +/// Rejects path traversal, overly long paths, and non-store paths. +pub fn is_valid_store_path(path: &str) -> bool { + path.starts_with("/nix/store/") && !path.contains("..") && path.len() < 512 +} + +/// Validate that a string is a valid nix store hash (32 lowercase alphanumeric chars). +pub fn is_valid_nix_hash(hash: &str) -> bool { + hash.len() == 32 + && hash + .chars() + .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit()) +} + +// --- Validation trait and helpers --- + +static NAME_RE: LazyLock = + LazyLock::new(|| Regex::new(r"^[a-zA-Z0-9][a-zA-Z0-9_-]*$").unwrap()); + +static COMMIT_HASH_RE: LazyLock = + LazyLock::new(|| Regex::new(r"^[0-9a-fA-F]{1,64}$").unwrap()); + +static SYSTEM_RE: LazyLock = LazyLock::new(|| Regex::new(r"^\w+-\w+$").unwrap()); + +const VALID_REPO_PREFIXES: &[&str] = &["https://", "http://", "git://", "ssh://"]; +const VALID_FORGE_TYPES: &[&str] = &["github", "gitea", "forgejo", "gitlab"]; + +/// Trait for validating request DTOs before persisting. +pub trait Validate { + fn validate(&self) -> Result<(), String>; +} + +fn validate_name(name: &str, field: &str) -> Result<(), String> { + if name.is_empty() || name.len() > 255 { + return Err(format!("{field} must be between 1 and 255 characters")); + } + if !NAME_RE.is_match(name) { + return Err(format!( + "{field} must start with alphanumeric and contain only [a-zA-Z0-9_-]" + )); + } + Ok(()) +} + +fn validate_repository_url(url: &str) -> Result<(), String> { + if url.is_empty() { + return Err("repository_url cannot be empty".to_string()); + } + if url.len() > 2048 { + return Err("repository_url must be at most 2048 characters".to_string()); + } + if !VALID_REPO_PREFIXES.iter().any(|p| url.starts_with(p)) { + return Err( + "repository_url must start with https://, http://, git://, or ssh://".to_string(), + ); + } + Ok(()) +} + +fn validate_description(desc: &str) -> Result<(), String> { + if desc.len() > 4096 { + return Err("description must be at most 4096 characters".to_string()); + } + Ok(()) +} + +fn validate_nix_expression(expr: &str) -> Result<(), String> { + if expr.is_empty() { + return Err("nix_expression cannot be empty".to_string()); + } + if expr.len() > 1024 { + return Err("nix_expression must be at most 1024 characters".to_string()); + } + if expr.contains('\0') { + return Err("nix_expression must not contain null bytes".to_string()); + } + Ok(()) +} + +fn validate_check_interval(interval: i32) -> Result<(), String> { + if !(10..=86400).contains(&interval) { + return Err("check_interval must be between 10 and 86400".to_string()); + } + Ok(()) +} + +fn validate_commit_hash(hash: &str) -> Result<(), String> { + if !COMMIT_HASH_RE.is_match(hash) { + return Err("commit_hash must be 1-64 hex characters".to_string()); + } + Ok(()) +} + +fn validate_drv_path(path: &str) -> Result<(), String> { + if !is_valid_store_path(path) { + return Err("drv_path must be a valid nix store path".to_string()); + } + Ok(()) +} + +fn validate_system(system: &str) -> Result<(), String> { + if !SYSTEM_RE.is_match(system) { + return Err("system must match pattern like x86_64-linux".to_string()); + } + Ok(()) +} + +fn validate_ssh_uri(uri: &str) -> Result<(), String> { + if uri.is_empty() { + return Err("ssh_uri cannot be empty".to_string()); + } + if uri.len() > 2048 { + return Err("ssh_uri must be at most 2048 characters".to_string()); + } + Ok(()) +} + +fn validate_positive_i32(val: i32, field: &str) -> Result<(), String> { + if val < 1 { + return Err(format!("{field} must be >= 1")); + } + Ok(()) +} + +fn validate_forge_type(forge_type: &str) -> Result<(), String> { + if !VALID_FORGE_TYPES.contains(&forge_type) { + return Err(format!( + "forge_type must be one of: {}", + VALID_FORGE_TYPES.join(", ") + )); + } + Ok(()) +} + +// --- Implementations --- + +use crate::models::*; + +impl Validate for CreateProject { + fn validate(&self) -> Result<(), String> { + validate_name(&self.name, "name")?; + validate_repository_url(&self.repository_url)?; + if let Some(ref desc) = self.description { + validate_description(desc)?; + } + Ok(()) + } +} + +impl Validate for UpdateProject { + fn validate(&self) -> Result<(), String> { + if let Some(ref name) = self.name { + validate_name(name, "name")?; + } + if let Some(ref url) = self.repository_url { + validate_repository_url(url)?; + } + if let Some(ref desc) = self.description { + validate_description(desc)?; + } + Ok(()) + } +} + +impl Validate for CreateJobset { + fn validate(&self) -> Result<(), String> { + validate_name(&self.name, "name")?; + validate_nix_expression(&self.nix_expression)?; + if let Some(interval) = self.check_interval { + validate_check_interval(interval)?; + } + Ok(()) + } +} + +impl Validate for UpdateJobset { + fn validate(&self) -> Result<(), String> { + if let Some(ref name) = self.name { + validate_name(name, "name")?; + } + if let Some(ref expr) = self.nix_expression { + validate_nix_expression(expr)?; + } + if let Some(interval) = self.check_interval { + validate_check_interval(interval)?; + } + Ok(()) + } +} + +impl Validate for CreateEvaluation { + fn validate(&self) -> Result<(), String> { + validate_commit_hash(&self.commit_hash)?; + Ok(()) + } +} + +impl Validate for CreateBuild { + fn validate(&self) -> Result<(), String> { + validate_drv_path(&self.drv_path)?; + if let Some(ref system) = self.system { + validate_system(system)?; + } + Ok(()) + } +} + +impl Validate for CreateChannel { + fn validate(&self) -> Result<(), String> { + validate_name(&self.name, "name")?; + Ok(()) + } +} + +impl Validate for UpdateChannel { + fn validate(&self) -> Result<(), String> { + if let Some(ref name) = self.name { + validate_name(name, "name")?; + } + Ok(()) + } +} + +impl Validate for CreateRemoteBuilder { + fn validate(&self) -> Result<(), String> { + validate_name(&self.name, "name")?; + validate_ssh_uri(&self.ssh_uri)?; + if self.systems.is_empty() { + return Err("systems must not be empty".to_string()); + } + for system in &self.systems { + validate_system(system)?; + } + if let Some(max_jobs) = self.max_jobs { + validate_positive_i32(max_jobs, "max_jobs")?; + } + if let Some(speed_factor) = self.speed_factor { + validate_positive_i32(speed_factor, "speed_factor")?; + } + Ok(()) + } +} + +impl Validate for UpdateRemoteBuilder { + fn validate(&self) -> Result<(), String> { + if let Some(ref name) = self.name { + validate_name(name, "name")?; + } + if let Some(ref uri) = self.ssh_uri { + validate_ssh_uri(uri)?; + } + if let Some(ref systems) = self.systems { + if systems.is_empty() { + return Err("systems must not be empty".to_string()); + } + for system in systems { + validate_system(system)?; + } + } + if let Some(max_jobs) = self.max_jobs { + validate_positive_i32(max_jobs, "max_jobs")?; + } + if let Some(speed_factor) = self.speed_factor { + validate_positive_i32(speed_factor, "speed_factor")?; + } + Ok(()) + } +} + +impl Validate for CreateWebhookConfig { + fn validate(&self) -> Result<(), String> { + validate_forge_type(&self.forge_type)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use uuid::Uuid; + + // --- is_valid_store_path --- + + #[test] + fn valid_store_path() { + assert!(is_valid_store_path( + "/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-hello-2.12" + )); + } + + #[test] + fn valid_store_path_nested() { + assert!(is_valid_store_path( + "/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-hello-2.12/bin/hello" + )); + } + + #[test] + fn store_path_rejects_path_traversal() { + assert!(!is_valid_store_path( + "/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-hello/../../../etc/passwd" + )); + } + + #[test] + fn store_path_rejects_relative_path() { + assert!(!is_valid_store_path("nix/store/something")); + } + + #[test] + fn store_path_rejects_wrong_prefix() { + assert!(!is_valid_store_path("/tmp/nix/store/something")); + assert!(!is_valid_store_path("/etc/passwd")); + assert!(!is_valid_store_path("/nix/var/something")); + } + + #[test] + fn store_path_rejects_empty() { + assert!(!is_valid_store_path("")); + } + + #[test] + fn store_path_rejects_just_prefix() { + // "/nix/store/" alone has no hash, but structurally starts_with and has no .., + // so it passes. This is fine — the DB lookup won't find anything for it. + assert!(is_valid_store_path("/nix/store/")); + } + + #[test] + fn store_path_rejects_overly_long() { + let long_path = format!("/nix/store/{}", "a".repeat(512)); + assert!(!is_valid_store_path(&long_path)); + } + + #[test] + fn store_path_rejects_double_dot_embedded() { + assert!(!is_valid_store_path("/nix/store/abc..def")); + } + + // --- is_valid_nix_hash --- + + #[test] + fn valid_nix_hash_lowercase_alpha() { + assert!(is_valid_nix_hash("abcdefghijklmnopqrstuvwxyzabcdef")); + } + + #[test] + fn valid_nix_hash_digits() { + assert!(is_valid_nix_hash("01234567890123456789012345678901")); + } + + #[test] + fn valid_nix_hash_mixed() { + assert!(is_valid_nix_hash("a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6")); + } + + #[test] + fn nix_hash_rejects_uppercase() { + assert!(!is_valid_nix_hash("ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEF")); + } + + #[test] + fn nix_hash_rejects_mixed_case() { + assert!(!is_valid_nix_hash("abcdefghijklmnopqrstuvwxyzAbcdeF")); + } + + #[test] + fn nix_hash_rejects_too_short() { + assert!(!is_valid_nix_hash("abcdef1234567890")); + } + + #[test] + fn nix_hash_rejects_too_long() { + assert!(!is_valid_nix_hash("abcdefghijklmnopqrstuvwxyzabcdefg")); + } + + #[test] + fn nix_hash_rejects_empty() { + assert!(!is_valid_nix_hash("")); + } + + #[test] + fn nix_hash_rejects_special_chars() { + assert!(!is_valid_nix_hash("abcdefghijklmnopqrstuvwxyz!@#$%^")); + } + + #[test] + fn nix_hash_rejects_spaces() { + assert!(!is_valid_nix_hash("abcdefghijklmnop rstuvwxyzabcdef")); + } + + #[test] + fn nix_hash_rejects_path_traversal_attempt() { + assert!(!is_valid_nix_hash("../../../../../../etc/passwd__")); + } + + #[test] + fn nix_hash_rejects_sql_injection_attempt() { + assert!(!is_valid_nix_hash("' OR 1=1; DROP TABLE builds;--")); + } + + // --- Validate trait tests --- + + #[test] + fn test_create_project_valid() { + let p = CreateProject { + name: "my-project".to_string(), + description: Some("A test project".to_string()), + repository_url: "https://github.com/test/repo".to_string(), + }; + assert!(p.validate().is_ok()); + } + + #[test] + fn test_create_project_invalid_name() { + let p = CreateProject { + name: "".to_string(), + description: None, + repository_url: "https://github.com/test/repo".to_string(), + }; + assert!(p.validate().is_err()); + + let p = CreateProject { + name: "-starts-with-dash".to_string(), + description: None, + repository_url: "https://github.com/test/repo".to_string(), + }; + assert!(p.validate().is_err()); + + let p = CreateProject { + name: "has spaces".to_string(), + description: None, + repository_url: "https://github.com/test/repo".to_string(), + }; + assert!(p.validate().is_err()); + } + + #[test] + fn test_create_project_invalid_url() { + let p = CreateProject { + name: "valid-name".to_string(), + description: None, + repository_url: "ftp://example.com".to_string(), + }; + assert!(p.validate().is_err()); + } + + #[test] + fn test_create_project_description_too_long() { + let p = CreateProject { + name: "valid-name".to_string(), + description: Some("a".repeat(4097)), + repository_url: "https://github.com/test/repo".to_string(), + }; + assert!(p.validate().is_err()); + } + + #[test] + fn test_create_jobset_valid() { + let j = CreateJobset { + project_id: Uuid::new_v4(), + name: "main".to_string(), + nix_expression: "packages".to_string(), + enabled: None, + flake_mode: None, + check_interval: Some(300), + }; + assert!(j.validate().is_ok()); + } + + #[test] + fn test_create_jobset_interval_too_low() { + let j = CreateJobset { + project_id: Uuid::new_v4(), + name: "main".to_string(), + nix_expression: "packages".to_string(), + enabled: None, + flake_mode: None, + check_interval: Some(5), + }; + assert!(j.validate().is_err()); + } + + #[test] + fn test_create_evaluation_valid() { + let e = CreateEvaluation { + jobset_id: Uuid::new_v4(), + commit_hash: "abc123".to_string(), + }; + assert!(e.validate().is_ok()); + } + + #[test] + fn test_create_evaluation_invalid_hash() { + let e = CreateEvaluation { + jobset_id: Uuid::new_v4(), + commit_hash: "not-hex!".to_string(), + }; + assert!(e.validate().is_err()); + } + + #[test] + fn test_create_build_valid() { + let b = CreateBuild { + evaluation_id: Uuid::new_v4(), + job_name: "hello".to_string(), + drv_path: "/nix/store/abc123-hello.drv".to_string(), + system: Some("x86_64-linux".to_string()), + outputs: None, + is_aggregate: None, + constituents: None, + }; + assert!(b.validate().is_ok()); + } + + #[test] + fn test_create_build_invalid_drv() { + let b = CreateBuild { + evaluation_id: Uuid::new_v4(), + job_name: "hello".to_string(), + drv_path: "/tmp/bad-path".to_string(), + system: None, + outputs: None, + is_aggregate: None, + constituents: None, + }; + assert!(b.validate().is_err()); + } + + #[test] + fn test_create_remote_builder_valid() { + let rb = CreateRemoteBuilder { + name: "builder1".to_string(), + ssh_uri: "root@builder.example.com".to_string(), + systems: vec!["x86_64-linux".to_string()], + max_jobs: Some(4), + speed_factor: Some(1), + supported_features: None, + mandatory_features: None, + public_host_key: None, + ssh_key_file: None, + }; + assert!(rb.validate().is_ok()); + } + + #[test] + fn test_create_remote_builder_invalid_max_jobs() { + let rb = CreateRemoteBuilder { + name: "builder1".to_string(), + ssh_uri: "root@builder.example.com".to_string(), + systems: vec!["x86_64-linux".to_string()], + max_jobs: Some(0), + speed_factor: None, + supported_features: None, + mandatory_features: None, + public_host_key: None, + ssh_key_file: None, + }; + assert!(rb.validate().is_err()); + } + + #[test] + fn test_create_webhook_config_valid() { + let wh = CreateWebhookConfig { + project_id: Uuid::new_v4(), + forge_type: "github".to_string(), + secret: None, + }; + assert!(wh.validate().is_ok()); + } + + #[test] + fn test_create_webhook_config_invalid_forge() { + let wh = CreateWebhookConfig { + project_id: Uuid::new_v4(), + forge_type: "bitbucket".to_string(), + secret: None, + }; + assert!(wh.validate().is_err()); + } + + #[test] + fn test_create_channel_valid() { + let c = CreateChannel { + project_id: Uuid::new_v4(), + name: "stable".to_string(), + jobset_id: Uuid::new_v4(), + }; + assert!(c.validate().is_ok()); + } +} diff --git a/crates/common/tests/database_tests.rs b/crates/common/tests/database_tests.rs index 3faa130..fa3fbff 100644 --- a/crates/common/tests/database_tests.rs +++ b/crates/common/tests/database_tests.rs @@ -19,7 +19,10 @@ async fn test_database_connection() -> anyhow::Result<()> { let db = match Database::new(config).await { Ok(db) => db, Err(e) => { - println!("Skipping test_database_connection: no PostgreSQL instance available - {}", e); + println!( + "Skipping test_database_connection: no PostgreSQL instance available - {}", + e + ); return Ok(()); } }; @@ -48,7 +51,10 @@ async fn test_database_health_check() -> anyhow::Result<()> { let pool = match PgPool::connect("postgresql://postgres:password@localhost/test").await { Ok(pool) => pool, Err(e) => { - println!("Skipping test_database_health_check: no PostgreSQL instance available - {}", e); + println!( + "Skipping test_database_health_check: no PostgreSQL instance available - {}", + e + ); return Ok(()); } }; @@ -66,7 +72,10 @@ async fn test_connection_info() -> anyhow::Result<()> { let pool = match PgPool::connect("postgresql://postgres:password@localhost/test").await { Ok(pool) => pool, Err(e) => { - println!("Skipping test_connection_info: no PostgreSQL instance available - {}", e); + println!( + "Skipping test_connection_info: no PostgreSQL instance available - {}", + e + ); return Ok(()); } }; @@ -79,10 +88,14 @@ async fn test_connection_info() -> anyhow::Result<()> { idle_timeout: 600, max_lifetime: 1800, }) - .await { + .await + { Ok(db) => db, Err(e) => { - println!("Skipping test_connection_info: database connection failed - {}", e); + println!( + "Skipping test_connection_info: database connection failed - {}", + e + ); pool.close().await; return Ok(()); } @@ -111,10 +124,14 @@ async fn test_pool_stats() -> anyhow::Result<()> { idle_timeout: 600, max_lifetime: 1800, }) - .await { + .await + { Ok(db) => db, Err(e) => { - println!("Skipping test_pool_stats: no PostgreSQL instance available - {}", e); + println!( + "Skipping test_pool_stats: no PostgreSQL instance available - {}", + e + ); return Ok(()); } }; @@ -176,4 +193,3 @@ async fn test_database_config_validation() -> anyhow::Result<()> { Ok(()) } - diff --git a/crates/common/tests/repo_tests.rs b/crates/common/tests/repo_tests.rs new file mode 100644 index 0000000..eba509d --- /dev/null +++ b/crates/common/tests/repo_tests.rs @@ -0,0 +1,770 @@ +//! Integration tests for repository CRUD operations. +//! Requires TEST_DATABASE_URL to be set to a PostgreSQL connection string. + +use fc_common::models::*; +use fc_common::repo; + +async fn get_pool() -> Option { + let url = match std::env::var("TEST_DATABASE_URL") { + Ok(url) => url, + Err(_) => { + println!("Skipping repo test: TEST_DATABASE_URL not set"); + return None; + } + }; + + let pool = sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&url) + .await + .ok()?; + + // Run migrations + sqlx::migrate!("./migrations").run(&pool).await.ok()?; + + Some(pool) +} + +/// Helper: create a project with a unique name. +async fn create_test_project(pool: &sqlx::PgPool, prefix: &str) -> Project { + repo::projects::create( + pool, + CreateProject { + name: format!("{prefix}-{}", uuid::Uuid::new_v4()), + description: Some("Test project".to_string()), + repository_url: "https://github.com/test/repo".to_string(), + }, + ) + .await + .expect("create project") +} + +/// Helper: create a jobset for a project. +async fn create_test_jobset(pool: &sqlx::PgPool, project_id: uuid::Uuid) -> Jobset { + repo::jobsets::create( + pool, + CreateJobset { + project_id, + name: format!("default-{}", uuid::Uuid::new_v4()), + nix_expression: "packages".to_string(), + enabled: Some(true), + flake_mode: None, + check_interval: None, + }, + ) + .await + .expect("create jobset") +} + +/// Helper: create an evaluation for a jobset. +async fn create_test_eval(pool: &sqlx::PgPool, jobset_id: uuid::Uuid) -> Evaluation { + repo::evaluations::create( + pool, + CreateEvaluation { + jobset_id, + commit_hash: format!("abc123{}", uuid::Uuid::new_v4().simple()), + }, + ) + .await + .expect("create evaluation") +} + +/// Helper: create a build for an evaluation. +async fn create_test_build( + pool: &sqlx::PgPool, + eval_id: uuid::Uuid, + job_name: &str, + drv_path: &str, + system: Option<&str>, +) -> Build { + repo::builds::create( + pool, + CreateBuild { + evaluation_id: eval_id, + job_name: job_name.to_string(), + drv_path: drv_path.to_string(), + system: system.map(|s| s.to_string()), + outputs: None, + is_aggregate: None, + constituents: None, + }, + ) + .await + .expect("create build") +} + +// ---- Existing tests ---- + +#[tokio::test] +async fn test_project_crud() { + let pool = match get_pool().await { + Some(p) => p, + None => return, + }; + + // Create + let project = create_test_project(&pool, "crud").await; + assert!(!project.name.is_empty()); + assert_eq!(project.description.as_deref(), Some("Test project")); + + // Get + let fetched = repo::projects::get(&pool, project.id) + .await + .expect("get project"); + assert_eq!(fetched.name, project.name); + + // Get by name + let by_name = repo::projects::get_by_name(&pool, &project.name) + .await + .expect("get by name"); + assert_eq!(by_name.id, project.id); + + // Update + let updated = repo::projects::update( + &pool, + project.id, + UpdateProject { + name: None, + description: Some("Updated description".to_string()), + repository_url: None, + }, + ) + .await + .expect("update project"); + assert_eq!(updated.description.as_deref(), Some("Updated description")); + + // List + let projects = repo::projects::list(&pool, 100, 0) + .await + .expect("list projects"); + assert!(projects.iter().any(|p| p.id == project.id)); + + // Delete + repo::projects::delete(&pool, project.id) + .await + .expect("delete project"); + + // Verify deleted + let result = repo::projects::get(&pool, project.id).await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn test_project_unique_constraint() { + let pool = match get_pool().await { + Some(p) => p, + None => return, + }; + + let name = format!("unique-test-{}", uuid::Uuid::new_v4()); + + let _project = repo::projects::create( + &pool, + CreateProject { + name: name.clone(), + description: None, + repository_url: "https://github.com/test/repo".to_string(), + }, + ) + .await + .expect("create first project"); + + // Creating with same name should fail with Conflict + let result = repo::projects::create( + &pool, + CreateProject { + name, + description: None, + repository_url: "https://github.com/test/repo2".to_string(), + }, + ) + .await; + + assert!(matches!(result, Err(fc_common::CiError::Conflict(_)))); +} + +#[tokio::test] +async fn test_jobset_crud() { + let pool = match get_pool().await { + Some(p) => p, + None => return, + }; + + let project = create_test_project(&pool, "jobset").await; + + // Create jobset + let jobset = repo::jobsets::create( + &pool, + CreateJobset { + project_id: project.id, + name: "default".to_string(), + nix_expression: "packages".to_string(), + enabled: Some(true), + flake_mode: None, + check_interval: None, + }, + ) + .await + .expect("create jobset"); + + assert_eq!(jobset.name, "default"); + assert!(jobset.enabled); + + // Get + let fetched = repo::jobsets::get(&pool, jobset.id) + .await + .expect("get jobset"); + assert_eq!(fetched.project_id, project.id); + + // List for project + let jobsets = repo::jobsets::list_for_project(&pool, project.id, 100, 0) + .await + .expect("list jobsets"); + assert_eq!(jobsets.len(), 1); + + // Update + let updated = repo::jobsets::update( + &pool, + jobset.id, + UpdateJobset { + name: None, + nix_expression: Some("checks".to_string()), + enabled: Some(false), + flake_mode: None, + check_interval: None, + }, + ) + .await + .expect("update jobset"); + assert_eq!(updated.nix_expression, "checks"); + assert!(!updated.enabled); + + // Delete + repo::jobsets::delete(&pool, jobset.id) + .await + .expect("delete jobset"); + + // Cleanup + repo::projects::delete(&pool, project.id).await.ok(); +} + +#[tokio::test] +async fn test_evaluation_and_build_lifecycle() { + let pool = match get_pool().await { + Some(p) => p, + None => return, + }; + + // Set up project and jobset + let project = create_test_project(&pool, "eval").await; + let jobset = create_test_jobset(&pool, project.id).await; + + // Create evaluation + let eval = repo::evaluations::create( + &pool, + CreateEvaluation { + jobset_id: jobset.id, + commit_hash: "abc123def456".to_string(), + }, + ) + .await + .expect("create evaluation"); + + assert_eq!(eval.commit_hash, "abc123def456"); + + // Update status + let updated = repo::evaluations::update_status(&pool, eval.id, EvaluationStatus::Running, None) + .await + .expect("update evaluation status"); + assert!(matches!(updated.status, EvaluationStatus::Running)); + + // Get latest + let latest = repo::evaluations::get_latest(&pool, jobset.id) + .await + .expect("get latest"); + assert!(latest.is_some()); + assert_eq!(latest.unwrap().id, eval.id); + + // Create build + let build = create_test_build( + &pool, + eval.id, + "hello", + "/nix/store/abc.drv", + Some("x86_64-linux"), + ) + .await; + assert_eq!(build.job_name, "hello"); + assert_eq!(build.system.as_deref(), Some("x86_64-linux")); + + // List pending + let pending = repo::builds::list_pending(&pool, 10) + .await + .expect("list pending"); + assert!(pending.iter().any(|b| b.id == build.id)); + + // Start build + let started = repo::builds::start(&pool, build.id) + .await + .expect("start build"); + assert!(started.is_some()); + + // Second start should return None (already claimed) + let second = repo::builds::start(&pool, build.id) + .await + .expect("second start"); + assert!(second.is_none()); + + // Complete build + let completed = repo::builds::complete( + &pool, + build.id, + BuildStatus::Completed, + None, + Some("/nix/store/output"), + None, + ) + .await + .expect("complete build"); + assert!(matches!(completed.status, BuildStatus::Completed)); + + // Create build step + let step = repo::build_steps::create( + &pool, + CreateBuildStep { + build_id: build.id, + step_number: 1, + command: "nix build".to_string(), + }, + ) + .await + .expect("create build step"); + + // Complete build step + let completed_step = repo::build_steps::complete(&pool, step.id, 0, Some("output"), None) + .await + .expect("complete build step"); + assert_eq!(completed_step.exit_code, Some(0)); + + // Create build product + let product = repo::build_products::create( + &pool, + CreateBuildProduct { + build_id: build.id, + name: "hello".to_string(), + path: "/nix/store/output".to_string(), + sha256_hash: Some("sha256-abc".to_string()), + file_size: Some(1024), + content_type: None, + is_directory: true, + }, + ) + .await + .expect("create build product"); + assert_eq!(product.file_size, Some(1024)); + + // List build products + let products = repo::build_products::list_for_build(&pool, build.id) + .await + .expect("list products"); + assert_eq!(products.len(), 1); + + // List build steps + let steps = repo::build_steps::list_for_build(&pool, build.id) + .await + .expect("list steps"); + assert_eq!(steps.len(), 1); + + // Test filtered list + let filtered = repo::builds::list_filtered(&pool, Some(eval.id), None, None, None, 50, 0) + .await + .expect("list filtered"); + assert!(filtered.iter().any(|b| b.id == build.id)); + + // Get stats + let stats = repo::builds::get_stats(&pool).await.expect("get stats"); + assert!(stats.total_builds.unwrap_or(0) > 0); + + // List recent + let recent = repo::builds::list_recent(&pool, 10) + .await + .expect("list recent"); + assert!(!recent.is_empty()); + + // Cleanup + repo::projects::delete(&pool, project.id).await.ok(); +} + +#[tokio::test] +async fn test_not_found_errors() { + let pool = match get_pool().await { + Some(p) => p, + None => return, + }; + + let fake_id = uuid::Uuid::new_v4(); + + assert!(matches!( + repo::projects::get(&pool, fake_id).await, + Err(fc_common::CiError::NotFound(_)) + )); + + assert!(matches!( + repo::jobsets::get(&pool, fake_id).await, + Err(fc_common::CiError::NotFound(_)) + )); + + assert!(matches!( + repo::evaluations::get(&pool, fake_id).await, + Err(fc_common::CiError::NotFound(_)) + )); + + assert!(matches!( + repo::builds::get(&pool, fake_id).await, + Err(fc_common::CiError::NotFound(_)) + )); +} + +// ---- New hardening tests ---- + +#[tokio::test] +async fn test_batch_get_completed_by_drv_paths() { + let pool = match get_pool().await { + Some(p) => p, + None => return, + }; + + let project = create_test_project(&pool, "batch-drv").await; + let jobset = create_test_jobset(&pool, project.id).await; + let eval = create_test_eval(&pool, jobset.id).await; + + let drv1 = format!("/nix/store/{}.drv", uuid::Uuid::new_v4().simple()); + let drv2 = format!("/nix/store/{}.drv", uuid::Uuid::new_v4().simple()); + let drv_missing = format!("/nix/store/{}.drv", uuid::Uuid::new_v4().simple()); + + let b1 = create_test_build(&pool, eval.id, "pkg1", &drv1, Some("x86_64-linux")).await; + let b2 = create_test_build(&pool, eval.id, "pkg2", &drv2, Some("x86_64-linux")).await; + + // Start and complete both + repo::builds::start(&pool, b1.id).await.unwrap(); + repo::builds::complete(&pool, b1.id, BuildStatus::Completed, None, None, None) + .await + .unwrap(); + repo::builds::start(&pool, b2.id).await.unwrap(); + repo::builds::complete(&pool, b2.id, BuildStatus::Completed, None, None, None) + .await + .unwrap(); + + // Batch query + let results = repo::builds::get_completed_by_drv_paths( + &pool, + &[drv1.clone(), drv2.clone(), drv_missing.clone()], + ) + .await + .expect("batch get"); + + assert!(results.contains_key(&drv1)); + assert!(results.contains_key(&drv2)); + assert!(!results.contains_key(&drv_missing)); + assert_eq!(results.len(), 2); + + // Empty input + let empty = repo::builds::get_completed_by_drv_paths(&pool, &[]) + .await + .expect("empty batch"); + assert!(empty.is_empty()); + + // Cleanup + repo::projects::delete(&pool, project.id).await.ok(); +} + +#[tokio::test] +async fn test_batch_check_deps_for_builds() { + let pool = match get_pool().await { + Some(p) => p, + None => return, + }; + + let project = create_test_project(&pool, "batch-deps").await; + let jobset = create_test_jobset(&pool, project.id).await; + let eval = create_test_eval(&pool, jobset.id).await; + + // Create dep (will be completed) and dependent (pending) + let dep_drv = format!("/nix/store/{}.drv", uuid::Uuid::new_v4().simple()); + let main_drv = format!("/nix/store/{}.drv", uuid::Uuid::new_v4().simple()); + let standalone_drv = format!("/nix/store/{}.drv", uuid::Uuid::new_v4().simple()); + + let dep_build = create_test_build(&pool, eval.id, "dep", &dep_drv, None).await; + let main_build = create_test_build(&pool, eval.id, "main", &main_drv, None).await; + let standalone = create_test_build(&pool, eval.id, "standalone", &standalone_drv, None).await; + + // Create dependency: main depends on dep + repo::build_dependencies::create(&pool, main_build.id, dep_build.id) + .await + .expect("create dep"); + + // Before dep is completed, main should have incomplete deps + let results = + repo::build_dependencies::check_deps_for_builds(&pool, &[main_build.id, standalone.id]) + .await + .expect("batch check deps"); + + assert_eq!(results[&main_build.id], false); // dep not completed + assert_eq!(results[&standalone.id], true); // no deps + + // Now complete the dep + repo::builds::start(&pool, dep_build.id).await.unwrap(); + repo::builds::complete( + &pool, + dep_build.id, + BuildStatus::Completed, + None, + None, + None, + ) + .await + .unwrap(); + + // Recheck + let results = + repo::build_dependencies::check_deps_for_builds(&pool, &[main_build.id, standalone.id]) + .await + .expect("batch check deps after complete"); + + assert_eq!(results[&main_build.id], true); // dep now completed + assert_eq!(results[&standalone.id], true); + + // Empty input + let empty = repo::build_dependencies::check_deps_for_builds(&pool, &[]) + .await + .expect("empty check"); + assert!(empty.is_empty()); + + // Cleanup + repo::projects::delete(&pool, project.id).await.ok(); +} + +#[tokio::test] +async fn test_list_filtered_with_system_filter() { + let pool = match get_pool().await { + Some(p) => p, + None => return, + }; + + let project = create_test_project(&pool, "filter-sys").await; + let jobset = create_test_jobset(&pool, project.id).await; + let eval = create_test_eval(&pool, jobset.id).await; + + let drv_x86 = format!("/nix/store/{}.drv", uuid::Uuid::new_v4().simple()); + let drv_arm = format!("/nix/store/{}.drv", uuid::Uuid::new_v4().simple()); + + create_test_build(&pool, eval.id, "x86-pkg", &drv_x86, Some("x86_64-linux")).await; + create_test_build(&pool, eval.id, "arm-pkg", &drv_arm, Some("aarch64-linux")).await; + + // Filter by x86_64-linux + let x86_builds = repo::builds::list_filtered( + &pool, + Some(eval.id), + None, + Some("x86_64-linux"), + None, + 50, + 0, + ) + .await + .expect("filter x86"); + assert!( + x86_builds + .iter() + .all(|b| b.system.as_deref() == Some("x86_64-linux")) + ); + assert!(!x86_builds.is_empty()); + + // Filter by aarch64-linux + let arm_builds = repo::builds::list_filtered( + &pool, + Some(eval.id), + None, + Some("aarch64-linux"), + None, + 50, + 0, + ) + .await + .expect("filter arm"); + assert!( + arm_builds + .iter() + .all(|b| b.system.as_deref() == Some("aarch64-linux")) + ); + assert!(!arm_builds.is_empty()); + + // Count + let x86_count = + repo::builds::count_filtered(&pool, Some(eval.id), None, Some("x86_64-linux"), None) + .await + .expect("count x86"); + assert_eq!(x86_count, x86_builds.len() as i64); + + // Cleanup + repo::projects::delete(&pool, project.id).await.ok(); +} + +#[tokio::test] +async fn test_list_filtered_with_job_name_filter() { + let pool = match get_pool().await { + Some(p) => p, + None => return, + }; + + let project = create_test_project(&pool, "filter-job").await; + let jobset = create_test_jobset(&pool, project.id).await; + let eval = create_test_eval(&pool, jobset.id).await; + + let drv1 = format!("/nix/store/{}.drv", uuid::Uuid::new_v4().simple()); + let drv2 = format!("/nix/store/{}.drv", uuid::Uuid::new_v4().simple()); + let drv3 = format!("/nix/store/{}.drv", uuid::Uuid::new_v4().simple()); + + create_test_build(&pool, eval.id, "hello-world", &drv1, None).await; + create_test_build(&pool, eval.id, "hello-lib", &drv2, None).await; + create_test_build(&pool, eval.id, "goodbye", &drv3, None).await; + + // ILIKE filter should match both hello-world and hello-lib + let hello_builds = + repo::builds::list_filtered(&pool, Some(eval.id), None, None, Some("hello"), 50, 0) + .await + .expect("filter hello"); + assert_eq!(hello_builds.len(), 2); + assert!(hello_builds.iter().all(|b| b.job_name.contains("hello"))); + + // "goodbye" should only match one + let goodbye_builds = + repo::builds::list_filtered(&pool, Some(eval.id), None, None, Some("goodbye"), 50, 0) + .await + .expect("filter goodbye"); + assert_eq!(goodbye_builds.len(), 1); + + // Count matches + let count = repo::builds::count_filtered(&pool, Some(eval.id), None, None, Some("hello")) + .await + .expect("count hello"); + assert_eq!(count, 2); + + // Cleanup + repo::projects::delete(&pool, project.id).await.ok(); +} + +#[tokio::test] +async fn test_reset_orphaned_batch_limit() { + let pool = match get_pool().await { + Some(p) => p, + None => return, + }; + + let project = create_test_project(&pool, "orphan").await; + let jobset = create_test_jobset(&pool, project.id).await; + let eval = create_test_eval(&pool, jobset.id).await; + + // Create and start a build, then set started_at far in the past to simulate orphan + let drv = format!("/nix/store/{}.drv", uuid::Uuid::new_v4().simple()); + let build = create_test_build(&pool, eval.id, "orphan-test", &drv, None).await; + repo::builds::start(&pool, build.id).await.unwrap(); + + // Set started_at to 2 hours ago to make it look orphaned + sqlx::query("UPDATE builds SET started_at = NOW() - INTERVAL '2 hours' WHERE id = $1") + .bind(build.id) + .execute(&pool) + .await + .unwrap(); + + // Reset orphaned with 1 hour threshold + let reset_count = repo::builds::reset_orphaned(&pool, 3600) + .await + .expect("reset orphaned"); + assert!(reset_count >= 1); + + // Verify the build is back to pending + let build = repo::builds::get(&pool, build.id).await.expect("get build"); + assert!(matches!(build.status, BuildStatus::Pending)); + assert!(build.started_at.is_none()); + + // Cleanup + repo::projects::delete(&pool, project.id).await.ok(); +} + +#[tokio::test] +async fn test_build_cancel_cascade() { + let pool = match get_pool().await { + Some(p) => p, + None => return, + }; + + let project = create_test_project(&pool, "cancel-cascade").await; + let jobset = create_test_jobset(&pool, project.id).await; + let eval = create_test_eval(&pool, jobset.id).await; + + let drv1 = format!("/nix/store/{}.drv", uuid::Uuid::new_v4().simple()); + let drv2 = format!("/nix/store/{}.drv", uuid::Uuid::new_v4().simple()); + + let parent = create_test_build(&pool, eval.id, "parent", &drv1, None).await; + let child = create_test_build(&pool, eval.id, "child", &drv2, None).await; + + // child depends on parent + repo::build_dependencies::create(&pool, child.id, parent.id) + .await + .expect("create dep"); + + // Cancel parent should cascade to child + let cancelled = repo::builds::cancel_cascade(&pool, parent.id) + .await + .expect("cancel cascade"); + + assert!(cancelled.len() >= 1); + + // Both should be cancelled + let parent = repo::builds::get(&pool, parent.id).await.unwrap(); + let child = repo::builds::get(&pool, child.id).await.unwrap(); + assert!(matches!(parent.status, BuildStatus::Cancelled)); + assert!(matches!(child.status, BuildStatus::Cancelled)); + + // Cleanup + repo::projects::delete(&pool, project.id).await.ok(); +} + +#[tokio::test] +async fn test_dedup_by_drv_path() { + let pool = match get_pool().await { + Some(p) => p, + None => return, + }; + + let project = create_test_project(&pool, "dedup").await; + let jobset = create_test_jobset(&pool, project.id).await; + let eval = create_test_eval(&pool, jobset.id).await; + + let drv = format!("/nix/store/{}.drv", uuid::Uuid::new_v4().simple()); + + let build = create_test_build(&pool, eval.id, "dedup-pkg", &drv, None).await; + + // Complete it + repo::builds::start(&pool, build.id).await.unwrap(); + repo::builds::complete(&pool, build.id, BuildStatus::Completed, None, None, None) + .await + .unwrap(); + + // Check single dedup + let existing = repo::builds::get_completed_by_drv_path(&pool, &drv) + .await + .expect("dedup check"); + assert!(existing.is_some()); + assert_eq!(existing.unwrap().id, build.id); + + // Check batch dedup + let batch = repo::builds::get_completed_by_drv_paths(&pool, &[drv.clone()]) + .await + .expect("batch dedup"); + assert!(batch.contains_key(&drv)); + + // Cleanup + repo::projects::delete(&pool, project.id).await.ok(); +}