diff options
author | Jesse Luehrs <doy@tozt.net> | 2019-10-15 06:53:44 -0400 |
---|---|---|
committer | Jesse Luehrs <doy@tozt.net> | 2019-10-15 06:53:44 -0400 |
commit | f93d1edc1a854269ec4e5d8853a26f9f8fd2140d (patch) | |
tree | 19a9a3f06aaef74ffe3407b6849d1cbe32ea790c | |
parent | 5f33bb16071f68313c12a2fe9a06514bade894f4 (diff) | |
download | teleterm-f93d1edc1a854269ec4e5d8853a26f9f8fd2140d.tar.gz teleterm-f93d1edc1a854269ec4e5d8853a26f9f8fd2140d.zip |
add a processing state to server connections
this allows us to do long-lived actions in response to messages
-rw-r--r-- | src/server.rs | 58 |
1 files changed, 50 insertions, 8 deletions
diff --git a/src/server.rs b/src/server.rs index 9031425..f83f4df 100644 --- a/src/server.rs +++ b/src/server.rs @@ -18,6 +18,15 @@ enum ReadSocket< > + Send, >, ), + Processing( + crate::protocol::FramedReadHalf<S>, + Box< + dyn futures::future::Future< + Item = (ConnectionState, crate::protocol::Message), + Error = Error, + > + Send, + >, + ), } enum WriteSocket< @@ -499,7 +508,16 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> &mut self, conn: &mut Connection<S>, message: crate::protocol::Message, - ) -> Result<()> { + ) -> Result< + Option< + Box< + dyn futures::future::Future< + Item = (ConnectionState, crate::protocol::Message), + Error = Error, + > + Send, + >, + >, + > { if let crate::protocol::Message::TerminalOutput { .. } = message { // do nothing, we expect TerminalOutput spam } else { @@ -517,16 +535,16 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> match conn.state { ConnectionState::Accepted { .. } => { - self.handle_accepted_message(conn, message) + self.handle_accepted_message(conn, message).map(|_| None) } ConnectionState::LoggedIn { .. } => { - self.handle_logged_in_message(conn, message) + self.handle_logged_in_message(conn, message).map(|_| None) } ConnectionState::Streaming { .. } => { - self.handle_streaming_message(conn, message) + self.handle_streaming_message(conn, message).map(|_| None) } ConnectionState::Watching { .. } => { - self.handle_watching_message(conn, message) + self.handle_watching_message(conn, message).map(|_| None) } } } @@ -552,10 +570,17 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> Some(ReadSocket::Reading(fut)) => match fut.poll() { Ok(futures::Async::Ready((msg, s))) => { let res = self.handle_message(conn, msg); - if res.is_err() { - conn.close(res); + match res { + Ok(Some(fut)) => { + conn.rsock = Some(ReadSocket::Processing(s, fut)); + } + Ok(None) => { + conn.rsock = Some(ReadSocket::Connected(s)); + } + e @ Err(..) => { + conn.close(e.map(|_| ())); + } } - conn.rsock = Some(ReadSocket::Connected(s)); Ok(crate::component_future::Poll::DidWork) } Ok(futures::Async::NotReady) => { @@ -563,6 +588,23 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> } Err(e) => classify_connection_error(e), }, + Some(ReadSocket::Processing(_, fut)) => match fut.poll()? { + futures::Async::Ready((state, msg)) => { + if let Some(ReadSocket::Processing(s, _)) = + conn.rsock.take() + { + conn.state = state; + conn.send_message(msg); + conn.rsock = Some(ReadSocket::Connected(s)); + } else { + unreachable!() + } + Ok(crate::component_future::Poll::DidWork) + } + futures::Async::NotReady => { + Ok(crate::component_future::Poll::NotReady) + } + }, _ => Ok(crate::component_future::Poll::NothingToDo), } } |