diff --git a/src/main.rs b/src/main.rs
index bf6d091..099d512 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,7 @@
 mod config;
 mod core;
 mod crypto;
+mod retry;
 mod sinks;
 mod sources;
 mod storage;
diff --git a/src/retry.rs b/src/retry.rs
new file mode 100644
index 0000000..233e848
--- /dev/null
+++ b/src/retry.rs
@@ -0,0 +1,116 @@
+use std::time::Duration;
+
+use tracing::{debug, warn};
+
+use crate::config::RetryConfig;
+
+/// Classification of an error for retry purposes.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ErrorKind {
+    /// Transient error - safe to retry (network failure, rate limit, 5xx).
+    Transient,
+    /// Permanent error - retrying will not help (4xx, auth failure, etc.).
+    Permanent,
+}
+
+/// Classify a `reqwest::Error` for retry purposes.
+pub fn classify_reqwest_error(err: &reqwest::Error) -> ErrorKind {
+    if err.is_connect() || err.is_timeout() || err.is_request() {
+        return ErrorKind::Transient;
+    }
+
+    if let Some(status) = err.status() {
+        return classify_status_code(status.as_u16());
+    }
+
+    // Unknown - assume transient to avoid silently dropping operations.
+    ErrorKind::Transient
+}
+
+/// Classify an HTTP status code for retry purposes.
+pub const fn classify_status_code(status: u16) -> ErrorKind {
+    match status {
+        // Rate limit and server-side errors - back off and retry.
+        429 | 500..=599 => ErrorKind::Transient,
+        // Client errors - permanent; retrying changes nothing.
+        400..=499 => ErrorKind::Permanent,
+        // Unexpected codes - assume transient.
+        _ => ErrorKind::Transient,
+    }
+}
+
+/// Scale `backoff_ms` by `multiplier` using integer arithmetic.
+///
+/// The multiplier is converted once to a fixed-point ratio (numerator / 1000)
+/// so the backoff value itself stays in integer arithmetic. Overflow is
+/// clamped via saturating arithmetic.
+fn scale_backoff(backoff_ms: u64, multiplier: f64) -> u64 {
+    // Convert the multiplier to an integer numerator over 1000.
+    // e.g. 2.0 -> 2000 / 1000, 1.5 -> 1500 / 1000.
+    // The cast is intentional: we truncate sub-millisecond precision, which is
+    // acceptable for backoff timing.
+    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
+    let scale_num = (multiplier * 1000.0) as u64;
+    backoff_ms.saturating_mul(scale_num).saturating_div(1000)
+}
+
+/// Execute `operation` with exponential backoff retries governed by `config`.
+///
+/// `classify` converts the error type into an [`ErrorKind`]; only
+/// [`ErrorKind::Transient`] errors are retried. A [`ErrorKind::Permanent`]
+/// error is returned immediately without further attempts.
+///
+/// Returns the last error after all attempts are exhausted.
+pub async fn execute_with_retry<T, E, F, Fut, C>(
+    config: &RetryConfig,
+    operation_name: &str,
+    classify: C,
+    mut operation: F,
+) -> Result<T, E>
+where
+    F: FnMut() -> Fut,
+    Fut: std::future::Future<Output = Result<T, E>>,
+    E: std::fmt::Display,
+    C: Fn(&E) -> ErrorKind,
+{
+    let mut attempt = 0u32;
+    let mut backoff_ms = config.initial_backoff_ms;
+
+    loop {
+        match operation().await {
+            Ok(value) => return Ok(value),
+            Err(err) => {
+                let kind = classify(&err);
+
+                if kind == ErrorKind::Permanent {
+                    debug!(
+                        operation = operation_name,
+                        attempt, "Permanent error, not retrying: {err}"
+                    );
+                    return Err(err);
+                }
+
+                if attempt >= config.max_retries {
+                    warn!(
+                        operation = operation_name,
+                        attempt,
+                        max_retries = config.max_retries,
+                        "All retry attempts exhausted: {err}"
+                    );
+                    return Err(err);
+                }
+
+                warn!(
+                    operation = operation_name,
+                    attempt, backoff_ms, "Transient error, retrying after backoff: {err}"
+                );
+
+                tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
+
+                attempt += 1;
+                backoff_ms = scale_backoff(backoff_ms, config.backoff_multiplier)
+                    .min(config.max_backoff_ms);
+            },
+        }
+    }
+}
diff --git a/src/sources/forgejo.rs b/src/sources/forgejo.rs
index d5b211a..2166118 100644
--- a/src/sources/forgejo.rs
+++ b/src/sources/forgejo.rs
@@ -7,13 +7,15 @@ use tokio::{fs::File, io::AsyncWriteExt};
 use urlencoding::encode;
 
 use crate::{
-    config::ForgejoSourceConfig,
+    config::{ForgejoSourceConfig, RetryConfig},
     core::types::ContentHash,
     crypto::StreamingHasher,
+    retry::{ErrorKind, classify_reqwest_error, execute_with_retry},
 };
 
 pub struct ForgejoSource {
     config: ForgejoSourceConfig,
+    retry: RetryConfig,
     client: reqwest::Client,
 }
 
