From a397426858b80080cfb8f0b35a5c79c4430120b1 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Tue, 17 Mar 2026 21:24:26 +0300 Subject: [PATCH] pipeline: make artifact ordering deterministic; use enumeration order for jobs Signed-off-by: NotAShelf Change-Id: I1bed418fa37c185716a4f913906b3cbc6a6a6964 --- src/core/pipeline.rs | 148 +++++++++++++++++++++++++++++++------------ src/storage/mod.rs | 42 ++++++++++-- 2 files changed, 144 insertions(+), 46 deletions(-) diff --git a/src/core/pipeline.rs b/src/core/pipeline.rs index 03beb6c..836b2c6 100644 --- a/src/core/pipeline.rs +++ b/src/core/pipeline.rs @@ -24,12 +24,30 @@ use crate::{ storage::Storage, }; +/// Context for backing up a single repository. +struct BackupContext<'a> { + source: &'a ForgejoSource, + source_id: &'a str, + run_id: &'a RunId, + storage: &'a Storage, + sink_configs: &'a [SinkConfig], + temp_dir: &'a Path, +} + pub struct Pipeline { config: Config, storage: Storage, } impl Pipeline { + /// Creates a new backup pipeline. + /// + /// # Errors + /// + /// Returns an error if: + /// + /// - The database connection cannot be established + /// - The temp directory cannot be created pub async fn new(config: Config) -> anyhow::Result { let storage = Storage::new(&format!("sqlite://{}", config.service.state_db_path)) @@ -40,6 +58,17 @@ impl Pipeline { Ok(Self { config, storage }) } + /// Runs the backup pipeline. + /// + /// # Errors + /// + /// Returns an error if: + /// + /// - Repository listing fails + /// - Manifest finalization fails + /// + /// Individual repository backup failures are logged but do not fail + /// the entire run; the pipeline continues with remaining repositories. pub async fn run(&self) -> anyhow::Result { let run_id = RunId::new(); info!("Starting backup run {}", run_id.0); @@ -65,10 +94,13 @@ impl Pipeline { // Spawn each repository backup as a task bounded by the semaphore. // We collect results in order so that manifest entries stay stable. 
let source = Arc::new(source); - let mut repo_tasks: JoinSet<(String, anyhow::Result<ManifestEntry>)> = - JoinSet::new(); + let mut repo_tasks: JoinSet<( + String, + i64, + anyhow::Result<ManifestEntry>, + )> = JoinSet::new(); - for repo in repos { + for (index, repo) in repos.into_iter().enumerate() { let permit = Arc::clone(&repo_sem) .acquire_owned() .await @@ -80,41 +112,57 @@ impl Pipeline { let storage = self.storage.clone(); let sink_configs = self.config.sinks.clone(); let temp_dir = self.config.service.temp_dir.clone(); + let order_index = i64::try_from(index).map_err(|_| { + anyhow::anyhow!("Repository count exceeds i64 max") + })?; repo_tasks.spawn(async move { // Permit is held for the duration of the backup task. let _permit = permit; - let result = backup_repository( - &source, - &source_id, - &repo, - &run_id, - &storage, - &sink_configs, - Path::new(&temp_dir), - ) - .await; + let ctx = BackupContext { + source: &source, + source_id: &source_id, + run_id: &run_id, + storage: &storage, + sink_configs: &sink_configs, + temp_dir: Path::new(&temp_dir), + }; + let result = backup_repository(&ctx, &repo, order_index).await; let label = format!("{}/{}", repo.owner, repo.name); - (label, result) + (label, order_index, result) }); } + // Collect results and sort by order_index to ensure deterministic + // manifest ordering that matches database ordering. 
+ let mut results: Vec<(i64, String, anyhow::Result<ManifestEntry>)> = + Vec::new(); while let Some(join_result) = repo_tasks.join_next().await { match join_result { - Ok((label, Ok(entry))) => { - info!("Backed up {label}"); - manifest.add_artifact(entry); - }, - Ok((label, Err(e))) => { - error!("Failed to backup {label}: {e}"); + Ok((label, order_index, result)) => { + results.push((order_index, label, result)); }, Err(e) => { error!("Repository task panicked: {e}"); }, } } + + results.sort_by_key(|(order_index, ..)| *order_index); + + for (_, label, result) in results { + match result { + Ok(entry) => { + info!("Backed up {label}"); + manifest.add_artifact(entry); + }, + Err(e) => { + error!("Failed to backup {label}: {e}"); + }, + } + } }, } } @@ -142,19 +190,26 @@ impl Pipeline { /// Back up a single repository to all configured sinks. /// -/// Returns a [`ManifestEntry`] on success describing the stored artifact and -/// where it landed. +/// # Returns +/// +/// A [`ManifestEntry`] describing the stored artifact and +/// where it landed, or an error. 
+/// +/// # Errors +/// +/// Returns an error if: +/// +/// - Downloading the repository archive fails +/// - Reading file metadata fails +/// - Creating or updating the job in storage fails +/// - Writing to any sink fails async fn backup_repository( - source: &ForgejoSource, - source_id: &str, + ctx: &BackupContext<'_>, repo: &Repository, - run_id: &RunId, - storage: &Storage, - sink_configs: &[SinkConfig], - temp_dir: &Path, + order_index: i64, ) -> anyhow::Result { let (content_hash, temp_path) = - source.download_archive(repo, temp_dir).await?; + ctx.source.download_archive(repo, ctx.temp_dir).await?; let size_bytes = tokio::fs::metadata(&temp_path).await?.len(); @@ -162,7 +217,7 @@ async fn backup_repository( content_hash.clone(), size_bytes, SourceProvenance { - source_id: source_id.to_string(), + source_id: ctx.source_id.to_string(), entity_type: EntityType::Repository, remote_id: format!("{}/{}", repo.owner, repo.name), fetched_at: Utc::now(), @@ -170,14 +225,18 @@ async fn backup_repository( temp_path, )); - let sink_ids: Vec = - sink_configs.iter().map(|s| s.id().to_string()).collect(); + let sink_ids: Vec = ctx + .sink_configs + .iter() + .map(|s| s.id().to_string()) + .collect(); let job_id = JobId::new(); - storage + ctx + .storage .create_job( job_id.clone(), - run_id.clone(), + ctx.run_id.clone(), content_hash.clone(), JobStatus::Fetching, sink_ids @@ -190,6 +249,7 @@ async fn backup_repository( }) }) .collect(), + order_index, ) .await?; @@ -199,7 +259,7 @@ async fn backup_repository( let mut sink_tasks: JoinSet> = JoinSet::new(); - for sink_config in sink_configs { + for sink_config in ctx.sink_configs { let artifact = Arc::clone(&artifact); let sink_config = sink_config.clone(); let sink_id = sink_config.id().to_string(); @@ -237,8 +297,9 @@ async fn backup_repository( } } - if sink_receipts.len() != sink_configs.len() { - storage + if sink_receipts.len() != ctx.sink_configs.len() { + ctx + .storage .update_job_status( &job_id, JobStatus::Failed, 
@@ -266,11 +327,12 @@ async fn backup_repository( return Err(anyhow::anyhow!( "Not all sinks succeeded: {}/{} completed", sink_receipts.len(), - sink_configs.len() + ctx.sink_configs.len() )); } - storage + ctx + .storage .update_job_status( &job_id, JobStatus::Committed, @@ -290,13 +352,17 @@ async fn backup_repository( let entry = ManifestEntry { hash: content_hash, size: artifact.size_bytes, - source_id: source_id.to_string(), + source_id: ctx.source_id.to_string(), sink_uris: sink_receipts, }; // Clean up the temp file now that all sinks have completed. match Arc::try_unwrap(artifact) { - Ok(artifact) => artifact.cleanup().await, + Ok(artifact) => { + if let Err(e) = artifact.cleanup().await { + error!("Failed to cleanup artifact temp file: {e}"); + } + }, Err(_) => { error!( "Failed to unwrap Arc for artifact cleanup - unexpected live \ diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 012f59b..f210a70 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -17,6 +17,13 @@ pub struct Storage { } impl Storage { + /// Creates a new storage connection and runs migrations. + /// + /// # Errors + /// Returns an error if: + /// - The database connection cannot be established + /// - WAL mode cannot be enabled + /// - Migrations fail pub async fn new(database_url: &str) -> anyhow::Result { let pool = SqlitePoolOptions::new() .max_connections(5) @@ -42,6 +49,7 @@ impl Storage { artifact_hash TEXT NOT NULL, status TEXT NOT NULL, sink_statuses TEXT NOT NULL, + order_index INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) @@ -78,6 +86,10 @@ impl Storage { Ok(()) } + /// Creates a new job in the database. + /// + /// # Errors + /// Returns an error if the database insert fails. 
pub async fn create_job( &self, job_id: JobId, @@ -85,14 +97,15 @@ impl Storage { artifact_hash: ContentHash, status: JobStatus, sink_statuses: HashMap, - ) -> anyhow::Result { + order_index: i64, + ) -> anyhow::Result<()> { let now = chrono::Utc::now(); let sink_statuses_json = serde_json::to_string(&sink_statuses)?; sqlx::query( r" - INSERT INTO jobs (job_id, run_id, artifact_hash, status, sink_statuses, created_at, updated_at) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) + INSERT INTO jobs (job_id, run_id, artifact_hash, status, sink_statuses, order_index, created_at, updated_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) ", ) .bind(job_id.0.to_string()) @@ -100,14 +113,21 @@ impl Storage { .bind(artifact_hash.0) .bind(serde_json::to_string(&status)?) .bind(sink_statuses_json) + .bind(order_index) .bind(now.to_rfc3339()) .bind(now.to_rfc3339()) .execute(&self.pool) .await?; - Ok(job_id) + Ok(()) } + /// Updates the status of an existing job. + /// + /// # Errors + /// Returns an error if: + /// - The database update fails + /// - The job is not found pub async fn update_job_status( &self, job_id: &JobId, @@ -138,6 +158,10 @@ impl Storage { Ok(()) } + /// Lists all jobs for a given run. + /// + /// # Errors + /// Returns an error if the database query fails or if row data is invalid. pub async fn list_jobs_by_run( &self, run_id: &RunId, @@ -147,7 +171,7 @@ impl Storage { SELECT job_id, run_id, artifact_hash, status, sink_statuses, created_at, updated_at FROM jobs WHERE run_id = ?1 - ORDER BY created_at + ORDER BY order_index ", ) .bind(run_id.0.to_string()) @@ -160,6 +184,11 @@ impl Storage { } /// Saves a manifest with its root hash for a backup run. + /// + /// # Errors + /// Returns an error if: + /// - The artifact count exceeds i64 max + /// - The database insert fails pub async fn save_manifest( &self, run_id: &RunId, @@ -190,6 +219,9 @@ impl Storage { } /// Retrieves the stored root hash for a backup run. 
+ /// + /// # Errors + /// Returns an error if the database query fails. pub async fn get_manifest_root( &self, run_id: &RunId,