From f51e659209572bf728d95db92e0bcc18c04bf917 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Tue, 17 Mar 2026 15:51:25 +0300 Subject: [PATCH] pipeline: rewrite to spawn each repository as a `tokio::spawn` task Each one is gated by a `tokio::sync::Semaphore`. Each repo acquires an OwnedPermit before spawning; the permit is held for the task's lifetime, and automatically released on drop. Signed-off-by: NotAShelf Change-Id: Icb359f7e82b0bbbeac232f0b79cf13186a6a6964 --- src/core/pipeline.rs | 350 ++++++++++++++++++++++++------------------- 1 file changed, 192 insertions(+), 158 deletions(-) diff --git a/src/core/pipeline.rs b/src/core/pipeline.rs index c5e56a3..03beb6c 100644 --- a/src/core/pipeline.rs +++ b/src/core/pipeline.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, path::Path, sync::Arc}; use chrono::Utc; -use tokio::task::JoinSet; +use tokio::{sync::Semaphore, task::JoinSet}; use tracing::{error, info, warn}; use crate::{ @@ -46,32 +46,72 @@ impl Pipeline { let mut manifest = Manifest::new(run_id.clone()); + // Semaphore bounding how many repositories are processed concurrently. + // A limit of 0 would deadlock, so we floor it at 1. + let concurrency = self.config.service.concurrency_limit.max(1); + let repo_sem = Arc::new(Semaphore::new(concurrency)); + for source_config in &self.config.sources { match source_config { SourceConfig::Forgejo(cfg) => { info!("Processing Forgejo source: {}", cfg.id); - let source = ForgejoSource::new(cfg.clone())?; + let source = + ForgejoSource::new(cfg.clone(), self.config.service.retry.clone())?; let repos = source.list_repositories().await?; info!("Found {} repositories", repos.len()); + // Spawn each repository backup as a task bounded by the semaphore. + // Results are handled in completion order, not in repository listing order. 
+ let source = Arc::new(source); + let mut repo_tasks: JoinSet<(String, anyhow::Result)> = + JoinSet::new(); + for repo in repos { - match self - .backup_repository( + let permit = Arc::clone(&repo_sem) + .acquire_owned() + .await + .map_err(|e| anyhow::anyhow!("Semaphore closed: {e}"))?; + + let source = Arc::clone(&source); + let run_id = run_id.clone(); + let source_id = cfg.id.clone(); + let storage = self.storage.clone(); + let sink_configs = self.config.sinks.clone(); + let temp_dir = self.config.service.temp_dir.clone(); + + repo_tasks.spawn(async move { + // Permit is held for the duration of the backup task. + let _permit = permit; + + let result = backup_repository( &source, - &cfg.id, + &source_id, &repo, &run_id, - &mut manifest, + &storage, + &sink_configs, + Path::new(&temp_dir), ) - .await - { - Ok(()) => { - info!("Backed up {}/{}", repo.owner, repo.name); + .await; + + let label = format!("{}/{}", repo.owner, repo.name); + (label, result) + }); + } + + while let Some(join_result) = repo_tasks.join_next().await { + match join_result { + Ok((label, Ok(entry))) => { + info!("Backed up {label}"); + manifest.add_artifact(entry); + }, + Ok((label, Err(e))) => { + error!("Failed to backup {label}: {e}"); }, Err(e) => { - error!("Failed to backup {}/{}: {}", repo.owner, repo.name, e); + error!("Repository task panicked: {e}"); }, } } @@ -82,7 +122,6 @@ impl Pipeline { finalize_manifest(&mut manifest) .map_err(|e| anyhow::anyhow!("Failed to finalize manifest: {e}"))?; - // Persist the manifest root hash for integrity verification if let Some(ref root_hash) = manifest.root_hash { self .storage @@ -99,177 +138,172 @@ impl Pipeline { Ok(run_id) } +} - async fn backup_repository( - &self, - source: &ForgejoSource, - source_id: &str, - repo: &Repository, - run_id: &RunId, - manifest: &mut Manifest, - ) -> anyhow::Result<()> { - let temp_dir = Path::new(&self.config.service.temp_dir); - let (content_hash, temp_path) = - source.download_archive(repo, 
temp_dir).await?; +/// Back up a single repository to all configured sinks. +/// +/// Returns a [`ManifestEntry`] on success describing the stored artifact and +/// where it landed. +async fn backup_repository( + source: &ForgejoSource, + source_id: &str, + repo: &Repository, + run_id: &RunId, + storage: &Storage, + sink_configs: &[SinkConfig], + temp_dir: &Path, +) -> anyhow::Result { + let (content_hash, temp_path) = + source.download_archive(repo, temp_dir).await?; - let size_bytes = tokio::fs::metadata(&temp_path).await?.len(); + let size_bytes = tokio::fs::metadata(&temp_path).await?.len(); - let artifact = Arc::new(Artifact::new( + let artifact = Arc::new(Artifact::new( + content_hash.clone(), + size_bytes, + SourceProvenance { + source_id: source_id.to_string(), + entity_type: EntityType::Repository, + remote_id: format!("{}/{}", repo.owner, repo.name), + fetched_at: Utc::now(), + }, + temp_path, + )); + + let sink_ids: Vec = + sink_configs.iter().map(|s| s.id().to_string()).collect(); + + let job_id = JobId::new(); + storage + .create_job( + job_id.clone(), + run_id.clone(), content_hash.clone(), - size_bytes, - SourceProvenance { - source_id: source_id.to_string(), - entity_type: EntityType::Repository, - remote_id: format!("{}/{}", repo.owner, repo.name), - fetched_at: Utc::now(), - }, - temp_path, - )); - - let sink_ids: Vec = self - .config - .sinks - .iter() - .map(|s| s.id().to_string()) - .collect(); - - let job_id = JobId::new(); - self - .storage - .create_job( - job_id.clone(), - run_id.clone(), - content_hash.clone(), - JobStatus::Fetching, - sink_ids - .iter() - .map(|id| { - (id.clone(), SinkJobState { - status: JobStatus::Discovered, - receipt: None, - error: None, - }) + JobStatus::Fetching, + sink_ids + .iter() + .map(|id| { + (id.clone(), SinkJobState { + status: JobStatus::Discovered, + receipt: None, + error: None, }) - .collect(), - ) - .await?; + }) + .collect(), + ) + .await?; - let mut sink_tasks = JoinSet::new(); - let sinks: Vec<_> 
= self - .config - .sinks - .iter() - .map(|s| (s.id().to_string(), s.clone())) - .collect(); + // Fan out writes to all sinks concurrently. Each sink gets its own task; + // there is no additional semaphore here because individual sink tasks are + // cheap and are already bounded by the outer repository semaphore. + let mut sink_tasks: JoinSet> = + JoinSet::new(); - for (sink_id, sink_config) in sinks { - let artifact = Arc::clone(&artifact); + for sink_config in sink_configs { + let artifact = Arc::clone(&artifact); + let sink_config = sink_config.clone(); + let sink_id = sink_config.id().to_string(); - sink_tasks.spawn(async move { - match sink_config { - SinkConfig::Filesystem(cfg) => { - let sink = FilesystemSink::new(cfg)?; - let receipt = sink.write(&artifact).await?; - Ok::<_, anyhow::Error>((sink_id, receipt)) - }, - SinkConfig::S3(_) => { - warn!("S3 sink not yet implemented"); - Err(anyhow::anyhow!("S3 sink not implemented")) - }, - } - }); - } - - let mut sink_receipts: HashMap = HashMap::new(); - let mut sink_receipt_objs: HashMap = HashMap::new(); - - while let Some(result) = sink_tasks.join_next().await { - match result { - Ok(Ok((sink_id, receipt))) => { - sink_receipts.insert(sink_id.clone(), receipt.uri.clone()); - sink_receipt_objs.insert(sink_id, receipt); + sink_tasks.spawn(async move { + match sink_config { + SinkConfig::Filesystem(cfg) => { + let sink = FilesystemSink::new(cfg)?; + let receipt = sink.write(&artifact).await?; + Ok((sink_id, receipt)) }, - Ok(Err(e)) => { - error!("Sink task failed: {}", e); - }, - Err(e) => { - error!("Sink task panicked: {}", e); + SinkConfig::S3(_) => { + warn!("S3 sink not yet implemented"); + Err(anyhow::anyhow!("S3 sink not implemented")) }, } + }); + } + + let mut sink_receipts: HashMap = HashMap::new(); + let mut sink_receipt_objs: HashMap = HashMap::new(); + + while let Some(result) = sink_tasks.join_next().await { + match result { + Ok(Ok((sink_id, receipt))) => { + 
sink_receipts.insert(sink_id.clone(), receipt.uri.clone()); + sink_receipt_objs.insert(sink_id, receipt); + }, + Ok(Err(e)) => { + error!("Sink task failed: {e}"); + }, + Err(e) => { + error!("Sink task panicked: {e}"); + }, } + } - if sink_receipts.len() != self.config.sinks.len() { - self - .storage - .update_job_status( - &job_id, - JobStatus::Failed, - sink_ids - .iter() - .map(|id| { - (id.clone(), SinkJobState { - status: if sink_receipts.contains_key(id) { - JobStatus::Committed - } else { - JobStatus::Failed - }, - receipt: sink_receipt_objs.get(id).cloned(), - error: if sink_receipts.contains_key(id) { - None - } else { - Some("Sink write failed".to_string()) - }, - }) - }) - .collect(), - ) - .await?; - - return Err(anyhow::anyhow!( - "Not all sinks succeeded: {}/{} completed", - sink_receipts.len(), - self.config.sinks.len() - )); - } - - self - .storage + if sink_receipts.len() != sink_configs.len() { + storage .update_job_status( &job_id, - JobStatus::Committed, + JobStatus::Failed, sink_ids .iter() .map(|id| { (id.clone(), SinkJobState { - status: JobStatus::Committed, + status: if sink_receipts.contains_key(id) { + JobStatus::Committed + } else { + JobStatus::Failed + }, receipt: sink_receipt_objs.get(id).cloned(), - error: None, + error: if sink_receipts.contains_key(id) { + None + } else { + Some("Sink write failed".to_string()) + }, }) }) .collect(), ) .await?; - manifest.add_artifact(ManifestEntry { - hash: content_hash, - size: artifact.size_bytes, - source_id: source_id.to_string(), - sink_uris: sink_receipts, - }); - - // Explicitly clean up the temp file - // Arc::try_unwrap will succeed since all sink tasks have completed - match Arc::try_unwrap(artifact) { - Ok(artifact) => artifact.cleanup().await, - Err(_) => { - // This shouldn't happen, but log if it does - error!( - "Failed to unwrap Arc for artifact cleanup - sinks still holding \ - references" - ); - }, - } - - Ok(()) + return Err(anyhow::anyhow!( + "Not all sinks succeeded: {}/{} 
completed", + sink_receipts.len(), + sink_configs.len() + )); } + + storage + .update_job_status( + &job_id, + JobStatus::Committed, + sink_ids + .iter() + .map(|id| { + (id.clone(), SinkJobState { + status: JobStatus::Committed, + receipt: sink_receipt_objs.get(id).cloned(), + error: None, + }) + }) + .collect(), + ) + .await?; + + let entry = ManifestEntry { + hash: content_hash, + size: artifact.size_bytes, + source_id: source_id.to_string(), + sink_uris: sink_receipts, + }; + + // Clean up the temp file now that all sinks have completed. + match Arc::try_unwrap(artifact) { + Ok(artifact) => artifact.cleanup().await, + Err(_) => { + error!( + "Failed to unwrap Arc for artifact cleanup - unexpected live \ + references" + ); + }, + } + + Ok(entry) }