From 27463b78a4c12c9840f9df95e4278a050b73152e Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Mon, 8 Jul 2019 23:49:00 -0400 Subject: these have no reason to be streams, they can just be futures --- src/repl.rs | 74 +++++++++++++++++++++++++++--------------------------------- src/state.rs | 38 +++++++++++++++---------------- 2 files changed, 52 insertions(+), 60 deletions(-) diff --git a/src/repl.rs b/src/repl.rs index a4dba86..874a22e 100644 --- a/src/repl.rs +++ b/src/repl.rs @@ -18,50 +18,42 @@ pub fn repl() { tokio::run(futures::lazy(|| { let (w, r) = futures::sync::mpsc::channel(0); - let state_stream = crate::state::State::new(r).map_err(|e| { + tokio::spawn(crate::state::State::new(r).map_err(|e| { error(&Error::Print { source: e }); - }); - tokio::spawn(state_stream.collect().map(|_| ())); + })); - let loop_stream = - futures::stream::unfold((false, 0), move |(done, idx)| { - if done { - return None; - } - let w = w.clone(); - - let repl = read() - .and_then(move |line| { - eval(&line).for_each(move |event| { - let w = w.clone(); - print(w, idx, &event) - }) + futures::future::loop_fn(0, move |idx| { + let w = w.clone(); + read() + .and_then(move |line| { + let w = w.clone(); + eval(&line).for_each(move |event| { + let w = w.clone(); + print(w, idx, &event) }) - .then(move |res| match res { - // successful run or empty input means prompt again - Ok(_) - | Err(Error::Eval { - source: - crate::eval::Error::Parser { - source: - crate::parser::Error::CommandRequired, - .. - }, - }) => Ok(((false, idx + 1), (false, idx + 1))), - // eof means we're done - Err(Error::Read { - source: crate::readline::Error::EOF, - }) => Ok(((false, idx + 1), (true, idx + 1))), - // any other errors should be displayed, then we - // prompt again - Err(e) => { - error(&e); - Ok(((false, idx + 1), (false, idx + 1))) - } - }); - Some(repl) - }); - loop_stream.collect().map(|_| ()) + }) + .then(move |res| match res { + // successful run or empty input means prompt again + Ok(_) + | Err(Error::Eval { + source: + crate::eval::Error::Parser { + source: crate::parser::Error::CommandRequired, + .. + }, + }) => Ok(futures::future::Loop::Continue(idx + 1)), + // eof means we're done + Err(Error::Read { + source: crate::readline::Error::EOF, + }) => Ok(futures::future::Loop::Break(())), + // any other errors should be displayed, then we + // prompt again + Err(e) => { + error(&e); + Ok(futures::future::Loop::Continue(idx + 1)) + } + }) + }) })); } diff --git a/src/state.rs b/src/state.rs index 45ee36d..0965aea 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,5 +1,6 @@ use futures::future::Future as _; use futures::sink::Sink as _; +use futures::stream::Stream as _; use snafu::{OptionExt as _, ResultExt as _}; use std::io::Write as _; @@ -93,29 +94,28 @@ impl State { } } -impl futures::stream::Stream for State { +impl futures::future::Future for State { type Item = (); type Error = Error; - fn poll(&mut self) -> futures::Poll, Self::Error> { - let event = futures::try_ready!(self - .r - .poll() - .map_err(|_| Error::Unreachable)); - match event { - Some(StateEvent::Start(idx, cmd, args)) => { - self.command_start(idx, &cmd, &args)?; - Ok(futures::Async::Ready(Some(()))) + fn poll(&mut self) -> futures::Poll { + loop { + let event = futures::try_ready!(self + .r + .poll() + .map_err(|_| Error::Unreachable)); + match event { + Some(StateEvent::Start(idx, cmd, args)) => { + self.command_start(idx, &cmd, &args)?; + } + Some(StateEvent::Output(idx, output)) => { + self.command_output(idx, &output)?; + } + Some(StateEvent::Exit(idx, status)) => { + self.command_exit(idx, status)?; + } + None => return Ok(futures::Async::Ready(())), } - Some(StateEvent::Output(idx, output)) => { - self.command_output(idx, &output)?; - Ok(futures::Async::Ready(Some(()))) - } - Some(StateEvent::Exit(idx, status)) => { - self.command_exit(idx, status)?; - Ok(futures::Async::Ready(Some(()))) - } - None => Ok(futures::Async::Ready(None)), } } } -- cgit v1.2.3-54-g00ecf