aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-07-08 23:49:00 -0400
committerJesse Luehrs <doy@tozt.net>2019-07-09 00:05:22 -0400
commit27463b78a4c12c9840f9df95e4278a050b73152e (patch)
tree83e2f0b98053e53cbd82177f39ca6392903163dc
parentfd3f93882209b413b6bec95d3f6c806ebc01a460 (diff)
downloadnbsh-old-27463b78a4c12c9840f9df95e4278a050b73152e.tar.gz
nbsh-old-27463b78a4c12c9840f9df95e4278a050b73152e.zip
these have no reason to be streams, they can just be futures
-rw-r--r--src/repl.rs74
-rw-r--r--src/state.rs38
2 files changed, 52 insertions, 60 deletions
diff --git a/src/repl.rs b/src/repl.rs
index a4dba86..874a22e 100644
--- a/src/repl.rs
+++ b/src/repl.rs
@@ -18,50 +18,42 @@ pub fn repl() {
tokio::run(futures::lazy(|| {
let (w, r) = futures::sync::mpsc::channel(0);
- let state_stream = crate::state::State::new(r).map_err(|e| {
+ tokio::spawn(crate::state::State::new(r).map_err(|e| {
error(&Error::Print { source: e });
- });
- tokio::spawn(state_stream.collect().map(|_| ()));
+ }));
- let loop_stream =
- futures::stream::unfold((false, 0), move |(done, idx)| {
- if done {
- return None;
- }
- let w = w.clone();
-
- let repl = read()
- .and_then(move |line| {
- eval(&line).for_each(move |event| {
- let w = w.clone();
- print(w, idx, &event)
- })
+ futures::future::loop_fn(0, move |idx| {
+ let w = w.clone();
+ read()
+ .and_then(move |line| {
+ let w = w.clone();
+ eval(&line).for_each(move |event| {
+ let w = w.clone();
+ print(w, idx, &event)
})
- .then(move |res| match res {
- // successful run or empty input means prompt again
- Ok(_)
- | Err(Error::Eval {
- source:
- crate::eval::Error::Parser {
- source:
- crate::parser::Error::CommandRequired,
- ..
- },
- }) => Ok(((false, idx + 1), (false, idx + 1))),
- // eof means we're done
- Err(Error::Read {
- source: crate::readline::Error::EOF,
- }) => Ok(((false, idx + 1), (true, idx + 1))),
- // any other errors should be displayed, then we
- // prompt again
- Err(e) => {
- error(&e);
- Ok(((false, idx + 1), (false, idx + 1)))
- }
- });
- Some(repl)
- });
- loop_stream.collect().map(|_| ())
+ })
+ .then(move |res| match res {
+ // successful run or empty input means prompt again
+ Ok(_)
+ | Err(Error::Eval {
+ source:
+ crate::eval::Error::Parser {
+ source: crate::parser::Error::CommandRequired,
+ ..
+ },
+ }) => Ok(futures::future::Loop::Continue(idx + 1)),
+ // eof means we're done
+ Err(Error::Read {
+ source: crate::readline::Error::EOF,
+ }) => Ok(futures::future::Loop::Break(())),
+ // any other errors should be displayed, then we
+ // prompt again
+ Err(e) => {
+ error(&e);
+ Ok(futures::future::Loop::Continue(idx + 1))
+ }
+ })
+ })
}));
}
diff --git a/src/state.rs b/src/state.rs
index 45ee36d..0965aea 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -1,5 +1,6 @@
use futures::future::Future as _;
use futures::sink::Sink as _;
+use futures::stream::Stream as _;
use snafu::{OptionExt as _, ResultExt as _};
use std::io::Write as _;
@@ -93,29 +94,28 @@ impl State {
}
}
-impl futures::stream::Stream for State {
+impl futures::future::Future for State {
type Item = ();
type Error = Error;
- fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
- let event = futures::try_ready!(self
- .r
- .poll()
- .map_err(|_| Error::Unreachable));
- match event {
- Some(StateEvent::Start(idx, cmd, args)) => {
- self.command_start(idx, &cmd, &args)?;
- Ok(futures::Async::Ready(Some(())))
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ loop {
+ let event = futures::try_ready!(self
+ .r
+ .poll()
+ .map_err(|_| Error::Unreachable));
+ match event {
+ Some(StateEvent::Start(idx, cmd, args)) => {
+ self.command_start(idx, &cmd, &args)?;
+ }
+ Some(StateEvent::Output(idx, output)) => {
+ self.command_output(idx, &output)?;
+ }
+ Some(StateEvent::Exit(idx, status)) => {
+ self.command_exit(idx, status)?;
+ }
+ None => return Ok(futures::Async::Ready(())),
}
- Some(StateEvent::Output(idx, output)) => {
- self.command_output(idx, &output)?;
- Ok(futures::Async::Ready(Some(())))
- }
- Some(StateEvent::Exit(idx, status)) => {
- self.command_exit(idx, status)?;
- Ok(futures::Async::Ready(Some(())))
- }
- None => Ok(futures::Async::Ready(None)),
}
}
}