aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-10-15 03:48:12 -0400
committerJesse Luehrs <doy@tozt.net>2019-10-15 03:48:12 -0400
commit405c0d50b20b8a35bd279aa32bdeb6e061d210de (patch)
tree39210fb210a31e864299bbb4f5983960105432c7
parent00b51411b2a1fb97d6a66a2898443d8b9b355296 (diff)
downloadteleterm-405c0d50b20b8a35bd279aa32bdeb6e061d210de.tar.gz
teleterm-405c0d50b20b8a35bd279aa32bdeb6e061d210de.zip
various client cleanups
-rw-r--r--src/client.rs190
1 files changed, 99 insertions, 91 deletions
diff --git a/src/client.rs b/src/client.rs
index b51784e..fce3630 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -206,6 +206,76 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
self.reconnect_timer = None;
self.reconnect_backoff_amount = RECONNECT_BACKOFF_BASE;
}
+
+ fn has_seen_server_recently(&self) -> bool {
+ let since_last_server =
+ std::time::Instant::now().duration_since(self.last_server_time);
+ if since_last_server > HEARTBEAT_DURATION * 2 {
+ return false;
+ }
+
+ true
+ }
+
+ fn should_wait_to_reconnect(&mut self) -> Result<bool> {
+ if let Some(timer) = &mut self.reconnect_timer {
+ match timer.poll().context(crate::error::TimerReconnect)? {
+ futures::Async::NotReady => {
+ return Ok(true);
+ }
+ _ => {}
+ }
+ }
+
+ Ok(false)
+ }
+
+ fn handle_successful_connection(&mut self, s: S) -> Result<()> {
+ self.last_server_time = std::time::Instant::now();
+
+ let (rs, ws) = s.split();
+ self.rsock = ReadSocket::Connected(
+ crate::protocol::FramedReader::new(rs, self.buffer_size),
+ );
+ self.wsock = WriteSocket::Connected(
+ crate::protocol::FramedWriter::new(ws, self.buffer_size),
+ );
+
+ self.size = Some(crate::term::Size::get()?);
+
+ self.to_send.clear();
+ self.send_message(crate::protocol::Message::login_plain(
+ &self.username,
+ &self.term_type,
+ self.size.as_ref().unwrap(),
+ ));
+ for msg in &self.on_connect {
+ self.to_send.push_back(msg.clone());
+ }
+
+ self.reset_reconnect_timer();
+
+ Ok(())
+ }
+
+ fn handle_message(
+ &mut self,
+ msg: crate::protocol::Message,
+ ) -> Result<crate::component_future::Poll<Event>> {
+ match msg {
+ crate::protocol::Message::LoggedIn { .. } => {
+ Ok(crate::component_future::Poll::Event(Event::Connect(
+ self.size.clone().unwrap(),
+ )))
+ }
+ crate::protocol::Message::Heartbeat => {
+ Ok(crate::component_future::Poll::DidWork)
+ }
+ _ => Ok(crate::component_future::Poll::Event(
+ Event::ServerMessage(msg),
+ )),
+ }
+ }
}
impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
@@ -227,90 +297,41 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
fn poll_reconnect_server(
&mut self,
) -> Result<crate::component_future::Poll<Event>> {
- match self.wsock {
- WriteSocket::NotConnected | WriteSocket::Connecting(..) => {}
- _ => {
- let since_last_server = std::time::Instant::now()
- .duration_since(self.last_server_time);
- if since_last_server > HEARTBEAT_DURATION * 2 {
- self.reconnect();
- return Ok(crate::component_future::Poll::Event(
- Event::Disconnect,
- ));
- }
- }
- }
-
match &mut self.wsock {
WriteSocket::NotConnected => {
- if let Some(timer) = &mut self.reconnect_timer {
- match timer
- .poll()
- .context(crate::error::TimerReconnect)?
- {
- futures::Async::Ready(..) => {
- self.reconnect_timer = None;
- }
- futures::Async::NotReady => {
- return Ok(
- crate::component_future::Poll::NotReady,
- );
- }
- }
+ if self.should_wait_to_reconnect()? {
+ return Ok(crate::component_future::Poll::NotReady);
}
self.set_reconnect_timer();
self.wsock = WriteSocket::Connecting((self.connect)());
-
- Ok(crate::component_future::Poll::DidWork)
}
WriteSocket::Connecting(ref mut fut) => match fut.poll() {
Ok(futures::Async::Ready(s)) => {
- let (rs, ws) = s.split();
- self.last_server_time = std::time::Instant::now();
- self.rsock = ReadSocket::Connected(
- crate::protocol::FramedReader::new(
- rs,
- self.buffer_size,
- ),
- );
- self.wsock = WriteSocket::Connected(
- crate::protocol::FramedWriter::new(
- ws,
- self.buffer_size,
- ),
- );
-
- self.to_send.clear();
-
- self.size = Some(crate::term::Size::get()?);
- let msg = crate::protocol::Message::login_plain(
- &self.username,
- &self.term_type,
- self.size.as_ref().unwrap(),
- );
- self.to_send.push_back(msg);
-
- for msg in &self.on_connect {
- self.to_send.push_back(msg.clone());
- }
-
- self.reset_reconnect_timer();
-
- Ok(crate::component_future::Poll::DidWork)
+ self.handle_successful_connection(s)?;
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Poll::NotReady)
+ return Ok(crate::component_future::Poll::NotReady);
}
Err(..) => {
self.reconnect();
- Ok(crate::component_future::Poll::DidWork)
+ // not sending a disconnect event here because we never
+ // actually connected, so it'd just be spammy
}
},
WriteSocket::Connected(..) | WriteSocket::WritingMessage(..) => {
- Ok(crate::component_future::Poll::NothingToDo)
+ if self.has_seen_server_recently() {
+ return Ok(crate::component_future::Poll::NothingToDo);
+ } else {
+ self.reconnect();
+ return Ok(crate::component_future::Poll::Event(
+ Event::Disconnect,
+ ));
+ }
}
}
+
+ Ok(crate::component_future::Poll::DidWork)
}
fn poll_read_server(
@@ -321,9 +342,10 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
Ok(crate::component_future::Poll::NothingToDo)
}
ReadSocket::Connected(..) => {
- let mut tmp = ReadSocket::NotConnected;
- std::mem::swap(&mut self.rsock, &mut tmp);
- if let ReadSocket::Connected(s) = tmp {
+ if let ReadSocket::Connected(s) = std::mem::replace(
+ &mut self.rsock,
+ ReadSocket::NotConnected,
+ ) {
let fut = crate::protocol::Message::read_async(s);
self.rsock = ReadSocket::ReadingMessage(Box::new(fut));
} else {
@@ -335,19 +357,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
Ok(futures::Async::Ready((msg, s))) => {
self.last_server_time = std::time::Instant::now();
self.rsock = ReadSocket::Connected(s);
- match msg {
- crate::protocol::Message::LoggedIn { .. } => {
- Ok(crate::component_future::Poll::Event(
- Event::Connect(self.size.clone().unwrap()),
- ))
- }
- crate::protocol::Message::Heartbeat => {
- Ok(crate::component_future::Poll::DidWork)
- }
- _ => Ok(crate::component_future::Poll::Event(
- Event::ServerMessage(msg),
- )),
- }
+ self.handle_message(msg)
}
Ok(futures::Async::NotReady) => {
Ok(crate::component_future::Poll::NotReady)
@@ -374,19 +384,17 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
return Ok(crate::component_future::Poll::NothingToDo);
}
- let mut tmp = WriteSocket::NotConnected;
- std::mem::swap(&mut self.wsock, &mut tmp);
- if let WriteSocket::Connected(s) = tmp {
- if let Some(msg) = self.to_send.pop_front() {
- let fut = msg.write_async(s);
- self.wsock =
- WriteSocket::WritingMessage(Box::new(fut));
- } else {
- unreachable!()
- }
+ if let WriteSocket::Connected(s) = std::mem::replace(
+ &mut self.wsock,
+ WriteSocket::NotConnected,
+ ) {
+ let msg = self.to_send.pop_front().unwrap();
+ let fut = msg.write_async(s);
+ self.wsock = WriteSocket::WritingMessage(Box::new(fut));
} else {
unreachable!()
}
+
Ok(crate::component_future::Poll::DidWork)
}
WriteSocket::WritingMessage(ref mut fut) => match fut.poll() {