Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I2d1f04f13970d21c36067f30bc04a9176a6a6964
387 lines
11 KiB
Rust
387 lines
11 KiB
Rust
use std::path::{Path, PathBuf};
|
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use notify::{PollWatcher, RecursiveMode, Watcher};
|
|
use tokio::sync::mpsc;
|
|
use tracing::{info, warn};
|
|
|
|
use crate::error::Result;
|
|
use crate::import;
|
|
use crate::storage::DynStorageBackend;
|
|
|
|
pub struct ScanStatus {
|
|
pub scanning: bool,
|
|
pub files_found: usize,
|
|
pub files_processed: usize,
|
|
/// Number of files skipped because they haven't changed (incremental scan)
|
|
pub files_skipped: usize,
|
|
pub errors: Vec<String>,
|
|
}
|
|
|
|
/// Options for scanning operations
|
|
#[derive(Debug, Clone, Default)]
|
|
pub struct ScanOptions {
|
|
/// Use incremental scanning (skip unchanged files based on mtime)
|
|
pub incremental: bool,
|
|
/// Force full rescan even for incremental mode
|
|
pub force_full: bool,
|
|
}
|
|
|
|
/// Shared scan progress that can be read by the status endpoint while a scan runs.
|
|
#[derive(Clone)]
|
|
pub struct ScanProgress {
|
|
pub is_scanning: Arc<AtomicBool>,
|
|
pub files_found: Arc<AtomicUsize>,
|
|
pub files_processed: Arc<AtomicUsize>,
|
|
pub error_count: Arc<AtomicUsize>,
|
|
pub error_messages: Arc<Mutex<Vec<String>>>,
|
|
}
|
|
|
|
const MAX_STORED_ERRORS: usize = 100;
|
|
|
|
impl ScanProgress {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
is_scanning: Arc::new(AtomicBool::new(false)),
|
|
files_found: Arc::new(AtomicUsize::new(0)),
|
|
files_processed: Arc::new(AtomicUsize::new(0)),
|
|
error_count: Arc::new(AtomicUsize::new(0)),
|
|
error_messages: Arc::new(Mutex::new(Vec::new())),
|
|
}
|
|
}
|
|
|
|
pub fn snapshot(&self) -> ScanStatus {
|
|
let errors = self
|
|
.error_messages
|
|
.lock()
|
|
.map(|v| v.clone())
|
|
.unwrap_or_default();
|
|
ScanStatus {
|
|
scanning: self.is_scanning.load(Ordering::Acquire),
|
|
files_found: self.files_found.load(Ordering::Acquire),
|
|
files_processed: self.files_processed.load(Ordering::Acquire),
|
|
files_skipped: 0, // Not tracked in real-time progress
|
|
errors,
|
|
}
|
|
}
|
|
|
|
fn begin(&self) {
|
|
self.is_scanning.store(true, Ordering::Release);
|
|
self.files_found.store(0, Ordering::Release);
|
|
self.files_processed.store(0, Ordering::Release);
|
|
self.error_count.store(0, Ordering::Release);
|
|
if let Ok(mut msgs) = self.error_messages.lock() {
|
|
msgs.clear();
|
|
}
|
|
}
|
|
|
|
fn record_error(&self, message: String) {
|
|
self.error_count.fetch_add(1, Ordering::Release);
|
|
if let Ok(mut msgs) = self.error_messages.lock()
|
|
&& msgs.len() < MAX_STORED_ERRORS
|
|
{
|
|
msgs.push(message);
|
|
}
|
|
}
|
|
|
|
fn finish(&self) {
|
|
self.is_scanning.store(false, Ordering::Release);
|
|
}
|
|
}
|
|
|
|
impl Default for ScanProgress {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
pub async fn scan_directory(
|
|
storage: &DynStorageBackend,
|
|
dir: &Path,
|
|
ignore_patterns: &[String],
|
|
) -> Result<ScanStatus> {
|
|
scan_directory_with_options(storage, dir, ignore_patterns, None, &ScanOptions::default()).await
|
|
}
|
|
|
|
/// Scan a directory with incremental scanning support
|
|
pub async fn scan_directory_incremental(
|
|
storage: &DynStorageBackend,
|
|
dir: &Path,
|
|
ignore_patterns: &[String],
|
|
) -> Result<ScanStatus> {
|
|
let options = ScanOptions {
|
|
incremental: true,
|
|
force_full: false,
|
|
};
|
|
scan_directory_with_options(storage, dir, ignore_patterns, None, &options).await
|
|
}
|
|
|
|
pub async fn scan_directory_with_progress(
|
|
storage: &DynStorageBackend,
|
|
dir: &Path,
|
|
ignore_patterns: &[String],
|
|
progress: Option<&ScanProgress>,
|
|
) -> Result<ScanStatus> {
|
|
scan_directory_with_options(
|
|
storage,
|
|
dir,
|
|
ignore_patterns,
|
|
progress,
|
|
&ScanOptions::default(),
|
|
)
|
|
.await
|
|
}
|
|
|
|
/// Scan a directory with full options including progress tracking and incremental mode
|
|
pub async fn scan_directory_with_options(
|
|
storage: &DynStorageBackend,
|
|
dir: &Path,
|
|
ignore_patterns: &[String],
|
|
progress: Option<&ScanProgress>,
|
|
scan_options: &ScanOptions,
|
|
) -> Result<ScanStatus> {
|
|
info!(
|
|
dir = %dir.display(),
|
|
incremental = scan_options.incremental,
|
|
force = scan_options.force_full,
|
|
"starting directory scan"
|
|
);
|
|
|
|
if let Some(p) = progress {
|
|
p.begin();
|
|
}
|
|
|
|
// Convert scan options to import options
|
|
let import_options = import::ImportOptions {
|
|
incremental: scan_options.incremental && !scan_options.force_full,
|
|
force: scan_options.force_full,
|
|
photo_config: crate::config::PhotoConfig::default(),
|
|
};
|
|
|
|
let results = import::import_directory_with_options(
|
|
storage,
|
|
dir,
|
|
ignore_patterns,
|
|
8, // Default concurrency
|
|
&import_options,
|
|
)
|
|
.await?;
|
|
|
|
let mut errors = Vec::new();
|
|
let mut processed = 0;
|
|
let mut skipped = 0;
|
|
for result in &results {
|
|
match result {
|
|
Ok(r) => {
|
|
if r.was_skipped {
|
|
skipped += 1;
|
|
} else {
|
|
processed += 1;
|
|
}
|
|
}
|
|
Err(e) => {
|
|
let msg = e.to_string();
|
|
if let Some(p) = progress {
|
|
p.record_error(msg.clone());
|
|
}
|
|
errors.push(msg);
|
|
}
|
|
}
|
|
}
|
|
|
|
if let Some(p) = progress {
|
|
p.files_found.store(results.len(), Ordering::Release);
|
|
p.files_processed.store(processed, Ordering::Release);
|
|
p.finish();
|
|
}
|
|
|
|
let status = ScanStatus {
|
|
scanning: false,
|
|
files_found: results.len(),
|
|
files_processed: processed,
|
|
files_skipped: skipped,
|
|
errors,
|
|
};
|
|
|
|
if scan_options.incremental {
|
|
info!(
|
|
dir = %dir.display(),
|
|
found = status.files_found,
|
|
processed = status.files_processed,
|
|
skipped = status.files_skipped,
|
|
"incremental scan complete"
|
|
);
|
|
}
|
|
|
|
Ok(status)
|
|
}
|
|
|
|
pub async fn scan_all_roots(
|
|
storage: &DynStorageBackend,
|
|
ignore_patterns: &[String],
|
|
) -> Result<Vec<ScanStatus>> {
|
|
scan_all_roots_with_options(storage, ignore_patterns, None, &ScanOptions::default()).await
|
|
}
|
|
|
|
/// Scan all roots incrementally (skip unchanged files)
|
|
pub async fn scan_all_roots_incremental(
|
|
storage: &DynStorageBackend,
|
|
ignore_patterns: &[String],
|
|
) -> Result<Vec<ScanStatus>> {
|
|
let options = ScanOptions {
|
|
incremental: true,
|
|
force_full: false,
|
|
};
|
|
scan_all_roots_with_options(storage, ignore_patterns, None, &options).await
|
|
}
|
|
|
|
pub async fn scan_all_roots_with_progress(
|
|
storage: &DynStorageBackend,
|
|
ignore_patterns: &[String],
|
|
progress: Option<&ScanProgress>,
|
|
) -> Result<Vec<ScanStatus>> {
|
|
scan_all_roots_with_options(storage, ignore_patterns, progress, &ScanOptions::default()).await
|
|
}
|
|
|
|
/// Scan all roots with full options including progress and incremental mode
|
|
pub async fn scan_all_roots_with_options(
|
|
storage: &DynStorageBackend,
|
|
ignore_patterns: &[String],
|
|
progress: Option<&ScanProgress>,
|
|
scan_options: &ScanOptions,
|
|
) -> Result<Vec<ScanStatus>> {
|
|
let roots = storage.list_root_dirs().await?;
|
|
let mut statuses = Vec::new();
|
|
|
|
for root in roots {
|
|
match scan_directory_with_options(storage, &root, ignore_patterns, progress, scan_options)
|
|
.await
|
|
{
|
|
Ok(status) => statuses.push(status),
|
|
Err(e) => {
|
|
warn!(root = %root.display(), error = %e, "failed to scan root directory");
|
|
statuses.push(ScanStatus {
|
|
scanning: false,
|
|
files_found: 0,
|
|
files_processed: 0,
|
|
files_skipped: 0,
|
|
errors: vec![e.to_string()],
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(statuses)
|
|
}
|
|
|
|
pub struct FileWatcher {
|
|
_watcher: Box<dyn Watcher + Send>,
|
|
rx: mpsc::Receiver<PathBuf>,
|
|
}
|
|
|
|
impl FileWatcher {
|
|
pub fn new(dirs: &[PathBuf]) -> Result<Self> {
|
|
let (tx, rx) = mpsc::channel(1024);
|
|
|
|
// Try the recommended (native) watcher first, fall back to polling
|
|
let watcher: Box<dyn Watcher + Send> = match Self::try_native_watcher(dirs, tx.clone()) {
|
|
Ok(w) => {
|
|
info!("using native filesystem watcher");
|
|
w
|
|
}
|
|
Err(native_err) => {
|
|
warn!(error = %native_err, "native watcher failed, falling back to polling");
|
|
Self::polling_watcher(dirs, tx)?
|
|
}
|
|
};
|
|
|
|
Ok(Self {
|
|
_watcher: watcher,
|
|
rx,
|
|
})
|
|
}
|
|
|
|
fn try_native_watcher(
|
|
dirs: &[PathBuf],
|
|
tx: mpsc::Sender<PathBuf>,
|
|
) -> std::result::Result<Box<dyn Watcher + Send>, notify::Error> {
|
|
let tx_clone = tx.clone();
|
|
let mut watcher =
|
|
notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
|
|
if let Ok(event) = res {
|
|
for path in event.paths {
|
|
if tx_clone.blocking_send(path).is_err() {
|
|
tracing::warn!("filesystem watcher channel closed, stopping");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
})?;
|
|
|
|
for dir in dirs {
|
|
watcher.watch(dir, RecursiveMode::Recursive)?;
|
|
}
|
|
|
|
Ok(Box::new(watcher))
|
|
}
|
|
|
|
fn polling_watcher(
|
|
dirs: &[PathBuf],
|
|
tx: mpsc::Sender<PathBuf>,
|
|
) -> Result<Box<dyn Watcher + Send>> {
|
|
let tx_clone = tx.clone();
|
|
let poll_interval = std::time::Duration::from_secs(5);
|
|
let config = notify::Config::default().with_poll_interval(poll_interval);
|
|
|
|
let mut watcher = PollWatcher::new(
|
|
move |res: notify::Result<notify::Event>| {
|
|
if let Ok(event) = res {
|
|
for path in event.paths {
|
|
if tx_clone.blocking_send(path).is_err() {
|
|
tracing::warn!("filesystem watcher channel closed, stopping");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
},
|
|
config,
|
|
)
|
|
.map_err(|e| crate::error::PinakesError::Io(std::io::Error::other(e)))?;
|
|
|
|
for dir in dirs {
|
|
watcher
|
|
.watch(dir, RecursiveMode::Recursive)
|
|
.map_err(|e| crate::error::PinakesError::Io(std::io::Error::other(e)))?;
|
|
}
|
|
|
|
Ok(Box::new(watcher))
|
|
}
|
|
|
|
pub async fn next_change(&mut self) -> Option<PathBuf> {
|
|
self.rx.recv().await
|
|
}
|
|
}
|
|
|
|
pub async fn watch_and_import(
|
|
storage: DynStorageBackend,
|
|
dirs: Vec<PathBuf>,
|
|
ignore_patterns: Vec<String>,
|
|
) -> Result<()> {
|
|
let mut watcher = FileWatcher::new(&dirs)?;
|
|
info!("filesystem watcher started");
|
|
|
|
while let Some(path) = watcher.next_change().await {
|
|
if path.is_file()
|
|
&& crate::media_type::MediaType::from_path(&path).is_some()
|
|
&& !crate::import::should_ignore(&path, &ignore_patterns)
|
|
{
|
|
info!(path = %path.display(), "detected file change, importing");
|
|
if let Err(e) = import::import_file(&storage, &path).await {
|
|
warn!(path = %path.display(), error = %e, "failed to import changed file");
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|