aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-07-07 01:50:29 -0400
committerJesse Luehrs <doy@tozt.net>2019-07-07 02:04:15 -0400
commitfd3f93882209b413b6bec95d3f6c806ebc01a460 (patch)
treec3b8e3a2332c9b8837f911f9bf4ea0e5d6ea4f90
parenta850e3039ab8480846d8407b1c7fbb4439974f76 (diff)
downloadnbsh-old-fd3f93882209b413b6bec95d3f6c806ebc01a460.tar.gz
nbsh-old-fd3f93882209b413b6bec95d3f6c806ebc01a460.zip
keep track of process state as it is being executed
-rw-r--r--src/main.rs1
-rw-r--r--src/repl.rs117
-rw-r--r--src/state.rs164
3 files changed, 226 insertions, 56 deletions
diff --git a/src/main.rs b/src/main.rs
index 0a5ebde..cd3dc54 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -17,6 +17,7 @@ mod parser;
mod process;
mod readline;
mod repl;
+mod state;
fn main() {
repl::repl();
diff --git a/src/repl.rs b/src/repl.rs
index 5a028fe..a4dba86 100644
--- a/src/repl.rs
+++ b/src/repl.rs
@@ -1,6 +1,5 @@
use futures::future::{Future as _, IntoFuture as _};
use futures::stream::Stream as _;
-use snafu::ResultExt as _;
use std::io::Write as _;
#[derive(Debug, snafu::Snafu)]
@@ -12,51 +11,58 @@ pub enum Error {
Eval { source: crate::eval::Error },
#[snafu(display("error during print: {}", source))]
- Print { source: std::io::Error },
+ Print { source: crate::state::Error },
}
-pub type Result<T> = std::result::Result<T, Error>;
-
pub fn repl() {
- let loop_stream = futures::stream::unfold(false, |done| {
- if done {
- return None;
- }
+ tokio::run(futures::lazy(|| {
+ let (w, r) = futures::sync::mpsc::channel(0);
+
+ let state_stream = crate::state::State::new(r).map_err(|e| {
+ error(&Error::Print { source: e });
+ });
+ tokio::spawn(state_stream.collect().map(|_| ()));
- let repl = read()
- .and_then(|line| {
- eval(&line).for_each(|event| {
- futures::future::FutureResult::from(print(&event))
- })
- })
- .then(|res| match res {
- // successful run or empty input means prompt again
- Ok(_)
- | Err(Error::Eval {
- source:
- crate::eval::Error::Parser {
- source: crate::parser::Error::CommandRequired,
- ..
- },
- }) => Ok((false, false)),
- // eof means we're done
- Err(Error::Read {
- source: crate::readline::Error::EOF,
- }) => Ok((false, true)),
- // any other errors should be displayed, then we prompt again
- Err(e) => {
- let stderr = std::io::stderr();
- let mut stderr = stderr.lock();
- // panics seem fine for errors during error handling
- write!(stderr, "{}\r\n", e).unwrap();
- stderr.flush().unwrap();
- Ok((false, false))
+ let loop_stream =
+ futures::stream::unfold((false, 0), move |(done, idx)| {
+ if done {
+ return None;
}
+ let w = w.clone();
+
+ let repl = read()
+ .and_then(move |line| {
+ eval(&line).for_each(move |event| {
+ let w = w.clone();
+ print(w, idx, &event)
+ })
+ })
+ .then(move |res| match res {
+ // successful run or empty input means prompt again
+ Ok(_)
+ | Err(Error::Eval {
+ source:
+ crate::eval::Error::Parser {
+ source:
+ crate::parser::Error::CommandRequired,
+ ..
+ },
+ }) => Ok(((false, idx + 1), (false, idx + 1))),
+ // eof means we're done
+ Err(Error::Read {
+ source: crate::readline::Error::EOF,
+ }) => Ok(((false, idx + 1), (true, idx + 1))),
+ // any other errors should be displayed, then we
+ // prompt again
+ Err(e) => {
+ error(&e);
+ Ok(((false, idx + 1), (false, idx + 1)))
+ }
+ });
+ Some(repl)
});
- Some(repl)
- });
- let loop_future = loop_stream.collect().map(|_| ());
- tokio::run(loop_future);
+ loop_stream.collect().map(|_| ())
+ }));
}
fn read() -> impl futures::future::Future<Item = String, Error = Error> {
@@ -76,20 +82,19 @@ fn eval(
.map_err(|e| Error::Eval { source: e })
}
-fn print(event: &crate::eval::CommandEvent) -> Result<()> {
- match event {
- crate::eval::CommandEvent::CommandStart(cmd, args) => {
- eprint!("running '{} {:?}'\r\n", cmd, args);
- }
- crate::eval::CommandEvent::Output(out) => {
- let stdout = std::io::stdout();
- let mut stdout = stdout.lock();
- stdout.write(out).context(Print)?;
- stdout.flush().context(Print)?;
- }
- crate::eval::CommandEvent::CommandExit(status) => {
- eprint!("command exited: {}\r\n", status);
- }
- }
- Ok(())
+fn print(
+ w: futures::sync::mpsc::Sender<crate::state::StateEvent>,
+ idx: usize,
+ event: &crate::eval::CommandEvent,
+) -> impl futures::future::Future<Item = (), Error = Error> {
+ crate::state::update(w, idx, event)
+ .map_err(|e| Error::Print { source: e })
+}
+
+fn error(e: &Error) {
+ let stderr = std::io::stderr();
+ let mut stderr = stderr.lock();
+ // panics seem fine for errors during error handling
+ write!(stderr, "{}\r\n", e).unwrap();
+ stderr.flush().unwrap();
}
diff --git a/src/state.rs b/src/state.rs
new file mode 100644
index 0000000..45ee36d
--- /dev/null
+++ b/src/state.rs
@@ -0,0 +1,164 @@
+use futures::future::Future as _;
+use futures::sink::Sink as _;
+use snafu::{OptionExt as _, ResultExt as _};
+use std::io::Write as _;
+
+#[derive(Debug, snafu::Snafu)]
+pub enum Error {
+ #[snafu(display("invalid command index: {}", idx))]
+ InvalidCommandIndex { idx: usize },
+
+ #[snafu(display("error sending message: {}", source))]
+ Send {
+ source: futures::sync::mpsc::SendError<StateEvent>,
+ },
+
+ #[snafu(display("error printing output: {}", source))]
+ PrintOutput { source: std::io::Error },
+
+ #[snafu(display("this error should not be possible"))]
+ Unreachable,
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+#[derive(Debug)]
+pub enum StateEvent {
+ Start(usize, String, Vec<String>),
+ Output(usize, Vec<u8>),
+ Exit(usize, std::process::ExitStatus),
+}
+
+#[derive(Debug)]
+pub struct State {
+ r: futures::sync::mpsc::Receiver<StateEvent>,
+ commands: std::collections::HashMap<usize, Command>,
+}
+
+impl State {
+ pub fn new(r: futures::sync::mpsc::Receiver<StateEvent>) -> Self {
+ Self {
+ r,
+ commands: std::collections::HashMap::new(),
+ }
+ }
+
+ pub fn command_start(
+ &mut self,
+ idx: usize,
+ cmd: &str,
+ args: &[String],
+ ) -> Result<()> {
+ snafu::ensure!(
+ !self.commands.contains_key(&idx),
+ InvalidCommandIndex { idx }
+ );
+ let command = Command::new(cmd, args);
+ self.commands.insert(idx, command.clone());
+ eprint!("running '{} {:?}'\r\n", command.cmd, command.args);
+ Ok(())
+ }
+
+ pub fn command_output(
+ &mut self,
+ idx: usize,
+ output: &[u8],
+ ) -> Result<()> {
+ let command = self
+ .commands
+ .get_mut(&idx)
+ .context(InvalidCommandIndex { idx })?;
+ command.output.append(&mut output.to_vec());
+
+ let stdout = std::io::stdout();
+ let mut stdout = stdout.lock();
+ stdout.write(output).context(PrintOutput)?;
+ stdout.flush().context(PrintOutput)?;
+
+ Ok(())
+ }
+
+ pub fn command_exit(
+ &mut self,
+ idx: usize,
+ status: std::process::ExitStatus,
+ ) -> Result<()> {
+ let command = self
+ .commands
+ .get_mut(&idx)
+ .context(InvalidCommandIndex { idx })?;
+ command.status = Some(status);
+ eprint!("command exited: {}\r\n", status);
+ Ok(())
+ }
+}
+
+impl futures::stream::Stream for State {
+ type Item = ();
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
+ let event = futures::try_ready!(self
+ .r
+ .poll()
+ .map_err(|_| Error::Unreachable));
+ match event {
+ Some(StateEvent::Start(idx, cmd, args)) => {
+ self.command_start(idx, &cmd, &args)?;
+ Ok(futures::Async::Ready(Some(())))
+ }
+ Some(StateEvent::Output(idx, output)) => {
+ self.command_output(idx, &output)?;
+ Ok(futures::Async::Ready(Some(())))
+ }
+ Some(StateEvent::Exit(idx, status)) => {
+ self.command_exit(idx, status)?;
+ Ok(futures::Async::Ready(Some(())))
+ }
+ None => Ok(futures::Async::Ready(None)),
+ }
+ }
+}
+
+pub fn update(
+ w: futures::sync::mpsc::Sender<StateEvent>,
+ idx: usize,
+ event: &crate::eval::CommandEvent,
+) -> impl futures::future::Future<Item = (), Error = Error> {
+ match event {
+ crate::eval::CommandEvent::CommandStart(cmd, args) => {
+ w.send(crate::state::StateEvent::Start(
+ idx,
+ cmd.to_string(),
+ args.to_vec(),
+ ))
+ }
+ crate::eval::CommandEvent::Output(out) => {
+ w.send(crate::state::StateEvent::Output(idx, out.to_vec()))
+ }
+ crate::eval::CommandEvent::CommandExit(status) => {
+ w.send(crate::state::StateEvent::Exit(idx, *status))
+ }
+ }
+ .map(|_| ())
+ .map_err(|e| Error::Send { source: e })
+}
+
+#[derive(Debug, Clone)]
+struct Command {
+ cmd: String,
+ args: Vec<String>,
+ output: Vec<u8>,
+ status: Option<std::process::ExitStatus>,
+}
+
+impl Command {
+ fn new(cmd: &str, args: &[String]) -> Self {
+ Self {
+ cmd: cmd.to_string(),
+ args: args.to_vec(),
+ output: vec![],
+ status: None,
+ }
+ }
+}