pinakes/crates/pinakes-core/src/scan.rs
NotAShelf 6a73d11c4b
initial commit
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I4a6b498153eccd5407510dd541b7f4816a6a6964
2026-01-31 15:20:30 +03:00

283 lines
8.3 KiB
Rust

use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use notify::{PollWatcher, RecursiveMode, Watcher};
use tokio::sync::mpsc;
use tracing::{info, warn};
use crate::error::Result;
use crate::import;
use crate::storage::DynStorageBackend;
/// Point-in-time summary of a scan: whether it is running, how many files
/// were seen and imported, and the error messages collected so far.
pub struct ScanStatus {
/// `true` while the scan is still in progress; results of finished scans carry `false`.
pub scanning: bool,
/// Number of import results the scan produced (successes and failures alike).
pub files_found: usize,
/// Number of files that were imported successfully.
pub files_processed: usize,
/// Stringified errors encountered during the scan.
pub errors: Vec<String>,
}
/// Shared scan progress that can be read by the status endpoint while a scan runs.
///
/// Every field lives behind an `Arc`, so clones of a `ScanProgress` observe
/// and update the same underlying state.
#[derive(Clone)]
pub struct ScanProgress {
    /// Set by `begin` and cleared by `finish`.
    pub is_scanning: Arc<AtomicBool>,
    /// Number of import results the current scan has reported.
    pub files_found: Arc<AtomicUsize>,
    /// Number of files the current scan imported successfully.
    pub files_processed: Arc<AtomicUsize>,
    /// Total number of recorded errors, including those beyond the storage cap.
    pub error_count: Arc<AtomicUsize>,
    /// At most `MAX_STORED_ERRORS` error messages, in the order they were recorded.
    pub error_messages: Arc<Mutex<Vec<String>>>,
}

/// Upper bound on retained error messages; errors past this limit are only counted.
const MAX_STORED_ERRORS: usize = 100;

impl ScanProgress {
    /// Creates an idle progress handle with all counters at zero and no errors.
    pub fn new() -> Self {
        Self {
            is_scanning: Arc::new(AtomicBool::new(false)),
            files_found: Arc::new(AtomicUsize::new(0)),
            files_processed: Arc::new(AtomicUsize::new(0)),
            error_count: Arc::new(AtomicUsize::new(0)),
            error_messages: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Returns a copy of the current state as a [`ScanStatus`].
    ///
    /// The individual fields are read one after another, so the snapshot is
    /// not atomic as a whole while a scan is updating them.
    pub fn snapshot(&self) -> ScanStatus {
        let errors = self.messages().clone();
        ScanStatus {
            scanning: self.is_scanning.load(Ordering::Acquire),
            files_found: self.files_found.load(Ordering::Acquire),
            files_processed: self.files_processed.load(Ordering::Acquire),
            errors,
        }
    }

    /// Locks the stored error messages, recovering the guard if the mutex is poisoned.
    ///
    /// The protected value is a plain `Vec<String>` that stays structurally
    /// valid even if a holder panicked, so ignoring the poison flag is safe
    /// and avoids silently hiding or dropping error messages.
    fn messages(&self) -> std::sync::MutexGuard<'_, Vec<String>> {
        self.error_messages
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Marks a scan as running and resets every counter and stored message.
    fn begin(&self) {
        self.is_scanning.store(true, Ordering::Release);
        self.files_found.store(0, Ordering::Release);
        self.files_processed.store(0, Ordering::Release);
        self.error_count.store(0, Ordering::Release);
        self.messages().clear();
    }

    /// Counts an error and keeps its message unless the storage cap is reached.
    fn record_error(&self, message: String) {
        self.error_count.fetch_add(1, Ordering::Release);
        let mut stored = self.messages();
        if stored.len() < MAX_STORED_ERRORS {
            stored.push(message);
        }
    }

    /// Marks the scan as no longer running; counters and messages are kept.
    fn finish(&self) {
        self.is_scanning.store(false, Ordering::Release);
    }
}

impl Default for ScanProgress {
    /// Equivalent to [`ScanProgress::new`].
    fn default() -> Self {
        Self::new()
    }
}
pub async fn scan_directory(
storage: &DynStorageBackend,
dir: &Path,
ignore_patterns: &[String],
) -> Result<ScanStatus> {
scan_directory_with_progress(storage, dir, ignore_patterns, None).await
}
/// Imports the contents of `dir` and summarizes the outcome.
///
/// When `progress` is provided it is reset at the start, updated with the
/// final counts and every error message, and marked as finished when this
/// function exits — including when the import fails or the future is dropped
/// before completion, so the scanning flag can never be left stuck at `true`.
///
/// # Errors
///
/// Returns the error from `import::import_directory` if the directory import
/// as a whole fails. Failures of individual files are not errors here; they
/// are collected into [`ScanStatus::errors`].
pub async fn scan_directory_with_progress(
    storage: &DynStorageBackend,
    dir: &Path,
    ignore_patterns: &[String],
    progress: Option<&ScanProgress>,
) -> Result<ScanStatus> {
    /// Clears the scanning flag when dropped, whatever the exit path.
    struct FinishOnDrop<'a>(Option<&'a ScanProgress>);

    impl Drop for FinishOnDrop<'_> {
        fn drop(&mut self) {
            if let Some(p) = self.0 {
                p.finish();
            }
        }
    }

