initial working prototype

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I3c8eddfdc43193e2d6a47c576dbe51e56a6a6964
This commit is contained in:
raf 2026-03-16 18:23:25 +03:00
commit 87bdd70b9e
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
14 changed files with 5915 additions and 0 deletions

4141
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

108
Cargo.toml Normal file
View file

@ -0,0 +1,108 @@
[package]
name = "konservejo"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow = "1.0.102"
async-trait = "0.1.89"
aws-config = "1.8.15"
aws-sdk-s3 = "1.126.0"
blake3 = "1.8.3"
chrono = { version = "0.4.44", features = [ "serde" ] }
clap = { version = "4.6.0", features = [ "derive" ] }
color-eyre = "0.6.5"
futures = "0.3.32"
hex = "0.4.3"
regex = "1.12.3"
reqwest = { version = "0.13.2", features = [ "json", "stream" ], default-features = false }
serde = { version = "1.0.228", features = [ "derive" ] }
serde_json = "1.0.149"
sqlx = { version = "0.8.6", features = [ "runtime-tokio", "sqlite", "migrate", "uuid", "chrono" ] }
thiserror = "2.0.18"
tokio = { version = "1.50.0", features = [ "full" ] }
tokio-util = { version = "0.7.18", features = [ "io", "compat" ] }
toml = "1.0.6"
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.23", features = [ "json", "env-filter" ] }
urlencoding = "2.1.3"
uuid = { version = "1.22.0", features = [ "v4", "serde" ] }
# Lint references:
# <https://rust-lang.github.io/rust-clippy/master/index.html> (clippy lints below)
# <https://doc.rust-lang.org/rustc/lints/listing/allowed-by-default.html>
[lints.clippy]
cargo = { level = "warn", priority = -1 }
complexity = { level = "warn", priority = -1 }
nursery = { level = "warn", priority = -1 }
pedantic = { level = "warn", priority = -1 }
perf = { level = "warn", priority = -1 }
style = { level = "warn", priority = -1 }
# The lint groups above enable some less-than-desirable rules; the entries
# below opt individual lints back out (or in) to keep our sanity.
absolute_paths = "allow"
arbitrary_source_item_ordering = "allow"
clone_on_ref_ptr = "warn"
dbg_macro = "warn"
empty_drop = "warn"
empty_structs_with_brackets = "warn"
exit = "warn"
filetype_is_file = "warn"
get_unwrap = "warn"
implicit_return = "allow"
infinite_loop = "warn"
map_with_unused_argument_over_ranges = "warn"
missing_docs_in_private_items = "allow"
multiple_crate_versions = "allow" # :( — transitive deps drag in duplicate versions
non_ascii_literal = "allow"
non_std_lazy_statics = "warn"
pathbuf_init_then_push = "warn"
pattern_type_mismatch = "allow"
question_mark_used = "allow"
rc_buffer = "warn"
rc_mutex = "warn"
rest_pat_in_fully_bound_structs = "warn"
similar_names = "allow"
single_call_fn = "allow"
std_instead_of_core = "allow"
too_long_first_doc_paragraph = "allow"
too_many_lines = "allow"
undocumented_unsafe_blocks = "warn"
unnecessary_safety_comment = "warn"
unused_result_ok = "warn"
unused_trait_names = "allow"
# False positive:
# clippy's build script check doesn't recognize workspace-inherited metadata
# which means in our current workspace layout, we get pranked by Clippy.
cargo_common_metadata = "allow"
# In the honor of a recent Cloudflare regression: never panic on a
# recoverable condition in production code paths.
panic = "deny"
unwrap_used = "deny"
# Less dangerous, but we'd like to know
# Those must be opt-in, and are fine ONLY in tests and examples. We *can*
# panic in the konservejo binary itself, but it should be very deliberate.
expect_used = "warn"
print_stderr = "warn"
print_stdout = "warn"
todo = "warn"
unimplemented = "warn"
unreachable = "warn"
[profile.dev]
debug = true
opt-level = 0
[profile.release]
codegen-units = 1
lto = true
opt-level = "z"
panic = "abort"
strip = "symbols"
[dev-dependencies]
tempfile = "3.27.0"
tokio-test = "0.4.5"
wiremock = "0.6.5"

256
src/config.rs Normal file
View file

