aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-10-28 13:28:06 -0400
committerJesse Luehrs <doy@tozt.net>2019-10-28 13:28:06 -0400
commit1a2bc078ef06b3a28d5c8ca5f2f495d63c3afbd8 (patch)
treeedaaa8ac2f545f3694b386f2baefc8312f0dfb52 /src
parent607e9a1f1cbaa3f08d2d4c109821c6e987fa5a7e (diff)
downloadnbsh-old-1a2bc078ef06b3a28d5c8ca5f2f495d63c3afbd8.tar.gz
nbsh-old-1a2bc078ef06b3a28d5c8ca5f2f495d63c3afbd8.zip
move to tokio-pty-process-stream
Diffstat (limited to 'src')
-rw-r--r--src/async_stdin.rs88
-rw-r--r--src/builtins.rs22
-rw-r--r--src/eval.rs32
-rw-r--r--src/lib.rs2
-rw-r--r--src/process.rs256
-rw-r--r--src/repl.rs15
-rw-r--r--src/tui.rs20
7 files changed, 140 insertions, 295 deletions
diff --git a/src/async_stdin.rs b/src/async_stdin.rs
new file mode 100644
index 0000000..e3b0ead
--- /dev/null
+++ b/src/async_stdin.rs
@@ -0,0 +1,88 @@
+struct EventedStdin;
+
+const STDIN: i32 = 0;
+
+impl std::io::Read for EventedStdin {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ let stdin = std::io::stdin();
+ let mut stdin = stdin.lock();
+ stdin.read(buf)
+ }
+}
+
+impl mio::Evented for EventedStdin {
+ fn register(
+ &self,
+ poll: &mio::Poll,
+ token: mio::Token,
+ interest: mio::Ready,
+ opts: mio::PollOpt,
+ ) -> std::io::Result<()> {
+ let fd = STDIN as std::os::unix::io::RawFd;
+ let eventedfd = mio::unix::EventedFd(&fd);
+ eventedfd.register(poll, token, interest, opts)
+ }
+
+ fn reregister(
+ &self,
+ poll: &mio::Poll,
+ token: mio::Token,
+ interest: mio::Ready,
+ opts: mio::PollOpt,
+ ) -> std::io::Result<()> {
+ let fd = STDIN as std::os::unix::io::RawFd;
+ let eventedfd = mio::unix::EventedFd(&fd);
+ eventedfd.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &mio::Poll) -> std::io::Result<()> {
+ let fd = STDIN as std::os::unix::io::RawFd;
+ let eventedfd = mio::unix::EventedFd(&fd);
+ eventedfd.deregister(poll)
+ }
+}
+
+pub struct Stdin {
+ input: tokio::reactor::PollEvented2<EventedStdin>,
+}
+
+impl Stdin {
+ pub fn new() -> Self {
+ Self {
+ input: tokio::reactor::PollEvented2::new(EventedStdin),
+ }
+ }
+}
+
+impl std::io::Read for Stdin {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ self.input.read(buf)
+ }
+}
+
+impl tokio::io::AsyncRead for Stdin {
+ fn poll_read(
+ &mut self,
+ buf: &mut [u8],
+ ) -> std::result::Result<futures::Async<usize>, tokio::io::Error> {
+ // XXX this is why i had to do the EventedFd thing - poll_read on its
+ // own will block reading from stdin, so i need a way to explicitly
+ // check readiness before doing the read
+ let ready = mio::Ready::readable();
+ match self.input.poll_read_ready(ready)? {
+ futures::Async::Ready(_) => {
+ let res = self.input.poll_read(buf);
+
+ // XXX i'm pretty sure this is wrong (if the single poll_read
+ // call didn't return all waiting data, clearing read ready
+ // state means that we won't get the rest until some more data
+ // beyond that appears), but i don't know that there's a way
+ // to do it correctly given that poll_read blocks
+ self.input.clear_read_ready(ready)?;
+
+ res
+ }
+ futures::Async::NotReady => Ok(futures::Async::NotReady),
+ }
+ }
+}
diff --git a/src/builtins.rs b/src/builtins.rs
index d1f5b61..eb7989d 100644
--- a/src/builtins.rs
+++ b/src/builtins.rs
@@ -60,17 +60,17 @@ impl Builtin {
#[must_use = "streams do nothing unless polled"]
impl futures::stream::Stream for Builtin {
- type Item = crate::eval::CommandEvent;
+ type Item = tokio_pty_process_stream::Event;
type Error = Error;
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
if !self.started {
self.started = true;
Ok(futures::Async::Ready(Some(
- crate::eval::CommandEvent::CommandStart(
- self.cmd.clone(),
- self.args.clone(),
- ),
+ tokio_pty_process_stream::Event::CommandStart {
+ cmd: self.cmd.clone(),
+ args: self.args.clone(),
+ },
)))
} else if !self.done {
self.done = true;
@@ -82,17 +82,17 @@ impl futures::stream::Stream for Builtin {
};
res.map(|_| {
futures::Async::Ready(Some(
- crate::eval::CommandEvent::CommandExit(
- std::process::ExitStatus::from_raw(0),
- ),
+ tokio_pty_process_stream::Event::CommandExit {
+ status: std::process::ExitStatus::from_raw(0),
+ },
))
})
.or_else(|e| match e {
Error::UnknownBuiltin { .. } => Err(e),
_ => Ok(futures::Async::Ready(Some(
- crate::eval::CommandEvent::CommandExit(
- std::process::ExitStatus::from_raw(256),
- ),
+ tokio_pty_process_stream::Event::CommandExit {
+ status: std::process::ExitStatus::from_raw(256),
+ },
))),
})
} else {
diff --git a/src/eval.rs b/src/eval.rs
index a7e09c4..e29d10f 100644
--- a/src/eval.rs
+++ b/src/eval.rs
@@ -12,7 +12,7 @@ pub enum Error {
#[snafu(display("failed to find command `{}`: {}", cmd, source))]
Command {
cmd: String,
- source: crate::process::Error,
+ source: tokio_pty_process_stream::Error,
},
#[snafu(display("failed to run builtin command `{}`: {}", cmd, source))]
@@ -24,7 +24,7 @@ pub enum Error {
#[snafu(display("failed to run executable `{}`: {}", cmd, source))]
ProcessExecution {
cmd: String,
- source: crate::process::Error,
+ source: tokio_pty_process_stream::Error,
},
}
@@ -35,18 +35,14 @@ pub fn eval(line: &str) -> Eval {
Eval::new(line)
}
-pub enum CommandEvent {
- CommandStart(String, Vec<String>),
- Output(Vec<u8>),
- CommandExit(std::process::ExitStatus),
-}
-
pub struct Eval {
line: String,
stream: Option<
Box<
- dyn futures::stream::Stream<Item = CommandEvent, Error = Error>
- + Send,
+ dyn futures::stream::Stream<
+ Item = tokio_pty_process_stream::Event,
+ Error = Error,
+ > + Send,
>,
>,
manage_screen: bool,
@@ -69,7 +65,7 @@ impl Eval {
#[must_use = "streams do nothing unless polled"]
impl futures::stream::Stream for Eval {
- type Item = CommandEvent;
+ type Item = tokio_pty_process_stream::Event;
type Error = Error;
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
@@ -80,17 +76,19 @@ impl futures::stream::Stream for Eval {
let builtin_stream = crate::builtins::Builtin::new(&cmd, &args);
let stream: Box<
dyn futures::stream::Stream<
- Item = CommandEvent,
+ Item = tokio_pty_process_stream::Event,
Error = Error,
> + Send,
> = if let Ok(s) = builtin_stream {
Box::new(s.context(BuiltinExecution { cmd }))
} else {
- let process_stream =
- crate::process::Process::new(&cmd, &args)
- .context(Command { cmd: cmd.clone() })?
- .set_raw(self.manage_screen);
- Box::new(process_stream.context(ProcessExecution { cmd }))
+ let input = crate::async_stdin::Stdin::new();
+ let process = tokio_pty_process_stream::ResizingProcess::new(
+ tokio_pty_process_stream::Process::new(
+ &cmd, &args, input,
+ ),
+ );
+ Box::new(process.context(ProcessExecution { cmd }))
};
self.stream = Some(stream);
}
diff --git a/src/lib.rs b/src/lib.rs
index e79c17a..1d51a3c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,11 +13,11 @@
#![allow(clippy::single_match)]
#![allow(clippy::write_with_newline)]
+mod async_stdin;
mod builtins;
mod eval;
mod key_reader;
mod parser;
-mod process;
mod readline;
pub mod repl;
diff --git a/src/process.rs b/src/process.rs
deleted file mode 100644
index 9993be7..0000000
--- a/src/process.rs
+++ /dev/null
@@ -1,256 +0,0 @@
-use futures::future::Future as _;
-use snafu::ResultExt as _;
-use std::io::{Read as _, Write as _};
-use tokio::io::AsyncRead as _;
-use tokio_pty_process::CommandExt as _;
-
-#[derive(Debug, snafu::Snafu)]
-pub enum Error {
- #[snafu(display("failed to open a pty: {}", source))]
- OpenPty { source: std::io::Error },
-
- #[snafu(display("failed to spawn process for `{}`: {}", cmd, source))]
- SpawnProcess { cmd: String, source: std::io::Error },
-
- #[snafu(display("failed to resize pty: {}", source))]
- ResizePty { source: std::io::Error },
-
- #[snafu(display("failed to write to pty: {}", source))]
- WriteToPty { source: std::io::Error },
-
- #[snafu(display("failed to read from terminal: {}", source))]
- ReadFromTerminal { source: std::io::Error },
-
- #[snafu(display(
- "failed to clear ready state on pty for reading: {}",
- source
- ))]
- PtyClearReadReady { source: std::io::Error },
-
- #[snafu(display("failed to poll for process exit: {}", source))]
- ProcessExitPoll { source: std::io::Error },
-
- #[snafu(display(
- "failed to put the terminal into raw mode: {}",
- source
- ))]
- IntoRawMode { source: std::io::Error },
-}
-
-pub type Result<T> = std::result::Result<T, Error>;
-
-pub struct Process {
- pty: tokio_pty_process::AsyncPtyMaster,
- process: tokio_pty_process::Child,
- // TODO: tokio::io::Stdin is broken
- // input: tokio::io::Stdin,
- input: tokio::reactor::PollEvented2<EventedStdin>,
- cmd: String,
- args: Vec<String>,
- buf: Vec<u8>,
- started: bool,
- output_done: bool,
- exit_done: bool,
- manage_screen: bool,
- raw_screen: Option<crossterm::RawScreen>,
-}
-
-struct Resizer<'a, T> {
- rows: u16,
- cols: u16,
- pty: &'a T,
-}
-
-impl<'a, T: tokio_pty_process::PtyMaster> futures::future::Future
- for Resizer<'a, T>
-{
- type Item = ();
- type Error = std::io::Error;
-
- fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
- self.pty.resize(self.rows, self.cols)
- }
-}
-
-impl Process {
- pub fn new(cmd: &str, args: &[String]) -> Result<Self> {
- let pty =
- tokio_pty_process::AsyncPtyMaster::open().context(OpenPty)?;
-
- let process = std::process::Command::new(cmd)
- .args(args)
- .spawn_pty_async(&pty)
- .context(SpawnProcess { cmd })?;
-
- let (cols, rows) = crossterm::terminal().terminal_size();
- Resizer {
- rows: rows + 1,
- cols: cols + 1,
- pty: &pty,
- }
- .wait()
- .context(ResizePty)?;
-
- // TODO: tokio::io::stdin is broken (it's blocking)
- // let input = tokio::io::stdin();
- let input = tokio::reactor::PollEvented2::new(EventedStdin);
-
- Ok(Self {
- pty,
- process,
- input,
- cmd: cmd.to_string(),
- args: args.to_vec(),
- buf: Vec::with_capacity(4096),
- started: false,
- output_done: false,
- exit_done: false,
- manage_screen: true,
- raw_screen: None,
- })
- }
-
- #[allow(dead_code)]
- pub fn set_raw(mut self, raw: bool) -> Self {
- self.manage_screen = raw;
- self
- }
-}
-
-#[must_use = "streams do nothing unless polled"]
-impl futures::stream::Stream for Process {
- type Item = crate::eval::CommandEvent;
- type Error = Error;
-
- fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
- if self.manage_screen && self.raw_screen.is_none() {
- self.raw_screen = Some(
- crossterm::RawScreen::into_raw_mode().context(IntoRawMode)?,
- );
- }
-
- if !self.started {
- self.started = true;
- return Ok(futures::Async::Ready(Some(
- crate::eval::CommandEvent::CommandStart(
- self.cmd.clone(),
- self.args.clone(),
- ),
- )));
- }
-
- let ready = mio::Ready::readable();
- let input_poll = self.input.poll_read_ready(ready);
- match input_poll {
- Ok(futures::Async::Ready(_)) => {
- let stdin = std::io::stdin();
- let mut stdin = stdin.lock();
- let mut buf = vec![0; 4096];
- // TODO: async
- let n = stdin.read(&mut buf).context(ReadFromTerminal)?;
- if n > 0 {
- let bytes = buf[..n].to_vec();
-
- // TODO: async
- self.pty.write_all(&bytes).context(WriteToPty)?;
- }
- }
- _ => {}
- }
- // TODO: this could lose pending bytes if there is stuff to read in
- // the buffer but we don't read it all in the previous read call,
- // since i think we won't get another notification until new bytes
- // actually arrive even if there are bytes in the buffer
- self.input
- .clear_read_ready(ready)
- .context(PtyClearReadReady)?;
-
- if !self.output_done {
- self.buf.clear();
- let output_poll = self.pty.read_buf(&mut self.buf);
- match output_poll {
- Ok(futures::Async::Ready(n)) => {
- let bytes = self.buf[..n].to_vec();
- let bytes: Vec<_> = bytes
- .iter()
- // replace \n with \r\n
- .fold(vec![], |mut acc, &c| {
- if c == b'\n' {
- acc.push(b'\r');
- acc.push(b'\n');
- } else {
- acc.push(c);
- }
- acc
- });
- return Ok(futures::Async::Ready(Some(
- crate::eval::CommandEvent::Output(bytes),
- )));
- }
- Ok(futures::Async::NotReady) => {
- return Ok(futures::Async::NotReady);
- }
- Err(_) => {
- // explicitly ignoring errors (for now?) because we
- // always read off the end of the pty after the process
- // is done
- self.output_done = true;
- }
- }
- }
-
- if !self.exit_done {
- let exit_poll = self.process.poll().context(ProcessExitPoll);
- match exit_poll {
- Ok(futures::Async::Ready(status)) => {
- self.exit_done = true;
- return Ok(futures::Async::Ready(Some(
- crate::eval::CommandEvent::CommandExit(status),
- )));
- }
- Ok(futures::Async::NotReady) => {
- return Ok(futures::Async::NotReady);
- }
- Err(e) => {
- return Err(e);
- }
- }
- }
-
- Ok(futures::Async::Ready(None))
- }
-}
-
-struct EventedStdin;
-
-impl mio::Evented for EventedStdin {
- fn register(
- &self,
- poll: &mio::Poll,
- token: mio::Token,
- interest: mio::Ready,
- opts: mio::PollOpt,
- ) -> std::io::Result<()> {
- let fd = 0 as std::os::unix::io::RawFd;
- let eventedfd = mio::unix::EventedFd(&fd);
- eventedfd.register(poll, token, interest, opts)
- }
-
- fn reregister(
- &self,
- poll: &mio::Poll,
- token: mio::Token,
- interest: mio::Ready,
- opts: mio::PollOpt,
- ) -> std::io::Result<()> {
- let fd = 0 as std::os::unix::io::RawFd;
- let eventedfd = mio::unix::EventedFd(&fd);
- eventedfd.reregister(poll, token, interest, opts)
- }
-
- fn deregister(&self, poll: &mio::Poll) -> std::io::Result<()> {
- let fd = 0 as std::os::unix::io::RawFd;
- let eventedfd = mio::unix::EventedFd(&fd);
- eventedfd.deregister(poll)
- }
-}
diff --git a/src/repl.rs b/src/repl.rs
index 4188fbe..3ec7084 100644
--- a/src/repl.rs
+++ b/src/repl.rs
@@ -56,21 +56,24 @@ fn read() -> impl futures::future::Future<Item = String, Error = Error> {
fn eval(
line: &str,
-) -> impl futures::stream::Stream<Item = crate::eval::CommandEvent, Error = Error>
-{
+) -> impl futures::stream::Stream<
+ Item = tokio_pty_process_stream::Event,
+ Error = Error,
+> {
crate::eval::eval(line).context(Eval)
}
-fn print(event: &crate::eval::CommandEvent) -> Result<()> {
+fn print(event: &tokio_pty_process_stream::Event) -> Result<()> {
match event {
- crate::eval::CommandEvent::CommandStart(_, _) => {}
- crate::eval::CommandEvent::Output(out) => {
+ tokio_pty_process_stream::Event::CommandStart { .. } => {}
+ tokio_pty_process_stream::Event::Output { data: out } => {
let stdout = std::io::stdout();
let mut stdout = stdout.lock();
stdout.write(out).context(Print)?;
stdout.flush().context(Print)?;
}
- crate::eval::CommandEvent::CommandExit(_) => {}
+ tokio_pty_process_stream::Event::CommandExit { .. } => {}
+ tokio_pty_process_stream::Event::Resize { .. } => {}
}
Ok(())
}
diff --git a/src/tui.rs b/src/tui.rs
index 17b2eed..b74670f 100644
--- a/src/tui.rs
+++ b/src/tui.rs
@@ -66,18 +66,21 @@ impl Tui {
fn print(
&mut self,
idx: usize,
- event: crate::eval::CommandEvent,
+ event: tokio_pty_process_stream::Event,
) -> Result<()> {
match event {
- crate::eval::CommandEvent::CommandStart(cmd, args) => {
+ tokio_pty_process_stream::Event::CommandStart { cmd, args } => {
self.command_start(idx, &cmd, &args)
}
- crate::eval::CommandEvent::Output(out) => {
+ tokio_pty_process_stream::Event::Output { data: out } => {
self.command_output(idx, &out)
}
- crate::eval::CommandEvent::CommandExit(status) => {
+ tokio_pty_process_stream::Event::CommandExit { status } => {
self.command_exit(idx, status)
}
+ tokio_pty_process_stream::Event::Resize { size } => {
+ self.command_resize(idx, size)
+ }
}
}
@@ -126,6 +129,15 @@ impl Tui {
Ok(())
}
+ fn command_resize(
+ &mut self,
+ _idx: usize,
+ _size: (u16, u16),
+ ) -> Result<()> {
+ // TODO
+ Ok(())
+ }
+
fn poll_read(&mut self) {
if self.readline.is_none() && self.commands.is_empty() {
self.idx += 1;