aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-06-09 03:55:01 -0400
committerJesse Luehrs <doy@tozt.net>2019-06-09 03:55:01 -0400
commite842fef74b21d74bf4b731f67c05a1192092ee50 (patch)
treea871f77f95617f3cbd66c8276dc602ce30b2aad1
parentc383570c75d9757f405b2e43ab48f458759b1403 (diff)
downloadnbsh-old-e842fef74b21d74bf4b731f67c05a1192092ee50.tar.gz
nbsh-old-e842fef74b21d74bf4b731f67c05a1192092ee50.zip
simplify
-rw-r--r--src/process.rs116
-rw-r--r--src/repl.rs53
2 files changed, 95 insertions, 74 deletions
diff --git a/src/process.rs b/src/process.rs
index 7b8d8e7..70e1178 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -1,5 +1,4 @@
use futures::future::Future;
-use futures::try_ready;
use tokio::io::AsyncRead;
use tokio_pty_process::CommandExt;
@@ -8,59 +7,92 @@ pub enum Error {
IOError(std::io::Error),
}
-pub fn spawn(
- line: &str,
-) -> Result<
- (
- PtyStream,
- impl futures::future::Future<
- Item = std::process::ExitStatus,
- Error = Error,
- >,
- ),
- Error,
-> {
- let master = tokio_pty_process::AsyncPtyMaster::open()
- .map_err(|e| Error::IOError(e))?;
- let mut argv: Vec<_> = line.split(' ').collect();
- let cmd = argv.remove(0);
- let child = std::process::Command::new(cmd)
- .args(&argv)
- .spawn_pty_async(&master)
- .map_err(|e| Error::IOError(e))?
- .map_err(|e| Error::IOError(e));
- let stream = PtyStream::new(master);
- Ok((stream, child))
+pub fn spawn(line: &str) -> Result<RunningProcess, Error> {
+ RunningProcess::new(line)
}
-pub struct PtyStream {
- master: tokio_pty_process::AsyncPtyMaster,
+pub enum ProcessEvent {
+ Output(Vec<u8>),
+ Exit(std::process::ExitStatus),
+}
+
+pub struct RunningProcess {
+ pty: tokio_pty_process::AsyncPtyMaster,
+ process: tokio_pty_process::Child,
buf: Vec<u8>,
+ output_done: bool,
+ exit_done: bool,
}
-impl PtyStream {
- fn new(master: tokio_pty_process::AsyncPtyMaster) -> Self {
- let buf = Vec::with_capacity(4096);
- PtyStream { master, buf }
+impl RunningProcess {
+ fn new(line: &str) -> Result<Self, Error> {
+ let pty = tokio_pty_process::AsyncPtyMaster::open()
+ .map_err(|e| Error::IOError(e))?;
+
+ let mut argv: Vec<_> = line.split(' ').collect();
+ let cmd = argv.remove(0);
+ let process = std::process::Command::new(cmd)
+ .args(&argv)
+ .spawn_pty_async(&pty)
+ .map_err(|e| Error::IOError(e))?;
+
+ Ok(RunningProcess {
+ pty,
+ process,
+ buf: Vec::with_capacity(4096),
+ output_done: false,
+ exit_done: false,
+ })
}
}
#[must_use = "streams do nothing unless polled"]
-impl futures::stream::Stream for PtyStream {
- type Item = Vec<u8>;
+impl futures::stream::Stream for RunningProcess {
+ type Item = ProcessEvent;
type Error = Error;
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
- self.buf.clear();
- let n = try_ready!(self
- .master
- .read_buf(&mut self.buf)
- .map_err(|e| { Error::IOError(e) }));
- if n > 0 {
- let bytes = self.buf[..n].to_vec();
- Ok(futures::Async::Ready(Some(bytes)))
- } else {
- Ok(futures::Async::NotReady)
+ if !self.output_done {
+ self.buf.clear();
+ let output_poll = self
+ .pty
+ .read_buf(&mut self.buf)
+ .map_err(|e| Error::IOError(e));
+ match output_poll {
+ Ok(futures::Async::Ready(n)) => {
+ let bytes = self.buf[..n].to_vec();
+ return Ok(futures::Async::Ready(Some(
+ ProcessEvent::Output(bytes),
+ )));
+ }
+ Ok(futures::Async::NotReady) => {
+ return Ok(futures::Async::NotReady);
+ }
+ Err(_) => {
+ self.output_done = true;
+ }
+ }
}
+
+ if !self.exit_done {
+ let exit_poll =
+ self.process.poll().map_err(|e| Error::IOError(e));
+ match exit_poll {
+ Ok(futures::Async::Ready(status)) => {
+ self.exit_done = true;
+ return Ok(futures::Async::Ready(Some(
+ ProcessEvent::Exit(status),
+ )));
+ }
+ Ok(futures::Async::NotReady) => {
+ return Ok(futures::Async::NotReady);
+ }
+ Err(e) => {
+ return Err(e);
+ }
+ }
+ }
+
+ Ok(futures::Async::Ready(None))
}
}
diff --git a/src/repl.rs b/src/repl.rs
index af62f95..c6d8b07 100644
--- a/src/repl.rs
+++ b/src/repl.rs
@@ -16,27 +16,28 @@ pub fn repl() {
}
let repl = read().and_then(|line| {
- eval(&line).and_then(|(out, status)| {
- out
- // print the results as they come in
- .and_then(|out| print(&out))
- // wait for all output to be finished
- .collect()
- // ignore io errors since we just keep reading even after
- // the process exits and the other end of the pty is
- // closed
- .or_else(|_| futures::future::ok(vec![]))
- // once the output is all processed, then wait on the
- // process to exit
- .and_then(|_| status)
+ eval(&line).fold(None, |acc, event| match event {
+ crate::process::ProcessEvent::Output(out) => {
+ match print(&out) {
+ Ok(()) => futures::future::ok(acc),
+ Err(e) => futures::future::err(e),
+ }
+ }
+ crate::process::ProcessEvent::Exit(status) => {
+ futures::future::ok(Some(status))
+ }
})
});
Some(repl.then(move |res| match res {
- Ok(status) => {
+ Ok(Some(status)) => {
eprint!("process exited with status {}\r\n", status);
return Ok((done, false));
}
+ Ok(None) => {
+ eprint!("process exited weirdly?\r\n");
+ return Ok((done, false));
+ }
Err(Error::ReadError(crate::readline::Error::EOF)) => {
return Ok((done, true));
}
@@ -58,24 +59,12 @@ fn read() -> impl futures::future::Future<Item = String, Error = Error> {
fn eval(
line: &str,
-) -> impl futures::future::Future<
- Item = (
- impl futures::stream::Stream<Item = Vec<u8>, Error = Error>,
- impl futures::future::Future<
- Item = std::process::ExitStatus,
- Error = Error,
- >,
- ),
- Error = Error,
-> {
- match crate::process::spawn(line) {
- Ok((out, status)) => Ok((
- out.map_err(|e| Error::EvalError(e)),
- status.map_err(|e| Error::EvalError(e)),
- )),
- Err(e) => Err(e).map_err(|e| Error::EvalError(e)),
- }
- .into_future()
+) -> impl futures::stream::Stream<Item = crate::process::ProcessEvent, Error = Error>
+{
+ crate::process::spawn(line)
+ .into_future()
+ .flatten_stream()
+ .map_err(|e| Error::EvalError(e))
}
fn print(out: &[u8]) -> Result<(), Error> {