pipeline: make artifact ordering deterministic; use enumeration order for jobs

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I1bed418fa37c185716a4f913906b3cbc6a6a6964
This commit is contained in:
raf 2026-03-17 21:24:26 +03:00
commit a397426858
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
2 changed files with 143 additions and 45 deletions

View file

@ -24,12 +24,30 @@ use crate::{
storage::Storage, storage::Storage,
}; };
/// Context for backing up a single repository.
///
/// Bundles the borrowed values that `backup_repository` reads through its
/// `ctx` argument, so they travel as one parameter instead of six.
struct BackupContext<'a> {
/// Source the repository archive is downloaded from (`download_archive`).
source: &'a ForgejoSource,
/// Identifier recorded as `source_id` in the artifact provenance and in
/// the resulting manifest entry.
source_id: &'a str,
/// Run identifier passed to `create_job` when the job row is created.
run_id: &'a RunId,
/// Storage handle used to create the job and update its status.
storage: &'a Storage,
/// Sinks the artifact is written to; the backup is reported as failed
/// unless every one of them produces a receipt.
sink_configs: &'a [SinkConfig],
/// Directory handed to `download_archive` together with the repository
/// (presumably where the archive is staged; confirm against the source).
temp_dir: &'a Path,
}
pub struct Pipeline { pub struct Pipeline {
config: Config, config: Config,
storage: Storage, storage: Storage,
} }
impl Pipeline { impl Pipeline {
/// Creates a new backup pipeline.
///
/// # Errors
///
/// Returns an error if:
///
/// - The database connection cannot be established
/// - The temp directory cannot be created
pub async fn new(config: Config) -> anyhow::Result<Self> { pub async fn new(config: Config) -> anyhow::Result<Self> {
let storage = let storage =
Storage::new(&format!("sqlite://{}", config.service.state_db_path)) Storage::new(&format!("sqlite://{}", config.service.state_db_path))
@ -40,6 +58,17 @@ impl Pipeline {
Ok(Self { config, storage }) Ok(Self { config, storage })
} }
/// Runs the backup pipeline.
///
/// # Errors
///
/// Returns an error if:
///
/// - Repository listing fails
/// - Manifest finalization fails
///
/// Individual repository backup failures are logged but do not fail
/// the entire run; the pipeline continues with remaining repositories.
pub async fn run(&self) -> anyhow::Result<RunId> { pub async fn run(&self) -> anyhow::Result<RunId> {
let run_id = RunId::new(); let run_id = RunId::new();
info!("Starting backup run {}", run_id.0); info!("Starting backup run {}", run_id.0);
@ -65,10 +94,13 @@ impl Pipeline {
// Spawn each repository backup as a task bounded by the semaphore. // Spawn each repository backup as a task bounded by the semaphore.
// We collect results in order so that manifest entries stay stable. // We collect results in order so that manifest entries stay stable.
let source = Arc::new(source); let source = Arc::new(source);
let mut repo_tasks: JoinSet<(String, anyhow::Result<ManifestEntry>)> = let mut repo_tasks: JoinSet<(
JoinSet::new(); String,
i64,
anyhow::Result<ManifestEntry>,
)> = JoinSet::new();
for repo in repos { for (index, repo) in repos.into_iter().enumerate() {
let permit = Arc::clone(&repo_sem) let permit = Arc::clone(&repo_sem)
.acquire_owned() .acquire_owned()
.await .await
@ -80,41 +112,57 @@ impl Pipeline {
let storage = self.storage.clone(); let storage = self.storage.clone();
let sink_configs = self.config.sinks.clone(); let sink_configs = self.config.sinks.clone();
let temp_dir = self.config.service.temp_dir.clone(); let temp_dir = self.config.service.temp_dir.clone();
let order_index = i64::try_from(index).map_err(|_| {
anyhow::anyhow!("Repository count exceeds i64 max")
})?;
repo_tasks.spawn(async move { repo_tasks.spawn(async move {
// Permit is held for the duration of the backup task. // Permit is held for the duration of the backup task.
let _permit = permit; let _permit = permit;
let result = backup_repository( let ctx = BackupContext {
&source, source: &source,
&source_id, source_id: &source_id,
&repo, run_id: &run_id,
&run_id, storage: &storage,
&storage, sink_configs: &sink_configs,
&sink_configs, temp_dir: Path::new(&temp_dir),
Path::new(&temp_dir), };
) let result = backup_repository(&ctx, &repo, order_index).await;
.await;
let label = format!("{}/{}", repo.owner, repo.name); let label = format!("{}/{}", repo.owner, repo.name);
(label, result) (label, order_index, result)
}); });
} }
// Collect results and sort by order_index to ensure deterministic
// manifest ordering that matches database ordering.
let mut results: Vec<(i64, String, anyhow::Result<ManifestEntry>)> =
Vec::new();
while let Some(join_result) = repo_tasks.join_next().await { while let Some(join_result) = repo_tasks.join_next().await {
match join_result { match join_result {
Ok((label, Ok(entry))) => { Ok((label, order_index, result)) => {
info!("Backed up {label}"); results.push((order_index, label, result));
manifest.add_artifact(entry);
},
Ok((label, Err(e))) => {
error!("Failed to backup {label}: {e}");
}, },
Err(e) => { Err(e) => {
error!("Repository task panicked: {e}"); error!("Repository task panicked: {e}");
}, },
} }
} }
results.sort_by_key(|(order_index, ..)| *order_index);
for (_, label, result) in results {
match result {
Ok(entry) => {
info!("Backed up {label}");
manifest.add_artifact(entry);
},
Err(e) => {
error!("Failed to backup {label}: {e}");
},
}
}
}, },
} }
} }
@ -142,19 +190,26 @@ impl Pipeline {
/// Back up a single repository to all configured sinks. /// Back up a single repository to all configured sinks.
/// ///
/// Returns a [`ManifestEntry`] on success describing the stored artifact and /// # Returns
/// where it landed. ///
/// A [`ManifestEntry`] describing the stored artifact and
/// where it landed, or an error if the backup fails.
///
/// # Errors
///
/// Returns an error if:
///
/// - Downloading the repository archive fails
/// - Reading file metadata fails
/// - Creating or updating the job in storage fails
/// - Writing to any sink fails
async fn backup_repository( async fn backup_repository(
source: &ForgejoSource, ctx: &BackupContext<'_>,
source_id: &str,
repo: &Repository, repo: &Repository,
run_id: &RunId, order_index: i64,
storage: &Storage,
sink_configs: &[SinkConfig],
temp_dir: &Path,
) -> anyhow::Result<ManifestEntry> { ) -> anyhow::Result<ManifestEntry> {
let (content_hash, temp_path) = let (content_hash, temp_path) =
source.download_archive(repo, temp_dir).await?; ctx.source.download_archive(repo, ctx.temp_dir).await?;
let size_bytes = tokio::fs::metadata(&temp_path).await?.len(); let size_bytes = tokio::fs::metadata(&temp_path).await?.len();
@ -162,7 +217,7 @@ async fn backup_repository(
content_hash.clone(), content_hash.clone(),
size_bytes, size_bytes,
SourceProvenance { SourceProvenance {
source_id: source_id.to_string(), source_id: ctx.source_id.to_string(),
entity_type: EntityType::Repository, entity_type: EntityType::Repository,
remote_id: format!("{}/{}", repo.owner, repo.name), remote_id: format!("{}/{}", repo.owner, repo.name),
fetched_at: Utc::now(), fetched_at: Utc::now(),
@ -170,14 +225,18 @@ async fn backup_repository(
temp_path, temp_path,
)); ));
let sink_ids: Vec<String> = let sink_ids: Vec<String> = ctx
sink_configs.iter().map(|s| s.id().to_string()).collect(); .sink_configs
.iter()
.map(|s| s.id().to_string())
.collect();
let job_id = JobId::new(); let job_id = JobId::new();
storage ctx
.storage
.create_job( .create_job(
job_id.clone(), job_id.clone(),
run_id.clone(), ctx.run_id.clone(),
content_hash.clone(), content_hash.clone(),
JobStatus::Fetching, JobStatus::Fetching,
sink_ids sink_ids
@ -190,6 +249,7 @@ async fn backup_repository(
}) })
}) })
.collect(), .collect(),
order_index,
) )
.await?; .await?;
@ -199,7 +259,7 @@ async fn backup_repository(
let mut sink_tasks: JoinSet<anyhow::Result<(String, StorageReceipt)>> = let mut sink_tasks: JoinSet<anyhow::Result<(String, StorageReceipt)>> =
JoinSet::new(); JoinSet::new();
for sink_config in sink_configs { for sink_config in ctx.sink_configs {
let artifact = Arc::clone(&artifact); let artifact = Arc::clone(&artifact);
let sink_config = sink_config.clone(); let sink_config = sink_config.clone();
let sink_id = sink_config.id().to_string(); let sink_id = sink_config.id().to_string();
@ -237,8 +297,9 @@ async fn backup_repository(
} }
} }
if sink_receipts.len() != sink_configs.len() { if sink_receipts.len() != ctx.sink_configs.len() {
storage ctx
.storage
.update_job_status( .update_job_status(
&job_id, &job_id,
JobStatus::Failed, JobStatus::Failed,
@ -266,11 +327,12 @@ async fn backup_repository(
return Err(anyhow::anyhow!( return Err(anyhow::anyhow!(
"Not all sinks succeeded: {}/{} completed", "Not all sinks succeeded: {}/{} completed",
sink_receipts.len(), sink_receipts.len(),
sink_configs.len() ctx.sink_configs.len()
)); ));
} }
storage ctx
.storage
.update_job_status( .update_job_status(
&job_id, &job_id,
JobStatus::Committed, JobStatus::Committed,
@ -290,13 +352,17 @@ async fn backup_repository(
let entry = ManifestEntry { let entry = ManifestEntry {
hash: content_hash, hash: content_hash,
size: artifact.size_bytes, size: artifact.size_bytes,
source_id: source_id.to_string(), source_id: ctx.source_id.to_string(),
sink_uris: sink_receipts, sink_uris: sink_receipts,
}; };
// Clean up the temp file now that all sinks have completed. // Clean up the temp file now that all sinks have completed.
match Arc::try_unwrap(artifact) { match Arc::try_unwrap(artifact) {
Ok(artifact) => artifact.cleanup().await, Ok(artifact) => {
if let Err(e) = artifact.cleanup().await {
error!("Failed to cleanup artifact temp file: {e}");
}
},
Err(_) => { Err(_) => {
error!( error!(
"Failed to unwrap Arc for artifact cleanup - unexpected live \ "Failed to unwrap Arc for artifact cleanup - unexpected live \

View file

@ -17,6 +17,13 @@ pub struct Storage {
} }
impl Storage { impl Storage {
/// Creates a new storage connection and runs migrations.
///
/// # Errors
/// Returns an error if:
/// - The database connection cannot be established
/// - WAL mode cannot be enabled
/// - Migrations fail
pub async fn new(database_url: &str) -> anyhow::Result<Self> { pub async fn new(database_url: &str) -> anyhow::Result<Self> {
let pool = SqlitePoolOptions::new() let pool = SqlitePoolOptions::new()
.max_connections(5) .max_connections(5)
@ -42,6 +49,7 @@ impl Storage {
artifact_hash TEXT NOT NULL, artifact_hash TEXT NOT NULL,
status TEXT NOT NULL, status TEXT NOT NULL,
sink_statuses TEXT NOT NULL, sink_statuses TEXT NOT NULL,
order_index INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL, created_at TEXT NOT NULL,
updated_at TEXT NOT NULL updated_at TEXT NOT NULL
) )
@ -78,6 +86,10 @@ impl Storage {
Ok(()) Ok(())
} }
/// Creates a new job in the database.
///
/// # Errors
/// Returns an error if the database insert fails.
pub async fn create_job( pub async fn create_job(
&self, &self,
job_id: JobId, job_id: JobId,
@ -85,14 +97,15 @@ impl Storage {
artifact_hash: ContentHash, artifact_hash: ContentHash,
status: JobStatus, status: JobStatus,
sink_statuses: HashMap<String, SinkJobState>, sink_statuses: HashMap<String, SinkJobState>,
) -> anyhow::Result<JobId> { order_index: i64,
) -> anyhow::Result<()> {
let now = chrono::Utc::now(); let now = chrono::Utc::now();
let sink_statuses_json = serde_json::to_string(&sink_statuses)?; let sink_statuses_json = serde_json::to_string(&sink_statuses)?;
sqlx::query( sqlx::query(
r" r"
INSERT INTO jobs (job_id, run_id, artifact_hash, status, sink_statuses, created_at, updated_at) INSERT INTO jobs (job_id, run_id, artifact_hash, status, sink_statuses, order_index, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
", ",
) )
.bind(job_id.0.to_string()) .bind(job_id.0.to_string())
@ -100,14 +113,21 @@ impl Storage {
.bind(artifact_hash.0) .bind(artifact_hash.0)
.bind(serde_json::to_string(&status)?) .bind(serde_json::to_string(&status)?)
.bind(sink_statuses_json) .bind(sink_statuses_json)
.bind(order_index)
.bind(now.to_rfc3339()) .bind(now.to_rfc3339())
.bind(now.to_rfc3339()) .bind(now.to_rfc3339())
.execute(&self.pool) .execute(&self.pool)
.await?; .await?;
Ok(job_id) Ok(())
} }
/// Updates the status of an existing job.
///
/// # Errors
/// Returns an error if:
/// - The database update fails
/// - The job is not found
pub async fn update_job_status( pub async fn update_job_status(
&self, &self,
job_id: &JobId, job_id: &JobId,
@ -138,6 +158,10 @@ impl Storage {
Ok(()) Ok(())
} }
/// Lists all jobs for a given run.
///
/// # Errors
/// Returns an error if the database query fails or if row data is invalid.
pub async fn list_jobs_by_run( pub async fn list_jobs_by_run(
&self, &self,
run_id: &RunId, run_id: &RunId,
@ -147,7 +171,7 @@ impl Storage {
SELECT job_id, run_id, artifact_hash, status, sink_statuses, created_at, updated_at SELECT job_id, run_id, artifact_hash, status, sink_statuses, created_at, updated_at
FROM jobs FROM jobs
WHERE run_id = ?1 WHERE run_id = ?1
ORDER BY created_at ORDER BY order_index
", ",
) )
.bind(run_id.0.to_string()) .bind(run_id.0.to_string())
@ -160,6 +184,11 @@ impl Storage {
} }
/// Saves a manifest with its root hash for a backup run. /// Saves a manifest with its root hash for a backup run.
///
/// # Errors
/// Returns an error if:
/// - The artifact count exceeds i64 max
/// - The database insert fails
pub async fn save_manifest( pub async fn save_manifest(
&self, &self,
run_id: &RunId, run_id: &RunId,
@ -190,6 +219,9 @@ impl Storage {
} }
/// Retrieves the stored root hash for a backup run. /// Retrieves the stored root hash for a backup run.
///
/// # Errors
/// Returns an error if the database query fails.
pub async fn get_manifest_root( pub async fn get_manifest_root(
&self, &self,
run_id: &RunId, run_id: &RunId,