//! Transcoding service for media files using `FFmpeg`. use std::{ collections::HashMap, path::{Path, PathBuf}, sync::Arc, }; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tokio::sync::{RwLock, Semaphore}; use uuid::Uuid; use crate::{ config::{TranscodeProfile, TranscodingConfig}, model::MediaId, storage::DynStorageBackend, users::UserId, }; /// A transcoding session for a media item. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TranscodeSession { pub id: Uuid, pub media_id: MediaId, pub user_id: Option, pub profile: String, pub cache_path: PathBuf, pub status: TranscodeStatus, pub progress: f32, pub created_at: DateTime, pub expires_at: Option>, /// Duration of the source media in seconds, used for progress calculation. #[serde(default)] pub duration_secs: Option, /// Handle to cancel the child `FFmpeg` process. #[serde(skip)] pub child_cancel: Option>, } /// Status of a transcode session. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case", tag = "state")] pub enum TranscodeStatus { Pending, Transcoding, Complete, Failed { error: String }, Cancelled, } impl TranscodeStatus { #[must_use] pub const fn as_str(&self) -> &str { match self { Self::Pending => "pending", Self::Transcoding => "transcoding", Self::Complete => "complete", Self::Failed { .. } => "failed", Self::Cancelled => "cancelled", } } pub fn from_db(status: &str, error_message: Option<&str>) -> Self { match status { "pending" => Self::Pending, "transcoding" => Self::Transcoding, "complete" => Self::Complete, "failed" => { Self::Failed { error: error_message.unwrap_or("unknown error").to_string(), } }, "cancelled" => Self::Cancelled, other => { tracing::warn!( "unknown transcode status '{}', defaulting to Pending", other ); Self::Pending }, } } #[must_use] pub fn error_message(&self) -> Option<&str> { match self { Self::Failed { error } => Some(error), _ => None, } } } /// Service managing transcoding sessions and `FFmpeg` invocations. pub struct TranscodeService { pub config: TranscodingConfig, pub sessions: Arc>>, semaphore: Arc, } impl TranscodeService { #[must_use] pub fn new(config: TranscodingConfig) -> Self { let max_concurrent = config.max_concurrent.max(1); Self { sessions: Arc::new(RwLock::new(HashMap::new())), semaphore: Arc::new(Semaphore::new(max_concurrent)), config, } } #[must_use] pub const fn is_enabled(&self) -> bool { self.config.enabled } #[must_use] pub fn cache_dir(&self) -> PathBuf { self .config .cache_dir .clone() .unwrap_or_else(|| PathBuf::from("/tmp/pinakes-transcode")) } /// Start a transcode job for a media item. /// /// # Errors /// /// Returns an error if the profile is not found, the session directory cannot /// be created, or the session cannot be stored in the database. pub async fn start_transcode( &self, media_id: MediaId, source_path: &Path, profile_name: &str, duration_secs: Option, storage: &DynStorageBackend, ) -> crate::error::Result { let profile = self .config .profiles .iter() .find(|p| p.name == profile_name) .cloned() .ok_or_else(|| { crate::error::PinakesError::InvalidOperation(format!( "unknown transcode profile: {profile_name}" )) })?; let session_id = Uuid::now_v7(); let session_dir = self.cache_dir().join(session_id.to_string()); tokio::fs::create_dir_all(&session_dir).await.map_err(|e| { crate::error::PinakesError::InvalidOperation(format!( "failed to create session directory: {e}" )) })?; let expires_at = Some( Utc::now() + chrono::Duration::hours( i64::try_from(self.config.cache_ttl_hours).unwrap_or(i64::MAX), ), ); let cancel_notify = Arc::new(tokio::sync::Notify::new()); let session = TranscodeSession { id: session_id, media_id, user_id: None, profile: profile_name.to_string(), cache_path: session_dir.clone(), status: TranscodeStatus::Pending, progress: 0.0, created_at: Utc::now(), expires_at, duration_secs, child_cancel: Some(Arc::clone(&cancel_notify)), }; // Store session in DB storage.create_transcode_session(&session).await?; // Store in memory { let mut sessions = self.sessions.write().await; sessions.insert(session_id, session); } // Spawn the FFmpeg task let sessions = Arc::clone(&self.sessions); let semaphore = Arc::clone(&self.semaphore); let source = source_path.to_path_buf(); let hw_accel = self.config.hardware_acceleration.clone(); let storage = Arc::clone(storage); let cancel = Arc::clone(&cancel_notify); tokio::spawn(async move { // Acquire semaphore permit to limit concurrency let _permit = match semaphore.acquire().await { Ok(permit) => permit, Err(e) => { tracing::error!("failed to acquire transcode semaphore: {}", e); let error_msg = format!("semaphore closed: {e}"); { let mut s = sessions.write().await; if let Some(sess) = s.get_mut(&session_id) { sess.status = TranscodeStatus::Failed { error: error_msg.clone(), }; } } if let Err(e) = storage .update_transcode_status( session_id, TranscodeStatus::Failed { error: error_msg }, 0.0, ) .await { tracing::error!("failed to update transcode status: {}", e); } return; }, }; // Mark as transcoding { let mut s = sessions.write().await; if let Some(sess) = s.get_mut(&session_id) { sess.status = TranscodeStatus::Transcoding; } } if let Err(e) = storage .update_transcode_status(session_id, TranscodeStatus::Transcoding, 0.0) .await { tracing::error!("failed to update transcode status: {}", e); } // Build FFmpeg args and run let args = get_ffmpeg_args(&source, &session_dir, &profile, hw_accel.as_deref()); match run_ffmpeg(&args, &sessions, session_id, duration_secs, cancel) .await { Ok(()) => { { let mut s = sessions.write().await; if let Some(sess) = s.get_mut(&session_id) { sess.status = TranscodeStatus::Complete; sess.progress = 1.0; } } if let Err(e) = storage .update_transcode_status(session_id, TranscodeStatus::Complete, 1.0) .await { tracing::error!("failed to update transcode status: {}", e); } }, Err(e) => { let error_msg = e.to_string(); let mut s = sessions.write().await; if let Some(sess) = s.get_mut(&session_id) { // Don't overwrite Cancelled status if matches!(sess.status, TranscodeStatus::Cancelled) { return; } sess.status = TranscodeStatus::Failed { error: error_msg.clone(), }; } drop(s); if let Err(e) = storage .update_transcode_status( session_id, TranscodeStatus::Failed { error: error_msg }, 0.0, ) .await { tracing::error!("failed to update transcode status: {}", e); } }, } }); Ok(session_id) } /// Cancel a transcode session and clean up cache files. /// /// # Errors /// /// Returns an error if the database status update fails. pub async fn cancel_transcode( &self, session_id: Uuid, storage: &DynStorageBackend, ) -> crate::error::Result<()> { let (cache_path, cancel_notify) = { let mut sessions = self.sessions.write().await; if let Some(sess) = sessions.get_mut(&session_id) { sess.status = TranscodeStatus::Cancelled; let cancel = sess.child_cancel.take(); (Some(sess.cache_path.clone()), cancel) } else { (None, None) } }; // Signal the child process to be killed if let Some(notify) = cancel_notify { notify.notify_one(); } storage .update_transcode_status(session_id, TranscodeStatus::Cancelled, 0.0) .await?; // Clean up cache directory if let Some(path) = cache_path && let Err(e) = tokio::fs::remove_dir_all(&path).await { tracing::error!("failed to remove transcode cache directory: {}", e); } Ok(()) } /// Remove expired transcode sessions and their cache directories. pub async fn cleanup_expired(&self) { let now = Utc::now(); // Collect expired entries and remove them from the map under the lock. let expired: Vec<(Uuid, PathBuf)> = { let mut sessions = self.sessions.write().await; let expired: Vec<(Uuid, PathBuf)> = sessions .iter() .filter_map(|(id, sess)| { if let Some(expires) = sess.expires_at && now > expires { return Some((*id, sess.cache_path.clone())); } None }) .collect(); for (id, _) in &expired { sessions.remove(id); } expired }; // Lock is dropped here; perform filesystem cleanup outside the lock. for (_id, path) in expired { if let Err(e) = tokio::fs::remove_dir_all(&path).await { tracing::error!( "failed to remove expired transcode cache directory: {}", e ); } } } /// Get a session by ID from the in-memory store. pub async fn get_session( &self, session_id: Uuid, ) -> Option { let sessions = self.sessions.read().await; sessions.get(&session_id).cloned() } /// Resolve the path to a specific segment file on disk. #[must_use] pub fn segment_path(&self, session_id: Uuid, segment_name: &str) -> PathBuf { // Sanitize segment_name to prevent path traversal let safe_name = std::path::Path::new(segment_name) .file_name() .map(|n| n.to_string_lossy().to_string()) .unwrap_or_default(); if safe_name.is_empty() || safe_name.contains('\0') || safe_name.starts_with('.') { // Return a non-existent path that will fail safely return self .cache_dir() .join(session_id.to_string()) .join("__invalid__"); } self .cache_dir() .join(session_id.to_string()) .join(safe_name) } /// Find a session for a given `media_id` and profile. pub async fn find_session( &self, media_id: MediaId, profile: &str, ) -> Option { let sessions = self.sessions.read().await; sessions .values() .find(|s| s.media_id == media_id && s.profile == profile) .cloned() } } /// Parse a resolution string like "360p", "720p", "1080p" into (width, height). #[must_use] pub fn parse_resolution(res: &str) -> (u32, u32) { match res.trim_end_matches('p') { "360" => (640, 360), "480" => (854, 480), "1080" => (1920, 1080), "1440" => (2560, 1440), "2160" | "4k" => (3840, 2160), _ => (1280, 720), // default to 720p (includes "720") } } /// Estimate bandwidth (bits/sec) from a profile's `max_bitrate_kbps`. #[must_use] pub const fn estimate_bandwidth(profile: &TranscodeProfile) -> u32 { profile.max_bitrate_kbps * 1000 } /// Build `FFmpeg` CLI arguments for transcoding. fn get_ffmpeg_args( source: &Path, output_dir: &Path, profile: &TranscodeProfile, hw_accel: Option<&str>, ) -> Vec { let (w, h) = parse_resolution(&profile.max_resolution); let playlist = output_dir.join("playlist.m3u8"); let segment_pattern = output_dir.join("segment%d.ts"); let mut args = Vec::new(); // Hardware acceleration if let Some(accel) = hw_accel { args.extend_from_slice(&["-hwaccel".to_string(), accel.to_string()]); } args.extend_from_slice(&[ "-i".to_string(), source.to_string_lossy().to_string(), "-c:v".to_string(), profile.video_codec.clone(), "-c:a".to_string(), profile.audio_codec.clone(), "-b:v".to_string(), format!("{}k", profile.max_bitrate_kbps), "-vf".to_string(), format!("scale={w}:{h}"), "-f".to_string(), "hls".to_string(), "-hls_time".to_string(), "10".to_string(), "-hls_segment_filename".to_string(), segment_pattern.to_string_lossy().to_string(), "-progress".to_string(), "pipe:1".to_string(), "-y".to_string(), playlist.to_string_lossy().to_string(), ]); args } /// Run `FFmpeg` as a child process, parsing progress from stdout. async fn run_ffmpeg( args: &[String], sessions: &Arc>>, session_id: Uuid, duration_secs: Option, cancel: Arc, ) -> Result<(), crate::error::PinakesError> { use tokio::{ io::{AsyncBufReadExt, BufReader}, process::Command, }; let mut child = Command::new("ffmpeg") .args(args) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .spawn() .map_err(|e| { crate::error::PinakesError::InvalidOperation(format!( "failed to spawn ffmpeg: {e}" )) })?; // Capture stderr in a spawned task for error reporting let stderr_handle = child.stderr.take().map(|stderr| { let reader = BufReader::new(stderr); tokio::spawn(async move { let mut lines = reader.lines(); let mut collected = Vec::new(); while let Ok(Some(line)) = lines.next_line().await { collected.push(line); } collected }) }); // Parse progress from stdout let stdout_handle = child.stdout.take().map(|stdout| { let reader = BufReader::new(stdout); let mut lines = reader.lines(); let sessions = Arc::clone(sessions); tokio::spawn(async move { while let Ok(Some(line)) = lines.next_line().await { // FFmpeg progress output: "out_time_us=12345678" if let Some(time_str) = line.strip_prefix("out_time_us=") && let Ok(us) = time_str.trim().parse::() { let secs = us / 1_000_000.0; // Calculate progress based on known duration let progress = match duration_secs { Some(dur) if dur > 0.0 => { #[allow(clippy::cast_possible_truncation)] let p = (secs / dur).min(0.99) as f32; p }, _ => { // Duration unknown; don't update progress continue; }, }; let mut s = sessions.write().await; if let Some(sess) = s.get_mut(&session_id) { sess.progress = progress; } } } }) }); // Wait for child, but also listen for cancellation let status = tokio::select! { result = child.wait() => { result.map_err(|e| { crate::error::PinakesError::InvalidOperation(format!("ffmpeg process error: {e}")) })? } () = cancel.notified() => { // Kill the child process on cancel if let Err(e) = child.kill().await { tracing::error!("failed to kill ffmpeg process: {}", e); } return Err(crate::error::PinakesError::InvalidOperation( "cancelled by user".to_string(), )); } }; // Await the stdout reader task if let Some(handle) = stdout_handle { let _ = handle.await; } // Collect stderr output for error reporting let stderr_output = if let Some(handle) = stderr_handle { handle.await.unwrap_or_default() } else { Vec::new() }; if !status.success() { let last_stderr = stderr_output .iter() .rev() .take(10) .rev() .cloned() .collect::>() .join("\n"); return Err(crate::error::PinakesError::InvalidOperation(format!( "ffmpeg exited with status: {status}\nstderr:\n{last_stderr}" ))); } Ok(()) }