summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2022-01-08 04:16:35 -0500
committerJesse Luehrs <doy@tozt.net>2022-01-08 04:16:35 -0500
commit91418540883b5732f5af1060975d583043a5886d (patch)
treecc43f1059da9767c89eacffaa97b0cae1590f890
parent3809ca6a7e119bcd1c77a190380597e937605e9a (diff)
downloadnbsh-91418540883b5732f5af1060975d583043a5886d.tar.gz
nbsh-91418540883b5732f5af1060975d583043a5886d.zip
fix race condition when reading pipeline events
we can't actually create a new blocking::unblock in every loop iteration, because ones from the previous iteration are not actually cancelled when they are dropped. also if there is a lot of data to serialize, we might get the exit event before the read event
-rw-r--r--src/shell/history/mod.rs85
1 files changed, 55 insertions, 30 deletions
diff --git a/src/shell/history/mod.rs b/src/shell/history/mod.rs
index 8ff9ccc..5b7f5f0 100644
--- a/src/shell/history/mod.rs
+++ b/src/shell/history/mod.rs
@@ -356,58 +356,83 @@ async fn run_pipeline(
to_w.write_all(&env.as_bytes()).await.unwrap();
drop(to_w);
+ let (read_w, read_r) = async_std::channel::unbounded();
+ let new_read = move || {
+ let read_w = read_w.clone();
+ async_std::task::spawn(async move {
+ let event = blocking::unblock(move || {
+ // Safety: from_r was just opened above and is only
+ // referenced in this closure, which takes ownership of it
+ // at the start and returns ownership of it at the end
+ let fh = unsafe { std::fs::File::from_raw_fd(from_r) };
+ let event = bincode::deserialize_from(&fh);
+ let _ = fh.into_raw_fd();
+ event
+ })
+ .await;
+ if read_w.is_closed() {
+ // we should never drop read_r while there are still valid
+ // things to read
+ assert!(event.is_err());
+ } else {
+ read_w.send(event).await.unwrap();
+ }
+ });
+ };
+
+ new_read();
+ let mut read_done = false;
+ let mut exit_done = None;
loop {
enum Res {
Read(bincode::Result<crate::pipeline::Event>),
Exit(std::io::Result<std::process::ExitStatus>),
}
- let read = async move {
- Res::Read(
- blocking::unblock(move || {
- // Safety: from_r was just opened above and is only
- // referenced in this closure, which takes ownership of it
- // at the start and returns ownership of it at the end
- let fh = unsafe { std::fs::File::from_raw_fd(from_r) };
- let event = bincode::deserialize_from(&fh);
- let _ = fh.into_raw_fd();
- event
- })
- .await,
- )
- };
+ let read_r = read_r.clone();
+ let read = async move { Res::Read(read_r.recv().await.unwrap()) };
let exit = async { Res::Exit(child.status_no_drop().await) };
match read.or(exit).await {
- Res::Read(Ok(event)) => match event {
- crate::pipeline::Event::Suspend(idx) => {
- event_w.send(Event::ChildSuspend(idx)).await.unwrap();
+ Res::Read(Ok(event)) => {
+ match event {
+ crate::pipeline::Event::Suspend(idx) => {
+ event_w.send(Event::ChildSuspend(idx)).await.unwrap();
+ }
+ crate::pipeline::Event::Exit(new_env) => {
+ *env = new_env;
+ }
}
- crate::pipeline::Event::Exit(new_env) => *env = new_env,
- },
+ new_read();
+ }
Res::Read(Err(e)) => {
if let bincode::ErrorKind::Io(e) = &*e {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
+ read_done = true;
continue;
}
}
anyhow::bail!(e);
}
Res::Exit(Ok(status)) => {
- // nix::sys::signal::Signal is repr(i32)
- #[allow(clippy::as_conversions)]
- return Ok((
- status,
- // i'm not sure what exactly the expected behavior here is
- // - in zsh, SIGINT kills the whole command line while
- // SIGTERM doesn't, but i don't know what the precise
- // logic is or how other signals are handled
- status.signal()
- == Some(nix::sys::signal::Signal::SIGINT as i32),
- ));
+ exit_done = Some(status);
}
Res::Exit(Err(e)) => {
anyhow::bail!(e);
}
}
+ if let (true, Some(status)) = (read_done, exit_done) {
+ // nix::sys::signal::Signal is repr(i32)
+ #[allow(clippy::as_conversions)]
+ return Ok((
+ status,
+ // i'm not sure what exactly the expected behavior
+ // here is - in zsh, SIGINT kills the whole command
+ // line while SIGTERM doesn't, but i don't know what
+ // the precise logic is or how other signals are
+ // handled
+ status.signal()
+ == Some(nix::sys::signal::Signal::SIGINT as i32),
+ ));
+ }
}
}