aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-06-09 00:55:53 -0400
committerJesse Luehrs <doy@tozt.net>2019-06-09 01:05:53 -0400
commitc383570c75d9757f405b2e43ab48f458759b1403 (patch)
treeeebc7e4f4e8587feba1c2d9643df8f148463e24c
parent59abbab62e936a46ceea7bb7ebac5ca9ef4ba2da (diff)
downloadnbsh-old-c383570c75d9757f405b2e43ab48f458759b1403.tar.gz
nbsh-old-c383570c75d9757f405b2e43ab48f458759b1403.zip
implement process running
-rw-r--r--Cargo.toml8
-rw-r--r--examples/pty.rs25
-rw-r--r--src/main.rs3
-rw-r--r--src/process.rs66
-rw-r--r--src/readline.rs37
-rw-r--r--src/repl.rs95
6 files changed, 183 insertions, 51 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 702c094..a9cb7e1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,7 +5,7 @@ authors = ["Jesse Luehrs <doy@tozt.net>"]
edition = "2018"
[dependencies]
-crossterm = "0.9.5"
-futures = "0.1.27"
-tokio = "0.1.21"
-tokio-pty-process = "0.4.0"
+crossterm = "0.9"
+futures = "0.1"
+tokio = "0.1"
+tokio-pty-process = "0.4"
diff --git a/examples/pty.rs b/examples/pty.rs
new file mode 100644
index 0000000..d5b5a88
--- /dev/null
+++ b/examples/pty.rs
@@ -0,0 +1,25 @@
+use futures::future::{Future, IntoFuture};
+use tokio_pty_process::CommandExt;
+
+fn main() {
+ tokio::run(futures::future::lazy(move || {
+ let master = tokio_pty_process::AsyncPtyMaster::open().unwrap();
+ let args: Vec<&str> = vec![];
+ let child = std::process::Command::new("false")
+ .args(&args)
+ .spawn_pty_async(&master)
+ .unwrap();
+ tokio::spawn(
+ child
+ .map(|status| {
+ eprintln!("got status {}", status);
+ ()
+ })
+ .map_err(|_| ()),
+ )
+ .into_future()
+ .wait()
+ .unwrap();
+ futures::future::ok(())
+ }));
+}
diff --git a/src/main.rs b/src/main.rs
index 105450f..1d78e10 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,8 +1,7 @@
+mod process;
mod readline;
mod repl;
fn main() {
- let _screen = crossterm::RawScreen::into_raw_mode().unwrap();
-
repl::repl();
}
diff --git a/src/process.rs b/src/process.rs
new file mode 100644
index 0000000..7b8d8e7
--- /dev/null
+++ b/src/process.rs
@@ -0,0 +1,66 @@
+use futures::future::Future;
+use futures::try_ready;
+use tokio::io::AsyncRead;
+use tokio_pty_process::CommandExt;
+
+#[derive(Debug)]
+pub enum Error {
+ IOError(std::io::Error),
+}
+
+pub fn spawn(
+ line: &str,
+) -> Result<
+ (
+ PtyStream,
+ impl futures::future::Future<
+ Item = std::process::ExitStatus,
+ Error = Error,
+ >,
+ ),
+ Error,
+> {
+ let master = tokio_pty_process::AsyncPtyMaster::open()
+ .map_err(|e| Error::IOError(e))?;
+ let mut argv: Vec<_> = line.split(' ').collect();
+ let cmd = argv.remove(0);
+ let child = std::process::Command::new(cmd)
+ .args(&argv)
+ .spawn_pty_async(&master)
+ .map_err(|e| Error::IOError(e))?
+ .map_err(|e| Error::IOError(e));
+ let stream = PtyStream::new(master);
+ Ok((stream, child))
+}
+
+pub struct PtyStream {
+ master: tokio_pty_process::AsyncPtyMaster,
+ buf: Vec<u8>,
+}
+
+impl PtyStream {
+ fn new(master: tokio_pty_process::AsyncPtyMaster) -> Self {
+ let buf = Vec::with_capacity(4096);
+ PtyStream { master, buf }
+ }
+}
+
+#[must_use = "streams do nothing unless polled"]
+impl futures::stream::Stream for PtyStream {
+ type Item = Vec<u8>;
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
+ self.buf.clear();
+ let n = try_ready!(self
+ .master
+ .read_buf(&mut self.buf)
+ .map_err(|e| { Error::IOError(e) }));
+ if n > 0 {
+ let bytes = self.buf[..n].to_vec();
+ Ok(futures::Async::Ready(Some(bytes)))
+ } else {
+ Ok(futures::Async::NotReady)
+ }
+ }
+}
diff --git a/src/readline.rs b/src/readline.rs
index 73b5195..ef4fcae 100644
--- a/src/readline.rs
+++ b/src/readline.rs
@@ -12,9 +12,23 @@ pub struct Readline {
prompt: String,
wrote_prompt: bool,
echo: bool,
+ _raw_screen: crossterm::RawScreen,
}
impl Readline {
+ fn new(prompt: &str, echo: bool) -> Self {
+ let screen = crossterm::RawScreen::into_raw_mode().unwrap();
+
+ Readline {
+ reader: None,
+ buffer: String::new(),
+ prompt: prompt.to_string(),
+ wrote_prompt: false,
+ echo,
+ _raw_screen: screen,
+ }
+ }
+
fn process_event(
&mut self,
event: crossterm::InputEvent,
@@ -85,6 +99,7 @@ impl Readline {
}
}
+#[must_use = "futures do nothing unless polled"]
impl futures::future::Future for Readline {
type Item = String;
type Error = Error;
@@ -95,9 +110,9 @@ impl futures::future::Future for Readline {
self.wrote_prompt = true;
}
- let reader = self.reader.get_or_insert_with(|| {
- KeyReader::new(tokio::prelude::task::current())
- });
+ let reader = self
+ .reader
+ .get_or_insert_with(|| KeyReader::new(futures::task::current()));
if let Some(event) = reader.poll() {
self.process_event(event)
} else {
@@ -107,13 +122,7 @@ impl futures::future::Future for Readline {
}
pub fn readline(prompt: &str, echo: bool) -> Readline {
- Readline {
- reader: None,
- buffer: String::new(),
- prompt: prompt.to_string(),
- wrote_prompt: false,
- echo,
- }
+ Readline::new(prompt, echo)
}
struct KeyReader {
@@ -122,10 +131,12 @@ struct KeyReader {
}
impl KeyReader {
- fn new(task: tokio::prelude::task::Task) -> Self {
+ fn new(task: futures::task::Task) -> Self {
let reader = crossterm::input().read_sync();
let (events_tx, events_rx) = std::sync::mpsc::channel();
let (quit_tx, quit_rx) = std::sync::mpsc::channel();
+ // TODO: this is pretty janky - it'd be better to build in more useful
+ // support to crossterm directly
std::thread::spawn(move || {
for event in reader {
let newline = event
@@ -162,8 +173,6 @@ impl Drop for KeyReader {
// don't care if it fails to send, this can happen if the thread
// terminates due to seeing a newline before the keyreader goes out of
// scope
- match self.quit.send(()) {
- _ => {}
- }
+ let _ = self.quit.send(());
}
}
diff --git a/src/repl.rs b/src/repl.rs
index c44174a..af62f95 100644
--- a/src/repl.rs
+++ b/src/repl.rs
@@ -1,53 +1,86 @@
-use futures::future::Future;
+use futures::future::{Future, IntoFuture};
+use futures::stream::Stream;
use std::io::Write;
#[derive(Debug)]
enum Error {
ReadError(crate::readline::Error),
- // EvalError(std::io::Error),
+ EvalError(crate::process::Error),
PrintError(std::io::Error),
- // LoopError,
}
pub fn repl() {
- tokio::run(tokio::prelude::future::lazy(|| {
- let mut done = false;
- while !done {
- let res = read()
- .and_then(move |line| eval(&line))
- .and_then(move |out| print(&out))
- .wait();
- match res {
- Ok(_) => {}
- Err(Error::ReadError(crate::readline::Error::EOF)) => {
- done = true;
- }
- Err(e) => {
- let stderr = std::io::stderr();
- let mut stderr = stderr.lock();
- write!(stderr, "error: {:?}", e).unwrap();
- stderr.flush().unwrap();
- done = true;
- }
- }
+ let loop_stream = futures::stream::unfold(false, |done| {
+ if done {
+ return None;
}
- futures::future::ok(())
- }));
+
+ let repl = read().and_then(|line| {
+ eval(&line).and_then(|(out, status)| {
+ out
+ // print the results as they come in
+ .and_then(|out| print(&out))
+ // wait for all output to be finished
+ .collect()
+ // ignore io errors since we just keep reading even after
+ // the process exits and the other end of the pty is
+ // closed
+ .or_else(|_| futures::future::ok(vec![]))
+ // once the output is all processed, then wait on the
+ // process to exit
+ .and_then(|_| status)
+ })
+ });
+
+ Some(repl.then(move |res| match res {
+ Ok(status) => {
+ eprint!("process exited with status {}\r\n", status);
+ return Ok((done, false));
+ }
+ Err(Error::ReadError(crate::readline::Error::EOF)) => {
+ return Ok((done, true));
+ }
+ Err(e) => {
+ let stderr = std::io::stderr();
+ let mut stderr = stderr.lock();
+ write!(stderr, "error: {:?}\r\n", e).unwrap();
+ stderr.flush().unwrap();
+ return Err(());
+ }
+ }))
+ });
+ tokio::run(loop_stream.collect().map(|_| ()));
}
fn read() -> impl futures::future::Future<Item = String, Error = Error> {
crate::readline::readline("$ ", true).map_err(|e| Error::ReadError(e))
}
-fn eval(line: &str) -> Result<String, Error> {
- Ok(format!("got line '{}'\r\n", line))
+fn eval(
+ line: &str,
+) -> impl futures::future::Future<
+ Item = (
+ impl futures::stream::Stream<Item = Vec<u8>, Error = Error>,
+ impl futures::future::Future<
+ Item = std::process::ExitStatus,
+ Error = Error,
+ >,
+ ),
+ Error = Error,
+> {
+ match crate::process::spawn(line) {
+ Ok((out, status)) => Ok((
+ out.map_err(|e| Error::EvalError(e)),
+ status.map_err(|e| Error::EvalError(e)),
+ )),
+ Err(e) => Err(e).map_err(|e| Error::EvalError(e)),
+ }
+ .into_future()
}
-fn print(out: &str) -> Result<(), Error> {
+fn print(out: &[u8]) -> Result<(), Error> {
let stdout = std::io::stdout();
let mut stdout = stdout.lock();
- stdout
- .write(out.as_bytes())
- .map_err(|e| Error::PrintError(e))?;
+ stdout.write(out).map_err(|e| Error::PrintError(e))?;
stdout.flush().map_err(|e| Error::PrintError(e))
}