From 894ec4f24c1269166f14c05b2ae3f737f76dd571 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Thu, 24 Oct 2019 15:16:10 -0400 Subject: add a couple examples --- Cargo.toml | 2 + examples/expect.rs | 129 ++++++++++++++++++++++++++++++++++++++++ examples/input/buf.rs | 43 ++++++++++++++ examples/input/evented_stdin.rs | 88 +++++++++++++++++++++++++++ examples/input/mod.rs | 2 + examples/interhack.rs | 127 +++++++++++++++++++++++++++++++++++++++ examples/shell.rs | 41 +++++++++++++ 7 files changed, 432 insertions(+) create mode 100644 examples/expect.rs create mode 100644 examples/input/buf.rs create mode 100644 examples/input/evented_stdin.rs create mode 100644 examples/input/mod.rs create mode 100644 examples/interhack.rs create mode 100644 examples/shell.rs diff --git a/Cargo.toml b/Cargo.toml index dfdd991..1d7e7d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,4 +14,6 @@ tokio-pty-process = "0.4" [dev-dependencies] crossterm = "0.11" +lazy_static = "1" mio = "0.6" +regex = "1" diff --git a/examples/expect.rs b/examples/expect.rs new file mode 100644 index 0000000..dcd40c1 --- /dev/null +++ b/examples/expect.rs @@ -0,0 +1,129 @@ +use futures::stream::Stream as _; + +mod input; + +#[allow(clippy::type_complexity)] +struct Expect { + process: tokio_pty_process_stream::Process, + expectations: Vec<( + regex::Regex, + Box< + dyn Fn(&mut tokio_pty_process_stream::Process) + + Send, + >, + )>, +} + +impl Expect { + fn new(cmd: &str, args: &[String]) -> Self { + Self { + process: tokio_pty_process_stream::Process::new( + cmd, + args, + input::buf::Stdin::new(), + ), + expectations: vec![], + } + } + + fn expect< + F: Fn(&mut tokio_pty_process_stream::Process) + + Send + + 'static, + >( + &mut self, + rx: &str, + cb: F, + ) { + self.expectations + .push((regex::Regex::new(rx).unwrap(), Box::new(cb))); + } +} + +impl futures::future::Future for Expect { + type Item = (); + type Error = (); + + fn poll(&mut self) -> futures::Poll { + loop { + let event = futures::try_ready!(self + .process + .poll() + .map_err(|e| panic!(e))); + match event { + Some(tokio_pty_process_stream::Event::Output { data }) => { + let s = std::string::String::from_utf8_lossy(&data); + let mut found = None; + for (rx, cb) in &self.expectations { + if rx.is_match(&s) { + found = Some(cb); + break; + } + } + if let Some(cb) = found { + cb(&mut self.process); + } + } + Some(tokio_pty_process_stream::Event::CommandExit { + .. + }) => break, + None => break, + _ => {} + } + } + Ok(futures::Async::Ready(())) + } +} + +fn main() { + tokio::run(futures::future::lazy(|| { + let mut expect = Expect::new("nethack", &[]); + expect.expect(r"Shall I pick.*[ynaq]", |process| { + println!("shall i pick"); + process.input().send(b"n"); + }); + expect.expect(r"Pick a role", |process| { + println!("pick a role"); + process.input().send(b"w"); + }); + expect.expect(r"Pick a race", |process| { + println!("pick a race"); + process.input().send(b"e"); + }); + expect.expect(r"Pick a gender", |process| { + println!("pick a gender"); + process.input().send(b"f"); + }); + expect.expect(r"start game", |process| { + println!("start game"); + process.input().send(b"y"); + }); + expect.expect(r"welcome to NetHack", |process| { + println!("welcome"); + process.input().send(b"#quit\n"); + }); + expect.expect(r"Really quit", |process| { + println!("really quit"); + process.input().send(b"y"); + }); + expect.expect( + r"Do you want your possessions identified", + |process| { + println!("dywypi"); + process.input().send(b"n"); + }, + ); + expect.expect(r"--More--", |process| { + println!("more"); + process.input().send(b" "); + }); + expect.expect( + r"Do you want to see the dungeon overview", + |process| { + println!("dungeon overview"); + process.input().send(b"n"); + }, + ); + expect + })); +} diff --git a/examples/input/buf.rs b/examples/input/buf.rs new file mode 100644 index 0000000..768baf1 --- /dev/null +++ b/examples/input/buf.rs @@ -0,0 +1,43 @@ +use std::io::Read as _; + +pub struct Stdin { + buf: Vec, + task: futures::task::Task, +} + +#[allow(dead_code)] +impl Stdin { + pub fn new() -> Self { + Self { + buf: vec![], + task: futures::task::current(), + } + } + + pub fn send(&mut self, buf: &[u8]) { + self.buf.extend(buf.iter()); + self.task.notify(); + } +} + +impl std::io::Read for Stdin { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let len = self.buf.len().min(buf.len()); + buf[..len].clone_from_slice(&self.buf[..len]); + self.buf = self.buf.iter().copied().skip(len).collect(); + Ok(len) + } +} + +impl tokio::io::AsyncRead for Stdin { + fn poll_read( + &mut self, + buf: &mut [u8], + ) -> std::result::Result, tokio::io::Error> { + if self.buf.is_empty() { + return Ok(futures::Async::NotReady); + } + let n = self.read(buf)?; + Ok(futures::Async::Ready(n)) + } +} diff --git a/examples/input/evented_stdin.rs b/examples/input/evented_stdin.rs new file mode 100644 index 0000000..107c394 --- /dev/null +++ b/examples/input/evented_stdin.rs @@ -0,0 +1,88 @@ +// this is a hack around the fact that tokio::io::stdin() is actually +// blocking, which makes it useless for interactive programs. this isn't great +// (or particularly correct) but it mostly works. + +use std::io::Read as _; + +struct EventedStdin; + +const STDIN: i32 = 0; + +impl std::io::Read for EventedStdin { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let stdin = std::io::stdin(); + let mut stdin = stdin.lock(); + stdin.read(buf) + } +} + +impl mio::Evented for EventedStdin { + fn register( + &self, + poll: &mio::Poll, + token: mio::Token, + interest: mio::Ready, + opts: mio::PollOpt, + ) -> std::io::Result<()> { + let fd = STDIN as std::os::unix::io::RawFd; + let eventedfd = mio::unix::EventedFd(&fd); + eventedfd.register(poll, token, interest, opts) + } + + fn reregister( + &self, + poll: &mio::Poll, + token: mio::Token, + interest: mio::Ready, + opts: mio::PollOpt, + ) -> std::io::Result<()> { + let fd = STDIN as std::os::unix::io::RawFd; + let eventedfd = mio::unix::EventedFd(&fd); + eventedfd.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &mio::Poll) -> std::io::Result<()> { + let fd = STDIN as std::os::unix::io::RawFd; + let eventedfd = mio::unix::EventedFd(&fd); + eventedfd.deregister(poll) + } +} + +pub struct Stdin { + input: tokio::reactor::PollEvented2, +} + +#[allow(dead_code)] +impl Stdin { + pub fn new() -> Self { + Default::default() + } +} + +impl Default for Stdin { + fn default() -> Self { + Self { + input: tokio::reactor::PollEvented2::new(EventedStdin), + } + } +} + +impl std::io::Read for Stdin { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.input.read(buf) + } +} + +impl tokio::io::AsyncRead for Stdin { + fn poll_read( + &mut self, + buf: &mut [u8], + ) -> std::result::Result, tokio::io::Error> { + let ready = mio::Ready::readable(); + futures::try_ready!(self.input.poll_read_ready(ready)); + + let res = self.read(buf)?; + self.input.clear_read_ready(ready)?; + Ok(futures::Async::Ready(res)) + } +} diff --git a/examples/input/mod.rs b/examples/input/mod.rs new file mode 100644 index 0000000..7288cbc --- /dev/null +++ b/examples/input/mod.rs @@ -0,0 +1,2 @@ +pub mod buf; +pub mod evented_stdin; diff --git a/examples/interhack.rs b/examples/interhack.rs new file mode 100644 index 0000000..c0c575e --- /dev/null +++ b/examples/interhack.rs @@ -0,0 +1,127 @@ +#![allow(clippy::trivial_regex)] + +use futures::stream::Stream as _; +use std::io::Write as _; +use tokio::io::AsyncRead as _; + +mod input; + +struct Interhack { + process: tokio_pty_process_stream::Process, + stdin: input::evented_stdin::Stdin, + read_buf: [u8; 4096], + raw_screen: Option, +} + +impl Interhack { + fn new() -> Self { + Self { + process: tokio_pty_process_stream::Process::new( + "nethack", + &[], + input::buf::Stdin::new(), + ), + stdin: input::evented_stdin::Stdin::new(), + read_buf: [0; 4096], + raw_screen: None, + } + } + + fn filter_input(&self, buf: Vec) -> Vec { + lazy_static::lazy_static! { + static ref RE: regex::bytes::Regex = regex::bytes::Regex::new( + "\x05" + ).unwrap(); + } + if let Some(m) = RE.find(&buf) { + let mut new: Vec = vec![]; + new.extend(buf[..m.start()].iter()); + new.extend(b"E- Elbereth\n"); + new.extend(buf[m.end()..].iter()); + new + } else { + buf + } + } + + fn filter_output(&self, buf: Vec) -> Vec { + lazy_static::lazy_static! { + static ref RE: regex::bytes::Regex = regex::bytes::Regex::new( + r"Elbereth" + ).unwrap(); + } + if let Some(m) = RE.find(&buf) { + let mut new: Vec = vec![]; + new.extend(buf[..m.start()].iter()); + new.extend(b"\x1b[35m"); + new.extend(buf[m.start()..m.end()].iter()); + new.extend(b"\x1b[m"); + new.extend(buf[m.end()..].iter()); + new + } else { + buf + } + } +} + +#[allow(clippy::type_complexity)] +impl Interhack { + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> component_future::Poll<(), ()>] = + &[&Self::poll_input, &Self::poll_process, &Self::poll_screen]; + + fn poll_input(&mut self) -> component_future::Poll<(), ()> { + let n = component_future::try_ready!(self + .stdin + .poll_read(&mut self.read_buf) + .map_err(|e| panic!(e))); + let input = self.filter_input(self.read_buf[..n].to_vec()); + self.process.input().send(&input); + Ok(component_future::Async::DidWork) + } + + fn poll_process(&mut self) -> component_future::Poll<(), ()> { + let event = component_future::try_ready!(self + .process + .poll() + .map_err(|e| panic!(e))); + match event { + Some(tokio_pty_process_stream::Event::Output { data }) => { + let output = self.filter_output(data); + let stdout = std::io::stdout(); + let mut stdout = stdout.lock(); + stdout.write_all(&output).unwrap(); + stdout.flush().unwrap(); + } + None => return Ok(component_future::Async::Ready(())), + _ => {} + } + Ok(component_future::Async::DidWork) + } + + fn poll_screen(&mut self) -> component_future::Poll<(), ()> { + if self.raw_screen.is_none() { + self.raw_screen = + Some(crossterm::RawScreen::into_raw_mode().unwrap()); + Ok(component_future::Async::DidWork) + } else { + Ok(component_future::Async::NothingToDo) + } + } +} + +impl futures::future::Future for Interhack { + type Item = (); + type Error = (); + + fn poll(&mut self) -> futures::Poll { + component_future::poll_future(self, Self::POLL_FNS) + } +} + +fn main() { + tokio::run(futures::future::lazy(Interhack::new)); +} diff --git a/examples/shell.rs b/examples/shell.rs new file mode 100644 index 0000000..682c008 --- /dev/null +++ b/examples/shell.rs @@ -0,0 +1,41 @@ +use futures::future::Future as _; +use futures::stream::Stream as _; +use std::io::Write as _; + +mod input; + +fn main() { + let mut argv = std::env::args(); + argv.next().unwrap(); + let cmd = argv.next().unwrap(); + let args: Vec<_> = argv.collect(); + + let process = tokio_pty_process_stream::Process::new( + &cmd, + &args, + input::evented_stdin::Stdin::new(), + ); + + let _raw = crossterm::RawScreen::into_raw_mode().unwrap(); + tokio::run( + process + .for_each(|ev| { + match ev { + tokio_pty_process_stream::Event::CommandStart { + .. + } => {} + tokio_pty_process_stream::Event::Output { data } => { + let stdout = std::io::stdout(); + let mut stdout = stdout.lock(); + stdout.write_all(&data).unwrap(); + stdout.flush().unwrap(); + } + tokio_pty_process_stream::Event::CommandExit { + .. + } => {} + } + futures::future::ok(()) + }) + .map_err(|e| panic!(e)), + ); +} -- cgit v1.2.3