use std::{path::PathBuf, sync::Arc};

use anyhow::Result;
use axum::{Router, response::Redirect, routing::any};
use clap::Parser;
use pinakes_core::{config::Config, storage::StorageBackend};
use pinakes_server::{app, state::AppState};
use tokio::sync::RwLock;
use tracing::info;
use tracing_subscriber::EnvFilter;

// NOTE: the `///` comments on `Cli` and its fields are rendered by clap as
// the `--help` text, so they are user-facing strings, not just documentation.

/// Pinakes media cataloging server
#[derive(Parser)]
#[command(name = "pinakes-server", version, about)]
struct Cli {
    /// Path to configuration file
    #[arg(short, long, env = "PINAKES_CONFIG")]
    config: Option<PathBuf>,

    /// Override listen host
    #[arg(long)]
    host: Option<String>,

    /// Override listen port
    #[arg(short, long)]
    port: Option<u16>,

    /// Set log level (trace, debug, info, warn, error)
    #[arg(long, default_value = "info")]
    log_level: String,

    /// Log output format (compact, full, pretty, json)
    #[arg(long, default_value = "compact")]
    log_format: String,

    /// Run database migrations only, then exit
    #[arg(long)]
    migrate_only: bool,
}

/// Resolve the configuration file path.
/// Returns (path, was_explicit) where was_explicit indicates if the path was
/// explicitly provided by the user (vs discovered).
fn resolve_config_path(explicit: Option<&std::path::Path>) -> (PathBuf, bool) { if let Some(path) = explicit { return (path.to_path_buf(), true); } // Check current directory let local = PathBuf::from("pinakes.toml"); if local.exists() { return (local, false); } // XDG default (Config::default_config_path(), false) } #[tokio::main] async fn main() -> Result<()> { let cli = Cli::parse(); // Initialize logging let env_filter = EnvFilter::try_new(&cli.log_level) .unwrap_or_else(|_| EnvFilter::new("info")); match cli.log_format.as_str() { "json" => { tracing_subscriber::fmt() .with_env_filter(env_filter) .json() .init(); }, "pretty" => { tracing_subscriber::fmt() .with_env_filter(env_filter) .pretty() .init(); }, "full" => { tracing_subscriber::fmt().with_env_filter(env_filter).init(); }, _ => { tracing_subscriber::fmt() .with_env_filter(env_filter) .compact() .init(); }, } let (config_path, was_explicit) = resolve_config_path(cli.config.as_deref()); let mut config = if config_path.exists() { info!(path = %config_path.display(), "loading configuration from file"); Config::from_file(&config_path)? } else if was_explicit { // User explicitly provided a config path that doesn't exist - this is an // error return Err(anyhow::anyhow!( "configuration file not found: {}", config_path.display() )); } else { info!( "using default configuration (no config file found at {})", config_path.display() ); Config::default() }; config.ensure_dirs()?; config .validate() .map_err(|e| anyhow::anyhow!("invalid configuration: {e}"))?; // Warn about authentication configuration if config.server.authentication_disabled { tracing::warn!( "⚠️ AUTHENTICATION IS DISABLED - All requests will be allowed without \ authentication!" ); tracing::warn!( "⚠️ This is INSECURE and should only be used for development." 
); } else { let has_api_key = config .server .api_key .as_ref() .is_some_and(|k| !k.is_empty()); let has_accounts = !config.accounts.users.is_empty(); if !has_api_key && !has_accounts { tracing::error!("⚠️ No authentication method configured!"); } } // Apply CLI overrides if let Some(host) = cli.host { config.server.host = host; } if let Some(port) = cli.port { config.server.port = port; } // Storage backend initialization let storage: pinakes_core::storage::DynStorageBackend = match config .storage .backend { pinakes_core::config::StorageBackendType::Sqlite => { let sqlite_config = config.storage.sqlite.as_ref().ok_or_else(|| { anyhow::anyhow!( "sqlite storage selected but [storage.sqlite] config section missing" ) })?; info!(path = %sqlite_config.path.display(), "initializing sqlite storage"); let backend = pinakes_core::storage::sqlite::SqliteBackend::new(&sqlite_config.path)?; backend.run_migrations().await?; Arc::new(backend) }, pinakes_core::config::StorageBackendType::Postgres => { let pg_config = config.storage.postgres.as_ref().ok_or_else(|| { anyhow::anyhow!( "postgres storage selected but [storage.postgres] config section \ missing" ) })?; info!(host = %pg_config.host, port = pg_config.port, database = %pg_config.database, "initializing postgres storage"); let backend = pinakes_core::storage::postgres::PostgresBackend::new(pg_config) .await?; backend.run_migrations().await?; Arc::new(backend) }, }; if cli.migrate_only { info!("migrations complete, exiting"); return Ok(()); } // Register root directories for root in &config.directories.roots { if root.exists() { storage.add_root_dir(root.clone()).await?; info!(path = %root.display(), "registered root directory"); } else { tracing::warn!(path = %root.display(), "root directory does not exist, skipping"); } } // Start filesystem watcher if configured if config.scanning.watch { let watch_storage = storage.clone(); let watch_dirs = config.directories.roots.clone(); let watch_ignore = 
config.scanning.ignore_patterns.clone(); tokio::spawn(async move { if let Err(e) = pinakes_core::scan::watch_and_import( watch_storage, watch_dirs, watch_ignore, ) .await { tracing::error!(error = %e, "filesystem watcher failed"); } }); info!("filesystem watcher started"); } let addr = format!("{}:{}", config.server.host, config.server.port); // Initialize transcode service early so the job queue can reference it let transcode_service: Option< Arc, > = if config.transcoding.enabled { Some(Arc::new(pinakes_core::transcode::TranscodeService::new( config.transcoding.clone(), ))) } else { None }; // Initialize job queue with executor let job_storage = storage.clone(); let job_config = config.clone(); let job_transcode = transcode_service.clone(); let job_queue = pinakes_core::jobs::JobQueue::new( config.jobs.worker_count, move |job_id, kind, cancel, jobs| { let storage = job_storage.clone(); let config = job_config.clone(); let transcode_svc = job_transcode.clone(); tokio::spawn(async move { use pinakes_core::jobs::{JobKind, JobQueue}; match kind { JobKind::Scan { path } => { let ignore = config.scanning.ignore_patterns.clone(); let res = if let Some(p) = path { pinakes_core::scan::scan_directory(&storage, &p, &ignore).await } else { pinakes_core::scan::scan_all_roots(&storage, &ignore) .await .map(|statuses| { let total_found: usize = statuses.iter().map(|s| s.files_found).sum(); let total_processed: usize = statuses.iter().map(|s| s.files_processed).sum(); let all_errors: Vec = statuses.into_iter().flat_map(|s| s.errors).collect(); pinakes_core::scan::ScanStatus { scanning: false, files_found: total_found, files_processed: total_processed, files_skipped: 0, errors: all_errors, } }) }; match res { Ok(status) => { JobQueue::complete( &jobs, job_id, serde_json::json!({ "files_found": status.files_found, "files_processed": status.files_processed, "errors": status.errors, }), ) .await; }, Err(e) => { JobQueue::fail(&jobs, job_id, e.to_string()).await; }, } }, 
JobKind::GenerateThumbnails { media_ids } => { let thumb_dir = pinakes_core::thumbnail::default_thumbnail_dir(); let thumb_config = config.thumbnails.clone(); let total = media_ids.len(); let mut generated = 0usize; let mut errors = Vec::new(); for (i, mid) in media_ids.iter().enumerate() { if cancel.is_cancelled() { break; } JobQueue::update_progress( &jobs, job_id, i as f32 / total as f32, format!("{}/{}", i, total), ) .await; match storage.get_media(*mid).await { Ok(item) => { let source = item.path.clone(); let mt = item.media_type.clone(); let id = item.id; let td = thumb_dir.clone(); let tc = thumb_config.clone(); let res = tokio::task::spawn_blocking(move || { pinakes_core::thumbnail::generate_thumbnail_with_config( id, &source, mt, &td, &tc, ) }) .await; match res { Ok(Ok(Some(path))) => { let mut updated = item; updated.thumbnail_path = Some(path); let _ = storage.update_media(&updated).await; generated += 1; }, Ok(Ok(None)) => {}, Ok(Err(e)) => errors.push(format!("{}: {}", mid, e)), Err(e) => errors.push(format!("{}: {}", mid, e)), } }, Err(e) => errors.push(format!("{}: {}", mid, e)), } } JobQueue::complete( &jobs, job_id, serde_json::json!({ "generated": generated, "errors": errors }), ) .await; }, JobKind::VerifyIntegrity { media_ids } => { let ids = if media_ids.is_empty() { None } else { Some(media_ids.as_slice()) }; match pinakes_core::integrity::verify_integrity(&storage, ids).await { Ok(report) => { JobQueue::complete( &jobs, job_id, serde_json::to_value(&report).unwrap_or_default(), ) .await; }, Err(e) => JobQueue::fail(&jobs, job_id, e.to_string()).await, } }, JobKind::OrphanDetection => { match pinakes_core::integrity::detect_orphans(&storage).await { Ok(report) => { JobQueue::complete( &jobs, job_id, serde_json::to_value(&report).unwrap_or_default(), ) .await; }, Err(e) => JobQueue::fail(&jobs, job_id, e.to_string()).await, } }, JobKind::CleanupThumbnails => { let thumb_dir = pinakes_core::thumbnail::default_thumbnail_dir(); match 
pinakes_core::integrity::cleanup_orphaned_thumbnails( &storage, &thumb_dir, ) .await { Ok(removed) => { JobQueue::complete( &jobs, job_id, serde_json::json!({ "removed": removed }), ) .await; }, Err(e) => JobQueue::fail(&jobs, job_id, e.to_string()).await, } }, JobKind::Export { format, destination, } => { match pinakes_core::export::export_library( &storage, &format, &destination, ) .await { Ok(result) => { JobQueue::complete( &jobs, job_id, serde_json::to_value(&result).unwrap_or_default(), ) .await; }, Err(e) => JobQueue::fail(&jobs, job_id, e.to_string()).await, } }, JobKind::Transcode { media_id, profile } => { if let Some(ref svc) = transcode_svc { match storage.get_media(media_id).await { Ok(item) => { match svc .start_transcode( media_id, &item.path, &profile, item.duration_secs, &storage, ) .await { Ok(session_id) => { JobQueue::complete( &jobs, job_id, serde_json::json!({"session_id": session_id.to_string()}), ) .await; }, Err(e) => { JobQueue::fail(&jobs, job_id, e.to_string()).await }, } }, Err(e) => JobQueue::fail(&jobs, job_id, e.to_string()).await, } } else { JobQueue::fail( &jobs, job_id, "transcoding is not enabled".to_string(), ) .await; } }, JobKind::Enrich { media_ids } => { // Enrichment job placeholder JobQueue::complete( &jobs, job_id, serde_json::json!({"media_ids": media_ids.len(), "status": "not_implemented"}), ) .await; }, JobKind::CleanupAnalytics => { let before = chrono::Utc::now() - chrono::Duration::days(90); match storage.cleanup_old_events(before).await { Ok(count) => { JobQueue::complete( &jobs, job_id, serde_json::json!({"cleaned_up": count}), ) .await; }, Err(e) => JobQueue::fail(&jobs, job_id, e.to_string()).await, } }, }; drop(cancel); }) }, ); // Initialize cache layer let cache = std::sync::Arc::new(pinakes_core::cache::CacheLayer::new( config.jobs.cache_ttl_secs, )); // Initialize plugin manager if plugins are enabled (before moving config into // Arc) let plugin_manager = if config.plugins.enabled { match 
pinakes_core::plugin::PluginManager::new( config.plugins.data_dir.clone(), config.plugins.cache_dir.clone(), config.plugins.clone().into(), ) { Ok(pm) => { tracing::info!("Plugin manager initialized"); Some(Arc::new(pm)) }, Err(e) => { tracing::warn!("Failed to initialize plugin manager: {}", e); None }, } } else { tracing::info!("Plugins disabled in configuration"); None }; // Initialize scheduler with cancellation support let shutdown_token = tokio_util::sync::CancellationToken::new(); let config_arc = Arc::new(RwLock::new(config)); let scheduler = pinakes_core::scheduler::TaskScheduler::new( job_queue.clone(), shutdown_token.clone(), config_arc.clone(), Some(config_path.clone()), ); let scheduler = Arc::new(scheduler); // Restore saved scheduler state from config scheduler.restore_state().await; // Spawn scheduler background loop { let scheduler = scheduler.clone(); tokio::spawn(async move { scheduler.run().await; }); } // Initialize managed storage service if enabled let managed_storage = { let config_read = config_arc.read().await; if config_read.managed_storage.enabled { let service = pinakes_core::managed_storage::ManagedStorageService::new( config_read.managed_storage.storage_dir.clone(), config_read.managed_storage.max_upload_size, config_read.managed_storage.verify_on_read, ); match service.init().await { Ok(()) => { info!( path = %config_read.managed_storage.storage_dir.display(), "managed storage initialized" ); Some(Arc::new(service)) }, Err(e) => { tracing::error!(error = %e, "failed to initialize managed storage"); None }, } } else { tracing::info!("managed storage disabled in configuration"); None } }; // Initialize chunked upload manager if sync is enabled let chunked_upload_manager = { let config_read = config_arc.read().await; if config_read.sync.enabled { let manager = pinakes_core::sync::ChunkedUploadManager::new( config_read.sync.temp_upload_dir.clone(), ); match manager.init().await { Ok(()) => { info!( path = 
%config_read.sync.temp_upload_dir.display(), "chunked upload manager initialized" ); Some(Arc::new(manager)) }, Err(e) => { tracing::error!(error = %e, "failed to initialize chunked upload manager"); None }, } } else { tracing::info!("sync disabled, chunked upload manager not initialized"); None } }; let state = AppState { storage: storage.clone(), config: config_arc.clone(), config_path: Some(config_path), scan_progress: pinakes_core::scan::ScanProgress::new(), job_queue, cache, scheduler, plugin_manager, transcode_service, managed_storage, chunked_upload_manager, }; // Periodic session cleanup (every 15 minutes) { let storage_clone = storage.clone(); let cancel = shutdown_token.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(15 * 60)); loop { tokio::select! { _ = interval.tick() => { match storage_clone.delete_expired_sessions().await { Ok(count) if count > 0 => { tracing::info!(count = count, "cleaned up expired sessions"); } Err(e) => { tracing::error!(error = %e, "failed to cleanup expired sessions"); } _ => {} } } _ = cancel.cancelled() => { break; } } } }); } // Periodic chunked upload cleanup (every hour) if let Some(ref manager) = state.chunked_upload_manager { let manager_clone = manager.clone(); let cancel = shutdown_token.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(60 * 60)); loop { tokio::select! 
{ _ = interval.tick() => { match manager_clone.cleanup_expired(48).await { Ok(count) if count > 0 => { tracing::info!(count = count, "cleaned up expired upload temp files"); } Err(e) => { tracing::error!(error = %e, "failed to cleanup expired upload temp files"); } _ => {} } } _ = cancel.cancelled() => { break; } } } }); } let config_read = config_arc.read().await; let tls_config = config_read.server.tls.clone(); drop(config_read); // Create router with TLS config for HSTS headers let router = if tls_config.enabled { app::create_router_with_tls(state, Some(&tls_config)) } else { app::create_router(state) }; if tls_config.enabled { // TLS/HTTPS mode let cert_path = tls_config.cert_path.as_ref().ok_or_else(|| { anyhow::anyhow!("TLS enabled but cert_path not specified") })?; let key_path = tls_config.key_path.as_ref().ok_or_else(|| { anyhow::anyhow!("TLS enabled but key_path not specified") })?; info!(addr = %addr, cert = %cert_path.display(), "server listening with TLS"); // Configure TLS let tls_config_builder = axum_server::tls_rustls::RustlsConfig::from_pem_file(cert_path, key_path) .await?; // Start HTTP redirect server if configured if tls_config.redirect_http { let http_addr = format!( "{}:{}", config_arc.read().await.server.host, tls_config.http_port ); let https_port = config_arc.read().await.server.port; let https_host = config_arc.read().await.server.host.clone(); let redirect_router = create_https_redirect_router(https_host, https_port); let shutdown = shutdown_token.clone(); tokio::spawn(async move { let listener = match tokio::net::TcpListener::bind(&http_addr).await { Ok(l) => l, Err(e) => { tracing::warn!(error = %e, addr = %http_addr, "failed to bind HTTP redirect listener"); return; }, }; info!(addr = %http_addr, "HTTP redirect server listening"); let server = axum::serve( listener, redirect_router .into_make_service_with_connect_info::(), ); tokio::select! 
{ result = server => { if let Err(e) = result { tracing::warn!(error = %e, "HTTP redirect server error"); } } _ = shutdown.cancelled() => { info!("HTTP redirect server shutting down"); } } }); } // Start HTTPS server with graceful shutdown via Handle let addr_parsed: std::net::SocketAddr = addr.parse()?; let handle = axum_server::Handle::new(); let shutdown_handle = handle.clone(); // Spawn a task to trigger graceful shutdown tokio::spawn(async move { shutdown_signal().await; shutdown_handle .graceful_shutdown(Some(std::time::Duration::from_secs(30))); }); axum_server::bind_rustls(addr_parsed, tls_config_builder) .handle(handle) .serve( router.into_make_service_with_connect_info::(), ) .await?; } else { // Plain HTTP mode info!(addr = %addr, "server listening"); let listener = tokio::net::TcpListener::bind(&addr).await?; axum::serve( listener, router.into_make_service_with_connect_info::(), ) .with_graceful_shutdown(shutdown_signal()) .await?; } shutdown_token.cancel(); info!("server shut down"); Ok(()) } /// Create a router that redirects all HTTP requests to HTTPS fn create_https_redirect_router(https_host: String, https_port: u16) -> Router { Router::new().fallback(any(move |uri: axum::http::Uri| { let https_host = https_host.clone(); async move { let path_and_query = uri.path_and_query().map(|pq| pq.as_str()).unwrap_or("/"); let https_url = if https_port == 443 { format!("https://{}{}", https_host, path_and_query) } else { format!("https://{}:{}{}", https_host, https_port, path_and_query) }; Redirect::permanent(&https_url) } })) } async fn shutdown_signal() { let ctrl_c = async { match tokio::signal::ctrl_c().await { Ok(()) => {}, Err(e) => { tracing::warn!(error = %e, "failed to install Ctrl+C handler"); std::future::pending::<()>().await; }, } }; #[cfg(unix)] let terminate = async { match tokio::signal::unix::signal( tokio::signal::unix::SignalKind::terminate(), ) { Ok(mut signal) => { signal.recv().await; }, Err(e) => { tracing::warn!(error = %e, "failed to 
install SIGTERM handler"); std::future::pending::<()>().await; }, } }; #[cfg(not(unix))] let terminate = std::future::pending::<()>(); tokio::select! { _ = ctrl_c => info!("received Ctrl+C, shutting down"), _ = terminate => info!("received SIGTERM, shutting down"), } }