diff options
-rw-r--r-- | src/main.rs | 22 | ||||
-rw-r--r-- | src/pipe.rs | 71 |
2 files changed, 43 insertions, 50 deletions
diff --git a/src/main.rs b/src/main.rs index 2472901..f3b444c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -42,7 +42,11 @@ fn get_offset() -> time::UtcOffset { } } -async fn async_main() -> anyhow::Result<()> { +async fn async_main() -> anyhow::Result<i32> { + if std::env::args().nth(1).as_deref() == Some("--internal-pipe-runner") { + return pipe::run().await; + } + let mut input = textmode::Input::new().await?; let mut output = textmode::Output::new().await?; @@ -132,22 +136,14 @@ async fn async_main() -> anyhow::Result<()> { } } - Ok(()) + Ok(0) } fn main() { - if std::env::args().nth(1).as_deref() == Some("--internal-pipe-runner") { - match pipe::run() { - Ok(code) => std::process::exit(code), - Err(e) => { - eprintln!("nbsh: {}", e); - std::process::exit(1); - } - } - } - match async_std::task::block_on(async_main()) { - Ok(_) => (), + Ok(code) => { + std::process::exit(code); + } Err(e) => { eprintln!("nbsh: {}", e); std::process::exit(1); diff --git a/src/pipe.rs b/src/pipe.rs index 77904f5..e75f010 100644 --- a/src/pipe.rs +++ b/src/pipe.rs @@ -1,16 +1,18 @@ -use std::io::Read as _; +use async_std::io::ReadExt as _; +use async_std::os::unix::process::CommandExt as _; +use async_std::stream::StreamExt as _; use std::os::unix::io::FromRawFd as _; -use std::os::unix::process::CommandExt as _; +use std::os::unix::process::ExitStatusExt as _; const PID0: nix::unistd::Pid = nix::unistd::Pid::from_raw(0); -pub fn run() -> anyhow::Result<i32> { - let pipeline = read_pipeline()?; +pub async fn run() -> anyhow::Result<i32> { + let pipeline = read_pipeline().await?; let mut cmds: Vec<_> = pipeline .exes() .iter() .map(|exe| { - let mut cmd = std::process::Command::new(exe.exe()); + let mut cmd = async_std::process::Command::new(exe.exe()); cmd.args(exe.args()); cmd }) @@ -52,49 +54,44 @@ pub fn run() -> anyhow::Result<i32> { // ensure that we don't keep the pipes open past when the children exit drop(cmds); - let last_pid = id_to_pid(children[children.len() - 1].id()); - let mut children: std::collections::HashMap< - nix::unistd::Pid, - std::process::Child, - > = children + let mut final_status = None; + + let count = children.len(); + let mut children: futures_util::stream::FuturesUnordered<_> = children .into_iter() - .map(|child| (id_to_pid(child.id()), child)) + .enumerate() + .map(|(i, child)| async move { + (child.status_no_drop().await, i == count - 1) + }) .collect(); - let mut final_code = None; - let mut final_signal = None; - while !children.is_empty() { - match nix::sys::wait::waitpid(neg_pid(pg_pid), None)? { - nix::sys::wait::WaitStatus::Exited(pid, code) => { - if pid == last_pid { - final_code = Some(code); - } - children.remove(&pid); - } - nix::sys::wait::WaitStatus::Signaled(pid, signal, _) => { - if signal == nix::sys::signal::Signal::SIGINT { - nix::sys::signal::raise(nix::sys::signal::SIGINT)?; - } - if pid == last_pid { - final_signal = Some(signal); - } - children.remove(&pid); - } - _ => {} + while let Some((status, last)) = children.next().await { + let status = status.unwrap_or_else(|_| { + async_std::process::ExitStatus::from_raw(1 << 8) + }); + // this conversion is safe because the Signal enum is repr(i32) + #[allow(clippy::as_conversions)] + if status.signal() == Some(nix::sys::signal::Signal::SIGINT as i32) { + nix::sys::signal::raise(nix::sys::signal::Signal::SIGINT)?; + } + if last { + final_status = Some(status); } } - if let Some(signal) = final_signal { - nix::sys::signal::raise(signal)?; + + let final_status = final_status.unwrap(); + if let Some(signal) = final_status.signal() { + nix::sys::signal::raise(signal.try_into().unwrap())?; } - Ok(final_code.unwrap()) + Ok(final_status.code().unwrap()) } -fn read_pipeline() -> anyhow::Result<crate::parse::Pipeline> { +async fn read_pipeline() -> anyhow::Result<crate::parse::Pipeline> { // Safety: this code is only called by crate::history::run_pipeline, which // passes data through on fd 3, and which will not spawn this process // unless the pipe was successfully opened on that fd - let mut fd3 = unsafe { std::fs::File::from_raw_fd(3) }; + let mut fd3 = unsafe { async_std::fs::File::from_raw_fd(3) }; let mut pipeline = String::new(); - fd3.read_to_string(&mut pipeline)?; + fd3.read_to_string(&mut pipeline).await?; let ast = crate::parse::Pipeline::parse(&pipeline)?; Ok(ast) } |