pinakes/crates/pinakes-core/src/transcode.rs
NotAShelf 3ccddce7fd
treewide: fix various UI bugs; optimize crypto dependencies & format
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: If8fe8b38c1d9c4fecd40ff71f88d2ae06a6a6964
2026-03-06 18:29:33 +03:00

578 lines
16 KiB
Rust

//! Transcoding service for media files using FFmpeg.
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::{RwLock, Semaphore};
use uuid::Uuid;
use crate::{
config::{TranscodeProfile, TranscodingConfig},
model::MediaId,
storage::DynStorageBackend,
users::UserId,
};
/// A transcoding session for a media item.
///
/// Persisted through the storage backend; the in-memory copy additionally
/// carries a cancellation handle (`child_cancel`) for the running FFmpeg
/// child process, which is skipped during (de)serialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscodeSession {
    /// Unique session id (UUIDv7, time-ordered).
    pub id: Uuid,
    /// Media item being transcoded.
    pub media_id: MediaId,
    /// Requesting user, when known.
    pub user_id: Option<UserId>,
    /// Name of the transcode profile in use.
    pub profile: String,
    /// Directory holding this session's HLS playlist and segments.
    pub cache_path: PathBuf,
    /// Current lifecycle state.
    pub status: TranscodeStatus,
    /// Completion fraction in [0.0, 1.0].
    pub progress: f32,
    /// Session creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// When set, the session and its cache become eligible for cleanup.
    pub expires_at: Option<DateTime<Utc>>,
    /// Duration of the source media in seconds, used for progress calculation.
    #[serde(default)]
    pub duration_secs: Option<f64>,
    /// Handle to cancel the child FFmpeg process.
    /// Only present on the in-memory copy; never serialized.
    #[serde(skip)]
    pub child_cancel: Option<Arc<tokio::sync::Notify>>,
}
/// Status of a transcode session.
///
/// Serialized internally tagged as `{"state": "..."}` with snake_case
/// variant names, matching the strings produced by `as_str`/`from_db`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "state")]
pub enum TranscodeStatus {
    /// Queued; FFmpeg has not started yet.
    Pending,
    /// FFmpeg is currently running.
    Transcoding,
    /// Output is fully written.
    Complete,
    /// FFmpeg failed; `error` holds the reason.
    Failed { error: String },
    /// Cancelled by the user.
    Cancelled,
}
impl TranscodeStatus {
pub fn as_str(&self) -> &str {
match self {
Self::Pending => "pending",
Self::Transcoding => "transcoding",
Self::Complete => "complete",
Self::Failed { .. } => "failed",
Self::Cancelled => "cancelled",
}
}
pub fn from_db(status: &str, error_message: Option<&str>) -> Self {
match status {
"pending" => Self::Pending,
"transcoding" => Self::Transcoding,
"complete" => Self::Complete,
"failed" => {
Self::Failed {
error: error_message.unwrap_or("unknown error").to_string(),
}
},
"cancelled" => Self::Cancelled,
other => {
tracing::warn!(
"unknown transcode status '{}', defaulting to Pending",
other
);
Self::Pending
},
}
}
pub fn error_message(&self) -> Option<&str> {
match self {
Self::Failed { error } => Some(error),
_ => None,
}
}
}
/// Service managing transcoding sessions and FFmpeg invocations.
pub struct TranscodeService {
    /// Transcoding configuration (profiles, concurrency, cache settings).
    pub config: TranscodingConfig,
    /// In-memory view of known sessions, keyed by session id.
    pub sessions: Arc<RwLock<HashMap<Uuid, TranscodeSession>>>,
    // Bounds the number of concurrently running FFmpeg processes.
    semaphore: Arc<Semaphore>,
}
impl TranscodeService {
    /// Create a new service from configuration.
    ///
    /// Concurrency is bounded by a semaphore sized from
    /// `config.max_concurrent`, clamped to at least 1 so the service can
    /// always make progress.
    pub fn new(config: TranscodingConfig) -> Self {
        let max_concurrent = config.max_concurrent.max(1);
        Self {
            sessions: Arc::new(RwLock::new(HashMap::new())),
            semaphore: Arc::new(Semaphore::new(max_concurrent)),
            config,
        }
    }

    /// Whether transcoding is enabled in configuration.
    pub fn is_enabled(&self) -> bool {
        self.config.enabled
    }

