From 5cf20a142ef667b9664dd2a2dc3bb26c7562c9a0 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Sun, 9 Jun 2019 06:53:24 -0400 Subject: pass input events through to the running process --- Cargo.lock | 1 + Cargo.toml | 1 + examples/cooked.rs | 19 +++++++++++ examples/raw.rs | 20 ++++++++++++ src/process.rs | 92 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 133 insertions(+) create mode 100644 examples/cooked.rs create mode 100644 examples/raw.rs diff --git a/Cargo.lock b/Cargo.lock index 2309b4c..095c97d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -296,6 +296,7 @@ version = "0.1.0" dependencies = [ "crossterm 0.9.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-pty-process 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/Cargo.toml b/Cargo.toml index a9cb7e1..2a6a61b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,5 +7,6 @@ edition = "2018" [dependencies] crossterm = "0.9" futures = "0.1" +mio = "0.6" tokio = "0.1" tokio-pty-process = "0.4" diff --git a/examples/cooked.rs b/examples/cooked.rs new file mode 100644 index 0000000..cf0384c --- /dev/null +++ b/examples/cooked.rs @@ -0,0 +1,19 @@ +use std::io::Read; + +fn main() { + loop { + let stdin = std::io::stdin(); + let mut stdin = stdin.lock(); + let mut buf = [0; 1]; + let n = stdin.read(&mut buf).unwrap(); + if n > 0 { + eprint!("got {}\r\n", buf[0]); + if buf[0] == 4 { + break; + } + } else { + eprint!("got no bytes\r\n"); + break; + } + } +} diff --git a/examples/raw.rs b/examples/raw.rs new file mode 100644 index 0000000..5890a99 --- /dev/null +++ b/examples/raw.rs @@ -0,0 +1,20 @@ +use std::io::Read; + +fn main() { + let _screen = crossterm::RawScreen::into_raw_mode().unwrap(); + loop { + let stdin = std::io::stdin(); + let mut stdin = stdin.lock(); + let mut buf = [0; 1]; + let n = stdin.read(&mut buf).unwrap(); + if n > 0 { + eprint!("got {}\r\n", buf[0]); + if buf[0] == 4 { + break; + } + } else { + eprint!("got no bytes\r\n"); + break; + } + } +} diff --git a/src/process.rs b/src/process.rs index 70e1178..0c7b61f 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,4 +1,5 @@ use futures::future::Future; +use std::io::{Read, Write}; use tokio::io::AsyncRead; use tokio_pty_process::CommandExt; @@ -19,9 +20,13 @@ pub enum ProcessEvent { pub struct RunningProcess { pty: tokio_pty_process::AsyncPtyMaster, process: tokio_pty_process::Child, + // TODO: tokio::io::Stdin is broken + // input: tokio::io::Stdin, + input: tokio::reactor::PollEvented2, buf: Vec, output_done: bool, exit_done: bool, + _screen: crossterm::RawScreen, } impl RunningProcess { @@ -36,12 +41,18 @@ impl RunningProcess { .spawn_pty_async(&pty) .map_err(|e| Error::IOError(e))?; + // TODO: tokio::io::stdin is broken (it's blocking) + // let input = tokio::io::stdin(); + let input = tokio::reactor::PollEvented2::new(EventedStdin); + Ok(RunningProcess { pty, process, + input, buf: Vec::with_capacity(4096), output_done: false, exit_done: false, + _screen: crossterm::RawScreen::into_raw_mode().unwrap(), }) } } @@ -52,6 +63,41 @@ impl futures::stream::Stream for RunningProcess { type Error = Error; fn poll(&mut self) -> futures::Poll, Self::Error> { + let ready = mio::Ready::readable(); + let input_poll = self.input.poll_read_ready(ready); + match input_poll { + Ok(futures::Async::Ready(_)) => { + let stdin = std::io::stdin(); + let mut stdin = stdin.lock(); + let mut buf = vec![0; 4096]; + // TODO: async + match stdin.read(&mut buf) { + Ok(n) => { + if n > 0 { + let bytes = buf[..n].to_vec(); + + // TODO: async + let res = self.pty.write_all(&bytes); + if let Err(e) = res { + return Err(Error::IOError(e)); + } + } + } + Err(e) => { + return Err(Error::IOError(e)); + } + } + } + _ => {} + } + // TODO: this could lose pending bytes if there is stuff to read in + // the buffer but we don't read it all in the previous read call, + // since i think we won't get another notification until new bytes + // actually arrive even if there are bytes in the buffer + if let Err(e) = self.input.clear_read_ready(ready) { + return Err(Error::IOError(e)); + } + if !self.output_done { self.buf.clear(); let output_poll = self @@ -61,6 +107,18 @@ impl futures::stream::Stream for RunningProcess { match output_poll { Ok(futures::Async::Ready(n)) => { let bytes = self.buf[..n].to_vec(); + let bytes: Vec<_> = bytes + .iter() + // replace \n with \r\n + .fold(vec![], |mut acc, &c| { + if c == b'\n' { + acc.push(b'\r'); + acc.push(b'\n'); + } else { + acc.push(c); + } + acc + }); return Ok(futures::Async::Ready(Some( ProcessEvent::Output(bytes), ))); @@ -96,3 +154,37 @@ impl futures::stream::Stream for RunningProcess { Ok(futures::Async::Ready(None)) } } + +struct EventedStdin; + +impl mio::Evented for EventedStdin { + fn register( + &self, + poll: &mio::Poll, + token: mio::Token, + interest: mio::Ready, + opts: mio::PollOpt, + ) -> std::io::Result<()> { + let fd = 0 as std::os::unix::io::RawFd; + let eventedfd = mio::unix::EventedFd(&fd); + eventedfd.register(poll, token, interest, opts) + } + + fn reregister( + &self, + poll: &mio::Poll, + token: mio::Token, + interest: mio::Ready, + opts: mio::PollOpt, + ) -> std::io::Result<()> { + let fd = 0 as std::os::unix::io::RawFd; + let eventedfd = mio::unix::EventedFd(&fd); + eventedfd.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &mio::Poll) -> std::io::Result<()> { + let fd = 0 as std::os::unix::io::RawFd; + let eventedfd = mio::unix::EventedFd(&fd); + eventedfd.deregister(poll) + } +} -- cgit v1.2.3-54-g00ecf