From 91418540883b5732f5af1060975d583043a5886d Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Sat, 8 Jan 2022 04:16:35 -0500 Subject: fix race condition when reading pipeline events we can't actually create a new blocking::unblock in every loop iteration, because ones from the previous iteration are not actually cancelled when they are dropped. also if there is a lot of data to serialize, we might get the exit event before the read event --- src/shell/history/mod.rs | 85 +++++++++++++++++++++++++++++++----------------- 1 file changed, 55 insertions(+), 30 deletions(-) (limited to 'src/shell/history/mod.rs') diff --git a/src/shell/history/mod.rs b/src/shell/history/mod.rs index 8ff9ccc..5b7f5f0 100644 --- a/src/shell/history/mod.rs +++ b/src/shell/history/mod.rs @@ -356,58 +356,83 @@ async fn run_pipeline( to_w.write_all(&env.as_bytes()).await.unwrap(); drop(to_w); + let (read_w, read_r) = async_std::channel::unbounded(); + let new_read = move || { + let read_w = read_w.clone(); + async_std::task::spawn(async move { + let event = blocking::unblock(move || { + // Safety: from_r was just opened above and is only + // referenced in this closure, which takes ownership of it + // at the start and returns ownership of it at the end + let fh = unsafe { std::fs::File::from_raw_fd(from_r) }; + let event = bincode::deserialize_from(&fh); + let _ = fh.into_raw_fd(); + event + }) + .await; + if read_w.is_closed() { + // we should never drop read_r while there are still valid + // things to read + assert!(event.is_err()); + } else { + read_w.send(event).await.unwrap(); + } + }); + }; + + new_read(); + let mut read_done = false; + let mut exit_done = None; loop { enum Res { Read(bincode::Result), Exit(std::io::Result), } - let read = async move { - Res::Read( - blocking::unblock(move || { - // Safety: from_r was just opened above and is only - // referenced in this closure, which takes ownership of it - // at the start and returns ownership of it at the end - let fh = unsafe { std::fs::File::from_raw_fd(from_r) }; - let event = bincode::deserialize_from(&fh); - let _ = fh.into_raw_fd(); - event - }) - .await, - ) - }; + let read_r = read_r.clone(); + let read = async move { Res::Read(read_r.recv().await.unwrap()) }; let exit = async { Res::Exit(child.status_no_drop().await) }; match read.or(exit).await { - Res::Read(Ok(event)) => match event { - crate::pipeline::Event::Suspend(idx) => { - event_w.send(Event::ChildSuspend(idx)).await.unwrap(); + Res::Read(Ok(event)) => { + match event { + crate::pipeline::Event::Suspend(idx) => { + event_w.send(Event::ChildSuspend(idx)).await.unwrap(); + } + crate::pipeline::Event::Exit(new_env) => { + *env = new_env; + } } - crate::pipeline::Event::Exit(new_env) => *env = new_env, - }, + new_read(); + } Res::Read(Err(e)) => { if let bincode::ErrorKind::Io(e) = &*e { if e.kind() == std::io::ErrorKind::UnexpectedEof { + read_done = true; continue; } } anyhow::bail!(e); } Res::Exit(Ok(status)) => { - // nix::sys::signal::Signal is repr(i32) - #[allow(clippy::as_conversions)] - return Ok(( - status, - // i'm not sure what exactly the expected behavior here is - // - in zsh, SIGINT kills the whole command line while - // SIGTERM doesn't, but i don't know what the precise - // logic is or how other signals are handled - status.signal() - == Some(nix::sys::signal::Signal::SIGINT as i32), - )); + exit_done = Some(status); } Res::Exit(Err(e)) => { anyhow::bail!(e); } } + if let (true, Some(status)) = (read_done, exit_done) { + // nix::sys::signal::Signal is repr(i32) + #[allow(clippy::as_conversions)] + return Ok(( + status, + // i'm not sure what exactly the expected behavior + // here is - in zsh, SIGINT kills the whole command + // line while SIGTERM doesn't, but i don't know what + // the precise logic is or how other signals are + // handled + status.signal() + == Some(nix::sys::signal::Signal::SIGINT as i32), + )); + } } } -- cgit v1.2.3-54-g00ecf