pinakes-core: update remaining modules and tests

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I9e0ff5ea33a5cf697473423e88f167ce6a6a6964
This commit is contained in:
raf 2026-03-08 00:42:29 +03:00
commit 3d9f8933d2
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
44 changed files with 1207 additions and 578 deletions

View file

@ -1,4 +1,4 @@
//! Transcoding service for media files using FFmpeg.
//! Transcoding service for media files using `FFmpeg`.
use std::{
collections::HashMap,
@ -33,7 +33,7 @@ pub struct TranscodeSession {
/// Duration of the source media in seconds, used for progress calculation.
#[serde(default)]
pub duration_secs: Option<f64>,
/// Handle to cancel the child FFmpeg process.
/// Handle to cancel the child `FFmpeg` process.
#[serde(skip)]
pub child_cancel: Option<Arc<tokio::sync::Notify>>,
}
@ -50,7 +50,8 @@ pub enum TranscodeStatus {
}
impl TranscodeStatus {
pub fn as_str(&self) -> &str {
#[must_use]
pub const fn as_str(&self) -> &str {
match self {
Self::Pending => "pending",
Self::Transcoding => "transcoding",
@ -81,6 +82,7 @@ impl TranscodeStatus {
}
}
#[must_use]
pub fn error_message(&self) -> Option<&str> {
match self {
Self::Failed { error } => Some(error),
@ -89,7 +91,7 @@ impl TranscodeStatus {
}
}
/// Service managing transcoding sessions and FFmpeg invocations.
/// Service managing transcoding sessions and `FFmpeg` invocations.
pub struct TranscodeService {
pub config: TranscodingConfig,
pub sessions: Arc<RwLock<HashMap<Uuid, TranscodeSession>>>,
@ -97,6 +99,7 @@ pub struct TranscodeService {
}
impl TranscodeService {
#[must_use]
pub fn new(config: TranscodingConfig) -> Self {
let max_concurrent = config.max_concurrent.max(1);
Self {
@ -106,10 +109,12 @@ impl TranscodeService {
}
}
pub fn is_enabled(&self) -> bool {
#[must_use]
pub const fn is_enabled(&self) -> bool {
self.config.enabled
}
#[must_use]
pub fn cache_dir(&self) -> PathBuf {
self
.config
@ -119,6 +124,11 @@ impl TranscodeService {
}
/// Start a transcode job for a media item.
///
/// # Errors
///
/// Returns an error if the profile is not found, the session directory cannot
/// be created, or the session cannot be stored in the database.
pub async fn start_transcode(
&self,
media_id: MediaId,
@ -135,8 +145,7 @@ impl TranscodeService {
.cloned()
.ok_or_else(|| {
crate::error::PinakesError::InvalidOperation(format!(
"unknown transcode profile: {}",
profile_name
"unknown transcode profile: {profile_name}"
))
})?;
@ -144,13 +153,15 @@ impl TranscodeService {
let session_dir = self.cache_dir().join(session_id.to_string());
tokio::fs::create_dir_all(&session_dir).await.map_err(|e| {
crate::error::PinakesError::InvalidOperation(format!(
"failed to create session directory: {}",
e
"failed to create session directory: {e}"
))
})?;
let expires_at = Some(
Utc::now() + chrono::Duration::hours(self.config.cache_ttl_hours as i64),
Utc::now()
+ chrono::Duration::hours(
i64::try_from(self.config.cache_ttl_hours).unwrap_or(i64::MAX),
),
);
let cancel_notify = Arc::new(tokio::sync::Notify::new());
@ -166,7 +177,7 @@ impl TranscodeService {
created_at: Utc::now(),
expires_at,
duration_secs,
child_cancel: Some(cancel_notify.clone()),
child_cancel: Some(Arc::clone(&cancel_notify)),
};
// Store session in DB
@ -179,12 +190,12 @@ impl TranscodeService {
}
// Spawn the FFmpeg task
let sessions = self.sessions.clone();
let semaphore = self.semaphore.clone();
let sessions = Arc::clone(&self.sessions);
let semaphore = Arc::clone(&self.semaphore);
let source = source_path.to_path_buf();
let hw_accel = self.config.hardware_acceleration.clone();
let storage = storage.clone();
let cancel = cancel_notify.clone();
let storage = Arc::clone(storage);
let cancel = Arc::clone(&cancel_notify);
tokio::spawn(async move {
// Acquire semaphore permit to limit concurrency
@ -192,12 +203,14 @@ impl TranscodeService {
Ok(permit) => permit,
Err(e) => {
tracing::error!("failed to acquire transcode semaphore: {}", e);
let error_msg = format!("semaphore closed: {}", e);
let mut s = sessions.write().await;
if let Some(sess) = s.get_mut(&session_id) {
sess.status = TranscodeStatus::Failed {
error: error_msg.clone(),
};
let error_msg = format!("semaphore closed: {e}");
{
let mut s = sessions.write().await;
if let Some(sess) = s.get_mut(&session_id) {
sess.status = TranscodeStatus::Failed {
error: error_msg.clone(),
};
}
}
if let Err(e) = storage
.update_transcode_status(
@ -234,10 +247,12 @@ impl TranscodeService {
.await
{
Ok(()) => {
let mut s = sessions.write().await;
if let Some(sess) = s.get_mut(&session_id) {
sess.status = TranscodeStatus::Complete;
sess.progress = 1.0;
{
let mut s = sessions.write().await;
if let Some(sess) = s.get_mut(&session_id) {
sess.status = TranscodeStatus::Complete;
sess.progress = 1.0;
}
}
if let Err(e) = storage
.update_transcode_status(session_id, TranscodeStatus::Complete, 1.0)
@ -277,6 +292,10 @@ impl TranscodeService {
}
/// Cancel a transcode session and clean up cache files.
///
/// # Errors
///
/// Returns an error if the database status update fails.
pub async fn cancel_transcode(
&self,
session_id: Uuid,
@ -359,6 +378,7 @@ impl TranscodeService {
}
/// Resolve the path to a specific segment file on disk.
#[must_use]
pub fn segment_path(&self, session_id: Uuid, segment_name: &str) -> PathBuf {
// Sanitize segment_name to prevent path traversal
let safe_name = std::path::Path::new(segment_name)
@ -381,7 +401,7 @@ impl TranscodeService {
.join(safe_name)
}
/// Find a session for a given media_id and profile.
/// Find a session for a given `media_id` and profile.
pub async fn find_session(
&self,
media_id: MediaId,
@ -396,24 +416,25 @@ impl TranscodeService {
}
/// Parse a resolution string like "360p", "720p", "1080p" into (width, height).
#[must_use]
pub fn parse_resolution(res: &str) -> (u32, u32) {
match res.trim_end_matches('p') {
"360" => (640, 360),
"480" => (854, 480),
"720" => (1280, 720),
"1080" => (1920, 1080),
"1440" => (2560, 1440),
"2160" | "4k" => (3840, 2160),
_ => (1280, 720), // default to 720p
_ => (1280, 720), // default to 720p (includes "720")
}
}
/// Estimate bandwidth (bits/sec) from a profile's max_bitrate_kbps.
pub fn estimate_bandwidth(profile: &TranscodeProfile) -> u32 {
/// Estimate bandwidth (bits/sec) from a profile's `max_bitrate_kbps`.
#[must_use]
pub const fn estimate_bandwidth(profile: &TranscodeProfile) -> u32 {
profile.max_bitrate_kbps * 1000
}
/// Build FFmpeg CLI arguments for transcoding.
/// Build `FFmpeg` CLI arguments for transcoding.
fn get_ffmpeg_args(
source: &Path,
output_dir: &Path,
@ -441,7 +462,7 @@ fn get_ffmpeg_args(
"-b:v".to_string(),
format!("{}k", profile.max_bitrate_kbps),
"-vf".to_string(),
format!("scale={}:{}", w, h),
format!("scale={w}:{h}"),
"-f".to_string(),
"hls".to_string(),
"-hls_time".to_string(),
@ -457,7 +478,7 @@ fn get_ffmpeg_args(
args
}
/// Run FFmpeg as a child process, parsing progress from stdout.
/// Run `FFmpeg` as a child process, parsing progress from stdout.
async fn run_ffmpeg(
args: &[String],
sessions: &Arc<RwLock<HashMap<Uuid, TranscodeSession>>>,
@ -477,33 +498,30 @@ async fn run_ffmpeg(
.spawn()
.map_err(|e| {
crate::error::PinakesError::InvalidOperation(format!(
"failed to spawn ffmpeg: {}",
e
"failed to spawn ffmpeg: {e}"
))
})?;
// Capture stderr in a spawned task for error reporting
let stderr_handle = if let Some(stderr) = child.stderr.take() {
let stderr_handle = child.stderr.take().map(|stderr| {
let reader = BufReader::new(stderr);
Some(tokio::spawn(async move {
tokio::spawn(async move {
let mut lines = reader.lines();
let mut collected = Vec::new();
while let Ok(Some(line)) = lines.next_line().await {
collected.push(line);
}
collected
}))
} else {
None
};
})
});
// Parse progress from stdout
let stdout_handle = if let Some(stdout) = child.stdout.take() {
let stdout_handle = child.stdout.take().map(|stdout| {
let reader = BufReader::new(stdout);
let mut lines = reader.lines();
let sessions = sessions.clone();
let sessions = Arc::clone(sessions);
Some(tokio::spawn(async move {
tokio::spawn(async move {
while let Ok(Some(line)) = lines.next_line().await {
// FFmpeg progress output: "out_time_us=12345678"
if let Some(time_str) = line.strip_prefix("out_time_us=")
@ -512,7 +530,11 @@ async fn run_ffmpeg(
let secs = us / 1_000_000.0;
// Calculate progress based on known duration
let progress = match duration_secs {
Some(dur) if dur > 0.0 => (secs / dur).min(0.99) as f32,
Some(dur) if dur > 0.0 => {
#[allow(clippy::cast_possible_truncation)]
let p = (secs / dur).min(0.99) as f32;
p
},
_ => {
// Duration unknown; don't update progress
continue;
@ -524,19 +546,17 @@ async fn run_ffmpeg(
}
}
}
}))
} else {
None
};
})
});
// Wait for child, but also listen for cancellation
let status = tokio::select! {
result = child.wait() => {
result.map_err(|e| {
crate::error::PinakesError::InvalidOperation(format!("ffmpeg process error: {}", e))
crate::error::PinakesError::InvalidOperation(format!("ffmpeg process error: {e}"))
})?
}
_ = cancel.notified() => {
() = cancel.notified() => {
// Kill the child process on cancel
if let Err(e) = child.kill().await {
tracing::error!("failed to kill ffmpeg process: {}", e);
@ -569,8 +589,7 @@ async fn run_ffmpeg(
.collect::<Vec<_>>()
.join("\n");
return Err(crate::error::PinakesError::InvalidOperation(format!(
"ffmpeg exited with status: {}\nstderr:\n{}",
status, last_stderr
"ffmpeg exited with status: {status}\nstderr:\n{last_stderr}"
)));
}