From aa08ecf2c465260048c7e5eac86a47f201e635c7 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Wed, 8 Mar 2023 14:10:48 -0500 Subject: remove more select! use --- Cargo.lock | 33 +++++++++++++++++++++++++ Cargo.toml | 3 +++ src/bin/ttyrec/main.rs | 65 +++++++++++++++++++++++++++++++++----------------- 3 files changed, 79 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d36d03..2228a65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -120,6 +120,12 @@ dependencies = [ "syn", ] +[[package]] +name = "futures-sink" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f310820bb3e8cfd46c80db4d7fb8353e15dfff853a127158425f31e0be6c8364" + [[package]] name = "futures-task" version = "0.3.26" @@ -520,6 +526,30 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "ttyrec" version = "0.4.0" @@ -534,6 +564,7 @@ name = "ttyrec-bin" version = "0.1.3" dependencies = [ "anyhow", + "bytes", "clap", "futures-util", "libc", @@ -541,6 +572,8 @@ dependencies = [ "terminal_size", "textmode", "tokio", + "tokio-stream", + "tokio-util", "ttyrec", "vt100", ] diff --git a/Cargo.toml b/Cargo.toml index 0cdd3f9..1defd82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ include = ["src/**/*", "LICENSE", "README.md", "CHANGELOG.md"] [dependencies] anyhow = "1.0.69" +bytes = "1.4.0" clap = { version = "4.1.8", features = ["wrap_help", "derive"] } futures-util = "0.3.26" libc = "0.2.139" @@ -21,5 +22,7 @@ pty-process = { version = "0.3.0", features = ["async"] } terminal_size = "0.2.5" textmode = { version = "0.4.0", features = ["async"] } tokio = { version = "1.26.0", features = ["full"] } +tokio-stream = { version = "0.1.12", features = ["io-util"] } +tokio-util = { version = "0.7.7", features = ["io"] } ttyrec = { version = "0.4.0", features = ["async"] } vt100 = "0.15.2" diff --git a/src/bin/ttyrec/main.rs b/src/bin/ttyrec/main.rs index 29cf09f..a76119a 100644 --- a/src/bin/ttyrec/main.rs +++ b/src/bin/ttyrec/main.rs @@ -12,7 +12,8 @@ #![allow(clippy::type_complexity)] use clap::Parser as _; -use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; +use futures_util::StreamExt as _; +use tokio::io::AsyncWriteExt as _; #[derive(Debug, clap::Parser)] #[command( @@ -56,7 +57,7 @@ fn get_cmd( #[derive(Debug)] enum Event { Key(textmode::Result>), - Stdout(std::io::Result>), + Stdout(std::io::Result), Resize((u16, u16)), Error(anyhow::Error), Quit, @@ -83,8 +84,8 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> { let mut child = pty_process::Command::new(cmd).args(args).spawn(&pts)?; let (event_w, mut event_r) = tokio::sync::mpsc::unbounded_channel(); - let (input_w, mut input_r) = tokio::sync::mpsc::unbounded_channel(); - let (resize_w, mut resize_r) = tokio::sync::mpsc::unbounded_channel(); + let (input_w, input_r) = tokio::sync::mpsc::unbounded_channel(); + let (resize_w, resize_r) = tokio::sync::mpsc::unbounded_channel(); { let mut signals = tokio::signal::unix::signal( @@ -123,27 +124,49 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> { { let event_w = event_w.clone(); - #[allow(clippy::redundant_pub_crate)] tokio::task::spawn(async move { - loop { - let mut buf = [0_u8; 4096]; - tokio::select! { - res = pty.read(&mut buf) => { - let res = res.map(|n| buf[..n].to_vec()); + enum Res { + Read(std::io::Result), + Input(Vec), + Resize((u16, u16)), + Exit(std::io::Result), + } + + let (pty_r, mut pty_w) = pty.split(); + + let mut select: futures_util::stream::SelectAll<_> = [ + tokio_util::io::ReaderStream::new(pty_r) + .map(Res::Read) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(input_r) + .map(Res::Input) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new( + resize_r, + ) + .map(Res::Resize) + .boxed(), + futures_util::stream::once(child.wait()) + .map(Res::Exit) + .boxed(), + ] + .into_iter() + .collect(); + + while let Some(res) = select.next().await { + match res { + Res::Read(res) => { let err = res.is_err(); event_w .send(Event::Stdout(res)) // event_w is never closed, so this can never fail .unwrap(); if err { - eprintln!("pty read failed: {err}"); break; } } - res = input_r.recv() => { - // input_r is never closed, so this can never fail - let bytes: Vec = res.unwrap(); - if let Err(e) = pty.write(&bytes).await { + Res::Input(bytes) => { + if let Err(e) = pty_w.write(&bytes).await { event_w .send(Event::Error(anyhow::anyhow!(e))) // event_w is never closed, so this can never @@ -151,12 +174,10 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> { .unwrap(); } } - res = resize_r.recv() => { - // resize_r is never closed, so this can never fail - let size: (u16, u16) = res.unwrap(); - if let Err(e) = pty.resize( - pty_process::Size::new(size.0, size.1), - ) { + Res::Resize((rows, cols)) => { + if let Err(e) = + pty_w.resize(pty_process::Size::new(rows, cols)) + { event_w .send(Event::Error(anyhow::anyhow!(e))) // event_w is never closed, so this can never @@ -164,7 +185,7 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> { .unwrap(); } } - _ = child.wait() => { + Res::Exit(_) => { event_w.send(Event::Quit).unwrap(); break; } -- cgit v1.2.3-54-g00ecf