    info!(dir = %dir.display(), "starting directory scan");
    if let Some(p) = progress {
        p.begin();
    }
    // Declared after `begin` so every later exit (error, cancellation, success)
    // runs `finish` exactly once.
    let _finish_guard = FinishOnDrop(progress);

    // Note: for configurable concurrency, use import_directory_with_concurrency directly
    let results = import::import_directory(storage, dir, ignore_patterns)
        .await
        .inspect_err(|e| {
            // Surface the reason for a failed scan to progress readers too.
            if let Some(p) = progress {
                p.record_error(e.to_string());
            }
        })?;

    let mut errors = Vec::new();
    let mut processed = 0;
    for result in &results {
        match result {
            Ok(_) => processed += 1,
            Err(e) => {
                let msg = e.to_string();
                if let Some(p) = progress {
                    p.record_error(msg.clone());
                }
                errors.push(msg);
            }
        }
    }

    if let Some(p) = progress {
        // Counts are published before the guard clears the scanning flag.
        p.files_found.store(results.len(), Ordering::Release);
        p.files_processed.store(processed, Ordering::Release);
    }

    Ok(ScanStatus {
        scanning: false,
        files_found: results.len(),
        files_processed: processed,
        errors,
    })
}
pub async fn scan_all_roots(
storage: &DynStorageBackend,
ignore_patterns: &[String],
) -> Result<Vec<ScanStatus>> {
scan_all_roots_with_progress(storage, ignore_patterns, None).await
}
pub async fn scan_all_roots_with_progress(
storage: &DynStorageBackend,
ignore_patterns: &[String],
progress: Option<&ScanProgress>,
) -> Result<Vec<ScanStatus>> {
let roots = storage.list_root_dirs().await?;
let mut statuses = Vec::new();
for root in roots {
match scan_directory_with_progress(storage, &root, ignore_patterns, progress).await {
Ok(status) => statuses.push(status),
Err(e) => {
warn!(root = %root.display(), error = %e, "failed to scan root directory");
statuses.push(ScanStatus {
scanning: false,
files_found: 0,
files_processed: 0,
errors: vec![e.to_string()],
});
}
}
}
Ok(statuses)
}
pub struct FileWatcher {
_watcher: Box<dyn Watcher + Send>,
rx: mpsc::Receiver<PathBuf>,
}
impl FileWatcher {
pub fn new(dirs: &[PathBuf]) -> Result<Self> {
let (tx, rx) = mpsc::channel(1024);
// Try the recommended (native) watcher first, fall back to polling
let watcher: Box<dyn Watcher + Send> = match Self::try_native_watcher(dirs, tx.clone()) {
Ok(w) => {
info!("using native filesystem watcher");
w
}
Err(native_err) => {
warn!(error = %native_err, "native watcher failed, falling back to polling");
Self::polling_watcher(dirs, tx)?
}
};
Ok(Self {
_watcher: watcher,
rx,
})
}
fn try_native_watcher(
dirs: &[PathBuf],
tx: mpsc::Sender<PathBuf>,
) -> std::result::Result<Box<dyn Watcher + Send>, notify::Error> {
let tx_clone = tx.clone();
let mut watcher =
notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
if let Ok(event) = res {
for path in event.paths {
if tx_clone.blocking_send(path).is_err() {
tracing::warn!("filesystem watcher channel closed, stopping");
break;
}
}
}
})?;
for dir in dirs {
watcher.watch(dir, RecursiveMode::Recursive)?;
}
Ok(Box::new(watcher))
}
fn polling_watcher(
dirs: &[PathBuf],
tx: mpsc::Sender<PathBuf>,
) -> Result<Box<dyn Watcher + Send>> {
let tx_clone = tx.clone();
let poll_interval = std::time::Duration::from_secs(5);
let config = notify::Config::default().with_poll_interval(poll_interval);
let mut watcher = PollWatcher::new(
move |res: notify::Result<notify::Event>| {
if let Ok(event) = res {
for path in event.paths {
if tx_clone.blocking_send(path).is_err() {
tracing::warn!("filesystem watcher channel closed, stopping");
break;
}
}
}
},
config,
)
.map_err(|e| crate::error::PinakesError::Io(std::io::Error::other(e)))?;
for dir in dirs {
watcher
.watch(dir, RecursiveMode::Recursive)
.map_err(|e| crate::error::PinakesError::Io(std::io::Error::other(e)))?;
}
Ok(Box::new(watcher))
}
pub async fn next_change(&mut self) -> Option<PathBuf> {
self.rx.recv().await
}
}
pub async fn watch_and_import(
storage: DynStorageBackend,
dirs: Vec<PathBuf>,
ignore_patterns: Vec<String>,
) -> Result<()> {
let mut watcher = FileWatcher::new(&dirs)?;
info!("filesystem watcher started");
while let Some(path) = watcher.next_change().await {
if path.is_file()
&& crate::media_type::MediaType::from_path(&path).is_some()
&& !crate::import::should_ignore(&path, &ignore_patterns)
{
info!(path = %path.display(), "detected file change, importing");
if let Err(e) = import::import_file(&storage, &path).await {
warn!(path = %path.display(), error = %e, "failed to import changed file");
}
}
}
Ok(())
}