diff --git a/src/fetch.rs b/src/fetch.rs index 214b4eb..9e1d618 100644 --- a/src/fetch.rs +++ b/src/fetch.rs @@ -1,10 +1,13 @@ use std::{ fs, path::{Path, PathBuf}, + sync::Arc, }; -use indicatif::{ProgressBar, ProgressStyle}; +use futures::future::join_all; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use reqwest::Client; +use tokio::sync::Semaphore; use crate::{ error::{PakkerError, Result}, @@ -12,14 +15,19 @@ use crate::{ utils::verify_hash, }; +/// Maximum number of concurrent downloads +const MAX_CONCURRENT_DOWNLOADS: usize = 8; + pub struct Fetcher { client: Client, base_path: PathBuf, + shelve: bool, } pub struct FileFetcher { client: Client, base_path: PathBuf, + shelve: bool, } impl Fetcher { @@ -27,9 +35,15 @@ impl Fetcher { Self { client: Client::new(), base_path: base_path.as_ref().to_path_buf(), + shelve: false, } } + pub const fn with_shelve(mut self, shelve: bool) -> Self { + self.shelve = shelve; + self + } + pub async fn fetch_all( &self, lockfile: &LockFile, @@ -38,6 +52,7 @@ impl Fetcher { let fetcher = FileFetcher { client: self.client.clone(), base_path: self.base_path.clone(), + shelve: self.shelve, }; fetcher.fetch_all(lockfile, config).await } @@ -48,7 +63,7 @@ impl Fetcher { } impl FileFetcher { - /// Fetch all project files according to lockfile + /// Fetch all project files according to lockfile with parallel downloads pub async fn fetch_all( &self, lockfile: &LockFile, @@ -58,25 +73,104 @@ impl FileFetcher { lockfile.projects.iter().filter(|p| p.export).collect(); let total = exportable_projects.len(); - let spinner = ProgressBar::new(total as u64); - spinner.set_style( - ProgressStyle::default_spinner() - .template("{spinner:.green} {msg}") - .unwrap(), - ); - for (idx, project) in exportable_projects.iter().enumerate() { - let name = project - .name - .values() - .next() - .map_or("unknown", std::string::String::as_str); - - spinner.set_message(format!("Fetching {} ({}/{})", name, idx + 1, total)); - 
self.fetch_project(project, lockfile, config).await?; + if total == 0 { + log::info!("No projects to fetch"); + return Ok(()); } - spinner.finish_with_message("All projects fetched"); + // Set up multi-progress for parallel download tracking + let multi_progress = MultiProgress::new(); + let overall_bar = multi_progress.add(ProgressBar::new(total as u64)); + overall_bar.set_style( + ProgressStyle::default_bar() + .template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} {msg}") + .unwrap() + .progress_chars("#>-"), + ); + overall_bar.set_message("Fetching projects..."); + + // Use a semaphore to limit concurrent downloads + let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_DOWNLOADS)); + + // Prepare download tasks + let download_tasks: Vec<_> = exportable_projects + .iter() + .map(|project| { + let semaphore = Arc::clone(&semaphore); + let client = self.client.clone(); + let base_path = self.base_path.clone(); + let lockfile = lockfile.clone(); + let config = config.clone(); + let project = (*project).clone(); + let overall_bar = overall_bar.clone(); + + async move { + // Acquire semaphore permit to limit concurrency + let _permit = semaphore.acquire().await.map_err(|_| { + PakkerError::InternalError("Semaphore acquisition failed".into()) + })?; + + let name = project + .name + .values() + .next() + .map_or("unknown".to_string(), std::clone::Clone::clone); + + let fetcher = Self { + client, + base_path, + shelve: false, // Shelving happens at sync level, not per-project + }; + + let result = + fetcher.fetch_project(&project, &lockfile, &config).await; + + // Update progress bar + overall_bar.inc(1); + + match &result { + Ok(()) => { + log::debug!("Successfully fetched: {name}"); + }, + Err(e) => { + log::error!("Failed to fetch {name}: {e}"); + }, + } + + result.map(|()| name) + } + }) + .collect(); + + // Execute all downloads in parallel (limited by semaphore) + let results = join_all(download_tasks).await; + + overall_bar.finish_with_message("All 
projects fetched"); + + // Collect and report errors + let mut errors = Vec::new(); + let mut success_count = 0; + + for result in results { + match result { + Ok(_) => success_count += 1, + Err(e) => errors.push(e), + } + } + + log::info!("Fetch complete: {success_count}/{total} successful"); + + if !errors.is_empty() { + // Return the first error, but log all of them + for (idx, error) in errors.iter().enumerate() { + log::error!("Download error {}: {}", idx + 1, error); + } + return Err(errors.remove(0)); + } + + // Handle unknown files (shelve or delete) + self.handle_unknown_files(lockfile, config)?; // Sync overrides self.sync_overrides(config)?; @@ -84,6 +178,117 @@ impl FileFetcher { Ok(()) } + /// Handle unknown project files that aren't in the lockfile. + /// If shelve is true, moves them to a shelf directory. + /// Otherwise, deletes them. + fn handle_unknown_files( + &self, + lockfile: &LockFile, + config: &Config, + ) -> Result<()> { + // Collect all expected file names from lockfile + let expected_files: std::collections::HashSet<String> = lockfile + .projects + .iter() + .filter(|p| p.export) + .filter_map(|p| p.files.first().map(|f| f.file_name.clone())) + .collect(); + + // Check each project type directory + let project_dirs = [ + ( + "mod", + self.get_default_path(&crate::model::ProjectType::Mod), + ), + ( + "resource-pack", + self.get_default_path(&crate::model::ProjectType::ResourcePack), + ), + ( + "shader", + self.get_default_path(&crate::model::ProjectType::Shader), + ), + ( + "data-pack", + self.get_default_path(&crate::model::ProjectType::DataPack), + ), + ( + "world", + self.get_default_path(&crate::model::ProjectType::World), + ), + ]; + + // Also check custom paths from config + let mut dirs_to_check: Vec<PathBuf> = project_dirs + .iter() + .map(|(_, dir)| self.base_path.join(dir)) + .collect(); + + for custom_path in config.paths.values() { + dirs_to_check.push(self.base_path.join(custom_path)); + } + + let shelf_dir =
self.base_path.join(".pakker-shelf"); + let mut shelved_count = 0; + let mut deleted_count = 0; + + for dir in dirs_to_check { + if !dir.exists() { + continue; + } + + let entries = match fs::read_dir(&dir) { + Ok(e) => e, + Err(_) => continue, + }; + + for entry in entries.flatten() { + let path = entry.path(); + if !path.is_file() { + continue; + } + + let file_name = match path.file_name().and_then(|n| n.to_str()) { + Some(name) => name.to_string(), + None => continue, + }; + + // Skip if file is expected + if expected_files.contains(&file_name) { + continue; + } + + // Skip non-jar files (might be configs, etc.) + if !file_name.ends_with(".jar") { + continue; + } + + if self.shelve { + // Move to shelf + fs::create_dir_all(&shelf_dir)?; + let shelf_path = shelf_dir.join(&file_name); + fs::rename(&path, &shelf_path)?; + log::info!("Shelved unknown file: {file_name} -> .pakker-shelf/"); + shelved_count += 1; + } else { + // Delete unknown file + fs::remove_file(&path)?; + log::info!("Deleted unknown file: {file_name}"); + deleted_count += 1; + } + } + } + + if shelved_count > 0 { + log::info!("Shelved {shelved_count} unknown file(s) to .pakker-shelf/"); + } + if deleted_count > 0 { + log::info!("Deleted {deleted_count} unknown file(s)"); + } + + Ok(()) + } + /// Fetch files for a single project pub async fn fetch_project( &self, diff --git a/src/ipc.rs b/src/ipc.rs index 2039ec9..d1aceaa 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -12,7 +12,7 @@ use std::{ fs::{self, File, OpenOptions}, io::Write, os::unix::{fs::PermissionsExt, io::AsRawFd}, - path::PathBuf, + path::{Path, PathBuf}, time::{Duration, SystemTime}, }; @@ -108,7 +108,7 @@ impl IpcCoordinator { /// Extract modpack hash from pakku.json's parentLockHash field. /// This is the authoritative content hash for the modpack (Nix-style). 
- fn get_modpack_hash(working_dir: &PathBuf) -> Result<String> { + fn get_modpack_hash(working_dir: &Path) -> Result<String> { let pakku_path = working_dir.join("pakku.json"); if !pakku_path.exists() { @@ -147,7 +147,7 @@ impl IpcCoordinator { /// Create a new IPC coordinator for the given modpack directory. /// Uses parentLockHash from pakku.json to identify the modpack. - pub fn new(working_dir: &PathBuf) -> Result<Self> { + pub fn new(working_dir: &Path) -> Result<Self> { let modpack_hash = Self::get_modpack_hash(working_dir)?; let ipc_base = Self::get_ipc_base_dir(); let ipc_dir = ipc_base.join(&modpack_hash); @@ -187,6 +187,7 @@ impl IpcCoordinator { .read(true) .write(true) .create(true) + .truncate(false) .open(&self.ops_file) .map_err(|e| IpcError::InvalidFormat(e.to_string()))?;