From 193e1963afc4e9e78122573cd5b9831f9a847345 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Thu, 24 Feb 2022 16:34:56 -0500 Subject: go back to not using tokio::select! --- Cargo.toml | 3 +++ examples/tmux_impl/mod.rs | 39 ++++++++++++++++++++++++++------------- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5d7b5be..da176ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,10 +27,13 @@ async = ["tokio"] [dev-dependencies] assert_cmd = "2.0.4" assert_fs = "1.0.7" +bytes = "1.1.0" escargot = "0.5.7" +futures = "0.3.21" libc = "0.2.119" pty-process = { version = "0.2.0", features = ["async"] } tokio = { version = "1.17.0", features = ["full"] } +tokio-util = { version = "0.7.0", features = ["io"] } [patch.crates-io] nix = { git = "https://github.com/nix-rust/nix" } diff --git a/examples/tmux_impl/mod.rs b/examples/tmux_impl/mod.rs index 313aaf2..e05ea51 100644 --- a/examples/tmux_impl/mod.rs +++ b/examples/tmux_impl/mod.rs @@ -1,5 +1,6 @@ +use futures::stream::StreamExt as _; use textmode::Textmode as _; -use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; +use tokio::io::AsyncWriteExt as _; #[derive(Debug)] enum Command { @@ -153,7 +154,7 @@ impl State { pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut cmd = pty_process::Command::new("zsh"); let mut child = cmd.spawn(&pts).unwrap(); - let (mut pty_r, pty_w) = pty.into_split(); + let (pty_r, pty_w) = pty.into_split(); let vt = vt100::Parser::default(); let screen = vt.screen().clone(); let vt = std::sync::Arc::new(tokio::sync::Mutex::new(vt)); @@ -168,19 +169,31 @@ impl State { self.current_window = id; self.notify(&format!("created window {}", id)); tokio::task::spawn(async move { + enum Res { + Bytes(tokio::io::Result), + Done, + } + let _pts = pts; - let mut buf = [0_u8; 4096]; - loop { - tokio::select! { - bytes = pty_r.read(&mut buf) => match bytes { + + let mut stream: futures::stream::SelectAll<_> = [ + tokio_util::io::ReaderStream::new(pty_r) + .map(Res::Bytes) + .boxed(), + futures::stream::once(child.wait()) + .map(|_| Res::Done) + .boxed(), + ] + .into_iter() + .collect(); + while let Some(res) = stream.next().await { + match res { + Res::Bytes(bytes) => match bytes { Ok(bytes) => { - if bytes == 0 { + if bytes.is_empty() { continue; } - vt.clone() - .lock_owned() - .await - .process(&buf[..bytes]); + vt.clone().lock_owned().await.process(&bytes); notify.send(Event::Output).unwrap(); } Err(e) => { @@ -188,10 +201,10 @@ impl State { break; } }, - _ = child.wait() => { + Res::Done => { notify.send(Event::WindowExit(id)).unwrap(); break; - }, + } } } }); -- cgit v1.2.3-54-g00ecf