From 22b0f525e36ea8bb9602ecc5e1a2dc4595b894a3 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Mon, 3 Jan 2022 08:12:49 -0500 Subject: restart stopped processes eventually we will want to do something better here, but for now this at least avoids deadlocks --- src/pipeline/mod.rs | 164 ++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 132 insertions(+), 32 deletions(-) (limited to 'src/pipeline/mod.rs') diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 1f9a607..669d501 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -1,5 +1,6 @@ use async_std::io::ReadExt as _; use async_std::stream::StreamExt as _; +use futures_lite::future::FutureExt as _; use std::os::unix::io::FromRawFd as _; use std::os::unix::process::ExitStatusExt as _; @@ -11,37 +12,12 @@ pub use command::{Child, Command}; pub async fn run() -> anyhow::Result { let (code, pipeline) = read_data().await?; let env = crate::env::Env::new(code); - let children = spawn_children(pipeline, &env)?; - let count = children.len(); - - let mut children: futures_util::stream::FuturesUnordered<_> = - children - .into_iter() - .enumerate() - .map(|(i, child)| async move { - (child.status().await, i == count - 1) - }) - .collect(); - let mut final_status = None; - while let Some((status, last)) = children.next().await { - let status = status.unwrap_or_else(|_| { - async_std::process::ExitStatus::from_raw(1 << 8) - }); - // this conversion is safe because the Signal enum is repr(i32) - #[allow(clippy::as_conversions)] - if status.signal() == Some(nix::sys::signal::Signal::SIGINT as i32) { - nix::sys::signal::raise(nix::sys::signal::Signal::SIGINT)?; - } - if last { - final_status = Some(status); - } - } - - let final_status = final_status.unwrap(); - if let Some(signal) = final_status.signal() { + let (children, pg) = spawn_children(pipeline, &env)?; + let status = wait_children(children, pg).await; + if let Some(signal) = status.signal() { nix::sys::signal::raise(signal.try_into().unwrap())?; } - Ok(final_status.code().unwrap()) + Ok(status.code().unwrap()) } async fn read_data() -> anyhow::Result<(i32, crate::parse::Pipeline)> { @@ -61,7 +37,7 @@ async fn read_data() -> anyhow::Result<(i32, crate::parse::Pipeline)> { fn spawn_children( pipeline: crate::parse::Pipeline, env: &crate::env::Env, -) -> anyhow::Result> { +) -> anyhow::Result<(Vec, Option)> { let mut cmds: Vec<_> = pipeline.into_exes().map(Command::new).collect(); for i in 0..(cmds.len() - 1) { let (r, w) = pipe()?; @@ -71,7 +47,7 @@ fn spawn_children( let mut children = vec![]; let mut pg_pid = None; - for mut cmd in cmds.drain(..) { + for mut cmd in cmds { // Safety: setpgid is an async-signal-safe function unsafe { cmd.pre_exec(move || { @@ -90,7 +66,131 @@ fn spawn_children( } children.push(child); } - Ok(children) + Ok((children, pg_pid)) +} + +async fn wait_children( + children: Vec>, + pg: Option, +) -> std::process::ExitStatus { + enum Res { + Child(nix::Result), + Builtin(Option<(anyhow::Result, bool)>), + } + + macro_rules! bail { + ($msg:expr) => { + eprintln!("{}", $msg); + return std::process::ExitStatus::from_raw(1 << 8); + }; + } + + let mut final_status = None; + + let count = children.len(); + let (children, builtins): (Vec<_>, Vec<_>) = children + .into_iter() + .enumerate() + .partition(|(_, child)| child.id().is_some()); + let mut children: std::collections::HashMap<_, _> = children + .into_iter() + .map(|(i, child)| { + (id_to_pid(child.id().unwrap()), (child, i == count - 1)) + }) + .collect(); + let mut builtins: futures_util::stream::FuturesUnordered<_> = + builtins + .into_iter() + .map(|(i, child)| async move { + (child.status().await, i == count - 1) + }) + .collect(); + loop { + if children.is_empty() && builtins.is_empty() { + break; + } + + let child = async { + Res::Child(if let Some(pg) = pg { + blocking::unblock(move || { + nix::sys::wait::waitpid( + neg_pid(pg), + Some(nix::sys::wait::WaitPidFlag::WUNTRACED), + ) + }) + .await + } else { + std::future::pending().await + }) + }; + let builtin = async { + Res::Builtin(if builtins.is_empty() { + std::future::pending().await + } else { + builtins.next().await + }) + }; + match child.race(builtin).await { + Res::Child(Ok(status)) => match status { + nix::sys::wait::WaitStatus::Exited(pid, code) => { + let (_, last) = children.remove(&pid).unwrap(); + if last { + final_status = Some( + std::process::ExitStatus::from_raw(code << 8), + ); + } + } + nix::sys::wait::WaitStatus::Signaled(pid, signal, _) => { + let (_, last) = children.remove(&pid).unwrap(); + // this conversion is safe because the Signal enum is + // repr(i32) + #[allow(clippy::as_conversions)] + if last { + final_status = Some( + std::process::ExitStatus::from_raw(signal as i32), + ); + } + } + nix::sys::wait::WaitStatus::Stopped(pid, signal) => { + if signal == nix::sys::signal::Signal::SIGTSTP { + // TODO: notify the main shell process and have it + // refocus the readline + if let Err(e) = nix::sys::signal::kill( + pid, + nix::sys::signal::Signal::SIGCONT, + ) { + bail!(e); + } + } + } + _ => {} + }, + Res::Child(Err(e)) => { + bail!(e); + } + Res::Builtin(Some((Ok(status), last))) => { + // this conversion is safe because the Signal enum is + // repr(i32) + #[allow(clippy::as_conversions)] + if status.signal() + == Some(nix::sys::signal::Signal::SIGINT as i32) + { + if let Err(e) = nix::sys::signal::raise( + nix::sys::signal::Signal::SIGINT, + ) { + bail!(e); + } + } + if last { + final_status = Some(status); + } + } + Res::Builtin(Some((Err(e), _))) => {} + Res::Builtin(None) => {} + } + } + + final_status.unwrap() } fn pipe() -> anyhow::Result<(std::fs::File, std::fs::File)> { -- cgit v1.2.3-54-g00ecf