aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-11-23 07:55:58 -0500
committerJesse Luehrs <doy@tozt.net>2019-11-23 07:55:58 -0500
commitfd9625037549f7047e35cef37a2f93630e7121e5 (patch)
treebf1862c1f52ce90f8138c4d4ada5674a67431242
parent547185d4eee66e09776b928bafc873794e9f3bc7 (diff)
downloadteleterm-fd9625037549f7047e35cef37a2f93630e7121e5.tar.gz
teleterm-fd9625037549f7047e35cef37a2f93630e7121e5.zip
refactor a bit
-rw-r--r--teleterm/src/web.rs415
-rw-r--r--teleterm/src/web/list.rs136
-rw-r--r--teleterm/src/web/watch.rs267
3 files changed, 409 insertions, 409 deletions
diff --git a/teleterm/src/web.rs b/teleterm/src/web.rs
index 0e9d914..991f543 100644
--- a/teleterm/src/web.rs
+++ b/teleterm/src/web.rs
@@ -1,20 +1,12 @@
+mod list;
mod view;
+mod watch;
mod ws;
use crate::prelude::*;
use gotham::router::builder::{DefineSingleRoute as _, DrawRoutes as _};
use gotham::state::FromState as _;
-use tokio_tungstenite::tungstenite;
-
-#[derive(
- serde::Deserialize,
- gotham_derive::StateData,
- gotham_derive::StaticResponseExtender,
-)]
-struct WatchQueryParams {
- id: String,
-}
#[derive(Clone, serde::Serialize, gotham_derive::StateData)]
struct Config {
@@ -44,31 +36,12 @@ impl Server {
}
}
-impl Server {
- const POLL_FNS:
- &'static [&'static dyn for<'a> Fn(
- &'a mut Self,
- )
- -> component_future::Poll<
- (),
- Error,
- >] = &[&Self::poll_web_server];
-
- fn poll_web_server(&mut self) -> component_future::Poll<(), Error> {
- component_future::try_ready!(self
- .server
- .poll()
- .map_err(|_| unreachable!()));
- Ok(component_future::Async::Ready(()))
- }
-}
-
impl futures::Future for Server {
type Item = ();
type Error = Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
- component_future::poll_future(self, Self::POLL_FNS)
+ self.server.poll().map_err(|_| unreachable!())
}
}
@@ -94,11 +67,11 @@ fn router(data: &Config) -> impl gotham::handler::NewHandler {
route
.get("/teleterm.css")
.to(serve_static("text/css", &view::TELETERM_CSS));
- route.get("/list").to(handle_list);
+ route.get("/list").to(list::run);
route
.get("/watch")
- .with_query_string_extractor::<WatchQueryParams>()
- .to(handle_watch);
+ .with_query_string_extractor::<watch::QueryParams>()
+ .to(watch::run);
})
}
@@ -129,379 +102,3 @@ fn serve_template(
(state, response)
}
}
-
-fn handle_list(
- state: gotham::state::State,
-) -> (gotham::state::State, hyper::Response<hyper::Body>) {
- let config = Config::borrow_from(&state);
- let (_, address) = config.server_address;
- let connector: crate::client::Connector<_> = Box::new(move || {
- Box::new(
- tokio::net::tcp::TcpStream::connect(&address)
- .context(crate::error::Connect { address }),
- )
- });
- let client = crate::client::Client::list(
- "teleterm-web",
- connector,
- &crate::protocol::Auth::plain("test"),
- );
- let (w_sessions, r_sessions) = tokio::sync::oneshot::channel();
- let lister = Lister::new(client, w_sessions);
- tokio::spawn(lister.map_err(|e| log::warn!("error listing: {}", e)));
- match r_sessions.wait().unwrap() {
- Ok(sessions) => {
- let body = serde_json::to_string(&sessions).unwrap();
- (state, hyper::Response::new(hyper::Body::from(body)))
- }
- Err(e) => {
- log::warn!("error retrieving sessions: {}", e);
- (
- state,
- hyper::Response::new(hyper::Body::from(format!(
- "error retrieving sessions: {}",
- e
- ))),
- )
- }
- }
-}
-
-struct Lister<
- S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
-> {
- client: crate::client::Client<S>,
- w_sessions: Option<
- tokio::sync::oneshot::Sender<Result<Vec<crate::protocol::Session>>>,
- >,
-}
-
-impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
- Lister<S>
-{
- fn new(
- client: crate::client::Client<S>,
- w_sessions: tokio::sync::oneshot::Sender<
- Result<Vec<crate::protocol::Session>>,
- >,
- ) -> Self {
- Self {
- client,
- w_sessions: Some(w_sessions),
- }
- }
-
- fn server_message(
- &mut self,
- msg: crate::protocol::Message,
- ) -> Option<Result<Vec<crate::protocol::Session>>> {
- match msg {
- crate::protocol::Message::Sessions { sessions } => {
- Some(Ok(sessions))
- }
- crate::protocol::Message::Disconnected => {
- Some(Err(Error::ServerDisconnected))
- }
- crate::protocol::Message::Error { msg } => {
- Some(Err(Error::Server { message: msg }))
- }
- msg => Some(Err(crate::error::Error::UnexpectedMessage {
- message: msg,
- })),
- }
- }
-}
-
-impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
- Lister<S>
-{
- const POLL_FNS:
- &'static [&'static dyn for<'a> Fn(
- &'a mut Self,
- )
- -> component_future::Poll<
- (),
- Error,
- >] = &[&Self::poll_client];
-
- fn poll_client(&mut self) -> component_future::Poll<(), Error> {
- match component_future::try_ready!(self.client.poll()).unwrap() {
- crate::client::Event::Disconnect => {
- let res = Err(Error::ServerDisconnected);
- self.w_sessions.take().unwrap().send(res).unwrap();
- return Ok(component_future::Async::Ready(()));
- }
- crate::client::Event::Connect => {
- self.client
- .send_message(crate::protocol::Message::list_sessions());
- }
- crate::client::Event::ServerMessage(msg) => {
- if let Some(res) = self.server_message(msg) {
- self.w_sessions.take().unwrap().send(res).unwrap();
- return Ok(component_future::Async::Ready(()));
- }
- }
- }
- Ok(component_future::Async::DidWork)
- }
-}
-
-impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
- futures::Future for Lister<S>
-{
- type Item = ();
- type Error = Error;
-
- fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
- component_future::poll_future(self, Self::POLL_FNS)
- }
-}
-
-fn handle_watch(
- mut state: gotham::state::State,
-) -> (gotham::state::State, hyper::Response<hyper::Body>) {
- let body = hyper::Body::take_from(&mut state);
- let headers = hyper::HeaderMap::take_from(&mut state);
- let config = Config::borrow_from(&state);
- if ws::requested(&headers) {
- let (response, stream) = match ws::accept(&headers, body) {
- Ok(res) => res,
- Err(_) => {
- log::error!("failed to accept websocket request");
- return (
- state,
- hyper::Response::builder()
- .status(hyper::StatusCode::BAD_REQUEST)
- .body(hyper::Body::empty())
- .unwrap(),
- );
- }
- };
-
- let query_params = WatchQueryParams::borrow_from(&state);
- let (_, address) = config.server_address;
- let connector: crate::client::Connector<_> = Box::new(move || {
- Box::new(
- tokio::net::tcp::TcpStream::connect(&address)
- .context(crate::error::Connect { address }),
- )
- });
- let client = crate::client::Client::watch(
- "teleterm-web",
- connector,
- &crate::protocol::Auth::plain("test"),
- &query_params.id,
- );
- let conn = Connection::new(
- gotham::state::request_id(&state),
- client,
- ConnectionState::Connecting(Box::new(
- stream.context(crate::error::WebSocketAccept),
- )),
- );
-
- tokio::spawn(conn.map_err(|e| log::error!("{}", e)));
-
- (state, response)
- } else {
- (
- state,
- hyper::Response::new(hyper::Body::from(
- "non-websocket request to websocket endpoint",
- )),
- )
- }
-}
-
-type WebSocketConnectionFuture = Box<
- dyn futures::Future<
- Item = tokio_tungstenite::WebSocketStream<
- hyper::upgrade::Upgraded,
- >,
- Error = Error,
- > + Send,
->;
-type MessageSink = Box<
- dyn futures::Sink<SinkItem = tungstenite::Message, SinkError = Error>
- + Send,
->;
-type MessageStream = Box<
- dyn futures::Stream<Item = tungstenite::Message, Error = Error> + Send,
->;
-
-enum SenderState {
- Temporary,
- Connected(MessageSink),
- Sending(
- Box<dyn futures::Future<Item = MessageSink, Error = Error> + Send>,
- ),
- Flushing(
- Box<dyn futures::Future<Item = MessageSink, Error = Error> + Send>,
- ),
-}
-
-enum ConnectionState {
- Connecting(WebSocketConnectionFuture),
- Connected(SenderState, MessageStream),
-}
-
-impl ConnectionState {
- fn sink(&mut self) -> Option<&mut MessageSink> {
- match self {
- Self::Connected(sender, _) => match sender {
- SenderState::Connected(sink) => Some(sink),
- _ => None,
- },
- _ => None,
- }
- }
-
- fn send(&mut self, msg: tungstenite::Message) {
- match self {
- Self::Connected(sender, _) => {
- let fut =
- match std::mem::replace(sender, SenderState::Temporary) {
- SenderState::Connected(sink) => sink.send(msg),
- _ => unreachable!(),
- };
- *sender = SenderState::Sending(Box::new(fut));
- }
- _ => unreachable!(),
- }
- }
-}
-
-struct Connection<
- S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
-> {
- id: String,
- client: crate::client::Client<S>,
- conn: ConnectionState,
-}
-
-impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
- Connection<S>
-{
- fn new(
- id: &str,
- client: crate::client::Client<S>,
- conn: ConnectionState,
- ) -> Self {
- Self {
- client,
- id: id.to_string(),
- conn,
- }
- }
-
- fn handle_client_message(
- &mut self,
- msg: &crate::protocol::Message,
- ) -> Result<Option<tungstenite::Message>> {
- match msg {
- crate::protocol::Message::TerminalOutput { .. }
- | crate::protocol::Message::Disconnected
- | crate::protocol::Message::Resize { .. } => {
- let json = serde_json::to_string(msg)
- .context(crate::error::SerializeMessage)?;
- Ok(Some(tungstenite::Message::Text(json)))
- }
- _ => Ok(None),
- }
- }
-
- fn handle_websocket_message(
- &mut self,
- msg: &tungstenite::Message,
- ) -> Result<()> {
- // TODO
- log::info!("websocket stream message for {}: {:?}", self.id, msg);
- Ok(())
- }
-}
-
-impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
- Connection<S>
-{
- const POLL_FNS:
- &'static [&'static dyn for<'a> Fn(
- &'a mut Self,
- )
- -> component_future::Poll<
- (),
- Error,
- >] = &[&Self::poll_client, &Self::poll_websocket_stream];
-
- fn poll_client(&mut self) -> component_future::Poll<(), Error> {
- // don't start up the client until the websocket connection is fully
- // established and isn't busy
- if self.conn.sink().is_none() {
- return Ok(component_future::Async::NothingToDo);
- };
-
- match component_future::try_ready!(self.client.poll()).unwrap() {
- crate::client::Event::Disconnect => {
- // TODO: better reconnect handling?
- return Ok(component_future::Async::Ready(()));
- }
- crate::client::Event::Connect => {}
- crate::client::Event::ServerMessage(msg) => {
- if let Some(msg) = self.handle_client_message(&msg)? {
- self.conn.send(msg);
- }
- }
- }
- Ok(component_future::Async::DidWork)
- }
-
- fn poll_websocket_stream(&mut self) -> component_future::Poll<(), Error> {
- match &mut self.conn {
- ConnectionState::Connecting(fut) => {
- let stream = component_future::try_ready!(fut.poll());
- let (sink, stream) = stream.split();
- self.conn = ConnectionState::Connected(
- SenderState::Connected(Box::new(
- sink.sink_map_err(|e| Error::WebSocket { source: e }),
- )),
- Box::new(stream.context(crate::error::WebSocket)),
- );
- Ok(component_future::Async::DidWork)
- }
- ConnectionState::Connected(sender, stream) => match sender {
- SenderState::Temporary => unreachable!(),
- SenderState::Connected(_) => {
- if let Some(msg) =
- component_future::try_ready!(stream.poll())
- {
- self.handle_websocket_message(&msg)?;
- Ok(component_future::Async::DidWork)
- } else {
- log::info!("disconnect for {}", self.id);
- Ok(component_future::Async::Ready(()))
- }
- }
- SenderState::Sending(fut) => {
- let sink = component_future::try_ready!(fut.poll());
- *sender = SenderState::Flushing(Box::new(sink.flush()));
- Ok(component_future::Async::DidWork)
- }
- SenderState::Flushing(fut) => {
- let sink = component_future::try_ready!(fut.poll());
- *sender = SenderState::Connected(Box::new(sink));
- Ok(component_future::Async::DidWork)
- }
- },
- }
- }
-}
-
-impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
- futures::Future for Connection<S>
-{
- type Item = ();
- type Error = Error;
-
- fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
- component_future::poll_future(self, Self::POLL_FNS)
- }
-}
diff --git a/teleterm/src/web/list.rs b/teleterm/src/web/list.rs
new file mode 100644
index 0000000..9c1bcc6
--- /dev/null
+++ b/teleterm/src/web/list.rs
@@ -0,0 +1,136 @@
+use crate::prelude::*;
+
+use gotham::state::FromState as _;
+
+pub fn run(
+ state: gotham::state::State,
+) -> (gotham::state::State, hyper::Response<hyper::Body>) {
+ let config = crate::web::Config::borrow_from(&state);
+
+ let (_, address) = config.server_address;
+ let connector: crate::client::Connector<_> = Box::new(move || {
+ Box::new(
+ tokio::net::tcp::TcpStream::connect(&address)
+ .context(crate::error::Connect { address }),
+ )
+ });
+ let client = crate::client::Client::list(
+ "teleterm-web",
+ connector,
+ &crate::protocol::Auth::plain("test"),
+ );
+
+ let (w_sessions, r_sessions) = tokio::sync::oneshot::channel();
+
+ tokio::spawn(
+ Lister::new(client, w_sessions)
+ .map_err(|e| log::warn!("error listing: {}", e)),
+ );
+
+ match r_sessions.wait().unwrap() {
+ Ok(sessions) => {
+ let body = serde_json::to_string(&sessions).unwrap();
+ (state, hyper::Response::new(hyper::Body::from(body)))
+ }
+ Err(e) => {
+ log::warn!("error retrieving sessions: {}", e);
+ (
+ state,
+ hyper::Response::new(hyper::Body::from(format!(
+ "error retrieving sessions: {}",
+ e
+ ))),
+ )
+ }
+ }
+}
+
+struct Lister<
+ S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
+> {
+ client: crate::client::Client<S>,
+ w_sessions: Option<
+ tokio::sync::oneshot::Sender<Result<Vec<crate::protocol::Session>>>,
+ >,
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ Lister<S>
+{
+ fn new(
+ client: crate::client::Client<S>,
+ w_sessions: tokio::sync::oneshot::Sender<
+ Result<Vec<crate::protocol::Session>>,
+ >,
+ ) -> Self {
+ Self {
+ client,
+ w_sessions: Some(w_sessions),
+ }
+ }
+
+ fn server_message(
+ &mut self,
+ msg: crate::protocol::Message,
+ ) -> Option<Result<Vec<crate::protocol::Session>>> {
+ match msg {
+ crate::protocol::Message::Sessions { sessions } => {
+ Some(Ok(sessions))
+ }
+ crate::protocol::Message::Disconnected => {
+ Some(Err(Error::ServerDisconnected))
+ }
+ crate::protocol::Message::Error { msg } => {
+ Some(Err(Error::Server { message: msg }))
+ }
+ msg => Some(Err(crate::error::Error::UnexpectedMessage {
+ message: msg,
+ })),
+ }
+ }
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ Lister<S>
+{
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ (),
+ Error,
+ >] = &[&Self::poll_client];
+
+ fn poll_client(&mut self) -> component_future::Poll<(), Error> {
+ match component_future::try_ready!(self.client.poll()).unwrap() {
+ crate::client::Event::Disconnect => {
+ let res = Err(Error::ServerDisconnected);
+ self.w_sessions.take().unwrap().send(res).unwrap();
+ return Ok(component_future::Async::Ready(()));
+ }
+ crate::client::Event::Connect => {
+ self.client
+ .send_message(crate::protocol::Message::list_sessions());
+ }
+ crate::client::Event::ServerMessage(msg) => {
+ if let Some(res) = self.server_message(msg) {
+ self.w_sessions.take().unwrap().send(res).unwrap();
+ return Ok(component_future::Async::Ready(()));
+ }
+ }
+ }
+ Ok(component_future::Async::DidWork)
+ }
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ futures::Future for Lister<S>
+{
+ type Item = ();
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ component_future::poll_future(self, Self::POLL_FNS)
+ }
+}
diff --git a/teleterm/src/web/watch.rs b/teleterm/src/web/watch.rs
new file mode 100644
index 0000000..720e9f8
--- /dev/null
+++ b/teleterm/src/web/watch.rs
@@ -0,0 +1,267 @@
+use crate::prelude::*;
+
+use gotham::state::FromState as _;
+use tokio_tungstenite::tungstenite;
+
+#[derive(
+ serde::Deserialize,
+ gotham_derive::StateData,
+ gotham_derive::StaticResponseExtender,
+)]
+pub struct QueryParams {
+ id: String,
+}
+
+pub fn run(
+ mut state: gotham::state::State,
+) -> (gotham::state::State, hyper::Response<hyper::Body>) {
+ let body = hyper::Body::take_from(&mut state);
+ let headers = hyper::HeaderMap::take_from(&mut state);
+ let config = crate::web::Config::borrow_from(&state);
+
+ if crate::web::ws::requested(&headers) {
+ let (response, stream) = match crate::web::ws::accept(&headers, body)
+ {
+ Ok(res) => res,
+ Err(_) => {
+ log::error!("failed to accept websocket request");
+ return (
+ state,
+ hyper::Response::builder()
+ .status(hyper::StatusCode::BAD_REQUEST)
+ .body(hyper::Body::empty())
+ .unwrap(),
+ );
+ }
+ };
+
+ let query_params = QueryParams::borrow_from(&state);
+
+ let (_, address) = config.server_address;
+ let connector: crate::client::Connector<_> = Box::new(move || {
+ Box::new(
+ tokio::net::tcp::TcpStream::connect(&address)
+ .context(crate::error::Connect { address }),
+ )
+ });
+ let client = crate::client::Client::watch(
+ "teleterm-web",
+ connector,
+ &crate::protocol::Auth::plain("test"),
+ &query_params.id,
+ );
+
+ tokio::spawn(
+ Connection::new(
+ gotham::state::request_id(&state),
+ client,
+ ConnectionState::Connecting(Box::new(
+ stream.context(crate::error::WebSocketAccept),
+ )),
+ )
+ .map_err(|e| log::error!("{}", e)),
+ );
+
+ (state, response)
+ } else {
+ (
+ state,
+ hyper::Response::new(hyper::Body::from(
+ "non-websocket request to websocket endpoint",
+ )),
+ )
+ }
+}
+
+type WebSocketConnectionFuture = Box<
+ dyn futures::Future<
+ Item = tokio_tungstenite::WebSocketStream<
+ hyper::upgrade::Upgraded,
+ >,
+ Error = Error,
+ > + Send,
+>;
+type MessageSink = Box<
+ dyn futures::Sink<SinkItem = tungstenite::Message, SinkError = Error>
+ + Send,
+>;
+type MessageStream = Box<
+ dyn futures::Stream<Item = tungstenite::Message, Error = Error> + Send,
+>;
+
+enum SenderState {
+ Temporary,
+ Connected(MessageSink),
+ Sending(
+ Box<dyn futures::Future<Item = MessageSink, Error = Error> + Send>,
+ ),
+ Flushing(
+ Box<dyn futures::Future<Item = MessageSink, Error = Error> + Send>,
+ ),
+}
+
+enum ConnectionState {
+ Connecting(WebSocketConnectionFuture),
+ Connected(SenderState, MessageStream),
+}
+
+impl ConnectionState {
+ fn sink(&mut self) -> Option<&mut MessageSink> {
+ match self {
+ Self::Connected(sender, _) => match sender {
+ SenderState::Connected(sink) => Some(sink),
+ _ => None,
+ },
+ _ => None,
+ }
+ }
+
+ fn send(&mut self, msg: tungstenite::Message) {
+ match self {
+ Self::Connected(sender, _) => {
+ let fut =
+ match std::mem::replace(sender, SenderState::Temporary) {
+ SenderState::Connected(sink) => sink.send(msg),
+ _ => unreachable!(),
+ };
+ *sender = SenderState::Sending(Box::new(fut));
+ }
+ _ => unreachable!(),
+ }
+ }
+}
+
+struct Connection<
+ S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
+> {
+ id: String,
+ client: crate::client::Client<S>,
+ conn: ConnectionState,
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ Connection<S>
+{
+ fn new(
+ id: &str,
+ client: crate::client::Client<S>,
+ conn: ConnectionState,
+ ) -> Self {
+ Self {
+ client,
+ id: id.to_string(),
+ conn,
+ }
+ }
+
+ fn handle_client_message(
+ &mut self,
+ msg: &crate::protocol::Message,
+ ) -> Result<Option<tungstenite::Message>> {
+ match msg {
+ crate::protocol::Message::TerminalOutput { .. }
+ | crate::protocol::Message::Disconnected
+ | crate::protocol::Message::Resize { .. } => {
+ let json = serde_json::to_string(msg)
+ .context(crate::error::SerializeMessage)?;
+ Ok(Some(tungstenite::Message::Text(json)))
+ }
+ _ => Ok(None),
+ }
+ }
+
+ fn handle_websocket_message(
+ &mut self,
+ msg: &tungstenite::Message,
+ ) -> Result<()> {
+ // TODO
+ log::info!("websocket stream message for {}: {:?}", self.id, msg);
+ Ok(())
+ }
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ Connection<S>
+{
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ (),
+ Error,
+ >] = &[&Self::poll_client, &Self::poll_websocket_stream];
+
+ fn poll_client(&mut self) -> component_future::Poll<(), Error> {
+ // don't start up the client until the websocket connection is fully
+ // established and isn't busy
+ if self.conn.sink().is_none() {
+ return Ok(component_future::Async::NothingToDo);
+ };
+
+ match component_future::try_ready!(self.client.poll()).unwrap() {
+ crate::client::Event::Disconnect => {
+ // TODO: better reconnect handling?
+ return Ok(component_future::Async::Ready(()));
+ }
+ crate::client::Event::Connect => {}
+ crate::client::Event::ServerMessage(msg) => {
+ if let Some(msg) = self.handle_client_message(&msg)? {
+ self.conn.send(msg);
+ }
+ }
+ }
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_websocket_stream(&mut self) -> component_future::Poll<(), Error> {
+ match &mut self.conn {
+ ConnectionState::Connecting(fut) => {
+ let stream = component_future::try_ready!(fut.poll());
+ let (sink, stream) = stream.split();
+ self.conn = ConnectionState::Connected(
+ SenderState::Connected(Box::new(
+ sink.sink_map_err(|e| Error::WebSocket { source: e }),
+ )),
+ Box::new(stream.context(crate::error::WebSocket)),
+ );
+ Ok(component_future::Async::DidWork)
+ }
+ ConnectionState::Connected(sender, stream) => match sender {
+ SenderState::Temporary => unreachable!(),
+ SenderState::Connected(_) => {
+ if let Some(msg) =
+ component_future::try_ready!(stream.poll())
+ {
+ self.handle_websocket_message(&msg)?;
+ Ok(component_future::Async::DidWork)
+ } else {
+ log::info!("disconnect for {}", self.id);
+ Ok(component_future::Async::Ready(()))
+ }
+ }
+ SenderState::Sending(fut) => {
+ let sink = component_future::try_ready!(fut.poll());
+ *sender = SenderState::Flushing(Box::new(sink.flush()));
+ Ok(component_future::Async::DidWork)
+ }
+ SenderState::Flushing(fut) => {
+ let sink = component_future::try_ready!(fut.poll());
+ *sender = SenderState::Connected(Box::new(sink));
+ Ok(component_future::Async::DidWork)
+ }
+ },
+ }
+ }
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ futures::Future for Connection<S>
+{
+ type Item = ();
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ component_future::poll_future(self, Self::POLL_FNS)
+ }
+}