From e71944c103a2c212620c9878cd76eeae8c90db8c Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Thu, 21 Nov 2019 02:47:16 -0500 Subject: simplify --- teleterm/src/error.rs | 8 ++++++++ teleterm/src/web.rs | 45 +++++++++++++-------------------------------- 2 files changed, 21 insertions(+), 32 deletions(-) diff --git a/teleterm/src/error.rs b/teleterm/src/error.rs index e3c5206..cb5feca 100644 --- a/teleterm/src/error.rs +++ b/teleterm/src/error.rs @@ -384,6 +384,14 @@ pub enum Error { #[snafu(display("failed to find user with username {}", name))] UnknownUser { name: String }, + #[snafu(display("failure during websocket stream: {}", source))] + WebSocket { + source: tokio_tungstenite::tungstenite::Error, + }, + + #[snafu(display("failed to accept websocket connection: {}", source))] + WebSocketAccept { source: hyper::Error }, + #[snafu(display("failed to write to file: {}", source))] WriteFile { source: tokio::io::Error }, diff --git a/teleterm/src/web.rs b/teleterm/src/web.rs index d8324c2..0aa54cf 100644 --- a/teleterm/src/web.rs +++ b/teleterm/src/web.rs @@ -105,16 +105,18 @@ fn handle_websocket_connection( ); } }; - let req_id = gotham::state::request_id(&state).to_owned(); let stream = stream - .map_err(|e| { - log::error!( - "error upgrading connection for websockets: {}", - e - ) - }) - .and_then(move |stream| handle_websocket_stream(req_id, stream)); - tokio::spawn(stream); + .context(crate::error::WebSocketAccept) + .map(|stream| stream.context(crate::error::WebSocket)) + .flatten_stream(); + + let req_id = gotham::state::request_id(&state).to_owned(); + tokio::spawn( + stream + .for_each(move |msg| handle_websocket_message(&req_id, &msg)) + .map_err(|e| log::error!("{}", e)), + ); + (state, response) } else { ( @@ -126,32 +128,11 @@ fn handle_websocket_connection( } } -fn handle_websocket_stream( - req_id: String, - stream: S, -) -> impl futures::Future -where - S: futures::Stream< - Item = tokio_tungstenite::tungstenite::protocol::Message, - Error = tokio_tungstenite::tungstenite::Error, - > + futures::Sink< - SinkItem = tokio_tungstenite::tungstenite::protocol::Message, - SinkError = tokio_tungstenite::tungstenite::Error, - >, -{ - let (sink, stream) = stream.split(); - sink.send_all(stream.map(move |msg| { - handle_websocket_message(&req_id, &msg); - msg - })) - .map_err(|e| log::error!("error during websocket stream: {}", e)) - .map(|_| log::info!("disconnect")) -} - fn handle_websocket_message( req_id: &str, msg: &tokio_tungstenite::tungstenite::protocol::Message, -) { +) -> Result<()> { // TODO log::info!("websocket stream message for {}: {:?}", req_id, msg); + Ok(()) } -- cgit v1.2.3-54-g00ecf