@ -0,0 +1,256 @@
use std::{env, path::Path};
use serde::{Deserialize, Serialize};
/// Wrapper for sensitive string values such as API tokens and credentials.
///
/// `Debug` is implemented by hand so the inner value never appears in log
/// output; use [`Secret::expose`] to access the raw value deliberately.
///
/// NOTE(review): this type derives `Serialize`, so serializing a `Config`
/// (or a `Secret` directly) writes the secret out in plain text — confirm
/// no code path serializes configuration back to disk or logs.
#[derive(Clone, Deserialize, Serialize)]
#[serde(transparent)]
pub struct Secret(String);
impl Secret {
    /// Returns the underlying secret value. Keep call sites easy to audit.
    pub fn expose(&self) -> &str {
        &self.0
    }
}
impl std::fmt::Debug for Secret {
    // Redact so `{:?}` on any containing struct cannot leak the value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("[REDACTED]")
    }
}
/// Top-level configuration for the service, parsed from TOML.
///
/// `[[source]]` and `[[sink]]` array-of-table entries map onto the
/// `sources`/`sinks` vectors through the serde renames below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
    pub service: ServiceConfig,
    #[serde(rename = "source")]
    pub sources: Vec<SourceConfig>,
    #[serde(rename = "sink")]
    pub sinks: Vec<SinkConfig>,
    // Optional sections: fall back to their `Default` impls when omitted.
    #[serde(default)]
    pub verification: VerificationConfig,
    #[serde(default)]
    pub retention: RetentionConfig,
}
impl Config {
pub fn from_file(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let path = path.as_ref();
let contents = std::fs::read_to_string(path).map_err(|e| {
anyhow::anyhow!("Failed to read config file '{}': {}", path.display(), e)
})?;
let interpolated = interpolate_env_vars(&contents).map_err(|e| {
anyhow::anyhow!("Failed to interpolate environment variables: {e}")
})?;
let config: Self = toml::from_str(&interpolated).map_err(|e| {
anyhow::anyhow!("Failed to parse config file '{}': {}", path.display(), e)
})?;
config.validate()?;
Ok(config)
}
fn validate(&self) -> anyhow::Result<()> {
if self.sources.is_empty() {
anyhow::bail!("At least one source must be configured");
}
if self.sinks.is_empty() {
anyhow::bail!("At least one sink must be configured");
}
let ratio = self.verification.sample_ratio;
if !(0.0..=1.0).contains(&ratio) || !ratio.is_finite() {
anyhow::bail!(
"verification.sample_ratio must be between 0.0 and 1.0, got {ratio}"
);
}
Ok(())
}
}
/// Expands `${VAR}` placeholders in `input` from the process environment.
///
/// Replacement happens in a single left-to-right pass over `input`, so
/// `${...}` sequences occurring inside a substituted VALUE are copied
/// verbatim rather than expanded again (no recursive interpolation). The
/// previous implementation used whole-buffer `String::replace`, which
/// both re-scanned substituted values and did redundant work when the
/// same variable appeared more than once.
///
/// # Errors
/// Fails if any referenced environment variable is unset (or not valid
/// Unicode).
fn interpolate_env_vars(input: &str) -> anyhow::Result<String> {
    use std::sync::LazyLock;
    use regex::Regex;
    // This regex is compile-time validated and cannot fail
    #[allow(clippy::expect_used)]
    static RE: LazyLock<Regex> = LazyLock::new(|| {
        Regex::new(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")
            .expect("regex is compile-time validated")
    });
    let mut result = String::with_capacity(input.len());
    let mut last_end = 0;
    for caps in RE.captures_iter(input) {
        let whole = caps
            .get(0)
            .ok_or_else(|| anyhow::anyhow!("Invalid capture group in regex"))?;
        let var_name = caps
            .get(1)
            .ok_or_else(|| anyhow::anyhow!("Invalid capture group in regex"))?
            .as_str();
        let var_value = env::var(var_name).map_err(|_| {
            anyhow::anyhow!("Environment variable {var_name} not found")
        })?;
        // Copy the literal text before the placeholder, then the value.
        result.push_str(&input[last_end..whole.start()]);
        result.push_str(&var_value);
        last_end = whole.end();
    }
    result.push_str(&input[last_end..]);
    Ok(result)
}
/// Service-wide settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceConfig {
    // Human-readable instance name.
    pub name: String,
    // Upper bound on concurrent work; defaults to 10.
    // NOTE(review): not referenced by the pipeline in this commit — confirm
    // it is actually enforced somewhere before relying on it.
    #[serde(default = "default_concurrency_limit")]
    pub concurrency_limit: usize,
    // Scratch directory where downloaded archives are staged.
    pub temp_dir: String,
    // Path of the SQLite state database file.
    pub state_db_path: String,
}
/// Serde default for `ServiceConfig::concurrency_limit`.
const fn default_concurrency_limit() -> usize {
    10
}
/// Tagged union of supported backup sources (`type = "forgejo"` in TOML).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
pub enum SourceConfig {
    Forgejo(ForgejoSourceConfig),
}
/// Connection and scoping settings for one Forgejo instance.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForgejoSourceConfig {
    // Unique identifier recorded in job provenance and logs.
    pub id: String,
    // Base URL that REST paths (e.g. `/orgs/{org}/repos`) are appended to.
    pub api_url: String,
    // API token; redacted in Debug output.
    pub token: Secret,
    // Requests-per-second budget; defaults to 10.
    // NOTE(review): not enforced by `ForgejoSource` in this commit — confirm
    // rate limiting is implemented before trusting this knob.
    #[serde(default = "default_rate_limit")]
    pub rate_limit_rps: u32,
    #[serde(default)]
    pub scope: ForgejoScope,
    #[serde(default)]
    pub filter: ForgejoFilter,
}
/// What to include from a Forgejo instance.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ForgejoScope {
    // Organizations whose repositories are backed up.
    #[serde(default)]
    pub organizations: Vec<String>,
    // Exclusion patterns: "owner/name" exact match, or "*/name" to match
    // the repository name under any owner.
    #[serde(default)]
    pub exclude_repos: Vec<String>,
    // NOTE(review): the three flags below are not consumed by the source
    // implementation in this commit — presumably future features; confirm.
    #[serde(default)]
    pub include_wikis: bool,
    #[serde(default)]
    pub include_issues: bool,
    #[serde(default)]
    pub include_lfs: bool,
}
/// Optional time-based filtering.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ForgejoFilter {
    // NOTE(review): unused in this commit; presumably an updated-since
    // timestamp — confirm the expected format before use.
    pub since: Option<String>,
}
/// Serde default for `ForgejoSourceConfig::rate_limit_rps`.
const fn default_rate_limit() -> u32 {
    10
}
/// Tagged union of supported backup sinks (`type = "filesystem" | "s3"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
pub enum SinkConfig {
    Filesystem(FilesystemSinkConfig),
    S3(S3SinkConfig),
}
impl SinkConfig {
    /// Returns the sink's configured identifier regardless of variant.
    pub fn id(&self) -> &str {
        match self {
            Self::Filesystem(cfg) => &cfg.id,
            Self::S3(cfg) => &cfg.id,
        }
    }
}
/// Local-directory sink settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FilesystemSinkConfig {
    // Unique sink identifier (used as a map key by the pipeline).
    pub id: String,
    // Root directory artifacts are sharded into.
    pub path: String,
    // Re-hash files after copying; defaults to true.
    #[serde(default = "default_verify_on_write")]
    pub verify_on_write: bool,
    // NOTE(review): not consumed anywhere in this commit — confirm a pruning
    // job exists before relying on it.
    #[serde(default)]
    pub retention_days: Option<i64>,
}
/// S3-compatible object-store sink settings.
///
/// NOTE(review): the S3 sink itself is not implemented in this commit; the
/// pipeline currently fails any job routed to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct S3SinkConfig {
    pub id: String,
    pub bucket: String,
    pub region: String,
    // Custom endpoint for non-AWS S3 implementations.
    #[serde(default)]
    pub endpoint: Option<String>,
    pub access_key_id: Secret,
    pub secret_access_key: Secret,
    #[serde(default)]
    pub storage_class: Option<String>,
    // Size threshold (MB) above which multipart upload would be used;
    // defaults to 100.
    #[serde(default = "default_multipart_threshold")]
    pub multipart_threshold_mb: u64,
    #[serde(default = "default_verify_on_write")]
    pub verify_on_write: bool,
}
/// Serde default shared by both sink types.
const fn default_verify_on_write() -> bool {
    true
}
/// Serde default for `S3SinkConfig::multipart_threshold_mb`.
const fn default_multipart_threshold() -> u64 {
    100
}
/// Post-write verification policy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerificationConfig {
    #[serde(default = "default_verification_strategy")]
    pub strategy: VerificationStrategy,
    /// Fraction of artifacts verified when sampling; must lie in
    /// [0.0, 1.0] (enforced by `Config::validate`).
    #[serde(default = "default_sample_ratio")]
    pub sample_ratio: f64,
    #[serde(default = "default_hash_algorithm")]
    pub hash_algorithm: HashAlgorithm,
}
// `Default` is written by hand so that an entirely omitted `[verification]`
// section yields the SAME values as a present-but-empty one. The previous
// `#[derive(Default)]` produced `sample_ratio: 0.0` (the f64 default)
// instead of the serde field default 1.0, silently diverging.
impl Default for VerificationConfig {
    fn default() -> Self {
        Self {
            strategy: default_verification_strategy(),
            sample_ratio: default_sample_ratio(),
            hash_algorithm: default_hash_algorithm(),
        }
    }
}
/// How much of each backup run is re-verified after write.
#[derive(
    Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default,
)]
#[serde(rename_all = "snake_case")]
pub enum VerificationStrategy {
    /// Verify every artifact.
    #[default]
    Full,
    /// Verify a `sample_ratio` fraction of artifacts.
    Sample,
}
/// Digest algorithm used for artifact content hashes.
#[derive(
    Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default,
)]
#[serde(rename_all = "snake_case")]
pub enum HashAlgorithm {
    #[default]
    Blake3,
    Sha256,
}
/// Serde default for `VerificationConfig::strategy`.
const fn default_verification_strategy() -> VerificationStrategy {
    VerificationStrategy::Full
}
/// Serde default for `VerificationConfig::sample_ratio`.
const fn default_sample_ratio() -> f64 {
    1.0
}
/// Serde default for `VerificationConfig::hash_algorithm`.
const fn default_hash_algorithm() -> HashAlgorithm {
    HashAlgorithm::Blake3
}
/// Manifest retention policy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionConfig {
    #[serde(default = "default_manifest_ttl_days")]
    pub manifest_ttl_days: i64,
    #[serde(default)]
    pub auto_prune: bool,
}
// Hand-written for the same reason as `VerificationConfig`: the derived
// `Default` would give `manifest_ttl_days: 0` instead of 365 whenever the
// whole `[retention]` section is omitted.
impl Default for RetentionConfig {
    fn default() -> Self {
        Self {
            manifest_ttl_days: default_manifest_ttl_days(),
            auto_prune: false,
        }
    }
}
/// Serde default for `RetentionConfig::manifest_ttl_days`.
const fn default_manifest_ttl_days() -> i64 {
    365
}

2
src/core/mod.rs Normal file
View file

@ -0,0 +1,2 @@
pub mod pipeline;
pub mod types;

