diff options
author | Jesse Luehrs <doy@tozt.net> | 2019-10-24 15:28:58 -0400 |
---|---|---|
committer | Jesse Luehrs <doy@tozt.net> | 2019-10-24 15:28:58 -0400 |
commit | 359ccda66cdda3a88907e51014aa52473ac46605 (patch) | |
tree | 08a33387deac68bf4a4f02ad79882ce07617505a /src | |
parent | 6552f2d69eb0f19851dbebd21742b7bc9f37cc42 (diff) | |
download | teleterm-359ccda66cdda3a88907e51014aa52473ac46605.tar.gz teleterm-359ccda66cdda3a88907e51014aa52473ac46605.zip |
move process out to a separate crate
Diffstat (limited to 'src')
-rw-r--r-- | src/cmd/record.rs | 16 | ||||
-rw-r--r-- | src/cmd/stream.rs | 14 | ||||
-rw-r--r-- | src/error.rs | 5 | ||||
-rw-r--r-- | src/main.rs | 1 | ||||
-rw-r--r-- | src/process.rs | 308 | ||||
-rw-r--r-- | src/resize.rs | 20 | ||||
-rw-r--r-- | src/term.rs | 8 |
7 files changed, 36 insertions, 336 deletions
diff --git a/src/cmd/record.rs b/src/cmd/record.rs index 6a31c5d..6fda88f 100644 --- a/src/cmd/record.rs +++ b/src/cmd/record.rs @@ -86,7 +86,7 @@ impl RecordSession { ) -> Self { let input = crate::async_stdin::Stdin::new(); let process = crate::resize::ResizingProcess::new( - crate::process::Process::new(cmd, args, input), + tokio_pty_process_stream::Process::new(cmd, args, input), ); Self { @@ -156,7 +156,9 @@ impl RecordSession { match component_future::try_ready!(self.process.poll()) { Some(crate::resize::Event::Process(e)) => { match e { - crate::process::Event::CommandStart(..) => { + tokio_pty_process_stream::Event::CommandStart { + .. + } => { if self.raw_screen.is_none() { self.raw_screen = Some( crossterm::RawScreen::into_raw_mode() @@ -164,13 +166,15 @@ impl RecordSession { ); } } - crate::process::Event::CommandExit(..) => { + tokio_pty_process_stream::Event::CommandExit { + .. + } => { self.done = true; } - crate::process::Event::Output(output) => { - self.record_bytes(&output); + tokio_pty_process_stream::Event::Output { data } => { + self.record_bytes(&data); if let FileState::Open { file } = &mut self.file { - file.write_frame(&output)?; + file.write_frame(&data)?; } } } diff --git a/src/cmd/stream.rs b/src/cmd/stream.rs index 823a4b1..9d2cf83 100644 --- a/src/cmd/stream.rs +++ b/src/cmd/stream.rs @@ -149,7 +149,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> let input = crate::async_stdin::Stdin::new(); let process = crate::resize::ResizingProcess::new( - crate::process::Process::new(cmd, args, input), + tokio_pty_process_stream::Process::new(cmd, args, input), ); Self { @@ -238,7 +238,9 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> match component_future::try_ready!(self.process.poll()) { Some(crate::resize::Event::Process(e)) => { match e { - crate::process::Event::CommandStart(..) => { + tokio_pty_process_stream::Event::CommandStart { + .. + } => { if self.raw_screen.is_none() { self.raw_screen = Some( crossterm::RawScreen::into_raw_mode() @@ -246,11 +248,13 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> ); } } - crate::process::Event::CommandExit(..) => { + tokio_pty_process_stream::Event::CommandExit { + .. + } => { self.done = true; } - crate::process::Event::Output(output) => { - self.record_bytes(&output); + tokio_pty_process_stream::Event::Output { data } => { + self.record_bytes(&data); } } Ok(component_future::Async::DidWork) diff --git a/src/error.rs b/src/error.rs index 1d14250..662f860 100644 --- a/src/error.rs +++ b/src/error.rs @@ -359,6 +359,11 @@ pub enum Error { #[snafu(display("failed to spawn process for `{}`: {}", cmd, source))] SpawnProcess { cmd: String, source: std::io::Error }, + #[snafu(display("poll subprocess failed: {}", source))] + Subprocess { + source: tokio_pty_process_stream::Error, + }, + #[snafu(display("failed to switch gid: {}", source))] SwitchGid { source: std::io::Error }, diff --git a/src/main.rs b/src/main.rs index 03cfc35..bc57f00 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,7 +23,6 @@ mod dirs; mod error; mod key_reader; mod oauth; -mod process; mod protocol; mod resize; mod server; diff --git a/src/process.rs b/src/process.rs deleted file mode 100644 index bbd1820..0000000 --- a/src/process.rs +++ /dev/null @@ -1,308 +0,0 @@ -use crate::prelude::*; -use std::os::unix::io::AsRawFd as _; -use tokio::io::{AsyncRead as _, AsyncWrite as _}; -use tokio_pty_process::CommandExt as _; - -const READ_BUFFER_SIZE: usize = 4 * 1024; - -#[derive(Debug, PartialEq, Eq)] -pub enum Event { - CommandStart(String, Vec<String>), - Output(Vec<u8>), - CommandExit(std::process::ExitStatus), -} - -pub struct State { - pty: Option<tokio_pty_process::AsyncPtyMaster>, - process: Option<tokio_pty_process::Child>, -} - -impl State { - fn new() -> Self { - Self { - pty: None, - process: None, - } - } - - fn pty(&self) -> &tokio_pty_process::AsyncPtyMaster { - self.pty.as_ref().unwrap() - } - - fn pty_mut(&mut self) -> &mut tokio_pty_process::AsyncPtyMaster { - self.pty.as_mut().unwrap() - } - - fn process(&mut self) -> &mut tokio_pty_process::Child { - self.process.as_mut().unwrap() - } -} - -pub struct Process<R: tokio::io::AsyncRead> { - state: State, - input: R, - input_buf: std::collections::VecDeque<u8>, - cmd: String, - args: Vec<String>, - buf: [u8; READ_BUFFER_SIZE], - started: bool, - exited: bool, - needs_resize: Option<crate::term::Size>, - stdin_closed: bool, - stdout_closed: bool, -} - -impl<R: tokio::io::AsyncRead + 'static> Process<R> { - pub fn new(cmd: &str, args: &[String], input: R) -> Self { - Self { - state: State::new(), - input, - input_buf: std::collections::VecDeque::new(), - cmd: cmd.to_string(), - args: args.to_vec(), - buf: [0; READ_BUFFER_SIZE], - started: false, - exited: false, - needs_resize: None, - stdin_closed: false, - stdout_closed: false, - } - } - - pub fn resize(&mut self, size: crate::term::Size) { - self.needs_resize = Some(size); - } -} - -impl<R: tokio::io::AsyncRead + 'static> Process<R> { - const POLL_FNS: - &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) - -> component_future::Poll< - Option<Event>, - Error, - >] = &[ - // order is important here - checking command_exit first so that we - // don't try to read from a process that has already exited, which - // causes an error. also, poll_resize needs to happen after - // poll_command_start, or else the pty might not be initialized. - &Self::poll_command_start, - &Self::poll_command_exit, - &Self::poll_resize, - &Self::poll_read_stdin, - &Self::poll_write_stdin, - &Self::poll_read_stdout, - ]; - - fn poll_resize( - &mut self, - ) -> component_future::Poll<Option<Event>, Error> { - if let Some(size) = &self.needs_resize { - component_future::try_ready!(size.resize_pty(self.state.pty())); - log::debug!("resize({:?})", size); - self.needs_resize = None; - Ok(component_future::Async::DidWork) - } else { - Ok(component_future::Async::NothingToDo) - } - } - - fn poll_command_start( - &mut self, - ) -> component_future::Poll<Option<Event>, Error> { - if self.started { - return Ok(component_future::Async::NothingToDo); - } - - if self.state.pty.is_none() { - self.state.pty = Some( - tokio_pty_process::AsyncPtyMaster::open() - .context(crate::error::OpenPty)?, - ); - log::debug!( - "openpty({})", - self.state.pty.as_ref().unwrap().as_raw_fd() - ); - } - - if self.state.process.is_none() { - self.state.process = Some( - std::process::Command::new(&self.cmd) - .args(&self.args) - .spawn_pty_async(self.state.pty()) - .context(crate::error::SpawnProcess { - cmd: self.cmd.clone(), - })?, - ); - log::debug!( - "spawn({})", - self.state.process.as_ref().unwrap().id() - ); - } - - self.started = true; - Ok(component_future::Async::Ready(Some(Event::CommandStart( - self.cmd.clone(), - self.args.clone(), - )))) - } - - fn poll_read_stdin( - &mut self, - ) -> component_future::Poll<Option<Event>, Error> { - if self.exited || self.stdin_closed { - return Ok(component_future::Async::NothingToDo); - } - - let n = component_future::try_ready!(self - .input - .poll_read(&mut self.buf) - .context(crate::error::ReadTerminal)); - log::debug!("read_stdin({})", n); - if n > 0 { - self.input_buf.extend(self.buf[..n].iter()); - } else { - self.input_buf.push_back(b'\x04'); - self.stdin_closed = true; - } - Ok(component_future::Async::DidWork) - } - - fn poll_write_stdin( - &mut self, - ) -> component_future::Poll<Option<Event>, Error> { - if self.exited || self.input_buf.is_empty() { - return Ok(component_future::Async::NothingToDo); - } - - let (a, b) = self.input_buf.as_slices(); - let buf = if a.is_empty() { b } else { a }; - let n = component_future::try_ready!(self - .state - .pty_mut() - .poll_write(buf) - .context(crate::error::WritePty)); - log::debug!("write_stdin({})", n); - for _ in 0..n { - self.input_buf.pop_front(); - } - Ok(component_future::Async::DidWork) - } - - fn poll_read_stdout( - &mut self, - ) -> component_future::Poll<Option<Event>, Error> { - match self - .state - .pty_mut() - .poll_read(&mut self.buf) - .context(crate::error::ReadPty) - { - Ok(futures::Async::Ready(n)) => { - log::debug!("read_stdout({})", n); - let bytes = self.buf[..n].to_vec(); - Ok(component_future::Async::Ready(Some(Event::Output(bytes)))) - } - Ok(futures::Async::NotReady) => { - Ok(component_future::Async::NotReady) - } - Err(e) => { - // XXX this seems to be how eof is returned, but this seems... - // wrong? i feel like there has to be a better way to do this - if let Error::ReadPty { source } = &e { - if source.kind() == std::io::ErrorKind::Other { - log::debug!("read_stdout(eof)"); - self.stdout_closed = true; - return Ok(component_future::Async::DidWork); - } - } - Err(e) - } - } - } - - fn poll_command_exit( - &mut self, - ) -> component_future::Poll<Option<Event>, Error> { - if self.exited { - return Ok(component_future::Async::Ready(None)); - } - if !self.stdout_closed { - return Ok(component_future::Async::NothingToDo); - } - - let status = component_future::try_ready!(self - .state - .process() - .poll() - .context(crate::error::ProcessExitPoll)); - log::debug!("exit({})", status); - self.exited = true; - Ok(component_future::Async::Ready(Some(Event::CommandExit( - status, - )))) - } -} - -#[must_use = "streams do nothing unless polled"] -impl<R: tokio::io::AsyncRead + 'static> futures::stream::Stream - for Process<R> -{ - type Item = Event; - type Error = Error; - - fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> { - component_future::poll_stream(self, Self::POLL_FNS) - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_simple() { - let (wres, rres) = tokio::sync::mpsc::channel(100); - let wres2 = wres.clone(); - let mut wres = wres.wait(); - let buf = std::io::Cursor::new(b"hello world\n"); - let fut = Process::new("cat", &[], buf) - .for_each(move |e| { - wres.send(Ok(e)).unwrap(); - Ok(()) - }) - .map_err(|e| { - wres2.wait().send(Err(e)).unwrap(); - }); - tokio::run(fut); - - let mut rres = rres.wait(); - - let event = rres.next(); - let event = event.unwrap(); - let event = event.unwrap(); - let event = event.unwrap(); - assert_eq!(event, Event::CommandStart("cat".to_string(), vec![])); - - let mut output: Vec<u8> = vec![]; - let mut exited = false; - for event in rres { - assert!(!exited); - let event = event.unwrap(); - let event = event.unwrap(); - match event { - Event::CommandStart(..) => panic!("unexpected CommandStart"), - Event::Output(buf) => { - output.extend(buf.iter()); - } - Event::CommandExit(status) => { - assert!(status.success()); - exited = true; - } - } - } - assert!(exited); - assert_eq!(output, b"hello world\r\nhello world\r\n"); - } -} diff --git a/src/resize.rs b/src/resize.rs index 2dcf928..d792708 100644 --- a/src/resize.rs +++ b/src/resize.rs @@ -39,17 +39,19 @@ impl futures::stream::Stream for Resizer { } pub enum Event<R: tokio::io::AsyncRead + 'static> { - Process(<crate::process::Process<R> as futures::stream::Stream>::Item), + Process( + <tokio_pty_process_stream::Process<R> as futures::stream::Stream>::Item + ), Resize(crate::term::Size), } pub struct ResizingProcess<R: tokio::io::AsyncRead + 'static> { - process: crate::process::Process<R>, + process: tokio_pty_process_stream::Process<R>, resizer: Resizer, } impl<R: tokio::io::AsyncRead + 'static> ResizingProcess<R> { - pub fn new(process: crate::process::Process<R>) -> Self { + pub fn new(process: tokio_pty_process_stream::Process<R>) -> Self { Self { process, resizer: Resizer::new(), @@ -71,7 +73,7 @@ impl<R: tokio::io::AsyncRead + 'static> ResizingProcess<R> { &mut self, ) -> component_future::Poll<Option<Event<R>>, Error> { let size = component_future::try_ready!(self.resizer.poll()).unwrap(); - self.process.resize(size.clone()); + self.process.resize(size.rows, size.cols); Ok(component_future::Async::Ready(Some(Event::Resize(size)))) } @@ -79,8 +81,11 @@ impl<R: tokio::io::AsyncRead + 'static> ResizingProcess<R> { &mut self, ) -> component_future::Poll<Option<Event<R>>, Error> { Ok(component_future::Async::Ready( - component_future::try_ready!(self.process.poll()) - .map(Event::Process), + component_future::try_ready!(self + .process + .poll() + .context(crate::error::Subprocess)) + .map(Event::Process), )) } } @@ -90,8 +95,7 @@ impl<R: tokio::io::AsyncRead + 'static> futures::stream::Stream for ResizingProcess<R> { type Item = Event<R>; - type Error = - <crate::process::Process<R> as futures::stream::Stream>::Error; + type Error = Error; fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> { component_future::poll_stream(self, Self::POLL_FNS) diff --git a/src/term.rs b/src/term.rs index cbd537a..c8e795f 100644 --- a/src/term.rs +++ b/src/term.rs @@ -20,14 +20,6 @@ impl Size { Ok(Self { rows, cols }) } - pub fn resize_pty<T: tokio_pty_process::PtyMaster>( - &self, - pty: &T, - ) -> futures::Poll<(), Error> { - pty.resize(self.rows, self.cols) - .context(crate::error::ResizePty) - } - pub fn fits_in(&self, other: &Self) -> bool { self.rows <= other.rows && self.cols <= other.cols } |