    /// Root directory for transcode cache output.
    ///
    /// Falls back to `/tmp/pinakes-transcode` when not configured.
    pub fn cache_dir(&self) -> PathBuf {
        self
            .config
            .cache_dir
            .clone()
            .unwrap_or_else(|| PathBuf::from("/tmp/pinakes-transcode"))
    }

    /// Start a transcode job for a media item.
    ///
    /// Resolves `profile_name` against the configured profiles, creates a
    /// per-session cache directory, records the session in both the storage
    /// backend and the in-memory map, then spawns a background task that
    /// runs FFmpeg under the concurrency semaphore. Returns the new session
    /// id immediately; callers observe progress via `get_session`.
    ///
    /// # Errors
    /// `InvalidOperation` when the profile is unknown or the cache
    /// directory cannot be created.
    pub async fn start_transcode(
        &self,
        media_id: MediaId,
        source_path: &Path,
        profile_name: &str,
        duration_secs: Option<f64>,
        storage: &DynStorageBackend,
    ) -> crate::error::Result<Uuid> {
        // Look up the requested profile; unknown names are a caller error.
        let profile = self
            .config
            .profiles
            .iter()
            .find(|p| p.name == profile_name)
            .cloned()
            .ok_or_else(|| {
                crate::error::PinakesError::InvalidOperation(format!(
                    "unknown transcode profile: {}",
                    profile_name
                ))
            })?;
        // UUIDv7 is time-ordered, which keeps cache directories roughly
        // sorted by creation time.
        let session_id = Uuid::now_v7();
        let session_dir = self.cache_dir().join(session_id.to_string());
        tokio::fs::create_dir_all(&session_dir).await.map_err(|e| {
            crate::error::PinakesError::InvalidOperation(format!(
                "failed to create session directory: {}",
                e
            ))
        })?;
        let expires_at = Some(
            Utc::now() + chrono::Duration::hours(self.config.cache_ttl_hours as i64),
        );
        // Fired by cancel_transcode to kill the FFmpeg child in run_ffmpeg.
        let cancel_notify = Arc::new(tokio::sync::Notify::new());
        let session = TranscodeSession {
            id: session_id,
            media_id,
            user_id: None,
            profile: profile_name.to_string(),
            cache_path: session_dir.clone(),
            status: TranscodeStatus::Pending,
            progress: 0.0,
            created_at: Utc::now(),
            expires_at,
            duration_secs,
            child_cancel: Some(cancel_notify.clone()),
        };
        // Store session in DB
        storage.create_transcode_session(&session).await?;
        // Store in memory
        {
            let mut sessions = self.sessions.write().await;
            sessions.insert(session_id, session);
        }
        // Spawn the FFmpeg task
        let sessions = self.sessions.clone();
        let semaphore = self.semaphore.clone();
        let source = source_path.to_path_buf();
        let hw_accel = self.config.hardware_acceleration.clone();
        let storage = storage.clone();
        let cancel = cancel_notify.clone();
        tokio::spawn(async move {
            // Acquire semaphore permit to limit concurrency
            let _permit = match semaphore.acquire().await {
                Ok(permit) => permit,
                Err(e) => {
                    // acquire() only fails when the semaphore has been
                    // closed; record the failure in memory and in the DB,
                    // then bail.
                    tracing::error!("failed to acquire transcode semaphore: {}", e);
                    let error_msg = format!("semaphore closed: {}", e);
                    let mut s = sessions.write().await;
                    if let Some(sess) = s.get_mut(&session_id) {
                        sess.status = TranscodeStatus::Failed {
                            error: error_msg.clone(),
                        };
                    }
                    if let Err(e) = storage
                        .update_transcode_status(
                            session_id,
                            TranscodeStatus::Failed { error: error_msg },
                            0.0,
                        )
                        .await
                    {
                        tracing::error!("failed to update transcode status: {}", e);
                    }
                    return;
                },
            };
            // Mark as transcoding
            {
                let mut s = sessions.write().await;
                if let Some(sess) = s.get_mut(&session_id) {
                    sess.status = TranscodeStatus::Transcoding;
                }
            }
            if let Err(e) = storage
                .update_transcode_status(session_id, TranscodeStatus::Transcoding, 0.0)
                .await
            {
                tracing::error!("failed to update transcode status: {}", e);
            }
            // Build FFmpeg args and run
            let args =
                get_ffmpeg_args(&source, &session_dir, &profile, hw_accel.as_deref());
            match run_ffmpeg(&args, &sessions, session_id, duration_secs, cancel)
                .await
            {
                Ok(()) => {
                    let mut s = sessions.write().await;
                    if let Some(sess) = s.get_mut(&session_id) {
                        sess.status = TranscodeStatus::Complete;
                        sess.progress = 1.0;
                    }
                    // NOTE(review): the map guard `s` is still held across
                    // this await (tokio RwLock, so this is legal); the Err
                    // arm drops it first — consider doing the same here.
                    if let Err(e) = storage
                        .update_transcode_status(session_id, TranscodeStatus::Complete, 1.0)
                        .await
                    {
                        tracing::error!("failed to update transcode status: {}", e);
                    }
                },
                Err(e) => {
                    let error_msg = e.to_string();
                    let mut s = sessions.write().await;
                    if let Some(sess) = s.get_mut(&session_id) {
                        // Don't overwrite Cancelled status
                        if matches!(sess.status, TranscodeStatus::Cancelled) {
                            return;
                        }
                        sess.status = TranscodeStatus::Failed {
                            error: error_msg.clone(),
                        };
                    }
                    // Release the map lock before the DB round-trip.
                    drop(s);
                    if let Err(e) = storage
                        .update_transcode_status(
                            session_id,
                            TranscodeStatus::Failed { error: error_msg },
                            0.0,
                        )
                        .await
                    {
                        tracing::error!("failed to update transcode status: {}", e);
                    }
                },
            }
        });
        Ok(session_id)
    }

