use std::{ path::{Path, PathBuf}, sync::{ Arc, Mutex, atomic::{AtomicBool, AtomicUsize, Ordering}, }, }; use notify::{PollWatcher, RecursiveMode, Watcher}; use tokio::sync::mpsc; use tracing::{info, warn}; use crate::{ error::Result, import, plugin::PluginPipeline, storage::DynStorageBackend, }; /// Status of a directory scan operation. pub struct ScanStatus { pub scanning: bool, pub files_found: usize, pub files_processed: usize, /// Number of files skipped because they haven't changed (incremental scan) pub files_skipped: usize, pub errors: Vec, } /// Options for scanning operations #[derive(Debug, Clone, Default)] pub struct ScanOptions { /// Use incremental scanning (skip unchanged files based on mtime) pub incremental: bool, /// Force full rescan even for incremental mode pub force_full: bool, } /// Shared scan progress that can be read by the status endpoint while a scan /// runs. #[derive(Clone)] pub struct ScanProgress { pub is_scanning: Arc, pub files_found: Arc, pub files_processed: Arc, pub error_count: Arc, pub error_messages: Arc>>, } const MAX_STORED_ERRORS: usize = 100; impl ScanProgress { #[must_use] pub fn new() -> Self { Self { is_scanning: Arc::new(AtomicBool::new(false)), files_found: Arc::new(AtomicUsize::new(0)), files_processed: Arc::new(AtomicUsize::new(0)), error_count: Arc::new(AtomicUsize::new(0)), error_messages: Arc::new(Mutex::new(Vec::new())), } } #[must_use] pub fn snapshot(&self) -> ScanStatus { let errors = self .error_messages .lock() .map(|v| v.clone()) .unwrap_or_default(); ScanStatus { scanning: self.is_scanning.load(Ordering::Acquire), files_found: self.files_found.load(Ordering::Acquire), files_processed: self.files_processed.load(Ordering::Acquire), files_skipped: 0, // Not tracked in real-time progress errors, } } fn begin(&self) { self.is_scanning.store(true, Ordering::Release); self.files_found.store(0, Ordering::Release); self.files_processed.store(0, Ordering::Release); self.error_count.store(0, Ordering::Release); if let Ok(mut msgs) = self.error_messages.lock() { msgs.clear(); } } fn record_error(&self, message: String) { self.error_count.fetch_add(1, Ordering::Release); if let Ok(mut msgs) = self.error_messages.lock() && msgs.len() < MAX_STORED_ERRORS { msgs.push(message); } } fn finish(&self) { self.is_scanning.store(false, Ordering::Release); } } impl Default for ScanProgress { fn default() -> Self { Self::new() } } /// Scans a directory with default options. /// /// # Arguments /// /// * `storage` - Storage backend /// * `dir` - Directory to scan /// * `ignore_patterns` - Patterns to exclude /// /// # Returns /// /// Scan status with counts and any errors /// /// # Errors /// /// Returns [`crate::error::PinakesError`] if the scan fails. pub async fn scan_directory( storage: &DynStorageBackend, dir: &Path, ignore_patterns: &[String], pipeline: Option<&Arc>, ) -> Result { scan_directory_with_options( storage, dir, ignore_patterns, None, &ScanOptions::default(), pipeline, ) .await } /// Scans a directory with incremental scanning support. /// /// Skips files that haven't changed since last scan based on mtime. /// /// # Arguments /// /// * `storage` - Storage backend /// * `dir` - Directory to scan /// * `ignore_patterns` - Patterns to exclude /// /// # Returns /// /// Scan status with counts and any errors /// /// # Errors /// /// Returns [`crate::error::PinakesError`] if the scan fails. pub async fn scan_directory_incremental( storage: &DynStorageBackend, dir: &Path, ignore_patterns: &[String], pipeline: Option<&Arc>, ) -> Result { let options = ScanOptions { incremental: true, force_full: false, }; scan_directory_with_options( storage, dir, ignore_patterns, None, &options, pipeline, ) .await } /// Scans a directory with progress reporting. /// /// # Arguments /// /// * `storage` - Storage backend /// * `dir` - Directory to scan /// * `ignore_patterns` - Patterns to exclude /// * `progress` - Optional progress tracker /// /// # Returns /// /// Scan status with counts and any errors /// /// # Errors /// /// Returns [`crate::error::PinakesError`] if the scan fails. pub async fn scan_directory_with_progress( storage: &DynStorageBackend, dir: &Path, ignore_patterns: &[String], progress: Option<&ScanProgress>, pipeline: Option<&Arc>, ) -> Result { scan_directory_with_options( storage, dir, ignore_patterns, progress, &ScanOptions::default(), pipeline, ) .await } /// Scan a directory with full options including progress tracking and /// incremental mode. /// /// # Errors /// /// Returns [`crate::error::PinakesError`] if the scan fails. pub async fn scan_directory_with_options( storage: &DynStorageBackend, dir: &Path, ignore_patterns: &[String], progress: Option<&ScanProgress>, scan_options: &ScanOptions, pipeline: Option<&Arc>, ) -> Result { info!( dir = %dir.display(), incremental = scan_options.incremental, force = scan_options.force_full, "starting directory scan" ); if let Some(p) = progress { p.begin(); } // Convert scan options to import options let import_options = import::ImportOptions { incremental: scan_options.incremental && !scan_options.force_full, force: scan_options.force_full, photo_config: crate::config::PhotoConfig::default(), }; let results = import::import_directory_with_options( storage, dir, ignore_patterns, 8, // default concurrency &import_options, pipeline, ) .await?; let mut errors = Vec::new(); let mut processed = 0; let mut skipped = 0; for result in &results { match result { Ok(r) => { if r.was_skipped { skipped += 1; } else { processed += 1; } }, Err(e) => { let msg = e.to_string(); if let Some(p) = progress { p.record_error(msg.clone()); } errors.push(msg); }, } } if let Some(p) = progress { p.files_found.store(results.len(), Ordering::Release); p.files_processed.store(processed, Ordering::Release); p.finish(); } let status = ScanStatus { scanning: false, files_found: results.len(), files_processed: processed, files_skipped: skipped, errors, }; if scan_options.incremental { info!( dir = %dir.display(), found = status.files_found, processed = status.files_processed, skipped = status.files_skipped, "incremental scan complete" ); } Ok(status) } /// Scans all configured root directories with default options. /// /// # Arguments /// /// * `storage` - Storage backend /// * `ignore_patterns` - Patterns to exclude /// /// # Returns /// /// Status for each root directory /// /// # Errors /// /// Returns [`crate::error::PinakesError`] if listing roots or scanning fails. pub async fn scan_all_roots( storage: &DynStorageBackend, ignore_patterns: &[String], pipeline: Option<&Arc>, ) -> Result> { scan_all_roots_with_options( storage, ignore_patterns, None, &ScanOptions::default(), pipeline, ) .await } /// Scans all roots incrementally, skipping unchanged files. /// /// # Arguments /// /// * `storage` - Storage backend /// * `ignore_patterns` - Patterns to exclude /// /// # Returns /// /// Status for each root directory /// /// # Errors /// /// Returns [`crate::error::PinakesError`] if listing roots or scanning fails. pub async fn scan_all_roots_incremental( storage: &DynStorageBackend, ignore_patterns: &[String], pipeline: Option<&Arc>, ) -> Result> { let options = ScanOptions { incremental: true, force_full: false, }; scan_all_roots_with_options( storage, ignore_patterns, None, &options, pipeline, ) .await } /// Scans all root directories with progress reporting. /// /// # Arguments /// /// * `storage` - Storage backend /// * `ignore_patterns` - Patterns to exclude /// * `progress` - Optional progress tracker /// /// # Returns /// /// Status for each root directory /// /// # Errors /// /// Returns [`crate::error::PinakesError`] if listing roots or scanning fails. pub async fn scan_all_roots_with_progress( storage: &DynStorageBackend, ignore_patterns: &[String], progress: Option<&ScanProgress>, pipeline: Option<&Arc>, ) -> Result> { scan_all_roots_with_options( storage, ignore_patterns, progress, &ScanOptions::default(), pipeline, ) .await } /// Scans all roots with full options including progress and incremental mode. /// /// # Arguments /// /// * `storage` - Storage backend /// * `ignore_patterns` - Patterns to exclude /// * `progress` - Optional progress tracker /// * `scan_options` - Scan configuration /// /// # Returns /// /// Status for each root directory /// /// # Errors /// /// Returns [`crate::error::PinakesError`] if listing roots or scanning fails. pub async fn scan_all_roots_with_options( storage: &DynStorageBackend, ignore_patterns: &[String], progress: Option<&ScanProgress>, scan_options: &ScanOptions, pipeline: Option<&Arc>, ) -> Result> { let roots = storage.list_root_dirs().await?; let mut statuses = Vec::new(); for root in roots { match scan_directory_with_options( storage, &root, ignore_patterns, progress, scan_options, pipeline, ) .await { Ok(status) => statuses.push(status), Err(e) => { warn!(root = %root.display(), error = %e, "failed to scan root directory"); statuses.push(ScanStatus { scanning: false, files_found: 0, files_processed: 0, files_skipped: 0, errors: vec![e.to_string()], }); }, } } Ok(statuses) } /// Watches directories for file changes and imports modified files. pub struct FileWatcher { _watcher: Box, rx: mpsc::Receiver, } impl FileWatcher { /// Creates a new file watcher for the given directories. /// /// # Errors /// /// Returns [`crate::error::PinakesError`] if no filesystem watcher could be /// created. pub fn new(dirs: &[PathBuf]) -> Result { let (tx, rx) = mpsc::channel(1024); // Try the recommended (native) watcher first, fall back to polling let watcher: Box = match Self::try_native_watcher( dirs, tx.clone(), ) { Ok(w) => { info!("using native filesystem watcher"); w }, Err(native_err) => { warn!(error = %native_err, "native watcher failed, falling back to polling"); Self::polling_watcher(dirs, tx)? }, }; Ok(Self { _watcher: watcher, rx, }) } fn try_native_watcher( dirs: &[PathBuf], tx: mpsc::Sender, ) -> std::result::Result, notify::Error> { let tx_clone = tx; let mut watcher = notify::recommended_watcher( move |res: notify::Result| { if let Ok(event) = res { for path in event.paths { if tx_clone.blocking_send(path).is_err() { tracing::warn!("filesystem watcher channel closed, stopping"); break; } } } }, )?; for dir in dirs { watcher.watch(dir, RecursiveMode::Recursive)?; } Ok(Box::new(watcher)) } fn polling_watcher( dirs: &[PathBuf], tx: mpsc::Sender, ) -> Result> { let tx_clone = tx; let poll_interval = std::time::Duration::from_secs(5); let config = notify::Config::default().with_poll_interval(poll_interval); let mut watcher = PollWatcher::new( move |res: notify::Result| { if let Ok(event) = res { for path in event.paths { if tx_clone.blocking_send(path).is_err() { tracing::warn!("filesystem watcher channel closed, stopping"); break; } } } }, config, ) .map_err(|e| crate::error::PinakesError::Io(std::io::Error::other(e)))?; for dir in dirs { watcher.watch(dir, RecursiveMode::Recursive).map_err(|e| { crate::error::PinakesError::Io(std::io::Error::other(e)) })?; } Ok(Box::new(watcher)) } /// Receives the next changed file path. pub async fn next_change(&mut self) -> Option { self.rx.recv().await } } /// Watches directories and imports files on change. /// /// # Errors /// /// Returns [`crate::error::PinakesError`] if the watcher cannot be started. pub async fn watch_and_import( storage: DynStorageBackend, dirs: Vec, ignore_patterns: Vec, ) -> Result<()> { let mut watcher = FileWatcher::new(&dirs)?; info!("filesystem watcher started"); while let Some(path) = watcher.next_change().await { if path.is_file() && crate::media_type::MediaType::from_path(&path).is_some() && !crate::import::should_ignore(&path, &ignore_patterns) { info!(path = %path.display(), "detected file change, importing"); if let Err(e) = import::import_file(&storage, &path, None).await { warn!(path = %path.display(), error = %e, "failed to import changed file"); } } } Ok(()) }