pakker/src/ipc.rs
NotAShelf ef28bdaeb4
initial commit
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Ife1391ed23a1e7f388b1b5eca90b9ea76a6a6964
2026-02-13 00:14:46 +03:00

1326 lines
41 KiB
Rust

//! IPC coordination for concurrent Pakker operations.
//!
//! Uses tmpfs for inter-process coordination with cryptographic hashing
//! of modpack content (like Nix store paths) to identify unique modpacks.
//!
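//! IPC path: `$XDG_RUNTIME_DIR/pakker/<hash>/ops.json` (typically
//! `/run/user/<uid>/pakker/<hash>/ops.json`), falling back to
//! `/tmp/pakker/<hash>/ops.json` when `XDG_RUNTIME_DIR` is not set.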
//!
//! The hash is derived from the modpack's `parentLockHash` in pakku.json,
//! ensuring the same modpack in different directories/sessions shares IPC.
use std::{
fs::{self, File, OpenOptions},
io::Write,
os::unix::{fs::PermissionsExt, io::AsRawFd},
path::PathBuf,
time::{Duration, SystemTime},
};
use libc::{LOCK_EX, LOCK_UN, flock};
use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Errors reported by the IPC coordination layer.
///
/// Every variant carries a human-readable detail string that is interpolated
/// into the `Display` message.
#[derive(Error, Debug)]
pub enum IpcError {
/// An operation of the same type is already registered as running.
#[error("operation already in progress: {0}")]
OperationInProgress(String),
/// The ops file could not be opened, read, written, parsed, or locked.
/// `std::io::Error` and `serde_json::Error` values also convert into this
/// variant through the `From` impls in this file.
#[error("invalid ops file format: {0}")]
InvalidFormat(String),
/// No tracked operation has the requested id.
#[error("operation not found: {0}")]
OperationNotFound(String),
/// The per-modpack IPC directory could not be created.
#[error("failed to create IPC directory: {0}")]
IpcDirCreationFailed(String),
/// Conflicting operations were still running when the wait deadline passed.
#[error("timeout waiting for operation: {0}")]
Timeout(String),
/// `pakku.json` is missing, unreadable, unparsable, or lacks a valid
/// `parentLockHash`.
#[error("failed to read pakku.json for modpack hash: {0}")]
PakkuJsonReadFailed(String),
}
impl From<std::io::Error> for IpcError {
fn from(e: std::io::Error) -> Self {
Self::InvalidFormat(e.to_string())
}
}
impl From<serde_json::Error> for IpcError {
fn from(e: serde_json::Error) -> Self {
Self::InvalidFormat(e.to_string())
}
}
/// Represents an ongoing operation tracked in IPC.
///
/// A list of these records is what gets serialized to and from the ops file.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OngoingOperation {
/// Unique identifier; `register_operation` builds it from the operation type,
/// a nanosecond timestamp, a random hex suffix and the PID.
pub id: String,
/// Kind of operation being tracked.
pub r#type: OperationType,
/// PID of the process that registered the operation.
pub pid: u32,
/// Registration time in seconds since the Unix epoch.
pub started_at: u64,
/// Current lifecycle state of the operation.
pub status: OperationStatus,
}
/// Kind of operation that can be coordinated through IPC.
///
/// Serialized in `snake_case` (`"fetch"`, `"export"`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationType {
/// A fetch operation.
Fetch,
/// An export operation.
Export,
}
/// Lifecycle state of a tracked operation.
///
/// Serialized in `snake_case` (`"running"`, `"completed"`, `"failed"`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationStatus {
/// The operation has been registered and not yet marked as finished.
Running,
/// The operation was marked as finished via `complete_operation`.
Completed,
/// NOTE(review): nothing in the visible part of this file assigns this
/// state; presumably reserved for callers that report failures — confirm.
Failed,
}
/// Guard that releases an advisory lock when dropped.
///
/// Owns the open file handle the `flock` was taken on, so the descriptor
/// stays valid for as long as the guard is alive.
struct FileLock {
// Handle whose raw fd is passed to `flock(LOCK_UN)` on drop.
file: File,
}
impl Drop for FileLock {
  /// Releases the advisory lock; the result of the unlock call is ignored.
  fn drop(&mut self) {
    let fd = self.file.as_raw_fd();
    // SAFETY: `fd` belongs to `self.file`, which is still open here.
    unsafe {
      flock(fd, LOCK_UN);
    }
  }
}
/// IPC coordination for concurrent Pakker operations.
///
/// Cloning is cheap: the coordinator only stores the path of the ops file.
#[derive(Clone)]
pub struct IpcCoordinator {
// Path of the `ops.json` file inside the per-modpack IPC directory.
ops_file: PathBuf,
}
impl IpcCoordinator {
/// Get the base IPC directory.
///
/// Returns `$XDG_RUNTIME_DIR/pakker` when `XDG_RUNTIME_DIR` holds an absolute
/// path, and `/tmp/pakker` otherwise. An empty or relative value is ignored,
/// so the result is always an absolute path rather than something resolved
/// against the current working directory.
fn get_ipc_base_dir() -> PathBuf {
  std::env::var_os("XDG_RUNTIME_DIR")
    .map(PathBuf::from)
    // Empty and relative values are treated as if the variable were unset.
    .filter(|runtime| runtime.is_absolute())
    .map_or_else(
      || PathBuf::from("/tmp/pakker"),
      |runtime| runtime.join("pakker"),
    )
}
/// Read the modpack identity from `pakku.json` in `working_dir`.
///
/// The value is taken from the `parentLockHash` string nested under the
/// top-level `pakku` object and must be exactly 64 ASCII hex digits.
///
/// # Errors
///
/// Returns [`IpcError::PakkuJsonReadFailed`] when the file is missing,
/// cannot be read or parsed, lacks the field, or the value is not a
/// 64-character hex string.
fn get_modpack_hash(working_dir: &PathBuf) -> Result<String, IpcError> {
  let manifest_path = working_dir.join("pakku.json");
  if !manifest_path.exists() {
    return Err(IpcError::PakkuJsonReadFailed(
      "pakku.json not found in working directory".to_string(),
    ));
  }

  let raw = match fs::read_to_string(&manifest_path) {
    Ok(text) => text,
    Err(e) => return Err(IpcError::PakkuJsonReadFailed(e.to_string())),
  };
  let manifest: serde_json::Value = match serde_json::from_str(&raw) {
    Ok(value) => value,
    Err(e) => return Err(IpcError::PakkuJsonReadFailed(e.to_string())),
  };

  // The hash lives at `pakku.parentLockHash`; a root-level field is ignored.
  let Some(hash) = manifest
    .get("pakku")
    .and_then(|section| section.get("parentLockHash"))
    .and_then(|value| value.as_str())
  else {
    return Err(IpcError::PakkuJsonReadFailed(
      "parentLockHash not found in pakku.json".to_string(),
    ));
  };

  // SHA256 rendered as hex is 64 characters long.
  let looks_like_sha256 =
    hash.len() == 64 && hash.chars().all(|c| c.is_ascii_hexdigit());
  if !looks_like_sha256 {
    return Err(IpcError::PakkuJsonReadFailed(
      "parentLockHash is not a valid SHA256 hash".to_string(),
    ));
  }

  Ok(hash.to_string())
}
/// Create a new IPC coordinator for the given modpack directory.
/// Uses parentLockHash from pakku.json to identify the modpack.
pub fn new(working_dir: &PathBuf) -> Result<Self, IpcError> {
let modpack_hash = Self::get_modpack_hash(working_dir)?;
let ipc_base = Self::get_ipc_base_dir();
let ipc_dir = ipc_base.join(&modpack_hash);
// Create IPC directory with restricted permissions
if let Err(e) = fs::create_dir_all(&ipc_dir)
&& !ipc_dir.exists()
{
return Err(IpcError::IpcDirCreationFailed(e.to_string()));
}
if ipc_dir.exists() {
// Set permissions to 700 (owner only)
if let Ok(metadata) = fs::metadata(&ipc_dir)
&& metadata.permissions().mode() != 0o700
{
let mut perms = metadata.permissions();
perms.set_mode(0o700);
let _ = fs::set_permissions(&ipc_dir, perms);
}
}
let ops_file = ipc_dir.join("ops.json");
Ok(Self { ops_file })
}
/// Acquire an exclusive advisory lock on the ops file for atomic operations.
///
/// The ops file is opened (and created if missing, without truncating it),
/// its mode is set to `0o600`, and a blocking `flock(LOCK_EX)` is taken on
/// it. The returned guard releases the lock when dropped.
///
/// # Errors
///
/// Returns [`IpcError::InvalidFormat`] if the file cannot be opened, its
/// permissions cannot be updated, or the lock cannot be acquired.
fn lock_ops_file(&self) -> Result<FileLock, IpcError> {
  log::debug!("Acquiring file lock on {:?}", self.ops_file);

  // Open or create the ops file with read/write access, keeping its content.
  let file = OpenOptions::new()
    .read(true)
    .write(true)
    .create(true)
    .truncate(false)
    .open(&self.ops_file)
    .map_err(|e| IpcError::InvalidFormat(e.to_string()))?;

  // Set permissions to 600 through the open handle so the change applies to
  // the file that was actually opened, not to whatever the path names now.
  let mut perms = file.metadata()?.permissions();
  perms.set_mode(0o600);
  file.set_permissions(perms)?;

  // Acquire the exclusive lock, retrying when a signal interrupts the wait.
  loop {
    // SAFETY: the descriptor belongs to `file`, which is alive for the call.
    let rc = unsafe { flock(file.as_raw_fd(), LOCK_EX) };
    if rc == 0 {
      break;
    }
    let os_error = std::io::Error::last_os_error();
    if os_error.kind() == std::io::ErrorKind::Interrupted {
      continue;
    }
    log::warn!("Failed to acquire file lock on {:?}", self.ops_file);
    return Err(IpcError::InvalidFormat(
      "failed to acquire file lock".to_string(),
    ));
  }
  log::debug!("File lock acquired on {:?}", self.ops_file);

  // The guard owns the handle and unlocks it on drop.
  Ok(FileLock { file })
}
/// Read every tracked operation from the ops file.
///
/// A missing, empty, or whitespace-only file yields an empty list.
///
/// # Errors
///
/// Returns [`IpcError::InvalidFormat`] when the file cannot be read or its
/// content is not a valid JSON list of operations.
pub fn load_operations(&self) -> Result<Vec<OngoingOperation>, IpcError> {
  if !self.ops_file.exists() {
    return Ok(Vec::new());
  }

  let raw = fs::read_to_string(&self.ops_file)
    .map_err(|e| IpcError::InvalidFormat(e.to_string()))?;

  if raw.trim().is_empty() {
    // Nothing has been written yet.
    Ok(Vec::new())
  } else {
    serde_json::from_str(&raw)
      .map_err(|e| IpcError::InvalidFormat(e.to_string()))
  }
}
/// Overwrite the ops file with `operations`, pretty-printed as JSON, and set
/// the file mode to `0o600`.
///
/// # Errors
///
/// Returns [`IpcError::InvalidFormat`] if serialization, file creation,
/// writing, or the permission update fails.
fn save_operations(
  &self,
  operations: &[OngoingOperation],
) -> Result<(), IpcError> {
  let json = serde_json::to_string_pretty(operations)
    .map_err(|e| IpcError::InvalidFormat(e.to_string()))?;

  // `File::create` truncates any previous content before the write.
  let mut out = match File::create(&self.ops_file) {
    Ok(handle) => handle,
    Err(e) => return Err(IpcError::InvalidFormat(e.to_string())),
  };
  if let Err(e) = out.write_all(json.as_bytes()) {
    return Err(IpcError::InvalidFormat(e.to_string()));
  }

  // Owner read/write only.
  let mut mode = fs::metadata(&self.ops_file)?.permissions();
  mode.set_mode(0o600);
  fs::set_permissions(&self.ops_file, mode)?;
  Ok(())
}
/// Register a new operation, returns error if one of the same type is already
/// running Uses advisory locking to prevent TOCTOU race conditions between
/// processes
pub fn register_operation(
&self,
operation_type: OperationType,
) -> Result<String, IpcError> {
log::debug!("Registering {operation_type:?} operation");
// Acquire exclusive lock before load-check-save sequence
let _lock = self.lock_ops_file()?;
let mut operations = self.load_operations()?;
log::debug!("Loaded {} existing operations", operations.len());
// Clean up stale operations (older than 10 minutes)
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs();
let stale_count = operations
.iter()
.filter(|op| {
op.status == OperationStatus::Running
&& now.saturating_sub(op.started_at) > 600
})
.count();
operations.retain(|op| {
if op.status == OperationStatus::Running
&& now.saturating_sub(op.started_at) > 600
{
false // Remove stale operations
} else {
true
}
});
if stale_count > 0 {
log::info!("Cleaned up {stale_count} stale operations");
}
// Check for conflicting operations
let conflicting: Vec<_> = operations
.iter()
.filter(|op| {
op.r#type == operation_type && op.status == OperationStatus::Running
})
.collect();
if !conflicting.is_empty() {
log::debug!("Found {} conflicting operations", conflicting.len());
return Err(IpcError::OperationInProgress(format!(
"{} operation already in progress (PID: {})",
operation_type.as_str(),
conflicting[0].pid
)));
}
log::debug!("No conflicts found, registering new operation");
// Generate operation ID with nanosecond timestamp and PID for uniqueness
// Using nanoseconds instead of milliseconds to prevent ID collisions
let now_ns = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_nanos();
let rand_suffix = rand::random::<u32>();
let id = format!(
"{}-{}-{:x}-{}",
operation_type.as_str(),
now_ns,
rand_suffix,
std::process::id()
);
log::debug!("Generated operation ID: {id}");
let op_type_str = operation_type.as_str();
// Register new operation
let new_op = OngoingOperation {
id: id.clone(),
r#type: operation_type,
pid: std::process::id(),
started_at: now,
status: OperationStatus::Running,
};
operations.push(new_op);
self.save_operations(&operations)?;
log::info!("Registered {op_type_str} operation with ID: {id}");
Ok(id)
}
/// Mark the operation with id `operation_id` as completed.
///
/// The read-modify-write runs under the exclusive ops-file lock. The entry
/// stays in the ops file with its status changed.
///
/// # Errors
///
/// Returns [`IpcError::OperationNotFound`] if no entry has that id, or an
/// error from locking, loading, or saving the ops file.
pub fn complete_operation(&self, operation_id: &str) -> Result<(), IpcError> {
  log::debug!("Completing operation: {operation_id}");

  let _lock = self.lock_ops_file()?;
  let mut operations = self.load_operations()?;

  // Only the first entry with a matching id is updated.
  let Some(target) = operations.iter_mut().find(|op| op.id == operation_id)
  else {
    log::warn!("Operation not found: {operation_id}");
    return Err(IpcError::OperationNotFound(operation_id.to_string()));
  };
  target.status = OperationStatus::Completed;
  log::info!("Marked operation {operation_id} as completed");

  self.save_operations(&operations)
}
/// Wait for any conflicting operations to complete.
///
/// Polls the ops file every 500 ms until no running operation of
/// `operation_type` remains. The deadline is measured on the monotonic
/// clock, so wall-clock adjustments cannot shorten or extend the wait.
///
/// # Errors
///
/// Returns [`IpcError::Timeout`] if conflicts are still present after more
/// than `timeout` has elapsed, or [`IpcError::InvalidFormat`] if the ops
/// file cannot be loaded.
pub async fn wait_for_conflicts(
  &self,
  operation_type: OperationType,
  timeout: Duration,
) -> Result<(), IpcError> {
  let started = std::time::Instant::now();
  loop {
    // Count inside a block so no loaded data is held across the await.
    let conflicts = {
      let operations = self.load_operations()?;
      operations
        .iter()
        .filter(|op| {
          op.r#type == operation_type && op.status == OperationStatus::Running
        })
        .count()
    };
    if conflicts == 0 {
      return Ok(());
    }
    if started.elapsed() > timeout {
      return Err(IpcError::Timeout(format!(
        "timeout waiting for {} operation(s) to complete",
        conflicts
      )));
    }
    tokio::time::sleep(Duration::from_millis(500)).await;
  }
}
/// Report whether a running operation of `operation_type` is tracked.
///
/// A failure to load the ops file is treated as "nothing running".
pub fn has_running_operation(&self, operation_type: OperationType) -> bool {
  match self.load_operations() {
    Ok(tracked) => tracked.iter().any(|op| {
      op.status == OperationStatus::Running && op.r#type == operation_type
    }),
    Err(_) => false,
  }
}
/// Return the tracked operations of `operation_type` that are running, in
/// the order they appear in the ops file.
///
/// A failure to load the ops file yields an empty list.
pub fn get_running_operations(
  &self,
  operation_type: OperationType,
) -> Vec<OngoingOperation> {
  let mut tracked = self.load_operations().unwrap_or_default();
  tracked.retain(|op| {
    op.status == OperationStatus::Running && op.r#type == operation_type
  });
  tracked
}
}
impl OperationType {
pub const fn as_str(&self) -> &'static str {
match self {
Self::Fetch => "fetch",
Self::Export => "export",
}
}
}
/// Guard tied to a registered operation.
///
/// When the guard is dropped it asks the coordinator to mark the operation
/// as completed.
pub struct OperationGuard {
// Coordinator used to mark the operation as completed on drop.
coordinator: IpcCoordinator,
// Id of the operation this guard is responsible for.
operation_id: String,
}
impl OperationGuard {
pub const fn new(coordinator: IpcCoordinator, operation_id: String) -> Self {
Self {
coordinator,
operation_id,
}
}
}
impl Drop for OperationGuard {
  /// Marks the guarded operation as completed.
  fn drop(&mut self) {
    // Errors are deliberately discarded: this may run while panicking.
    let outcome = self.coordinator.complete_operation(&self.operation_id);
    drop(outcome);
  }
}
#[cfg(test)]
mod tests {
use std::fs;
use tempfile::TempDir;
use super::*;
fn create_test_modpack(files: &[(&str, &str)]) -> TempDir {
// Generate a unique parentLockHash for each test to ensure test isolation
let unique_hash = format!("{:064x}", rand::random::<u128>());
let temp_dir = tempfile::Builder::new()
.prefix("pakker-ipc-test-")
.tempdir()
.unwrap();
for (path, content) in files {
let full_path = temp_dir.path().join(path);
if let Some(parent) = full_path.parent() {
fs::create_dir_all(parent).unwrap();
}
fs::write(&full_path, content).unwrap();
}
// Write pakku.json with unique parentLockHash (nested under "pakku" key)
let pakku_content =
format!(r#"{{"pakku": {{"parentLockHash": "{}"}}}}"#, unique_hash);
fs::write(temp_dir.path().join("pakku.json"), pakku_content).unwrap();
temp_dir
}
/// Build a temporary modpack directory containing `files` (relative path,
/// content) and a `pakku.json` whose nested `parentLockHash` is `hash`.
fn create_test_modpack_with_hash(
  files: &[(&str, &str)],
  hash: &str,
) -> TempDir {
  let modpack = tempfile::Builder::new()
    .prefix("pakker-ipc-test-")
    .tempdir()
    .unwrap();
  let root = modpack.path();

  for &(relative, body) in files {
    let target = root.join(relative);
    // Create intermediate directories for nested entries.
    if let Some(dir) = target.parent() {
      fs::create_dir_all(dir).unwrap();
    }
    fs::write(&target, body).unwrap();
  }

  // The hash must sit under the top-level "pakku" key.
  let manifest = format!(r#"{{"pakku": {{"parentLockHash": "{}"}}}}"#, hash);
  fs::write(root.join("pakku.json"), manifest).unwrap();
  modpack
}
// `as_str` is synchronous, so a plain test is enough; no async runtime needed.
#[test]
fn test_operation_type_as_str() {
  assert_eq!(OperationType::Fetch.as_str(), "fetch");
  assert_eq!(OperationType::Export.as_str(), "export");
}
#[test]
fn test_get_modpack_hash_valid() {
  // A fixed, well-formed hash so the round trip can be compared exactly.
  let expected =
    "cfe85e0e7e7aa0922d30d8faad071e3a4126cb78b5f0f792f191e90a295aa2c7";
  let modpack =
    create_test_modpack_with_hash(&[("mod.jar", "content")], expected);
  let dir = modpack.path().to_path_buf();
  let actual = IpcCoordinator::get_modpack_hash(&dir).unwrap();
  assert_eq!(actual, expected);
}
#[test]
fn test_get_modpack_hash_missing_pakku_json() {
  // An empty directory has no pakku.json at all.
  let empty = tempfile::Builder::new()
    .prefix("pakker-ipc-test-")
    .tempdir()
    .unwrap();
  let dir = empty.path().to_path_buf();
  let outcome = IpcCoordinator::get_modpack_hash(&dir);
  assert!(matches!(outcome, Err(IpcError::PakkuJsonReadFailed(_))));
}
#[test]
fn test_get_modpack_hash_missing_parent_lock_hash() {
  // Valid JSON, but without the `pakku.parentLockHash` field.
  let modpack = tempfile::Builder::new()
    .prefix("pakker-ipc-test-")
    .tempdir()
    .unwrap();
  let manifest = modpack.path().join("pakku.json");
  fs::write(manifest, r#"{"other": "field"}"#).unwrap();
  let dir = modpack.path().to_path_buf();
  let outcome = IpcCoordinator::get_modpack_hash(&dir);
  assert!(matches!(outcome, Err(IpcError::PakkuJsonReadFailed(_))));
}
#[test]
fn test_get_modpack_hash_invalid_hash() {
  let temp_dir = tempfile::Builder::new()
    .prefix("pakker-ipc-test-")
    .tempdir()
    .unwrap();
  // The field is nested under "pakku" so the lookup succeeds and the value
  // reaches hash validation; a root-level field would only exercise the
  // "field missing" path.
  fs::write(
    temp_dir.path().join("pakku.json"),
    r#"{"pakku": {"parentLockHash": "not-a-sha256"}}"#,
  )
  .unwrap();
  let result =
    IpcCoordinator::get_modpack_hash(&temp_dir.path().to_path_buf());
  assert!(matches!(result, Err(IpcError::PakkuJsonReadFailed(_))));
}
#[test]
fn test_same_modpack_different_dirs_same_ipc() {
  // Two directories carrying the same 64-character hash must map to one ops
  // file, regardless of their other content.
  let shared_hash =
    "abc123def456789012345678901234567890abcd123456789012345678901234";
  let first = create_test_modpack_with_hash(
    &[("mods/mod1.jar", "content1")],
    shared_hash,
  );
  let second = create_test_modpack_with_hash(
    &[("config/settings.toml", "content2")],
    shared_hash,
  );
  let first_coord = IpcCoordinator::new(&first.path().to_path_buf()).unwrap();
  let second_coord = IpcCoordinator::new(&second.path().to_path_buf()).unwrap();
  assert_eq!(
    first_coord.ops_file, second_coord.ops_file,
    "Same modpack hash should use same ops file"
  );
}
#[test]
fn test_different_modpacks_different_ipc() {
  // Identical file content, but each helper call generates its own hash, so
  // the two coordinators must not share an ops file.
  let files = [("mod1.jar", "content")];
  let first = create_test_modpack(&files);
  let second = create_test_modpack(&files);
  let first_coord = IpcCoordinator::new(&first.path().to_path_buf()).unwrap();
  let second_coord = IpcCoordinator::new(&second.path().to_path_buf()).unwrap();
  assert_ne!(
    first_coord.ops_file, second_coord.ops_file,
    "Different modpack hashes should use different ops files"
  );
}
#[test]
fn test_ipc_coordinator_new_creates_dir() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();
  // The ops file's parent is the IPC directory that `new` must have created.
  let ipc_dir = coordinator.ops_file.parent().unwrap();
  assert!(ipc_dir.exists());
}
#[test]
fn test_load_operations_empty() {
  // A fresh coordinator has no ops file yet, which loads as an empty list.
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();
  let loaded = coordinator.load_operations().unwrap();
  assert!(loaded.is_empty());
}
#[test]
fn test_register_operation() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();
  let id = coordinator
    .register_operation(OperationType::Fetch)
    .unwrap();
  // The id embeds both the operation type and this process's PID.
  let pid = std::process::id().to_string();
  assert!(!id.is_empty());
  assert!(id.contains("fetch"));
  assert!(id.contains(&pid));
}
#[test]
fn test_register_multiple_operations_different_types() {
  // Operations of different types do not conflict with each other.
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();
  let fetch_id = coordinator
    .register_operation(OperationType::Fetch)
    .unwrap();
  let export_id = coordinator
    .register_operation(OperationType::Export)
    .unwrap();
  assert_ne!(fetch_id, export_id);
  let tracked = coordinator.load_operations().unwrap();
  assert_eq!(tracked.len(), 2);
}
#[test]
fn test_register_conflicting_operation_same_type() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();
  let _first = coordinator
    .register_operation(OperationType::Fetch)
    .unwrap();
  // A second fetch while the first is still running must be refused.
  let second = coordinator.register_operation(OperationType::Fetch);
  assert!(matches!(second, Err(IpcError::OperationInProgress(_))));
}
#[test]
fn test_complete_operation() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();
  let id = coordinator
    .register_operation(OperationType::Fetch)
    .unwrap();
  coordinator.complete_operation(&id).unwrap();
  // The entry is kept, with its status flipped to completed.
  let tracked = coordinator.load_operations().unwrap();
  assert_eq!(tracked.len(), 1);
  assert_eq!(tracked[0].status, OperationStatus::Completed);
}
#[test]
fn test_complete_nonexistent_operation() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();
  // Completing an id that was never registered reports "not found".
  let outcome = coordinator.complete_operation("nonexistent-id");
  assert!(matches!(outcome, Err(IpcError::OperationNotFound(_))));
}
#[test]
fn test_has_running_operation() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();

  // Nothing registered yet.
  assert!(!coordinator.has_running_operation(OperationType::Fetch));

  let id = coordinator
    .register_operation(OperationType::Fetch)
    .unwrap();
  // Only the registered type is reported as running.
  assert!(coordinator.has_running_operation(OperationType::Fetch));
  assert!(!coordinator.has_running_operation(OperationType::Export));

  coordinator.complete_operation(&id).unwrap();
  assert!(!coordinator.has_running_operation(OperationType::Fetch));
}
#[test]
fn test_get_running_operations() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();

  let before = coordinator.get_running_operations(OperationType::Fetch);
  assert!(before.is_empty());

  let id = coordinator
    .register_operation(OperationType::Fetch)
    .unwrap();
  // Exactly the operation just registered is reported.
  let after = coordinator.get_running_operations(OperationType::Fetch);
  assert_eq!(after.len(), 1);
  assert_eq!(after[0].id, id);
}
#[tokio::test]
async fn test_wait_for_conflicts_no_conflicts() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();
  // With nothing registered the wait returns immediately.
  let outcome = coordinator
    .wait_for_conflicts(OperationType::Fetch, Duration::from_secs(1))
    .await;
  assert!(outcome.is_ok());
}
#[tokio::test]
async fn test_wait_for_conflicts_with_completion() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();

  let id = coordinator
    .register_operation(OperationType::Fetch)
    .unwrap();

  // A background task completes the operation shortly after we start waiting.
  let finisher = coordinator.clone();
  tokio::spawn(async move {
    tokio::time::sleep(Duration::from_millis(200)).await;
    let _ = finisher.complete_operation(&id);
  });

  // The wait ends successfully once the operation is no longer running.
  let outcome = coordinator
    .wait_for_conflicts(OperationType::Fetch, Duration::from_secs(5))
    .await;
  assert!(outcome.is_ok());
}
#[tokio::test]
async fn test_wait_for_conflicts_timeout() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();

  // This operation is never completed, so it keeps conflicting.
  let _pending = coordinator
    .register_operation(OperationType::Export)
    .unwrap();

  let outcome = coordinator
    .wait_for_conflicts(OperationType::Export, Duration::from_millis(500))
    .await;
  assert!(matches!(outcome, Err(IpcError::Timeout(_))));
}
#[test]
fn test_operation_guard_completes_on_drop() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();
  let id = coordinator
    .register_operation(OperationType::Fetch)
    .unwrap();

  {
    // While the guard is alive the operation is still running.
    let _guard = OperationGuard::new(coordinator.clone(), id.clone());
    assert!(coordinator.has_running_operation(OperationType::Fetch));
  }

  // Leaving the scope dropped the guard, which completes the operation.
  let tracked = coordinator.load_operations().unwrap();
  let completed = tracked
    .iter()
    .any(|op| op.id == id && op.status == OperationStatus::Completed);
  assert!(completed);
}
#[test]
fn test_operation_guard_manual_complete() {
  let temp_dir = create_test_modpack(&[("test.txt", "test")]);
  let coordinator =
    IpcCoordinator::new(&temp_dir.path().to_path_buf()).unwrap();
  let id = coordinator
    .register_operation(OperationType::Export)
    .unwrap();
  let guard = OperationGuard::new(coordinator.clone(), id.clone());
  // Complete manually first, then let the guard complete it again on drop.
  coordinator.complete_operation(&id).unwrap();
  drop(guard);
  // The operation must still be present exactly once and remain completed.
  let operations = coordinator.load_operations().unwrap();
  let matching: Vec<_> = operations.iter().filter(|op| op.id == id).collect();
  assert_eq!(matching.len(), 1);
  assert_eq!(matching[0].status, OperationStatus::Completed);
}
#[test]
fn test_stale_operation_cleanup() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();

  // Seed the ops file with a running fetch that started 15 minutes ago,
  // which is past the 10 minute staleness threshold.
  let now = SystemTime::now()
    .duration_since(SystemTime::UNIX_EPOCH)
    .unwrap()
    .as_secs();
  let mut seeded = coordinator.load_operations().unwrap();
  seeded.push(OngoingOperation {
    id: "stale-fetch-123".to_string(),
    r#type: OperationType::Fetch,
    pid: 99999,
    started_at: now - 900,
    status: OperationStatus::Running,
  });
  coordinator.save_operations(&seeded).unwrap();

  // Registering a new fetch succeeds because the stale entry is removed.
  let new_id = coordinator
    .register_operation(OperationType::Fetch)
    .unwrap();
  assert!(new_id.contains("fetch"));

  let tracked = coordinator.load_operations().unwrap();
  let stale_still_present = tracked.iter().any(|op| op.id == "stale-fetch-123");
  assert!(!stale_still_present);
}
#[test]
fn test_multiple_operations_same_process() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();

  let first = coordinator
    .register_operation(OperationType::Fetch)
    .unwrap();
  coordinator.complete_operation(&first).unwrap();

  // Pause for 2ms so the second registration gets a later timestamp.
  std::thread::sleep(std::time::Duration::from_millis(2));

  let second = coordinator
    .register_operation(OperationType::Fetch)
    .unwrap();
  assert_ne!(first, second, "IDs should differ when timestamps differ");

  // The completed entry is kept alongside the new running one.
  let tracked = coordinator.load_operations().unwrap();
  assert_eq!(tracked.len(), 2);
}
#[test]
fn test_ipc_base_dir_fallback() {
  // The environment is not manipulated here; whichever branch is taken
  // (XDG_RUNTIME_DIR or the /tmp fallback), the result should be absolute.
  assert!(IpcCoordinator::get_ipc_base_dir().is_absolute());
}
#[test]
fn test_operation_id_format() {
  let temp_dir = create_test_modpack(&[("test.txt", "test")]);
  let coordinator =
    IpcCoordinator::new(&temp_dir.path().to_path_buf()).unwrap();
  let id = coordinator
    .register_operation(OperationType::Export)
    .unwrap();
  let parts: Vec<&str> = id.split('-').collect();
  // Format is: type-nanoseconds-random-pid (4 parts)
  assert_eq!(parts.len(), 4);
  assert_eq!(parts[0], "export");
  // The timestamp comes from `as_nanos`, which yields a u128.
  assert!(
    parts[1].parse::<u128>().is_ok(),
    "nanoseconds should be an unsigned integer"
  );
  // The random suffix is a u32 rendered with `{:x}`: bare hex, no "0x" prefix.
  assert!(
    u32::from_str_radix(parts[2], 16).is_ok(),
    "random should be hex"
  );
  assert_eq!(parts[3], std::process::id().to_string());
}
#[test]
fn test_different_temp_dirs_same_content_hash() {
  // Two unrelated temp dirs whose pakku.json carries the same nested
  // parentLockHash must resolve to the same ops file.
  let pakku_content = r#"{"pakku": {"parentLockHash": "cfe85e0e7e7aa0922d30d8faad071e3a4126cb78b5f0f792f191e90a295aa2c7"}}"#;

  let first = tempfile::Builder::new()
    .prefix("pakker-ipc-test1-")
    .tempdir()
    .unwrap();
  let second = tempfile::Builder::new()
    .prefix("pakker-ipc-test2-")
    .tempdir()
    .unwrap();

  fs::write(first.path().join("pakku.json"), pakku_content).unwrap();
  fs::write(second.path().join("pakku.json"), pakku_content).unwrap();

  // Give each directory different extra content.
  fs::write(first.path().join("file1.txt"), "content1").unwrap();
  fs::write(second.path().join("file2.txt"), "content2").unwrap();

  let first_coord = IpcCoordinator::new(&first.path().to_path_buf()).unwrap();
  let second_coord = IpcCoordinator::new(&second.path().to_path_buf()).unwrap();
  assert_eq!(first_coord.ops_file, second_coord.ops_file);
}
#[test]
fn test_corrupted_ops_json_trailing_bracket() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();

  // Registering writes a valid ops.json to corrupt.
  let _registered = coordinator
    .register_operation(OperationType::Fetch)
    .unwrap();

  // Append a stray closing bracket after the valid JSON document.
  let mut corrupted = fs::read_to_string(&coordinator.ops_file).unwrap();
  corrupted.push(']');
  fs::write(&coordinator.ops_file, corrupted).unwrap();

  let outcome = coordinator.load_operations();
  assert!(matches!(outcome, Err(IpcError::InvalidFormat(_))));
}
#[test]
fn test_corrupted_ops_json_invalid_json() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();
  // Content that is not JSON at all must surface as a format error.
  fs::write(&coordinator.ops_file, "not valid json {[}").unwrap();
  let outcome = coordinator.load_operations();
  assert!(matches!(outcome, Err(IpcError::InvalidFormat(_))));
}
#[test]
fn test_corrupted_ops_json_missing_fields() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();
  // Well-formed JSON whose single entry lacks every field except `id`.
  fs::write(&coordinator.ops_file, r#"[{"id": "test"}]"#).unwrap();
  let outcome = coordinator.load_operations();
  assert!(matches!(outcome, Err(IpcError::InvalidFormat(_))));
}
#[test]
fn test_empty_ops_file() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();
  // A zero-length ops.json is treated as "no operations".
  fs::write(&coordinator.ops_file, "").unwrap();
  let loaded = coordinator.load_operations().unwrap();
  assert!(loaded.is_empty(), "Empty file should return empty vec");
}
#[test]
fn test_whitespace_only_ops_file() {
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();
  // Only spaces, tabs, and newlines: still treated as "no operations".
  fs::write(&coordinator.ops_file, " \n\t \n ").unwrap();
  let loaded = coordinator.load_operations().unwrap();
  assert!(
    loaded.is_empty(),
    "Whitespace-only file should return empty vec"
  );
}
#[test]
fn test_nested_pakku_json_structure() {
  // The hash is read from `pakku.parentLockHash`; sibling fields are ignored.
  let pakku_content = r#"{
"pakku": {
"parentLockHash": "abc123def456789012345678901234567890abcd123456789012345678901234",
"other_field": "value"
},
"another_field": "ignored"
}"#;
  let modpack = tempfile::Builder::new()
    .prefix("pakker-ipc-test-")
    .tempdir()
    .unwrap();
  fs::write(modpack.path().join("pakku.json"), pakku_content).unwrap();
  let dir = modpack.path().to_path_buf();
  let actual = IpcCoordinator::get_modpack_hash(&dir).unwrap();
  assert_eq!(
    actual,
    "abc123def456789012345678901234567890abcd123456789012345678901234"
  );
}
#[test]
fn test_old_pakku_json_format_rejected() {
  // Old layout: parentLockHash sits at the root instead of under "pakku".
  let old_format = r#"{
"parentLockHash": "abc123def456789012345678901234567890abcd123456789012345678901234"
}"#;
  let modpack = tempfile::Builder::new()
    .prefix("pakker-ipc-test-")
    .tempdir()
    .unwrap();
  fs::write(modpack.path().join("pakku.json"), old_format).unwrap();
  let dir = modpack.path().to_path_buf();
  let outcome = IpcCoordinator::get_modpack_hash(&dir);
  assert!(
    matches!(outcome, Err(IpcError::PakkuJsonReadFailed(_))),
    "Old format should be rejected"
  );
}
#[test]
fn test_hash_validation_too_short() {
  // Correctly nested, but far shorter than 64 characters.
  let pakku_content = r#"{"pakku": {"parentLockHash": "tooshort"}}"#;
  let modpack = tempfile::Builder::new()
    .prefix("pakker-ipc-test-")
    .tempdir()
    .unwrap();
  fs::write(modpack.path().join("pakku.json"), pakku_content).unwrap();
  let dir = modpack.path().to_path_buf();
  let outcome = IpcCoordinator::get_modpack_hash(&dir);
  assert!(matches!(outcome, Err(IpcError::PakkuJsonReadFailed(_))));
}
#[test]
fn test_hash_validation_non_hex() {
  // Right length (64), but 'g' is not a hex digit.
  let pakku_content = r#"{"pakku": {"parentLockHash": "gggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggg"}}"#;
  let modpack = tempfile::Builder::new()
    .prefix("pakker-ipc-test-")
    .tempdir()
    .unwrap();
  fs::write(modpack.path().join("pakku.json"), pakku_content).unwrap();
  let dir = modpack.path().to_path_buf();
  let outcome = IpcCoordinator::get_modpack_hash(&dir);
  assert!(matches!(outcome, Err(IpcError::PakkuJsonReadFailed(_))));
}
#[test]
fn test_hash_validation_uppercase_hex() {
  // Upper-case hex digits pass validation and are returned unchanged.
  let pakku_content = r#"{"pakku": {"parentLockHash": "ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789"}}"#;
  let modpack = tempfile::Builder::new()
    .prefix("pakker-ipc-test-")
    .tempdir()
    .unwrap();
  fs::write(modpack.path().join("pakku.json"), pakku_content).unwrap();
  let dir = modpack.path().to_path_buf();
  let actual = IpcCoordinator::get_modpack_hash(&dir).unwrap();
  assert_eq!(
    actual,
    "ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789"
  );
}
#[test]
fn test_concurrent_registration_race_condition() {
  // Walks through register -> conflict -> complete -> register sequentially
  // on a single coordinator.
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let dir = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&dir).unwrap();

  let first = coordinator
    .register_operation(OperationType::Fetch)
    .unwrap();

  // While the first fetch is running, another fetch is refused.
  let refused = coordinator.register_operation(OperationType::Fetch);
  assert!(matches!(refused, Err(IpcError::OperationInProgress(_))));

  // Once the first is completed, a new fetch can be registered.
  coordinator.complete_operation(&first).unwrap();
  let second = coordinator
    .register_operation(OperationType::Fetch)
    .unwrap();
  assert!(second.contains("fetch"));
}
#[test]
fn test_file_permissions_ipc_dir() {
  // The directory holding the ops file must be accessible by its owner only
  // (mode 700).
  let modpack = create_test_modpack(&[("test.txt", "test")]);
  let modpack_root = modpack.path().to_path_buf();
  let coordinator = IpcCoordinator::new(&modpack_root).unwrap();

  let ipc_dir = coordinator.ops_file.parent().unwrap();
  let ipc_dir_mode = fs::metadata(ipc_dir).unwrap().permissions().mode();
  assert_eq!(ipc_dir_mode & 0o777, 0o700);
}
#[test]
fn test_file_permissions_ops_file() {
  // Test that ops.json has correct permissions (600 - owner read/write only)
  let temp_dir = create_test_modpack(&[("test.txt", "test")]);
  let coordinator =
    IpcCoordinator::new(&temp_dir.path().to_path_buf()).unwrap();

  // Register operation to create ops.json
  let id = coordinator.register_operation(OperationType::Fetch).unwrap();

  let metadata = fs::metadata(&coordinator.ops_file).unwrap();
  assert_eq!(metadata.permissions().mode() & 0o777, 0o600);

  // Cleanup: complete the operation so no Fetch is left registered in the
  // IPC ops file once the test finishes (same convention as
  // test_operations_persistence_across_coordinators).
  coordinator.complete_operation(&id).unwrap();
}
#[test]
fn test_operations_persistence_across_coordinators() {
  // Test that operations persist when creating new coordinator instances.
  //
  // Use unique hash per test run to avoid conflicts from previous runs.
  // `as_nanos()` already yields a `u128`, and a `u128` has at most 32 hex
  // digits, so `{:064x}` always produces exactly 64 characters; no cast or
  // slicing is needed.
  let nanos = SystemTime::now()
    .duration_since(SystemTime::UNIX_EPOCH)
    .unwrap()
    .as_nanos();
  let unique_hash = format!("{:064x}", rand::random::<u128>() ^ nanos);
  let temp_dir = create_test_modpack_with_hash(
    &[("test.txt", "test")],
    unique_hash.as_str(),
  );

  let coord1 = IpcCoordinator::new(&temp_dir.path().to_path_buf()).unwrap();
  let id = coord1.register_operation(OperationType::Fetch).unwrap();

  // Create new coordinator instance; it must see the operation registered
  // through the first one.
  let coord2 = IpcCoordinator::new(&temp_dir.path().to_path_buf()).unwrap();
  let operations = coord2.load_operations().unwrap();
  assert_eq!(operations.len(), 1);
  assert_eq!(operations[0].id, id);
  assert_eq!(operations[0].r#type, OperationType::Fetch);

  // Cleanup: complete the operation so it doesn't interfere with other tests
  coord2.complete_operation(&id).unwrap();
}
#[test]
fn test_stale_cleanup_preserves_completed() {
  // Test that stale cleanup only removes running operations, not completed
  // ones
  let temp_dir = create_test_modpack(&[("test.txt", "test")]);
  let coordinator =
    IpcCoordinator::new(&temp_dir.path().to_path_buf()).unwrap();
  let now = SystemTime::now()
    .duration_since(SystemTime::UNIX_EPOCH)
    .unwrap()
    .as_secs();

  // NOTE(review): the pids below are arbitrary. If stale detection also
  // looks at whether the pid is alive, a real process with one of these pids
  // could change the outcome - confirm against the cleanup logic.

  // Add old completed operation
  let old_completed = OngoingOperation {
    id: "old-completed-123".to_string(),
    r#type: OperationType::Fetch,
    pid: 88888,
    started_at: now - 1000, // 16+ minutes ago
    status: OperationStatus::Completed,
  };
  // Add old running operation (stale)
  let old_running = OngoingOperation {
    id: "old-running-456".to_string(),
    r#type: OperationType::Export,
    pid: 99999,
    started_at: now - 1000, // 16+ minutes ago
    status: OperationStatus::Running,
  };
  let operations = vec![old_completed, old_running];
  coordinator.save_operations(&operations).unwrap();

  // Register new operation triggers cleanup
  let new_id = coordinator.register_operation(OperationType::Fetch).unwrap();

  let operations = coordinator.load_operations().unwrap();
  // Old completed should still be there
  assert!(operations.iter().any(|op| op.id == "old-completed-123"));
  // Old running should be removed (stale)
  assert!(!operations.iter().any(|op| op.id == "old-running-456"));

  // Cleanup: complete the operation registered above so no Fetch is left
  // registered in the IPC ops file once the test finishes (same convention
  // as test_operations_persistence_across_coordinators).
  coordinator.complete_operation(&new_id).unwrap();
}
#[tokio::test]
async fn test_wait_for_conflicts_multiple_types() {
  // Test that wait_for_conflicts only waits for matching operation types
  let temp_dir = create_test_modpack(&[("test.txt", "test")]);
  let coordinator =
    IpcCoordinator::new(&temp_dir.path().to_path_buf()).unwrap();

  // Register Export operation (different type)
  let export_id = coordinator
    .register_operation(OperationType::Export)
    .unwrap();

  // Waiting on Fetch must return Ok within the 1 s timeout even though the
  // Export above is still registered.
  let result = coordinator
    .wait_for_conflicts(OperationType::Fetch, Duration::from_secs(1))
    .await;
  assert!(result.is_ok());

  // Cleanup: complete the Export so it is not left registered in the IPC
  // ops file once the test finishes (same convention as
  // test_operations_persistence_across_coordinators).
  coordinator.complete_operation(&export_id).unwrap();
}
#[test]
fn test_operation_serialization_roundtrip() {
  // Serialise an operation to JSON and parse it back; every field must come
  // out equal to what went in.
  let original = OngoingOperation {
    id: String::from("test-op-123"),
    r#type: OperationType::Fetch,
    pid: 12_345,
    started_at: 1_234_567_890,
    status: OperationStatus::Running,
  };

  let encoded = serde_json::to_string(&original).unwrap();
  let OngoingOperation {
    id,
    r#type,
    pid,
    started_at,
    status,
  } = serde_json::from_str::<OngoingOperation>(&encoded).unwrap();

  assert_eq!(id, original.id);
  assert_eq!(r#type, original.r#type);
  assert_eq!(pid, original.pid);
  assert_eq!(started_at, original.started_at);
  assert_eq!(status, original.status);
}
}