From dfeb1dc518af88418f324a3878a802c39272f9b2 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Sat, 8 Jan 2022 17:42:05 -0500 Subject: also fix the other use of blocking::unblock --- src/pipeline/mod.rs | 114 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 65 insertions(+), 49 deletions(-) diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index b15e182..498b5f4 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -161,24 +161,36 @@ async fn wait_children( (child.status().await, i == count - 1) }) .collect(); - loop { - if children.is_empty() && builtins.is_empty() { - break; - } - let child = async { - Res::Child(if let Some(pg) = pg { - blocking::unblock(move || { + let (wait_w, wait_r) = async_std::channel::unbounded(); + let new_wait = move || { + if let Some(pg) = pg { + let wait_w = wait_w.clone(); + async_std::task::spawn(async move { + let res = blocking::unblock(move || { nix::sys::wait::waitpid( neg_pid(pg), Some(nix::sys::wait::WaitPidFlag::WUNTRACED), ) }) - .await - } else { - std::future::pending().await - }) - }; + .await; + if wait_w.is_closed() { + // we shouldn't be able to drop real process terminations + assert!(res.is_err()); + } else { + wait_w.send(res).await.unwrap(); + } + }); + } + }; + + new_wait(); + loop { + if children.is_empty() && builtins.is_empty() { + break; + } + + let child = async { Res::Child(wait_r.recv().await.unwrap()) }; let builtin = async { Res::Builtin(if builtins.is_empty() { std::future::pending().await @@ -187,48 +199,52 @@ async fn wait_children( }) }; match child.race(builtin).await { - Res::Child(Ok(status)) => match status { - // we can't call child.status() here to unify these branches - // because our waitpid call already collected the status - nix::sys::wait::WaitStatus::Exited(pid, code) => { - let (_, last) = children.remove(&pid).unwrap(); - if last { - final_status = Some( - std::process::ExitStatus::from_raw(code << 8), - ); - } - } - nix::sys::wait::WaitStatus::Signaled(pid, signal, _) => { - let (_, last) = children.remove(&pid).unwrap(); - // this conversion is safe because the Signal enum is - // repr(i32) - #[allow(clippy::as_conversions)] - if last { - final_status = Some( - std::process::ExitStatus::from_raw(signal as i32), - ); + Res::Child(Ok(status)) => { + match status { + // we can't call child.status() here to unify these branches + // because our waitpid call already collected the status + nix::sys::wait::WaitStatus::Exited(pid, code) => { + let (_, last) = children.remove(&pid).unwrap(); + if last { + final_status = Some( + std::process::ExitStatus::from_raw(code << 8), + ); + } } - } - nix::sys::wait::WaitStatus::Stopped(pid, signal) => { - if signal == nix::sys::signal::Signal::SIGTSTP { - if let Err(e) = write_event( - shell_write, - Event::Suspend(env.idx()), - ) - .await - { - bail!(e); + nix::sys::wait::WaitStatus::Signaled(pid, signal, _) => { + let (_, last) = children.remove(&pid).unwrap(); + // this conversion is safe because the Signal enum is + // repr(i32) + #[allow(clippy::as_conversions)] + if last { + final_status = + Some(std::process::ExitStatus::from_raw( + signal as i32, + )); } - if let Err(e) = nix::sys::signal::kill( - pid, - nix::sys::signal::Signal::SIGCONT, - ) { - bail!(e); + } + nix::sys::wait::WaitStatus::Stopped(pid, signal) => { + if signal == nix::sys::signal::Signal::SIGTSTP { + if let Err(e) = write_event( + shell_write, + Event::Suspend(env.idx()), + ) + .await + { + bail!(e); + } + if let Err(e) = nix::sys::signal::kill( + pid, + nix::sys::signal::Signal::SIGCONT, + ) { + bail!(e); + } } } + _ => {} } - _ => {} - }, + new_wait(); + } Res::Child(Err(e)) => { bail!(e); } -- cgit v1.2.3-54-g00ecf