diff options
author | Jesse Luehrs <doy@tozt.net> | 2019-10-15 03:48:12 -0400 |
---|---|---|
committer | Jesse Luehrs <doy@tozt.net> | 2019-10-15 03:48:12 -0400 |
commit | 405c0d50b20b8a35bd279aa32bdeb6e061d210de (patch) | |
tree | 39210fb210a31e864299bbb4f5983960105432c7 | |
parent | 00b51411b2a1fb97d6a66a2898443d8b9b355296 (diff) | |
download | teleterm-405c0d50b20b8a35bd279aa32bdeb6e061d210de.tar.gz teleterm-405c0d50b20b8a35bd279aa32bdeb6e061d210de.zip |
various client cleanups
-rw-r--r-- | src/client.rs | 190 |
1 files changed, 99 insertions, 91 deletions
diff --git a/src/client.rs b/src/client.rs index b51784e..fce3630 100644 --- a/src/client.rs +++ b/src/client.rs @@ -206,6 +206,76 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> self.reconnect_timer = None; self.reconnect_backoff_amount = RECONNECT_BACKOFF_BASE; } + + fn has_seen_server_recently(&self) -> bool { + let since_last_server = + std::time::Instant::now().duration_since(self.last_server_time); + if since_last_server > HEARTBEAT_DURATION * 2 { + return false; + } + + true + } + + fn should_wait_to_reconnect(&mut self) -> Result<bool> { + if let Some(timer) = &mut self.reconnect_timer { + match timer.poll().context(crate::error::TimerReconnect)? { + futures::Async::NotReady => { + return Ok(true); + } + _ => {} + } + } + + Ok(false) + } + + fn handle_successful_connection(&mut self, s: S) -> Result<()> { + self.last_server_time = std::time::Instant::now(); + + let (rs, ws) = s.split(); + self.rsock = ReadSocket::Connected( + crate::protocol::FramedReader::new(rs, self.buffer_size), + ); + self.wsock = WriteSocket::Connected( + crate::protocol::FramedWriter::new(ws, self.buffer_size), + ); + + self.size = Some(crate::term::Size::get()?); + + self.to_send.clear(); + self.send_message(crate::protocol::Message::login_plain( + &self.username, + &self.term_type, + self.size.as_ref().unwrap(), + )); + for msg in &self.on_connect { + self.to_send.push_back(msg.clone()); + } + + self.reset_reconnect_timer(); + + Ok(()) + } + + fn handle_message( + &mut self, + msg: crate::protocol::Message, + ) -> Result<crate::component_future::Poll<Event>> { + match msg { + crate::protocol::Message::LoggedIn { .. } => { + Ok(crate::component_future::Poll::Event(Event::Connect( + self.size.clone().unwrap(), + ))) + } + crate::protocol::Message::Heartbeat => { + Ok(crate::component_future::Poll::DidWork) + } + _ => Ok(crate::component_future::Poll::Event( + Event::ServerMessage(msg), + )), + } + } } impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> @@ -227,90 +297,41 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> fn poll_reconnect_server( &mut self, ) -> Result<crate::component_future::Poll<Event>> { - match self.wsock { - WriteSocket::NotConnected | WriteSocket::Connecting(..) => {} - _ => { - let since_last_server = std::time::Instant::now() - .duration_since(self.last_server_time); - if since_last_server > HEARTBEAT_DURATION * 2 { - self.reconnect(); - return Ok(crate::component_future::Poll::Event( - Event::Disconnect, - )); - } - } - } - match &mut self.wsock { WriteSocket::NotConnected => { - if let Some(timer) = &mut self.reconnect_timer { - match timer - .poll() - .context(crate::error::TimerReconnect)? - { - futures::Async::Ready(..) => { - self.reconnect_timer = None; - } - futures::Async::NotReady => { - return Ok( - crate::component_future::Poll::NotReady, - ); - } - } + if self.should_wait_to_reconnect()? { + return Ok(crate::component_future::Poll::NotReady); } self.set_reconnect_timer(); self.wsock = WriteSocket::Connecting((self.connect)()); - - Ok(crate::component_future::Poll::DidWork) } WriteSocket::Connecting(ref mut fut) => match fut.poll() { Ok(futures::Async::Ready(s)) => { - let (rs, ws) = s.split(); - self.last_server_time = std::time::Instant::now(); - self.rsock = ReadSocket::Connected( - crate::protocol::FramedReader::new( - rs, - self.buffer_size, - ), - ); - self.wsock = WriteSocket::Connected( - crate::protocol::FramedWriter::new( - ws, - self.buffer_size, - ), - ); - - self.to_send.clear(); - - self.size = Some(crate::term::Size::get()?); - let msg = crate::protocol::Message::login_plain( - &self.username, - &self.term_type, - self.size.as_ref().unwrap(), - ); - self.to_send.push_back(msg); - - for msg in &self.on_connect { - self.to_send.push_back(msg.clone()); - } - - self.reset_reconnect_timer(); - - Ok(crate::component_future::Poll::DidWork) + self.handle_successful_connection(s)?; } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Poll::NotReady) + return Ok(crate::component_future::Poll::NotReady); } Err(..) => { self.reconnect(); - Ok(crate::component_future::Poll::DidWork) + // not sending a disconnect event here because we never + // actually connected, so it'd just be spammy } }, WriteSocket::Connected(..) | WriteSocket::WritingMessage(..) => { - Ok(crate::component_future::Poll::NothingToDo) + if self.has_seen_server_recently() { + return Ok(crate::component_future::Poll::NothingToDo); + } else { + self.reconnect(); + return Ok(crate::component_future::Poll::Event( + Event::Disconnect, + )); + } } } + + Ok(crate::component_future::Poll::DidWork) } fn poll_read_server( @@ -321,9 +342,10 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> Ok(crate::component_future::Poll::NothingToDo) } ReadSocket::Connected(..) => { - let mut tmp = ReadSocket::NotConnected; - std::mem::swap(&mut self.rsock, &mut tmp); - if let ReadSocket::Connected(s) = tmp { + if let ReadSocket::Connected(s) = std::mem::replace( + &mut self.rsock, + ReadSocket::NotConnected, + ) { let fut = crate::protocol::Message::read_async(s); self.rsock = ReadSocket::ReadingMessage(Box::new(fut)); } else { @@ -335,19 +357,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> Ok(futures::Async::Ready((msg, s))) => { self.last_server_time = std::time::Instant::now(); self.rsock = ReadSocket::Connected(s); - match msg { - crate::protocol::Message::LoggedIn { .. } => { - Ok(crate::component_future::Poll::Event( - Event::Connect(self.size.clone().unwrap()), - )) - } - crate::protocol::Message::Heartbeat => { - Ok(crate::component_future::Poll::DidWork) - } - _ => Ok(crate::component_future::Poll::Event( - Event::ServerMessage(msg), - )), - } + self.handle_message(msg) } Ok(futures::Async::NotReady) => { Ok(crate::component_future::Poll::NotReady) @@ -374,19 +384,17 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> return Ok(crate::component_future::Poll::NothingToDo); } - let mut tmp = WriteSocket::NotConnected; - std::mem::swap(&mut self.wsock, &mut tmp); - if let WriteSocket::Connected(s) = tmp { - if let Some(msg) = self.to_send.pop_front() { - let fut = msg.write_async(s); - self.wsock = - WriteSocket::WritingMessage(Box::new(fut)); - } else { - unreachable!() - } + if let WriteSocket::Connected(s) = std::mem::replace( + &mut self.wsock, + WriteSocket::NotConnected, + ) { + let msg = self.to_send.pop_front().unwrap(); + let fut = msg.write_async(s); + self.wsock = WriteSocket::WritingMessage(Box::new(fut)); } else { unreachable!() } + Ok(crate::component_future::Poll::DidWork) } WriteSocket::WritingMessage(ref mut fut) => match fut.poll() { |