    /// Cancel a transcode session and clean up cache files.
    ///
    /// Marks the in-memory session `Cancelled`, fires the cancellation
    /// notifier (which kills the FFmpeg child in `run_ffmpeg`), persists the
    /// new status, then removes the session's cache directory. The DB status
    /// update happens even for ids missing from the in-memory map.
    pub async fn cancel_transcode(
        &self,
        session_id: Uuid,
        storage: &DynStorageBackend,
    ) -> crate::error::Result<()> {
        let (cache_path, cancel_notify) = {
            let mut sessions = self.sessions.write().await;
            if let Some(sess) = sessions.get_mut(&session_id) {
                sess.status = TranscodeStatus::Cancelled;
                // take() ensures the notifier can fire at most once.
                let cancel = sess.child_cancel.take();
                (Some(sess.cache_path.clone()), cancel)
            } else {
                (None, None)
            }
        };
        // Signal the child process to be killed
        if let Some(notify) = cancel_notify {
            notify.notify_one();
        }
        storage
            .update_transcode_status(session_id, TranscodeStatus::Cancelled, 0.0)
            .await?;
        // Clean up cache directory
        if let Some(path) = cache_path
            && let Err(e) = tokio::fs::remove_dir_all(&path).await
        {
            tracing::error!("failed to remove transcode cache directory: {}", e);
        }
        Ok(())
    }

    /// Remove expired transcode sessions and their cache directories.
    pub async fn cleanup_expired(&self) {
        let now = Utc::now();
        // Collect expired entries and remove them from the map under the lock.
        let expired: Vec<(Uuid, PathBuf)> = {
            let mut sessions = self.sessions.write().await;
            let expired: Vec<(Uuid, PathBuf)> = sessions
                .iter()
                .filter_map(|(id, sess)| {
                    // Sessions without expires_at never expire.
                    if let Some(expires) = sess.expires_at
                        && now > expires
                    {
                        return Some((*id, sess.cache_path.clone()));
                    }
                    None
                })
                .collect();
            for (id, _) in &expired {
                sessions.remove(id);
            }
            expired
        };
        // Lock is dropped here; perform filesystem cleanup outside the lock.
        for (_id, path) in expired {
            if let Err(e) = tokio::fs::remove_dir_all(&path).await {
                tracing::error!(
                    "failed to remove expired transcode cache directory: {}",
                    e
                );
            }
        }
    }

    /// Get a session by ID from the in-memory store.
    pub async fn get_session(
        &self,
        session_id: Uuid,
    ) -> Option<TranscodeSession> {
        let sessions = self.sessions.read().await;
        sessions.get(&session_id).cloned()
    }

