diff options
-rw-r--r-- | teleterm/src/web.rs | 77 |
1 files changed, 64 insertions, 13 deletions
diff --git a/teleterm/src/web.rs b/teleterm/src/web.rs index 0aa54cf..8513629 100644 --- a/teleterm/src/web.rs +++ b/teleterm/src/web.rs @@ -105,18 +105,18 @@ fn handle_websocket_connection( ); } }; + let stream = stream .context(crate::error::WebSocketAccept) .map(|stream| stream.context(crate::error::WebSocket)) .flatten_stream(); - - let req_id = gotham::state::request_id(&state).to_owned(); - tokio::spawn( - stream - .for_each(move |msg| handle_websocket_message(&req_id, &msg)) - .map_err(|e| log::error!("{}", e)), + let conn = Connection::new( + gotham::state::request_id(&state), + Box::new(stream), ); + tokio::spawn(conn.map_err(|e| log::error!("{}", e))); + (state, response) } else { ( @@ -128,11 +128,62 @@ fn handle_websocket_connection( } } -fn handle_websocket_message( - req_id: &str, - msg: &tokio_tungstenite::tungstenite::protocol::Message, -) -> Result<()> { - // TODO - log::info!("websocket stream message for {}: {:?}", req_id, msg); - Ok(()) +type MessageStream = Box< + dyn futures::Stream< + Item = tokio_tungstenite::tungstenite::protocol::Message, + Error = Error, + > + Send, +>; + +struct Connection { + id: String, + stream: MessageStream, +} + +impl Connection { + fn new(id: &str, stream: MessageStream) -> Self { + Self { + id: id.to_string(), + stream, + } + } + + fn handle_websocket_message( + &mut self, + msg: &tokio_tungstenite::tungstenite::protocol::Message, + ) -> Result<()> { + // TODO + log::info!("websocket stream message for {}: {:?}", self.id, msg); + Ok(()) + } +} + +impl Connection { + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> component_future::Poll< + (), + Error, + >] = &[&Self::poll_websocket_stream]; + + fn poll_websocket_stream(&mut self) -> component_future::Poll<(), Error> { + if let Some(msg) = component_future::try_ready!(self.stream.poll()) { + self.handle_websocket_message(&msg)?; + Ok(component_future::Async::DidWork) + } else { + log::info!("disconnect for {}", self.id); + Ok(component_future::Async::Ready(())) + } + } +} + +impl futures::Future for Connection { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> { + component_future::poll_future(self, Self::POLL_FNS) + } } |