crates/queue-runner: add cache upload config and worker improvements
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I781a843b88a9b62b929a8d0407274bc86a6a6964
This commit is contained in:
parent
00a4dc8d37
commit
c0df24c6e1
4 changed files with 58 additions and 19 deletions
|
|
@ -8,6 +8,7 @@ use tokio::io::{AsyncBufReadExt, BufReader};
|
||||||
const MAX_LOG_SIZE: usize = 100 * 1024 * 1024; // 100MB
|
const MAX_LOG_SIZE: usize = 100 * 1024 * 1024; // 100MB
|
||||||
|
|
||||||
/// Run a build on a remote machine via `nix build --store ssh://...`.
|
/// Run a build on a remote machine via `nix build --store ssh://...`.
|
||||||
|
#[tracing::instrument(skip(work_dir, live_log_path), fields(drv_path, store_uri))]
|
||||||
pub async fn run_nix_build_remote(
|
pub async fn run_nix_build_remote(
|
||||||
drv_path: &str,
|
drv_path: &str,
|
||||||
work_dir: &Path,
|
work_dir: &Path,
|
||||||
|
|
@ -157,6 +158,7 @@ pub fn parse_nix_log_line(line: &str) -> Option<(&'static str, String)> {
|
||||||
|
|
||||||
/// Run `nix build` for a derivation path.
|
/// Run `nix build` for a derivation path.
|
||||||
/// If `live_log_path` is provided, build output is streamed to that file incrementally.
|
/// If `live_log_path` is provided, build output is streamed to that file incrementally.
|
||||||
|
#[tracing::instrument(skip(work_dir, live_log_path), fields(drv_path))]
|
||||||
pub async fn run_nix_build(
|
pub async fn run_nix_build(
|
||||||
drv_path: &str,
|
drv_path: &str,
|
||||||
work_dir: &Path,
|
work_dir: &Path,
|
||||||
|
|
@ -225,12 +227,11 @@ pub async fn run_nix_build(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse nix internal JSON log lines
|
// Parse nix internal JSON log lines
|
||||||
if line.starts_with("@nix ") {
|
if line.starts_with("@nix ")
|
||||||
if let Some(json_str) = line.strip_prefix("@nix ") {
|
&& let Some(json_str) = line.strip_prefix("@nix ")
|
||||||
if let Ok(parsed) =
|
&& let Ok(parsed) =
|
||||||
serde_json::from_str::<serde_json::Value>(json_str.trim())
|
serde_json::from_str::<serde_json::Value>(json_str.trim())
|
||||||
{
|
&& let Some(action) = parsed.get("action").and_then(|a| a.as_str())
|
||||||
if let Some(action) = parsed.get("action").and_then(|a| a.as_str())
|
|
||||||
{
|
{
|
||||||
match action {
|
match action {
|
||||||
"start" => {
|
"start" => {
|
||||||
|
|
@ -247,21 +248,16 @@ pub async fn run_nix_build(
|
||||||
"stop" => {
|
"stop" => {
|
||||||
if let Some(drv) =
|
if let Some(drv) =
|
||||||
parsed.get("derivation").and_then(|d| d.as_str())
|
parsed.get("derivation").and_then(|d| d.as_str())
|
||||||
{
|
&& let Some(step) =
|
||||||
if let Some(step) =
|
|
||||||
steps.iter_mut().rfind(|s| s.drv_path == drv)
|
steps.iter_mut().rfind(|s| s.drv_path == drv)
|
||||||
{
|
{
|
||||||
step.completed_at = Some(chrono::Utc::now());
|
step.completed_at = Some(chrono::Utc::now());
|
||||||
step.success = true;
|
step.success = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if buf.len() < MAX_LOG_SIZE {
|
if buf.len() < MAX_LOG_SIZE {
|
||||||
buf.push_str(&line);
|
buf.push_str(&line);
|
||||||
|
|
|
||||||
|
|
@ -19,18 +19,18 @@ struct Cli {
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
tracing_subscriber::fmt::init();
|
|
||||||
|
|
||||||
let cli = Cli::parse();
|
let cli = Cli::parse();
|
||||||
|
|
||||||
tracing::info!("Starting CI Queue Runner");
|
|
||||||
|
|
||||||
let config = Config::load()?;
|
let config = Config::load()?;
|
||||||
|
fc_common::init_tracing(&config.tracing);
|
||||||
|
|
||||||
|
tracing::info!("Starting CI Queue Runner");
|
||||||
let log_config = config.logs;
|
let log_config = config.logs;
|
||||||
let gc_config = config.gc;
|
let gc_config = config.gc;
|
||||||
let gc_config_for_loop = gc_config.clone();
|
let gc_config_for_loop = gc_config.clone();
|
||||||
let notifications_config = config.notifications;
|
let notifications_config = config.notifications;
|
||||||
let signing_config = config.signing;
|
let signing_config = config.signing;
|
||||||
|
let cache_upload_config = config.cache_upload;
|
||||||
let qr_config = config.queue_runner;
|
let qr_config = config.queue_runner;
|
||||||
|
|
||||||
let workers = cli.workers.unwrap_or(qr_config.workers);
|
let workers = cli.workers.unwrap_or(qr_config.workers);
|
||||||
|
|
@ -55,6 +55,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
gc_config,
|
gc_config,
|
||||||
notifications_config,
|
notifications_config,
|
||||||
signing_config,
|
signing_config,
|
||||||
|
cache_upload_config,
|
||||||
));
|
));
|
||||||
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,9 @@ use std::time::Duration;
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
|
|
||||||
use fc_common::config::{GcConfig, LogConfig, NotificationsConfig, SigningConfig};
|
use fc_common::config::{
|
||||||
|
CacheUploadConfig, GcConfig, LogConfig, NotificationsConfig, SigningConfig,
|
||||||
|
};
|
||||||
use fc_common::gc_roots::GcRoots;
|
use fc_common::gc_roots::GcRoots;
|
||||||
use fc_common::log_storage::LogStorage;
|
use fc_common::log_storage::LogStorage;
|
||||||
use fc_common::models::{Build, BuildStatus, CreateBuildProduct, CreateBuildStep};
|
use fc_common::models::{Build, BuildStatus, CreateBuildProduct, CreateBuildStep};
|
||||||
|
|
@ -20,6 +22,7 @@ pub struct WorkerPool {
|
||||||
gc_config: Arc<GcConfig>,
|
gc_config: Arc<GcConfig>,
|
||||||
notifications_config: Arc<NotificationsConfig>,
|
notifications_config: Arc<NotificationsConfig>,
|
||||||
signing_config: Arc<SigningConfig>,
|
signing_config: Arc<SigningConfig>,
|
||||||
|
cache_upload_config: Arc<CacheUploadConfig>,
|
||||||
drain_token: tokio_util::sync::CancellationToken,
|
drain_token: tokio_util::sync::CancellationToken,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -33,6 +36,7 @@ impl WorkerPool {
|
||||||
gc_config: GcConfig,
|
gc_config: GcConfig,
|
||||||
notifications_config: NotificationsConfig,
|
notifications_config: NotificationsConfig,
|
||||||
signing_config: SigningConfig,
|
signing_config: SigningConfig,
|
||||||
|
cache_upload_config: CacheUploadConfig,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
semaphore: Arc::new(Semaphore::new(workers)),
|
semaphore: Arc::new(Semaphore::new(workers)),
|
||||||
|
|
@ -43,6 +47,7 @@ impl WorkerPool {
|
||||||
gc_config: Arc::new(gc_config),
|
gc_config: Arc::new(gc_config),
|
||||||
notifications_config: Arc::new(notifications_config),
|
notifications_config: Arc::new(notifications_config),
|
||||||
signing_config: Arc::new(signing_config),
|
signing_config: Arc::new(signing_config),
|
||||||
|
cache_upload_config: Arc::new(cache_upload_config),
|
||||||
drain_token: tokio_util::sync::CancellationToken::new(),
|
drain_token: tokio_util::sync::CancellationToken::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -69,6 +74,7 @@ impl WorkerPool {
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(self, build), fields(build_id = %build.id, job = %build.job_name))]
|
||||||
pub fn dispatch(&self, build: Build) {
|
pub fn dispatch(&self, build: Build) {
|
||||||
if self.drain_token.is_cancelled() {
|
if self.drain_token.is_cancelled() {
|
||||||
tracing::info!(build_id = %build.id, "Drain in progress, not dispatching");
|
tracing::info!(build_id = %build.id, "Drain in progress, not dispatching");
|
||||||
|
|
@ -83,6 +89,7 @@ impl WorkerPool {
|
||||||
let gc_config = self.gc_config.clone();
|
let gc_config = self.gc_config.clone();
|
||||||
let notifications_config = self.notifications_config.clone();
|
let notifications_config = self.notifications_config.clone();
|
||||||
let signing_config = self.signing_config.clone();
|
let signing_config = self.signing_config.clone();
|
||||||
|
let cache_upload_config = self.cache_upload_config.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let _permit = match semaphore.acquire().await {
|
let _permit = match semaphore.acquire().await {
|
||||||
|
|
@ -99,6 +106,7 @@ impl WorkerPool {
|
||||||
&gc_config,
|
&gc_config,
|
||||||
&notifications_config,
|
&notifications_config,
|
||||||
&signing_config,
|
&signing_config,
|
||||||
|
&cache_upload_config,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
|
|
@ -178,6 +186,28 @@ async fn sign_outputs(output_paths: &[String], signing_config: &SigningConfig) -
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Push output paths to an external binary cache via `nix copy`.
|
||||||
|
async fn push_to_cache(output_paths: &[String], store_uri: &str) {
|
||||||
|
for path in output_paths {
|
||||||
|
let result = tokio::process::Command::new("nix")
|
||||||
|
.args(["copy", "--to", store_uri, path])
|
||||||
|
.output()
|
||||||
|
.await;
|
||||||
|
match result {
|
||||||
|
Ok(o) if o.status.success() => {
|
||||||
|
tracing::debug!(output = path, store = store_uri, "Pushed to binary cache");
|
||||||
|
}
|
||||||
|
Ok(o) => {
|
||||||
|
let stderr = String::from_utf8_lossy(&o.stderr);
|
||||||
|
tracing::warn!(output = path, "Failed to push to cache: {stderr}");
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(output = path, "Failed to run nix copy: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Try to run the build on a remote builder if one is available for the build's system.
|
/// Try to run the build on a remote builder if one is available for the build's system.
|
||||||
async fn try_remote_build(
|
async fn try_remote_build(
|
||||||
pool: &PgPool,
|
pool: &PgPool,
|
||||||
|
|
@ -230,6 +260,7 @@ async fn try_remote_build(
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(pool, build, work_dir, log_config, gc_config, notifications_config, signing_config, cache_upload_config), fields(build_id = %build.id, job = %build.job_name))]
|
||||||
async fn run_build(
|
async fn run_build(
|
||||||
pool: &PgPool,
|
pool: &PgPool,
|
||||||
build: &Build,
|
build: &Build,
|
||||||
|
|
@ -239,6 +270,7 @@ async fn run_build(
|
||||||
gc_config: &GcConfig,
|
gc_config: &GcConfig,
|
||||||
notifications_config: &NotificationsConfig,
|
notifications_config: &NotificationsConfig,
|
||||||
signing_config: &SigningConfig,
|
signing_config: &SigningConfig,
|
||||||
|
cache_upload_config: &CacheUploadConfig,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// Atomically claim the build
|
// Atomically claim the build
|
||||||
let claimed = repo::builds::start(pool, build.id).await?;
|
let claimed = repo::builds::start(pool, build.id).await?;
|
||||||
|
|
@ -408,6 +440,12 @@ async fn run_build(
|
||||||
let _ = repo::builds::mark_signed(pool, build.id).await;
|
let _ = repo::builds::mark_signed(pool, build.id).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Push to external binary cache if configured
|
||||||
|
if cache_upload_config.enabled
|
||||||
|
&& let Some(ref store_uri) = cache_upload_config.store_uri {
|
||||||
|
push_to_cache(&build_result.output_paths, store_uri).await;
|
||||||
|
}
|
||||||
|
|
||||||
let primary_output = build_result.output_paths.first().map(|s| s.as_str());
|
let primary_output = build_result.output_paths.first().map(|s| s.as_str());
|
||||||
|
|
||||||
repo::builds::complete(
|
repo::builds::complete(
|
||||||
|
|
@ -488,13 +526,12 @@ async fn run_build(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Auto-promote channels if all builds in the evaluation are done
|
// Auto-promote channels if all builds in the evaluation are done
|
||||||
if updated_build.status == BuildStatus::Completed {
|
if updated_build.status == BuildStatus::Completed
|
||||||
if let Ok(eval) = repo::evaluations::get(pool, build.evaluation_id).await {
|
&& let Ok(eval) = repo::evaluations::get(pool, build.evaluation_id).await {
|
||||||
let _ =
|
let _ =
|
||||||
repo::channels::auto_promote_if_complete(pool, eval.jobset_id, eval.id).await;
|
repo::channels::auto_promote_if_complete(pool, eval.jobset_id, eval.id).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -86,6 +86,7 @@ async fn test_worker_pool_drain_stops_dispatch() {
|
||||||
fc_common::config::GcConfig::default(),
|
fc_common::config::GcConfig::default(),
|
||||||
fc_common::config::NotificationsConfig::default(),
|
fc_common::config::NotificationsConfig::default(),
|
||||||
fc_common::config::SigningConfig::default(),
|
fc_common::config::SigningConfig::default(),
|
||||||
|
fc_common::config::CacheUploadConfig::default(),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Drain should not panic
|
// Drain should not panic
|
||||||
|
|
@ -139,6 +140,8 @@ async fn test_atomic_build_claiming() {
|
||||||
enabled: None,
|
enabled: None,
|
||||||
flake_mode: None,
|
flake_mode: None,
|
||||||
check_interval: None,
|
check_interval: None,
|
||||||
|
branch: None,
|
||||||
|
scheduling_shares: None,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
|
|
@ -228,6 +231,8 @@ async fn test_orphan_build_reset() {
|
||||||
enabled: None,
|
enabled: None,
|
||||||
flake_mode: None,
|
flake_mode: None,
|
||||||
check_interval: None,
|
check_interval: None,
|
||||||
|
branch: None,
|
||||||
|
scheduling_shares: None,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue