aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-10-17 12:27:00 -0400
committerJesse Luehrs <doy@tozt.net>2019-10-17 12:27:00 -0400
commitd47cb39c8fb3e60bf47690626f5287e3cd981c8c (patch)
tree42a01012d0bbd22ff271697dc72646404b604cef
parent61c8fb0a8d6460aff45ed454cfb2b0a5e37a1a89 (diff)
downloadteleterm-d47cb39c8fb3e60bf47690626f5287e3cd981c8c.tar.gz
teleterm-d47cb39c8fb3e60bf47690626f5287e3cd981c8c.zip
ensure resize handling works everywhere
previously record wasn't getting resize events because they were being handled in the client (which record doesn't use)
-rw-r--r--src/client.rs62
-rw-r--r--src/cmd/record.rs11
-rw-r--r--src/cmd/stream.rs26
-rw-r--r--src/cmd/watch.rs34
-rw-r--r--src/main.rs1
-rw-r--r--src/resize.rs117
6 files changed, 160 insertions, 91 deletions
diff --git a/src/client.rs b/src/client.rs
index be8061c..584dd8e 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -63,11 +63,9 @@ enum WriteSocket<
}
pub enum Event {
- Start(crate::term::Size),
ServerMessage(crate::protocol::Message),
Disconnect,
- Connect(),
- Resize(crate::term::Size),
+ Connect,
}
pub type Connector<S> = Box<
@@ -92,16 +90,12 @@ pub struct Client<
reconnect_timer: Option<tokio::timer::Delay>,
reconnect_backoff_amount: std::time::Duration,
last_server_time: std::time::Instant,
- winches: Option<
- Box<dyn futures::stream::Stream<Item = (), Error = Error> + Send>,
- >,
rsock: ReadSocket<S>,
wsock: WriteSocket<S>,
on_login: Vec<crate::protocol::Message>,
to_send: std::collections::VecDeque<crate::protocol::Message>,
- started: bool,
}
impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
@@ -117,7 +111,6 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
auth,
buffer_size,
&[crate::protocol::Message::start_streaming()],
- true,
)
}
@@ -132,7 +125,6 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
auth,
buffer_size,
&[crate::protocol::Message::start_watching(id)],
- false,
)
}
@@ -141,7 +133,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
auth: &crate::protocol::Auth,
buffer_size: usize,
) -> Self {
- Self::new(connect, auth, buffer_size, &[], true)
+ Self::new(connect, auth, buffer_size, &[])
}
fn new(
@@ -149,25 +141,11 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
auth: &crate::protocol::Auth,
buffer_size: usize,
on_login: &[crate::protocol::Message],
- handle_sigwinch: bool,
) -> Self {
let term_type =
std::env::var("TERM").unwrap_or_else(|_| "".to_string());
let heartbeat_timer =
tokio::timer::Interval::new_interval(HEARTBEAT_DURATION);
- let winches: Option<
- Box<dyn futures::stream::Stream<Item = (), Error = Error> + Send>,
- > = if handle_sigwinch {
- let winches = tokio_signal::unix::Signal::new(
- tokio_signal::unix::libc::SIGWINCH,
- )
- .flatten_stream()
- .map(|_| ())
- .context(crate::error::SigWinchHandler);
- Some(Box::new(winches))
- } else {
- None
- };
Self {
connect,
@@ -180,14 +158,12 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
reconnect_timer: None,
reconnect_backoff_amount: RECONNECT_BACKOFF_BASE,
last_server_time: std::time::Instant::now(),
- winches,
rsock: ReadSocket::NotConnected,
wsock: WriteSocket::NotConnected,
on_login: on_login.to_vec(),
to_send: std::collections::VecDeque::new(),
- started: false,
}
}
@@ -308,7 +284,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
Ok((
crate::component_future::Async::Ready(Some(
- Event::Connect(),
+ Event::Connect,
)),
None,
))
@@ -432,7 +408,6 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
&Self::poll_read_server,
&Self::poll_write_server,
&Self::poll_heartbeat,
- &Self::poll_sigwinch,
];
fn poll_reconnect_server(
@@ -625,37 +600,6 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
}
}
-
- fn poll_sigwinch(
- &mut self,
- ) -> crate::component_future::Poll<Option<Event>, Error> {
- if let Some(winches) = &mut self.winches {
- if !self.started {
- self.started = true;
- return Ok(crate::component_future::Async::Ready(Some(
- Event::Start(crate::term::Size::get()?),
- )));
- }
-
- match winches.poll()? {
- futures::Async::Ready(Some(_)) => {
- let size = crate::term::Size::get()?;
- self.send_message(crate::protocol::Message::resize(
- &size,
- ));
- Ok(crate::component_future::Async::Ready(Some(
- Event::Resize(size),
- )))
- }
- futures::Async::Ready(None) => unreachable!(),
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
- }
- } else {
- Ok(crate::component_future::Async::NothingToDo)
- }
- }
}
#[must_use = "streams do nothing unless polled"]
diff --git a/src/cmd/record.rs b/src/cmd/record.rs
index 7fbfb4b..9092506 100644
--- a/src/cmd/record.rs
+++ b/src/cmd/record.rs
@@ -103,7 +103,7 @@ enum FileState {
struct RecordSession {
file: FileState,
- process: crate::process::Process<crate::async_stdin::Stdin>,
+ process: crate::resize::ResizingProcess<crate::async_stdin::Stdin>,
stdout: tokio::io::Stdout,
buffer: crate::term::Buffer,
sent_local: usize,
@@ -120,7 +120,9 @@ impl RecordSession {
args: &[String],
) -> Self {
let input = crate::async_stdin::Stdin::new();
- let process = crate::process::Process::new(cmd, args, input);
+ let process = crate::resize::ResizingProcess::new(
+ crate::process::Process::new(cmd, args, input),
+ );
Self {
file: FileState::Closed {
@@ -188,7 +190,7 @@ impl RecordSession {
&mut self,
) -> crate::component_future::Poll<(), Error> {
match self.process.poll()? {
- futures::Async::Ready(Some(e)) => {
+ futures::Async::Ready(Some(crate::resize::Event::Process(e))) => {
match e {
crate::process::Event::CommandStart(..) => {
if self.raw_screen.is_none() {
@@ -210,6 +212,9 @@ impl RecordSession {
}
Ok(crate::component_future::Async::DidWork)
}
+ futures::Async::Ready(Some(crate::resize::Event::Resize(_))) => {
+ Ok(crate::component_future::Async::DidWork)
+ }
futures::Async::Ready(None) => {
if !self.done {
unreachable!()
diff --git a/src/cmd/stream.rs b/src/cmd/stream.rs
index eba26bb..a9eea1b 100644
--- a/src/cmd/stream.rs
+++ b/src/cmd/stream.rs
@@ -180,7 +180,7 @@ struct StreamSession<
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
> {
client: crate::client::Client<S>,
- process: crate::process::Process<crate::async_stdin::Stdin>,
+ process: crate::resize::ResizingProcess<crate::async_stdin::Stdin>,
stdout: tokio::io::Stdout,
buffer: crate::term::Buffer,
sent_local: usize,
@@ -209,7 +209,9 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
// let input = tokio::io::stdin();
let input = crate::async_stdin::Stdin::new();
- let process = crate::process::Process::new(cmd, args, input);
+ let process = crate::resize::ResizingProcess::new(
+ crate::process::Process::new(cmd, args, input),
+ );
Self {
client,
@@ -268,11 +270,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
self.connected = false;
Ok(crate::component_future::Async::DidWork)
}
- crate::client::Event::Start(size) => {
- self.process.resize(size);
- Ok(crate::component_future::Async::DidWork)
- }
- crate::client::Event::Connect() => {
+ crate::client::Event::Connect => {
self.connected = true;
self.sent_remote = 0;
Ok(crate::component_future::Async::DidWork)
@@ -284,10 +282,6 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
self.client.reconnect();
Ok(crate::component_future::Async::DidWork)
}
- crate::client::Event::Resize(size) => {
- self.process.resize(size);
- Ok(crate::component_future::Async::DidWork)
- }
},
Ok(futures::Async::Ready(None)) => {
// the client should never exit on its own
@@ -307,7 +301,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
&mut self,
) -> crate::component_future::Poll<(), Error> {
match self.process.poll()? {
- futures::Async::Ready(Some(e)) => {
+ futures::Async::Ready(Some(crate::resize::Event::Process(e))) => {
match e {
crate::process::Event::CommandStart(..) => {
if self.raw_screen.is_none() {
@@ -316,7 +310,6 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
.context(crate::error::ToRawMode)?,
);
}
- self.process.resize(crate::term::Size::get()?);
}
crate::process::Event::CommandExit(..) => {
self.done = true;
@@ -327,6 +320,13 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
Ok(crate::component_future::Async::DidWork)
}
+ futures::Async::Ready(Some(crate::resize::Event::Resize(
+ size,
+ ))) => {
+ self.client
+ .send_message(crate::protocol::Message::resize(&size));
+ Ok(crate::component_future::Async::DidWork)
+ }
futures::Async::Ready(None) => {
if !self.done {
unreachable!()
diff --git a/src/cmd/watch.rs b/src/cmd/watch.rs
index f4d4948..99819d8 100644
--- a/src/cmd/watch.rs
+++ b/src/cmd/watch.rs
@@ -224,6 +224,7 @@ struct WatchSession<
key_reader: crate::key_reader::KeyReader,
list_client: crate::client::Client<S>,
+ resizer: crate::resize::Resizer,
state: State<S>,
raw_screen: Option<crossterm::RawScreen>,
needs_redraw: bool,
@@ -245,6 +246,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
key_reader: crate::key_reader::KeyReader::new(),
list_client,
+ resizer: crate::resize::Resizer::new(),
state: State::new(),
raw_screen: None,
needs_redraw: true,
@@ -588,11 +590,25 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
(),
Error,
>] = &[
+ &Self::poll_resizer,
&Self::poll_input,
&Self::poll_list_client,
&Self::poll_watch_client,
];
+ fn poll_resizer(&mut self) -> crate::component_future::Poll<(), Error> {
+ match self.resizer.poll()? {
+ futures::Async::Ready(Some(size)) => {
+ self.resize(size)?;
+ Ok(crate::component_future::Async::DidWork)
+ }
+ futures::Async::Ready(None) => unreachable!(),
+ futures::Async::NotReady => {
+ Ok(crate::component_future::Async::NotReady)
+ }
+ }
+ }
+
fn poll_input(&mut self) -> crate::component_future::Poll<(), Error> {
if self.raw_screen.is_none() {
self.raw_screen = Some(
@@ -633,13 +649,10 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
match self.list_client.poll()? {
futures::Async::Ready(Some(e)) => {
match e {
- crate::client::Event::Start(size) => {
- self.resize(size)?;
- }
crate::client::Event::Disconnect => {
self.reconnect(true)?;
}
- crate::client::Event::Connect() => {
+ crate::client::Event::Connect => {
self.list_client.send_message(
crate::protocol::Message::list_sessions(),
);
@@ -647,9 +660,6 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
crate::client::Event::ServerMessage(msg) => {
self.list_server_message(msg)?;
}
- crate::client::Event::Resize(size) => {
- self.resize(size)?;
- }
}
Ok(crate::component_future::Async::DidWork)
}
@@ -675,21 +685,13 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
match client.poll()? {
futures::Async::Ready(Some(e)) => {
match e {
- crate::client::Event::Start(_) => {
- // watch clients don't respond to resize events
- unreachable!();
- }
crate::client::Event::Disconnect => {
self.reconnect(true)?;
}
- crate::client::Event::Connect() => {}
+ crate::client::Event::Connect => {}
crate::client::Event::ServerMessage(msg) => {
self.watch_server_message(msg)?;
}
- crate::client::Event::Resize(_) => {
- // watch clients don't respond to resize events
- unreachable!();
- }
}
Ok(crate::component_future::Async::DidWork)
}
diff --git a/src/main.rs b/src/main.rs
index f44dba0..0f7b4d3 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -20,6 +20,7 @@ mod key_reader;
mod oauth;
mod process;
mod protocol;
+mod resize;
mod server;
mod session_list;
mod term;
diff --git a/src/resize.rs b/src/resize.rs
new file mode 100644
index 0000000..86dd386
--- /dev/null
+++ b/src/resize.rs
@@ -0,0 +1,117 @@
+use crate::prelude::*;
+
+pub struct Resizer {
+ winches:
+ Box<dyn futures::stream::Stream<Item = (), Error = Error> + Send>,
+ sent_initial_size: bool,
+}
+
+impl Resizer {
+ pub fn new() -> Self {
+ let winches = tokio_signal::unix::Signal::new(
+ tokio_signal::unix::libc::SIGWINCH,
+ )
+ .flatten_stream()
+ .map(|_| ())
+ .context(crate::error::SigWinchHandler);
+ Self {
+ winches: Box::new(winches),
+ sent_initial_size: false,
+ }
+ }
+}
+
+#[must_use = "streams do nothing unless polled"]
+impl futures::stream::Stream for Resizer {
+ type Item = crate::term::Size;
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
+ if !self.sent_initial_size {
+ self.sent_initial_size = true;
+ return Ok(futures::Async::Ready(
+ Some(crate::term::Size::get()?),
+ ));
+ }
+ let _ = futures::try_ready!(self.winches.poll());
+ Ok(futures::Async::Ready(Some(crate::term::Size::get()?)))
+ }
+}
+
+pub enum Event<R: tokio::io::AsyncRead + 'static> {
+ Process(<crate::process::Process<R> as futures::stream::Stream>::Item),
+ Resize(crate::term::Size),
+}
+
+pub struct ResizingProcess<R: tokio::io::AsyncRead + 'static> {
+ process: crate::process::Process<R>,
+ resizer: Resizer,
+}
+
+impl<R: tokio::io::AsyncRead + 'static> ResizingProcess<R> {
+ pub fn new(process: crate::process::Process<R>) -> Self {
+ Self {
+ process,
+ resizer: Resizer::new(),
+ }
+ }
+}
+
+impl<R: tokio::io::AsyncRead + 'static> ResizingProcess<R> {
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> crate::component_future::Poll<
+ Option<Event<R>>,
+ Error,
+ >] = &[&Self::poll_resize, &Self::poll_process];
+
+ fn poll_resize(
+ &mut self,
+ ) -> crate::component_future::Poll<Option<Event<R>>, Error> {
+ match self.resizer.poll()? {
+ futures::Async::Ready(Some(size)) => {
+ self.process.resize(size.clone());
+ Ok(crate::component_future::Async::Ready(Some(
+ Event::Resize(size),
+ )))
+ }
+ futures::Async::Ready(None) => unreachable!(),
+ futures::Async::NotReady => {
+ Ok(crate::component_future::Async::NotReady)
+ }
+ }
+ }
+
+ fn poll_process(
+ &mut self,
+ ) -> crate::component_future::Poll<Option<Event<R>>, Error> {
+ match self.process.poll()? {
+ futures::Async::Ready(Some(e)) => {
+ Ok(crate::component_future::Async::Ready(Some(
+ Event::Process(e),
+ )))
+ }
+ futures::Async::Ready(None) => {
+ Ok(crate::component_future::Async::Ready(None))
+ }
+ futures::Async::NotReady => {
+ Ok(crate::component_future::Async::NotReady)
+ }
+ }
+ }
+}
+
+#[must_use = "streams do nothing unless polled"]
+impl<R: tokio::io::AsyncRead + 'static> futures::stream::Stream
+ for ResizingProcess<R>
+{
+ type Item = Event<R>;
+ type Error =
+ <crate::process::Process<R> as futures::stream::Stream>::Error;
+
+ fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
+ crate::component_future::poll_stream(self, Self::POLL_FNS)
+ }
+}