aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-11-22 07:41:15 -0500
committerJesse Luehrs <doy@tozt.net>2019-11-22 07:41:15 -0500
commit0410b649e7e485b60f02b080528116f67b888442 (patch)
treec27d37d0748fdec22d5bb12c6e8b87cc63a06190
parent601a3bf1e085e3a28d0e91e3bd2f2d0de5b4ab62 (diff)
downloadteleterm-0410b649e7e485b60f02b080528116f67b888442.tar.gz
teleterm-0410b649e7e485b60f02b080528116f67b888442.zip
send terminal output messages back to the web page
-rw-r--r--teleterm/src/error.rs3
-rw-r--r--teleterm/src/protocol.rs4
-rw-r--r--teleterm/src/web.rs144
3 files changed, 130 insertions, 21 deletions
diff --git a/teleterm/src/error.rs b/teleterm/src/error.rs
index 2e4b57a..9cc0ca7 100644
--- a/teleterm/src/error.rs
+++ b/teleterm/src/error.rs
@@ -300,6 +300,9 @@ pub enum Error {
source: std::io::Error,
},
+ #[snafu(display("failed to serialize message as json: {}", source))]
+ SerializeMessage { source: serde_json::Error },
+
#[snafu(display("received error from server: {}", message))]
Server { message: String },
diff --git a/teleterm/src/protocol.rs b/teleterm/src/protocol.rs
index 03ecd97..dd928e2 100644
--- a/teleterm/src/protocol.rs
+++ b/teleterm/src/protocol.rs
@@ -105,7 +105,7 @@ impl std::convert::TryFrom<&str> for AuthType {
}
}
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
pub enum Auth {
Plain { username: String },
RecurseCenter { id: Option<String> },
@@ -183,7 +183,7 @@ impl std::convert::TryFrom<u8> for MessageType {
// XXX https://github.com/rust-lang/rust/issues/64362
#[allow(dead_code)]
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
pub enum Message {
Login {
proto_version: u8,
diff --git a/teleterm/src/web.rs b/teleterm/src/web.rs
index af0e458..202f577 100644
--- a/teleterm/src/web.rs
+++ b/teleterm/src/web.rs
@@ -246,11 +246,6 @@ fn handle_watch(
}
};
- let stream = stream
- .context(crate::error::WebSocketAccept)
- .map(|stream| stream.context(crate::error::WebSocket))
- .flatten_stream();
-
let query_params = WatchQueryParams::borrow_from(&state);
let address = "127.0.0.1:4144";
let (_, address) =
@@ -270,7 +265,9 @@ fn handle_watch(
let conn = Connection::new(
gotham::state::request_id(&state),
client,
- Box::new(stream),
+ ConnectionState::Connecting(Box::new(
+ stream.context(crate::error::WebSocketAccept),
+ )),
);
tokio::spawn(conn.map_err(|e| log::error!("{}", e)));
@@ -286,6 +283,20 @@ fn handle_watch(
}
}
+type WebSocketConnectionFuture = Box<
+ dyn futures::Future<
+ Item = tokio_tungstenite::WebSocketStream<
+ hyper::upgrade::Upgraded,
+ >,
+ Error = Error,
+ > + Send,
+>;
+type MessageSink = Box<
+ dyn futures::Sink<
+ SinkItem = tokio_tungstenite::tungstenite::protocol::Message,
+ SinkError = Error,
+ > + Send,
+>;
type MessageStream = Box<
dyn futures::Stream<
Item = tokio_tungstenite::tungstenite::protocol::Message,
@@ -293,12 +304,57 @@ type MessageStream = Box<
> + Send,
>;
+enum SenderState {
+ Temporary,
+ Connected(MessageSink),
+ Sending(
+ Box<dyn futures::Future<Item = MessageSink, Error = Error> + Send>,
+ ),
+ Flushing(
+ Box<dyn futures::Future<Item = MessageSink, Error = Error> + Send>,
+ ),
+}
+
+enum ConnectionState {
+ Connecting(WebSocketConnectionFuture),
+ Connected(SenderState, MessageStream),
+}
+
+impl ConnectionState {
+ fn sink(&mut self) -> Option<&mut MessageSink> {
+ match self {
+ Self::Connected(sender, _) => match sender {
+ SenderState::Connected(sink) => Some(sink),
+ _ => None,
+ },
+ _ => None,
+ }
+ }
+
+ fn send(
+ &mut self,
+ msg: tokio_tungstenite::tungstenite::protocol::Message,
+ ) {
+ match self {
+ Self::Connected(sender, _) => {
+ let fut =
+ match std::mem::replace(sender, SenderState::Temporary) {
+ SenderState::Connected(sink) => sink.send(msg),
+ _ => unreachable!(),
+ };
+ *sender = SenderState::Sending(Box::new(fut));
+ }
+ _ => unreachable!(),
+ }
+ }
+}
+
struct Connection<
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
> {
id: String,
client: crate::client::Client<S>,
- stream: MessageStream,
+ conn: ConnectionState,
}
impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
@@ -307,22 +363,34 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
fn new(
id: &str,
client: crate::client::Client<S>,
- stream: MessageStream,
+ conn: ConnectionState,
) -> Self {
Self {
client,
id: id.to_string(),
- stream,
+ conn,
}
}
fn handle_client_message(
&mut self,
msg: &crate::protocol::Message,
- ) -> Result<()> {
- // TODO
+ ) -> Result<Option<tokio_tungstenite::tungstenite::protocol::Message>>
+ {
log::info!("teleterm client message for {}: {:?}", self.id, msg);
- Ok(())
+
+ match msg {
+ crate::protocol::Message::TerminalOutput { .. } => {
+ let json = serde_json::to_string(msg)
+ .context(crate::error::SerializeMessage)?;
+ Ok(Some(
+ tokio_tungstenite::tungstenite::protocol::Message::Text(
+ json,
+ ),
+ ))
+ }
+ _ => Ok(None),
+ }
}
fn handle_websocket_message(
@@ -348,6 +416,12 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
>] = &[&Self::poll_client, &Self::poll_websocket_stream];
fn poll_client(&mut self) -> component_future::Poll<(), Error> {
+ // don't start up the client until the websocket connection is fully
+ // established and isn't busy
+ if self.conn.sink().is_none() {
+ return Ok(component_future::Async::NothingToDo);
+ };
+
match component_future::try_ready!(self.client.poll()).unwrap() {
crate::client::Event::Disconnect => {
// TODO: better reconnect handling?
@@ -355,19 +429,51 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
crate::client::Event::Connect => {}
crate::client::Event::ServerMessage(msg) => {
- self.handle_client_message(&msg)?;
+ if let Some(msg) = self.handle_client_message(&msg)? {
+ self.conn.send(msg);
+ }
}
}
Ok(component_future::Async::DidWork)
}
fn poll_websocket_stream(&mut self) -> component_future::Poll<(), Error> {
- if let Some(msg) = component_future::try_ready!(self.stream.poll()) {
- self.handle_websocket_message(&msg)?;
- Ok(component_future::Async::DidWork)
- } else {
- log::info!("disconnect for {}", self.id);
- Ok(component_future::Async::Ready(()))
+ match &mut self.conn {
+ ConnectionState::Connecting(fut) => {
+ let stream = component_future::try_ready!(fut.poll());
+ let (sink, stream) = stream.split();
+ self.conn = ConnectionState::Connected(
+ SenderState::Connected(Box::new(
+ sink.sink_map_err(|e| Error::WebSocket { source: e }),
+ )),
+ Box::new(stream.context(crate::error::WebSocket)),
+ );
+ Ok(component_future::Async::DidWork)
+ }
+ ConnectionState::Connected(sender, stream) => match sender {
+ SenderState::Temporary => unreachable!(),
+ SenderState::Connected(_) => {
+ if let Some(msg) =
+ component_future::try_ready!(stream.poll())
+ {
+ self.handle_websocket_message(&msg)?;
+ Ok(component_future::Async::DidWork)
+ } else {
+ log::info!("disconnect for {}", self.id);
+ Ok(component_future::Async::Ready(()))
+ }
+ }
+ SenderState::Sending(fut) => {
+ let sink = component_future::try_ready!(fut.poll());
+ *sender = SenderState::Flushing(Box::new(sink.flush()));
+ Ok(component_future::Async::DidWork)
+ }
+ SenderState::Flushing(fut) => {
+ let sink = component_future::try_ready!(fut.poll());
+ *sender = SenderState::Connected(Box::new(sink));
+ Ok(component_future::Async::DidWork)
+ }
+ },
}
}
}