initial commit
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I4a6b498153eccd5407510dd541b7f4816a6a6964
This commit is contained in:
commit
6a73d11c4b
124 changed files with 34856 additions and 0 deletions
283
crates/pinakes-core/src/scan.rs
Normal file
283
crates/pinakes-core/src/scan.rs
Normal file
|
|
@ -0,0 +1,283 @@
|
|||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use notify::{PollWatcher, RecursiveMode, Watcher};
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::import;
|
||||
use crate::storage::DynStorageBackend;
|
||||
|
||||
pub struct ScanStatus {
|
||||
pub scanning: bool,
|
||||
pub files_found: usize,
|
||||
pub files_processed: usize,
|
||||
pub errors: Vec<String>,
|
||||
}
|
||||
|
||||
/// Shared scan progress that can be read by the status endpoint while a scan runs.
|
||||
#[derive(Clone)]
|
||||
pub struct ScanProgress {
|
||||
pub is_scanning: Arc<AtomicBool>,
|
||||
pub files_found: Arc<AtomicUsize>,
|
||||
pub files_processed: Arc<AtomicUsize>,
|
||||
pub error_count: Arc<AtomicUsize>,
|
||||
pub error_messages: Arc<Mutex<Vec<String>>>,
|
||||
}
|
||||
|
||||
const MAX_STORED_ERRORS: usize = 100;
|
||||
|
||||
impl ScanProgress {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
is_scanning: Arc::new(AtomicBool::new(false)),
|
||||
files_found: Arc::new(AtomicUsize::new(0)),
|
||||
files_processed: Arc::new(AtomicUsize::new(0)),
|
||||
error_count: Arc::new(AtomicUsize::new(0)),
|
||||
error_messages: Arc::new(Mutex::new(Vec::new())),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn snapshot(&self) -> ScanStatus {
|
||||
let errors = self
|
||||
.error_messages
|
||||
.lock()
|
||||
.map(|v| v.clone())
|
||||
.unwrap_or_default();
|
||||
ScanStatus {
|
||||
scanning: self.is_scanning.load(Ordering::Acquire),
|
||||
files_found: self.files_found.load(Ordering::Acquire),
|
||||
files_processed: self.files_processed.load(Ordering::Acquire),
|
||||
errors,
|
||||
}
|
||||
}
|
||||
|
||||
fn begin(&self) {
|
||||
self.is_scanning.store(true, Ordering::Release);
|
||||
self.files_found.store(0, Ordering::Release);
|
||||
self.files_processed.store(0, Ordering::Release);
|
||||
self.error_count.store(0, Ordering::Release);
|
||||
if let Ok(mut msgs) = self.error_messages.lock() {
|
||||
msgs.clear();
|
||||
}
|
||||
}
|
||||
|
||||
fn record_error(&self, message: String) {
|
||||
self.error_count.fetch_add(1, Ordering::Release);
|
||||
if let Ok(mut msgs) = self.error_messages.lock()
|
||||
&& msgs.len() < MAX_STORED_ERRORS
|
||||
{
|
||||
msgs.push(message);
|
||||
}
|
||||
}
|
||||
|
||||
fn finish(&self) {
|
||||
self.is_scanning.store(false, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ScanProgress {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn scan_directory(
|
||||
storage: &DynStorageBackend,
|
||||
dir: &Path,
|
||||
ignore_patterns: &[String],
|
||||
) -> Result<ScanStatus> {
|
||||
scan_directory_with_progress(storage, dir, ignore_patterns, None).await
|
||||
}
|
||||
|
||||
pub async fn scan_directory_with_progress(
|
||||
storage: &DynStorageBackend,
|
||||
dir: &Path,
|
||||
ignore_patterns: &[String],
|
||||
progress: Option<&ScanProgress>,
|
||||
) -> Result<ScanStatus> {
|
||||
info!(dir = %dir.display(), "starting directory scan");
|
||||
|
||||
if let Some(p) = progress {
|
||||
p.begin();
|
||||
}
|
||||
|
||||
let results = import::import_directory(storage, dir, ignore_patterns).await?;
|
||||
// Note: for configurable concurrency, use import_directory_with_concurrency directly
|
||||
|
||||
let mut errors = Vec::new();
|
||||
let mut processed = 0;
|
||||
for result in &results {
|
||||
match result {
|
||||
Ok(_) => processed += 1,
|
||||
Err(e) => {
|
||||
let msg = e.to_string();
|
||||
if let Some(p) = progress {
|
||||
p.record_error(msg.clone());
|
||||
}
|
||||
errors.push(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(p) = progress {
|
||||
p.files_found.store(results.len(), Ordering::Release);
|
||||
p.files_processed.store(processed, Ordering::Release);
|
||||
p.finish();
|
||||
}
|
||||
|
||||
let status = ScanStatus {
|
||||
scanning: false,
|
||||
files_found: results.len(),
|
||||
files_processed: processed,
|
||||
errors,
|
||||
};
|
||||
|
||||
Ok(status)
|
||||
}
|
||||
|
||||
pub async fn scan_all_roots(
|
||||
storage: &DynStorageBackend,
|
||||
ignore_patterns: &[String],
|
||||
) -> Result<Vec<ScanStatus>> {
|
||||
scan_all_roots_with_progress(storage, ignore_patterns, None).await
|
||||
}
|
||||
|
||||
pub async fn scan_all_roots_with_progress(
|
||||
storage: &DynStorageBackend,
|
||||
ignore_patterns: &[String],
|
||||
progress: Option<&ScanProgress>,
|
||||
) -> Result<Vec<ScanStatus>> {
|
||||
let roots = storage.list_root_dirs().await?;
|
||||
let mut statuses = Vec::new();
|
||||
|
||||
for root in roots {
|
||||
match scan_directory_with_progress(storage, &root, ignore_patterns, progress).await {
|
||||
Ok(status) => statuses.push(status),
|
||||
Err(e) => {
|
||||
warn!(root = %root.display(), error = %e, "failed to scan root directory");
|
||||
statuses.push(ScanStatus {
|
||||
scanning: false,
|
||||
files_found: 0,
|
||||
files_processed: 0,
|
||||
errors: vec![e.to_string()],
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(statuses)
|
||||
}
|
||||
|
||||
pub struct FileWatcher {
|
||||
_watcher: Box<dyn Watcher + Send>,
|
||||
rx: mpsc::Receiver<PathBuf>,
|
||||
}
|
||||
|
||||
impl FileWatcher {
|
||||
pub fn new(dirs: &[PathBuf]) -> Result<Self> {
|
||||
let (tx, rx) = mpsc::channel(1024);
|
||||
|
||||
// Try the recommended (native) watcher first, fall back to polling
|
||||
let watcher: Box<dyn Watcher + Send> = match Self::try_native_watcher(dirs, tx.clone()) {
|
||||
Ok(w) => {
|
||||
info!("using native filesystem watcher");
|
||||
w
|
||||
}
|
||||
Err(native_err) => {
|
||||
warn!(error = %native_err, "native watcher failed, falling back to polling");
|
||||
Self::polling_watcher(dirs, tx)?
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
_watcher: watcher,
|
||||
rx,
|
||||
})
|
||||
}
|
||||
|
||||
fn try_native_watcher(
|
||||
dirs: &[PathBuf],
|
||||
tx: mpsc::Sender<PathBuf>,
|
||||
) -> std::result::Result<Box<dyn Watcher + Send>, notify::Error> {
|
||||
let tx_clone = tx.clone();
|
||||
let mut watcher =
|
||||
notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
|
||||
if let Ok(event) = res {
|
||||
for path in event.paths {
|
||||
if tx_clone.blocking_send(path).is_err() {
|
||||
tracing::warn!("filesystem watcher channel closed, stopping");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
})?;
|
||||
|
||||
for dir in dirs {
|
||||
watcher.watch(dir, RecursiveMode::Recursive)?;
|
||||
}
|
||||
|
||||
Ok(Box::new(watcher))
|
||||
}
|
||||
|
||||
fn polling_watcher(
|
||||
dirs: &[PathBuf],
|
||||
tx: mpsc::Sender<PathBuf>,
|
||||
) -> Result<Box<dyn Watcher + Send>> {
|
||||
let tx_clone = tx.clone();
|
||||
let poll_interval = std::time::Duration::from_secs(5);
|
||||
let config = notify::Config::default().with_poll_interval(poll_interval);
|
||||
|
||||
let mut watcher = PollWatcher::new(
|
||||
move |res: notify::Result<notify::Event>| {
|
||||
if let Ok(event) = res {
|
||||
for path in event.paths {
|
||||
if tx_clone.blocking_send(path).is_err() {
|
||||
tracing::warn!("filesystem watcher channel closed, stopping");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
config,
|
||||
)
|
||||
.map_err(|e| crate::error::PinakesError::Io(std::io::Error::other(e)))?;
|
||||
|
||||
for dir in dirs {
|
||||
watcher
|
||||
.watch(dir, RecursiveMode::Recursive)
|
||||
.map_err(|e| crate::error::PinakesError::Io(std::io::Error::other(e)))?;
|
||||
}
|
||||
|
||||
Ok(Box::new(watcher))
|
||||
}
|
||||
|
||||
pub async fn next_change(&mut self) -> Option<PathBuf> {
|
||||
self.rx.recv().await
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn watch_and_import(
|
||||
storage: DynStorageBackend,
|
||||
dirs: Vec<PathBuf>,
|
||||
ignore_patterns: Vec<String>,
|
||||
) -> Result<()> {
|
||||
let mut watcher = FileWatcher::new(&dirs)?;
|
||||
info!("filesystem watcher started");
|
||||
|
||||
while let Some(path) = watcher.next_change().await {
|
||||
if path.is_file()
|
||||
&& crate::media_type::MediaType::from_path(&path).is_some()
|
||||
&& !crate::import::should_ignore(&path, &ignore_patterns)
|
||||
{
|
||||
info!(path = %path.display(), "detected file change, importing");
|
||||
if let Err(e) = import::import_file(&storage, &path).await {
|
||||
warn!(path = %path.display(), error = %e, "failed to import changed file");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue