various: get retries with backoff working; take a RetryConfig for sources
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I27d0b21c117c5654536ac3f665efc7876a6a6964
This commit is contained in:
parent
abd29e0969
commit
f62d75140f
4 changed files with 169 additions and 10 deletions
|
|
@ -1,6 +1,7 @@
|
||||||
mod config;
|
mod config;
|
||||||
mod core;
|
mod core;
|
||||||
mod crypto;
|
mod crypto;
|
||||||
|
mod retry;
|
||||||
mod sinks;
|
mod sinks;
|
||||||
mod sources;
|
mod sources;
|
||||||
mod storage;
|
mod storage;
|
||||||
|
|
|
||||||
116
src/retry.rs
Normal file
116
src/retry.rs
Normal file
|
|
@ -0,0 +1,116 @@
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use tracing::{debug, warn};
|
||||||
|
|
||||||
|
use crate::config::RetryConfig;
|
||||||
|
|
||||||
|
/// Classification of an error for retry purposes.
///
/// Drives the retry loop: only [`ErrorKind::Transient`] errors are retried,
/// while [`ErrorKind::Permanent`] errors abort the operation immediately.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorKind {
    /// Transient error - safe to retry (network failure, rate limit, 5xx).
    Transient,
    /// Permanent error - retrying will not help (4xx, auth failure, etc.).
    Permanent,
}
|
||||||
|
|
||||||
|
/// Classify a `reqwest::Error` for retry purposes.
|
||||||
|
pub fn classify_reqwest_error(err: &reqwest::Error) -> ErrorKind {
|
||||||
|
if err.is_connect() || err.is_timeout() || err.is_request() {
|
||||||
|
return ErrorKind::Transient;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(status) = err.status() {
|
||||||
|
return classify_status_code(status.as_u16());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unknown - assume transient to avoid silently dropping operations.
|
||||||
|
ErrorKind::Transient
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Classify an HTTP status code for retry purposes.
|
||||||
|
pub const fn classify_status_code(status: u16) -> ErrorKind {
|
||||||
|
match status {
|
||||||
|
// Rate limit and server-side errors - back off and retry.
|
||||||
|
429 | 500..=599 => ErrorKind::Transient,
|
||||||
|
// Client errors - permanent; retrying changes nothing.
|
||||||
|
400..=499 => ErrorKind::Permanent,
|
||||||
|
// Unexpected codes - assume transient.
|
||||||
|
_ => ErrorKind::Transient,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Scale `backoff_ms` by `multiplier` using integer arithmetic.
///
/// The multiplier is converted to a fixed-point ratio (a numerator over
/// 1000), so the hot retry loop never does floating-point math on the delay
/// itself. Both the multiplication and the division saturate, clamping
/// overflow to `u64::MAX` instead of wrapping.
fn scale_backoff(backoff_ms: u64, multiplier: f64) -> u64 {
    // Fixed-point conversion: 2.0 becomes 2000/1000, 1.5 becomes 1500/1000.
    // Truncating sub-millisecond precision is acceptable for backoff timing,
    // hence the allowed casts.
    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
    let numerator = (multiplier * 1000.0) as u64;
    let scaled = backoff_ms.saturating_mul(numerator);
    scaled.saturating_div(1000)
}
|
||||||
|
|
||||||
|
/// Execute `operation` with exponential backoff retries governed by `config`.
|
||||||
|
///
|
||||||
|
/// `classify` converts the error type into an [`ErrorKind`]; only
|
||||||
|
/// [`ErrorKind::Transient`] errors are retried. A [`ErrorKind::Permanent`]
|
||||||
|
/// error is returned immediately without further attempts.
|
||||||
|
///
|
||||||
|
/// Returns the last error after all attempts are exhausted.
|
||||||
|
pub async fn execute_with_retry<F, Fut, T, E, C>(
|
||||||
|
config: &RetryConfig,
|
||||||
|
operation_name: &str,
|
||||||
|
classify: C,
|
||||||
|
mut operation: F,
|
||||||
|
) -> Result<T, E>
|
||||||
|
where
|
||||||
|
F: FnMut() -> Fut,
|
||||||
|
Fut: std::future::Future<Output = Result<T, E>>,
|
||||||
|
E: std::fmt::Display,
|
||||||
|
C: Fn(&E) -> ErrorKind,
|
||||||
|
{
|
||||||
|
let mut attempt = 0u32;
|
||||||
|
let mut backoff_ms = config.initial_backoff_ms;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match operation().await {
|
||||||
|
Ok(value) => return Ok(value),
|
||||||
|
Err(err) => {
|
||||||
|
let kind = classify(&err);
|
||||||
|
|
||||||
|
if kind == ErrorKind::Permanent {
|
||||||
|
debug!(
|
||||||
|
operation = operation_name,
|
||||||
|
attempt, "Permanent error, not retrying: {err}"
|
||||||
|
);
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
if attempt >= config.max_retries {
|
||||||
|
warn!(
|
||||||
|
operation = operation_name,
|
||||||
|
attempt,
|
||||||
|
max_retries = config.max_retries,
|
||||||
|
"All retry attempts exhausted: {err}"
|
||||||
|
);
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
warn!(
|
||||||
|
operation = operation_name,
|
||||||
|
attempt, backoff_ms, "Transient error, retrying after backoff: {err}"
|
||||||
|
);
|
||||||
|
|
||||||
|
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
|
||||||
|
|
||||||
|
attempt += 1;
|
||||||
|
backoff_ms = scale_backoff(backoff_ms, config.backoff_multiplier)
|
||||||
|
.min(config.max_backoff_ms);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -7,13 +7,15 @@ use tokio::{fs::File, io::AsyncWriteExt};
|
||||||
use urlencoding::encode;
|
use urlencoding::encode;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
config::ForgejoSourceConfig,
|
config::{ForgejoSourceConfig, RetryConfig},
|
||||||
core::types::ContentHash,
|
core::types::ContentHash,
|
||||||
crypto::StreamingHasher,
|
crypto::StreamingHasher,
|
||||||
|
retry::{ErrorKind, classify_reqwest_error, execute_with_retry},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct ForgejoSource {
|
pub struct ForgejoSource {
|
||||||
config: ForgejoSourceConfig,
|
config: ForgejoSourceConfig,
|
||||||
|
retry: RetryConfig,
|
||||||
client: reqwest::Client,
|
client: reqwest::Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -25,7 +27,10 @@ pub struct Repository {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ForgejoSource {
|
impl ForgejoSource {
|
||||||
pub fn new(config: ForgejoSourceConfig) -> anyhow::Result<Self> {
|
pub fn new(
|
||||||
|
config: ForgejoSourceConfig,
|
||||||
|
retry: RetryConfig,
|
||||||
|
) -> anyhow::Result<Self> {
|
||||||
let mut headers = HeaderMap::new();
|
let mut headers = HeaderMap::new();
|
||||||
headers.insert(
|
headers.insert(
|
||||||
header::AUTHORIZATION,
|
header::AUTHORIZATION,
|
||||||
|
|
@ -39,7 +44,11 @@ impl ForgejoSource {
|
||||||
.timeout(std::time::Duration::from_secs(300))
|
.timeout(std::time::Duration::from_secs(300))
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|
||||||
Ok(Self { config, client })
|
Ok(Self {
|
||||||
|
config,
|
||||||
|
retry,
|
||||||
|
client,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn list_repositories(&self) -> anyhow::Result<Vec<Repository>> {
|
pub async fn list_repositories(&self) -> anyhow::Result<Vec<Repository>> {
|
||||||
|
|
@ -85,14 +94,33 @@ impl ForgejoSource {
|
||||||
encode(org),
|
encode(org),
|
||||||
page
|
page
|
||||||
);
|
);
|
||||||
let response = self.client.get(&url).send().await?;
|
|
||||||
|
let response = execute_with_retry(
|
||||||
|
&self.retry,
|
||||||
|
"fetch_org_repos_page",
|
||||||
|
|err: &reqwest::Error| classify_reqwest_error(err),
|
||||||
|
|| self.client.get(&url).send(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to fetch repos for org {org}: {e}"))?;
|
||||||
|
|
||||||
if !response.status().is_success() {
|
if !response.status().is_success() {
|
||||||
anyhow::bail!(
|
// Classify the HTTP error code: non-2xx from the server may be transient
|
||||||
"Failed to fetch repos for org {}: {}",
|
// (5xx, 429). For those we already retried inside execute_with_retry via
|
||||||
org,
|
// the reqwest error path; here we handle the case where the response
|
||||||
response.status()
|
// arrived successfully but carries an error status.
|
||||||
);
|
let status = response.status().as_u16();
|
||||||
|
match crate::retry::classify_status_code(status) {
|
||||||
|
ErrorKind::Transient => {
|
||||||
|
anyhow::bail!(
|
||||||
|
"Transient HTTP {status} fetching repos for org {org} (retries \
|
||||||
|
exhausted)"
|
||||||
|
);
|
||||||
|
},
|
||||||
|
ErrorKind::Permanent => {
|
||||||
|
anyhow::bail!("HTTP {status} fetching repos for org {org}");
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let api_repos: Vec<ApiRepository> = response.json().await?;
|
let api_repos: Vec<ApiRepository> = response.json().await?;
|
||||||
|
|
@ -130,7 +158,20 @@ impl ForgejoSource {
|
||||||
encode(&repo.default_branch)
|
encode(&repo.default_branch)
|
||||||
);
|
);
|
||||||
|
|
||||||
let response = self.client.get(&url).send().await?;
|
let response = execute_with_retry(
|
||||||
|
&self.retry,
|
||||||
|
"download_archive",
|
||||||
|
|err: &reqwest::Error| classify_reqwest_error(err),
|
||||||
|
|| self.client.get(&url).send(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
anyhow::anyhow!(
|
||||||
|
"Failed to download archive for {}/{}: {e}",
|
||||||
|
repo.owner,
|
||||||
|
repo.name
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
if !response.status().is_success() {
|
if !response.status().is_success() {
|
||||||
anyhow::bail!(
|
anyhow::bail!(
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ use crate::core::types::{
|
||||||
SinkJobState,
|
SinkJobState,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct Storage {
|
pub struct Storage {
|
||||||
pool: Pool<Sqlite>,
|
pool: Pool<Sqlite>,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue