diff options
author | Jesse Luehrs <doy@tozt.net> | 2019-10-28 13:28:06 -0400 |
---|---|---|
committer | Jesse Luehrs <doy@tozt.net> | 2019-10-28 13:28:06 -0400 |
commit | 1a2bc078ef06b3a28d5c8ca5f2f495d63c3afbd8 (patch) | |
tree | edaaa8ac2f545f3694b386f2baefc8312f0dfb52 /src | |
parent | 607e9a1f1cbaa3f08d2d4c109821c6e987fa5a7e (diff) | |
download | nbsh-old-1a2bc078ef06b3a28d5c8ca5f2f495d63c3afbd8.tar.gz nbsh-old-1a2bc078ef06b3a28d5c8ca5f2f495d63c3afbd8.zip |
move to tokio-pty-process-stream
Diffstat (limited to 'src')
-rw-r--r-- | src/async_stdin.rs | 88 | ||||
-rw-r--r-- | src/builtins.rs | 22 | ||||
-rw-r--r-- | src/eval.rs | 32 | ||||
-rw-r--r-- | src/lib.rs | 2 | ||||
-rw-r--r-- | src/process.rs | 256 | ||||
-rw-r--r-- | src/repl.rs | 15 | ||||
-rw-r--r-- | src/tui.rs | 20 |
7 files changed, 140 insertions, 295 deletions
diff --git a/src/async_stdin.rs b/src/async_stdin.rs new file mode 100644 index 0000000..e3b0ead --- /dev/null +++ b/src/async_stdin.rs @@ -0,0 +1,88 @@ +struct EventedStdin; + +const STDIN: i32 = 0; + +impl std::io::Read for EventedStdin { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { + let stdin = std::io::stdin(); + let mut stdin = stdin.lock(); + stdin.read(buf) + } +} + +impl mio::Evented for EventedStdin { + fn register( + &self, + poll: &mio::Poll, + token: mio::Token, + interest: mio::Ready, + opts: mio::PollOpt, + ) -> std::io::Result<()> { + let fd = STDIN as std::os::unix::io::RawFd; + let eventedfd = mio::unix::EventedFd(&fd); + eventedfd.register(poll, token, interest, opts) + } + + fn reregister( + &self, + poll: &mio::Poll, + token: mio::Token, + interest: mio::Ready, + opts: mio::PollOpt, + ) -> std::io::Result<()> { + let fd = STDIN as std::os::unix::io::RawFd; + let eventedfd = mio::unix::EventedFd(&fd); + eventedfd.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &mio::Poll) -> std::io::Result<()> { + let fd = STDIN as std::os::unix::io::RawFd; + let eventedfd = mio::unix::EventedFd(&fd); + eventedfd.deregister(poll) + } +} + +pub struct Stdin { + input: tokio::reactor::PollEvented2<EventedStdin>, +} + +impl Stdin { + pub fn new() -> Self { + Self { + input: tokio::reactor::PollEvented2::new(EventedStdin), + } + } +} + +impl std::io::Read for Stdin { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { + self.input.read(buf) + } +} + +impl tokio::io::AsyncRead for Stdin { + fn poll_read( + &mut self, + buf: &mut [u8], + ) -> std::result::Result<futures::Async<usize>, tokio::io::Error> { + // XXX this is why i had to do the EventedFd thing - poll_read on its + // own will block reading from stdin, so i need a way to explicitly + // check readiness before doing the read + let ready = mio::Ready::readable(); + match self.input.poll_read_ready(ready)? { + futures::Async::Ready(_) => { + let res = self.input.poll_read(buf); + + // XXX i'm pretty sure this is wrong (if the single poll_read + // call didn't return all waiting data, clearing read ready + // state means that we won't get the rest until some more data + // beyond that appears), but i don't know that there's a way + // to do it correctly given that poll_read blocks + self.input.clear_read_ready(ready)?; + + res + } + futures::Async::NotReady => Ok(futures::Async::NotReady), + } + } +} diff --git a/src/builtins.rs b/src/builtins.rs index d1f5b61..eb7989d 100644 --- a/src/builtins.rs +++ b/src/builtins.rs @@ -60,17 +60,17 @@ impl Builtin { #[must_use = "streams do nothing unless polled"] impl futures::stream::Stream for Builtin { - type Item = crate::eval::CommandEvent; + type Item = tokio_pty_process_stream::Event; type Error = Error; fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> { if !self.started { self.started = true; Ok(futures::Async::Ready(Some( - crate::eval::CommandEvent::CommandStart( - self.cmd.clone(), - self.args.clone(), - ), + tokio_pty_process_stream::Event::CommandStart { + cmd: self.cmd.clone(), + args: self.args.clone(), + }, ))) } else if !self.done { self.done = true; @@ -82,17 +82,17 @@ impl futures::stream::Stream for Builtin { }; res.map(|_| { futures::Async::Ready(Some( - crate::eval::CommandEvent::CommandExit( - std::process::ExitStatus::from_raw(0), - ), + tokio_pty_process_stream::Event::CommandExit { + status: std::process::ExitStatus::from_raw(0), + }, )) }) .or_else(|e| match e { Error::UnknownBuiltin { .. } => Err(e), _ => Ok(futures::Async::Ready(Some( - crate::eval::CommandEvent::CommandExit( - std::process::ExitStatus::from_raw(256), - ), + tokio_pty_process_stream::Event::CommandExit { + status: std::process::ExitStatus::from_raw(256), + }, ))), }) } else { diff --git a/src/eval.rs b/src/eval.rs index a7e09c4..e29d10f 100644 --- a/src/eval.rs +++ b/src/eval.rs @@ -12,7 +12,7 @@ pub enum Error { #[snafu(display("failed to find command `{}`: {}", cmd, source))] Command { cmd: String, - source: crate::process::Error, + source: tokio_pty_process_stream::Error, }, #[snafu(display("failed to run builtin command `{}`: {}", cmd, source))] @@ -24,7 +24,7 @@ pub enum Error { #[snafu(display("failed to run executable `{}`: {}", cmd, source))] ProcessExecution { cmd: String, - source: crate::process::Error, + source: tokio_pty_process_stream::Error, }, } @@ -35,18 +35,14 @@ pub fn eval(line: &str) -> Eval { Eval::new(line) } -pub enum CommandEvent { - CommandStart(String, Vec<String>), - Output(Vec<u8>), - CommandExit(std::process::ExitStatus), -} - pub struct Eval { line: String, stream: Option< Box< - dyn futures::stream::Stream<Item = CommandEvent, Error = Error> - + Send, + dyn futures::stream::Stream< + Item = tokio_pty_process_stream::Event, + Error = Error, + > + Send, >, >, manage_screen: bool, @@ -69,7 +65,7 @@ impl Eval { #[must_use = "streams do nothing unless polled"] impl futures::stream::Stream for Eval { - type Item = CommandEvent; + type Item = tokio_pty_process_stream::Event; type Error = Error; fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> { @@ -80,17 +76,19 @@ impl futures::stream::Stream for Eval { let builtin_stream = crate::builtins::Builtin::new(&cmd, &args); let stream: Box< dyn futures::stream::Stream< - Item = CommandEvent, + Item = tokio_pty_process_stream::Event, Error = Error, > + Send, > = if let Ok(s) = builtin_stream { Box::new(s.context(BuiltinExecution { cmd })) } else { - let process_stream = - crate::process::Process::new(&cmd, &args) - .context(Command { cmd: cmd.clone() })? - .set_raw(self.manage_screen); - Box::new(process_stream.context(ProcessExecution { cmd })) + let input = crate::async_stdin::Stdin::new(); + let process = tokio_pty_process_stream::ResizingProcess::new( + tokio_pty_process_stream::Process::new( + &cmd, &args, input, + ), + ); + Box::new(process.context(ProcessExecution { cmd })) }; self.stream = Some(stream); } @@ -13,11 +13,11 @@ #![allow(clippy::single_match)] #![allow(clippy::write_with_newline)] +mod async_stdin; mod builtins; mod eval; mod key_reader; mod parser; -mod process; mod readline; pub mod repl; diff --git a/src/process.rs b/src/process.rs deleted file mode 100644 index 9993be7..0000000 --- a/src/process.rs +++ /dev/null @@ -1,256 +0,0 @@ -use futures::future::Future as _; -use snafu::ResultExt as _; -use std::io::{Read as _, Write as _}; -use tokio::io::AsyncRead as _; -use tokio_pty_process::CommandExt as _; - -#[derive(Debug, snafu::Snafu)] -pub enum Error { - #[snafu(display("failed to open a pty: {}", source))] - OpenPty { source: std::io::Error }, - - #[snafu(display("failed to spawn process for `{}`: {}", cmd, source))] - SpawnProcess { cmd: String, source: std::io::Error }, - - #[snafu(display("failed to resize pty: {}", source))] - ResizePty { source: std::io::Error }, - - #[snafu(display("failed to write to pty: {}", source))] - WriteToPty { source: std::io::Error }, - - #[snafu(display("failed to read from terminal: {}", source))] - ReadFromTerminal { source: std::io::Error }, - - #[snafu(display( - "failed to clear ready state on pty for reading: {}", - source - ))] - PtyClearReadReady { source: std::io::Error }, - - #[snafu(display("failed to poll for process exit: {}", source))] - ProcessExitPoll { source: std::io::Error }, - - #[snafu(display( - "failed to put the terminal into raw mode: {}", - source - ))] - IntoRawMode { source: std::io::Error }, -} - -pub type Result<T> = std::result::Result<T, Error>; - -pub struct Process { - pty: tokio_pty_process::AsyncPtyMaster, - process: tokio_pty_process::Child, - // TODO: tokio::io::Stdin is broken - // input: tokio::io::Stdin, - input: tokio::reactor::PollEvented2<EventedStdin>, - cmd: String, - args: Vec<String>, - buf: Vec<u8>, - started: bool, - output_done: bool, - exit_done: bool, - manage_screen: bool, - raw_screen: Option<crossterm::RawScreen>, -} - -struct Resizer<'a, T> { - rows: u16, - cols: u16, - pty: &'a T, -} - -impl<'a, T: tokio_pty_process::PtyMaster> futures::future::Future - for Resizer<'a, T> -{ - type Item = (); - type Error = std::io::Error; - - fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> { - self.pty.resize(self.rows, self.cols) - } -} - -impl Process { - pub fn new(cmd: &str, args: &[String]) -> Result<Self> { - let pty = - tokio_pty_process::AsyncPtyMaster::open().context(OpenPty)?; - - let process = std::process::Command::new(cmd) - .args(args) - .spawn_pty_async(&pty) - .context(SpawnProcess { cmd })?; - - let (cols, rows) = crossterm::terminal().terminal_size(); - Resizer { - rows: rows + 1, - cols: cols + 1, - pty: &pty, - } - .wait() - .context(ResizePty)?; - - // TODO: tokio::io::stdin is broken (it's blocking) - // let input = tokio::io::stdin(); - let input = tokio::reactor::PollEvented2::new(EventedStdin); - - Ok(Self { - pty, - process, - input, - cmd: cmd.to_string(), - args: args.to_vec(), - buf: Vec::with_capacity(4096), - started: false, - output_done: false, - exit_done: false, - manage_screen: true, - raw_screen: None, - }) - } - - #[allow(dead_code)] - pub fn set_raw(mut self, raw: bool) -> Self { - self.manage_screen = raw; - self - } -} - -#[must_use = "streams do nothing unless polled"] -impl futures::stream::Stream for Process { - type Item = crate::eval::CommandEvent; - type Error = Error; - - fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> { - if self.manage_screen && self.raw_screen.is_none() { - self.raw_screen = Some( - crossterm::RawScreen::into_raw_mode().context(IntoRawMode)?, - ); - } - - if !self.started { - self.started = true; - return Ok(futures::Async::Ready(Some( - crate::eval::CommandEvent::CommandStart( - self.cmd.clone(), - self.args.clone(), - ), - ))); - } - - let ready = mio::Ready::readable(); - let input_poll = self.input.poll_read_ready(ready); - match input_poll { - Ok(futures::Async::Ready(_)) => { - let stdin = std::io::stdin(); - let mut stdin = stdin.lock(); - let mut buf = vec![0; 4096]; - // TODO: async - let n = stdin.read(&mut buf).context(ReadFromTerminal)?; - if n > 0 { - let bytes = buf[..n].to_vec(); - - // TODO: async - self.pty.write_all(&bytes).context(WriteToPty)?; - } - } - _ => {} - } - // TODO: this could lose pending bytes if there is stuff to read in - // the buffer but we don't read it all in the previous read call, - // since i think we won't get another notification until new bytes - // actually arrive even if there are bytes in the buffer - self.input - .clear_read_ready(ready) - .context(PtyClearReadReady)?; - - if !self.output_done { - self.buf.clear(); - let output_poll = self.pty.read_buf(&mut self.buf); - match output_poll { - Ok(futures::Async::Ready(n)) => { - let bytes = self.buf[..n].to_vec(); - let bytes: Vec<_> = bytes - .iter() - // replace \n with \r\n - .fold(vec![], |mut acc, &c| { - if c == b'\n' { - acc.push(b'\r'); - acc.push(b'\n'); - } else { - acc.push(c); - } - acc - }); - return Ok(futures::Async::Ready(Some( - crate::eval::CommandEvent::Output(bytes), - ))); - } - Ok(futures::Async::NotReady) => { - return Ok(futures::Async::NotReady); - } - Err(_) => { - // explicitly ignoring errors (for now?) because we - // always read off the end of the pty after the process - // is done - self.output_done = true; - } - } - } - - if !self.exit_done { - let exit_poll = self.process.poll().context(ProcessExitPoll); - match exit_poll { - Ok(futures::Async::Ready(status)) => { - self.exit_done = true; - return Ok(futures::Async::Ready(Some( - crate::eval::CommandEvent::CommandExit(status), - ))); - } - Ok(futures::Async::NotReady) => { - return Ok(futures::Async::NotReady); - } - Err(e) => { - return Err(e); - } - } - } - - Ok(futures::Async::Ready(None)) - } -} - -struct EventedStdin; - -impl mio::Evented for EventedStdin { - fn register( - &self, - poll: &mio::Poll, - token: mio::Token, - interest: mio::Ready, - opts: mio::PollOpt, - ) -> std::io::Result<()> { - let fd = 0 as std::os::unix::io::RawFd; - let eventedfd = mio::unix::EventedFd(&fd); - eventedfd.register(poll, token, interest, opts) - } - - fn reregister( - &self, - poll: &mio::Poll, - token: mio::Token, - interest: mio::Ready, - opts: mio::PollOpt, - ) -> std::io::Result<()> { - let fd = 0 as std::os::unix::io::RawFd; - let eventedfd = mio::unix::EventedFd(&fd); - eventedfd.reregister(poll, token, interest, opts) - } - - fn deregister(&self, poll: &mio::Poll) -> std::io::Result<()> { - let fd = 0 as std::os::unix::io::RawFd; - let eventedfd = mio::unix::EventedFd(&fd); - eventedfd.deregister(poll) - } -} diff --git a/src/repl.rs b/src/repl.rs index 4188fbe..3ec7084 100644 --- a/src/repl.rs +++ b/src/repl.rs @@ -56,21 +56,24 @@ fn read() -> impl futures::future::Future<Item = String, Error = Error> { fn eval( line: &str, -) -> impl futures::stream::Stream<Item = crate::eval::CommandEvent, Error = Error> -{ +) -> impl futures::stream::Stream< + Item = tokio_pty_process_stream::Event, + Error = Error, +> { crate::eval::eval(line).context(Eval) } -fn print(event: &crate::eval::CommandEvent) -> Result<()> { +fn print(event: &tokio_pty_process_stream::Event) -> Result<()> { match event { - crate::eval::CommandEvent::CommandStart(_, _) => {} - crate::eval::CommandEvent::Output(out) => { + tokio_pty_process_stream::Event::CommandStart { .. } => {} + tokio_pty_process_stream::Event::Output { data: out } => { let stdout = std::io::stdout(); let mut stdout = stdout.lock(); stdout.write(out).context(Print)?; stdout.flush().context(Print)?; } - crate::eval::CommandEvent::CommandExit(_) => {} + tokio_pty_process_stream::Event::CommandExit { .. } => {} + tokio_pty_process_stream::Event::Resize { .. } => {} } Ok(()) } @@ -66,18 +66,21 @@ impl Tui { fn print( &mut self, idx: usize, - event: crate::eval::CommandEvent, + event: tokio_pty_process_stream::Event, ) -> Result<()> { match event { - crate::eval::CommandEvent::CommandStart(cmd, args) => { + tokio_pty_process_stream::Event::CommandStart { cmd, args } => { self.command_start(idx, &cmd, &args) } - crate::eval::CommandEvent::Output(out) => { + tokio_pty_process_stream::Event::Output { data: out } => { self.command_output(idx, &out) } - crate::eval::CommandEvent::CommandExit(status) => { + tokio_pty_process_stream::Event::CommandExit { status } => { self.command_exit(idx, status) } + tokio_pty_process_stream::Event::Resize { size } => { + self.command_resize(idx, size) + } } } @@ -126,6 +129,15 @@ impl Tui { Ok(()) } + fn command_resize( + &mut self, + _idx: usize, + _size: (u16, u16), + ) -> Result<()> { + // TODO + Ok(()) + } + fn poll_read(&mut self) { if self.readline.is_none() && self.commands.is_empty() { self.idx += 1; |