summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main.rs22
-rw-r--r--src/pipe.rs71
2 files changed, 43 insertions, 50 deletions
diff --git a/src/main.rs b/src/main.rs
index 2472901..f3b444c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -42,7 +42,11 @@ fn get_offset() -> time::UtcOffset {
}
}
-async fn async_main() -> anyhow::Result<()> {
+async fn async_main() -> anyhow::Result<i32> {
+ if std::env::args().nth(1).as_deref() == Some("--internal-pipe-runner") {
+ return pipe::run().await;
+ }
+
let mut input = textmode::Input::new().await?;
let mut output = textmode::Output::new().await?;
@@ -132,22 +136,14 @@ async fn async_main() -> anyhow::Result<()> {
}
}
- Ok(())
+ Ok(0)
}
fn main() {
- if std::env::args().nth(1).as_deref() == Some("--internal-pipe-runner") {
- match pipe::run() {
- Ok(code) => std::process::exit(code),
- Err(e) => {
- eprintln!("nbsh: {}", e);
- std::process::exit(1);
- }
- }
- }
-
match async_std::task::block_on(async_main()) {
- Ok(_) => (),
+ Ok(code) => {
+ std::process::exit(code);
+ }
Err(e) => {
eprintln!("nbsh: {}", e);
std::process::exit(1);
diff --git a/src/pipe.rs b/src/pipe.rs
index 77904f5..e75f010 100644
--- a/src/pipe.rs
+++ b/src/pipe.rs
@@ -1,16 +1,18 @@
-use std::io::Read as _;
+use async_std::io::ReadExt as _;
+use async_std::os::unix::process::CommandExt as _;
+use async_std::stream::StreamExt as _;
use std::os::unix::io::FromRawFd as _;
-use std::os::unix::process::CommandExt as _;
+use std::os::unix::process::ExitStatusExt as _;
const PID0: nix::unistd::Pid = nix::unistd::Pid::from_raw(0);
-pub fn run() -> anyhow::Result<i32> {
- let pipeline = read_pipeline()?;
+pub async fn run() -> anyhow::Result<i32> {
+ let pipeline = read_pipeline().await?;
let mut cmds: Vec<_> = pipeline
.exes()
.iter()
.map(|exe| {
- let mut cmd = std::process::Command::new(exe.exe());
+ let mut cmd = async_std::process::Command::new(exe.exe());
cmd.args(exe.args());
cmd
})
@@ -52,49 +54,44 @@ pub fn run() -> anyhow::Result<i32> {
// ensure that we don't keep the pipes open past when the children exit
drop(cmds);
- let last_pid = id_to_pid(children[children.len() - 1].id());
- let mut children: std::collections::HashMap<
- nix::unistd::Pid,
- std::process::Child,
- > = children
+ let mut final_status = None;
+
+ let count = children.len();
+ let mut children: futures_util::stream::FuturesUnordered<_> = children
.into_iter()
- .map(|child| (id_to_pid(child.id()), child))
+ .enumerate()
+ .map(|(i, child)| async move {
+ (child.status_no_drop().await, i == count - 1)
+ })
.collect();
- let mut final_code = None;
- let mut final_signal = None;
- while !children.is_empty() {
- match nix::sys::wait::waitpid(neg_pid(pg_pid), None)? {
- nix::sys::wait::WaitStatus::Exited(pid, code) => {
- if pid == last_pid {
- final_code = Some(code);
- }
- children.remove(&pid);
- }
- nix::sys::wait::WaitStatus::Signaled(pid, signal, _) => {
- if signal == nix::sys::signal::Signal::SIGINT {
- nix::sys::signal::raise(nix::sys::signal::SIGINT)?;
- }
- if pid == last_pid {
- final_signal = Some(signal);
- }
- children.remove(&pid);
- }
- _ => {}
+ while let Some((status, last)) = children.next().await {
+ let status = status.unwrap_or_else(|_| {
+ async_std::process::ExitStatus::from_raw(1 << 8)
+ });
+ // this conversion is safe because the Signal enum is repr(i32)
+ #[allow(clippy::as_conversions)]
+ if status.signal() == Some(nix::sys::signal::Signal::SIGINT as i32) {
+ nix::sys::signal::raise(nix::sys::signal::Signal::SIGINT)?;
+ }
+ if last {
+ final_status = Some(status);
}
}
- if let Some(signal) = final_signal {
- nix::sys::signal::raise(signal)?;
+
+ let final_status = final_status.unwrap();
+ if let Some(signal) = final_status.signal() {
+ nix::sys::signal::raise(signal.try_into().unwrap())?;
}
- Ok(final_code.unwrap())
+ Ok(final_status.code().unwrap())
}
-fn read_pipeline() -> anyhow::Result<crate::parse::Pipeline> {
+async fn read_pipeline() -> anyhow::Result<crate::parse::Pipeline> {
// Safety: this code is only called by crate::history::run_pipeline, which
// passes data through on fd 3, and which will not spawn this process
// unless the pipe was successfully opened on that fd
- let mut fd3 = unsafe { std::fs::File::from_raw_fd(3) };
+ let mut fd3 = unsafe { async_std::fs::File::from_raw_fd(3) };
let mut pipeline = String::new();
- fd3.read_to_string(&mut pipeline)?;
+ fd3.read_to_string(&mut pipeline).await?;
let ast = crate::parse::Pipeline::parse(&pipeline)?;
Ok(ast)
}