aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-10-24 15:16:10 -0400
committerJesse Luehrs <doy@tozt.net>2019-10-24 15:16:10 -0400
commit894ec4f24c1269166f14c05b2ae3f737f76dd571 (patch)
treeb30021b97dee11d1d5976561c42bd2cc59ee8762
parent090b50a13f0d07ae722dd9fe6ef064cd893ff6ec (diff)
downloadtokio-pty-process-stream-894ec4f24c1269166f14c05b2ae3f737f76dd571.tar.gz
tokio-pty-process-stream-894ec4f24c1269166f14c05b2ae3f737f76dd571.zip
add a couple examples
-rw-r--r--Cargo.toml2
-rw-r--r--examples/expect.rs129
-rw-r--r--examples/input/buf.rs43
-rw-r--r--examples/input/evented_stdin.rs88
-rw-r--r--examples/input/mod.rs2
-rw-r--r--examples/interhack.rs127
-rw-r--r--examples/shell.rs41
7 files changed, 432 insertions, 0 deletions
diff --git a/Cargo.toml b/Cargo.toml
index dfdd991..1d7e7d3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,4 +14,6 @@ tokio-pty-process = "0.4"
[dev-dependencies]
crossterm = "0.11"
+lazy_static = "1"
mio = "0.6"
+regex = "1"
diff --git a/examples/expect.rs b/examples/expect.rs
new file mode 100644
index 0000000..dcd40c1
--- /dev/null
+++ b/examples/expect.rs
@@ -0,0 +1,129 @@
+use futures::stream::Stream as _;
+
+mod input;
+
+#[allow(clippy::type_complexity)]
+struct Expect {
+ process: tokio_pty_process_stream::Process<input::buf::Stdin>,
+ expectations: Vec<(
+ regex::Regex,
+ Box<
+ dyn Fn(&mut tokio_pty_process_stream::Process<input::buf::Stdin>)
+ + Send,
+ >,
+ )>,
+}
+
+impl Expect {
+ fn new(cmd: &str, args: &[String]) -> Self {
+ Self {
+ process: tokio_pty_process_stream::Process::new(
+ cmd,
+ args,
+ input::buf::Stdin::new(),
+ ),
+ expectations: vec![],
+ }
+ }
+
+ fn expect<
+ F: Fn(&mut tokio_pty_process_stream::Process<input::buf::Stdin>)
+ + Send
+ + 'static,
+ >(
+ &mut self,
+ rx: &str,
+ cb: F,
+ ) {
+ self.expectations
+ .push((regex::Regex::new(rx).unwrap(), Box::new(cb)));
+ }
+}
+
+impl futures::future::Future for Expect {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ loop {
+ let event = futures::try_ready!(self
+ .process
+ .poll()
+ .map_err(|e| panic!(e)));
+ match event {
+ Some(tokio_pty_process_stream::Event::Output { data }) => {
+ let s = std::string::String::from_utf8_lossy(&data);
+ let mut found = None;
+ for (rx, cb) in &self.expectations {
+ if rx.is_match(&s) {
+ found = Some(cb);
+ break;
+ }
+ }
+ if let Some(cb) = found {
+ cb(&mut self.process);
+ }
+ }
+ Some(tokio_pty_process_stream::Event::CommandExit {
+ ..
+ }) => break,
+ None => break,
+ _ => {}
+ }
+ }
+ Ok(futures::Async::Ready(()))
+ }
+}
+
+fn main() {
+ tokio::run(futures::future::lazy(|| {
+ let mut expect = Expect::new("nethack", &[]);
+ expect.expect(r"Shall I pick.*[ynaq]", |process| {
+ println!("shall i pick");
+ process.input().send(b"n");
+ });
+ expect.expect(r"Pick a role", |process| {
+ println!("pick a role");
+ process.input().send(b"w");
+ });
+ expect.expect(r"Pick a race", |process| {
+ println!("pick a race");
+ process.input().send(b"e");
+ });
+ expect.expect(r"Pick a gender", |process| {
+ println!("pick a gender");
+ process.input().send(b"f");
+ });
+ expect.expect(r"start game", |process| {
+ println!("start game");
+ process.input().send(b"y");
+ });
+ expect.expect(r"welcome to NetHack", |process| {
+ println!("welcome");
+ process.input().send(b"#quit\n");
+ });
+ expect.expect(r"Really quit", |process| {
+ println!("really quit");
+ process.input().send(b"y");
+ });
+ expect.expect(
+ r"Do you want your possessions identified",
+ |process| {
+ println!("dywypi");
+ process.input().send(b"n");
+ },
+ );
+ expect.expect(r"--More--", |process| {
+ println!("more");
+ process.input().send(b" ");
+ });
+ expect.expect(
+ r"Do you want to see the dungeon overview",
+ |process| {
+ println!("dungeon overview");
+ process.input().send(b"n");
+ },
+ );
+ expect
+ }));
+}
diff --git a/examples/input/buf.rs b/examples/input/buf.rs
new file mode 100644
index 0000000..768baf1
--- /dev/null
+++ b/examples/input/buf.rs
@@ -0,0 +1,43 @@
+use std::io::Read as _;
+
+pub struct Stdin {
+ buf: Vec<u8>,
+ task: futures::task::Task,
+}
+
+#[allow(dead_code)]
+impl Stdin {
+ pub fn new() -> Self {
+ Self {
+ buf: vec![],
+ task: futures::task::current(),
+ }
+ }
+
+ pub fn send(&mut self, buf: &[u8]) {
+ self.buf.extend(buf.iter());
+ self.task.notify();
+ }
+}
+
+impl std::io::Read for Stdin {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ let len = self.buf.len().min(buf.len());
+ buf[..len].clone_from_slice(&self.buf[..len]);
+ self.buf = self.buf.iter().copied().skip(len).collect();
+ Ok(len)
+ }
+}
+
+impl tokio::io::AsyncRead for Stdin {
+ fn poll_read(
+ &mut self,
+ buf: &mut [u8],
+ ) -> std::result::Result<futures::Async<usize>, tokio::io::Error> {
+ if self.buf.is_empty() {
+ return Ok(futures::Async::NotReady);
+ }
+ let n = self.read(buf)?;
+ Ok(futures::Async::Ready(n))
+ }
+}
diff --git a/examples/input/evented_stdin.rs b/examples/input/evented_stdin.rs
new file mode 100644
index 0000000..107c394
--- /dev/null
+++ b/examples/input/evented_stdin.rs
@@ -0,0 +1,88 @@
+// this is a hack around the fact that tokio::io::stdin() is actually
+// blocking, which makes it useless for interactive programs. this isn't great
+// (or particularly correct) but it mostly works.
+
+use std::io::Read as _;
+
+struct EventedStdin;
+
+const STDIN: i32 = 0;
+
+impl std::io::Read for EventedStdin {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ let stdin = std::io::stdin();
+ let mut stdin = stdin.lock();
+ stdin.read(buf)
+ }
+}
+
+impl mio::Evented for EventedStdin {
+ fn register(
+ &self,
+ poll: &mio::Poll,
+ token: mio::Token,
+ interest: mio::Ready,
+ opts: mio::PollOpt,
+ ) -> std::io::Result<()> {
+ let fd = STDIN as std::os::unix::io::RawFd;
+ let eventedfd = mio::unix::EventedFd(&fd);
+ eventedfd.register(poll, token, interest, opts)
+ }
+
+ fn reregister(
+ &self,
+ poll: &mio::Poll,
+ token: mio::Token,
+ interest: mio::Ready,
+ opts: mio::PollOpt,
+ ) -> std::io::Result<()> {
+ let fd = STDIN as std::os::unix::io::RawFd;
+ let eventedfd = mio::unix::EventedFd(&fd);
+ eventedfd.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &mio::Poll) -> std::io::Result<()> {
+ let fd = STDIN as std::os::unix::io::RawFd;
+ let eventedfd = mio::unix::EventedFd(&fd);
+ eventedfd.deregister(poll)
+ }
+}
+
+pub struct Stdin {
+ input: tokio::reactor::PollEvented2<EventedStdin>,
+}
+
+#[allow(dead_code)]
+impl Stdin {
+ pub fn new() -> Self {
+ Default::default()
+ }
+}
+
+impl Default for Stdin {
+ fn default() -> Self {
+ Self {
+ input: tokio::reactor::PollEvented2::new(EventedStdin),
+ }
+ }
+}
+
+impl std::io::Read for Stdin {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ self.input.read(buf)
+ }
+}
+
+impl tokio::io::AsyncRead for Stdin {
+ fn poll_read(
+ &mut self,
+ buf: &mut [u8],
+ ) -> std::result::Result<futures::Async<usize>, tokio::io::Error> {
+ let ready = mio::Ready::readable();
+ futures::try_ready!(self.input.poll_read_ready(ready));
+
+ let res = self.read(buf)?;
+ self.input.clear_read_ready(ready)?;
+ Ok(futures::Async::Ready(res))
+ }
+}
diff --git a/examples/input/mod.rs b/examples/input/mod.rs
new file mode 100644
index 0000000..7288cbc
--- /dev/null
+++ b/examples/input/mod.rs
@@ -0,0 +1,2 @@
+pub mod buf;
+pub mod evented_stdin;
diff --git a/examples/interhack.rs b/examples/interhack.rs
new file mode 100644
index 0000000..c0c575e
--- /dev/null
+++ b/examples/interhack.rs
@@ -0,0 +1,127 @@
+#![allow(clippy::trivial_regex)]
+
+use futures::stream::Stream as _;
+use std::io::Write as _;
+use tokio::io::AsyncRead as _;
+
+mod input;
+
+struct Interhack {
+ process: tokio_pty_process_stream::Process<input::buf::Stdin>,
+ stdin: input::evented_stdin::Stdin,
+ read_buf: [u8; 4096],
+ raw_screen: Option<crossterm::RawScreen>,
+}
+
+impl Interhack {
+ fn new() -> Self {
+ Self {
+ process: tokio_pty_process_stream::Process::new(
+ "nethack",
+ &[],
+ input::buf::Stdin::new(),
+ ),
+ stdin: input::evented_stdin::Stdin::new(),
+ read_buf: [0; 4096],
+ raw_screen: None,
+ }
+ }
+
+ fn filter_input(&self, buf: Vec<u8>) -> Vec<u8> {
+ lazy_static::lazy_static! {
+ static ref RE: regex::bytes::Regex = regex::bytes::Regex::new(
+ "\x05"
+ ).unwrap();
+ }
+ if let Some(m) = RE.find(&buf) {
+ let mut new: Vec<u8> = vec![];
+ new.extend(buf[..m.start()].iter());
+ new.extend(b"E- Elbereth\n");
+ new.extend(buf[m.end()..].iter());
+ new
+ } else {
+ buf
+ }
+ }
+
+ fn filter_output(&self, buf: Vec<u8>) -> Vec<u8> {
+ lazy_static::lazy_static! {
+ static ref RE: regex::bytes::Regex = regex::bytes::Regex::new(
+ r"Elbereth"
+ ).unwrap();
+ }
+ if let Some(m) = RE.find(&buf) {
+ let mut new: Vec<u8> = vec![];
+ new.extend(buf[..m.start()].iter());
+ new.extend(b"\x1b[35m");
+ new.extend(buf[m.start()..m.end()].iter());
+ new.extend(b"\x1b[m");
+ new.extend(buf[m.end()..].iter());
+ new
+ } else {
+ buf
+ }
+ }
+}
+
+#[allow(clippy::type_complexity)]
+impl Interhack {
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<(), ()>] =
+ &[&Self::poll_input, &Self::poll_process, &Self::poll_screen];
+
+ fn poll_input(&mut self) -> component_future::Poll<(), ()> {
+ let n = component_future::try_ready!(self
+ .stdin
+ .poll_read(&mut self.read_buf)
+ .map_err(|e| panic!(e)));
+ let input = self.filter_input(self.read_buf[..n].to_vec());
+ self.process.input().send(&input);
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_process(&mut self) -> component_future::Poll<(), ()> {
+ let event = component_future::try_ready!(self
+ .process
+ .poll()
+ .map_err(|e| panic!(e)));
+ match event {
+ Some(tokio_pty_process_stream::Event::Output { data }) => {
+ let output = self.filter_output(data);
+ let stdout = std::io::stdout();
+ let mut stdout = stdout.lock();
+ stdout.write_all(&output).unwrap();
+ stdout.flush().unwrap();
+ }
+ None => return Ok(component_future::Async::Ready(())),
+ _ => {}
+ }
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_screen(&mut self) -> component_future::Poll<(), ()> {
+ if self.raw_screen.is_none() {
+ self.raw_screen =
+ Some(crossterm::RawScreen::into_raw_mode().unwrap());
+ Ok(component_future::Async::DidWork)
+ } else {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+}
+
+impl futures::future::Future for Interhack {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ component_future::poll_future(self, Self::POLL_FNS)
+ }
+}
+
+fn main() {
+ tokio::run(futures::future::lazy(Interhack::new));
+}
diff --git a/examples/shell.rs b/examples/shell.rs
new file mode 100644
index 0000000..682c008
--- /dev/null
+++ b/examples/shell.rs
@@ -0,0 +1,41 @@
+use futures::future::Future as _;
+use futures::stream::Stream as _;
+use std::io::Write as _;
+
+mod input;
+
+fn main() {
+ let mut argv = std::env::args();
+ argv.next().unwrap();
+ let cmd = argv.next().unwrap();
+ let args: Vec<_> = argv.collect();
+
+ let process = tokio_pty_process_stream::Process::new(
+ &cmd,
+ &args,
+ input::evented_stdin::Stdin::new(),
+ );
+
+ let _raw = crossterm::RawScreen::into_raw_mode().unwrap();
+ tokio::run(
+ process
+ .for_each(|ev| {
+ match ev {
+ tokio_pty_process_stream::Event::CommandStart {
+ ..
+ } => {}
+ tokio_pty_process_stream::Event::Output { data } => {
+ let stdout = std::io::stdout();
+ let mut stdout = stdout.lock();
+ stdout.write_all(&data).unwrap();
+ stdout.flush().unwrap();
+ }
+ tokio_pty_process_stream::Event::CommandExit {
+ ..
+ } => {}
+ }
+ futures::future::ok(())
+ })
+ .map_err(|e| panic!(e)),
+ );
+}