pipeline: rewrite to spawn each repository as a tokio::spawn task

Each repository task is gated by a `tokio::sync::Semaphore`. Each repo
acquires an `OwnedSemaphorePermit` (via `acquire_owned`) before spawning;
the permit is held for the task's lifetime and is automatically released
on drop.

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Icb359f7e82b0bbbeac232f0b79cf13186a6a6964
This commit is contained in:
raf 2026-03-17 15:51:25 +03:00
commit f51e659209
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF

View file

@ -1,7 +1,7 @@
use std::{collections::HashMap, path::Path, sync::Arc};
use chrono::Utc;
use tokio::task::JoinSet;
use tokio::{sync::Semaphore, task::JoinSet};
use tracing::{error, info, warn};
use crate::{
@ -46,32 +46,72 @@ impl Pipeline {
let mut manifest = Manifest::new(run_id.clone());
// Semaphore bounding how many repositories are processed concurrently.
// A limit of 0 would deadlock, so we floor it at 1.
let concurrency = self.config.service.concurrency_limit.max(1);
let repo_sem = Arc::new(Semaphore::new(concurrency));
for source_config in &self.config.sources {
match source_config {
SourceConfig::Forgejo(cfg) => {
info!("Processing Forgejo source: {}", cfg.id);
let source = ForgejoSource::new(cfg.clone())?;
let source =
ForgejoSource::new(cfg.clone(), self.config.service.retry.clone())?;
let repos = source.list_repositories().await?;
info!("Found {} repositories", repos.len());
// Spawn each repository backup as a task bounded by the semaphore.
// We collect results in order so that manifest entries stay stable.
let source = Arc::new(source);
let mut repo_tasks: JoinSet<(String, anyhow::Result<ManifestEntry>)> =
JoinSet::new();
for repo in repos {
match self
.backup_repository(
let permit = Arc::clone(&repo_sem)
.acquire_owned()
.await
.map_err(|e| anyhow::anyhow!("Semaphore closed: {e}"))?;
let source = Arc::clone(&source);
let run_id = run_id.clone();
let source_id = cfg.id.clone();
let storage = self.storage.clone();
let sink_configs = self.config.sinks.clone();
let temp_dir = self.config.service.temp_dir.clone();
repo_tasks.spawn(async move {
// Permit is held for the duration of the backup task.
let _permit = permit;
let result = backup_repository(
&source,
&cfg.id,
&source_id,
&repo,
&run_id,
&mut manifest,
&storage,
&sink_configs,
Path::new(&temp_dir),
)
.await
{
Ok(()) => {
info!("Backed up {}/{}", repo.owner, repo.name);
.await;
let label = format!("{}/{}", repo.owner, repo.name);
(label, result)
});
}
while let Some(join_result) = repo_tasks.join_next().await {
match join_result {
Ok((label, Ok(entry))) => {
info!("Backed up {label}");
manifest.add_artifact(entry);
},
Ok((label, Err(e))) => {
error!("Failed to backup {label}: {e}");
},
Err(e) => {
error!("Failed to backup {}/{}: {}", repo.owner, repo.name, e);
error!("Repository task panicked: {e}");
},
}
}
@ -82,7 +122,6 @@ impl Pipeline {
finalize_manifest(&mut manifest)
.map_err(|e| anyhow::anyhow!("Failed to finalize manifest: {e}"))?;
// Persist the manifest root hash for integrity verification
if let Some(ref root_hash) = manifest.root_hash {
self
.storage
@ -99,177 +138,172 @@ impl Pipeline {
Ok(run_id)
}
}
async fn backup_repository(
&self,
source: &ForgejoSource,
source_id: &str,
repo: &Repository,
run_id: &RunId,
manifest: &mut Manifest,
) -> anyhow::Result<()> {
let temp_dir = Path::new(&self.config.service.temp_dir);
let (content_hash, temp_path) =
source.download_archive(repo, temp_dir).await?;
/// Back up a single repository to all configured sinks.
///
/// Returns a [`ManifestEntry`] on success describing the stored artifact and
/// where it landed.
async fn backup_repository(
source: &ForgejoSource,
source_id: &str,
repo: &Repository,
run_id: &RunId,
storage: &Storage,
sink_configs: &[SinkConfig],
temp_dir: &Path,
) -> anyhow::Result<ManifestEntry> {
let (content_hash, temp_path) =
source.download_archive(repo, temp_dir).await?;
let size_bytes = tokio::fs::metadata(&temp_path).await?.len();
let size_bytes = tokio::fs::metadata(&temp_path).await?.len();
let artifact = Arc::new(Artifact::new(
let artifact = Arc::new(Artifact::new(
content_hash.clone(),
size_bytes,
SourceProvenance {
source_id: source_id.to_string(),
entity_type: EntityType::Repository,
remote_id: format!("{}/{}", repo.owner, repo.name),
fetched_at: Utc::now(),
},
temp_path,
));
let sink_ids: Vec<String> =
sink_configs.iter().map(|s| s.id().to_string()).collect();
let job_id = JobId::new();
storage
.create_job(
job_id.clone(),
run_id.clone(),
content_hash.clone(),
size_bytes,
SourceProvenance {
source_id: source_id.to_string(),
entity_type: EntityType::Repository,
remote_id: format!("{}/{}", repo.owner, repo.name),
fetched_at: Utc::now(),
},
temp_path,
));
let sink_ids: Vec<String> = self
.config
.sinks
.iter()
.map(|s| s.id().to_string())
.collect();
let job_id = JobId::new();
self
.storage
.create_job(
job_id.clone(),
run_id.clone(),
content_hash.clone(),
JobStatus::Fetching,
sink_ids
.iter()
.map(|id| {
(id.clone(), SinkJobState {
status: JobStatus::Discovered,
receipt: None,
error: None,
})
JobStatus::Fetching,
sink_ids
.iter()
.map(|id| {
(id.clone(), SinkJobState {
status: JobStatus::Discovered,
receipt: None,
error: None,
})
.collect(),
)
.await?;
})
.collect(),
)
.await?;
let mut sink_tasks = JoinSet::new();
let sinks: Vec<_> = self
.config
.sinks
.iter()
.map(|s| (s.id().to_string(), s.clone()))
.collect();
// Fan out writes to all sinks concurrently. Each sink gets its own task;
// there is no additional semaphore here because individual sink tasks are
// cheap and are already bounded by the outer repository semaphore.
let mut sink_tasks: JoinSet<anyhow::Result<(String, StorageReceipt)>> =
JoinSet::new();
for (sink_id, sink_config) in sinks {
let artifact = Arc::clone(&artifact);
for sink_config in sink_configs {
let artifact = Arc::clone(&artifact);
let sink_config = sink_config.clone();
let sink_id = sink_config.id().to_string();
sink_tasks.spawn(async move {
match sink_config {
SinkConfig::Filesystem(cfg) => {
let sink = FilesystemSink::new(cfg)?;
let receipt = sink.write(&artifact).await?;
Ok::<_, anyhow::Error>((sink_id, receipt))
},
SinkConfig::S3(_) => {
warn!("S3 sink not yet implemented");
Err(anyhow::anyhow!("S3 sink not implemented"))
},
}
});
}
let mut sink_receipts: HashMap<String, String> = HashMap::new();
let mut sink_receipt_objs: HashMap<String, StorageReceipt> = HashMap::new();
while let Some(result) = sink_tasks.join_next().await {
match result {
Ok(Ok((sink_id, receipt))) => {
sink_receipts.insert(sink_id.clone(), receipt.uri.clone());
sink_receipt_objs.insert(sink_id, receipt);
sink_tasks.spawn(async move {
match sink_config {
SinkConfig::Filesystem(cfg) => {
let sink = FilesystemSink::new(cfg)?;
let receipt = sink.write(&artifact).await?;
Ok((sink_id, receipt))
},
Ok(Err(e)) => {
error!("Sink task failed: {}", e);
},
Err(e) => {
error!("Sink task panicked: {}", e);
SinkConfig::S3(_) => {
warn!("S3 sink not yet implemented");
Err(anyhow::anyhow!("S3 sink not implemented"))
},
}
});
}
let mut sink_receipts: HashMap<String, String> = HashMap::new();
let mut sink_receipt_objs: HashMap<String, StorageReceipt> = HashMap::new();
while let Some(result) = sink_tasks.join_next().await {
match result {
Ok(Ok((sink_id, receipt))) => {
sink_receipts.insert(sink_id.clone(), receipt.uri.clone());
sink_receipt_objs.insert(sink_id, receipt);
},
Ok(Err(e)) => {
error!("Sink task failed: {e}");
},
Err(e) => {
error!("Sink task panicked: {e}");
},
}
}
if sink_receipts.len() != self.config.sinks.len() {
self
.storage
.update_job_status(
&job_id,
JobStatus::Failed,
sink_ids
.iter()
.map(|id| {
(id.clone(), SinkJobState {
status: if sink_receipts.contains_key(id) {
JobStatus::Committed
} else {
JobStatus::Failed
},
receipt: sink_receipt_objs.get(id).cloned(),
error: if sink_receipts.contains_key(id) {
None
} else {
Some("Sink write failed".to_string())
},
})
})
.collect(),
)
.await?;
return Err(anyhow::anyhow!(
"Not all sinks succeeded: {}/{} completed",
sink_receipts.len(),
self.config.sinks.len()
));
}
self
.storage
if sink_receipts.len() != sink_configs.len() {
storage
.update_job_status(
&job_id,
JobStatus::Committed,
JobStatus::Failed,
sink_ids
.iter()
.map(|id| {
(id.clone(), SinkJobState {
status: JobStatus::Committed,
status: if sink_receipts.contains_key(id) {
JobStatus::Committed
} else {
JobStatus::Failed
},
receipt: sink_receipt_objs.get(id).cloned(),
error: None,
error: if sink_receipts.contains_key(id) {
None
} else {
Some("Sink write failed".to_string())
},
})
})
.collect(),
)
.await?;
manifest.add_artifact(ManifestEntry {
hash: content_hash,
size: artifact.size_bytes,
source_id: source_id.to_string(),
sink_uris: sink_receipts,
});
// Explicitly clean up the temp file
// Arc::try_unwrap will succeed since all sink tasks have completed
match Arc::try_unwrap(artifact) {
Ok(artifact) => artifact.cleanup().await,
Err(_) => {
// This shouldn't happen, but log if it does
error!(
"Failed to unwrap Arc for artifact cleanup - sinks still holding \
references"
);
},
}
Ok(())
return Err(anyhow::anyhow!(
"Not all sinks succeeded: {}/{} completed",
sink_receipts.len(),
sink_configs.len()
));
}
storage
.update_job_status(
&job_id,
JobStatus::Committed,
sink_ids
.iter()
.map(|id| {
(id.clone(), SinkJobState {
status: JobStatus::Committed,
receipt: sink_receipt_objs.get(id).cloned(),
error: None,
})
})
.collect(),
)
.await?;
let entry = ManifestEntry {
hash: content_hash,
size: artifact.size_bytes,
source_id: source_id.to_string(),
sink_uris: sink_receipts,
};
// Clean up the temp file now that all sinks have completed.
match Arc::try_unwrap(artifact) {
Ok(artifact) => artifact.cleanup().await,
Err(_) => {
error!(
"Failed to unwrap Arc for artifact cleanup - unexpected live \
references"
);
},
}
Ok(entry)
}