fc-queue-runner: integrate failed paths cache in build dispatch
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I418f007368916de1320dd56f425a5d8f6a6a6964
This commit is contained in:
parent
65a6fd853d
commit
9efba1bbc7
3 changed files with 72 additions and 1 deletions
|
|
@ -36,6 +36,8 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let poll_interval = Duration::from_secs(qr_config.poll_interval);
|
let poll_interval = Duration::from_secs(qr_config.poll_interval);
|
||||||
let build_timeout = Duration::from_secs(qr_config.build_timeout);
|
let build_timeout = Duration::from_secs(qr_config.build_timeout);
|
||||||
let strict_errors = qr_config.strict_errors;
|
let strict_errors = qr_config.strict_errors;
|
||||||
|
let failed_paths_cache = qr_config.failed_paths_cache;
|
||||||
|
let failed_paths_ttl = qr_config.failed_paths_ttl;
|
||||||
let work_dir = qr_config.work_dir;
|
let work_dir = qr_config.work_dir;
|
||||||
|
|
||||||
// Ensure the work directory exists
|
// Ensure the work directory exists
|
||||||
|
|
@ -77,12 +79,13 @@ async fn main() -> anyhow::Result<()> {
|
||||||
);
|
);
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors) => {
|
result = fc_queue_runner::runner_loop::run(db.pool().clone(), worker_pool, poll_interval, wakeup, strict_errors, failed_paths_cache) => {
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
tracing::error!("Runner loop failed: {e}");
|
tracing::error!("Runner loop failed: {e}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
() = gc_loop(gc_config_for_loop) => {}
|
() = gc_loop(gc_config_for_loop) => {}
|
||||||
|
() = failed_paths_cleanup_loop(db.pool().clone(), failed_paths_ttl, failed_paths_cache) => {}
|
||||||
() = shutdown_signal() => {
|
() = shutdown_signal() => {
|
||||||
tracing::info!("Shutdown signal received, draining in-flight builds...");
|
tracing::info!("Shutdown signal received, draining in-flight builds...");
|
||||||
worker_pool_for_drain.drain();
|
worker_pool_for_drain.drain();
|
||||||
|
|
@ -148,6 +151,31 @@ async fn gc_loop(gc_config: GcConfig) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn failed_paths_cleanup_loop(
|
||||||
|
pool: sqlx::PgPool,
|
||||||
|
ttl: u64,
|
||||||
|
enabled: bool,
|
||||||
|
) {
|
||||||
|
if !enabled {
|
||||||
|
return std::future::pending().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let interval = std::time::Duration::from_secs(3600);
|
||||||
|
loop {
|
||||||
|
tokio::time::sleep(interval).await;
|
||||||
|
match fc_common::repo::failed_paths_cache::cleanup_expired(&pool, ttl).await
|
||||||
|
{
|
||||||
|
Ok(count) if count > 0 => {
|
||||||
|
tracing::info!(count, "Cleaned up expired failed paths cache entries");
|
||||||
|
},
|
||||||
|
Ok(_) => {},
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed paths cache cleanup failed: {e}");
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn shutdown_signal() {
|
async fn shutdown_signal() {
|
||||||
let ctrl_c = async {
|
let ctrl_c = async {
|
||||||
tokio::signal::ctrl_c()
|
tokio::signal::ctrl_c()
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ pub async fn run(
|
||||||
poll_interval: Duration,
|
poll_interval: Duration,
|
||||||
wakeup: Arc<Notify>,
|
wakeup: Arc<Notify>,
|
||||||
strict_errors: bool,
|
strict_errors: bool,
|
||||||
|
failed_paths_cache: bool,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// Reset orphaned builds from previous crashes (older than 5 minutes)
|
// Reset orphaned builds from previous crashes (older than 5 minutes)
|
||||||
match repo::builds::reset_orphaned(&pool, 300).await {
|
match repo::builds::reset_orphaned(&pool, 300).await {
|
||||||
|
|
@ -112,6 +113,37 @@ pub async fn run(
|
||||||
_ => {},
|
_ => {},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Failed paths cache: skip known-failing derivations
|
||||||
|
if failed_paths_cache {
|
||||||
|
if let Ok(true) = repo::failed_paths_cache::is_cached_failure(
|
||||||
|
&pool,
|
||||||
|
&build.drv_path,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::info!(
|
||||||
|
build_id = %build.id, drv = %build.drv_path,
|
||||||
|
"Cached failure: skipping known-failing derivation"
|
||||||
|
);
|
||||||
|
if let Err(e) = repo::builds::start(&pool, build.id).await {
|
||||||
|
tracing::warn!(build_id = %build.id, "Failed to start cached-failure build: {e}");
|
||||||
|
}
|
||||||
|
if let Err(e) = repo::builds::complete(
|
||||||
|
&pool,
|
||||||
|
build.id,
|
||||||
|
BuildStatus::CachedFailure,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Some("Build skipped: derivation is in failed paths cache"),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!(build_id = %build.id, "Failed to complete cached-failure build: {e}");
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Dependency-aware scheduling: skip if deps not met
|
// Dependency-aware scheduling: skip if deps not met
|
||||||
match repo::build_dependencies::all_deps_completed(&pool, build.id)
|
match repo::build_dependencies::all_deps_completed(&pool, build.id)
|
||||||
.await
|
.await
|
||||||
|
|
|
||||||
|
|
@ -744,6 +744,17 @@ async fn run_build(
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
if let Err(e) = repo::failed_paths_cache::insert(
|
||||||
|
pool,
|
||||||
|
&build.drv_path,
|
||||||
|
failure_status,
|
||||||
|
build.id,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!(build_id = %build.id, "Failed to cache failed path: {e}");
|
||||||
|
}
|
||||||
|
|
||||||
tracing::warn!(build_id = %build.id, "Build failed: {:?}", failure_status);
|
tracing::warn!(build_id = %build.id, "Build failed: {:?}", failure_status);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue