From fd9625037549f7047e35cef37a2f93630e7121e5 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Sat, 23 Nov 2019 07:55:58 -0500 Subject: refactor a bit --- teleterm/src/web.rs | 415 +--------------------------------------------- teleterm/src/web/list.rs | 136 +++++++++++++++ teleterm/src/web/watch.rs | 267 +++++++++++++++++++++++++++++ 3 files changed, 409 insertions(+), 409 deletions(-) create mode 100644 teleterm/src/web/list.rs create mode 100644 teleterm/src/web/watch.rs diff --git a/teleterm/src/web.rs b/teleterm/src/web.rs index 0e9d914..991f543 100644 --- a/teleterm/src/web.rs +++ b/teleterm/src/web.rs @@ -1,20 +1,12 @@ +mod list; mod view; +mod watch; mod ws; use crate::prelude::*; use gotham::router::builder::{DefineSingleRoute as _, DrawRoutes as _}; use gotham::state::FromState as _; -use tokio_tungstenite::tungstenite; - -#[derive( - serde::Deserialize, - gotham_derive::StateData, - gotham_derive::StaticResponseExtender, -)] -struct WatchQueryParams { - id: String, -} #[derive(Clone, serde::Serialize, gotham_derive::StateData)] struct Config { @@ -44,31 +36,12 @@ impl Server { } } -impl Server { - const POLL_FNS: - &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) - -> component_future::Poll< - (), - Error, - >] = &[&Self::poll_web_server]; - - fn poll_web_server(&mut self) -> component_future::Poll<(), Error> { - component_future::try_ready!(self - .server - .poll() - .map_err(|_| unreachable!())); - Ok(component_future::Async::Ready(())) - } -} - impl futures::Future for Server { type Item = (); type Error = Error; fn poll(&mut self) -> futures::Poll { - component_future::poll_future(self, Self::POLL_FNS) + self.server.poll().map_err(|_| unreachable!()) } } @@ -94,11 +67,11 @@ fn router(data: &Config) -> impl gotham::handler::NewHandler { route .get("/teleterm.css") .to(serve_static("text/css", &view::TELETERM_CSS)); - route.get("/list").to(handle_list); + route.get("/list").to(list::run); route .get("/watch") - .with_query_string_extractor::() - .to(handle_watch); + .with_query_string_extractor::() + .to(watch::run); }) } @@ -129,379 +102,3 @@ fn serve_template( (state, response) } } - -fn handle_list( - state: gotham::state::State, -) -> (gotham::state::State, hyper::Response) { - let config = Config::borrow_from(&state); - let (_, address) = config.server_address; - let connector: crate::client::Connector<_> = Box::new(move || { - Box::new( - tokio::net::tcp::TcpStream::connect(&address) - .context(crate::error::Connect { address }), - ) - }); - let client = crate::client::Client::list( - "teleterm-web", - connector, - &crate::protocol::Auth::plain("test"), - ); - let (w_sessions, r_sessions) = tokio::sync::oneshot::channel(); - let lister = Lister::new(client, w_sessions); - tokio::spawn(lister.map_err(|e| log::warn!("error listing: {}", e))); - match r_sessions.wait().unwrap() { - Ok(sessions) => { - let body = serde_json::to_string(&sessions).unwrap(); - (state, hyper::Response::new(hyper::Body::from(body))) - } - Err(e) => { - log::warn!("error retrieving sessions: {}", e); - ( - state, - hyper::Response::new(hyper::Body::from(format!( - "error retrieving sessions: {}", - e - ))), - ) - } - } -} - -struct Lister< - S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static, -> { - client: crate::client::Client, - w_sessions: Option< - tokio::sync::oneshot::Sender>>, - >, -} - -impl - Lister -{ - fn new( - client: crate::client::Client, - w_sessions: tokio::sync::oneshot::Sender< - Result>, - >, - ) -> Self { - Self { - client, - w_sessions: Some(w_sessions), - } - } - - fn server_message( - &mut self, - msg: crate::protocol::Message, - ) -> Option>> { - match msg { - crate::protocol::Message::Sessions { sessions } => { - Some(Ok(sessions)) - } - crate::protocol::Message::Disconnected => { - Some(Err(Error::ServerDisconnected)) - } - crate::protocol::Message::Error { msg } => { - Some(Err(Error::Server { message: msg })) - } - msg => Some(Err(crate::error::Error::UnexpectedMessage { - message: msg, - })), - } - } -} - -impl - Lister -{ - const POLL_FNS: - &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) - -> component_future::Poll< - (), - Error, - >] = &[&Self::poll_client]; - - fn poll_client(&mut self) -> component_future::Poll<(), Error> { - match component_future::try_ready!(self.client.poll()).unwrap() { - crate::client::Event::Disconnect => { - let res = Err(Error::ServerDisconnected); - self.w_sessions.take().unwrap().send(res).unwrap(); - return Ok(component_future::Async::Ready(())); - } - crate::client::Event::Connect => { - self.client - .send_message(crate::protocol::Message::list_sessions()); - } - crate::client::Event::ServerMessage(msg) => { - if let Some(res) = self.server_message(msg) { - self.w_sessions.take().unwrap().send(res).unwrap(); - return Ok(component_future::Async::Ready(())); - } - } - } - Ok(component_future::Async::DidWork) - } -} - -impl - futures::Future for Lister -{ - type Item = (); - type Error = Error; - - fn poll(&mut self) -> futures::Poll { - component_future::poll_future(self, Self::POLL_FNS) - } -} - -fn handle_watch( - mut state: gotham::state::State, -) -> (gotham::state::State, hyper::Response) { - let body = hyper::Body::take_from(&mut state); - let headers = hyper::HeaderMap::take_from(&mut state); - let config = Config::borrow_from(&state); - if ws::requested(&headers) { - let (response, stream) = match ws::accept(&headers, body) { - Ok(res) => res, - Err(_) => { - log::error!("failed to accept websocket request"); - return ( - state, - hyper::Response::builder() - .status(hyper::StatusCode::BAD_REQUEST) - .body(hyper::Body::empty()) - .unwrap(), - ); - } - }; - - let query_params = WatchQueryParams::borrow_from(&state); - let (_, address) = config.server_address; - let connector: crate::client::Connector<_> = Box::new(move || { - Box::new( - tokio::net::tcp::TcpStream::connect(&address) - .context(crate::error::Connect { address }), - ) - }); - let client = crate::client::Client::watch( - "teleterm-web", - connector, - &crate::protocol::Auth::plain("test"), - &query_params.id, - ); - let conn = Connection::new( - gotham::state::request_id(&state), - client, - ConnectionState::Connecting(Box::new( - stream.context(crate::error::WebSocketAccept), - )), - ); - - tokio::spawn(conn.map_err(|e| log::error!("{}", e))); - - (state, response) - } else { - ( - state, - hyper::Response::new(hyper::Body::from( - "non-websocket request to websocket endpoint", - )), - ) - } -} - -type WebSocketConnectionFuture = Box< - dyn futures::Future< - Item = tokio_tungstenite::WebSocketStream< - hyper::upgrade::Upgraded, - >, - Error = Error, - > + Send, ->; -type MessageSink = Box< - dyn futures::Sink - + Send, ->; -type MessageStream = Box< - dyn futures::Stream + Send, ->; - -enum SenderState { - Temporary, - Connected(MessageSink), - Sending( - Box + Send>, - ), - Flushing( - Box + Send>, - ), -} - -enum ConnectionState { - Connecting(WebSocketConnectionFuture), - Connected(SenderState, MessageStream), -} - -impl ConnectionState { - fn sink(&mut self) -> Option<&mut MessageSink> { - match self { - Self::Connected(sender, _) => match sender { - SenderState::Connected(sink) => Some(sink), - _ => None, - }, - _ => None, - } - } - - fn send(&mut self, msg: tungstenite::Message) { - match self { - Self::Connected(sender, _) => { - let fut = - match std::mem::replace(sender, SenderState::Temporary) { - SenderState::Connected(sink) => sink.send(msg), - _ => unreachable!(), - }; - *sender = SenderState::Sending(Box::new(fut)); - } - _ => unreachable!(), - } - } -} - -struct Connection< - S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static, -> { - id: String, - client: crate::client::Client, - conn: ConnectionState, -} - -impl - Connection -{ - fn new( - id: &str, - client: crate::client::Client, - conn: ConnectionState, - ) -> Self { - Self { - client, - id: id.to_string(), - conn, - } - } - - fn handle_client_message( - &mut self, - msg: &crate::protocol::Message, - ) -> Result> { - match msg { - crate::protocol::Message::TerminalOutput { .. } - | crate::protocol::Message::Disconnected - | crate::protocol::Message::Resize { .. } => { - let json = serde_json::to_string(msg) - .context(crate::error::SerializeMessage)?; - Ok(Some(tungstenite::Message::Text(json))) - } - _ => Ok(None), - } - } - - fn handle_websocket_message( - &mut self, - msg: &tungstenite::Message, - ) -> Result<()> { - // TODO - log::info!("websocket stream message for {}: {:?}", self.id, msg); - Ok(()) - } -} - -impl - Connection -{ - const POLL_FNS: - &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) - -> component_future::Poll< - (), - Error, - >] = &[&Self::poll_client, &Self::poll_websocket_stream]; - - fn poll_client(&mut self) -> component_future::Poll<(), Error> { - // don't start up the client until the websocket connection is fully - // established and isn't busy - if self.conn.sink().is_none() { - return Ok(component_future::Async::NothingToDo); - }; - - match component_future::try_ready!(self.client.poll()).unwrap() { - crate::client::Event::Disconnect => { - // TODO: better reconnect handling? - return Ok(component_future::Async::Ready(())); - } - crate::client::Event::Connect => {} - crate::client::Event::ServerMessage(msg) => { - if let Some(msg) = self.handle_client_message(&msg)? { - self.conn.send(msg); - } - } - } - Ok(component_future::Async::DidWork) - } - - fn poll_websocket_stream(&mut self) -> component_future::Poll<(), Error> { - match &mut self.conn { - ConnectionState::Connecting(fut) => { - let stream = component_future::try_ready!(fut.poll()); - let (sink, stream) = stream.split(); - self.conn = ConnectionState::Connected( - SenderState::Connected(Box::new( - sink.sink_map_err(|e| Error::WebSocket { source: e }), - )), - Box::new(stream.context(crate::error::WebSocket)), - ); - Ok(component_future::Async::DidWork) - } - ConnectionState::Connected(sender, stream) => match sender { - SenderState::Temporary => unreachable!(), - SenderState::Connected(_) => { - if let Some(msg) = - component_future::try_ready!(stream.poll()) - { - self.handle_websocket_message(&msg)?; - Ok(component_future::Async::DidWork) - } else { - log::info!("disconnect for {}", self.id); - Ok(component_future::Async::Ready(())) - } - } - SenderState::Sending(fut) => { - let sink = component_future::try_ready!(fut.poll()); - *sender = SenderState::Flushing(Box::new(sink.flush())); - Ok(component_future::Async::DidWork) - } - SenderState::Flushing(fut) => { - let sink = component_future::try_ready!(fut.poll()); - *sender = SenderState::Connected(Box::new(sink)); - Ok(component_future::Async::DidWork) - } - }, - } - } -} - -impl - futures::Future for Connection -{ - type Item = (); - type Error = Error; - - fn poll(&mut self) -> futures::Poll { - component_future::poll_future(self, Self::POLL_FNS) - } -} diff --git a/teleterm/src/web/list.rs b/teleterm/src/web/list.rs new file mode 100644 index 0000000..9c1bcc6 --- /dev/null +++ b/teleterm/src/web/list.rs @@ -0,0 +1,136 @@ +use crate::prelude::*; + +use gotham::state::FromState as _; + +pub fn run( + state: gotham::state::State, +) -> (gotham::state::State, hyper::Response) { + let config = crate::web::Config::borrow_from(&state); + + let (_, address) = config.server_address; + let connector: crate::client::Connector<_> = Box::new(move || { + Box::new( + tokio::net::tcp::TcpStream::connect(&address) + .context(crate::error::Connect { address }), + ) + }); + let client = crate::client::Client::list( + "teleterm-web", + connector, + &crate::protocol::Auth::plain("test"), + ); + + let (w_sessions, r_sessions) = tokio::sync::oneshot::channel(); + + tokio::spawn( + Lister::new(client, w_sessions) + .map_err(|e| log::warn!("error listing: {}", e)), + ); + + match r_sessions.wait().unwrap() { + Ok(sessions) => { + let body = serde_json::to_string(&sessions).unwrap(); + (state, hyper::Response::new(hyper::Body::from(body))) + } + Err(e) => { + log::warn!("error retrieving sessions: {}", e); + ( + state, + hyper::Response::new(hyper::Body::from(format!( + "error retrieving sessions: {}", + e + ))), + ) + } + } +} + +struct Lister< + S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static, +> { + client: crate::client::Client, + w_sessions: Option< + tokio::sync::oneshot::Sender>>, + >, +} + +impl + Lister +{ + fn new( + client: crate::client::Client, + w_sessions: tokio::sync::oneshot::Sender< + Result>, + >, + ) -> Self { + Self { + client, + w_sessions: Some(w_sessions), + } + } + + fn server_message( + &mut self, + msg: crate::protocol::Message, + ) -> Option>> { + match msg { + crate::protocol::Message::Sessions { sessions } => { + Some(Ok(sessions)) + } + crate::protocol::Message::Disconnected => { + Some(Err(Error::ServerDisconnected)) + } + crate::protocol::Message::Error { msg } => { + Some(Err(Error::Server { message: msg })) + } + msg => Some(Err(crate::error::Error::UnexpectedMessage { + message: msg, + })), + } + } +} + +impl + Lister +{ + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> component_future::Poll< + (), + Error, + >] = &[&Self::poll_client]; + + fn poll_client(&mut self) -> component_future::Poll<(), Error> { + match component_future::try_ready!(self.client.poll()).unwrap() { + crate::client::Event::Disconnect => { + let res = Err(Error::ServerDisconnected); + self.w_sessions.take().unwrap().send(res).unwrap(); + return Ok(component_future::Async::Ready(())); + } + crate::client::Event::Connect => { + self.client + .send_message(crate::protocol::Message::list_sessions()); + } + crate::client::Event::ServerMessage(msg) => { + if let Some(res) = self.server_message(msg) { + self.w_sessions.take().unwrap().send(res).unwrap(); + return Ok(component_future::Async::Ready(())); + } + } + } + Ok(component_future::Async::DidWork) + } +} + +impl + futures::Future for Lister +{ + type Item = (); + type Error = Error; + + fn poll(&mut self) -> futures::Poll { + component_future::poll_future(self, Self::POLL_FNS) + } +} diff --git a/teleterm/src/web/watch.rs b/teleterm/src/web/watch.rs new file mode 100644 index 0000000..720e9f8 --- /dev/null +++ b/teleterm/src/web/watch.rs @@ -0,0 +1,267 @@ +use crate::prelude::*; + +use gotham::state::FromState as _; +use tokio_tungstenite::tungstenite; + +#[derive( + serde::Deserialize, + gotham_derive::StateData, + gotham_derive::StaticResponseExtender, +)] +pub struct QueryParams { + id: String, +} + +pub fn run( + mut state: gotham::state::State, +) -> (gotham::state::State, hyper::Response) { + let body = hyper::Body::take_from(&mut state); + let headers = hyper::HeaderMap::take_from(&mut state); + let config = crate::web::Config::borrow_from(&state); + + if crate::web::ws::requested(&headers) { + let (response, stream) = match crate::web::ws::accept(&headers, body) + { + Ok(res) => res, + Err(_) => { + log::error!("failed to accept websocket request"); + return ( + state, + hyper::Response::builder() + .status(hyper::StatusCode::BAD_REQUEST) + .body(hyper::Body::empty()) + .unwrap(), + ); + } + }; + + let query_params = QueryParams::borrow_from(&state); + + let (_, address) = config.server_address; + let connector: crate::client::Connector<_> = Box::new(move || { + Box::new( + tokio::net::tcp::TcpStream::connect(&address) + .context(crate::error::Connect { address }), + ) + }); + let client = crate::client::Client::watch( + "teleterm-web", + connector, + &crate::protocol::Auth::plain("test"), + &query_params.id, + ); + + tokio::spawn( + Connection::new( + gotham::state::request_id(&state), + client, + ConnectionState::Connecting(Box::new( + stream.context(crate::error::WebSocketAccept), + )), + ) + .map_err(|e| log::error!("{}", e)), + ); + + (state, response) + } else { + ( + state, + hyper::Response::new(hyper::Body::from( + "non-websocket request to websocket endpoint", + )), + ) + } +} + +type WebSocketConnectionFuture = Box< + dyn futures::Future< + Item = tokio_tungstenite::WebSocketStream< + hyper::upgrade::Upgraded, + >, + Error = Error, + > + Send, +>; +type MessageSink = Box< + dyn futures::Sink + + Send, +>; +type MessageStream = Box< + dyn futures::Stream + Send, +>; + +enum SenderState { + Temporary, + Connected(MessageSink), + Sending( + Box + Send>, + ), + Flushing( + Box + Send>, + ), +} + +enum ConnectionState { + Connecting(WebSocketConnectionFuture), + Connected(SenderState, MessageStream), +} + +impl ConnectionState { + fn sink(&mut self) -> Option<&mut MessageSink> { + match self { + Self::Connected(sender, _) => match sender { + SenderState::Connected(sink) => Some(sink), + _ => None, + }, + _ => None, + } + } + + fn send(&mut self, msg: tungstenite::Message) { + match self { + Self::Connected(sender, _) => { + let fut = + match std::mem::replace(sender, SenderState::Temporary) { + SenderState::Connected(sink) => sink.send(msg), + _ => unreachable!(), + }; + *sender = SenderState::Sending(Box::new(fut)); + } + _ => unreachable!(), + } + } +} + +struct Connection< + S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static, +> { + id: String, + client: crate::client::Client, + conn: ConnectionState, +} + +impl + Connection +{ + fn new( + id: &str, + client: crate::client::Client, + conn: ConnectionState, + ) -> Self { + Self { + client, + id: id.to_string(), + conn, + } + } + + fn handle_client_message( + &mut self, + msg: &crate::protocol::Message, + ) -> Result> { + match msg { + crate::protocol::Message::TerminalOutput { .. } + | crate::protocol::Message::Disconnected + | crate::protocol::Message::Resize { .. } => { + let json = serde_json::to_string(msg) + .context(crate::error::SerializeMessage)?; + Ok(Some(tungstenite::Message::Text(json))) + } + _ => Ok(None), + } + } + + fn handle_websocket_message( + &mut self, + msg: &tungstenite::Message, + ) -> Result<()> { + // TODO + log::info!("websocket stream message for {}: {:?}", self.id, msg); + Ok(()) + } +} + +impl + Connection +{ + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> component_future::Poll< + (), + Error, + >] = &[&Self::poll_client, &Self::poll_websocket_stream]; + + fn poll_client(&mut self) -> component_future::Poll<(), Error> { + // don't start up the client until the websocket connection is fully + // established and isn't busy + if self.conn.sink().is_none() { + return Ok(component_future::Async::NothingToDo); + }; + + match component_future::try_ready!(self.client.poll()).unwrap() { + crate::client::Event::Disconnect => { + // TODO: better reconnect handling? + return Ok(component_future::Async::Ready(())); + } + crate::client::Event::Connect => {} + crate::client::Event::ServerMessage(msg) => { + if let Some(msg) = self.handle_client_message(&msg)? { + self.conn.send(msg); + } + } + } + Ok(component_future::Async::DidWork) + } + + fn poll_websocket_stream(&mut self) -> component_future::Poll<(), Error> { + match &mut self.conn { + ConnectionState::Connecting(fut) => { + let stream = component_future::try_ready!(fut.poll()); + let (sink, stream) = stream.split(); + self.conn = ConnectionState::Connected( + SenderState::Connected(Box::new( + sink.sink_map_err(|e| Error::WebSocket { source: e }), + )), + Box::new(stream.context(crate::error::WebSocket)), + ); + Ok(component_future::Async::DidWork) + } + ConnectionState::Connected(sender, stream) => match sender { + SenderState::Temporary => unreachable!(), + SenderState::Connected(_) => { + if let Some(msg) = + component_future::try_ready!(stream.poll()) + { + self.handle_websocket_message(&msg)?; + Ok(component_future::Async::DidWork) + } else { + log::info!("disconnect for {}", self.id); + Ok(component_future::Async::Ready(())) + } + } + SenderState::Sending(fut) => { + let sink = component_future::try_ready!(fut.poll()); + *sender = SenderState::Flushing(Box::new(sink.flush())); + Ok(component_future::Async::DidWork) + } + SenderState::Flushing(fut) => { + let sink = component_future::try_ready!(fut.poll()); + *sender = SenderState::Connected(Box::new(sink)); + Ok(component_future::Async::DidWork) + } + }, + } + } +} + +impl + futures::Future for Connection +{ + type Item = (); + type Error = Error; + + fn poll(&mut self) -> futures::Poll { + component_future::poll_future(self, Self::POLL_FNS) + } +} -- cgit v1.2.3-54-g00ecf