From e842fef74b21d74bf4b731f67c05a1192092ee50 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Sun, 9 Jun 2019 03:55:01 -0400 Subject: simplify --- src/process.rs | 116 ++++++++++++++++++++++++++++++++++++--------------------- src/repl.rs | 53 +++++++++++--------------- 2 files changed, 95 insertions(+), 74 deletions(-) diff --git a/src/process.rs b/src/process.rs index 7b8d8e7..70e1178 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,5 +1,4 @@ use futures::future::Future; -use futures::try_ready; use tokio::io::AsyncRead; use tokio_pty_process::CommandExt; @@ -8,59 +7,92 @@ pub enum Error { IOError(std::io::Error), } -pub fn spawn( - line: &str, -) -> Result< - ( - PtyStream, - impl futures::future::Future< - Item = std::process::ExitStatus, - Error = Error, - >, - ), - Error, -> { - let master = tokio_pty_process::AsyncPtyMaster::open() - .map_err(|e| Error::IOError(e))?; - let mut argv: Vec<_> = line.split(' ').collect(); - let cmd = argv.remove(0); - let child = std::process::Command::new(cmd) - .args(&argv) - .spawn_pty_async(&master) - .map_err(|e| Error::IOError(e))? - .map_err(|e| Error::IOError(e)); - let stream = PtyStream::new(master); - Ok((stream, child)) +pub fn spawn(line: &str) -> Result { + RunningProcess::new(line) } -pub struct PtyStream { - master: tokio_pty_process::AsyncPtyMaster, +pub enum ProcessEvent { + Output(Vec), + Exit(std::process::ExitStatus), +} + +pub struct RunningProcess { + pty: tokio_pty_process::AsyncPtyMaster, + process: tokio_pty_process::Child, buf: Vec, + output_done: bool, + exit_done: bool, } -impl PtyStream { - fn new(master: tokio_pty_process::AsyncPtyMaster) -> Self { - let buf = Vec::with_capacity(4096); - PtyStream { master, buf } +impl RunningProcess { + fn new(line: &str) -> Result { + let pty = tokio_pty_process::AsyncPtyMaster::open() + .map_err(|e| Error::IOError(e))?; + + let mut argv: Vec<_> = line.split(' ').collect(); + let cmd = argv.remove(0); + let process = std::process::Command::new(cmd) + .args(&argv) + .spawn_pty_async(&pty) + .map_err(|e| Error::IOError(e))?; + + Ok(RunningProcess { + pty, + process, + buf: Vec::with_capacity(4096), + output_done: false, + exit_done: false, + }) } } #[must_use = "streams do nothing unless polled"] -impl futures::stream::Stream for PtyStream { - type Item = Vec; +impl futures::stream::Stream for RunningProcess { + type Item = ProcessEvent; type Error = Error; fn poll(&mut self) -> futures::Poll, Self::Error> { - self.buf.clear(); - let n = try_ready!(self - .master - .read_buf(&mut self.buf) - .map_err(|e| { Error::IOError(e) })); - if n > 0 { - let bytes = self.buf[..n].to_vec(); - Ok(futures::Async::Ready(Some(bytes))) - } else { - Ok(futures::Async::NotReady) + if !self.output_done { + self.buf.clear(); + let output_poll = self + .pty + .read_buf(&mut self.buf) + .map_err(|e| Error::IOError(e)); + match output_poll { + Ok(futures::Async::Ready(n)) => { + let bytes = self.buf[..n].to_vec(); + return Ok(futures::Async::Ready(Some( + ProcessEvent::Output(bytes), + ))); + } + Ok(futures::Async::NotReady) => { + return Ok(futures::Async::NotReady); + } + Err(_) => { + self.output_done = true; + } + } } + + if !self.exit_done { + let exit_poll = + self.process.poll().map_err(|e| Error::IOError(e)); + match exit_poll { + Ok(futures::Async::Ready(status)) => { + self.exit_done = true; + return Ok(futures::Async::Ready(Some( + ProcessEvent::Exit(status), + ))); + } + Ok(futures::Async::NotReady) => { + return Ok(futures::Async::NotReady); + } + Err(e) => { + return Err(e); + } + } + } + + Ok(futures::Async::Ready(None)) } } diff --git a/src/repl.rs b/src/repl.rs index af62f95..c6d8b07 100644 --- a/src/repl.rs +++ b/src/repl.rs @@ -16,27 +16,28 @@ pub fn repl() { } let repl = read().and_then(|line| { - eval(&line).and_then(|(out, status)| { - out - // print the results as they come in - .and_then(|out| print(&out)) - // wait for all output to be finished - .collect() - // ignore io errors since we just keep reading even after - // the process exits and the other end of the pty is - // closed - .or_else(|_| futures::future::ok(vec![])) - // once the output is all processed, then wait on the - // process to exit - .and_then(|_| status) + eval(&line).fold(None, |acc, event| match event { + crate::process::ProcessEvent::Output(out) => { + match print(&out) { + Ok(()) => futures::future::ok(acc), + Err(e) => futures::future::err(e), + } + } + crate::process::ProcessEvent::Exit(status) => { + futures::future::ok(Some(status)) + } }) }); Some(repl.then(move |res| match res { - Ok(status) => { + Ok(Some(status)) => { eprint!("process exited with status {}\r\n", status); return Ok((done, false)); } + Ok(None) => { + eprint!("process exited weirdly?\r\n"); + return Ok((done, false)); + } Err(Error::ReadError(crate::readline::Error::EOF)) => { return Ok((done, true)); } @@ -58,24 +59,12 @@ fn read() -> impl futures::future::Future { fn eval( line: &str, -) -> impl futures::future::Future< - Item = ( - impl futures::stream::Stream, Error = Error>, - impl futures::future::Future< - Item = std::process::ExitStatus, - Error = Error, - >, - ), - Error = Error, -> { - match crate::process::spawn(line) { - Ok((out, status)) => Ok(( - out.map_err(|e| Error::EvalError(e)), - status.map_err(|e| Error::EvalError(e)), - )), - Err(e) => Err(e).map_err(|e| Error::EvalError(e)), - } - .into_future() +) -> impl futures::stream::Stream +{ + crate::process::spawn(line) + .into_future() + .flatten_stream() + .map_err(|e| Error::EvalError(e)) } fn print(out: &[u8]) -> Result<(), Error> { -- cgit v1.2.3-54-g00ecf