aboutsummaryrefslogtreecommitdiffstats
path: root/teleterm
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-11-21 06:45:43 -0500
committerJesse Luehrs <doy@tozt.net>2019-11-21 06:45:43 -0500
commit4b4cad4f9c73e0f4ec6aa8e0242b4377fa995d82 (patch)
tree33fc5013ff8dd09ae1b69f9c3ae039048c576ff4 /teleterm
parent51ccce9f254781e6a4a206861a362a98eba2469a (diff)
downloadteleterm-4b4cad4f9c73e0f4ec6aa8e0242b4377fa995d82.tar.gz
teleterm-4b4cad4f9c73e0f4ec6aa8e0242b4377fa995d82.zip
pull connection stream out into a struct
so that it can store more state
Diffstat (limited to 'teleterm')
-rw-r--r--teleterm/src/web.rs77
1 files changed, 64 insertions, 13 deletions
diff --git a/teleterm/src/web.rs b/teleterm/src/web.rs
index 0aa54cf..8513629 100644
--- a/teleterm/src/web.rs
+++ b/teleterm/src/web.rs
@@ -105,18 +105,18 @@ fn handle_websocket_connection(
);
}
};
+
let stream = stream
.context(crate::error::WebSocketAccept)
.map(|stream| stream.context(crate::error::WebSocket))
.flatten_stream();
-
- let req_id = gotham::state::request_id(&state).to_owned();
- tokio::spawn(
- stream
- .for_each(move |msg| handle_websocket_message(&req_id, &msg))
- .map_err(|e| log::error!("{}", e)),
+ let conn = Connection::new(
+ gotham::state::request_id(&state),
+ Box::new(stream),
);
+ tokio::spawn(conn.map_err(|e| log::error!("{}", e)));
+
(state, response)
} else {
(
@@ -128,11 +128,62 @@ fn handle_websocket_connection(
}
}
-fn handle_websocket_message(
- req_id: &str,
- msg: &tokio_tungstenite::tungstenite::protocol::Message,
-) -> Result<()> {
- // TODO
- log::info!("websocket stream message for {}: {:?}", req_id, msg);
- Ok(())
+type MessageStream = Box<
+ dyn futures::Stream<
+ Item = tokio_tungstenite::tungstenite::protocol::Message,
+ Error = Error,
+ > + Send,
+>;
+
+struct Connection {
+ id: String,
+ stream: MessageStream,
+}
+
+impl Connection {
+ fn new(id: &str, stream: MessageStream) -> Self {
+ Self {
+ id: id.to_string(),
+ stream,
+ }
+ }
+
+ fn handle_websocket_message(
+ &mut self,
+ msg: &tokio_tungstenite::tungstenite::protocol::Message,
+ ) -> Result<()> {
+ // TODO
+ log::info!("websocket stream message for {}: {:?}", self.id, msg);
+ Ok(())
+ }
+}
+
+impl Connection {
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ (),
+ Error,
+ >] = &[&Self::poll_websocket_stream];
+
+ fn poll_websocket_stream(&mut self) -> component_future::Poll<(), Error> {
+ if let Some(msg) = component_future::try_ready!(self.stream.poll()) {
+ self.handle_websocket_message(&msg)?;
+ Ok(component_future::Async::DidWork)
+ } else {
+ log::info!("disconnect for {}", self.id);
+ Ok(component_future::Async::Ready(()))
+ }
+ }
+}
+
+impl futures::Future for Connection {
+ type Item = ();
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ component_future::poll_future(self, Self::POLL_FNS)
+ }
}