summaryrefslogtreecommitdiffstats
path: root/src/pipeline/mod.rs
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2022-01-03 08:12:49 -0500
committerJesse Luehrs <doy@tozt.net>2022-01-03 08:14:02 -0500
commit22b0f525e36ea8bb9602ecc5e1a2dc4595b894a3 (patch)
tree6eaf80b333b3c1738ade5f9b800a655b0ed463da /src/pipeline/mod.rs
parent6c79e520f02e7d05f389db5856fda250437a563f (diff)
downloadnbsh-22b0f525e36ea8bb9602ecc5e1a2dc4595b894a3.tar.gz
nbsh-22b0f525e36ea8bb9602ecc5e1a2dc4595b894a3.zip
restart stopped processes
eventually we will want to do something better here, but for now this at least avoids deadlocks
Diffstat (limited to 'src/pipeline/mod.rs')
-rw-r--r--src/pipeline/mod.rs164
1 files changed, 132 insertions, 32 deletions
diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs
index 1f9a607..669d501 100644
--- a/src/pipeline/mod.rs
+++ b/src/pipeline/mod.rs
@@ -1,5 +1,6 @@
use async_std::io::ReadExt as _;
use async_std::stream::StreamExt as _;
+use futures_lite::future::FutureExt as _;
use std::os::unix::io::FromRawFd as _;
use std::os::unix::process::ExitStatusExt as _;
@@ -11,37 +12,12 @@ pub use command::{Child, Command};
pub async fn run() -> anyhow::Result<i32> {
let (code, pipeline) = read_data().await?;
let env = crate::env::Env::new(code);
- let children = spawn_children(pipeline, &env)?;
- let count = children.len();
-
- let mut children: futures_util::stream::FuturesUnordered<_> =
- children
- .into_iter()
- .enumerate()
- .map(|(i, child)| async move {
- (child.status().await, i == count - 1)
- })
- .collect();
- let mut final_status = None;
- while let Some((status, last)) = children.next().await {
- let status = status.unwrap_or_else(|_| {
- async_std::process::ExitStatus::from_raw(1 << 8)
- });
- // this conversion is safe because the Signal enum is repr(i32)
- #[allow(clippy::as_conversions)]
- if status.signal() == Some(nix::sys::signal::Signal::SIGINT as i32) {
- nix::sys::signal::raise(nix::sys::signal::Signal::SIGINT)?;
- }
- if last {
- final_status = Some(status);
- }
- }
-
- let final_status = final_status.unwrap();
- if let Some(signal) = final_status.signal() {
+ let (children, pg) = spawn_children(pipeline, &env)?;
+ let status = wait_children(children, pg).await;
+ if let Some(signal) = status.signal() {
nix::sys::signal::raise(signal.try_into().unwrap())?;
}
- Ok(final_status.code().unwrap())
+ Ok(status.code().unwrap())
}
async fn read_data() -> anyhow::Result<(i32, crate::parse::Pipeline)> {
@@ -61,7 +37,7 @@ async fn read_data() -> anyhow::Result<(i32, crate::parse::Pipeline)> {
fn spawn_children(
pipeline: crate::parse::Pipeline,
env: &crate::env::Env,
-) -> anyhow::Result<Vec<Child>> {
+) -> anyhow::Result<(Vec<Child>, Option<nix::unistd::Pid>)> {
let mut cmds: Vec<_> = pipeline.into_exes().map(Command::new).collect();
for i in 0..(cmds.len() - 1) {
let (r, w) = pipe()?;
@@ -71,7 +47,7 @@ fn spawn_children(
let mut children = vec![];
let mut pg_pid = None;
- for mut cmd in cmds.drain(..) {
+ for mut cmd in cmds {
// Safety: setpgid is an async-signal-safe function
unsafe {
cmd.pre_exec(move || {
@@ -90,7 +66,131 @@ fn spawn_children(
}
children.push(child);
}
- Ok(children)
+ Ok((children, pg_pid))
+}
+
+async fn wait_children(
+ children: Vec<Child<'_>>,
+ pg: Option<nix::unistd::Pid>,
+) -> std::process::ExitStatus {
+ enum Res {
+ Child(nix::Result<nix::sys::wait::WaitStatus>),
+ Builtin(Option<(anyhow::Result<std::process::ExitStatus>, bool)>),
+ }
+
+ macro_rules! bail {
+ ($msg:expr) => {
+ eprintln!("{}", $msg);
+ return std::process::ExitStatus::from_raw(1 << 8);
+ };
+ }
+
+ let mut final_status = None;
+
+ let count = children.len();
+ let (children, builtins): (Vec<_>, Vec<_>) = children
+ .into_iter()
+ .enumerate()
+ .partition(|(_, child)| child.id().is_some());
+ let mut children: std::collections::HashMap<_, _> = children
+ .into_iter()
+ .map(|(i, child)| {
+ (id_to_pid(child.id().unwrap()), (child, i == count - 1))
+ })
+ .collect();
+ let mut builtins: futures_util::stream::FuturesUnordered<_> =
+ builtins
+ .into_iter()
+ .map(|(i, child)| async move {
+ (child.status().await, i == count - 1)
+ })
+ .collect();
+ loop {
+ if children.is_empty() && builtins.is_empty() {
+ break;
+ }
+
+ let child = async {
+ Res::Child(if let Some(pg) = pg {
+ blocking::unblock(move || {
+ nix::sys::wait::waitpid(
+ neg_pid(pg),
+ Some(nix::sys::wait::WaitPidFlag::WUNTRACED),
+ )
+ })
+ .await
+ } else {
+ std::future::pending().await
+ })
+ };
+ let builtin = async {
+ Res::Builtin(if builtins.is_empty() {
+ std::future::pending().await
+ } else {
+ builtins.next().await
+ })
+ };
+ match child.race(builtin).await {
+ Res::Child(Ok(status)) => match status {
+ nix::sys::wait::WaitStatus::Exited(pid, code) => {
+ let (_, last) = children.remove(&pid).unwrap();
+ if last {
+ final_status = Some(
+ std::process::ExitStatus::from_raw(code << 8),
+ );
+ }
+ }
+ nix::sys::wait::WaitStatus::Signaled(pid, signal, _) => {
+ let (_, last) = children.remove(&pid).unwrap();
+ // this conversion is safe because the Signal enum is
+ // repr(i32)
+ #[allow(clippy::as_conversions)]
+ if last {
+ final_status = Some(
+ std::process::ExitStatus::from_raw(signal as i32),
+ );
+ }
+ }
+ nix::sys::wait::WaitStatus::Stopped(pid, signal) => {
+ if signal == nix::sys::signal::Signal::SIGTSTP {
+ // TODO: notify the main shell process and have it
+ // refocus the readline
+ if let Err(e) = nix::sys::signal::kill(
+ pid,
+ nix::sys::signal::Signal::SIGCONT,
+ ) {
+ bail!(e);
+ }
+ }
+ }
+ _ => {}
+ },
+ Res::Child(Err(e)) => {
+ bail!(e);
+ }
+ Res::Builtin(Some((Ok(status), last))) => {
+ // this conversion is safe because the Signal enum is
+ // repr(i32)
+ #[allow(clippy::as_conversions)]
+ if status.signal()
+ == Some(nix::sys::signal::Signal::SIGINT as i32)
+ {
+ if let Err(e) = nix::sys::signal::raise(
+ nix::sys::signal::Signal::SIGINT,
+ ) {
+ bail!(e);
+ }
+ }
+ if last {
+ final_status = Some(status);
+ }
+ }
+ Res::Builtin(Some((Err(e), _))) => {}
+ Res::Builtin(None) => {}
+ }
+ }
+
+ final_status.unwrap()
}
fn pipe() -> anyhow::Result<(std::fs::File, std::fs::File)> {