aboutsummaryrefslogtreecommitdiffstats
path: root/teleterm/src/web/watch.rs
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-11-23 07:55:58 -0500
committerJesse Luehrs <doy@tozt.net>2019-11-23 07:55:58 -0500
commitfd9625037549f7047e35cef37a2f93630e7121e5 (patch)
treebf1862c1f52ce90f8138c4d4ada5674a67431242 /teleterm/src/web/watch.rs
parent547185d4eee66e09776b928bafc873794e9f3bc7 (diff)
downloadteleterm-fd9625037549f7047e35cef37a2f93630e7121e5.tar.gz
teleterm-fd9625037549f7047e35cef37a2f93630e7121e5.zip
refactor a bit
Diffstat (limited to 'teleterm/src/web/watch.rs')
-rw-r--r--teleterm/src/web/watch.rs267
1 files changed, 267 insertions, 0 deletions
diff --git a/teleterm/src/web/watch.rs b/teleterm/src/web/watch.rs
new file mode 100644
index 0000000..720e9f8
--- /dev/null
+++ b/teleterm/src/web/watch.rs
@@ -0,0 +1,267 @@
+use crate::prelude::*;
+
+use gotham::state::FromState as _;
+use tokio_tungstenite::tungstenite;
+
+#[derive(
+ serde::Deserialize,
+ gotham_derive::StateData,
+ gotham_derive::StaticResponseExtender,
+)]
+pub struct QueryParams {
+ id: String,
+}
+
+pub fn run(
+ mut state: gotham::state::State,
+) -> (gotham::state::State, hyper::Response<hyper::Body>) {
+ let body = hyper::Body::take_from(&mut state);
+ let headers = hyper::HeaderMap::take_from(&mut state);
+ let config = crate::web::Config::borrow_from(&state);
+
+ if crate::web::ws::requested(&headers) {
+ let (response, stream) = match crate::web::ws::accept(&headers, body)
+ {
+ Ok(res) => res,
+ Err(_) => {
+ log::error!("failed to accept websocket request");
+ return (
+ state,
+ hyper::Response::builder()
+ .status(hyper::StatusCode::BAD_REQUEST)
+ .body(hyper::Body::empty())
+ .unwrap(),
+ );
+ }
+ };
+
+ let query_params = QueryParams::borrow_from(&state);
+
+ let (_, address) = config.server_address;
+ let connector: crate::client::Connector<_> = Box::new(move || {
+ Box::new(
+ tokio::net::tcp::TcpStream::connect(&address)
+ .context(crate::error::Connect { address }),
+ )
+ });
+ let client = crate::client::Client::watch(
+ "teleterm-web",
+ connector,
+ &crate::protocol::Auth::plain("test"),
+ &query_params.id,
+ );
+
+ tokio::spawn(
+ Connection::new(
+ gotham::state::request_id(&state),
+ client,
+ ConnectionState::Connecting(Box::new(
+ stream.context(crate::error::WebSocketAccept),
+ )),
+ )
+ .map_err(|e| log::error!("{}", e)),
+ );
+
+ (state, response)
+ } else {
+ (
+ state,
+ hyper::Response::new(hyper::Body::from(
+ "non-websocket request to websocket endpoint",
+ )),
+ )
+ }
+}
+
+type WebSocketConnectionFuture = Box<
+ dyn futures::Future<
+ Item = tokio_tungstenite::WebSocketStream<
+ hyper::upgrade::Upgraded,
+ >,
+ Error = Error,
+ > + Send,
+>;
+type MessageSink = Box<
+ dyn futures::Sink<SinkItem = tungstenite::Message, SinkError = Error>
+ + Send,
+>;
+type MessageStream = Box<
+ dyn futures::Stream<Item = tungstenite::Message, Error = Error> + Send,
+>;
+
+enum SenderState {
+ Temporary,
+ Connected(MessageSink),
+ Sending(
+ Box<dyn futures::Future<Item = MessageSink, Error = Error> + Send>,
+ ),
+ Flushing(
+ Box<dyn futures::Future<Item = MessageSink, Error = Error> + Send>,
+ ),
+}
+
+enum ConnectionState {
+ Connecting(WebSocketConnectionFuture),
+ Connected(SenderState, MessageStream),
+}
+
+impl ConnectionState {
+ fn sink(&mut self) -> Option<&mut MessageSink> {
+ match self {
+ Self::Connected(sender, _) => match sender {
+ SenderState::Connected(sink) => Some(sink),
+ _ => None,
+ },
+ _ => None,
+ }
+ }
+
+ fn send(&mut self, msg: tungstenite::Message) {
+ match self {
+ Self::Connected(sender, _) => {
+ let fut =
+ match std::mem::replace(sender, SenderState::Temporary) {
+ SenderState::Connected(sink) => sink.send(msg),
+ _ => unreachable!(),
+ };
+ *sender = SenderState::Sending(Box::new(fut));
+ }
+ _ => unreachable!(),
+ }
+ }
+}
+
+struct Connection<
+ S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
+> {
+ id: String,
+ client: crate::client::Client<S>,
+ conn: ConnectionState,
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ Connection<S>
+{
+ fn new(
+ id: &str,
+ client: crate::client::Client<S>,
+ conn: ConnectionState,
+ ) -> Self {
+ Self {
+ client,
+ id: id.to_string(),
+ conn,
+ }
+ }
+
+ fn handle_client_message(
+ &mut self,
+ msg: &crate::protocol::Message,
+ ) -> Result<Option<tungstenite::Message>> {
+ match msg {
+ crate::protocol::Message::TerminalOutput { .. }
+ | crate::protocol::Message::Disconnected
+ | crate::protocol::Message::Resize { .. } => {
+ let json = serde_json::to_string(msg)
+ .context(crate::error::SerializeMessage)?;
+ Ok(Some(tungstenite::Message::Text(json)))
+ }
+ _ => Ok(None),
+ }
+ }
+
+ fn handle_websocket_message(
+ &mut self,
+ msg: &tungstenite::Message,
+ ) -> Result<()> {
+ // TODO
+ log::info!("websocket stream message for {}: {:?}", self.id, msg);
+ Ok(())
+ }
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ Connection<S>
+{
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ (),
+ Error,
+ >] = &[&Self::poll_client, &Self::poll_websocket_stream];
+
+ fn poll_client(&mut self) -> component_future::Poll<(), Error> {
+ // don't start up the client until the websocket connection is fully
+ // established and isn't busy
+ if self.conn.sink().is_none() {
+ return Ok(component_future::Async::NothingToDo);
+ };
+
+ match component_future::try_ready!(self.client.poll()).unwrap() {
+ crate::client::Event::Disconnect => {
+ // TODO: better reconnect handling?
+ return Ok(component_future::Async::Ready(()));
+ }
+ crate::client::Event::Connect => {}
+ crate::client::Event::ServerMessage(msg) => {
+ if let Some(msg) = self.handle_client_message(&msg)? {
+ self.conn.send(msg);
+ }
+ }
+ }
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_websocket_stream(&mut self) -> component_future::Poll<(), Error> {
+ match &mut self.conn {
+ ConnectionState::Connecting(fut) => {
+ let stream = component_future::try_ready!(fut.poll());
+ let (sink, stream) = stream.split();
+ self.conn = ConnectionState::Connected(
+ SenderState::Connected(Box::new(
+ sink.sink_map_err(|e| Error::WebSocket { source: e }),
+ )),
+ Box::new(stream.context(crate::error::WebSocket)),
+ );
+ Ok(component_future::Async::DidWork)
+ }
+ ConnectionState::Connected(sender, stream) => match sender {
+ SenderState::Temporary => unreachable!(),
+ SenderState::Connected(_) => {
+ if let Some(msg) =
+ component_future::try_ready!(stream.poll())
+ {
+ self.handle_websocket_message(&msg)?;
+ Ok(component_future::Async::DidWork)
+ } else {
+ log::info!("disconnect for {}", self.id);
+ Ok(component_future::Async::Ready(()))
+ }
+ }
+ SenderState::Sending(fut) => {
+ let sink = component_future::try_ready!(fut.poll());
+ *sender = SenderState::Flushing(Box::new(sink.flush()));
+ Ok(component_future::Async::DidWork)
+ }
+ SenderState::Flushing(fut) => {
+ let sink = component_future::try_ready!(fut.poll());
+ *sender = SenderState::Connected(Box::new(sink));
+ Ok(component_future::Async::DidWork)
+ }
+ },
+ }
+ }
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ futures::Future for Connection<S>
+{
+ type Item = ();
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ component_future::poll_future(self, Self::POLL_FNS)
+ }
+}