From fd9625037549f7047e35cef37a2f93630e7121e5 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Sat, 23 Nov 2019 07:55:58 -0500 Subject: refactor a bit --- teleterm/src/web.rs | 415 +--------------------------------------------------- 1 file changed, 6 insertions(+), 409 deletions(-) (limited to 'teleterm/src/web.rs') diff --git a/teleterm/src/web.rs b/teleterm/src/web.rs index 0e9d914..991f543 100644 --- a/teleterm/src/web.rs +++ b/teleterm/src/web.rs @@ -1,20 +1,12 @@ +mod list; mod view; +mod watch; mod ws; use crate::prelude::*; use gotham::router::builder::{DefineSingleRoute as _, DrawRoutes as _}; use gotham::state::FromState as _; -use tokio_tungstenite::tungstenite; - -#[derive( - serde::Deserialize, - gotham_derive::StateData, - gotham_derive::StaticResponseExtender, -)] -struct WatchQueryParams { - id: String, -} #[derive(Clone, serde::Serialize, gotham_derive::StateData)] struct Config { @@ -44,31 +36,12 @@ impl Server { } } -impl Server { - const POLL_FNS: - &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) - -> component_future::Poll< - (), - Error, - >] = &[&Self::poll_web_server]; - - fn poll_web_server(&mut self) -> component_future::Poll<(), Error> { - component_future::try_ready!(self - .server - .poll() - .map_err(|_| unreachable!())); - Ok(component_future::Async::Ready(())) - } -} - impl futures::Future for Server { type Item = (); type Error = Error; fn poll(&mut self) -> futures::Poll { - component_future::poll_future(self, Self::POLL_FNS) + self.server.poll().map_err(|_| unreachable!()) } } @@ -94,11 +67,11 @@ fn router(data: &Config) -> impl gotham::handler::NewHandler { route .get("/teleterm.css") .to(serve_static("text/css", &view::TELETERM_CSS)); - route.get("/list").to(handle_list); + route.get("/list").to(list::run); route .get("/watch") - .with_query_string_extractor::() - .to(handle_watch); + .with_query_string_extractor::() + .to(watch::run); }) } @@ -129,379 +102,3 @@ fn serve_template( (state, response) } } - -fn handle_list( - state: gotham::state::State, -) -> (gotham::state::State, hyper::Response) { - let config = Config::borrow_from(&state); - let (_, address) = config.server_address; - let connector: crate::client::Connector<_> = Box::new(move || { - Box::new( - tokio::net::tcp::TcpStream::connect(&address) - .context(crate::error::Connect { address }), - ) - }); - let client = crate::client::Client::list( - "teleterm-web", - connector, - &crate::protocol::Auth::plain("test"), - ); - let (w_sessions, r_sessions) = tokio::sync::oneshot::channel(); - let lister = Lister::new(client, w_sessions); - tokio::spawn(lister.map_err(|e| log::warn!("error listing: {}", e))); - match r_sessions.wait().unwrap() { - Ok(sessions) => { - let body = serde_json::to_string(&sessions).unwrap(); - (state, hyper::Response::new(hyper::Body::from(body))) - } - Err(e) => { - log::warn!("error retrieving sessions: {}", e); - ( - state, - hyper::Response::new(hyper::Body::from(format!( - "error retrieving sessions: {}", - e - ))), - ) - } - } -} - -struct Lister< - S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static, -> { - client: crate::client::Client, - w_sessions: Option< - tokio::sync::oneshot::Sender>>, - >, -} - -impl - Lister -{ - fn new( - client: crate::client::Client, - w_sessions: tokio::sync::oneshot::Sender< - Result>, - >, - ) -> Self { - Self { - client, - w_sessions: Some(w_sessions), - } - } - - fn server_message( - &mut self, - msg: crate::protocol::Message, - ) -> Option>> { - match msg { - crate::protocol::Message::Sessions { sessions } => { - Some(Ok(sessions)) - } - crate::protocol::Message::Disconnected => { - Some(Err(Error::ServerDisconnected)) - } - crate::protocol::Message::Error { msg } => { - Some(Err(Error::Server { message: msg })) - } - msg => Some(Err(crate::error::Error::UnexpectedMessage { - message: msg, - })), - } - } -} - -impl - Lister -{ - const POLL_FNS: - &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) - -> component_future::Poll< - (), - Error, - >] = &[&Self::poll_client]; - - fn poll_client(&mut self) -> component_future::Poll<(), Error> { - match component_future::try_ready!(self.client.poll()).unwrap() { - crate::client::Event::Disconnect => { - let res = Err(Error::ServerDisconnected); - self.w_sessions.take().unwrap().send(res).unwrap(); - return Ok(component_future::Async::Ready(())); - } - crate::client::Event::Connect => { - self.client - .send_message(crate::protocol::Message::list_sessions()); - } - crate::client::Event::ServerMessage(msg) => { - if let Some(res) = self.server_message(msg) { - self.w_sessions.take().unwrap().send(res).unwrap(); - return Ok(component_future::Async::Ready(())); - } - } - } - Ok(component_future::Async::DidWork) - } -} - -impl - futures::Future for Lister -{ - type Item = (); - type Error = Error; - - fn poll(&mut self) -> futures::Poll { - component_future::poll_future(self, Self::POLL_FNS) - } -} - -fn handle_watch( - mut state: gotham::state::State, -) -> (gotham::state::State, hyper::Response) { - let body = hyper::Body::take_from(&mut state); - let headers = hyper::HeaderMap::take_from(&mut state); - let config = Config::borrow_from(&state); - if ws::requested(&headers) { - let (response, stream) = match ws::accept(&headers, body) { - Ok(res) => res, - Err(_) => { - log::error!("failed to accept websocket request"); - return ( - state, - hyper::Response::builder() - .status(hyper::StatusCode::BAD_REQUEST) - .body(hyper::Body::empty()) - .unwrap(), - ); - } - }; - - let query_params = WatchQueryParams::borrow_from(&state); - let (_, address) = config.server_address; - let connector: crate::client::Connector<_> = Box::new(move || { - Box::new( - tokio::net::tcp::TcpStream::connect(&address) - .context(crate::error::Connect { address }), - ) - }); - let client = crate::client::Client::watch( - "teleterm-web", - connector, - &crate::protocol::Auth::plain("test"), - &query_params.id, - ); - let conn = Connection::new( - gotham::state::request_id(&state), - client, - ConnectionState::Connecting(Box::new( - stream.context(crate::error::WebSocketAccept), - )), - ); - - tokio::spawn(conn.map_err(|e| log::error!("{}", e))); - - (state, response) - } else { - ( - state, - hyper::Response::new(hyper::Body::from( - "non-websocket request to websocket endpoint", - )), - ) - } -} - -type WebSocketConnectionFuture = Box< - dyn futures::Future< - Item = tokio_tungstenite::WebSocketStream< - hyper::upgrade::Upgraded, - >, - Error = Error, - > + Send, ->; -type MessageSink = Box< - dyn futures::Sink - + Send, ->; -type MessageStream = Box< - dyn futures::Stream + Send, ->; - -enum SenderState { - Temporary, - Connected(MessageSink), - Sending( - Box + Send>, - ), - Flushing( - Box + Send>, - ), -} - -enum ConnectionState { - Connecting(WebSocketConnectionFuture), - Connected(SenderState, MessageStream), -} - -impl ConnectionState { - fn sink(&mut self) -> Option<&mut MessageSink> { - match self { - Self::Connected(sender, _) => match sender { - SenderState::Connected(sink) => Some(sink), - _ => None, - }, - _ => None, - } - } - - fn send(&mut self, msg: tungstenite::Message) { - match self { - Self::Connected(sender, _) => { - let fut = - match std::mem::replace(sender, SenderState::Temporary) { - SenderState::Connected(sink) => sink.send(msg), - _ => unreachable!(), - }; - *sender = SenderState::Sending(Box::new(fut)); - } - _ => unreachable!(), - } - } -} - -struct Connection< - S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static, -> { - id: String, - client: crate::client::Client, - conn: ConnectionState, -} - -impl - Connection -{ - fn new( - id: &str, - client: crate::client::Client, - conn: ConnectionState, - ) -> Self { - Self { - client, - id: id.to_string(), - conn, - } - } - - fn handle_client_message( - &mut self, - msg: &crate::protocol::Message, - ) -> Result> { - match msg { - crate::protocol::Message::TerminalOutput { .. } - | crate::protocol::Message::Disconnected - | crate::protocol::Message::Resize { .. } => { - let json = serde_json::to_string(msg) - .context(crate::error::SerializeMessage)?; - Ok(Some(tungstenite::Message::Text(json))) - } - _ => Ok(None), - } - } - - fn handle_websocket_message( - &mut self, - msg: &tungstenite::Message, - ) -> Result<()> { - // TODO - log::info!("websocket stream message for {}: {:?}", self.id, msg); - Ok(()) - } -} - -impl - Connection -{ - const POLL_FNS: - &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) - -> component_future::Poll< - (), - Error, - >] = &[&Self::poll_client, &Self::poll_websocket_stream]; - - fn poll_client(&mut self) -> component_future::Poll<(), Error> { - // don't start up the client until the websocket connection is fully - // established and isn't busy - if self.conn.sink().is_none() { - return Ok(component_future::Async::NothingToDo); - }; - - match component_future::try_ready!(self.client.poll()).unwrap() { - crate::client::Event::Disconnect => { - // TODO: better reconnect handling? - return Ok(component_future::Async::Ready(())); - } - crate::client::Event::Connect => {} - crate::client::Event::ServerMessage(msg) => { - if let Some(msg) = self.handle_client_message(&msg)? { - self.conn.send(msg); - } - } - } - Ok(component_future::Async::DidWork) - } - - fn poll_websocket_stream(&mut self) -> component_future::Poll<(), Error> { - match &mut self.conn { - ConnectionState::Connecting(fut) => { - let stream = component_future::try_ready!(fut.poll()); - let (sink, stream) = stream.split(); - self.conn = ConnectionState::Connected( - SenderState::Connected(Box::new( - sink.sink_map_err(|e| Error::WebSocket { source: e }), - )), - Box::new(stream.context(crate::error::WebSocket)), - ); - Ok(component_future::Async::DidWork) - } - ConnectionState::Connected(sender, stream) => match sender { - SenderState::Temporary => unreachable!(), - SenderState::Connected(_) => { - if let Some(msg) = - component_future::try_ready!(stream.poll()) - { - self.handle_websocket_message(&msg)?; - Ok(component_future::Async::DidWork) - } else { - log::info!("disconnect for {}", self.id); - Ok(component_future::Async::Ready(())) - } - } - SenderState::Sending(fut) => { - let sink = component_future::try_ready!(fut.poll()); - *sender = SenderState::Flushing(Box::new(sink.flush())); - Ok(component_future::Async::DidWork) - } - SenderState::Flushing(fut) => { - let sink = component_future::try_ready!(fut.poll()); - *sender = SenderState::Connected(Box::new(sink)); - Ok(component_future::Async::DidWork) - } - }, - } - } -} - -impl - futures::Future for Connection -{ - type Item = (); - type Error = Error; - - fn poll(&mut self) -> futures::Poll { - component_future::poll_future(self, Self::POLL_FNS) - } -} -- cgit v1.2.3-54-g00ecf