    /// Resolve the path to a specific segment file on disk.
    ///
    /// `segment_name` is reduced to its final path component and rejected
    /// when empty, NUL-containing, or dot-prefixed, so callers cannot
    /// traverse outside the session's cache directory.
    pub fn segment_path(&self, session_id: Uuid, segment_name: &str) -> PathBuf {
        // Sanitize segment_name to prevent path traversal
        let safe_name = std::path::Path::new(segment_name)
            .file_name()
            .map(|n| n.to_string_lossy().to_string())
            .unwrap_or_default();
        if safe_name.is_empty()
            || safe_name.contains('\0')
            || safe_name.starts_with('.')
        {
            // Return a non-existent path that will fail safely
            return self
                .cache_dir()
                .join(session_id.to_string())
                .join("__invalid__");
        }
        self
            .cache_dir()
            .join(session_id.to_string())
            .join(safe_name)
    }

    /// Find a session for a given media_id and profile.
    ///
    /// Returns the first match regardless of status; if multiple sessions
    /// exist for the same pair, which one is returned is arbitrary (HashMap
    /// iteration order).
    pub async fn find_session(
        &self,
        media_id: MediaId,
        profile: &str,
    ) -> Option<TranscodeSession> {
        let sessions = self.sessions.read().await;
        sessions
            .values()
            .find(|s| s.media_id == media_id && s.profile == profile)
            .cloned()
    }
}
/// Parse a resolution string like "360p", "720p", "1080p" into (width, height).
///
/// Matching is case-insensitive and tolerant of surrounding whitespace, so
/// "1080P", "4K", and " 2160p " are all accepted. Unrecognized values fall
/// back to 720p (1280x720).
pub fn parse_resolution(res: &str) -> (u32, u32) {
    // Normalize once so padded or upper-cased inputs still match.
    let normalized = res.trim().to_ascii_lowercase();
    match normalized.trim_end_matches('p') {
        "360" => (640, 360),
        "480" => (854, 480),
        "720" => (1280, 720),
        "1080" => (1920, 1080),
        "1440" => (2560, 1440),
        "2160" | "4k" => (3840, 2160),
        _ => (1280, 720), // default to 720p
    }
}
/// Estimate bandwidth (bits/sec) from a profile's max_bitrate_kbps.
///
/// Uses a saturating multiply so an absurdly large configured bitrate
/// (over ~4.29 million kbps) clamps to `u32::MAX` instead of overflowing,
/// which would panic in debug builds and silently wrap in release builds.
pub fn estimate_bandwidth(profile: &TranscodeProfile) -> u32 {
    profile.max_bitrate_kbps.saturating_mul(1000)
}
/// Build FFmpeg CLI arguments for transcoding a source file to HLS.
///
/// Output is `playlist.m3u8` plus numbered `.ts` segments in `output_dir`.
/// `-progress pipe:1` makes FFmpeg emit key=value progress lines on stdout
/// for `run_ffmpeg` to parse; `-y` overwrites any stale playlist.
fn get_ffmpeg_args(
    source: &Path,
    output_dir: &Path,
    profile: &TranscodeProfile,
    hw_accel: Option<&str>,
) -> Vec<String> {
    let (w, h) = parse_resolution(&profile.max_resolution);
    let playlist = output_dir.join("playlist.m3u8");
    let segment_pattern = output_dir.join("segment%d.ts");
    let mut args = Vec::new();
    // Hardware acceleration must be specified before the input option.
    if let Some(accel) = hw_accel {
        args.extend_from_slice(&["-hwaccel".to_string(), accel.to_string()]);
    }
    args.extend_from_slice(&[
        "-i".to_string(),
        source.to_string_lossy().to_string(),
        "-c:v".to_string(),
        profile.video_codec.clone(),
        "-c:a".to_string(),
        profile.audio_codec.clone(),
        "-b:v".to_string(),
        format!("{}k", profile.max_bitrate_kbps),
        "-vf".to_string(),
        // Fit within the profile resolution while preserving the source
        // aspect ratio (a bare scale=w:h would distort non-16:9 sources);
        // force even dimensions since common encoders reject odd sizes
        // with yuv420p.
        format!(
            "scale={}:{}:force_original_aspect_ratio=decrease:force_divisible_by=2",
            w, h
        ),
        "-f".to_string(),
        "hls".to_string(),
        "-hls_time".to_string(),
        "10".to_string(),
        // Keep every segment in the playlist. The HLS muxer default
        // (hls_list_size=5) produces a rolling live-style playlist, which
        // would make all but the most recent segments unplayable for VOD.
        "-hls_list_size".to_string(),
        "0".to_string(),
        "-hls_segment_filename".to_string(),
        segment_pattern.to_string_lossy().to_string(),
        "-progress".to_string(),
        "pipe:1".to_string(),
        "-y".to_string(),
        playlist.to_string_lossy().to_string(),
    ]);
    args
}
/// Run FFmpeg as a child process, parsing progress from stdout.
///
/// Spawns `ffmpeg` with `args` (which must include `-progress pipe:1`, see
/// `get_ffmpeg_args`), reads key=value progress lines from stdout to update
/// the session's progress fraction, and collects stderr for error
/// reporting. Firing `cancel` kills the child and returns an error.
/// Progress is capped at 0.99 here; only actual completion reports 1.0
/// (set by the caller).
///
/// # Errors
/// `InvalidOperation` when ffmpeg cannot be spawned, the job is cancelled,
/// or ffmpeg exits non-zero (the last 10 stderr lines are appended to the
/// error message).
async fn run_ffmpeg(
    args: &[String],
    sessions: &Arc<RwLock<HashMap<Uuid, TranscodeSession>>>,
    session_id: Uuid,
    duration_secs: Option<f64>,
    cancel: Arc<tokio::sync::Notify>,
) -> Result<(), crate::error::PinakesError> {
    use tokio::{
        io::{AsyncBufReadExt, BufReader},
        process::Command,
    };
    let mut child = Command::new("ffmpeg")
        .args(args)
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .spawn()
        .map_err(|e| {
            crate::error::PinakesError::InvalidOperation(format!(
                "failed to spawn ffmpeg: {}",
                e
            ))
        })?;
    // Capture stderr in a spawned task for error reporting
    let stderr_handle = if let Some(stderr) = child.stderr.take() {
        let reader = BufReader::new(stderr);
        Some(tokio::spawn(async move {
            let mut lines = reader.lines();
            let mut collected = Vec::new();
            // Drain until EOF (child exit); kept for the failure path below.
            while let Ok(Some(line)) = lines.next_line().await {
                collected.push(line);
            }
            collected
        }))
    } else {
        None
    };
    // Parse progress from stdout
    let stdout_handle = if let Some(stdout) = child.stdout.take() {
        let reader = BufReader::new(stdout);
        let mut lines = reader.lines();
        let sessions = sessions.clone();
        Some(tokio::spawn(async move {
            while let Ok(Some(line)) = lines.next_line().await {
                // FFmpeg progress output: "out_time_us=12345678"
                if let Some(time_str) = line.strip_prefix("out_time_us=")
                    && let Ok(us) = time_str.trim().parse::<f64>()
                {
                    let secs = us / 1_000_000.0;
                    // Calculate progress based on known duration
                    let progress = match duration_secs {
                        // Cap below 1.0 so only real completion reports done.
                        Some(dur) if dur > 0.0 => (secs / dur).min(0.99) as f32,
                        _ => {
                            // Duration unknown; don't update progress
                            continue;
                        },
                    };
                    let mut s = sessions.write().await;
                    if let Some(sess) = s.get_mut(&session_id) {
                        sess.progress = progress;
                    }
                }
            }
        }))
    } else {
        None
    };
    // Wait for child, but also listen for cancellation
    let status = tokio::select! {
        result = child.wait() => {
            result.map_err(|e| {
                crate::error::PinakesError::InvalidOperation(format!("ffmpeg process error: {}", e))
            })?
        }
        _ = cancel.notified() => {
            // Kill the child process on cancel
            if let Err(e) = child.kill().await {
                tracing::error!("failed to kill ffmpeg process: {}", e);
            }
            // NOTE(review): the stdout/stderr reader tasks are not awaited
            // on this path; they terminate on their own once the pipes hit
            // EOF after the kill.
            return Err(crate::error::PinakesError::InvalidOperation(
                "cancelled by user".to_string(),
            ));
        }
    };
    // Await the stdout reader task
    if let Some(handle) = stdout_handle {
        let _ = handle.await;
    }
    // Collect stderr output for error reporting
    let stderr_output = if let Some(handle) = stderr_handle {
        handle.await.unwrap_or_default()
    } else {
        Vec::new()
    };
    if !status.success() {
        // Keep only the last 10 stderr lines, preserving original order.
        let last_stderr = stderr_output
            .iter()
            .rev()
            .take(10)
            .rev()
            .cloned()
            .collect::<Vec<_>>()
            .join("\n");
        return Err(crate::error::PinakesError::InvalidOperation(format!(
            "ffmpeg exited with status: {}\nstderr:\n{}",
            status, last_stderr
        )));
    }
    Ok(())
}