aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-10-15 06:53:44 -0400
committerJesse Luehrs <doy@tozt.net>2019-10-15 06:53:44 -0400
commitf93d1edc1a854269ec4e5d8853a26f9f8fd2140d (patch)
tree19a9a3f06aaef74ffe3407b6849d1cbe32ea790c
parent5f33bb16071f68313c12a2fe9a06514bade894f4 (diff)
downloadteleterm-f93d1edc1a854269ec4e5d8853a26f9f8fd2140d.tar.gz
teleterm-f93d1edc1a854269ec4e5d8853a26f9f8fd2140d.zip
add a processing state to server connections
this allows us to do long-lived actions in response to messages
-rw-r--r--src/server.rs58
1 files changed, 50 insertions, 8 deletions
diff --git a/src/server.rs b/src/server.rs
index 9031425..f83f4df 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -18,6 +18,15 @@ enum ReadSocket<
> + Send,
>,
),
+ Processing(
+ crate::protocol::FramedReadHalf<S>,
+ Box<
+ dyn futures::future::Future<
+ Item = (ConnectionState, crate::protocol::Message),
+ Error = Error,
+ > + Send,
+ >,
+ ),
}
enum WriteSocket<
@@ -499,7 +508,16 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
&mut self,
conn: &mut Connection<S>,
message: crate::protocol::Message,
- ) -> Result<()> {
+ ) -> Result<
+ Option<
+ Box<
+ dyn futures::future::Future<
+ Item = (ConnectionState, crate::protocol::Message),
+ Error = Error,
+ > + Send,
+ >,
+ >,
+ > {
if let crate::protocol::Message::TerminalOutput { .. } = message {
// do nothing, we expect TerminalOutput spam
} else {
@@ -517,16 +535,16 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
match conn.state {
ConnectionState::Accepted { .. } => {
- self.handle_accepted_message(conn, message)
+ self.handle_accepted_message(conn, message).map(|_| None)
}
ConnectionState::LoggedIn { .. } => {
- self.handle_logged_in_message(conn, message)
+ self.handle_logged_in_message(conn, message).map(|_| None)
}
ConnectionState::Streaming { .. } => {
- self.handle_streaming_message(conn, message)
+ self.handle_streaming_message(conn, message).map(|_| None)
}
ConnectionState::Watching { .. } => {
- self.handle_watching_message(conn, message)
+ self.handle_watching_message(conn, message).map(|_| None)
}
}
}
@@ -552,10 +570,17 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
Some(ReadSocket::Reading(fut)) => match fut.poll() {
Ok(futures::Async::Ready((msg, s))) => {
let res = self.handle_message(conn, msg);
- if res.is_err() {
- conn.close(res);
+ match res {
+ Ok(Some(fut)) => {
+ conn.rsock = Some(ReadSocket::Processing(s, fut));
+ }
+ Ok(None) => {
+ conn.rsock = Some(ReadSocket::Connected(s));
+ }
+ e @ Err(..) => {
+ conn.close(e.map(|_| ()));
+ }
}
- conn.rsock = Some(ReadSocket::Connected(s));
Ok(crate::component_future::Poll::DidWork)
}
Ok(futures::Async::NotReady) => {
@@ -563,6 +588,23 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
Err(e) => classify_connection_error(e),
},
+ Some(ReadSocket::Processing(_, fut)) => match fut.poll()? {
+ futures::Async::Ready((state, msg)) => {
+ if let Some(ReadSocket::Processing(s, _)) =
+ conn.rsock.take()
+ {
+ conn.state = state;
+ conn.send_message(msg);
+ conn.rsock = Some(ReadSocket::Connected(s));
+ } else {
+ unreachable!()
+ }
+ Ok(crate::component_future::Poll::DidWork)
+ }
+ futures::Async::NotReady => {
+ Ok(crate::component_future::Poll::NotReady)
+ }
+ },
_ => Ok(crate::component_future::Poll::NothingToDo),
}
}