aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2023-03-08 14:10:48 -0500
committerJesse Luehrs <doy@tozt.net>2023-03-08 14:10:48 -0500
commitaa08ecf2c465260048c7e5eac86a47f201e635c7 (patch)
treed309cec31548cc7125b951ca871e1d7b02baa680
parentc7729f3522701248e1584e13ed08ec677aa42580 (diff)
downloadttyrec-bin-aa08ecf2c465260048c7e5eac86a47f201e635c7.tar.gz
ttyrec-bin-aa08ecf2c465260048c7e5eac86a47f201e635c7.zip
remove more select! use
-rw-r--r--Cargo.lock33
-rw-r--r--Cargo.toml3
-rw-r--r--src/bin/ttyrec/main.rs65
3 files changed, 79 insertions, 22 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 4d36d03..2228a65 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -121,6 +121,12 @@ dependencies = [
]
[[package]]
+name = "futures-sink"
+version = "0.3.26"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f310820bb3e8cfd46c80db4d7fb8353e15dfff853a127158425f31e0be6c8364"
+
+[[package]]
name = "futures-task"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -521,6 +527,30 @@ dependencies = [
]
[[package]]
+name = "tokio-stream"
+version = "0.1.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313"
+dependencies = [
+ "futures-core",
+ "pin-project-lite",
+ "tokio",
+]
+
+[[package]]
+name = "tokio-util"
+version = "0.7.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2"
+dependencies = [
+ "bytes",
+ "futures-core",
+ "futures-sink",
+ "pin-project-lite",
+ "tokio",
+]
+
+[[package]]
name = "ttyrec"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -534,6 +564,7 @@ name = "ttyrec-bin"
version = "0.1.3"
dependencies = [
"anyhow",
+ "bytes",
"clap",
"futures-util",
"libc",
@@ -541,6 +572,8 @@ dependencies = [
"terminal_size",
"textmode",
"tokio",
+ "tokio-stream",
+ "tokio-util",
"ttyrec",
"vt100",
]
diff --git a/Cargo.toml b/Cargo.toml
index 0cdd3f9..1defd82 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,6 +14,7 @@ include = ["src/**/*", "LICENSE", "README.md", "CHANGELOG.md"]
[dependencies]
anyhow = "1.0.69"
+bytes = "1.4.0"
clap = { version = "4.1.8", features = ["wrap_help", "derive"] }
futures-util = "0.3.26"
libc = "0.2.139"
@@ -21,5 +22,7 @@ pty-process = { version = "0.3.0", features = ["async"] }
terminal_size = "0.2.5"
textmode = { version = "0.4.0", features = ["async"] }
tokio = { version = "1.26.0", features = ["full"] }
+tokio-stream = { version = "0.1.12", features = ["io-util"] }
+tokio-util = { version = "0.7.7", features = ["io"] }
ttyrec = { version = "0.4.0", features = ["async"] }
vt100 = "0.15.2"
diff --git a/src/bin/ttyrec/main.rs b/src/bin/ttyrec/main.rs
index 29cf09f..a76119a 100644
--- a/src/bin/ttyrec/main.rs
+++ b/src/bin/ttyrec/main.rs
@@ -12,7 +12,8 @@
#![allow(clippy::type_complexity)]
use clap::Parser as _;
-use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
+use futures_util::StreamExt as _;
+use tokio::io::AsyncWriteExt as _;
#[derive(Debug, clap::Parser)]
#[command(
@@ -56,7 +57,7 @@ fn get_cmd(
#[derive(Debug)]
enum Event {
Key(textmode::Result<Option<textmode::Key>>),
- Stdout(std::io::Result<Vec<u8>>),
+ Stdout(std::io::Result<bytes::Bytes>),
Resize((u16, u16)),
Error(anyhow::Error),
Quit,
@@ -83,8 +84,8 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> {
let mut child = pty_process::Command::new(cmd).args(args).spawn(&pts)?;
let (event_w, mut event_r) = tokio::sync::mpsc::unbounded_channel();
- let (input_w, mut input_r) = tokio::sync::mpsc::unbounded_channel();
- let (resize_w, mut resize_r) = tokio::sync::mpsc::unbounded_channel();
+ let (input_w, input_r) = tokio::sync::mpsc::unbounded_channel();
+ let (resize_w, resize_r) = tokio::sync::mpsc::unbounded_channel();
{
let mut signals = tokio::signal::unix::signal(
@@ -123,27 +124,49 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> {
{
let event_w = event_w.clone();
- #[allow(clippy::redundant_pub_crate)]
tokio::task::spawn(async move {
- loop {
- let mut buf = [0_u8; 4096];
- tokio::select! {
- res = pty.read(&mut buf) => {
- let res = res.map(|n| buf[..n].to_vec());
+ enum Res {
+ Read(std::io::Result<bytes::Bytes>),
+ Input(Vec<u8>),
+ Resize((u16, u16)),
+ Exit(std::io::Result<std::process::ExitStatus>),
+ }
+
+ let (pty_r, mut pty_w) = pty.split();
+
+ let mut select: futures_util::stream::SelectAll<_> = [
+ tokio_util::io::ReaderStream::new(pty_r)
+ .map(Res::Read)
+ .boxed(),
+ tokio_stream::wrappers::UnboundedReceiverStream::new(input_r)
+ .map(Res::Input)
+ .boxed(),
+ tokio_stream::wrappers::UnboundedReceiverStream::new(
+ resize_r,
+ )
+ .map(Res::Resize)
+ .boxed(),
+ futures_util::stream::once(child.wait())
+ .map(Res::Exit)
+ .boxed(),
+ ]
+ .into_iter()
+ .collect();
+
+ while let Some(res) = select.next().await {
+ match res {
+ Res::Read(res) => {
let err = res.is_err();
event_w
.send(Event::Stdout(res))
// event_w is never closed, so this can never fail
.unwrap();
if err {
- eprintln!("pty read failed: {err}");
break;
}
}
- res = input_r.recv() => {
- // input_r is never closed, so this can never fail
- let bytes: Vec<u8> = res.unwrap();
- if let Err(e) = pty.write(&bytes).await {
+ Res::Input(bytes) => {
+ if let Err(e) = pty_w.write(&bytes).await {
event_w
.send(Event::Error(anyhow::anyhow!(e)))
// event_w is never closed, so this can never
@@ -151,12 +174,10 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> {
.unwrap();
}
}
- res = resize_r.recv() => {
- // resize_r is never closed, so this can never fail
- let size: (u16, u16) = res.unwrap();
- if let Err(e) = pty.resize(
- pty_process::Size::new(size.0, size.1),
- ) {
+ Res::Resize((rows, cols)) => {
+ if let Err(e) =
+ pty_w.resize(pty_process::Size::new(rows, cols))
+ {
event_w
.send(Event::Error(anyhow::anyhow!(e)))
// event_w is never closed, so this can never
@@ -164,7 +185,7 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> {
.unwrap();
}
}
- _ = child.wait() => {
+ Res::Exit(_) => {
event_w.send(Event::Quit).unwrap();
break;
}