From d47cb39c8fb3e60bf47690626f5287e3cd981c8c Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Thu, 17 Oct 2019 12:27:00 -0400 Subject: ensure resize handling works everywhere previously record wasn't getting resize events because they were being handled in the client (which record doesn't use) --- src/client.rs | 62 ++--------------------------- src/cmd/record.rs | 11 +++-- src/cmd/stream.rs | 26 ++++++------ src/cmd/watch.rs | 34 ++++++++-------- src/main.rs | 1 + src/resize.rs | 117 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 160 insertions(+), 91 deletions(-) create mode 100644 src/resize.rs (limited to 'src') diff --git a/src/client.rs b/src/client.rs index be8061c..584dd8e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -63,11 +63,9 @@ enum WriteSocket< } pub enum Event { - Start(crate::term::Size), ServerMessage(crate::protocol::Message), Disconnect, - Connect(), - Resize(crate::term::Size), + Connect, } pub type Connector = Box< @@ -92,16 +90,12 @@ pub struct Client< reconnect_timer: Option, reconnect_backoff_amount: std::time::Duration, last_server_time: std::time::Instant, - winches: Option< - Box + Send>, - >, rsock: ReadSocket, wsock: WriteSocket, on_login: Vec, to_send: std::collections::VecDeque, - started: bool, } impl @@ -117,7 +111,6 @@ impl auth, buffer_size, &[crate::protocol::Message::start_streaming()], - true, ) } @@ -132,7 +125,6 @@ impl auth, buffer_size, &[crate::protocol::Message::start_watching(id)], - false, ) } @@ -141,7 +133,7 @@ impl auth: &crate::protocol::Auth, buffer_size: usize, ) -> Self { - Self::new(connect, auth, buffer_size, &[], true) + Self::new(connect, auth, buffer_size, &[]) } fn new( @@ -149,25 +141,11 @@ impl auth: &crate::protocol::Auth, buffer_size: usize, on_login: &[crate::protocol::Message], - handle_sigwinch: bool, ) -> Self { let term_type = std::env::var("TERM").unwrap_or_else(|_| "".to_string()); let heartbeat_timer = tokio::timer::Interval::new_interval(HEARTBEAT_DURATION); - let winches: Option< - Box + Send>, - > = if handle_sigwinch { - let winches = tokio_signal::unix::Signal::new( - tokio_signal::unix::libc::SIGWINCH, - ) - .flatten_stream() - .map(|_| ()) - .context(crate::error::SigWinchHandler); - Some(Box::new(winches)) - } else { - None - }; Self { connect, @@ -180,14 +158,12 @@ impl reconnect_timer: None, reconnect_backoff_amount: RECONNECT_BACKOFF_BASE, last_server_time: std::time::Instant::now(), - winches, rsock: ReadSocket::NotConnected, wsock: WriteSocket::NotConnected, on_login: on_login.to_vec(), to_send: std::collections::VecDeque::new(), - started: false, } } @@ -308,7 +284,7 @@ impl } Ok(( crate::component_future::Async::Ready(Some( - Event::Connect(), + Event::Connect, )), None, )) @@ -432,7 +408,6 @@ impl &Self::poll_read_server, &Self::poll_write_server, &Self::poll_heartbeat, - &Self::poll_sigwinch, ]; fn poll_reconnect_server( @@ -625,37 +600,6 @@ impl } } } - - fn poll_sigwinch( - &mut self, - ) -> crate::component_future::Poll, Error> { - if let Some(winches) = &mut self.winches { - if !self.started { - self.started = true; - return Ok(crate::component_future::Async::Ready(Some( - Event::Start(crate::term::Size::get()?), - ))); - } - - match winches.poll()? { - futures::Async::Ready(Some(_)) => { - let size = crate::term::Size::get()?; - self.send_message(crate::protocol::Message::resize( - &size, - )); - Ok(crate::component_future::Async::Ready(Some( - Event::Resize(size), - ))) - } - futures::Async::Ready(None) => unreachable!(), - futures::Async::NotReady => { - Ok(crate::component_future::Async::NotReady) - } - } - } else { - Ok(crate::component_future::Async::NothingToDo) - } - } } #[must_use = "streams do nothing unless polled"] diff --git a/src/cmd/record.rs b/src/cmd/record.rs index 7fbfb4b..9092506 100644 --- a/src/cmd/record.rs +++ b/src/cmd/record.rs @@ -103,7 +103,7 @@ enum FileState { struct RecordSession { file: FileState, - process: crate::process::Process, + process: crate::resize::ResizingProcess, stdout: tokio::io::Stdout, buffer: crate::term::Buffer, sent_local: usize, @@ -120,7 +120,9 @@ impl RecordSession { args: &[String], ) -> Self { let input = crate::async_stdin::Stdin::new(); - let process = crate::process::Process::new(cmd, args, input); + let process = crate::resize::ResizingProcess::new( + crate::process::Process::new(cmd, args, input), + ); Self { file: FileState::Closed { @@ -188,7 +190,7 @@ impl RecordSession { &mut self, ) -> crate::component_future::Poll<(), Error> { match self.process.poll()? { - futures::Async::Ready(Some(e)) => { + futures::Async::Ready(Some(crate::resize::Event::Process(e))) => { match e { crate::process::Event::CommandStart(..) => { if self.raw_screen.is_none() { @@ -210,6 +212,9 @@ impl RecordSession { } Ok(crate::component_future::Async::DidWork) } + futures::Async::Ready(Some(crate::resize::Event::Resize(_))) => { + Ok(crate::component_future::Async::DidWork) + } futures::Async::Ready(None) => { if !self.done { unreachable!() diff --git a/src/cmd/stream.rs b/src/cmd/stream.rs index eba26bb..a9eea1b 100644 --- a/src/cmd/stream.rs +++ b/src/cmd/stream.rs @@ -180,7 +180,7 @@ struct StreamSession< S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static, > { client: crate::client::Client, - process: crate::process::Process, + process: crate::resize::ResizingProcess, stdout: tokio::io::Stdout, buffer: crate::term::Buffer, sent_local: usize, @@ -209,7 +209,9 @@ impl // let input = tokio::io::stdin(); let input = crate::async_stdin::Stdin::new(); - let process = crate::process::Process::new(cmd, args, input); + let process = crate::resize::ResizingProcess::new( + crate::process::Process::new(cmd, args, input), + ); Self { client, @@ -268,11 +270,7 @@ impl self.connected = false; Ok(crate::component_future::Async::DidWork) } - crate::client::Event::Start(size) => { - self.process.resize(size); - Ok(crate::component_future::Async::DidWork) - } - crate::client::Event::Connect() => { + crate::client::Event::Connect => { self.connected = true; self.sent_remote = 0; Ok(crate::component_future::Async::DidWork) @@ -284,10 +282,6 @@ impl self.client.reconnect(); Ok(crate::component_future::Async::DidWork) } - crate::client::Event::Resize(size) => { - self.process.resize(size); - Ok(crate::component_future::Async::DidWork) - } }, Ok(futures::Async::Ready(None)) => { // the client should never exit on its own @@ -307,7 +301,7 @@ impl &mut self, ) -> crate::component_future::Poll<(), Error> { match self.process.poll()? { - futures::Async::Ready(Some(e)) => { + futures::Async::Ready(Some(crate::resize::Event::Process(e))) => { match e { crate::process::Event::CommandStart(..) => { if self.raw_screen.is_none() { @@ -316,7 +310,6 @@ impl .context(crate::error::ToRawMode)?, ); } - self.process.resize(crate::term::Size::get()?); } crate::process::Event::CommandExit(..) => { self.done = true; @@ -327,6 +320,13 @@ impl } Ok(crate::component_future::Async::DidWork) } + futures::Async::Ready(Some(crate::resize::Event::Resize( + size, + ))) => { + self.client + .send_message(crate::protocol::Message::resize(&size)); + Ok(crate::component_future::Async::DidWork) + } futures::Async::Ready(None) => { if !self.done { unreachable!() diff --git a/src/cmd/watch.rs b/src/cmd/watch.rs index f4d4948..99819d8 100644 --- a/src/cmd/watch.rs +++ b/src/cmd/watch.rs @@ -224,6 +224,7 @@ struct WatchSession< key_reader: crate::key_reader::KeyReader, list_client: crate::client::Client, + resizer: crate::resize::Resizer, state: State, raw_screen: Option, needs_redraw: bool, @@ -245,6 +246,7 @@ impl key_reader: crate::key_reader::KeyReader::new(), list_client, + resizer: crate::resize::Resizer::new(), state: State::new(), raw_screen: None, needs_redraw: true, @@ -588,11 +590,25 @@ impl (), Error, >] = &[ + &Self::poll_resizer, &Self::poll_input, &Self::poll_list_client, &Self::poll_watch_client, ]; + fn poll_resizer(&mut self) -> crate::component_future::Poll<(), Error> { + match self.resizer.poll()? { + futures::Async::Ready(Some(size)) => { + self.resize(size)?; + Ok(crate::component_future::Async::DidWork) + } + futures::Async::Ready(None) => unreachable!(), + futures::Async::NotReady => { + Ok(crate::component_future::Async::NotReady) + } + } + } + fn poll_input(&mut self) -> crate::component_future::Poll<(), Error> { if self.raw_screen.is_none() { self.raw_screen = Some( @@ -633,13 +649,10 @@ impl match self.list_client.poll()? { futures::Async::Ready(Some(e)) => { match e { - crate::client::Event::Start(size) => { - self.resize(size)?; - } crate::client::Event::Disconnect => { self.reconnect(true)?; } - crate::client::Event::Connect() => { + crate::client::Event::Connect => { self.list_client.send_message( crate::protocol::Message::list_sessions(), ); @@ -647,9 +660,6 @@ impl crate::client::Event::ServerMessage(msg) => { self.list_server_message(msg)?; } - crate::client::Event::Resize(size) => { - self.resize(size)?; - } } Ok(crate::component_future::Async::DidWork) } @@ -675,21 +685,13 @@ impl match client.poll()? { futures::Async::Ready(Some(e)) => { match e { - crate::client::Event::Start(_) => { - // watch clients don't respond to resize events - unreachable!(); - } crate::client::Event::Disconnect => { self.reconnect(true)?; } - crate::client::Event::Connect() => {} + crate::client::Event::Connect => {} crate::client::Event::ServerMessage(msg) => { self.watch_server_message(msg)?; } - crate::client::Event::Resize(_) => { - // watch clients don't respond to resize events - unreachable!(); - } } Ok(crate::component_future::Async::DidWork) } diff --git a/src/main.rs b/src/main.rs index f44dba0..0f7b4d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,6 +20,7 @@ mod key_reader; mod oauth; mod process; mod protocol; +mod resize; mod server; mod session_list; mod term; diff --git a/src/resize.rs b/src/resize.rs new file mode 100644 index 0000000..86dd386 --- /dev/null +++ b/src/resize.rs @@ -0,0 +1,117 @@ +use crate::prelude::*; + +pub struct Resizer { + winches: + Box + Send>, + sent_initial_size: bool, +} + +impl Resizer { + pub fn new() -> Self { + let winches = tokio_signal::unix::Signal::new( + tokio_signal::unix::libc::SIGWINCH, + ) + .flatten_stream() + .map(|_| ()) + .context(crate::error::SigWinchHandler); + Self { + winches: Box::new(winches), + sent_initial_size: false, + } + } +} + +#[must_use = "streams do nothing unless polled"] +impl futures::stream::Stream for Resizer { + type Item = crate::term::Size; + type Error = Error; + + fn poll(&mut self) -> futures::Poll, Self::Error> { + if !self.sent_initial_size { + self.sent_initial_size = true; + return Ok(futures::Async::Ready( + Some(crate::term::Size::get()?), + )); + } + let _ = futures::try_ready!(self.winches.poll()); + Ok(futures::Async::Ready(Some(crate::term::Size::get()?))) + } +} + +pub enum Event { + Process( as futures::stream::Stream>::Item), + Resize(crate::term::Size), +} + +pub struct ResizingProcess { + process: crate::process::Process, + resizer: Resizer, +} + +impl ResizingProcess { + pub fn new(process: crate::process::Process) -> Self { + Self { + process, + resizer: Resizer::new(), + } + } +} + +impl ResizingProcess { + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> crate::component_future::Poll< + Option>, + Error, + >] = &[&Self::poll_resize, &Self::poll_process]; + + fn poll_resize( + &mut self, + ) -> crate::component_future::Poll>, Error> { + match self.resizer.poll()? { + futures::Async::Ready(Some(size)) => { + self.process.resize(size.clone()); + Ok(crate::component_future::Async::Ready(Some( + Event::Resize(size), + ))) + } + futures::Async::Ready(None) => unreachable!(), + futures::Async::NotReady => { + Ok(crate::component_future::Async::NotReady) + } + } + } + + fn poll_process( + &mut self, + ) -> crate::component_future::Poll>, Error> { + match self.process.poll()? { + futures::Async::Ready(Some(e)) => { + Ok(crate::component_future::Async::Ready(Some( + Event::Process(e), + ))) + } + futures::Async::Ready(None) => { + Ok(crate::component_future::Async::Ready(None)) + } + futures::Async::NotReady => { + Ok(crate::component_future::Async::NotReady) + } + } + } +} + +#[must_use = "streams do nothing unless polled"] +impl futures::stream::Stream + for ResizingProcess +{ + type Item = Event; + type Error = + as futures::stream::Stream>::Error; + + fn poll(&mut self) -> futures::Poll, Self::Error> { + crate::component_future::poll_stream(self, Self::POLL_FNS) + } +} -- cgit v1.2.3-54-g00ecf