// pinakes/crates/pinakes-server/src/main.rs
// NotAShelf 2f61d7e9fa
// pinakes-server: add chunked upload manager to app state with periodic cleanup
// Signed-off-by: NotAShelf <raf@notashelf.dev>
// Change-Id: I3462c21ff359b4e3a7eca9a82abd50086a6a6964
// 2026-03-06 18:29:15 +03:00
//
// 769 lines
// 30 KiB
// Rust
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Result;
use axum::Router;
use axum::response::Redirect;
use axum::routing::any;
use clap::Parser;
use tokio::sync::RwLock;
use tracing::info;
use tracing_subscriber::EnvFilter;
use pinakes_core::config::Config;
use pinakes_core::storage::StorageBackend;
use pinakes_server::app;
use pinakes_server::state::AppState;
/// Pinakes media cataloging server
// Command-line interface definition. NOTE: clap derives the user-visible
// `--help` text from the `///` doc comments on each field, so those strings
// are part of runtime behavior and must not be reworded casually.
#[derive(Parser)]
#[command(name = "pinakes-server", version, about)]
struct Cli {
    /// Path to configuration file
    // Also settable via the PINAKES_CONFIG environment variable. When absent,
    // the path is discovered by resolve_config_path() below.
    #[arg(short, long, env = "PINAKES_CONFIG")]
    config: Option<PathBuf>,
    /// Override listen host
    // Takes precedence over the host from the configuration file.
    #[arg(long)]
    host: Option<String>,
    /// Override listen port
    // Takes precedence over the port from the configuration file.
    #[arg(short, long)]
    port: Option<u16>,
    /// Set log level (trace, debug, info, warn, error)
    // Parsed as a tracing EnvFilter directive; invalid values fall back to "info".
    #[arg(long, default_value = "info")]
    log_level: String,
    /// Log output format (compact, full, pretty, json)
    // Unrecognized values fall back to "compact" (see main's match on log_format).
    #[arg(long, default_value = "compact")]
    log_format: String,
    /// Run database migrations only, then exit
    #[arg(long)]
    migrate_only: bool,
}
/// Resolve the configuration file path.
///
/// Returns `(path, was_explicit)` where `was_explicit` indicates whether the
/// path was explicitly provided by the user (as opposed to discovered).
fn resolve_config_path(explicit: Option<&std::path::Path>) -> (PathBuf, bool) {
    match explicit {
        // An explicitly supplied path wins unconditionally.
        Some(path) => (path.to_path_buf(), true),
        None => {
            // Discovery order: a `pinakes.toml` in the current working
            // directory first, then the XDG default location.
            let local = PathBuf::from("pinakes.toml");
            if local.exists() {
                (local, false)
            } else {
                (Config::default_config_path(), false)
            }
        }
    }
}
/// Server entry point: parse CLI flags, initialize logging, load and validate
/// configuration, bring up the storage backend and background services (job
/// queue, scheduler, watchers, cleanup loops), then serve HTTP or HTTPS until
/// a shutdown signal arrives.
#[tokio::main]
async fn main() -> Result<()> {
    let cli = Cli::parse();
    // Initialize logging
    // An invalid log-level directive silently falls back to "info".
    let env_filter = EnvFilter::try_new(&cli.log_level).unwrap_or_else(|_| EnvFilter::new("info"));
    match cli.log_format.as_str() {
        "json" => {
            tracing_subscriber::fmt()
                .with_env_filter(env_filter)
                .json()
                .init();
        }
        "pretty" => {
            tracing_subscriber::fmt()
                .with_env_filter(env_filter)
                .pretty()
                .init();
        }
        "full" => {
            tracing_subscriber::fmt().with_env_filter(env_filter).init();
        }
        // Unrecognized formats (including the default "compact") use compact output.
        _ => {
            tracing_subscriber::fmt()
                .with_env_filter(env_filter)
                .compact()
                .init();
        }
    }
    let (config_path, was_explicit) = resolve_config_path(cli.config.as_deref());
    let mut config = if config_path.exists() {
        info!(path = %config_path.display(), "loading configuration from file");
        Config::from_file(&config_path)?
    } else if was_explicit {
        // User explicitly provided a config path that doesn't exist - this is an error
        return Err(anyhow::anyhow!(
            "configuration file not found: {}",
            config_path.display()
        ));
    } else {
        // Discovered path missing is fine: run on built-in defaults.
        info!(
            "using default configuration (no config file found at {})",
            config_path.display()
        );
        Config::default()
    };
    // Create any required directories before validation so path checks can pass.
    config.ensure_dirs()?;
    config
        .validate()
        .map_err(|e| anyhow::anyhow!("invalid configuration: {e}"))?;
    // Warn about authentication configuration
    if config.server.authentication_disabled {
        tracing::warn!(
            "⚠️ AUTHENTICATION IS DISABLED - All requests will be allowed without authentication!"
        );
        tracing::warn!("⚠️ This is INSECURE and should only be used for development.");
    } else {
        // Auth is enabled: make sure at least one credential source exists
        // (a non-empty API key or at least one configured user account).
        let has_api_key = config
            .server
            .api_key
            .as_ref()
            .is_some_and(|k| !k.is_empty());
        let has_accounts = !config.accounts.users.is_empty();
        if !has_api_key && !has_accounts {
            // Logged as an error but not fatal; the server still starts.
            tracing::error!("⚠️ No authentication method configured!");
        }
    }
    // Apply CLI overrides
    // CLI host/port take precedence over whatever the config file specified.
    if let Some(host) = cli.host {
        config.server.host = host;
    }
    if let Some(port) = cli.port {
        config.server.port = port;
    }
    // Storage backend initialization
    // The selected backend must also have its matching config section present;
    // migrations run before the backend is handed to anything else.
    let storage: pinakes_core::storage::DynStorageBackend = match config.storage.backend {
        pinakes_core::config::StorageBackendType::Sqlite => {
            let sqlite_config = config.storage.sqlite.as_ref().ok_or_else(|| {
                anyhow::anyhow!(
                    "sqlite storage selected but [storage.sqlite] config section missing"
                )
            })?;
            info!(path = %sqlite_config.path.display(), "initializing sqlite storage");
            let backend = pinakes_core::storage::sqlite::SqliteBackend::new(&sqlite_config.path)?;
            backend.run_migrations().await?;
            Arc::new(backend)
        }
        pinakes_core::config::StorageBackendType::Postgres => {
            let pg_config = config.storage.postgres.as_ref().ok_or_else(|| {
                anyhow::anyhow!(
                    "postgres storage selected but [storage.postgres] config section missing"
                )
            })?;
            info!(host = %pg_config.host, port = pg_config.port, database = %pg_config.database, "initializing postgres storage");
            let backend = pinakes_core::storage::postgres::PostgresBackend::new(pg_config).await?;
            backend.run_migrations().await?;
            Arc::new(backend)
        }
    };
    // --migrate-only: stop here after migrations have been applied above.
    if cli.migrate_only {
        info!("migrations complete, exiting");
        return Ok(());
    }
    // Register root directories
    // Missing roots are skipped with a warning rather than aborting startup.
    for root in &config.directories.roots {
        if root.exists() {
            storage.add_root_dir(root.clone()).await?;
            info!(path = %root.display(), "registered root directory");
        } else {
            tracing::warn!(path = %root.display(), "root directory does not exist, skipping");
        }
    }
    // Start filesystem watcher if configured
    // Watcher runs detached; a failure is logged but does not stop the server.
    if config.scanning.watch {
        let watch_storage = storage.clone();
        let watch_dirs = config.directories.roots.clone();
        let watch_ignore = config.scanning.ignore_patterns.clone();
        tokio::spawn(async move {
            if let Err(e) =
                pinakes_core::scan::watch_and_import(watch_storage, watch_dirs, watch_ignore).await
            {
                tracing::error!(error = %e, "filesystem watcher failed");
            }
        });
        info!("filesystem watcher started");
    }
    let addr = format!("{}:{}", config.server.host, config.server.port);
    // Initialize transcode service early so the job queue can reference it
    let transcode_service: Option<Arc<pinakes_core::transcode::TranscodeService>> =
        if config.transcoding.enabled {
            Some(Arc::new(pinakes_core::transcode::TranscodeService::new(
                config.transcoding.clone(),
            )))
        } else {
            None
        };
    // Initialize job queue with executor
    // The executor closure is called per job; it clones its captured handles
    // and spawns a task that dispatches on the job kind, reporting completion
    // or failure back through the JobQueue static helpers.
    let job_storage = storage.clone();
    let job_config = config.clone();
    let job_transcode = transcode_service.clone();
    let job_queue = pinakes_core::jobs::JobQueue::new(
        config.jobs.worker_count,
        move |job_id, kind, cancel, jobs| {
            let storage = job_storage.clone();
            let config = job_config.clone();
            let transcode_svc = job_transcode.clone();
            tokio::spawn(async move {
                use pinakes_core::jobs::{JobKind, JobQueue};
                match kind {
                    // Scan a single directory, or every registered root when
                    // no path is given; per-root statuses are merged into one.
                    JobKind::Scan { path } => {
                        let ignore = config.scanning.ignore_patterns.clone();
                        let res = if let Some(p) = path {
                            pinakes_core::scan::scan_directory(&storage, &p, &ignore).await
                        } else {
                            pinakes_core::scan::scan_all_roots(&storage, &ignore)
                                .await
                                .map(|statuses| {
                                    // Aggregate counts and errors across all roots.
                                    let total_found: usize =
                                        statuses.iter().map(|s| s.files_found).sum();
                                    let total_processed: usize =
                                        statuses.iter().map(|s| s.files_processed).sum();
                                    let all_errors: Vec<String> =
                                        statuses.into_iter().flat_map(|s| s.errors).collect();
                                    pinakes_core::scan::ScanStatus {
                                        scanning: false,
                                        files_found: total_found,
                                        files_processed: total_processed,
                                        files_skipped: 0,
                                        errors: all_errors,
                                    }
                                })
                        };
                        match res {
                            Ok(status) => {
                                JobQueue::complete(
                                    &jobs,
                                    job_id,
                                    serde_json::json!({
                                        "files_found": status.files_found,
                                        "files_processed": status.files_processed,
                                        "errors": status.errors,
                                    }),
                                )
                                .await;
                            }
                            Err(e) => {
                                JobQueue::fail(&jobs, job_id, e.to_string()).await;
                            }
                        }
                    }
                    // Generate thumbnails for the given media ids. Honors the
                    // cancellation token between items; per-item failures are
                    // collected instead of failing the whole job.
                    JobKind::GenerateThumbnails { media_ids } => {
                        let thumb_dir = pinakes_core::thumbnail::default_thumbnail_dir();
                        let thumb_config = config.thumbnails.clone();
                        let total = media_ids.len();
                        let mut generated = 0usize;
                        let mut errors = Vec::new();
                        for (i, mid) in media_ids.iter().enumerate() {
                            if cancel.is_cancelled() {
                                break;
                            }
                            // Report fractional progress plus an "i/total" label.
                            JobQueue::update_progress(
                                &jobs,
                                job_id,
                                i as f32 / total as f32,
                                format!("{}/{}", i, total),
                            )
                            .await;
                            match storage.get_media(*mid).await {
                                Ok(item) => {
                                    let source = item.path.clone();
                                    let mt = item.media_type.clone();
                                    let id = item.id;
                                    let td = thumb_dir.clone();
                                    let tc = thumb_config.clone();
                                    // Thumbnail generation is CPU/IO-bound; run it
                                    // off the async runtime on a blocking thread.
                                    let res = tokio::task::spawn_blocking(move || {
                                        pinakes_core::thumbnail::generate_thumbnail_with_config(
                                            id, &source, mt, &td, &tc,
                                        )
                                    })
                                    .await;
                                    match res {
                                        Ok(Ok(Some(path))) => {
                                            // Persist the new thumbnail path; a failed
                                            // update is deliberately ignored (best-effort).
                                            let mut updated = item;
                                            updated.thumbnail_path = Some(path);
                                            let _ = storage.update_media(&updated).await;
                                            generated += 1;
                                        }
                                        // None: generator produced no thumbnail for this item.
                                        Ok(Ok(None)) => {}
                                        Ok(Err(e)) => errors.push(format!("{}: {}", mid, e)),
                                        // JoinError from the blocking task itself.
                                        Err(e) => errors.push(format!("{}: {}", mid, e)),
                                    }
                                }
                                Err(e) => errors.push(format!("{}: {}", mid, e)),
                            }
                        }
                        JobQueue::complete(
                            &jobs,
                            job_id,
                            serde_json::json!({
                                "generated": generated, "errors": errors
                            }),
                        )
                        .await;
                    }
                    // Verify file integrity; empty id list means "verify everything".
                    JobKind::VerifyIntegrity { media_ids } => {
                        let ids = if media_ids.is_empty() {
                            None
                        } else {
                            Some(media_ids.as_slice())
                        };
                        match pinakes_core::integrity::verify_integrity(&storage, ids).await {
                            Ok(report) => {
                                JobQueue::complete(
                                    &jobs,
                                    job_id,
                                    serde_json::to_value(&report).unwrap_or_default(),
                                )
                                .await;
                            }
                            Err(e) => JobQueue::fail(&jobs, job_id, e.to_string()).await,
                        }
                    }
                    JobKind::OrphanDetection => {
                        match pinakes_core::integrity::detect_orphans(&storage).await {
                            Ok(report) => {
                                JobQueue::complete(
                                    &jobs,
                                    job_id,
                                    serde_json::to_value(&report).unwrap_or_default(),
                                )
                                .await;
                            }
                            Err(e) => JobQueue::fail(&jobs, job_id, e.to_string()).await,
                        }
                    }
                    JobKind::CleanupThumbnails => {
                        let thumb_dir = pinakes_core::thumbnail::default_thumbnail_dir();
                        match pinakes_core::integrity::cleanup_orphaned_thumbnails(
                            &storage, &thumb_dir,
                        )
                        .await
                        {
                            Ok(removed) => {
                                JobQueue::complete(
                                    &jobs,
                                    job_id,
                                    serde_json::json!({ "removed": removed }),
                                )
                                .await;
                            }
                            Err(e) => JobQueue::fail(&jobs, job_id, e.to_string()).await,
                        }
                    }
                    JobKind::Export {
                        format,
                        destination,
                    } => {
                        match pinakes_core::export::export_library(&storage, &format, &destination)
                            .await
                        {
                            Ok(result) => {
                                JobQueue::complete(
                                    &jobs,
                                    job_id,
                                    serde_json::to_value(&result).unwrap_or_default(),
                                )
                                .await;
                            }
                            Err(e) => JobQueue::fail(&jobs, job_id, e.to_string()).await,
                        }
                    }
                    // Kick off a transcode session; fails fast when transcoding
                    // was not enabled at startup (transcode_svc is None).
                    JobKind::Transcode { media_id, profile } => {
                        if let Some(ref svc) = transcode_svc {
                            match storage.get_media(media_id).await {
                                Ok(item) => {
                                    match svc
                                        .start_transcode(
                                            media_id,
                                            &item.path,
                                            &profile,
                                            item.duration_secs,
                                            &storage,
                                        )
                                        .await
                                    {
                                        Ok(session_id) => {
                                            JobQueue::complete(
                                                &jobs,
                                                job_id,
                                                serde_json::json!({"session_id": session_id.to_string()}),
                                            )
                                            .await;
                                        }
                                        Err(e) => {
                                            JobQueue::fail(&jobs, job_id, e.to_string()).await
                                        }
                                    }
                                }
                                Err(e) => JobQueue::fail(&jobs, job_id, e.to_string()).await,
                            }
                        } else {
                            JobQueue::fail(&jobs, job_id, "transcoding is not enabled".to_string())
                                .await;
                        }
                    }
                    JobKind::Enrich { media_ids } => {
                        // Enrichment job placeholder
                        JobQueue::complete(
                            &jobs,
                            job_id,
                            serde_json::json!({"media_ids": media_ids.len(), "status": "not_implemented"}),
                        )
                        .await;
                    }
                    JobKind::CleanupAnalytics => {
                        // Retention window is hard-coded to 90 days of events.
                        let before = chrono::Utc::now() - chrono::Duration::days(90);
                        match storage.cleanup_old_events(before).await {
                            Ok(count) => {
                                JobQueue::complete(
                                    &jobs,
                                    job_id,
                                    serde_json::json!({"cleaned_up": count}),
                                )
                                .await;
                            }
                            Err(e) => JobQueue::fail(&jobs, job_id, e.to_string()).await,
                        }
                    }
                };
                // Explicitly consume the cancellation token (not every arm uses it).
                drop(cancel);
            })
        },
    );
    // Initialize cache layer
    let cache = std::sync::Arc::new(pinakes_core::cache::CacheLayer::new(
        config.jobs.cache_ttl_secs,
    ));
    // Initialize plugin manager if plugins are enabled (before moving config into Arc)
    // Initialization failure is non-fatal: the server runs without plugins.
    let plugin_manager = if config.plugins.enabled {
        match pinakes_core::plugin::PluginManager::new(
            config.plugins.data_dir.clone(),
            config.plugins.cache_dir.clone(),
            config.plugins.clone().into(),
        ) {
            Ok(pm) => {
                tracing::info!("Plugin manager initialized");
                Some(Arc::new(pm))
            }
            Err(e) => {
                tracing::warn!("Failed to initialize plugin manager: {}", e);
                None
            }
        }
    } else {
        tracing::info!("Plugins disabled in configuration");
        None
    };
    // Initialize scheduler with cancellation support
    // `config` moves into the shared Arc<RwLock<_>> here; everything after this
    // point reads configuration through config_arc.
    let shutdown_token = tokio_util::sync::CancellationToken::new();
    let config_arc = Arc::new(RwLock::new(config));
    let scheduler = pinakes_core::scheduler::TaskScheduler::new(
        job_queue.clone(),
        shutdown_token.clone(),
        config_arc.clone(),
        Some(config_path.clone()),
    );
    let scheduler = Arc::new(scheduler);
    // Restore saved scheduler state from config
    scheduler.restore_state().await;
    // Spawn scheduler background loop
    {
        let scheduler = scheduler.clone();
        tokio::spawn(async move {
            scheduler.run().await;
        });
    }
    // Initialize managed storage service if enabled
    // Init failure is logged and treated as "feature unavailable" (None).
    let managed_storage = {
        let config_read = config_arc.read().await;
        if config_read.managed_storage.enabled {
            let service = pinakes_core::managed_storage::ManagedStorageService::new(
                config_read.managed_storage.storage_dir.clone(),
                config_read.managed_storage.max_upload_size,
                config_read.managed_storage.verify_on_read,
            );
            match service.init().await {
                Ok(()) => {
                    info!(
                        path = %config_read.managed_storage.storage_dir.display(),
                        "managed storage initialized"
                    );
                    Some(Arc::new(service))
                }
                Err(e) => {
                    tracing::error!(error = %e, "failed to initialize managed storage");
                    None
                }
            }
        } else {
            tracing::info!("managed storage disabled in configuration");
            None
        }
    };
    // Initialize chunked upload manager if sync is enabled
    // Same pattern as managed storage: failure downgrades to None.
    let chunked_upload_manager = {
        let config_read = config_arc.read().await;
        if config_read.sync.enabled {
            let manager = pinakes_core::sync::ChunkedUploadManager::new(
                config_read.sync.temp_upload_dir.clone(),
            );
            match manager.init().await {
                Ok(()) => {
                    info!(
                        path = %config_read.sync.temp_upload_dir.display(),
                        "chunked upload manager initialized"
                    );
                    Some(Arc::new(manager))
                }
                Err(e) => {
                    tracing::error!(error = %e, "failed to initialize chunked upload manager");
                    None
                }
            }
        } else {
            tracing::info!("sync disabled, chunked upload manager not initialized");
            None
        }
    };
    // Assemble the shared application state handed to the router.
    let state = AppState {
        storage: storage.clone(),
        config: config_arc.clone(),
        config_path: Some(config_path),
        scan_progress: pinakes_core::scan::ScanProgress::new(),
        job_queue,
        cache,
        scheduler,
        plugin_manager,
        transcode_service,
        managed_storage,
        chunked_upload_manager,
    };
    // Periodic session cleanup (every 15 minutes)
    // Loop exits when the shutdown token is cancelled.
    {
        let storage_clone = storage.clone();
        let cancel = shutdown_token.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(15 * 60));
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        match storage_clone.delete_expired_sessions().await {
                            Ok(count) if count > 0 => {
                                tracing::info!(count = count, "cleaned up expired sessions");
                            }
                            Err(e) => {
                                tracing::error!(error = %e, "failed to cleanup expired sessions");
                            }
                            // Ok(0): nothing expired, stay quiet.
                            _ => {}
                        }
                    }
                    _ = cancel.cancelled() => {
                        break;
                    }
                }
            }
        });
    }
    // Periodic chunked upload cleanup (every hour)
    // Only spawned when the manager was successfully initialized above.
    if let Some(ref manager) = state.chunked_upload_manager {
        let manager_clone = manager.clone();
        let cancel = shutdown_token.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(60 * 60));
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // 48 presumably means "expire uploads older than 48 hours" —
                        // confirm against ChunkedUploadManager::cleanup_expired.
                        match manager_clone.cleanup_expired(48).await {
                            Ok(count) if count > 0 => {
                                tracing::info!(count = count, "cleaned up expired upload temp files");
                            }
                            Err(e) => {
                                tracing::error!(error = %e, "failed to cleanup expired upload temp files");
                            }
                            _ => {}
                        }
                    }
                    _ = cancel.cancelled() => {
                        break;
                    }
                }
            }
        });
    }
    let config_read = config_arc.read().await;
    let tls_config = config_read.server.tls.clone();
    // Release the read guard before any further awaits on config_arc below.
    drop(config_read);
    // Create router with TLS config for HSTS headers
    let router = if tls_config.enabled {
        app::create_router_with_tls(state, Some(&tls_config))
    } else {
        app::create_router(state)
    };
    if tls_config.enabled {
        // TLS/HTTPS mode
        // Both cert and key paths are required once TLS is enabled.
        let cert_path = tls_config
            .cert_path
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("TLS enabled but cert_path not specified"))?;
        let key_path = tls_config
            .key_path
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("TLS enabled but key_path not specified"))?;
        info!(addr = %addr, cert = %cert_path.display(), "server listening with TLS");
        // Configure TLS
        let tls_config_builder =
            axum_server::tls_rustls::RustlsConfig::from_pem_file(cert_path, key_path).await?;
        // Start HTTP redirect server if configured
        // Best-effort: a bind failure only logs a warning and skips the redirector.
        if tls_config.redirect_http {
            let http_addr = format!(
                "{}:{}",
                config_arc.read().await.server.host,
                tls_config.http_port
            );
            let https_port = config_arc.read().await.server.port;
            let https_host = config_arc.read().await.server.host.clone();
            let redirect_router = create_https_redirect_router(https_host, https_port);
            let shutdown = shutdown_token.clone();
            tokio::spawn(async move {
                let listener = match tokio::net::TcpListener::bind(&http_addr).await {
                    Ok(l) => l,
                    Err(e) => {
                        tracing::warn!(error = %e, addr = %http_addr, "failed to bind HTTP redirect listener");
                        return;
                    }
                };
                info!(addr = %http_addr, "HTTP redirect server listening");
                let server = axum::serve(
                    listener,
                    redirect_router.into_make_service_with_connect_info::<std::net::SocketAddr>(),
                );
                tokio::select! {
                    result = server => {
                        if let Err(e) = result {
                            tracing::warn!(error = %e, "HTTP redirect server error");
                        }
                    }
                    _ = shutdown.cancelled() => {
                        info!("HTTP redirect server shutting down");
                    }
                }
            });
        }
        // Start HTTPS server with graceful shutdown via Handle
        let addr_parsed: std::net::SocketAddr = addr.parse()?;
        let handle = axum_server::Handle::new();
        let shutdown_handle = handle.clone();
        // Spawn a task to trigger graceful shutdown
        // 30-second grace period for in-flight connections after a signal.
        tokio::spawn(async move {
            shutdown_signal().await;
            shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(30)));
        });
        axum_server::bind_rustls(addr_parsed, tls_config_builder)
            .handle(handle)
            .serve(router.into_make_service_with_connect_info::<std::net::SocketAddr>())
            .await?;
    } else {
        // Plain HTTP mode
        info!(addr = %addr, "server listening");
        let listener = tokio::net::TcpListener::bind(&addr).await?;
        axum::serve(
            listener,
            router.into_make_service_with_connect_info::<std::net::SocketAddr>(),
        )
        .with_graceful_shutdown(shutdown_signal())
        .await?;
    }
    // Signal all background loops (cleanup tasks, redirect server) to stop.
    shutdown_token.cancel();
    info!("server shut down");
    Ok(())
}
/// Create a router that redirects every incoming HTTP request to HTTPS,
/// preserving the original path and query string.
fn create_https_redirect_router(https_host: String, https_port: u16) -> Router {
    // The handler captures the target host/port and issues a 308 redirect.
    let redirect = move |uri: axum::http::Uri| {
        let https_host = https_host.clone();
        async move {
            // Keep path + query intact; bare requests map to "/".
            let path_and_query = uri.path_and_query().map(|pq| pq.as_str()).unwrap_or("/");
            // Omit the port for the HTTPS default (443) so URLs stay canonical.
            let https_url = match https_port {
                443 => format!("https://{}{}", https_host, path_and_query),
                port => format!("https://{}:{}{}", https_host, port, path_and_query),
            };
            Redirect::permanent(&https_url)
        }
    };
    Router::new().fallback(any(redirect))
}
/// Resolve when a shutdown signal arrives: Ctrl+C on all platforms, plus
/// SIGTERM on Unix. If a handler cannot be installed, that branch logs a
/// warning and parks forever so the other source can still trigger shutdown.
async fn shutdown_signal() {
    let ctrl_c = async {
        if let Err(e) = tokio::signal::ctrl_c().await {
            tracing::warn!(error = %e, "failed to install Ctrl+C handler");
            // Never resolve; let the other select branch win.
            std::future::pending::<()>().await;
        }
    };

    #[cfg(unix)]
    let terminate = async {
        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
            Ok(mut sigterm) => {
                sigterm.recv().await;
            }
            Err(e) => {
                tracing::warn!(error = %e, "failed to install SIGTERM handler");
                std::future::pending::<()>().await;
            }
        }
    };

    // Non-Unix targets have no SIGTERM; this branch simply never fires.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => info!("received Ctrl+C, shutting down"),
        _ = terminate => info!("received SIGTERM, shutting down"),
    }
}