275
src/core/pipeline.rs Normal file
View file

@ -0,0 +1,275 @@
use std::{collections::HashMap, path::Path, sync::Arc};
use chrono::Utc;
use tokio::task::JoinSet;
use tracing::{error, info, warn};
use crate::{
config::{Config, SinkConfig, SourceConfig},
core::types::{
Artifact,
EntityType,
JobId,
JobStatus,
Manifest,
ManifestEntry,
RunId,
SinkJobState,
SourceProvenance,
StorageReceipt,
},
crypto::manifest::finalize_manifest,
sinks::filesystem::FilesystemSink,
sources::forgejo::{ForgejoSource, Repository},
storage::Storage,
};
/// Orchestrates one backup run: discovers repositories from the configured
/// sources, downloads each archive once, fans it out to every sink, and
/// records job state in the SQLite-backed `Storage`.
pub struct Pipeline {
    config: Config,
    storage: Storage,
}
impl Pipeline {
pub async fn new(config: Config) -> anyhow::Result<Self> {
let storage =
Storage::new(&format!("sqlite://{}", config.service.state_db_path))
.await?;
tokio::fs::create_dir_all(&config.service.temp_dir).await?;
Ok(Self { config, storage })
}
pub async fn run(&self) -> anyhow::Result<RunId> {
let run_id = RunId::new();
info!("Starting backup run {}", run_id.0);
let mut manifest = Manifest::new(run_id.clone());
for source_config in &self.config.sources {
match source_config {
SourceConfig::Forgejo(cfg) => {
info!("Processing Forgejo source: {}", cfg.id);
let source = ForgejoSource::new(cfg.clone())?;
let repos = source.list_repositories().await?;
info!("Found {} repositories", repos.len());
for repo in repos {
match self
.backup_repository(
&source,
&cfg.id,
&repo,
&run_id,
&mut manifest,
)
.await
{
Ok(()) => {
info!("Backed up {}/{}", repo.owner, repo.name);
},
Err(e) => {
error!("Failed to backup {}/{}: {}", repo.owner, repo.name, e);
},
}
}
},
}
}
finalize_manifest(&mut manifest)
.map_err(|e| anyhow::anyhow!("Failed to finalize manifest: {e}"))?;
// Persist the manifest root hash for integrity verification
if let Some(ref root_hash) = manifest.root_hash {
self
.storage
.save_manifest(&run_id, root_hash, manifest.artifacts.len())
.await?;
info!(
"Backup run {} completed with root hash {}",
run_id.0,
root_hash.as_ref()
);
} else {
info!("Backup run {} completed (no artifacts)", run_id.0);
}
Ok(run_id)
}
async fn backup_repository(
&self,
source: &ForgejoSource,
source_id: &str,
repo: &Repository,
run_id: &RunId,
manifest: &mut Manifest,
) -> anyhow::Result<()> {
let temp_dir = Path::new(&self.config.service.temp_dir);
let (content_hash, temp_path) =
source.download_archive(repo, temp_dir).await?;
let size_bytes = tokio::fs::metadata(&temp_path).await?.len();
let artifact = Arc::new(Artifact::new(
content_hash.clone(),
size_bytes,
SourceProvenance {
source_id: source_id.to_string(),
entity_type: EntityType::Repository,
remote_id: format!("{}/{}", repo.owner, repo.name),
fetched_at: Utc::now(),
},
temp_path,
));
let sink_ids: Vec<String> = self
.config
.sinks
.iter()
.map(|s| s.id().to_string())
.collect();
let job_id = JobId::new();
self
.storage
.create_job(
job_id.clone(),
run_id.clone(),
content_hash.clone(),
JobStatus::Fetching,
sink_ids
.iter()
.map(|id| {
(id.clone(), SinkJobState {
status: JobStatus::Discovered,
receipt: None,
error: None,
})
})
.collect(),
)
.await?;
let mut sink_tasks = JoinSet::new();
let sinks: Vec<_> = self
.config
.sinks
.iter()
.map(|s| (s.id().to_string(), s.clone()))
.collect();
for (sink_id, sink_config) in sinks {
let artifact = Arc::clone(&artifact);
sink_tasks.spawn(async move {
match sink_config {
SinkConfig::Filesystem(cfg) => {
let sink = FilesystemSink::new(cfg)?;
let receipt = sink.write(&artifact).await?;
Ok::<_, anyhow::Error>((sink_id, receipt))
},
SinkConfig::S3(_) => {
warn!("S3 sink not yet implemented");
Err(anyhow::anyhow!("S3 sink not implemented"))
},
}
});
}
let mut sink_receipts: HashMap<String, String> = HashMap::new();
let mut sink_receipt_objs: HashMap<String, StorageReceipt> = HashMap::new();
while let Some(result) = sink_tasks.join_next().await {
match result {
Ok(Ok((sink_id, receipt))) => {
sink_receipts.insert(sink_id.clone(), receipt.uri.clone());
sink_receipt_objs.insert(sink_id, receipt);
},
Ok(Err(e)) => {
error!("Sink task failed: {}", e);
},
Err(e) => {
error!("Sink task panicked: {}", e);
},
}
}
if sink_receipts.len() != self.config.sinks.len() {
self
.storage
.update_job_status(
&job_id,
JobStatus::Failed,
sink_ids
.iter()
.map(|id| {
(id.clone(), SinkJobState {
status: if sink_receipts.contains_key(id) {
JobStatus::Committed
} else {
JobStatus::Failed
},
receipt: sink_receipt_objs.get(id).cloned(),
error: if sink_receipts.contains_key(id) {
None
} else {
Some("Sink write failed".to_string())
},
})
})
.collect(),
)
.await?;
return Err(anyhow::anyhow!(
"Not all sinks succeeded: {}/{} completed",
sink_receipts.len(),
self.config.sinks.len()
));
}
self
.storage
.update_job_status(
&job_id,
JobStatus::Committed,
sink_ids
.iter()
.map(|id| {
(id.clone(), SinkJobState {
status: JobStatus::Committed,
receipt: sink_receipt_objs.get(id).cloned(),
error: None,
})
})
.collect(),
)
.await?;
manifest.add_artifact(ManifestEntry {
hash: content_hash,
size: artifact.size_bytes,
source_id: source_id.to_string(),
sink_uris: sink_receipts,
});
// Explicitly clean up the temp file
// Arc::try_unwrap will succeed since all sink tasks have completed
match Arc::try_unwrap(artifact) {
Ok(artifact) => artifact.cleanup().await,
Err(_) => {
// This shouldn't happen, but log if it does
error!(
"Failed to unwrap Arc for artifact cleanup - sinks still holding \
references"
);
},
}
Ok(())
}
}

172
src/core/types.rs Normal file
View file

