From c383570c75d9757f405b2e43ab48f458759b1403 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Sun, 9 Jun 2019 00:55:53 -0400 Subject: implement process running --- Cargo.toml | 8 ++--- examples/pty.rs | 25 +++++++++++++++ src/main.rs | 3 +- src/process.rs | 66 +++++++++++++++++++++++++++++++++++++++ src/readline.rs | 37 +++++++++++++--------- src/repl.rs | 95 ++++++++++++++++++++++++++++++++++++++------------------- 6 files changed, 183 insertions(+), 51 deletions(-) create mode 100644 examples/pty.rs create mode 100644 src/process.rs diff --git a/Cargo.toml b/Cargo.toml index 702c094..a9cb7e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Jesse Luehrs "] edition = "2018" [dependencies] -crossterm = "0.9.5" -futures = "0.1.27" -tokio = "0.1.21" -tokio-pty-process = "0.4.0" +crossterm = "0.9" +futures = "0.1" +tokio = "0.1" +tokio-pty-process = "0.4" diff --git a/examples/pty.rs b/examples/pty.rs new file mode 100644 index 0000000..d5b5a88 --- /dev/null +++ b/examples/pty.rs @@ -0,0 +1,25 @@ +use futures::future::{Future, IntoFuture}; +use tokio_pty_process::CommandExt; + +fn main() { + tokio::run(futures::future::lazy(move || { + let master = tokio_pty_process::AsyncPtyMaster::open().unwrap(); + let args: Vec<&str> = vec![]; + let child = std::process::Command::new("false") + .args(&args) + .spawn_pty_async(&master) + .unwrap(); + tokio::spawn( + child + .map(|status| { + eprintln!("got status {}", status); + () + }) + .map_err(|_| ()), + ) + .into_future() + .wait() + .unwrap(); + futures::future::ok(()) + })); +} diff --git a/src/main.rs b/src/main.rs index 105450f..1d78e10 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,7 @@ +mod process; mod readline; mod repl; fn main() { - let _screen = crossterm::RawScreen::into_raw_mode().unwrap(); - repl::repl(); } diff --git a/src/process.rs b/src/process.rs new file mode 100644 index 0000000..7b8d8e7 --- /dev/null +++ b/src/process.rs @@ -0,0 +1,66 @@ +use futures::future::Future; +use futures::try_ready; +use tokio::io::AsyncRead; +use tokio_pty_process::CommandExt; + +#[derive(Debug)] +pub enum Error { + IOError(std::io::Error), +} + +pub fn spawn( + line: &str, +) -> Result< + ( + PtyStream, + impl futures::future::Future< + Item = std::process::ExitStatus, + Error = Error, + >, + ), + Error, +> { + let master = tokio_pty_process::AsyncPtyMaster::open() + .map_err(|e| Error::IOError(e))?; + let mut argv: Vec<_> = line.split(' ').collect(); + let cmd = argv.remove(0); + let child = std::process::Command::new(cmd) + .args(&argv) + .spawn_pty_async(&master) + .map_err(|e| Error::IOError(e))? + .map_err(|e| Error::IOError(e)); + let stream = PtyStream::new(master); + Ok((stream, child)) +} + +pub struct PtyStream { + master: tokio_pty_process::AsyncPtyMaster, + buf: Vec, +} + +impl PtyStream { + fn new(master: tokio_pty_process::AsyncPtyMaster) -> Self { + let buf = Vec::with_capacity(4096); + PtyStream { master, buf } + } +} + +#[must_use = "streams do nothing unless polled"] +impl futures::stream::Stream for PtyStream { + type Item = Vec; + type Error = Error; + + fn poll(&mut self) -> futures::Poll, Self::Error> { + self.buf.clear(); + let n = try_ready!(self + .master + .read_buf(&mut self.buf) + .map_err(|e| { Error::IOError(e) })); + if n > 0 { + let bytes = self.buf[..n].to_vec(); + Ok(futures::Async::Ready(Some(bytes))) + } else { + Ok(futures::Async::NotReady) + } + } +} diff --git a/src/readline.rs b/src/readline.rs index 73b5195..ef4fcae 100644 --- a/src/readline.rs +++ b/src/readline.rs @@ -12,9 +12,23 @@ pub struct Readline { prompt: String, wrote_prompt: bool, echo: bool, + _raw_screen: crossterm::RawScreen, } impl Readline { + fn new(prompt: &str, echo: bool) -> Self { + let screen = crossterm::RawScreen::into_raw_mode().unwrap(); + + Readline { + reader: None, + buffer: String::new(), + prompt: prompt.to_string(), + wrote_prompt: false, + echo, + _raw_screen: screen, + } + } + fn process_event( &mut self, event: crossterm::InputEvent, @@ -85,6 +99,7 @@ impl Readline { } } +#[must_use = "futures do nothing unless polled"] impl futures::future::Future for Readline { type Item = String; type Error = Error; @@ -95,9 +110,9 @@ impl futures::future::Future for Readline { self.wrote_prompt = true; } - let reader = self.reader.get_or_insert_with(|| { - KeyReader::new(tokio::prelude::task::current()) - }); + let reader = self + .reader + .get_or_insert_with(|| KeyReader::new(futures::task::current())); if let Some(event) = reader.poll() { self.process_event(event) } else { @@ -107,13 +122,7 @@ impl futures::future::Future for Readline { } pub fn readline(prompt: &str, echo: bool) -> Readline { - Readline { - reader: None, - buffer: String::new(), - prompt: prompt.to_string(), - wrote_prompt: false, - echo, - } + Readline::new(prompt, echo) } struct KeyReader { @@ -122,10 +131,12 @@ struct KeyReader { } impl KeyReader { - fn new(task: tokio::prelude::task::Task) -> Self { + fn new(task: futures::task::Task) -> Self { let reader = crossterm::input().read_sync(); let (events_tx, events_rx) = std::sync::mpsc::channel(); let (quit_tx, quit_rx) = std::sync::mpsc::channel(); + // TODO: this is pretty janky - it'd be better to build in more useful + // support to crossterm directly std::thread::spawn(move || { for event in reader { let newline = event @@ -162,8 +173,6 @@ impl Drop for KeyReader { // don't care if it fails to send, this can happen if the thread // terminates due to seeing a newline before the keyreader goes out of // scope - match self.quit.send(()) { - _ => {} - } + let _ = self.quit.send(()); } } diff --git a/src/repl.rs b/src/repl.rs index c44174a..af62f95 100644 --- a/src/repl.rs +++ b/src/repl.rs @@ -1,53 +1,86 @@ -use futures::future::Future; +use futures::future::{Future, IntoFuture}; +use futures::stream::Stream; use std::io::Write; #[derive(Debug)] enum Error { ReadError(crate::readline::Error), - // EvalError(std::io::Error), + EvalError(crate::process::Error), PrintError(std::io::Error), - // LoopError, } pub fn repl() { - tokio::run(tokio::prelude::future::lazy(|| { - let mut done = false; - while !done { - let res = read() - .and_then(move |line| eval(&line)) - .and_then(move |out| print(&out)) - .wait(); - match res { - Ok(_) => {} - Err(Error::ReadError(crate::readline::Error::EOF)) => { - done = true; - } - Err(e) => { - let stderr = std::io::stderr(); - let mut stderr = stderr.lock(); - write!(stderr, "error: {:?}", e).unwrap(); - stderr.flush().unwrap(); - done = true; - } - } + let loop_stream = futures::stream::unfold(false, |done| { + if done { + return None; } - futures::future::ok(()) - })); + + let repl = read().and_then(|line| { + eval(&line).and_then(|(out, status)| { + out + // print the results as they come in + .and_then(|out| print(&out)) + // wait for all output to be finished + .collect() + // ignore io errors since we just keep reading even after + // the process exits and the other end of the pty is + // closed + .or_else(|_| futures::future::ok(vec![])) + // once the output is all processed, then wait on the + // process to exit + .and_then(|_| status) + }) + }); + + Some(repl.then(move |res| match res { + Ok(status) => { + eprint!("process exited with status {}\r\n", status); + return Ok((done, false)); + } + Err(Error::ReadError(crate::readline::Error::EOF)) => { + return Ok((done, true)); + } + Err(e) => { + let stderr = std::io::stderr(); + let mut stderr = stderr.lock(); + write!(stderr, "error: {:?}\r\n", e).unwrap(); + stderr.flush().unwrap(); + return Err(()); + } + })) + }); + tokio::run(loop_stream.collect().map(|_| ())); } fn read() -> impl futures::future::Future { crate::readline::readline("$ ", true).map_err(|e| Error::ReadError(e)) } -fn eval(line: &str) -> Result { - Ok(format!("got line '{}'\r\n", line)) +fn eval( + line: &str, +) -> impl futures::future::Future< + Item = ( + impl futures::stream::Stream, Error = Error>, + impl futures::future::Future< + Item = std::process::ExitStatus, + Error = Error, + >, + ), + Error = Error, +> { + match crate::process::spawn(line) { + Ok((out, status)) => Ok(( + out.map_err(|e| Error::EvalError(e)), + status.map_err(|e| Error::EvalError(e)), + )), + Err(e) => Err(e).map_err(|e| Error::EvalError(e)), + } + .into_future() } -fn print(out: &str) -> Result<(), Error> { +fn print(out: &[u8]) -> Result<(), Error> { let stdout = std::io::stdout(); let mut stdout = stdout.lock(); - stdout - .write(out.as_bytes()) - .map_err(|e| Error::PrintError(e))?; + stdout.write(out).map_err(|e| Error::PrintError(e))?; stdout.flush().map_err(|e| Error::PrintError(e)) } -- cgit v1.2.3-54-g00ecf