fetch: add parallel downloads and --shelve flag

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Id61b7c502923c697599cfb3afed948d56a6a6964
This commit is contained in:
raf 2026-02-12 23:20:26 +03:00
commit 788bdb0f1b
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
2 changed files with 225 additions and 19 deletions

View file

@ -1,10 +1,13 @@
use std::{ use std::{
fs, fs,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc,
}; };
use indicatif::{ProgressBar, ProgressStyle}; use futures::future::join_all;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use reqwest::Client; use reqwest::Client;
use tokio::sync::Semaphore;
use crate::{ use crate::{
error::{PakkerError, Result}, error::{PakkerError, Result},
@ -12,14 +15,19 @@ use crate::{
utils::verify_hash, utils::verify_hash,
}; };
/// Maximum number of concurrent downloads
const MAX_CONCURRENT_DOWNLOADS: usize = 8;
pub struct Fetcher { pub struct Fetcher {
client: Client, client: Client,
base_path: PathBuf, base_path: PathBuf,
shelve: bool,
} }
pub struct FileFetcher { pub struct FileFetcher {
client: Client, client: Client,
base_path: PathBuf, base_path: PathBuf,
shelve: bool,
} }
impl Fetcher { impl Fetcher {
@ -27,9 +35,15 @@ impl Fetcher {
Self { Self {
client: Client::new(), client: Client::new(),
base_path: base_path.as_ref().to_path_buf(), base_path: base_path.as_ref().to_path_buf(),
shelve: false,
} }
} }
/// Builder-style setter for the `shelve` flag.
///
/// When enabled, files found on disk that are not listed in the lockfile
/// are moved to a shelf directory instead of being deleted (see
/// `handle_unknown_files`). Consumes `self` and returns it so calls can
/// be chained after `new(...)`.
pub const fn with_shelve(mut self, shelve: bool) -> Self {
self.shelve = shelve;
self
}
pub async fn fetch_all( pub async fn fetch_all(
&self, &self,
lockfile: &LockFile, lockfile: &LockFile,
@ -38,6 +52,7 @@ impl Fetcher {
let fetcher = FileFetcher { let fetcher = FileFetcher {
client: self.client.clone(), client: self.client.clone(),
base_path: self.base_path.clone(), base_path: self.base_path.clone(),
shelve: self.shelve,
}; };
fetcher.fetch_all(lockfile, config).await fetcher.fetch_all(lockfile, config).await
} }
@ -48,7 +63,7 @@ impl Fetcher {
} }
impl FileFetcher { impl FileFetcher {
/// Fetch all project files according to lockfile /// Fetch all project files according to lockfile with parallel downloads
pub async fn fetch_all( pub async fn fetch_all(
&self, &self,
lockfile: &LockFile, lockfile: &LockFile,
@ -58,25 +73,104 @@ impl FileFetcher {
lockfile.projects.iter().filter(|p| p.export).collect(); lockfile.projects.iter().filter(|p| p.export).collect();
let total = exportable_projects.len(); let total = exportable_projects.len();
let spinner = ProgressBar::new(total as u64);
spinner.set_style(
ProgressStyle::default_spinner()
.template("{spinner:.green} {msg}")
.unwrap(),
);
for (idx, project) in exportable_projects.iter().enumerate() { if total == 0 {
let name = project log::info!("No projects to fetch");
.name return Ok(());
.values()
.next()
.map_or("unknown", std::string::String::as_str);
spinner.set_message(format!("Fetching {} ({}/{})", name, idx + 1, total));
self.fetch_project(project, lockfile, config).await?;
} }
spinner.finish_with_message("All projects fetched"); // Set up multi-progress for parallel download tracking
let multi_progress = MultiProgress::new();
let overall_bar = multi_progress.add(ProgressBar::new(total as u64));
overall_bar.set_style(
ProgressStyle::default_bar()
.template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} {msg}")
.unwrap()
.progress_chars("#>-"),
);
overall_bar.set_message("Fetching projects...");
// Use a semaphore to limit concurrent downloads
let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_DOWNLOADS));
// Prepare download tasks
let download_tasks: Vec<_> = exportable_projects
.iter()
.map(|project| {
let semaphore = Arc::clone(&semaphore);
let client = self.client.clone();
let base_path = self.base_path.clone();
let lockfile = lockfile.clone();
let config = config.clone();
let project = (*project).clone();
let overall_bar = overall_bar.clone();
async move {
// Acquire semaphore permit to limit concurrency
let _permit = semaphore.acquire().await.map_err(|_| {
PakkerError::InternalError("Semaphore acquisition failed".into())
})?;
let name = project
.name
.values()
.next()
.map_or("unknown".to_string(), std::clone::Clone::clone);
let fetcher = Self {
client,
base_path,
shelve: false, // Shelving happens at sync level, not per-project
};
let result =
fetcher.fetch_project(&project, &lockfile, &config).await;
// Update progress bar
overall_bar.inc(1);
match &result {
Ok(()) => {
log::debug!("Successfully fetched: {name}");
},
Err(e) => {
log::error!("Failed to fetch {name}: {e}");
},
}
result.map(|()| name)
}
})
.collect();
// Execute all downloads in parallel (limited by semaphore)
let results = join_all(download_tasks).await;
overall_bar.finish_with_message("All projects fetched");
// Collect and report errors
let mut errors = Vec::new();
let mut success_count = 0;
for result in results {
match result {
Ok(_) => success_count += 1,
Err(e) => errors.push(e),
}
}
log::info!("Fetch complete: {success_count}/{total} successful");
if !errors.is_empty() {
// Return the first error, but log all of them
for (idx, error) in errors.iter().enumerate() {
log::error!("Download error {}: {}", idx + 1, error);
}
return Err(errors.remove(0));
}
// Handle unknown files (shelve or delete)
self.handle_unknown_files(lockfile, config)?;
// Sync overrides // Sync overrides
self.sync_overrides(config)?; self.sync_overrides(config)?;
@ -84,6 +178,117 @@ impl FileFetcher {
Ok(()) Ok(())
} }
/// Handle unknown project files that aren't in the lockfile.
/// If shelve is true, moves them to a shelf directory
/// (`<base_path>/.pakker-shelf`). Otherwise, deletes them.
///
/// Only `.jar` files in the known project-type directories and in any
/// custom paths from `config.paths` are considered; all other file types
/// are left untouched. Unreadable directories are skipped (best-effort
/// cleanup). Returns the first I/O error hit while moving or deleting.
fn handle_unknown_files(
&self,
lockfile: &LockFile,
config: &Config,
) -> Result<()> {
// Collect all expected file names from lockfile.
// NOTE(review): only the *first* file of each exported project is
// recorded as "expected" — if a project can ship more than one file,
// the extras would be shelved/deleted as unknown. Confirm intent.
let expected_files: std::collections::HashSet<String> = lockfile
.projects
.iter()
.filter(|p| p.export)
.filter_map(|p| p.files.first().map(|f| f.file_name.clone()))
.collect();
// Check each project type directory.
// (The string labels are not used below; only the paths matter.)
let project_dirs = [
(
"mod",
self.get_default_path(&crate::model::ProjectType::Mod),
),
(
"resource-pack",
self.get_default_path(&crate::model::ProjectType::ResourcePack),
),
(
"shader",
self.get_default_path(&crate::model::ProjectType::Shader),
),
(
"data-pack",
self.get_default_path(&crate::model::ProjectType::DataPack),
),
(
"world",
self.get_default_path(&crate::model::ProjectType::World),
),
];
// Also check custom paths from config
let mut dirs_to_check: Vec<PathBuf> = project_dirs
.iter()
.map(|(_, dir)| self.base_path.join(dir))
.collect();
for custom_path in config.paths.values() {
dirs_to_check.push(self.base_path.join(custom_path));
}
let shelf_dir = self.base_path.join(".pakker-shelf");
let mut shelved_count = 0;
let mut deleted_count = 0;
for dir in dirs_to_check {
if !dir.exists() {
continue;
}
// A directory we cannot read is skipped silently rather than
// aborting the whole cleanup pass.
let entries = match fs::read_dir(&dir) {
Ok(e) => e,
Err(_) => continue,
};
for entry in entries.flatten() {
let path = entry.path();
// Only plain files are candidates; subdirectories are untouched.
if !path.is_file() {
continue;
}
// Skip entries whose names are not valid UTF-8.
let file_name = match path.file_name().and_then(|n| n.to_str()) {
Some(name) => name.to_string(),
None => continue,
};
// Skip if file is expected
if expected_files.contains(&file_name) {
continue;
}
// Skip non-jar files (might be configs, etc.)
if !file_name.ends_with(".jar") {
continue;
}
if self.shelve {
// Move to shelf.
// NOTE(review): fs::rename silently replaces a same-named
// file already on the shelf, and fails if the shelf is on a
// different filesystem than base_path — confirm both are OK.
fs::create_dir_all(&shelf_dir)?;
let shelf_path = shelf_dir.join(&file_name);
fs::rename(&path, &shelf_path)?;
log::info!("Shelved unknown file: {file_name} -> .pakker-shelf/");
shelved_count += 1;
} else {
// Delete unknown file
fs::remove_file(&path)?;
log::info!("Deleted unknown file: {file_name}");
deleted_count += 1;
}
}
}
// Summarize the pass; stay quiet when nothing was touched.
if shelved_count > 0 {
log::info!("Shelved {shelved_count} unknown file(s) to .pakker-shelf/");
}
if deleted_count > 0 {
log::info!("Deleted {deleted_count} unknown file(s)");
}
Ok(())
}
/// Fetch files for a single project /// Fetch files for a single project
pub async fn fetch_project( pub async fn fetch_project(
&self, &self,

View file

@ -12,7 +12,7 @@ use std::{
fs::{self, File, OpenOptions}, fs::{self, File, OpenOptions},
io::Write, io::Write,
os::unix::{fs::PermissionsExt, io::AsRawFd}, os::unix::{fs::PermissionsExt, io::AsRawFd},
path::PathBuf, path::{Path, PathBuf},
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
@ -108,7 +108,7 @@ impl IpcCoordinator {
/// Extract modpack hash from pakku.json's parentLockHash field. /// Extract modpack hash from pakku.json's parentLockHash field.
/// This is the authoritative content hash for the modpack (Nix-style). /// This is the authoritative content hash for the modpack (Nix-style).
fn get_modpack_hash(working_dir: &PathBuf) -> Result<String, IpcError> { fn get_modpack_hash(working_dir: &Path) -> Result<String, IpcError> {
let pakku_path = working_dir.join("pakku.json"); let pakku_path = working_dir.join("pakku.json");
if !pakku_path.exists() { if !pakku_path.exists() {
@ -147,7 +147,7 @@ impl IpcCoordinator {
/// Create a new IPC coordinator for the given modpack directory. /// Create a new IPC coordinator for the given modpack directory.
/// Uses parentLockHash from pakku.json to identify the modpack. /// Uses parentLockHash from pakku.json to identify the modpack.
pub fn new(working_dir: &PathBuf) -> Result<Self, IpcError> { pub fn new(working_dir: &Path) -> Result<Self, IpcError> {
let modpack_hash = Self::get_modpack_hash(working_dir)?; let modpack_hash = Self::get_modpack_hash(working_dir)?;
let ipc_base = Self::get_ipc_base_dir(); let ipc_base = Self::get_ipc_base_dir();
let ipc_dir = ipc_base.join(&modpack_hash); let ipc_dir = ipc_base.join(&modpack_hash);
@ -187,6 +187,7 @@ impl IpcCoordinator {
.read(true) .read(true)
.write(true) .write(true)
.create(true) .create(true)
.truncate(false)
.open(&self.ops_file) .open(&self.ops_file)
.map_err(|e| IpcError::InvalidFormat(e.to_string()))?; .map_err(|e| IpcError::InvalidFormat(e.to_string()))?;