@ -0,0 +1,172 @@
use std::{collections::HashMap, path::PathBuf};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
/// Hex-encoded content digest used to address artifacts (produced by the
/// BLAKE3-based hashers in this crate's `crypto` module).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct ContentHash(pub String);
impl ContentHash {
    /// Wraps any string-like value as a content hash.
    pub fn new(hash: impl Into<String>) -> Self {
        let inner = hash.into();
        Self(inner)
    }
}
impl AsRef<str> for ContentHash {
    /// Borrows the hex digest.
    fn as_ref(&self) -> &str {
        self.0.as_str()
    }
}
/// Unique identifier for a single artifact-distribution job.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct JobId(pub uuid::Uuid);
impl JobId {
    /// Generates a fresh random (v4) id.
    pub fn new() -> Self {
        Self(uuid::Uuid::new_v4())
    }
}
impl Default for JobId {
    fn default() -> Self {
        Self::new()
    }
}
/// Unique identifier for one whole backup run (which spans many jobs).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct RunId(pub uuid::Uuid);
impl RunId {
    /// Generates a fresh random (v4) id.
    pub fn new() -> Self {
        Self(uuid::Uuid::new_v4())
    }
}
impl Default for RunId {
    fn default() -> Self {
        Self::new()
    }
}
/// Lifecycle states for a job and for each per-sink sub-state.
/// Serialized in snake_case for persistence.
/// NOTE(review): only `Fetching`, `Discovered`, `Committed` and `Failed`
/// are assigned by the pipeline in this commit; `Distributing` and
/// `Verifying` appear reserved for future use — confirm.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum JobStatus {
    Discovered,
    Fetching,
    Distributing,
    Verifying,
    Committed,
    Failed,
}
/// Where and when an artifact was fetched from.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceProvenance {
    // Id of the source config entry that produced the artifact.
    pub source_id: String,
    pub entity_type: EntityType,
    // Source-side identifier, e.g. "owner/name" for repositories.
    pub remote_id: String,
    pub fetched_at: DateTime<Utc>,
}
/// Kind of entity an artifact represents.
/// NOTE(review): only `Repository` is produced by the pipeline in this
/// commit; the other variants look reserved for future source features.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum EntityType {
    Repository,
    Wiki,
    Issues,
    LfsObject,
}
/// Represents a downloaded artifact that is stored as a file on disk.
/// The file is read by sinks on demand.
/// NOTE: Cleanup must be handled explicitly by the caller using `cleanup()`.
#[derive(Debug)]
pub struct Artifact {
    // Content digest of the file at `temp_path`.
    pub content_hash: ContentHash,
    // Size of the downloaded file in bytes.
    pub size_bytes: u64,
    // Where/when the artifact came from; currently unread, kept for audit.
    #[allow(dead_code)]
    pub source_provenance: SourceProvenance,
    // Private so callers go through `temp_path()` and cannot move the file
    // out from under `cleanup()`.
    temp_path: PathBuf,
}
impl Artifact {
    /// Bundles an already-downloaded file with its metadata.
    pub const fn new(
        content_hash: ContentHash,
        size_bytes: u64,
        source_provenance: SourceProvenance,
        temp_path: PathBuf,
    ) -> Self {
        Self {
            content_hash,
            size_bytes,
            source_provenance,
            temp_path,
        }
    }
    /// Location of the artifact's bytes on disk.
    pub fn temp_path(&self) -> &std::path::Path {
        &self.temp_path
    }
    /// Explicitly clean up the temporary file.
    /// This should be called after all sinks have processed the artifact.
    pub async fn cleanup(&self) {
        // Best-effort: a missing file is not an error worth surfacing.
        let _ = tokio::fs::remove_file(&self.temp_path).await;
    }
}
/// Per-sink progress for one job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SinkJobState {
    pub status: JobStatus,
    // Present once the sink has durably stored the artifact.
    pub receipt: Option<StorageReceipt>,
    // Human-readable failure reason, if any.
    pub error: Option<String>,
}
/// Proof that a sink stored an artifact.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageReceipt {
    pub sink_id: String,
    // Sink-specific locator (a filesystem path for the fs sink).
    pub uri: String,
    // True when the sink re-hashed the bytes after writing.
    pub hash_verified: bool,
}
/// One artifact's row in a run manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManifestEntry {
    pub hash: ContentHash,
    pub size: u64,
    pub source_id: String,
    // sink id -> URI where this artifact was stored.
    pub sink_uris: HashMap<String, String>,
}
/// A persisted job record as loaded back from the state database.
#[derive(Debug, Clone)]
#[allow(clippy::struct_field_names)]
pub struct Job {
    pub job_id: JobId,
    pub run_id: RunId,
    pub artifact_hash: ContentHash,
    // Overall status; per-sink detail lives in `sink_statuses`.
    pub status: JobStatus,
    pub sink_statuses: HashMap<String, SinkJobState>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}
/// Record of everything one backup run produced, finalized with a Merkle
/// root over the artifact hashes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Manifest {
    pub run_id: RunId,
    pub created_at: DateTime<Utc>,
    /// `None` until `finalize_manifest` computes the Merkle root.
    pub root_hash: Option<ContentHash>,
    pub artifacts: Vec<ManifestEntry>,
}
impl Manifest {
    /// Starts an empty, un-finalized manifest stamped with the current time.
    pub fn new(run_id: RunId) -> Self {
        Self {
            artifacts: Vec::new(),
            root_hash: None,
            created_at: Utc::now(),
            run_id,
        }
    }
    /// Appends one completed artifact record.
    pub fn add_artifact(&mut self, entry: ManifestEntry) {
        self.artifacts.push(entry);
    }
}

206
src/crypto/manifest.rs Normal file
View file

@ -0,0 +1,206 @@
use blake3::Hasher;
use crate::core::types::{ContentHash, Manifest};
/// Converts a hex string to a 32-byte array.
///
/// Errors on non-hex input or any decoded length other than 32 bytes
/// (64 hex characters).
fn hex_to_bytes(hex_str: &str) -> anyhow::Result<[u8; 32]> {
    let bytes = hex::decode(hex_str)
        .map_err(|e| anyhow::anyhow!("Invalid hex string '{hex_str}': {e}"))?;
    // `try_from` enforces the exact length, replacing the manual
    // length check + copy_from_slice.
    <[u8; 32]>::try_from(bytes.as_slice()).map_err(|_| {
        anyhow::anyhow!("Expected 32 bytes, got {} bytes", bytes.len())
    })
}
/// Computes a Merkle tree root hash from leaf hashes.
///
/// Builds a binary Merkle tree where:
///
/// - Each leaf is a BLAKE3 hash of artifact content
/// - Each internal node is hash(left || right)
/// - A lone node at the end of a level is paired with itself
///
/// This provides:
///
/// - Deterministic root hash for the manifest
/// - Efficient inclusion proofs (log n verification)
/// - Cryptographic integrity guarantees
///
/// An empty input produces the BLAKE3 hash of the empty byte string.
pub fn compute_merkle_root(
    leaf_hashes: &[ContentHash],
) -> anyhow::Result<ContentHash> {
    if leaf_hashes.is_empty() {
        return Ok(ContentHash::new(blake3::hash(b"").to_hex().to_string()));
    }
    // Decode every leaf up front; any malformed hash aborts the whole
    // computation.
    let mut level = leaf_hashes
        .iter()
        .map(|h| hex_to_bytes(h.as_ref()))
        .collect::<anyhow::Result<Vec<[u8; 32]>>>()?;
    // Collapse pairs level by level until a single root remains.
    while level.len() > 1 {
        level = level
            .chunks(2)
            .map(|pair| {
                let left = &pair[0];
                // Odd tail: duplicate the last node as its own sibling.
                let right = pair.get(1).unwrap_or(left);
                let mut hasher = Hasher::new();
                hasher.update(left);
                hasher.update(right);
                *hasher.finalize().as_bytes()
            })
            .collect();
    }
    Ok(ContentHash::new(hex::encode(level[0])))
}
/// Generates a Merkle inclusion proof for a leaf at the given index.
///
/// # Returns
///
/// The sibling hashes needed to verify the leaf is in the tree.
/// To verify: starting with the leaf hash, iteratively hash with siblings
/// and compare against the known root.
///
/// Returns `None` for an out-of-range index or if ANY leaf hash fails to
/// decode (the tree shape would otherwise differ from the one used to
/// compute the root).
pub fn get_inclusion_proof(
    leaf_hashes: &[ContentHash],
    leaf_index: usize,
) -> Option<MerkleProof> {
    if leaf_index >= leaf_hashes.len() {
        return None;
    }
    let mut proof = Vec::new();
    // Filter out invalid hashes - if the leaf_hash at leaf_index is invalid,
    // we return None which signals the proof cannot be generated
    let mut current_level: Vec<[u8; 32]> = leaf_hashes
        .iter()
        .filter_map(|h| hex_to_bytes(h.as_ref()).ok())
        .collect();
    // Verify the leaf at the original index is still valid
    if current_level.len() != leaf_hashes.len() {
        return None;
    }
    // `index` tracks the target's position as we ascend one level per
    // iteration (halving each time, mirroring compute_merkle_root).
    let mut index = leaf_index;
    while current_level.len() > 1 {
        let mut next_level = Vec::new();
        for (i, chunk) in current_level.chunks(2).enumerate() {
            let left = &chunk[0];
            // A lone trailing node is paired with itself, exactly as in
            // compute_merkle_root.
            let right = chunk.get(1).unwrap_or(left);
            if i * 2 == index || (i * 2 + 1 == index && chunk.len() > 1) {
                // This pair contains our target
                let sibling = if index.is_multiple_of(2) {
                    // Target is left, sibling is right
                    MerkleProofNode::Right(*right)
                } else {
                    // Target is right, sibling is left
                    MerkleProofNode::Left(*left)
                };
                proof.push(sibling);
            }
            // Build the parent node regardless, so the next level matches
            // the root computation.
            let mut hasher = Hasher::new();
            hasher.update(left);
            hasher.update(right);
            let hash = hasher.finalize();
            let mut array = [0u8; 32];
            array.copy_from_slice(hash.as_bytes());
            next_level.push(array);
        }
        index /= 2;
        current_level = next_level;
    }
    Some(MerkleProof {
        leaf_index,
        siblings: proof,
    })
}
/// Verifies a Merkle inclusion proof.
///
/// Re-derives the root by folding the leaf hash with each recorded
/// sibling (respecting left/right placement) and compares against `root`.
///
/// # Returns
///
/// `true` if the proof is valid for the given root and leaf hash.
pub fn verify_inclusion_proof(
    root: &ContentHash,
    leaf_hash: &ContentHash,
    proof: &MerkleProof,
) -> bool {
    let Ok(mut acc) = hex_to_bytes(leaf_hash.as_ref()) else {
        return false; // invalid leaf hash hex
    };
    for node in &proof.siblings {
        // Order the operands according to which side the sibling sits on.
        let (first, second) = match node {
            MerkleProofNode::Left(sibling) => (sibling, &acc),
            MerkleProofNode::Right(sibling) => (&acc, sibling),
        };
        let mut hasher = Hasher::new();
        hasher.update(first);
        hasher.update(second);
        acc = *hasher.finalize().as_bytes();
    }
    hex::encode(acc) == root.as_ref()
}
/// A node in a Merkle inclusion proof.
#[derive(Debug, Clone)]
pub enum MerkleProofNode {
    /// Sibling is to the left of the target (hash as `sibling || acc`).
    Left([u8; 32]),
    /// Sibling is to the right of the target (hash as `acc || sibling`).
    Right([u8; 32]),
}
/// A Merkle inclusion proof.
#[derive(Debug, Clone)]
pub struct MerkleProof {
    // Position of the proven leaf in the original leaf list.
    pub leaf_index: usize,
    // Sibling hashes ordered from the leaf level up to the root.
    pub siblings: Vec<MerkleProofNode>,
}
/// Finalizes the manifest by computing its Merkle root hash.
///
/// Idempotent: a manifest that already carries a root is left untouched.
pub fn finalize_manifest(manifest: &mut Manifest) -> anyhow::Result<()> {
    if manifest.root_hash.is_some() {
        return Ok(());
    }
    let leaves: Vec<ContentHash> =
        manifest.artifacts.iter().map(|e| e.hash.clone()).collect();
    let root = compute_merkle_root(&leaves)?;
    manifest.root_hash = Some(root);
    Ok(())
}

52
src/crypto/mod.rs Normal file
View file

@ -0,0 +1,52 @@
pub mod manifest;
use std::path::Path;
use blake3::Hasher;
use tokio::io::AsyncReadExt;
/// Computes the BLAKE3 hash of a file's contents, returned as lowercase
/// hex. Reads incrementally so arbitrarily large files use constant memory.
pub async fn hash_file(path: &Path) -> anyhow::Result<String> {
    let mut file = tokio::fs::File::open(path).await?;
    let mut hasher = Hasher::new();
    let mut chunk = [0u8; 8192];
    loop {
        // A zero-byte read signals end of file.
        match file.read(&mut chunk).await? {
            0 => break,
            read => {
                hasher.update(&chunk[..read]);
            },
        }
    }
    Ok(hasher.finalize().to_hex().to_string())
}
/// Incremental BLAKE3 hasher that also counts the bytes fed into it.
/// Lets callers hash data while it streams to disk, avoiding a second
/// read pass.
pub struct StreamingHasher {
    hasher: Hasher,
    // Running total of bytes passed to `update`.
    bytes_hashed: u64,
}
impl StreamingHasher {
    /// Creates an empty hasher with a zero byte count.
    pub fn new() -> Self {
        Self {
            hasher: Hasher::new(),
            bytes_hashed: 0,
        }
    }
    /// Feeds one chunk into the hash and the byte counter.
    pub fn update(&mut self, data: &[u8]) {
        self.hasher.update(data);
        self.bytes_hashed += data.len() as u64;
    }
    /// Consumes the hasher, returning the lowercase hex digest and the
    /// total number of bytes hashed.
    pub fn finalize(self) -> (String, u64) {
        let hash = self.hasher.finalize().to_hex().to_string();
        (hash, self.bytes_hashed)
    }
}
impl Default for StreamingHasher {
    fn default() -> Self {
        Self::new()
    }
}

176
src/main.rs Normal file
View file

