various: replace silent error discards with logged warnings
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I465d760b5330980270b64b4a89abc09f6a6a6964
This commit is contained in:
parent
9bbc1754d9
commit
38ed7faee2
7 changed files with 147 additions and 40 deletions
|
|
@ -175,12 +175,22 @@ pub async fn auto_promote_if_complete(
|
|||
.map_err(CiError::Database)?;
|
||||
|
||||
for channel in channels {
|
||||
let _ = promote(pool, channel.id, evaluation_id).await;
|
||||
tracing::info!(
|
||||
channel = %channel.name,
|
||||
evaluation_id = %evaluation_id,
|
||||
"Auto-promoted evaluation to channel"
|
||||
);
|
||||
match promote(pool, channel.id, evaluation_id).await {
|
||||
Ok(_) => {
|
||||
tracing::info!(
|
||||
channel = %channel.name,
|
||||
evaluation_id = %evaluation_id,
|
||||
"Auto-promoted evaluation to channel"
|
||||
);
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
channel = %channel.name,
|
||||
evaluation_id = %evaluation_id,
|
||||
"Failed to auto-promote channel: {e}"
|
||||
);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
|
|||
|
|
@ -109,11 +109,14 @@ pub async fn authenticate(
|
|||
if let Some(ref hash) = user.password_hash {
|
||||
if verify_password(&creds.password, hash)? {
|
||||
// Update last login time
|
||||
let _ =
|
||||
if let Err(e) =
|
||||
sqlx::query("UPDATE users SET last_login_at = NOW() WHERE id = $1")
|
||||
.bind(user.id)
|
||||
.execute(pool)
|
||||
.await;
|
||||
.await
|
||||
{
|
||||
tracing::warn!(user_id = %user.id, "Failed to update last_login_at: {e}");
|
||||
}
|
||||
Ok(user)
|
||||
} else {
|
||||
Err(CiError::Unauthorized("Invalid credentials".to_string()))
|
||||
|
|
@ -442,13 +445,16 @@ pub async fn validate_session(
|
|||
|
||||
// Update last_used_at
|
||||
if result.is_some() {
|
||||
let _ = sqlx::query(
|
||||
if let Err(e) = sqlx::query(
|
||||
"UPDATE user_sessions SET last_used_at = NOW() WHERE session_token_hash \
|
||||
= $1",
|
||||
)
|
||||
.bind(&token_hash)
|
||||
.execute(pool)
|
||||
.await;
|
||||
.await
|
||||
{
|
||||
tracing::warn!("Failed to update session last_used_at: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
|
|
|
|||
|
|
@ -403,7 +403,11 @@ async fn evaluate_jobset(
|
|||
};
|
||||
|
||||
// Set inputs hash (only needed for new evaluations, not existing ones)
|
||||
let _ = repo::evaluations::set_inputs_hash(pool, eval.id, &inputs_hash).await;
|
||||
if let Err(e) =
|
||||
repo::evaluations::set_inputs_hash(pool, eval.id, &inputs_hash).await
|
||||
{
|
||||
tracing::warn!(eval_id = %eval.id, "Failed to set evaluation inputs hash: {e}");
|
||||
}
|
||||
|
||||
// Check for declarative config in repo
|
||||
check_declarative_config(pool, &repo_path, jobset.project_id).await;
|
||||
|
|
@ -525,9 +529,11 @@ async fn create_builds_from_eval(
|
|||
if let Some(&dep_build_id) = drv_to_build.get(dep_drv)
|
||||
&& dep_build_id != build_id
|
||||
{
|
||||
let _ =
|
||||
repo::build_dependencies::create(pool, build_id, dep_build_id)
|
||||
.await;
|
||||
if let Err(e) =
|
||||
repo::build_dependencies::create(pool, build_id, dep_build_id).await
|
||||
{
|
||||
tracing::warn!(build_id = %build_id, dep = %dep_build_id, "Failed to create build dependency: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -538,9 +544,11 @@ async fn create_builds_from_eval(
|
|||
if let Some(&dep_build_id) = name_to_build.get(constituent_name)
|
||||
&& dep_build_id != build_id
|
||||
{
|
||||
let _ =
|
||||
repo::build_dependencies::create(pool, build_id, dep_build_id)
|
||||
.await;
|
||||
if let Err(e) =
|
||||
repo::build_dependencies::create(pool, build_id, dep_build_id).await
|
||||
{
|
||||
tracing::warn!(build_id = %build_id, dep = %dep_build_id, "Failed to create constituent dependency: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,8 +43,10 @@ pub async fn run(
|
|||
job = %build.job_name,
|
||||
"Aggregate build: all constituents completed"
|
||||
);
|
||||
let _ = repo::builds::start(&pool, build.id).await;
|
||||
let _ = repo::builds::complete(
|
||||
if let Err(e) = repo::builds::start(&pool, build.id).await {
|
||||
tracing::warn!(build_id = %build.id, "Failed to start aggregate build: {e}");
|
||||
}
|
||||
if let Err(e) = repo::builds::complete(
|
||||
&pool,
|
||||
build.id,
|
||||
BuildStatus::Completed,
|
||||
|
|
@ -52,7 +54,10 @@ pub async fn run(
|
|||
None,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
{
|
||||
tracing::warn!(build_id = %build.id, "Failed to complete aggregate build: {e}");
|
||||
}
|
||||
continue;
|
||||
},
|
||||
Ok(false) => {
|
||||
|
|
@ -84,8 +89,10 @@ pub async fn run(
|
|||
drv = %build.drv_path,
|
||||
"Dedup: reusing result from existing build"
|
||||
);
|
||||
let _ = repo::builds::start(&pool, build.id).await;
|
||||
let _ = repo::builds::complete(
|
||||
if let Err(e) = repo::builds::start(&pool, build.id).await {
|
||||
tracing::warn!(build_id = %build.id, "Failed to start dedup build: {e}");
|
||||
}
|
||||
if let Err(e) = repo::builds::complete(
|
||||
&pool,
|
||||
build.id,
|
||||
BuildStatus::Completed,
|
||||
|
|
@ -93,7 +100,10 @@ pub async fn run(
|
|||
existing.build_output_path.as_deref(),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
{
|
||||
tracing::warn!(build_id = %build.id, "Failed to complete dedup build: {e}");
|
||||
}
|
||||
continue;
|
||||
},
|
||||
_ => {},
|
||||
|
|
|
|||
|
|
@ -265,25 +265,24 @@ fn build_s3_store_uri(
|
|||
return base_uri.to_string();
|
||||
};
|
||||
|
||||
let mut params: Vec<(String, String)> = Vec::new();
|
||||
let mut params: Vec<(&str, &str)> = Vec::new();
|
||||
|
||||
if let Some(region) = &cfg.region {
|
||||
params.push(("region".to_string(), region.clone()));
|
||||
params.push(("region", region));
|
||||
}
|
||||
|
||||
if let Some(endpoint) = &cfg.endpoint_url {
|
||||
params.push(("endpoint".to_string(), endpoint.clone()));
|
||||
params.push(("endpoint", endpoint));
|
||||
}
|
||||
|
||||
if cfg.use_path_style {
|
||||
params.push(("use-path-style".to_string(), "true".to_string()));
|
||||
params.push(("use-path-style", "true"));
|
||||
}
|
||||
|
||||
if params.is_empty() {
|
||||
return base_uri.to_string();
|
||||
}
|
||||
|
||||
// Build URI with query parameters
|
||||
let query = params
|
||||
.iter()
|
||||
.map(|(k, v)| {
|
||||
|
|
@ -292,7 +291,67 @@ fn build_s3_store_uri(
|
|||
.collect::<Vec<_>>()
|
||||
.join("&");
|
||||
|
||||
format!("{}?{}", base_uri, query)
|
||||
format!("{base_uri}?{query}")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use fc_common::config::S3CacheConfig;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_build_s3_store_uri_no_config() {
|
||||
let result = build_s3_store_uri("s3://my-bucket", None);
|
||||
assert_eq!(result, "s3://my-bucket");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_s3_store_uri_empty_config() {
|
||||
let cfg = S3CacheConfig::default();
|
||||
let result = build_s3_store_uri("s3://my-bucket", Some(&cfg));
|
||||
assert_eq!(result, "s3://my-bucket");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_s3_store_uri_with_region() {
|
||||
let cfg = S3CacheConfig {
|
||||
region: Some("us-east-1".to_string()),
|
||||
..Default::default()
|
||||
};
|
||||
let result = build_s3_store_uri("s3://my-bucket", Some(&cfg));
|
||||
assert_eq!(result, "s3://my-bucket?region=us-east-1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_s3_store_uri_with_endpoint_and_path_style() {
|
||||
let cfg = S3CacheConfig {
|
||||
endpoint_url: Some("https://minio.example.com".to_string()),
|
||||
use_path_style: true,
|
||||
..Default::default()
|
||||
};
|
||||
let result = build_s3_store_uri("s3://my-bucket", Some(&cfg));
|
||||
assert!(result.starts_with("s3://my-bucket?"));
|
||||
assert!(result.contains("endpoint=https%3A%2F%2Fminio.example.com"));
|
||||
assert!(result.contains("use-path-style=true"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_s3_store_uri_all_params() {
|
||||
let cfg = S3CacheConfig {
|
||||
region: Some("eu-west-1".to_string()),
|
||||
endpoint_url: Some("https://s3.example.com".to_string()),
|
||||
use_path_style: true,
|
||||
..Default::default()
|
||||
};
|
||||
let result = build_s3_store_uri("s3://cache-bucket", Some(&cfg));
|
||||
assert!(result.starts_with("s3://cache-bucket?"));
|
||||
assert!(result.contains("region=eu-west-1"));
|
||||
assert!(result.contains("endpoint=https%3A%2F%2Fs3.example.com"));
|
||||
assert!(result.contains("use-path-style=true"));
|
||||
// Verify params are joined with &
|
||||
assert_eq!(result.matches('&').count(), 2);
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to run the build on a remote builder if one is available for the build's
|
||||
|
|
@ -319,7 +378,10 @@ async fn try_remote_build(
|
|||
);
|
||||
|
||||
// Set builder_id
|
||||
let _ = repo::builds::set_builder(pool, build.id, builder.id).await;
|
||||
if let Err(e) = repo::builds::set_builder(pool, build.id, builder.id).await
|
||||
{
|
||||
tracing::warn!(build_id = %build.id, builder = %builder.name, "Failed to set builder_id: {e}");
|
||||
}
|
||||
|
||||
// Build remotely via --store
|
||||
let store_uri = format!("ssh://{}", builder.ssh_uri);
|
||||
|
|
@ -402,7 +464,7 @@ async fn collect_metrics_and_alert(
|
|||
repo::evaluations::get(pool, build.evaluation_id).await
|
||||
{
|
||||
if let Ok(jobset) = repo::jobsets::get(pool, evaluation.jobset_id).await {
|
||||
let _ = manager
|
||||
manager
|
||||
.check_and_alert(pool, Some(jobset.project_id), Some(jobset.id))
|
||||
.await;
|
||||
}
|
||||
|
|
@ -508,7 +570,9 @@ async fn run_build(
|
|||
let log_path = if let Some(ref storage) = log_storage {
|
||||
let final_path = storage.log_path(&build.id);
|
||||
if live_log_path.exists() {
|
||||
let _ = tokio::fs::rename(&live_log_path, &final_path).await;
|
||||
if let Err(e) = tokio::fs::rename(&live_log_path, &final_path).await {
|
||||
tracing::warn!(build_id = %build.id, "Failed to rename build log: {e}");
|
||||
}
|
||||
} else {
|
||||
match storage.write_log(
|
||||
&build.id,
|
||||
|
|
@ -599,7 +663,9 @@ async fn run_build(
|
|||
|
||||
// Sign outputs at build time
|
||||
if sign_outputs(&build_result.output_paths, signing_config).await {
|
||||
let _ = repo::builds::mark_signed(pool, build.id).await;
|
||||
if let Err(e) = repo::builds::mark_signed(pool, build.id).await {
|
||||
tracing::warn!(build_id = %build.id, "Failed to mark build as signed: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
// Push to external binary cache if configured
|
||||
|
|
@ -682,7 +748,9 @@ async fn run_build(
|
|||
|
||||
// Write error log
|
||||
if let Some(ref storage) = log_storage {
|
||||
let _ = storage.write_log(&build.id, "", &msg);
|
||||
if let Err(e) = storage.write_log(&build.id, "", &msg) {
|
||||
tracing::warn!(build_id = %build.id, "Failed to write error log: {e}");
|
||||
}
|
||||
}
|
||||
// Clean up live log
|
||||
let _ = tokio::fs::remove_file(&live_log_path).await;
|
||||
|
|
@ -722,9 +790,12 @@ async fn run_build(
|
|||
if updated_build.status == BuildStatus::Completed
|
||||
&& let Ok(eval) = repo::evaluations::get(pool, build.evaluation_id).await
|
||||
{
|
||||
let _ =
|
||||
if let Err(e) =
|
||||
repo::channels::auto_promote_if_complete(pool, eval.jobset_id, eval.id)
|
||||
.await;
|
||||
.await
|
||||
{
|
||||
tracing::warn!(build_id = %build.id, "Failed to auto-promote channels: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -60,12 +60,15 @@ async fn create_channel(
|
|||
fc_common::repo::evaluations::get_latest(&state.pool, jobset_id).await
|
||||
{
|
||||
if eval.status == fc_common::models::EvaluationStatus::Completed {
|
||||
let _ = fc_common::repo::channels::auto_promote_if_complete(
|
||||
if let Err(e) = fc_common::repo::channels::auto_promote_if_complete(
|
||||
&state.pool,
|
||||
jobset_id,
|
||||
eval.id,
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
{
|
||||
tracing::warn!(jobset_id = %jobset_id, "Failed to auto-promote channel: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -176,10 +176,9 @@ pub fn router(state: AppState, config: &ServerConfig) -> Router {
|
|||
));
|
||||
|
||||
// Add rate limiting if configured
|
||||
if let (Some(rps), Some(burst)) =
|
||||
if let (Some(_rps), Some(burst)) =
|
||||
(config.rate_limit_rps, config.rate_limit_burst)
|
||||
{
|
||||
let _ = rps; // rate_limit_rps reserved for future use
|
||||
let rl_state = Arc::new(RateLimitState {
|
||||
requests: DashMap::new(),
|
||||
burst,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue