//! pinakes/crates/pinakes-core/src/import.rs
//!
//! Provenance (from commit 592a9bcc47 by NotAShelf <raf@notashelf.dev>):
//!   pinakes-core: add error context to tag and collection writes; map
//!   serde_json errors to Serialization variant
//!   pinakes-core: distinguish task panics from cancellations in import error
//!   handling
//! Change-Id: Icf5686f34144630ebf1935c47b3979156a6a6964
//! Date: 2026-03-12 19:41:07 +03:00

use std::{
path::{Path, PathBuf},
sync::Arc,
time::SystemTime,
};
use tracing::info;
use crate::{
audit,
error::{PinakesError, Result},
hash::compute_file_hash,
links,
media_type::{BuiltinMediaType, MediaType},
metadata,
model::{
AuditAction,
CustomField,
CustomFieldType,
MediaId,
MediaItem,
StorageMode,
},
plugin::PluginPipeline,
storage::DynStorageBackend,
thumbnail,
};
/// Result of importing a single file.
pub struct ImportResult {
    /// Identifier of the media row this import resolved to (newly created,
    /// or pre-existing when the file was a duplicate or skipped).
    pub media_id: MediaId,
    /// True if a file with an identical content hash already existed in
    /// storage; the existing item's id is returned instead of a new one.
    pub was_duplicate: bool,
    /// True if the file was skipped because it hasn't changed since last scan
    pub was_skipped: bool,
    /// Canonicalized path of the file that was processed.
    pub path: PathBuf,
}
/// Options for import operations
#[derive(Debug, Clone, Default)]
pub struct ImportOptions {
    /// Skip files that haven't changed since last scan (based on mtime)
    pub incremental: bool,
    /// Force re-import even if mtime hasn't changed
    /// (takes precedence over `incremental`).
    pub force: bool,
    /// Photo configuration for toggleable features
    /// (e.g. perceptual hash generation).
    pub photo_config: crate::config::PhotoConfig,
}
/// Get the modification time of a file as a Unix timestamp.
///
/// Returns `None` when the file's metadata or modification time cannot be
/// read, or when the mtime predates the Unix epoch. A timestamp that
/// overflows `i64` saturates to `i64::MAX`.
fn get_file_mtime(path: &Path) -> Option<i64> {
    let modified = std::fs::metadata(path).ok()?.modified().ok()?;
    let since_epoch = modified.duration_since(SystemTime::UNIX_EPOCH).ok()?;
    Some(i64::try_from(since_epoch.as_secs()).unwrap_or(i64::MAX))
}
/// Validates that a path is within configured root directories.
///
/// # Arguments
///
/// * `storage` - Storage backend to query root directories
/// * `path` - Path to validate
///
/// # Returns
///
/// `Ok(())` if path is within roots or no roots configured
///
/// # Errors
///
/// Returns `InvalidOperation` if path is outside all root directories
pub async fn validate_path_in_roots(
    storage: &DynStorageBackend,
    path: &Path,
) -> Result<()> {
    let roots = storage.list_root_dirs().await?;
    // No configured roots means every path is acceptable.
    if roots.is_empty() {
        return Ok(());
    }
    // A root whose canonicalization fails is simply not a match.
    let within_some_root = roots.iter().any(|root| {
        root.canonicalize()
            .is_ok_and(|canonical| path.starts_with(&canonical))
    });
    if within_some_root {
        Ok(())
    } else {
        Err(PinakesError::InvalidOperation(format!(
            "path {} is not within any configured root directory",
            path.display()
        )))
    }
}
/// Imports a file using default options.
///
/// Thin convenience wrapper delegating to [`import_file_with_options`] with
/// [`ImportOptions::default`].
///
/// # Arguments
///
/// * `storage` - Storage backend
/// * `path` - Path to the file to import
///
/// # Returns
///
/// Import result with media ID and status
///
/// # Errors
///
/// Returns `FileNotFound` if path doesn't exist
pub async fn import_file(
    storage: &DynStorageBackend,
    path: &Path,
    pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<ImportResult> {
    let options = ImportOptions::default();
    import_file_with_options(storage, path, &options, pipeline).await
}
/// Import a file with configurable options for incremental scanning.
///
/// Pipeline per file: canonicalize and validate the path, resolve the media
/// type (plugin pipeline first, extension fallback), short-circuit on
/// unchanged mtime (incremental) or duplicate content hash, extract metadata
/// and a thumbnail, persist the [`MediaItem`], then record links, custom
/// fields, an audit entry, and a plugin event.
///
/// # Errors
///
/// Returns [`PinakesError`] if the file cannot be read, hashed, or stored.
/// Returns `FileNotFound` if the path does not exist.
pub async fn import_file_with_options(
    storage: &DynStorageBackend,
    path: &Path,
    options: &ImportOptions,
    pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<ImportResult> {
    // Fix: canonicalize() fails with a NotFound io error for missing paths,
    // which previously surfaced as a generic Io error and made the explicit
    // FileNotFound check below unreachable. Map that case to the dedicated
    // variant so callers get the documented error.
    let path = path.canonicalize().map_err(|e| {
        if e.kind() == std::io::ErrorKind::NotFound {
            PinakesError::FileNotFound(path.to_path_buf())
        } else {
            PinakesError::Io(e)
        }
    })?;
    // canonicalize() succeeding implies the path existed a moment ago; this
    // re-check only narrows the TOCTOU window before we start reading.
    if !path.exists() {
        return Err(PinakesError::FileNotFound(path));
    }
    validate_path_in_roots(storage, &path).await?;
    // Plugins may override media-type resolution; otherwise fall back to
    // extension-based detection.
    let media_type = if let Some(pl) = pipeline {
        pl.resolve_media_type(&path).await
    } else {
        MediaType::from_path(&path)
    }
    .ok_or_else(|| PinakesError::UnsupportedMediaType(path.clone()))?;
    let current_mtime = get_file_mtime(&path);
    // Incremental scan: skip the file entirely when its stored mtime matches
    // the current one (unless `force` is set).
    if options.incremental
        && !options.force
        && let Some(existing) = storage.get_media_by_path(&path).await?
        && let (Some(stored_mtime), Some(curr_mtime)) =
            (existing.file_mtime, current_mtime)
        && stored_mtime == curr_mtime
    {
        return Ok(ImportResult {
            media_id: existing.id,
            was_duplicate: false,
            was_skipped: true,
            path: path.clone(),
        });
    }
    let content_hash = compute_file_hash(&path).await?;
    if let Some(existing) = storage.get_media_by_hash(&content_hash).await? {
        // Update the mtime even for duplicates so incremental scan works
        if current_mtime.is_some() && existing.file_mtime != current_mtime {
            let mut updated = existing.clone();
            updated.file_mtime = current_mtime;
            // Best-effort: a failed mtime refresh must not fail the import.
            let _ = storage.update_media(&updated).await;
        }
        return Ok(ImportResult {
            media_id: existing.id,
            was_duplicate: true,
            was_skipped: false,
            path: path.clone(),
        });
    }
    let file_meta = std::fs::metadata(&path)?;
    let file_size = file_meta.len();
    // Metadata extraction parses files on disk; keep that blocking work off
    // the async executor threads when no plugin pipeline handles it.
    let extracted = if let Some(pl) = pipeline {
        pl.extract_metadata(&path, &media_type).await?
    } else {
        let path_clone = path.clone();
        let media_type_clone = media_type.clone();
        tokio::task::spawn_blocking(move || {
            metadata::extract_metadata(&path_clone, &media_type_clone)
        })
        .await
        .map_err(|e| PinakesError::MetadataExtraction(e.to_string()))??
    };
    let file_name = path
        .file_name()
        .unwrap_or_default()
        .to_string_lossy()
        .to_string();
    let now = chrono::Utc::now();
    let media_id = MediaId::new();
    // Generate thumbnail for image types
    let thumb_path = if let Some(pl) = pipeline {
        pl.generate_thumbnail(
            media_id,
            &path,
            &media_type,
            &thumbnail::default_thumbnail_dir(),
        )
        .await?
    } else {
        let source = path.clone();
        let thumb_dir = thumbnail::default_thumbnail_dir();
        let media_type_clone = media_type.clone();
        tokio::task::spawn_blocking(move || {
            thumbnail::generate_thumbnail(
                media_id,
                &source,
                &media_type_clone,
                &thumb_dir,
            )
        })
        .await
        .map_err(|e| PinakesError::MetadataExtraction(e.to_string()))??
    };
    // Generate perceptual hash for image files (if enabled in config)
    let perceptual_hash = if options.photo_config.generate_perceptual_hash()
        && media_type.category() == crate::media_type::MediaCategory::Image
    {
        crate::metadata::image::generate_perceptual_hash(&path)
    } else {
        None
    };
    // Check if this is a markdown file for link extraction
    let is_markdown =
        media_type == MediaType::Builtin(BuiltinMediaType::Markdown);
    // Capture media type debug string before moving into MediaItem
    let media_type_debug = format!("{media_type:?}");
    let item = MediaItem {
        id: media_id,
        path: path.clone(),
        file_name,
        media_type,
        content_hash,
        file_size,
        title: extracted.title,
        artist: extracted.artist,
        album: extracted.album,
        genre: extracted.genre,
        year: extracted.year,
        duration_secs: extracted.duration_secs,
        description: extracted.description,
        thumbnail_path: thumb_path,
        custom_fields: std::collections::HashMap::new(),
        file_mtime: current_mtime,
        // Photo-specific metadata from extraction
        date_taken: extracted.date_taken,
        latitude: extracted.latitude,
        longitude: extracted.longitude,
        camera_make: extracted.camera_make,
        camera_model: extracted.camera_model,
        rating: extracted.rating,
        perceptual_hash,
        // Managed storage fields - external files use defaults
        storage_mode: StorageMode::External,
        original_filename: None,
        uploaded_at: None,
        storage_key: None,
        created_at: now,
        updated_at: now,
        // New items are not deleted
        deleted_at: None,
        // Links will be extracted separately
        links_extracted_at: None,
    };
    storage.insert_media(&item).await?;
    // Extract and store markdown links for markdown files. Failure here is
    // logged but does not fail the import.
    if is_markdown
        && let Err(e) = extract_and_store_links(storage, media_id, &path).await
    {
        tracing::warn!(
            media_id = %media_id,
            path = %path.display(),
            error = %e,
            "failed to extract markdown links"
        );
    }
    // Store extracted extra metadata as custom fields (best-effort per field).
    for (key, value) in &extracted.extra {
        let field = CustomField {
            field_type: CustomFieldType::Text,
            value: value.clone(),
        };
        if let Err(e) = storage.set_custom_field(media_id, key, &field).await {
            tracing::warn!(
                media_id = %media_id,
                field = %key,
                error = %e,
                "failed to store extracted metadata as custom field"
            );
        }
    }
    audit::record_action(
        storage,
        Some(media_id),
        AuditAction::Imported,
        Some(format!("path={}", path.display())),
    )
    .await?;
    // Notify plugins after the item is fully persisted.
    if let Some(pl) = pipeline {
        let payload = serde_json::json!({
            "media_id": media_id.to_string(),
            "path": path.to_string_lossy(),
            "media_type": media_type_debug,
        });
        pl.emit_event("MediaImported", &payload);
    }
    info!(media_id = %media_id, path = %path.display(), "imported media file");
    Ok(ImportResult {
        media_id,
        was_duplicate: false,
        was_skipped: false,
        path: path.clone(),
    })
}
/// Returns true when any path component matches one of the ignore patterns.
///
/// Matching rules (checked per component):
/// - the special pattern `".*"` matches any dotfile/dot-directory component;
/// - otherwise a pattern matches only on exact component equality.
///
/// Note: the original had an extra "dotfile pattern exact match" branch that
/// was fully subsumed by the exact-equality check; it has been removed with
/// no behavior change.
pub(crate) fn should_ignore(
    path: &std::path::Path,
    patterns: &[String],
) -> bool {
    path.components()
        .filter_map(|component| match component {
            std::path::Component::Normal(name) => Some(name.to_string_lossy()),
            _ => None,
        })
        .any(|name| {
            patterns.iter().any(|pattern| {
                // Simple glob: ".*" matches any dotfile
                (pattern == ".*" && name.starts_with('.'))
                    || name == pattern.as_str()
            })
        })
}
/// Default number of concurrent import tasks used by [`import_directory`]
/// when no explicit concurrency limit is supplied.
const DEFAULT_IMPORT_CONCURRENCY: usize = 8;
/// Import all supported files in a directory with default options.
///
/// Delegates to [`import_directory_with_options`] using
/// [`DEFAULT_IMPORT_CONCURRENCY`] and [`ImportOptions::default`].
///
/// # Errors
///
/// Returns [`PinakesError`] if the directory cannot be read or spawned tasks
/// fail.
pub async fn import_directory(
    storage: &DynStorageBackend,
    dir: &Path,
    ignore_patterns: &[String],
    pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<Vec<std::result::Result<ImportResult, PinakesError>>> {
    let options = ImportOptions::default();
    import_directory_with_options(
        storage,
        dir,
        ignore_patterns,
        DEFAULT_IMPORT_CONCURRENCY,
        &options,
        pipeline,
    )
    .await
}
/// Import all supported files in a directory with a specified concurrency
/// limit.
///
/// Delegates to [`import_directory_with_options`] with
/// [`ImportOptions::default`].
///
/// # Errors
///
/// Returns [`PinakesError`] if the directory cannot be read or spawned tasks
/// fail.
pub async fn import_directory_with_concurrency(
    storage: &DynStorageBackend,
    dir: &Path,
    ignore_patterns: &[String],
    concurrency: usize,
    pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<Vec<std::result::Result<ImportResult, PinakesError>>> {
    let options = ImportOptions::default();
    import_directory_with_options(
        storage,
        dir,
        ignore_patterns,
        concurrency,
        &options,
        pipeline,
    )
    .await
}
/// Import a directory with full options including incremental scanning support.
///
/// Walks `dir` recursively (following symlinks), filters out directories,
/// unsupported media types, and ignored paths, then imports the remaining
/// files with at most `concurrency` tasks in flight (clamped to 1..=256).
/// Per-file failures are collected into the returned vector rather than
/// aborting the whole scan.
///
/// # Errors
///
/// Returns [`PinakesError`] if the directory cannot be read or spawned tasks
/// fail.
pub async fn import_directory_with_options(
    storage: &DynStorageBackend,
    dir: &Path,
    ignore_patterns: &[String],
    concurrency: usize,
    options: &ImportOptions,
    pipeline: Option<&Arc<PluginPipeline>>,
) -> Result<Vec<std::result::Result<ImportResult, PinakesError>>> {
    // Clamp to a sane range; 0 would make the cap check below never drain.
    let concurrency = concurrency.clamp(1, 256);
    // Own everything that spawned tasks will capture.
    let dir = dir.to_path_buf();
    let patterns = ignore_patterns.to_vec();
    let options = options.clone();
    let pipeline = pipeline.cloned();
    // Directory walking is blocking filesystem work; keep it off the async
    // executor threads.
    let entries: Vec<PathBuf> = {
        let dir = dir.clone();
        tokio::task::spawn_blocking(move || {
            walkdir::WalkDir::new(&dir)
                .follow_links(true)
                .into_iter()
                // Unreadable entries are silently skipped.
                .filter_map(std::result::Result::ok)
                .filter(|e| !e.file_type().is_dir())
                // NOTE(review): this pre-filter uses extension-based detection
                // only; plugin-resolved media types are not consulted here, so
                // files only a plugin recognizes would be skipped — confirm
                // this is intended.
                .filter(|e| MediaType::from_path(e.path()).is_some())
                .filter(|e| !should_ignore(e.path(), &patterns))
                .map(|e| e.path().to_path_buf())
                .collect()
        })
        .await
        .map_err(|e| PinakesError::Io(std::io::Error::other(e)))?
    };
    let mut results = Vec::with_capacity(entries.len());
    let mut join_set = tokio::task::JoinSet::new();
    for entry_path in entries {
        let storage = Arc::clone(storage);
        let path = entry_path.clone();
        let opts = options.clone();
        let pl = pipeline.clone();
        // Each task returns its path alongside the result so failures can be
        // attributed to a file when logged.
        join_set.spawn(async move {
            let result =
                import_file_with_options(&storage, &path, &opts, pl.as_ref()).await;
            (path, result)
        });
        // Limit concurrency by draining when we hit the cap
        if join_set.len() >= concurrency
            && let Some(result) = join_set.join_next().await
        {
            collect_import_result(result, &mut results);
        }
    }
    // Drain remaining tasks
    while let Some(result) = join_set.join_next().await {
        collect_import_result(result, &mut results);
    }
    Ok(results)
}
/// Fold one joined import-task outcome into `results`, logging failures.
///
/// Join errors (task panic or cancellation) are converted into an
/// `InvalidOperation` entry so the caller still gets one result per file.
fn collect_import_result(
    join_result: std::result::Result<
        (PathBuf, Result<ImportResult>),
        tokio::task::JoinError,
    >,
    results: &mut Vec<std::result::Result<ImportResult, PinakesError>>,
) {
    // Handle the task-level failure first so the happy path stays flat.
    let (path, outcome) = match join_result {
        Ok(pair) => pair,
        Err(join_err) => {
            if join_err.is_panic() {
                tracing::error!(error = %join_err, "import task panicked");
            } else {
                tracing::warn!(error = %join_err, "import task was cancelled");
            }
            results.push(Err(PinakesError::InvalidOperation(format!(
                "import task failed: {join_err}"
            ))));
            return;
        },
    };
    match outcome {
        Ok(r) => results.push(Ok(r)),
        Err(e) => {
            tracing::warn!(path = %path.display(), error = %e, "failed to import file");
            results.push(Err(e));
        },
    }
}
/// Extract markdown links from a file and store them in the database.
async fn extract_and_store_links(
storage: &DynStorageBackend,
media_id: MediaId,
path: &Path,
) -> Result<()> {
let content = tokio::fs::read_to_string(path).await.map_err(|e| {
PinakesError::Io(std::io::Error::other(format!(
"failed to read markdown file for link extraction: {e}"
)))
})?;
// Extract links
let extracted_links = links::extract_links(media_id, &content);
if extracted_links.is_empty() {
// No links found, just mark as extracted
storage.mark_links_extracted(media_id).await?;
return Ok(());
}
// Clear any existing links for this media (in case of re-import)
storage.clear_links_for_media(media_id).await?;
// Save extracted links
storage
.save_markdown_links(media_id, &extracted_links)
.await?;
// Mark links as extracted
storage.mark_links_extracted(media_id).await?;
tracing::debug!(
media_id = %media_id,
link_count = extracted_links.len(),
"extracted markdown links"
);
Ok(())
}