@ -0,0 +1,176 @@
mod config;
mod core;
mod crypto;
mod sinks;
mod sources;
mod storage;
use clap::{Parser, Subcommand};
use color_eyre::eyre::{Result, bail, eyre};
use tracing::{error, info, warn};
// Command-line interface for the konservejo binary. Plain `//` comments
// are used on purpose: `///` doc comments on clap items would change the
// generated --help text.
#[derive(Parser)]
#[command(
    name = "konservejo",
    about = "Declarative backup orchestrator for Forgejo"
)]
struct Cli {
    #[command(subcommand)]
    command: Commands,
    // Path to the TOML configuration; `global` makes the flag usable
    // after any subcommand as well.
    #[arg(short, long, global = true, default_value = "./config.toml")]
    config: String,
}
// Supported subcommands. (`//` comments, not `///`, to avoid altering
// clap's generated help output.)
#[derive(Subcommand)]
enum Commands {
    // Run a full backup pass over all configured sources.
    Backup,
    // Parse and validate the config file, then exit.
    ValidateConfig,
    // Recompute and check the Merkle root for a previous run.
    VerifyManifest {
        // UUID of the backup run to verify.
        #[arg(long)]
        run_id: String,
    },
}
#[tokio::main]
async fn main() -> Result<()> {
    // Install pretty error reports and structured logging before anything
    // can fail.
    color_eyre::install()?;
    tracing_subscriber::fmt::init();
    let cli = Cli::parse();
    match cli.command {
        // Full backup pass: load config, build the pipeline, run it once.
        Commands::Backup => {
            info!("Loading configuration from {}", cli.config);
            let config = config::Config::from_file(&cli.config)
                .map_err(|e| eyre!("Failed to load config: {}", e))?;
            info!("Initializing backup pipeline");
            let pipeline = core::pipeline::Pipeline::new(config)
                .await
                .map_err(|e| eyre!("Failed to initialize pipeline: {}", e))?;
            info!("Starting backup");
            match pipeline.run().await {
                Ok(run_id) => {
                    info!("Backup completed successfully: run_id={}", run_id.0);
                    Ok(())
                },
                Err(e) => {
                    error!("Backup failed: {}", e);
                    Err(eyre!("Backup failed: {}", e))
                },
            }
        },
        // Parse + validate the config and report the outcome.
        Commands::ValidateConfig => {
            info!("Validating configuration from {}", cli.config);
            config::Config::from_file(&cli.config)
                .map_err(|e| eyre!("Configuration is invalid: {}", e))?;
            info!("Configuration is valid");
            Ok(())
        },
        // Recompute the Merkle root from stored jobs and check every
        // artifact's inclusion proof against it.
        Commands::VerifyManifest { run_id } => {
            info!("Verifying manifest for run {}", run_id);
            let config = config::Config::from_file(&cli.config)
                .map_err(|e| eyre!("Failed to load config: {}", e))?;
            let storage = storage::Storage::new(&format!(
                "sqlite://{}",
                config.service.state_db_path
            ))
            .await
            .map_err(|e| eyre!("Failed to connect to database: {}", e))?;
            let run_uuid = uuid::Uuid::parse_str(&run_id)
                .map_err(|e| eyre!("Invalid run_id UUID: {}", e))?;
            let jobs = storage
                .list_jobs_by_run(&core::types::RunId(run_uuid))
                .await
                .map_err(|e| eyre!("Failed to query jobs: {}", e))?;
            if jobs.is_empty() {
                bail!("No jobs found for run {}", run_id);
            }
            // NOTE(review): the recomputed root depends on the ORDER of
            // jobs returned by `list_jobs_by_run` and includes jobs in any
            // status (failed ones too). This must match exactly how the
            // stored root was computed during the run — confirm the query
            // ordering/selection is deterministic and consistent.
            let leaf_hashes: Vec<core::types::ContentHash> =
                jobs.iter().map(|j| j.artifact_hash.clone()).collect();
            let computed_root =
                crypto::manifest::compute_merkle_root(&leaf_hashes)
                    .map_err(|e| eyre!("Failed to compute Merkle root: {}", e))?;
            // Retrieve the stored root hash and verify integrity
            let run_id_struct = core::types::RunId(run_uuid);
            match storage.get_manifest_root(&run_id_struct).await {
                Ok(Some(stored_root)) => {
                    if stored_root.as_ref() != computed_root.as_ref() {
                        bail!(
                            "Root hash mismatch! Stored: {}, Computed: {}. The manifest has \
                             been tampered with or jobs are missing.",
                            stored_root.as_ref(),
                            computed_root.as_ref()
                        );
                    }
                    info!("Root hash verified: {}", stored_root.as_ref());
                },
                Ok(None) => {
                    warn!(
                        "No stored manifest root found for run {}. Verification \
                         continuing with computed root.",
                        run_id
                    );
                },
                // Deliberately non-fatal: verification proceeds against
                // the computed root if the stored one cannot be read.
                Err(e) => {
                    error!("Failed to retrieve stored manifest root: {}", e);
                },
            }
            // Check each job's artifact via a Merkle inclusion proof.
            let mut all_valid = true;
            for (i, job) in jobs.iter().enumerate() {
                info!(
                    "Verifying job {} (status: {:?}, sinks: {}, updated: {})",
                    job.job_id.0,
                    job.status,
                    job.sink_statuses.len(),
                    job.updated_at
                );
                if let Some(proof) =
                    crypto::manifest::get_inclusion_proof(&leaf_hashes, i)
                {
                    info!(
                        "Generated inclusion proof for index {} ({} siblings)",
                        proof.leaf_index,
                        proof.siblings.len()
                    );
                    let valid = crypto::manifest::verify_inclusion_proof(
                        &computed_root,
                        &job.artifact_hash,
                        &proof,
                    );
                    if !valid {
                        error!(
                            "Artifact {} failed verification (run: {}, created: {})",
                            job.artifact_hash.as_ref(),
                            job.run_id.0,
                            job.created_at
                        );
                        all_valid = false;
                    }
                }
            }
            if all_valid {
                info!(
                    "All {} artifacts verified successfully against root {}",
                    jobs.len(),
                    computed_root.as_ref()
                );
                Ok(())
            } else {
                bail!("Manifest verification failed - some artifacts invalid")
            }
        },
    }
}

80
src/sinks/filesystem.rs Normal file
View file

@ -0,0 +1,80 @@
use std::path::PathBuf;
use crate::{
config::FilesystemSinkConfig,
core::types::{Artifact, StorageReceipt},
crypto::hash_file,
};
/// Content-addressed local-directory sink.
/// Artifacts land at `<path>/<h[0..2]>/<h[2..4]>/<hash>`, written via a
/// unique temp file and then renamed so readers never see partial data.
pub struct FilesystemSink {
    config: FilesystemSinkConfig,
}
impl FilesystemSink {
    /// Creates the sink, ensuring the root directory exists.
    /// NOTE(review): uses blocking `std::fs` and is called from within a
    /// spawned async task in the pipeline — consider `spawn_blocking` or
    /// making this async.
    pub fn new(config: FilesystemSinkConfig) -> anyhow::Result<Self> {
        std::fs::create_dir_all(&config.path)?;
        Ok(Self { config })
    }
    /// Copies the artifact into the content-addressed store, optionally
    /// re-hashing after the copy, and returns a receipt with the final
    /// path as URI.
    ///
    /// # Errors
    /// Fails on hashes shorter than 4 chars or non-ASCII (the shard
    /// directories slice the first 4 bytes), on any I/O error, or on a
    /// post-copy hash mismatch. The temp file is removed on every
    /// failure path.
    pub async fn write(
        &self,
        artifact: &Artifact,
    ) -> anyhow::Result<StorageReceipt> {
        let hash_str = artifact.content_hash.as_ref();
        // Guard the byte-index slicing below: need >= 4 ASCII chars.
        anyhow::ensure!(
            hash_str.len() >= 4,
            "Content hash must be at least 4 characters, got {}",
            hash_str.len()
        );
        anyhow::ensure!(hash_str.is_ascii(), "Content hash must be ASCII");
        // Two-level fan-out keeps per-directory entry counts manageable.
        let (dir1, dir2) = (&hash_str[0..2], &hash_str[2..4]);
        let target_dir = PathBuf::from(&self.config.path).join(dir1).join(dir2);
        tokio::fs::create_dir_all(&target_dir).await?;
        // Unique temp name so concurrent writers of the same hash cannot
        // clobber each other's in-progress copy.
        let temp_path =
            target_dir.join(format!("{}.{}.tmp", hash_str, uuid::Uuid::new_v4()));
        let final_path = target_dir.join(hash_str);
        // Best-effort removal of the temp file, used on every error path.
        let cleanup = || {
            async {
                let _ = tokio::fs::remove_file(&temp_path).await;
            }
        };
        if let Err(e) = tokio::fs::copy(artifact.temp_path(), &temp_path).await {
            cleanup().await;
            return Err(e.into());
        }
        if self.config.verify_on_write {
            // Re-read and re-hash the copy to catch corruption in transit.
            let computed_hash = match hash_file(&temp_path).await {
                Ok(h) => h,
                Err(e) => {
                    cleanup().await;
                    return Err(e);
                },
            };
            if computed_hash != artifact.content_hash.as_ref() {
                cleanup().await;
                anyhow::bail!(
                    "Hash verification failed for artifact {}",
                    artifact.content_hash.as_ref()
                );
            }
        }
        // Same-directory rename publishes the file in one step.
        if let Err(e) = tokio::fs::rename(&temp_path, &final_path).await {
            cleanup().await;
            return Err(e.into());
        }
        let uri = final_path.to_string_lossy().to_string();
        Ok(StorageReceipt {
            sink_id: self.config.id.clone(),
            uri,
            hash_verified: self.config.verify_on_write,
        })
    }
}

1
src/sinks/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod filesystem;

182
src/sources/forgejo.rs Normal file
View file

@ -0,0 +1,182 @@
use std::path::Path;
use futures::StreamExt;
use reqwest::header::{self, HeaderMap, HeaderValue};
use serde::Deserialize;
use tokio::{fs::File, io::AsyncWriteExt};
use urlencoding::encode;
use crate::{
config::ForgejoSourceConfig,
core::types::ContentHash,
crypto::StreamingHasher,
};
/// Backup source backed by a Forgejo instance's REST API.
pub struct ForgejoSource {
    config: ForgejoSourceConfig,
    // Pre-authenticated HTTP client (token + JSON accept headers).
    client: reqwest::Client,
}
/// Minimal repository descriptor used by the pipeline.
#[derive(Debug, Clone)]
pub struct Repository {
    pub owner: String,
    pub name: String,
    // Branch whose archive is downloaded; falls back to "main" when the
    // API reports none.
    pub default_branch: String,
}
impl ForgejoSource {
    /// Builds an authenticated HTTP client for the configured instance.
    ///
    /// NOTE(review): `config.rate_limit_rps` is not enforced anywhere in
    /// this module yet — confirm whether rate limiting happens elsewhere
    /// or is still TODO.
    pub fn new(config: ForgejoSourceConfig) -> anyhow::Result<Self> {
        let mut headers = HeaderMap::new();
        headers.insert(
            header::AUTHORIZATION,
            HeaderValue::from_str(&format!("token {}", config.token.expose()))?,
        );
        headers
            .insert(header::ACCEPT, HeaderValue::from_static("application/json"));
        let client = reqwest::Client::builder()
            .default_headers(headers)
            // Generous timeout: archive downloads can be large.
            .timeout(std::time::Duration::from_secs(300))
            .build()?;
        Ok(Self { config, client })
    }
    /// Lists all repositories in the configured organizations, applying
    /// the `exclude_repos` patterns (`*/name` matches the name under any
    /// owner; anything else must equal `owner/name` exactly).
    pub async fn list_repositories(&self) -> anyhow::Result<Vec<Repository>> {
        let mut repos = Vec::new();
        for org in &self.config.scope.organizations {
            let mut page = 1;
            loop {
                let (org_repos, has_more) =
                    self.fetch_org_repos_page(org, page).await?;
                repos.extend(org_repos);
                if !has_more {
                    break;
                }
                page += 1;
            }
        }
        let filtered_repos: Vec<_> = repos
            .into_iter()
            .filter(|r| {
                let full_name = format!("{}/{}", r.owner, r.name);
                !self.config.scope.exclude_repos.iter().any(|pattern| {
                    pattern.strip_prefix("*/").map_or_else(
                        || full_name == *pattern,
                        |name_pattern| r.name == name_pattern,
                    )
                })
            })
            .collect();
        Ok(filtered_repos)
    }
    /// Fetches one page (50 entries) of an organization's repositories.
    /// `has_more` is inferred from a full page, which can cost one extra
    /// empty request when the total is an exact multiple of 50.
    async fn fetch_org_repos_page(
        &self,
        org: &str,
        page: i32,
    ) -> anyhow::Result<(Vec<Repository>, bool)> {
        let url = format!(
            "{}/orgs/{}/repos?page={}&limit=50",
            self.config.api_url,
            encode(org),
            page
        );
        let response = self.client.get(&url).send().await?;
        if !response.status().is_success() {
            anyhow::bail!(
                "Failed to fetch repos for org {}: {}",
                org,
                response.status()
            );
        }
        let api_repos: Vec<ApiRepository> = response.json().await?;
        let has_more = api_repos.len() == 50;
        let repos: Vec<Repository> = api_repos
            .into_iter()
            .map(|r| {
                Repository {
                    owner: r.owner.login,
                    name: r.name,
                    default_branch: r
                        .default_branch
                        .unwrap_or_else(|| "main".to_string()),
                }
            })
            .collect();
        Ok((repos, has_more))
    }
    /// Streams the default-branch tarball into `temp_dir`, hashing it on
    /// the fly, and returns the BLAKE3 content hash plus the written path.
    ///
    /// On a mid-stream failure the file handle is now closed BEFORE the
    /// partial file is deleted — deleting an open file fails on Windows,
    /// which previously left partial downloads behind there.
    pub async fn download_archive(
        &self,
        repo: &Repository,
        temp_dir: &Path,
    ) -> anyhow::Result<(ContentHash, std::path::PathBuf)> {
        let archive_name = format!("{}_{}.tar.gz", repo.owner, repo.name);
        let temp_path = temp_dir.join(&archive_name);
        let url = format!(
            "{}/repos/{}/{}/archive/{}.tar.gz",
            self.config.api_url,
            encode(&repo.owner),
            encode(&repo.name),
            encode(&repo.default_branch)
        );
        let response = self.client.get(&url).send().await?;
        if !response.status().is_success() {
            anyhow::bail!(
                "Failed to download archive for {}/{}: {}",
                repo.owner,
                repo.name,
                response.status()
            );
        }
        let mut file = File::create(&temp_path).await?;
        let mut stream = response.bytes_stream();
        let mut hasher = StreamingHasher::new();
        // Inner block lets the `?`s inside short-circuit without leaving
        // the function, so the partial file can be cleaned up below.
        let result = async {
            while let Some(chunk) = stream.next().await {
                let chunk = chunk?;
                file.write_all(&chunk).await?;
                hasher.update(&chunk);
            }
            file.flush().await?;
            Ok::<(), anyhow::Error>(())
        }
        .await;
        // Close the handle before any delete/return so the file is
        // removable on all platforms.
        drop(file);
        if let Err(e) = result {
            let _ = tokio::fs::remove_file(&temp_path).await;
            return Err(e);
        }
        let (hash, _) = hasher.finalize();
        Ok((ContentHash::new(hash), temp_path))
    }
}
/// Subset of the Forgejo repository API response we deserialize.
#[derive(Debug, Deserialize)]
struct ApiRepository {
    name: String,
    // `Option` because the API may omit this field. The previous
    // `#[serde(rename = "default_branch")]` renamed the field to its own
    // name and was a no-op, so it has been removed.
    default_branch: Option<String>,
    owner: ApiOwner,
}
/// Owner object nested inside the repository API response.
#[derive(Debug, Deserialize)]
struct ApiOwner {
    // Login name; used as the `Repository::owner` string.
    login: String,
}

1
src/sources/mod.rs Normal file
View file

@ -0,0 +1 @@
/// Forgejo backup source implementation.
pub mod forgejo;

263
src/storage/mod.rs Normal file
View file

@ -0,0 +1,263 @@
use std::collections::HashMap;
use sqlx::{FromRow, Pool, Sqlite, sqlite::SqlitePoolOptions};
use crate::core::types::{
ContentHash,
Job,
JobId,
JobStatus,
RunId,
SinkJobState,
};
/// SQLite-backed persistence for jobs, checkpoints, and manifests.
pub struct Storage {
    // Connection pool shared by all queries.
    pool: Pool<Sqlite>,
}
impl Storage {
    /// Opens (or creates) the SQLite database at `database_url`, enables
    /// WAL journaling, and runs the schema migrations.
    ///
    /// # Errors
    ///
    /// Returns an error when the pool cannot connect or migration fails.
    pub async fn new(database_url: &str) -> anyhow::Result<Self> {
        let pool = SqlitePoolOptions::new()
            .max_connections(5)
            .connect(database_url)
            .await?;

        // journal_mode=WAL is persisted in the database file, so setting
        // it on one pooled connection covers the whole pool.
        sqlx::query("PRAGMA journal_mode=WAL")
            .execute(&pool)
            .await?;

        let storage = Self { pool };
        storage.migrate().await?;
        Ok(storage)
    }

