From fd3f93882209b413b6bec95d3f6c806ebc01a460 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Sun, 7 Jul 2019 01:50:29 -0400 Subject: keep track of process state as it is being executed --- src/main.rs | 1 + src/repl.rs | 117 ++++++++++++++++++++++-------------------- src/state.rs | 164 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 226 insertions(+), 56 deletions(-) create mode 100644 src/state.rs diff --git a/src/main.rs b/src/main.rs index 0a5ebde..cd3dc54 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,6 +17,7 @@ mod parser; mod process; mod readline; mod repl; +mod state; fn main() { repl::repl(); diff --git a/src/repl.rs b/src/repl.rs index 5a028fe..a4dba86 100644 --- a/src/repl.rs +++ b/src/repl.rs @@ -1,6 +1,5 @@ use futures::future::{Future as _, IntoFuture as _}; use futures::stream::Stream as _; -use snafu::ResultExt as _; use std::io::Write as _; #[derive(Debug, snafu::Snafu)] @@ -12,51 +11,58 @@ pub enum Error { Eval { source: crate::eval::Error }, #[snafu(display("error during print: {}", source))] - Print { source: std::io::Error }, + Print { source: crate::state::Error }, } -pub type Result = std::result::Result; - pub fn repl() { - let loop_stream = futures::stream::unfold(false, |done| { - if done { - return None; - } + tokio::run(futures::lazy(|| { + let (w, r) = futures::sync::mpsc::channel(0); + + let state_stream = crate::state::State::new(r).map_err(|e| { + error(&Error::Print { source: e }); + }); + tokio::spawn(state_stream.collect().map(|_| ())); - let repl = read() - .and_then(|line| { - eval(&line).for_each(|event| { - futures::future::FutureResult::from(print(&event)) - }) - }) - .then(|res| match res { - // successful run or empty input means prompt again - Ok(_) - | Err(Error::Eval { - source: - crate::eval::Error::Parser { - source: crate::parser::Error::CommandRequired, - .. - }, - }) => Ok((false, false)), - // eof means we're done - Err(Error::Read { - source: crate::readline::Error::EOF, - }) => Ok((false, true)), - // any other errors should be displayed, then we prompt again - Err(e) => { - let stderr = std::io::stderr(); - let mut stderr = stderr.lock(); - // panics seem fine for errors during error handling - write!(stderr, "{}\r\n", e).unwrap(); - stderr.flush().unwrap(); - Ok((false, false)) + let loop_stream = + futures::stream::unfold((false, 0), move |(done, idx)| { + if done { + return None; } + let w = w.clone(); + + let repl = read() + .and_then(move |line| { + eval(&line).for_each(move |event| { + let w = w.clone(); + print(w, idx, &event) + }) + }) + .then(move |res| match res { + // successful run or empty input means prompt again + Ok(_) + | Err(Error::Eval { + source: + crate::eval::Error::Parser { + source: + crate::parser::Error::CommandRequired, + .. + }, + }) => Ok(((false, idx + 1), (false, idx + 1))), + // eof means we're done + Err(Error::Read { + source: crate::readline::Error::EOF, + }) => Ok(((false, idx + 1), (true, idx + 1))), + // any other errors should be displayed, then we + // prompt again + Err(e) => { + error(&e); + Ok(((false, idx + 1), (false, idx + 1))) + } + }); + Some(repl) }); - Some(repl) - }); - let loop_future = loop_stream.collect().map(|_| ()); - tokio::run(loop_future); + loop_stream.collect().map(|_| ()) + })); } fn read() -> impl futures::future::Future { @@ -76,20 +82,19 @@ fn eval( .map_err(|e| Error::Eval { source: e }) } -fn print(event: &crate::eval::CommandEvent) -> Result<()> { - match event { - crate::eval::CommandEvent::CommandStart(cmd, args) => { - eprint!("running '{} {:?}'\r\n", cmd, args); - } - crate::eval::CommandEvent::Output(out) => { - let stdout = std::io::stdout(); - let mut stdout = stdout.lock(); - stdout.write(out).context(Print)?; - stdout.flush().context(Print)?; - } - crate::eval::CommandEvent::CommandExit(status) => { - eprint!("command exited: {}\r\n", status); - } - } - Ok(()) +fn print( + w: futures::sync::mpsc::Sender, + idx: usize, + event: &crate::eval::CommandEvent, +) -> impl futures::future::Future { + crate::state::update(w, idx, event) + .map_err(|e| Error::Print { source: e }) +} + +fn error(e: &Error) { + let stderr = std::io::stderr(); + let mut stderr = stderr.lock(); + // panics seem fine for errors during error handling + write!(stderr, "{}\r\n", e).unwrap(); + stderr.flush().unwrap(); } diff --git a/src/state.rs b/src/state.rs new file mode 100644 index 0000000..45ee36d --- /dev/null +++ b/src/state.rs @@ -0,0 +1,164 @@ +use futures::future::Future as _; +use futures::sink::Sink as _; +use snafu::{OptionExt as _, ResultExt as _}; +use std::io::Write as _; + +#[derive(Debug, snafu::Snafu)] +pub enum Error { + #[snafu(display("invalid command index: {}", idx))] + InvalidCommandIndex { idx: usize }, + + #[snafu(display("error sending message: {}", source))] + Send { + source: futures::sync::mpsc::SendError, + }, + + #[snafu(display("error printing output: {}", source))] + PrintOutput { source: std::io::Error }, + + #[snafu(display("this error should not be possible"))] + Unreachable, +} + +pub type Result = std::result::Result; + +#[derive(Debug)] +pub enum StateEvent { + Start(usize, String, Vec), + Output(usize, Vec), + Exit(usize, std::process::ExitStatus), +} + +#[derive(Debug)] +pub struct State { + r: futures::sync::mpsc::Receiver, + commands: std::collections::HashMap, +} + +impl State { + pub fn new(r: futures::sync::mpsc::Receiver) -> Self { + Self { + r, + commands: std::collections::HashMap::new(), + } + } + + pub fn command_start( + &mut self, + idx: usize, + cmd: &str, + args: &[String], + ) -> Result<()> { + snafu::ensure!( + !self.commands.contains_key(&idx), + InvalidCommandIndex { idx } + ); + let command = Command::new(cmd, args); + self.commands.insert(idx, command.clone()); + eprint!("running '{} {:?}'\r\n", command.cmd, command.args); + Ok(()) + } + + pub fn command_output( + &mut self, + idx: usize, + output: &[u8], + ) -> Result<()> { + let command = self + .commands + .get_mut(&idx) + .context(InvalidCommandIndex { idx })?; + command.output.append(&mut output.to_vec()); + + let stdout = std::io::stdout(); + let mut stdout = stdout.lock(); + stdout.write(output).context(PrintOutput)?; + stdout.flush().context(PrintOutput)?; + + Ok(()) + } + + pub fn command_exit( + &mut self, + idx: usize, + status: std::process::ExitStatus, + ) -> Result<()> { + let command = self + .commands + .get_mut(&idx) + .context(InvalidCommandIndex { idx })?; + command.status = Some(status); + eprint!("command exited: {}\r\n", status); + Ok(()) + } +} + +impl futures::stream::Stream for State { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> futures::Poll, Self::Error> { + let event = futures::try_ready!(self + .r + .poll() + .map_err(|_| Error::Unreachable)); + match event { + Some(StateEvent::Start(idx, cmd, args)) => { + self.command_start(idx, &cmd, &args)?; + Ok(futures::Async::Ready(Some(()))) + } + Some(StateEvent::Output(idx, output)) => { + self.command_output(idx, &output)?; + Ok(futures::Async::Ready(Some(()))) + } + Some(StateEvent::Exit(idx, status)) => { + self.command_exit(idx, status)?; + Ok(futures::Async::Ready(Some(()))) + } + None => Ok(futures::Async::Ready(None)), + } + } +} + +pub fn update( + w: futures::sync::mpsc::Sender, + idx: usize, + event: &crate::eval::CommandEvent, +) -> impl futures::future::Future { + match event { + crate::eval::CommandEvent::CommandStart(cmd, args) => { + w.send(crate::state::StateEvent::Start( + idx, + cmd.to_string(), + args.to_vec(), + )) + } + crate::eval::CommandEvent::Output(out) => { + w.send(crate::state::StateEvent::Output(idx, out.to_vec())) + } + crate::eval::CommandEvent::CommandExit(status) => { + w.send(crate::state::StateEvent::Exit(idx, *status)) + } + } + .map(|_| ()) + .map_err(|e| Error::Send { source: e }) +} + +#[derive(Debug, Clone)] +struct Command { + cmd: String, + args: Vec, + output: Vec, + status: Option, +} + +impl Command { + fn new(cmd: &str, args: &[String]) -> Self { + Self { + cmd: cmd.to_string(), + args: args.to_vec(), + output: vec![], + status: None, + } + } +} -- cgit v1.2.3-54-g00ecf