use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::sync::{RwLock, mpsc}; use tokio_util::sync::CancellationToken; use uuid::Uuid; use crate::model::MediaId; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case", tag = "type")] pub enum JobKind { Scan { path: Option, }, GenerateThumbnails { media_ids: Vec, }, VerifyIntegrity { media_ids: Vec, }, OrphanDetection, CleanupThumbnails, Export { format: ExportFormat, destination: PathBuf, }, Transcode { media_id: MediaId, profile: String, }, Enrich { media_ids: Vec, }, CleanupAnalytics, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum ExportFormat { Json, Csv, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case", tag = "state")] pub enum JobStatus { Pending, Running { progress: f32, message: String }, Completed { result: Value }, Failed { error: String }, Cancelled, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Job { pub id: Uuid, pub kind: JobKind, pub status: JobStatus, pub created_at: DateTime, pub updated_at: DateTime, } struct WorkerItem { job_id: Uuid, kind: JobKind, cancel: CancellationToken, } pub struct JobQueue { jobs: Arc>>, cancellations: Arc>>, tx: mpsc::Sender, } impl JobQueue { /// Create a new job queue and spawn `worker_count` background workers. /// /// The `executor` callback is invoked for each job; it receives the job kind, /// a progress-reporting callback, and a cancellation token. pub fn new(worker_count: usize, executor: F) -> Arc where F: Fn( Uuid, JobKind, CancellationToken, Arc>>, ) -> tokio::task::JoinHandle<()> + Send + Sync + 'static, { let (tx, rx) = mpsc::channel::(256); let rx = Arc::new(tokio::sync::Mutex::new(rx)); let jobs: Arc>> = Arc::new(RwLock::new(HashMap::new())); let cancellations: Arc>> = Arc::new(RwLock::new(HashMap::new())); let executor = Arc::new(executor); for _ in 0..worker_count { let rx = rx.clone(); let jobs = jobs.clone(); let cancellations = cancellations.clone(); let executor = executor.clone(); tokio::spawn(async move { loop { let item = { let mut guard = rx.lock().await; guard.recv().await }; let Some(item) = item else { break }; // Mark as running { let mut map = jobs.write().await; if let Some(job) = map.get_mut(&item.job_id) { job.status = JobStatus::Running { progress: 0.0, message: "starting".to_string(), }; job.updated_at = Utc::now(); } } let handle = executor(item.job_id, item.kind, item.cancel, jobs.clone()); let _ = handle.await; // Clean up cancellation token cancellations.write().await.remove(&item.job_id); } }); } Arc::new(Self { jobs, cancellations, tx, }) } /// Submit a new job, returning its ID. pub async fn submit(&self, kind: JobKind) -> Uuid { let id = Uuid::now_v7(); let now = Utc::now(); let cancel = CancellationToken::new(); let job = Job { id, kind: kind.clone(), status: JobStatus::Pending, created_at: now, updated_at: now, }; self.jobs.write().await.insert(id, job); self.cancellations.write().await.insert(id, cancel.clone()); let item = WorkerItem { job_id: id, kind, cancel, }; // If the channel is full we still record the job — it'll stay Pending let _ = self.tx.send(item).await; id } /// Get the status of a job. pub async fn status(&self, id: Uuid) -> Option { self.jobs.read().await.get(&id).cloned() } /// List all jobs, most recent first. pub async fn list(&self) -> Vec { let map = self.jobs.read().await; let mut jobs: Vec = map.values().cloned().collect(); jobs.sort_by_key(|job| std::cmp::Reverse(job.created_at)); jobs } /// Cancel a running or pending job. pub async fn cancel(&self, id: Uuid) -> bool { if let Some(token) = self.cancellations.read().await.get(&id) { token.cancel(); let mut map = self.jobs.write().await; if let Some(job) = map.get_mut(&id) { job.status = JobStatus::Cancelled; job.updated_at = Utc::now(); } true } else { false } } /// Update a job's progress. Called by executors. pub async fn update_progress( jobs: &Arc>>, id: Uuid, progress: f32, message: String, ) { let mut map = jobs.write().await; if let Some(job) = map.get_mut(&id) { job.status = JobStatus::Running { progress, message }; job.updated_at = Utc::now(); } } /// Mark a job as completed. pub async fn complete(jobs: &Arc>>, id: Uuid, result: Value) { let mut map = jobs.write().await; if let Some(job) = map.get_mut(&id) { job.status = JobStatus::Completed { result }; job.updated_at = Utc::now(); } } /// Mark a job as failed. pub async fn fail(jobs: &Arc>>, id: Uuid, error: String) { let mut map = jobs.write().await; if let Some(job) = map.get_mut(&id) { job.status = JobStatus::Failed { error }; job.updated_at = Utc::now(); } } /// Get job queue statistics pub async fn stats(&self) -> JobQueueStats { let jobs = self.jobs.read().await; let mut pending = 0; let mut running = 0; let mut completed = 0; let mut failed = 0; for job in jobs.values() { match job.status { JobStatus::Pending => pending += 1, JobStatus::Running { .. } => running += 1, JobStatus::Completed { .. } => completed += 1, JobStatus::Failed { .. } => failed += 1, JobStatus::Cancelled => {} // Don't count cancelled jobs } } JobQueueStats { pending, running, completed, failed, total: jobs.len(), } } } /// Statistics about the job queue #[derive(Debug, Clone, Default)] pub struct JobQueueStats { pub pending: usize, pub running: usize, pub completed: usize, pub failed: usize, pub total: usize, }