eh: rewrite command exec with thread-based pipe reading

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Id0e34e109a6423820e24676968e08dc66a6a6964
This commit is contained in:
raf 2026-01-30 18:30:26 +03:00
commit 9b632788c2
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF

View file

@ -1,7 +1,8 @@
use std::{
collections::VecDeque,
io::{self, Read, Write},
process::{Command, ExitStatus, Output, Stdio},
sync::mpsc,
thread,
time::{Duration, Instant},
};
@ -31,6 +32,40 @@ const DEFAULT_BUFFER_SIZE: usize = 4096;
/// Default timeout for command execution
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300); // 5 minutes
enum PipeEvent {
Stdout(Vec<u8>),
Stderr(Vec<u8>),
Error(io::Error),
}
/// Drain a pipe reader, sending chunks through the channel.
fn read_pipe<R: Read>(
mut reader: R,
tx: mpsc::Sender<PipeEvent>,
is_stderr: bool,
) {
let mut buf = [0u8; DEFAULT_BUFFER_SIZE];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
let event = if is_stderr {
PipeEvent::Stderr(buf[..n].to_vec())
} else {
PipeEvent::Stdout(buf[..n].to_vec())
};
if tx.send(event).is_err() {
break;
}
},
Err(e) => {
let _ = tx.send(PipeEvent::Error(e));
break;
},
}
}
}
/// Builder and executor for Nix commands.
pub struct NixCommand {
subcommand: String,
@ -58,16 +93,6 @@ impl NixCommand {
self
}
#[allow(dead_code, reason = "FIXME")]
pub fn args<I, S>(mut self, args: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.args.extend(args.into_iter().map(Into::into));
self
}
#[must_use]
pub fn args_ref(mut self, args: &[String]) -> Self {
self.args.extend(args.iter().cloned());
@ -101,11 +126,9 @@ impl NixCommand {
self
}
/// Run the command, streaming output to the provided interceptor.
pub fn run_with_logs<I: LogInterceptor + 'static>(
&self,
mut interceptor: I,
) -> Result<ExitStatus> {
/// Build the underlying `std::process::Command` with all configured
/// arguments, environment variables, and flags.
fn build_command(&self) -> Command {
let mut cmd = Command::new("nix");
cmd.arg(&self.subcommand);
@ -121,6 +144,18 @@ impl NixCommand {
cmd.env(k, v);
}
cmd.args(&self.args);
cmd
}
/// Run the command, streaming output to the provided interceptor.
///
/// Stdout and stderr are read concurrently using background threads
/// so that neither pipe blocks the other.
pub fn run_with_logs<I: LogInterceptor + 'static>(
&self,
mut interceptor: I,
) -> Result<ExitStatus> {
let mut cmd = self.build_command();
if self.interactive {
cmd.stdout(Stdio::inherit());
@ -133,100 +168,152 @@ impl NixCommand {
cmd.stderr(Stdio::piped());
let mut child = cmd.spawn()?;
let child_stdout = child.stdout.take().ok_or_else(|| {
let stdout = child.stdout.take().ok_or_else(|| {
EhError::CommandFailed {
command: format!("nix {}", self.subcommand),
}
})?;
let child_stderr = child.stderr.take().ok_or_else(|| {
let stderr = child.stderr.take().ok_or_else(|| {
EhError::CommandFailed {
command: format!("nix {}", self.subcommand),
}
})?;
let mut stdout = child_stdout;
let mut stderr = child_stderr;
let mut out_buf = [0u8; DEFAULT_BUFFER_SIZE];
let mut err_buf = [0u8; DEFAULT_BUFFER_SIZE];
let (tx, rx) = mpsc::channel();
let tx_out = tx.clone();
let stdout_thread = thread::spawn(move || read_pipe(stdout, tx_out, false));
let tx_err = tx;
let stderr_thread = thread::spawn(move || read_pipe(stderr, tx_err, true));
let mut out_queue = VecDeque::new();
let mut err_queue = VecDeque::new();
let start_time = Instant::now();
loop {
let mut did_something = false;
// Check for timeout
if start_time.elapsed() > DEFAULT_TIMEOUT {
let _ = child.kill();
return Err(EhError::CommandFailed {
command: format!("nix {} timed out after 5 minutes", self.subcommand),
let _ = stdout_thread.join();
let _ = stderr_thread.join();
let _ = child.wait();
return Err(EhError::Timeout {
command: format!("nix {}", self.subcommand),
duration: DEFAULT_TIMEOUT,
});
}
match stdout.read(&mut out_buf) {
Ok(0) => {},
Ok(n) => {
interceptor.on_stdout(&out_buf[..n]);
out_queue.push_back(Vec::from(&out_buf[..n]));
did_something = true;
match rx.recv_timeout(Duration::from_millis(100)) {
Ok(PipeEvent::Stdout(data)) => interceptor.on_stdout(&data),
Ok(PipeEvent::Stderr(data)) => interceptor.on_stderr(&data),
Ok(PipeEvent::Error(e)) => {
let _ = child.kill();
let _ = stdout_thread.join();
let _ = stderr_thread.join();
let _ = child.wait();
return Err(EhError::Io(e));
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {},
Err(e) => return Err(EhError::Io(e)),
}
match stderr.read(&mut err_buf) {
Ok(0) => {},
Ok(n) => {
interceptor.on_stderr(&err_buf[..n]);
err_queue.push_back(Vec::from(&err_buf[..n]));
did_something = true;
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {},
Err(e) => return Err(EhError::Io(e)),
}
if !did_something && child.try_wait()?.is_some() {
break;
}
// Prevent busy waiting when no data is available
if !did_something {
std::thread::sleep(Duration::from_millis(10));
Err(mpsc::RecvTimeoutError::Timeout) => {},
// All senders dropped — both reader threads finished
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
}
let _ = stdout_thread.join();
let _ = stderr_thread.join();
let status = child.wait()?;
Ok(status)
}
/// Run the command and capture all output.
/// Run the command and capture all output (with timeout).
pub fn output(&self) -> Result<Output> {
let mut cmd = Command::new("nix");
cmd.arg(&self.subcommand);
if self.print_build_logs
&& !self.args.iter().any(|a| a == "--no-build-output")
{
cmd.arg("--print-build-logs");
}
if self.impure {
cmd.arg("--impure");
}
for (k, v) in &self.env {
cmd.env(k, v);
}
cmd.args(&self.args);
let mut cmd = self.build_command();
if self.interactive {
cmd.stdout(Stdio::inherit());
cmd.stderr(Stdio::inherit());
cmd.stdin(Stdio::inherit());
} else {
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
return Ok(cmd.output()?);
}
Ok(cmd.output()?)
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = cmd.spawn()?;
let stdout = child.stdout.take();
let stderr = child.stderr.take();
let (tx, rx) = mpsc::channel();
let tx_out = tx.clone();
let stdout_thread = thread::spawn(move || {
let mut buf = Vec::new();
if let Some(mut r) = stdout {
let _ = r.read_to_end(&mut buf);
}
let _ = tx_out.send((false, buf));
});
let tx_err = tx;
let stderr_thread = thread::spawn(move || {
let mut buf = Vec::new();
if let Some(mut r) = stderr {
let _ = r.read_to_end(&mut buf);
}
let _ = tx_err.send((true, buf));
});
let start_time = Instant::now();
let mut stdout_buf = Vec::new();
let mut stderr_buf = Vec::new();
let mut received = 0;
while received < 2 {
let remaining = DEFAULT_TIMEOUT
.checked_sub(start_time.elapsed())
.unwrap_or(Duration::ZERO);
if remaining.is_zero() {
let _ = child.kill();
let _ = stdout_thread.join();
let _ = stderr_thread.join();
let _ = child.wait();
return Err(EhError::Timeout {
command: format!("nix {}", self.subcommand),
duration: DEFAULT_TIMEOUT,
});
}
match rx.recv_timeout(remaining) {
Ok((true, buf)) => {
stderr_buf = buf;
received += 1;
},
Ok((false, buf)) => {
stdout_buf = buf;
received += 1;
},
Err(mpsc::RecvTimeoutError::Timeout) => {
let _ = child.kill();
let _ = stdout_thread.join();
let _ = stderr_thread.join();
let _ = child.wait();
return Err(EhError::Timeout {
command: format!("nix {}", self.subcommand),
duration: DEFAULT_TIMEOUT,
});
},
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
}
let _ = stdout_thread.join();
let _ = stderr_thread.join();
let status = child.wait()?;
Ok(Output {
status,
stdout: stdout_buf,
stderr: stderr_buf,
})
}
}