@@ -25,7 +27,10 @@ pub struct Repository {
 }
 
 impl ForgejoSource {
-    pub fn new(config: ForgejoSourceConfig) -> anyhow::Result<Self> {
+    pub fn new(
+        config: ForgejoSourceConfig,
+        retry: RetryConfig,
+    ) -> anyhow::Result<Self> {
         let mut headers = HeaderMap::new();
         headers.insert(
             header::AUTHORIZATION,
@@ -39,7 +44,11 @@ impl ForgejoSource {
             .timeout(std::time::Duration::from_secs(300))
             .build()?;
 
-        Ok(Self { config, client })
+        Ok(Self {
+            config,
+            retry,
+            client,
+        })
     }
 
     pub async fn list_repositories(&self) -> anyhow::Result<Vec<Repository>> {
@@ -85,14 +94,33 @@
             encode(org),
             page
         );
-        let response = self.client.get(&url).send().await?;
+
+        let response = execute_with_retry(
+            &self.retry,
+            "fetch_org_repos_page",
+            |err: &reqwest::Error| classify_reqwest_error(err),
+            || self.client.get(&url).send(),
+        )
+        .await
+        .map_err(|e| anyhow::anyhow!("Failed to fetch repos for org {org}: {e}"))?;
 
         if !response.status().is_success() {
-            anyhow::bail!(
-                "Failed to fetch repos for org {}: {}",
-                org,
-                response.status()
-            );
+            // Classify the HTTP error code: non-2xx from the server may be transient
+            // (5xx, 429). For those we already retried inside execute_with_retry via
+            // the reqwest error path; here we handle the case where the response
+            // arrived successfully but carries an error status.
+            let status = response.status().as_u16();
+            match crate::retry::classify_status_code(status) {
+                ErrorKind::Transient => {
+                    anyhow::bail!(
+                        "Transient HTTP {status} fetching repos for org {org} (retries \
+                         exhausted)"
+                    );
+                },
+                ErrorKind::Permanent => {
+                    anyhow::bail!("HTTP {status} fetching repos for org {org}");
+                },
+            }
         }
 
         let api_repos: Vec<ApiRepository> = response.json().await?;
@@ -130,7 +158,20 @@
             encode(&repo.default_branch)
         );
 
-        let response = self.client.get(&url).send().await?;
+        let response = execute_with_retry(
+            &self.retry,
+            "download_archive",
+            |err: &reqwest::Error| classify_reqwest_error(err),
+            || self.client.get(&url).send(),
+        )
+        .await
+        .map_err(|e| {
+            anyhow::anyhow!(
+                "Failed to download archive for {}/{}: {e}",
+                repo.owner,
+                repo.name
+            )
+        })?;
 
         if !response.status().is_success() {
             anyhow::bail!(
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 4850363..012f59b 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -11,6 +11,7 @@ use crate::core::types::{
     SinkJobState,
 };
 
+#[derive(Clone)]
 pub struct Storage {
     pool: Pool,
 }