//! Transcoding service for media files using FFmpeg. use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tokio::sync::{RwLock, Semaphore}; use uuid::Uuid; use crate::config::{TranscodeProfile, TranscodingConfig}; use crate::model::MediaId; use crate::storage::DynStorageBackend; use crate::users::UserId; /// A transcoding session for a media item. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TranscodeSession { pub id: Uuid, pub media_id: MediaId, pub user_id: Option, pub profile: String, pub cache_path: PathBuf, pub status: TranscodeStatus, pub progress: f32, pub created_at: DateTime, pub expires_at: Option>, /// Duration of the source media in seconds, used for progress calculation. #[serde(default)] pub duration_secs: Option, /// Handle to cancel the child FFmpeg process. #[serde(skip)] pub child_cancel: Option>, } /// Status of a transcode session. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case", tag = "state")] pub enum TranscodeStatus { Pending, Transcoding, Complete, Failed { error: String }, Cancelled, } impl TranscodeStatus { pub fn as_str(&self) -> &str { match self { Self::Pending => "pending", Self::Transcoding => "transcoding", Self::Complete => "complete", Self::Failed { .. } => "failed", Self::Cancelled => "cancelled", } } pub fn from_db(status: &str, error_message: Option<&str>) -> Self { match status { "pending" => Self::Pending, "transcoding" => Self::Transcoding, "complete" => Self::Complete, "failed" => Self::Failed { error: error_message.unwrap_or("unknown error").to_string(), }, "cancelled" => Self::Cancelled, other => { tracing::warn!( "unknown transcode status '{}', defaulting to Pending", other ); Self::Pending } } } pub fn error_message(&self) -> Option<&str> { match self { Self::Failed { error } => Some(error), _ => None, } } } /// Service managing transcoding sessions and FFmpeg invocations. pub struct TranscodeService { pub config: TranscodingConfig, pub sessions: Arc>>, semaphore: Arc, } impl TranscodeService { pub fn new(config: TranscodingConfig) -> Self { let max_concurrent = config.max_concurrent.max(1); Self { sessions: Arc::new(RwLock::new(HashMap::new())), semaphore: Arc::new(Semaphore::new(max_concurrent)), config, } } pub fn is_enabled(&self) -> bool { self.config.enabled } pub fn cache_dir(&self) -> PathBuf { self.config .cache_dir .clone() .unwrap_or_else(|| PathBuf::from("/tmp/pinakes-transcode")) } /// Start a transcode job for a media item. pub async fn start_transcode( &self, media_id: MediaId, source_path: &Path, profile_name: &str, duration_secs: Option, storage: &DynStorageBackend, ) -> crate::error::Result { let profile = self .config .profiles .iter() .find(|p| p.name == profile_name) .cloned() .ok_or_else(|| { crate::error::PinakesError::InvalidOperation(format!( "unknown transcode profile: {}", profile_name )) })?; let session_id = Uuid::now_v7(); let session_dir = self.cache_dir().join(session_id.to_string()); tokio::fs::create_dir_all(&session_dir).await.map_err(|e| { crate::error::PinakesError::InvalidOperation(format!( "failed to create session directory: {}", e )) })?; let expires_at = Some(Utc::now() + chrono::Duration::hours(self.config.cache_ttl_hours as i64)); let cancel_notify = Arc::new(tokio::sync::Notify::new()); let session = TranscodeSession { id: session_id, media_id, user_id: None, profile: profile_name.to_string(), cache_path: session_dir.clone(), status: TranscodeStatus::Pending, progress: 0.0, created_at: Utc::now(), expires_at, duration_secs, child_cancel: Some(cancel_notify.clone()), }; // Store session in DB storage.create_transcode_session(&session).await?; // Store in memory { let mut sessions = self.sessions.write().await; sessions.insert(session_id, session); } // Spawn the FFmpeg task let sessions = self.sessions.clone(); let semaphore = self.semaphore.clone(); let source = source_path.to_path_buf(); let hw_accel = self.config.hardware_acceleration.clone(); let storage = storage.clone(); let cancel = cancel_notify.clone(); tokio::spawn(async move { // Acquire semaphore permit to limit concurrency let _permit = match semaphore.acquire().await { Ok(permit) => permit, Err(e) => { tracing::error!("failed to acquire transcode semaphore: {}", e); let error_msg = format!("semaphore closed: {}", e); let mut s = sessions.write().await; if let Some(sess) = s.get_mut(&session_id) { sess.status = TranscodeStatus::Failed { error: error_msg.clone(), }; } if let Err(e) = storage .update_transcode_status( session_id, TranscodeStatus::Failed { error: error_msg }, 0.0, ) .await { tracing::error!("failed to update transcode status: {}", e); } return; } }; // Mark as transcoding { let mut s = sessions.write().await; if let Some(sess) = s.get_mut(&session_id) { sess.status = TranscodeStatus::Transcoding; } } if let Err(e) = storage .update_transcode_status(session_id, TranscodeStatus::Transcoding, 0.0) .await { tracing::error!("failed to update transcode status: {}", e); } // Build FFmpeg args and run let args = get_ffmpeg_args(&source, &session_dir, &profile, hw_accel.as_deref()); match run_ffmpeg(&args, &sessions, session_id, duration_secs, cancel).await { Ok(()) => { let mut s = sessions.write().await; if let Some(sess) = s.get_mut(&session_id) { sess.status = TranscodeStatus::Complete; sess.progress = 1.0; } if let Err(e) = storage .update_transcode_status(session_id, TranscodeStatus::Complete, 1.0) .await { tracing::error!("failed to update transcode status: {}", e); } } Err(e) => { let error_msg = e.to_string(); let mut s = sessions.write().await; if let Some(sess) = s.get_mut(&session_id) { // Don't overwrite Cancelled status if matches!(sess.status, TranscodeStatus::Cancelled) { return; } sess.status = TranscodeStatus::Failed { error: error_msg.clone(), }; } drop(s); if let Err(e) = storage .update_transcode_status( session_id, TranscodeStatus::Failed { error: error_msg }, 0.0, ) .await { tracing::error!("failed to update transcode status: {}", e); } } } }); Ok(session_id) } /// Cancel a transcode session and clean up cache files. pub async fn cancel_transcode( &self, session_id: Uuid, storage: &DynStorageBackend, ) -> crate::error::Result<()> { let (cache_path, cancel_notify) = { let mut sessions = self.sessions.write().await; if let Some(sess) = sessions.get_mut(&session_id) { sess.status = TranscodeStatus::Cancelled; let cancel = sess.child_cancel.take(); (Some(sess.cache_path.clone()), cancel) } else { (None, None) } }; // Signal the child process to be killed if let Some(notify) = cancel_notify { notify.notify_one(); } storage .update_transcode_status(session_id, TranscodeStatus::Cancelled, 0.0) .await?; // Clean up cache directory if let Some(path) = cache_path && let Err(e) = tokio::fs::remove_dir_all(&path).await { tracing::error!("failed to remove transcode cache directory: {}", e); } Ok(()) } /// Remove expired transcode sessions and their cache directories. pub async fn cleanup_expired(&self) { let now = Utc::now(); // Collect expired entries and remove them from the map under the lock. let expired: Vec<(Uuid, PathBuf)> = { let mut sessions = self.sessions.write().await; let expired: Vec<(Uuid, PathBuf)> = sessions .iter() .filter_map(|(id, sess)| { if let Some(expires) = sess.expires_at && now > expires { return Some((*id, sess.cache_path.clone())); } None }) .collect(); for (id, _) in &expired { sessions.remove(id); } expired }; // Lock is dropped here; perform filesystem cleanup outside the lock. for (_id, path) in expired { if let Err(e) = tokio::fs::remove_dir_all(&path).await { tracing::error!("failed to remove expired transcode cache directory: {}", e); } } } /// Get a session by ID from the in-memory store. pub async fn get_session(&self, session_id: Uuid) -> Option { let sessions = self.sessions.read().await; sessions.get(&session_id).cloned() } /// Resolve the path to a specific segment file on disk. pub fn segment_path(&self, session_id: Uuid, segment_name: &str) -> PathBuf { // Sanitize segment_name to prevent path traversal let safe_name = std::path::Path::new(segment_name) .file_name() .map(|n| n.to_string_lossy().to_string()) .unwrap_or_default(); if safe_name.is_empty() || safe_name.contains('\0') || safe_name.starts_with('.') { // Return a non-existent path that will fail safely return self .cache_dir() .join(session_id.to_string()) .join("__invalid__"); } self.cache_dir() .join(session_id.to_string()) .join(safe_name) } /// Find a session for a given media_id and profile. pub async fn find_session(&self, media_id: MediaId, profile: &str) -> Option { let sessions = self.sessions.read().await; sessions .values() .find(|s| s.media_id == media_id && s.profile == profile) .cloned() } } /// Parse a resolution string like "360p", "720p", "1080p" into (width, height). pub fn parse_resolution(res: &str) -> (u32, u32) { match res.trim_end_matches('p') { "360" => (640, 360), "480" => (854, 480), "720" => (1280, 720), "1080" => (1920, 1080), "1440" => (2560, 1440), "2160" | "4k" => (3840, 2160), _ => (1280, 720), // default to 720p } } /// Estimate bandwidth (bits/sec) from a profile's max_bitrate_kbps. pub fn estimate_bandwidth(profile: &TranscodeProfile) -> u32 { profile.max_bitrate_kbps * 1000 } /// Build FFmpeg CLI arguments for transcoding. fn get_ffmpeg_args( source: &Path, output_dir: &Path, profile: &TranscodeProfile, hw_accel: Option<&str>, ) -> Vec { let (w, h) = parse_resolution(&profile.max_resolution); let playlist = output_dir.join("playlist.m3u8"); let segment_pattern = output_dir.join("segment%d.ts"); let mut args = Vec::new(); // Hardware acceleration if let Some(accel) = hw_accel { args.extend_from_slice(&["-hwaccel".to_string(), accel.to_string()]); } args.extend_from_slice(&[ "-i".to_string(), source.to_string_lossy().to_string(), "-c:v".to_string(), profile.video_codec.clone(), "-c:a".to_string(), profile.audio_codec.clone(), "-b:v".to_string(), format!("{}k", profile.max_bitrate_kbps), "-vf".to_string(), format!("scale={}:{}", w, h), "-f".to_string(), "hls".to_string(), "-hls_time".to_string(), "10".to_string(), "-hls_segment_filename".to_string(), segment_pattern.to_string_lossy().to_string(), "-progress".to_string(), "pipe:1".to_string(), "-y".to_string(), playlist.to_string_lossy().to_string(), ]); args } /// Run FFmpeg as a child process, parsing progress from stdout. async fn run_ffmpeg( args: &[String], sessions: &Arc>>, session_id: Uuid, duration_secs: Option, cancel: Arc, ) -> Result<(), crate::error::PinakesError> { use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; let mut child = Command::new("ffmpeg") .args(args) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .spawn() .map_err(|e| { crate::error::PinakesError::InvalidOperation(format!("failed to spawn ffmpeg: {}", e)) })?; // Capture stderr in a spawned task for error reporting let stderr_handle = if let Some(stderr) = child.stderr.take() { let reader = BufReader::new(stderr); Some(tokio::spawn(async move { let mut lines = reader.lines(); let mut collected = Vec::new(); while let Ok(Some(line)) = lines.next_line().await { collected.push(line); } collected })) } else { None }; // Parse progress from stdout let stdout_handle = if let Some(stdout) = child.stdout.take() { let reader = BufReader::new(stdout); let mut lines = reader.lines(); let sessions = sessions.clone(); Some(tokio::spawn(async move { while let Ok(Some(line)) = lines.next_line().await { // FFmpeg progress output: "out_time_us=12345678" if let Some(time_str) = line.strip_prefix("out_time_us=") && let Ok(us) = time_str.trim().parse::() { let secs = us / 1_000_000.0; // Calculate progress based on known duration let progress = match duration_secs { Some(dur) if dur > 0.0 => (secs / dur).min(0.99) as f32, _ => { // Duration unknown; don't update progress continue; } }; let mut s = sessions.write().await; if let Some(sess) = s.get_mut(&session_id) { sess.progress = progress; } } } })) } else { None }; // Wait for child, but also listen for cancellation let status = tokio::select! { result = child.wait() => { result.map_err(|e| { crate::error::PinakesError::InvalidOperation(format!("ffmpeg process error: {}", e)) })? } _ = cancel.notified() => { // Kill the child process on cancel if let Err(e) = child.kill().await { tracing::error!("failed to kill ffmpeg process: {}", e); } return Err(crate::error::PinakesError::InvalidOperation( "cancelled by user".to_string(), )); } }; // Await the stdout reader task if let Some(handle) = stdout_handle { let _ = handle.await; } // Collect stderr output for error reporting let stderr_output = if let Some(handle) = stderr_handle { handle.await.unwrap_or_default() } else { Vec::new() }; if !status.success() { let last_stderr = stderr_output .iter() .rev() .take(10) .rev() .cloned() .collect::>() .join("\n"); return Err(crate::error::PinakesError::InvalidOperation(format!( "ffmpeg exited with status: {}\nstderr:\n{}", status, last_stderr ))); } Ok(()) }