aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-10-24 15:28:58 -0400
committerJesse Luehrs <doy@tozt.net>2019-10-24 15:28:58 -0400
commit359ccda66cdda3a88907e51014aa52473ac46605 (patch)
tree08a33387deac68bf4a4f02ad79882ce07617505a /src
parent6552f2d69eb0f19851dbebd21742b7bc9f37cc42 (diff)
downloadteleterm-359ccda66cdda3a88907e51014aa52473ac46605.tar.gz
teleterm-359ccda66cdda3a88907e51014aa52473ac46605.zip
move process out to a separate crate
Diffstat (limited to 'src')
-rw-r--r--src/cmd/record.rs16
-rw-r--r--src/cmd/stream.rs14
-rw-r--r--src/error.rs5
-rw-r--r--src/main.rs1
-rw-r--r--src/process.rs308
-rw-r--r--src/resize.rs20
-rw-r--r--src/term.rs8
7 files changed, 36 insertions, 336 deletions
diff --git a/src/cmd/record.rs b/src/cmd/record.rs
index 6a31c5d..6fda88f 100644
--- a/src/cmd/record.rs
+++ b/src/cmd/record.rs
@@ -86,7 +86,7 @@ impl RecordSession {
) -> Self {
let input = crate::async_stdin::Stdin::new();
let process = crate::resize::ResizingProcess::new(
- crate::process::Process::new(cmd, args, input),
+ tokio_pty_process_stream::Process::new(cmd, args, input),
);
Self {
@@ -156,7 +156,9 @@ impl RecordSession {
match component_future::try_ready!(self.process.poll()) {
Some(crate::resize::Event::Process(e)) => {
match e {
- crate::process::Event::CommandStart(..) => {
+ tokio_pty_process_stream::Event::CommandStart {
+ ..
+ } => {
if self.raw_screen.is_none() {
self.raw_screen = Some(
crossterm::RawScreen::into_raw_mode()
@@ -164,13 +166,15 @@ impl RecordSession {
);
}
}
- crate::process::Event::CommandExit(..) => {
+ tokio_pty_process_stream::Event::CommandExit {
+ ..
+ } => {
self.done = true;
}
- crate::process::Event::Output(output) => {
- self.record_bytes(&output);
+ tokio_pty_process_stream::Event::Output { data } => {
+ self.record_bytes(&data);
if let FileState::Open { file } = &mut self.file {
- file.write_frame(&output)?;
+ file.write_frame(&data)?;
}
}
}
diff --git a/src/cmd/stream.rs b/src/cmd/stream.rs
index 823a4b1..9d2cf83 100644
--- a/src/cmd/stream.rs
+++ b/src/cmd/stream.rs
@@ -149,7 +149,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
let input = crate::async_stdin::Stdin::new();
let process = crate::resize::ResizingProcess::new(
- crate::process::Process::new(cmd, args, input),
+ tokio_pty_process_stream::Process::new(cmd, args, input),
);
Self {
@@ -238,7 +238,9 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
match component_future::try_ready!(self.process.poll()) {
Some(crate::resize::Event::Process(e)) => {
match e {
- crate::process::Event::CommandStart(..) => {
+ tokio_pty_process_stream::Event::CommandStart {
+ ..
+ } => {
if self.raw_screen.is_none() {
self.raw_screen = Some(
crossterm::RawScreen::into_raw_mode()
@@ -246,11 +248,13 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
);
}
}
- crate::process::Event::CommandExit(..) => {
+ tokio_pty_process_stream::Event::CommandExit {
+ ..
+ } => {
self.done = true;
}
- crate::process::Event::Output(output) => {
- self.record_bytes(&output);
+ tokio_pty_process_stream::Event::Output { data } => {
+ self.record_bytes(&data);
}
}
Ok(component_future::Async::DidWork)
diff --git a/src/error.rs b/src/error.rs
index 1d14250..662f860 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -359,6 +359,11 @@ pub enum Error {
#[snafu(display("failed to spawn process for `{}`: {}", cmd, source))]
SpawnProcess { cmd: String, source: std::io::Error },
+ #[snafu(display("poll subprocess failed: {}", source))]
+ Subprocess {
+ source: tokio_pty_process_stream::Error,
+ },
+
#[snafu(display("failed to switch gid: {}", source))]
SwitchGid { source: std::io::Error },
diff --git a/src/main.rs b/src/main.rs
index 03cfc35..bc57f00 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -23,7 +23,6 @@ mod dirs;
mod error;
mod key_reader;
mod oauth;
-mod process;
mod protocol;
mod resize;
mod server;
diff --git a/src/process.rs b/src/process.rs
deleted file mode 100644
index bbd1820..0000000
--- a/src/process.rs
+++ /dev/null
@@ -1,308 +0,0 @@
-use crate::prelude::*;
-use std::os::unix::io::AsRawFd as _;
-use tokio::io::{AsyncRead as _, AsyncWrite as _};
-use tokio_pty_process::CommandExt as _;
-
-const READ_BUFFER_SIZE: usize = 4 * 1024;
-
-#[derive(Debug, PartialEq, Eq)]
-pub enum Event {
- CommandStart(String, Vec<String>),
- Output(Vec<u8>),
- CommandExit(std::process::ExitStatus),
-}
-
-pub struct State {
- pty: Option<tokio_pty_process::AsyncPtyMaster>,
- process: Option<tokio_pty_process::Child>,
-}
-
-impl State {
- fn new() -> Self {
- Self {
- pty: None,
- process: None,
- }
- }
-
- fn pty(&self) -> &tokio_pty_process::AsyncPtyMaster {
- self.pty.as_ref().unwrap()
- }
-
- fn pty_mut(&mut self) -> &mut tokio_pty_process::AsyncPtyMaster {
- self.pty.as_mut().unwrap()
- }
-
- fn process(&mut self) -> &mut tokio_pty_process::Child {
- self.process.as_mut().unwrap()
- }
-}
-
-pub struct Process<R: tokio::io::AsyncRead> {
- state: State,
- input: R,
- input_buf: std::collections::VecDeque<u8>,
- cmd: String,
- args: Vec<String>,
- buf: [u8; READ_BUFFER_SIZE],
- started: bool,
- exited: bool,
- needs_resize: Option<crate::term::Size>,
- stdin_closed: bool,
- stdout_closed: bool,
-}
-
-impl<R: tokio::io::AsyncRead + 'static> Process<R> {
- pub fn new(cmd: &str, args: &[String], input: R) -> Self {
- Self {
- state: State::new(),
- input,
- input_buf: std::collections::VecDeque::new(),
- cmd: cmd.to_string(),
- args: args.to_vec(),
- buf: [0; READ_BUFFER_SIZE],
- started: false,
- exited: false,
- needs_resize: None,
- stdin_closed: false,
- stdout_closed: false,
- }
- }
-
- pub fn resize(&mut self, size: crate::term::Size) {
- self.needs_resize = Some(size);
- }
-}
-
-impl<R: tokio::io::AsyncRead + 'static> Process<R> {
- const POLL_FNS:
- &'static [&'static dyn for<'a> Fn(
- &'a mut Self,
- )
- -> component_future::Poll<
- Option<Event>,
- Error,
- >] = &[
- // order is important here - checking command_exit first so that we
- // don't try to read from a process that has already exited, which
- // causes an error. also, poll_resize needs to happen after
- // poll_command_start, or else the pty might not be initialized.
- &Self::poll_command_start,
- &Self::poll_command_exit,
- &Self::poll_resize,
- &Self::poll_read_stdin,
- &Self::poll_write_stdin,
- &Self::poll_read_stdout,
- ];
-
- fn poll_resize(
- &mut self,
- ) -> component_future::Poll<Option<Event>, Error> {
- if let Some(size) = &self.needs_resize {
- component_future::try_ready!(size.resize_pty(self.state.pty()));
- log::debug!("resize({:?})", size);
- self.needs_resize = None;
- Ok(component_future::Async::DidWork)
- } else {
- Ok(component_future::Async::NothingToDo)
- }
- }
-
- fn poll_command_start(
- &mut self,
- ) -> component_future::Poll<Option<Event>, Error> {
- if self.started {
- return Ok(component_future::Async::NothingToDo);
- }
-
- if self.state.pty.is_none() {
- self.state.pty = Some(
- tokio_pty_process::AsyncPtyMaster::open()
- .context(crate::error::OpenPty)?,
- );
- log::debug!(
- "openpty({})",
- self.state.pty.as_ref().unwrap().as_raw_fd()
- );
- }
-
- if self.state.process.is_none() {
- self.state.process = Some(
- std::process::Command::new(&self.cmd)
- .args(&self.args)
- .spawn_pty_async(self.state.pty())
- .context(crate::error::SpawnProcess {
- cmd: self.cmd.clone(),
- })?,
- );
- log::debug!(
- "spawn({})",
- self.state.process.as_ref().unwrap().id()
- );
- }
-
- self.started = true;
- Ok(component_future::Async::Ready(Some(Event::CommandStart(
- self.cmd.clone(),
- self.args.clone(),
- ))))
- }
-
- fn poll_read_stdin(
- &mut self,
- ) -> component_future::Poll<Option<Event>, Error> {
- if self.exited || self.stdin_closed {
- return Ok(component_future::Async::NothingToDo);
- }
-
- let n = component_future::try_ready!(self
- .input
- .poll_read(&mut self.buf)
- .context(crate::error::ReadTerminal));
- log::debug!("read_stdin({})", n);
- if n > 0 {
- self.input_buf.extend(self.buf[..n].iter());
- } else {
- self.input_buf.push_back(b'\x04');
- self.stdin_closed = true;
- }
- Ok(component_future::Async::DidWork)
- }
-
- fn poll_write_stdin(
- &mut self,
- ) -> component_future::Poll<Option<Event>, Error> {
- if self.exited || self.input_buf.is_empty() {
- return Ok(component_future::Async::NothingToDo);
- }
-
- let (a, b) = self.input_buf.as_slices();
- let buf = if a.is_empty() { b } else { a };
- let n = component_future::try_ready!(self
- .state
- .pty_mut()
- .poll_write(buf)
- .context(crate::error::WritePty));
- log::debug!("write_stdin({})", n);
- for _ in 0..n {
- self.input_buf.pop_front();
- }
- Ok(component_future::Async::DidWork)
- }
-
- fn poll_read_stdout(
- &mut self,
- ) -> component_future::Poll<Option<Event>, Error> {
- match self
- .state
- .pty_mut()
- .poll_read(&mut self.buf)
- .context(crate::error::ReadPty)
- {
- Ok(futures::Async::Ready(n)) => {
- log::debug!("read_stdout({})", n);
- let bytes = self.buf[..n].to_vec();
- Ok(component_future::Async::Ready(Some(Event::Output(bytes))))
- }
- Ok(futures::Async::NotReady) => {
- Ok(component_future::Async::NotReady)
- }
- Err(e) => {
- // XXX this seems to be how eof is returned, but this seems...
- // wrong? i feel like there has to be a better way to do this
- if let Error::ReadPty { source } = &e {
- if source.kind() == std::io::ErrorKind::Other {
- log::debug!("read_stdout(eof)");
- self.stdout_closed = true;
- return Ok(component_future::Async::DidWork);
- }
- }
- Err(e)
- }
- }
- }
-
- fn poll_command_exit(
- &mut self,
- ) -> component_future::Poll<Option<Event>, Error> {
- if self.exited {
- return Ok(component_future::Async::Ready(None));
- }
- if !self.stdout_closed {
- return Ok(component_future::Async::NothingToDo);
- }
-
- let status = component_future::try_ready!(self
- .state
- .process()
- .poll()
- .context(crate::error::ProcessExitPoll));
- log::debug!("exit({})", status);
- self.exited = true;
- Ok(component_future::Async::Ready(Some(Event::CommandExit(
- status,
- ))))
- }
-}
-
-#[must_use = "streams do nothing unless polled"]
-impl<R: tokio::io::AsyncRead + 'static> futures::stream::Stream
- for Process<R>
-{
- type Item = Event;
- type Error = Error;
-
- fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
- component_future::poll_stream(self, Self::POLL_FNS)
- }
-}
-
-#[cfg(test)]
-mod test {
- use super::*;
-
- #[test]
- fn test_simple() {
- let (wres, rres) = tokio::sync::mpsc::channel(100);
- let wres2 = wres.clone();
- let mut wres = wres.wait();
- let buf = std::io::Cursor::new(b"hello world\n");
- let fut = Process::new("cat", &[], buf)
- .for_each(move |e| {
- wres.send(Ok(e)).unwrap();
- Ok(())
- })
- .map_err(|e| {
- wres2.wait().send(Err(e)).unwrap();
- });
- tokio::run(fut);
-
- let mut rres = rres.wait();
-
- let event = rres.next();
- let event = event.unwrap();
- let event = event.unwrap();
- let event = event.unwrap();
- assert_eq!(event, Event::CommandStart("cat".to_string(), vec![]));
-
- let mut output: Vec<u8> = vec![];
- let mut exited = false;
- for event in rres {
- assert!(!exited);
- let event = event.unwrap();
- let event = event.unwrap();
- match event {
- Event::CommandStart(..) => panic!("unexpected CommandStart"),
- Event::Output(buf) => {
- output.extend(buf.iter());
- }
- Event::CommandExit(status) => {
- assert!(status.success());
- exited = true;
- }
- }
- }
- assert!(exited);
- assert_eq!(output, b"hello world\r\nhello world\r\n");
- }
-}
diff --git a/src/resize.rs b/src/resize.rs
index 2dcf928..d792708 100644
--- a/src/resize.rs
+++ b/src/resize.rs
@@ -39,17 +39,19 @@ impl futures::stream::Stream for Resizer {
}
pub enum Event<R: tokio::io::AsyncRead + 'static> {
- Process(<crate::process::Process<R> as futures::stream::Stream>::Item),
+ Process(
+ <tokio_pty_process_stream::Process<R> as futures::stream::Stream>::Item
+ ),
Resize(crate::term::Size),
}
pub struct ResizingProcess<R: tokio::io::AsyncRead + 'static> {
- process: crate::process::Process<R>,
+ process: tokio_pty_process_stream::Process<R>,
resizer: Resizer,
}
impl<R: tokio::io::AsyncRead + 'static> ResizingProcess<R> {
- pub fn new(process: crate::process::Process<R>) -> Self {
+ pub fn new(process: tokio_pty_process_stream::Process<R>) -> Self {
Self {
process,
resizer: Resizer::new(),
@@ -71,7 +73,7 @@ impl<R: tokio::io::AsyncRead + 'static> ResizingProcess<R> {
&mut self,
) -> component_future::Poll<Option<Event<R>>, Error> {
let size = component_future::try_ready!(self.resizer.poll()).unwrap();
- self.process.resize(size.clone());
+ self.process.resize(size.rows, size.cols);
Ok(component_future::Async::Ready(Some(Event::Resize(size))))
}
@@ -79,8 +81,11 @@ impl<R: tokio::io::AsyncRead + 'static> ResizingProcess<R> {
&mut self,
) -> component_future::Poll<Option<Event<R>>, Error> {
Ok(component_future::Async::Ready(
- component_future::try_ready!(self.process.poll())
- .map(Event::Process),
+ component_future::try_ready!(self
+ .process
+ .poll()
+ .context(crate::error::Subprocess))
+ .map(Event::Process),
))
}
}
@@ -90,8 +95,7 @@ impl<R: tokio::io::AsyncRead + 'static> futures::stream::Stream
for ResizingProcess<R>
{
type Item = Event<R>;
- type Error =
- <crate::process::Process<R> as futures::stream::Stream>::Error;
+ type Error = Error;
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
component_future::poll_stream(self, Self::POLL_FNS)
diff --git a/src/term.rs b/src/term.rs
index cbd537a..c8e795f 100644
--- a/src/term.rs
+++ b/src/term.rs
@@ -20,14 +20,6 @@ impl Size {
Ok(Self { rows, cols })
}
- pub fn resize_pty<T: tokio_pty_process::PtyMaster>(
- &self,
- pty: &T,
- ) -> futures::Poll<(), Error> {
- pty.resize(self.rows, self.cols)
- .context(crate::error::ResizePty)
- }
-
pub fn fits_in(&self, other: &Self) -> bool {
self.rows <= other.rows && self.cols <= other.cols
}