    /// Creates the tables and indexes if they do not exist yet.
    async fn migrate(&self) -> anyhow::Result<()> {
        sqlx::query(
            r"
            CREATE TABLE IF NOT EXISTS jobs (
                job_id TEXT PRIMARY KEY,
                run_id TEXT NOT NULL,
                artifact_hash TEXT NOT NULL,
                status TEXT NOT NULL,
                sink_statuses TEXT NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
            ",
        )
        .execute(&self.pool)
        .await?;

        // `list_jobs_by_run` filters on run_id; without this index every
        // lookup is a full table scan.
        sqlx::query(
            r"
            CREATE INDEX IF NOT EXISTS idx_jobs_run_id ON jobs (run_id)
            ",
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            r"
            CREATE TABLE IF NOT EXISTS checkpoints (
                source_id TEXT PRIMARY KEY,
                cursor TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
            ",
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            r"
            CREATE TABLE IF NOT EXISTS manifests (
                run_id TEXT PRIMARY KEY,
                root_hash TEXT NOT NULL,
                artifact_count INTEGER NOT NULL,
                created_at TEXT NOT NULL
            )
            ",
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Inserts a new job row; statuses are stored as JSON text.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization or the insert fails (including a
    /// duplicate `job_id`, which violates the primary key).
    pub async fn create_job(
        &self,
        job_id: JobId,
        run_id: RunId,
        artifact_hash: ContentHash,
        status: JobStatus,
        sink_statuses: HashMap<String, SinkJobState>,
    ) -> anyhow::Result<JobId> {
        let now = chrono::Utc::now();
        let sink_statuses_json = serde_json::to_string(&sink_statuses)?;

        sqlx::query(
            r"
            INSERT INTO jobs (job_id, run_id, artifact_hash, status, sink_statuses, created_at, updated_at)
            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
            ",
        )
        .bind(job_id.0.to_string())
        .bind(run_id.0.to_string())
        .bind(artifact_hash.0)
        .bind(serde_json::to_string(&status)?)
        .bind(sink_statuses_json)
        .bind(now.to_rfc3339())
        .bind(now.to_rfc3339())
        .execute(&self.pool)
        .await?;

        Ok(job_id)
    }

    /// Overwrites a job's status and per-sink states, bumping `updated_at`.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization or the update fails, or if no row
    /// with `job_id` exists.
    pub async fn update_job_status(
        &self,
        job_id: &JobId,
        status: JobStatus,
        sink_statuses: HashMap<String, SinkJobState>,
    ) -> anyhow::Result<()> {
        let now = chrono::Utc::now();
        let sink_statuses_json = serde_json::to_string(&sink_statuses)?;

        let result = sqlx::query(
            r"
            UPDATE jobs
            SET status = ?1, sink_statuses = ?2, updated_at = ?3
            WHERE job_id = ?4
            ",
        )
        .bind(serde_json::to_string(&status)?)
        .bind(sink_statuses_json)
        .bind(now.to_rfc3339())
        .bind(job_id.0.to_string())
        .execute(&self.pool)
        .await?;

        if result.rows_affected() == 0 {
            anyhow::bail!("Job not found: {}", job_id.0);
        }

        Ok(())
    }

    /// Returns all jobs for a run, ordered by creation time.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails or any stored row cannot be
    /// converted back into a `Job`.
    pub async fn list_jobs_by_run(
        &self,
        run_id: &RunId,
    ) -> anyhow::Result<Vec<Job>> {
        let rows = sqlx::query_as::<_, JobRow>(
            r"
            SELECT job_id, run_id, artifact_hash, status, sink_statuses, created_at, updated_at
            FROM jobs
            WHERE run_id = ?1
            ORDER BY created_at
            ",
        )
        .bind(run_id.0.to_string())
        .fetch_all(&self.pool)
        .await?;

        let jobs: anyhow::Result<Vec<Job>> =
            rows.into_iter().map(Job::try_from).collect();
        jobs
    }

    /// Saves a manifest with its root hash for a backup run.
    ///
    /// # Errors
    ///
    /// Returns an error if `artifact_count` does not fit in an SQLite
    /// INTEGER (i64), or if the insert fails (including a duplicate
    /// `run_id`).
    pub async fn save_manifest(
        &self,
        run_id: &RunId,
        root_hash: &ContentHash,
        artifact_count: usize,
    ) -> anyhow::Result<()> {
        let now = chrono::Utc::now();

        // SQLite stores INTEGER as i64; reject counts that would truncate.
        let count_i64 = i64::try_from(artifact_count).map_err(|_| {
            anyhow::anyhow!("Artifact count {artifact_count} exceeds i64 max")
        })?;

        sqlx::query(
            r"
            INSERT INTO manifests (run_id, root_hash, artifact_count, created_at)
            VALUES (?1, ?2, ?3, ?4)
            ",
        )
        .bind(run_id.0.to_string())
        .bind(root_hash.as_ref())
        .bind(count_i64)
        .bind(now.to_rfc3339())
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Retrieves the stored root hash for a backup run, or `None` when no
    /// manifest was saved for it.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn get_manifest_root(
        &self,
        run_id: &RunId,
    ) -> anyhow::Result<Option<ContentHash>> {
        let row: Option<(String,)> = sqlx::query_as(
            r"
            SELECT root_hash
            FROM manifests
            WHERE run_id = ?1
            ",
        )
        .bind(run_id.0.to_string())
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(|(hash,)| ContentHash::new(hash)))
    }
}
/// Raw `jobs` table row; converted into a domain `Job` via `TryFrom`.
#[derive(FromRow)]
struct JobRow {
    // UUID stored as text.
    job_id: String,
    // UUID stored as text.
    run_id: String,
    artifact_hash: String,
    // JSON-serialized `JobStatus`.
    status: String,
    // JSON-serialized map of sink id -> `SinkJobState`.
    sink_statuses: String,
    // RFC 3339 timestamp.
    created_at: String,
    // RFC 3339 timestamp.
    updated_at: String,
}
impl TryFrom<JobRow> for Job {
    type Error = anyhow::Error;

    /// Converts a raw database row into a domain `Job`.
    ///
    /// # Errors
    ///
    /// Fails when any column holds malformed data: bad UUIDs, bad RFC 3339
    /// timestamps, or bad JSON. Previously a corrupt `status` or
    /// `sink_statuses` column was silently replaced with defaults, which
    /// could make a finished job look like it was never processed; these
    /// now surface as errors, consistent with the other columns.
    fn try_from(row: JobRow) -> anyhow::Result<Self> {
        let job_id = row.job_id.parse().map_err(|e| {
            anyhow::anyhow!("Invalid job_id UUID '{}': {}", row.job_id, e)
        })?;
        let run_id = row.run_id.parse().map_err(|e| {
            anyhow::anyhow!("Invalid run_id UUID '{}': {}", row.run_id, e)
        })?;

        let created_at = chrono::DateTime::parse_from_rfc3339(&row.created_at)
            .map_err(|e| {
                anyhow::anyhow!(
                    "Invalid created_at datetime '{}': {}",
                    row.created_at,
                    e
                )
            })?
            .with_timezone(&chrono::Utc);
        let updated_at = chrono::DateTime::parse_from_rfc3339(&row.updated_at)
            .map_err(|e| {
                anyhow::anyhow!(
                    "Invalid updated_at datetime '{}': {}",
                    row.updated_at,
                    e
                )
            })?
            .with_timezone(&chrono::Utc);

        let status = serde_json::from_str(&row.status).map_err(|e| {
            anyhow::anyhow!("Invalid status JSON '{}': {}", row.status, e)
        })?;
        let sink_statuses =
            serde_json::from_str(&row.sink_statuses).map_err(|e| {
                anyhow::anyhow!(
                    "Invalid sink_statuses JSON '{}': {}",
                    row.sink_statuses,
                    e
                )
            })?;

        Ok(Self {
            job_id: JobId(job_id),
            run_id: RunId(run_id),
            artifact_hash: ContentHash::new(row.artifact_hash),
            status,
            sink_statuses,
            created_at,
            updated_at,
        })
    }
}