aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-06-09 06:53:24 -0400
committerJesse Luehrs <doy@tozt.net>2019-06-09 06:53:24 -0400
commit5cf20a142ef667b9664dd2a2dc3bb26c7562c9a0 (patch)
tree0bfea46547add38d6daeac47d7ccf383a079443f
parente842fef74b21d74bf4b731f67c05a1192092ee50 (diff)
downloadnbsh-old-5cf20a142ef667b9664dd2a2dc3bb26c7562c9a0.tar.gz
nbsh-old-5cf20a142ef667b9664dd2a2dc3bb26c7562c9a0.zip
pass input events through to the running process
-rw-r--r--Cargo.lock1
-rw-r--r--Cargo.toml1
-rw-r--r--examples/cooked.rs19
-rw-r--r--examples/raw.rs20
-rw-r--r--src/process.rs92
5 files changed, 133 insertions, 0 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 2309b4c..095c97d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -296,6 +296,7 @@ version = "0.1.0"
dependencies = [
"crossterm 0.9.5 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)",
+ "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-pty-process 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
diff --git a/Cargo.toml b/Cargo.toml
index a9cb7e1..2a6a61b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,5 +7,6 @@ edition = "2018"
[dependencies]
crossterm = "0.9"
futures = "0.1"
+mio = "0.6"
tokio = "0.1"
tokio-pty-process = "0.4"
diff --git a/examples/cooked.rs b/examples/cooked.rs
new file mode 100644
index 0000000..cf0384c
--- /dev/null
+++ b/examples/cooked.rs
@@ -0,0 +1,19 @@
+use std::io::Read;
+
+fn main() {
+ loop {
+ let stdin = std::io::stdin();
+ let mut stdin = stdin.lock();
+ let mut buf = [0; 1];
+ let n = stdin.read(&mut buf).unwrap();
+ if n > 0 {
+ eprint!("got {}\r\n", buf[0]);
+ if buf[0] == 4 {
+ break;
+ }
+ } else {
+ eprint!("got no bytes\r\n");
+ break;
+ }
+ }
+}
diff --git a/examples/raw.rs b/examples/raw.rs
new file mode 100644
index 0000000..5890a99
--- /dev/null
+++ b/examples/raw.rs
@@ -0,0 +1,20 @@
+use std::io::Read;
+
+fn main() {
+ let _screen = crossterm::RawScreen::into_raw_mode().unwrap();
+ loop {
+ let stdin = std::io::stdin();
+ let mut stdin = stdin.lock();
+ let mut buf = [0; 1];
+ let n = stdin.read(&mut buf).unwrap();
+ if n > 0 {
+ eprint!("got {}\r\n", buf[0]);
+ if buf[0] == 4 {
+ break;
+ }
+ } else {
+ eprint!("got no bytes\r\n");
+ break;
+ }
+ }
+}
diff --git a/src/process.rs b/src/process.rs
index 70e1178..0c7b61f 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -1,4 +1,5 @@
use futures::future::Future;
+use std::io::{Read, Write};
use tokio::io::AsyncRead;
use tokio_pty_process::CommandExt;
@@ -19,9 +20,13 @@ pub enum ProcessEvent {
pub struct RunningProcess {
pty: tokio_pty_process::AsyncPtyMaster,
process: tokio_pty_process::Child,
+ // TODO: tokio::io::Stdin is broken
+ // input: tokio::io::Stdin,
+ input: tokio::reactor::PollEvented2<EventedStdin>,
buf: Vec<u8>,
output_done: bool,
exit_done: bool,
+ _screen: crossterm::RawScreen,
}
impl RunningProcess {
@@ -36,12 +41,18 @@ impl RunningProcess {
.spawn_pty_async(&pty)
.map_err(|e| Error::IOError(e))?;
+ // TODO: tokio::io::stdin is broken (it's blocking)
+ // let input = tokio::io::stdin();
+ let input = tokio::reactor::PollEvented2::new(EventedStdin);
+
Ok(RunningProcess {
pty,
process,
+ input,
buf: Vec::with_capacity(4096),
output_done: false,
exit_done: false,
+ _screen: crossterm::RawScreen::into_raw_mode().unwrap(),
})
}
}
@@ -52,6 +63,41 @@ impl futures::stream::Stream for RunningProcess {
type Error = Error;
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
+ let ready = mio::Ready::readable();
+ let input_poll = self.input.poll_read_ready(ready);
+ match input_poll {
+ Ok(futures::Async::Ready(_)) => {
+ let stdin = std::io::stdin();
+ let mut stdin = stdin.lock();
+ let mut buf = vec![0; 4096];
+ // TODO: async
+ match stdin.read(&mut buf) {
+ Ok(n) => {
+ if n > 0 {
+ let bytes = buf[..n].to_vec();
+
+ // TODO: async
+ let res = self.pty.write_all(&bytes);
+ if let Err(e) = res {
+ return Err(Error::IOError(e));
+ }
+ }
+ }
+ Err(e) => {
+ return Err(Error::IOError(e));
+ }
+ }
+ }
+ _ => {}
+ }
+ // TODO: this could lose pending bytes if there is stuff to read in
+ // the buffer but we don't read it all in the previous read call,
+ // since i think we won't get another notification until new bytes
+ // actually arrive even if there are bytes in the buffer
+ if let Err(e) = self.input.clear_read_ready(ready) {
+ return Err(Error::IOError(e));
+ }
+
if !self.output_done {
self.buf.clear();
let output_poll = self
@@ -61,6 +107,18 @@ impl futures::stream::Stream for RunningProcess {
match output_poll {
Ok(futures::Async::Ready(n)) => {
let bytes = self.buf[..n].to_vec();
+ let bytes: Vec<_> = bytes
+ .iter()
+ // replace \n with \r\n
+ .fold(vec![], |mut acc, &c| {
+ if c == b'\n' {
+ acc.push(b'\r');
+ acc.push(b'\n');
+ } else {
+ acc.push(c);
+ }
+ acc
+ });
return Ok(futures::Async::Ready(Some(
ProcessEvent::Output(bytes),
)));
@@ -96,3 +154,37 @@ impl futures::stream::Stream for RunningProcess {
Ok(futures::Async::Ready(None))
}
}
+
+struct EventedStdin;
+
+impl mio::Evented for EventedStdin {
+ fn register(
+ &self,
+ poll: &mio::Poll,
+ token: mio::Token,
+ interest: mio::Ready,
+ opts: mio::PollOpt,
+ ) -> std::io::Result<()> {
+ let fd = 0 as std::os::unix::io::RawFd;
+ let eventedfd = mio::unix::EventedFd(&fd);
+ eventedfd.register(poll, token, interest, opts)
+ }
+
+ fn reregister(
+ &self,
+ poll: &mio::Poll,
+ token: mio::Token,
+ interest: mio::Ready,
+ opts: mio::PollOpt,
+ ) -> std::io::Result<()> {
+ let fd = 0 as std::os::unix::io::RawFd;
+ let eventedfd = mio::unix::EventedFd(&fd);
+ eventedfd.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &mio::Poll) -> std::io::Result<()> {
+ let fd = 0 as std::os::unix::io::RawFd;
+ let eventedfd = mio::unix::EventedFd(&fd);
+ eventedfd.deregister(poll)
+ }
+}