From 0410b649e7e485b60f02b080528116f67b888442 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Fri, 22 Nov 2019 07:41:15 -0500 Subject: send terminal output messages back to the web page --- teleterm/src/error.rs | 3 + teleterm/src/protocol.rs | 4 +- teleterm/src/web.rs | 144 ++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 130 insertions(+), 21 deletions(-) diff --git a/teleterm/src/error.rs b/teleterm/src/error.rs index 2e4b57a..9cc0ca7 100644 --- a/teleterm/src/error.rs +++ b/teleterm/src/error.rs @@ -300,6 +300,9 @@ pub enum Error { source: std::io::Error, }, + #[snafu(display("failed to serialize message as json: {}", source))] + SerializeMessage { source: serde_json::Error }, + #[snafu(display("received error from server: {}", message))] Server { message: String }, diff --git a/teleterm/src/protocol.rs b/teleterm/src/protocol.rs index 03ecd97..dd928e2 100644 --- a/teleterm/src/protocol.rs +++ b/teleterm/src/protocol.rs @@ -105,7 +105,7 @@ impl std::convert::TryFrom<&str> for AuthType { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)] pub enum Auth { Plain { username: String }, RecurseCenter { id: Option }, @@ -183,7 +183,7 @@ impl std::convert::TryFrom for MessageType { // XXX https://github.com/rust-lang/rust/issues/64362 #[allow(dead_code)] -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)] pub enum Message { Login { proto_version: u8, diff --git a/teleterm/src/web.rs b/teleterm/src/web.rs index af0e458..202f577 100644 --- a/teleterm/src/web.rs +++ b/teleterm/src/web.rs @@ -246,11 +246,6 @@ fn handle_watch( } }; - let stream = stream - .context(crate::error::WebSocketAccept) - .map(|stream| stream.context(crate::error::WebSocket)) - .flatten_stream(); - let query_params = WatchQueryParams::borrow_from(&state); let address = "127.0.0.1:4144"; let (_, address) = @@ -270,7 +265,9 @@ fn handle_watch( let conn = Connection::new( gotham::state::request_id(&state), client, - Box::new(stream), + ConnectionState::Connecting(Box::new( + stream.context(crate::error::WebSocketAccept), + )), ); tokio::spawn(conn.map_err(|e| log::error!("{}", e))); @@ -286,6 +283,20 @@ fn handle_watch( } } +type WebSocketConnectionFuture = Box< + dyn futures::Future< + Item = tokio_tungstenite::WebSocketStream< + hyper::upgrade::Upgraded, + >, + Error = Error, + > + Send, +>; +type MessageSink = Box< + dyn futures::Sink< + SinkItem = tokio_tungstenite::tungstenite::protocol::Message, + SinkError = Error, + > + Send, +>; type MessageStream = Box< dyn futures::Stream< Item = tokio_tungstenite::tungstenite::protocol::Message, @@ -293,12 +304,57 @@ type MessageStream = Box< > + Send, >; +enum SenderState { + Temporary, + Connected(MessageSink), + Sending( + Box + Send>, + ), + Flushing( + Box + Send>, + ), +} + +enum ConnectionState { + Connecting(WebSocketConnectionFuture), + Connected(SenderState, MessageStream), +} + +impl ConnectionState { + fn sink(&mut self) -> Option<&mut MessageSink> { + match self { + Self::Connected(sender, _) => match sender { + SenderState::Connected(sink) => Some(sink), + _ => None, + }, + _ => None, + } + } + + fn send( + &mut self, + msg: tokio_tungstenite::tungstenite::protocol::Message, + ) { + match self { + Self::Connected(sender, _) => { + let fut = + match std::mem::replace(sender, SenderState::Temporary) { + SenderState::Connected(sink) => sink.send(msg), + _ => unreachable!(), + }; + *sender = SenderState::Sending(Box::new(fut)); + } + _ => unreachable!(), + } + } +} + struct Connection< S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static, > { id: String, client: crate::client::Client, - stream: MessageStream, + conn: ConnectionState, } impl @@ -307,22 +363,34 @@ impl fn new( id: &str, client: crate::client::Client, - stream: MessageStream, + conn: ConnectionState, ) -> Self { Self { client, id: id.to_string(), - stream, + conn, } } fn handle_client_message( &mut self, msg: &crate::protocol::Message, - ) -> Result<()> { - // TODO + ) -> Result> + { log::info!("teleterm client message for {}: {:?}", self.id, msg); - Ok(()) + + match msg { + crate::protocol::Message::TerminalOutput { .. } => { + let json = serde_json::to_string(msg) + .context(crate::error::SerializeMessage)?; + Ok(Some( + tokio_tungstenite::tungstenite::protocol::Message::Text( + json, + ), + )) + } + _ => Ok(None), + } } fn handle_websocket_message( @@ -348,6 +416,12 @@ impl >] = &[&Self::poll_client, &Self::poll_websocket_stream]; fn poll_client(&mut self) -> component_future::Poll<(), Error> { + // don't start up the client until the websocket connection is fully + // established and isn't busy + if self.conn.sink().is_none() { + return Ok(component_future::Async::NothingToDo); + }; + match component_future::try_ready!(self.client.poll()).unwrap() { crate::client::Event::Disconnect => { // TODO: better reconnect handling? @@ -355,19 +429,51 @@ impl } crate::client::Event::Connect => {} crate::client::Event::ServerMessage(msg) => { - self.handle_client_message(&msg)?; + if let Some(msg) = self.handle_client_message(&msg)? { + self.conn.send(msg); + } } } Ok(component_future::Async::DidWork) } fn poll_websocket_stream(&mut self) -> component_future::Poll<(), Error> { - if let Some(msg) = component_future::try_ready!(self.stream.poll()) { - self.handle_websocket_message(&msg)?; - Ok(component_future::Async::DidWork) - } else { - log::info!("disconnect for {}", self.id); - Ok(component_future::Async::Ready(())) + match &mut self.conn { + ConnectionState::Connecting(fut) => { + let stream = component_future::try_ready!(fut.poll()); + let (sink, stream) = stream.split(); + self.conn = ConnectionState::Connected( + SenderState::Connected(Box::new( + sink.sink_map_err(|e| Error::WebSocket { source: e }), + )), + Box::new(stream.context(crate::error::WebSocket)), + ); + Ok(component_future::Async::DidWork) + } + ConnectionState::Connected(sender, stream) => match sender { + SenderState::Temporary => unreachable!(), + SenderState::Connected(_) => { + if let Some(msg) = + component_future::try_ready!(stream.poll()) + { + self.handle_websocket_message(&msg)?; + Ok(component_future::Async::DidWork) + } else { + log::info!("disconnect for {}", self.id); + Ok(component_future::Async::Ready(())) + } + } + SenderState::Sending(fut) => { + let sink = component_future::try_ready!(fut.poll()); + *sender = SenderState::Flushing(Box::new(sink.flush())); + Ok(component_future::Async::DidWork) + } + SenderState::Flushing(fut) => { + let sink = component_future::try_ready!(fut.poll()); + *sender = SenderState::Connected(Box::new(sink)); + Ok(component_future::Async::DidWork) + } + }, } } } -- cgit v1.2.3-54-g00ecf