diff --git a/eh/src/command.rs b/eh/src/command.rs index 7eace98..a6ff260 100644 --- a/eh/src/command.rs +++ b/eh/src/command.rs @@ -1,7 +1,8 @@ use std::{ - collections::VecDeque, io::{self, Read, Write}, process::{Command, ExitStatus, Output, Stdio}, + sync::mpsc, + thread, time::{Duration, Instant}, }; @@ -31,6 +32,40 @@ const DEFAULT_BUFFER_SIZE: usize = 4096; /// Default timeout for command execution const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300); // 5 minutes +enum PipeEvent { + Stdout(Vec), + Stderr(Vec), + Error(io::Error), +} + +/// Drain a pipe reader, sending chunks through the channel. +fn read_pipe( + mut reader: R, + tx: mpsc::Sender, + is_stderr: bool, +) { + let mut buf = [0u8; DEFAULT_BUFFER_SIZE]; + loop { + match reader.read(&mut buf) { + Ok(0) => break, + Ok(n) => { + let event = if is_stderr { + PipeEvent::Stderr(buf[..n].to_vec()) + } else { + PipeEvent::Stdout(buf[..n].to_vec()) + }; + if tx.send(event).is_err() { + break; + } + }, + Err(e) => { + let _ = tx.send(PipeEvent::Error(e)); + break; + }, + } + } +} + /// Builder and executor for Nix commands. pub struct NixCommand { subcommand: String, @@ -58,16 +93,6 @@ impl NixCommand { self } - #[allow(dead_code, reason = "FIXME")] - pub fn args(mut self, args: I) -> Self - where - I: IntoIterator, - S: Into, - { - self.args.extend(args.into_iter().map(Into::into)); - self - } - #[must_use] pub fn args_ref(mut self, args: &[String]) -> Self { self.args.extend(args.iter().cloned()); @@ -101,11 +126,9 @@ impl NixCommand { self } - /// Run the command, streaming output to the provided interceptor. - pub fn run_with_logs( - &self, - mut interceptor: I, - ) -> Result { + /// Build the underlying `std::process::Command` with all configured + /// arguments, environment variables, and flags. + fn build_command(&self) -> Command { let mut cmd = Command::new("nix"); cmd.arg(&self.subcommand); @@ -121,6 +144,18 @@ impl NixCommand { cmd.env(k, v); } cmd.args(&self.args); + cmd + } + + /// Run the command, streaming output to the provided interceptor. + /// + /// Stdout and stderr are read concurrently using background threads + /// so that neither pipe blocks the other. + pub fn run_with_logs( + &self, + mut interceptor: I, + ) -> Result { + let mut cmd = self.build_command(); if self.interactive { cmd.stdout(Stdio::inherit()); @@ -133,100 +168,152 @@ impl NixCommand { cmd.stderr(Stdio::piped()); let mut child = cmd.spawn()?; - let child_stdout = child.stdout.take().ok_or_else(|| { + let stdout = child.stdout.take().ok_or_else(|| { EhError::CommandFailed { command: format!("nix {}", self.subcommand), } })?; - let child_stderr = child.stderr.take().ok_or_else(|| { + let stderr = child.stderr.take().ok_or_else(|| { EhError::CommandFailed { command: format!("nix {}", self.subcommand), } })?; - let mut stdout = child_stdout; - let mut stderr = child_stderr; - let mut out_buf = [0u8; DEFAULT_BUFFER_SIZE]; - let mut err_buf = [0u8; DEFAULT_BUFFER_SIZE]; + let (tx, rx) = mpsc::channel(); + + let tx_out = tx.clone(); + let stdout_thread = thread::spawn(move || read_pipe(stdout, tx_out, false)); + + let tx_err = tx; + let stderr_thread = thread::spawn(move || read_pipe(stderr, tx_err, true)); - let mut out_queue = VecDeque::new(); - let mut err_queue = VecDeque::new(); let start_time = Instant::now(); loop { - let mut did_something = false; - - // Check for timeout if start_time.elapsed() > DEFAULT_TIMEOUT { let _ = child.kill(); - return Err(EhError::CommandFailed { - command: format!("nix {} timed out after 5 minutes", self.subcommand), + let _ = stdout_thread.join(); + let _ = stderr_thread.join(); + let _ = child.wait(); + return Err(EhError::Timeout { + command: format!("nix {}", self.subcommand), + duration: DEFAULT_TIMEOUT, }); } - match stdout.read(&mut out_buf) { - Ok(0) => {}, - Ok(n) => { - interceptor.on_stdout(&out_buf[..n]); - out_queue.push_back(Vec::from(&out_buf[..n])); - did_something = true; + match rx.recv_timeout(Duration::from_millis(100)) { + Ok(PipeEvent::Stdout(data)) => interceptor.on_stdout(&data), + Ok(PipeEvent::Stderr(data)) => interceptor.on_stderr(&data), + Ok(PipeEvent::Error(e)) => { + let _ = child.kill(); + let _ = stdout_thread.join(); + let _ = stderr_thread.join(); + let _ = child.wait(); + return Err(EhError::Io(e)); }, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}, - Err(e) => return Err(EhError::Io(e)), - } - - match stderr.read(&mut err_buf) { - Ok(0) => {}, - Ok(n) => { - interceptor.on_stderr(&err_buf[..n]); - err_queue.push_back(Vec::from(&err_buf[..n])); - did_something = true; - }, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}, - Err(e) => return Err(EhError::Io(e)), - } - - if !did_something && child.try_wait()?.is_some() { - break; - } - - // Prevent busy waiting when no data is available - if !did_something { - std::thread::sleep(Duration::from_millis(10)); + Err(mpsc::RecvTimeoutError::Timeout) => {}, + // All senders dropped — both reader threads finished + Err(mpsc::RecvTimeoutError::Disconnected) => break, } } + let _ = stdout_thread.join(); + let _ = stderr_thread.join(); + let status = child.wait()?; Ok(status) } - /// Run the command and capture all output. + /// Run the command and capture all output (with timeout). pub fn output(&self) -> Result { - let mut cmd = Command::new("nix"); - cmd.arg(&self.subcommand); - - if self.print_build_logs - && !self.args.iter().any(|a| a == "--no-build-output") - { - cmd.arg("--print-build-logs"); - } - if self.impure { - cmd.arg("--impure"); - } - for (k, v) in &self.env { - cmd.env(k, v); - } - cmd.args(&self.args); + let mut cmd = self.build_command(); if self.interactive { cmd.stdout(Stdio::inherit()); cmd.stderr(Stdio::inherit()); cmd.stdin(Stdio::inherit()); - } else { - cmd.stdout(Stdio::piped()); - cmd.stderr(Stdio::piped()); + return Ok(cmd.output()?); } - Ok(cmd.output()?) + cmd.stdout(Stdio::piped()); + cmd.stderr(Stdio::piped()); + + let mut child = cmd.spawn()?; + let stdout = child.stdout.take(); + let stderr = child.stderr.take(); + + let (tx, rx) = mpsc::channel(); + + let tx_out = tx.clone(); + let stdout_thread = thread::spawn(move || { + let mut buf = Vec::new(); + if let Some(mut r) = stdout { + let _ = r.read_to_end(&mut buf); + } + let _ = tx_out.send((false, buf)); + }); + + let tx_err = tx; + let stderr_thread = thread::spawn(move || { + let mut buf = Vec::new(); + if let Some(mut r) = stderr { + let _ = r.read_to_end(&mut buf); + } + let _ = tx_err.send((true, buf)); + }); + + let start_time = Instant::now(); + let mut stdout_buf = Vec::new(); + let mut stderr_buf = Vec::new(); + let mut received = 0; + + while received < 2 { + let remaining = DEFAULT_TIMEOUT + .checked_sub(start_time.elapsed()) + .unwrap_or(Duration::ZERO); + + if remaining.is_zero() { + let _ = child.kill(); + let _ = stdout_thread.join(); + let _ = stderr_thread.join(); + let _ = child.wait(); + return Err(EhError::Timeout { + command: format!("nix {}", self.subcommand), + duration: DEFAULT_TIMEOUT, + }); + } + + match rx.recv_timeout(remaining) { + Ok((true, buf)) => { + stderr_buf = buf; + received += 1; + }, + Ok((false, buf)) => { + stdout_buf = buf; + received += 1; + }, + Err(mpsc::RecvTimeoutError::Timeout) => { + let _ = child.kill(); + let _ = stdout_thread.join(); + let _ = stderr_thread.join(); + let _ = child.wait(); + return Err(EhError::Timeout { + command: format!("nix {}", self.subcommand), + duration: DEFAULT_TIMEOUT, + }); + }, + Err(mpsc::RecvTimeoutError::Disconnected) => break, + } + } + + let _ = stdout_thread.join(); + let _ = stderr_thread.join(); + + let status = child.wait()?; + Ok(Output { + status, + stdout: stdout_buf, + stderr: stderr_buf, + }) } }