1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
mod ws;
use futures::{Future as _, Sink as _, Stream as _};
use gotham::router::builder::{DefineSingleRoute as _, DrawRoutes as _};
use gotham::state::FromState as _;
pub fn router() -> impl gotham::handler::NewHandler {
gotham::router::builder::build_simple_router(|route| {
route.get("/").to(root);
route.get("/ws").to(handle_websocket_connection);
})
}
fn root(state: gotham::state::State) -> (gotham::state::State, String) {
log::info!("request for /");
(state, "hello world".to_string())
}
fn handle_websocket_connection(
mut state: gotham::state::State,
) -> (gotham::state::State, hyper::Response<hyper::Body>) {
let body = hyper::Body::take_from(&mut state);
let headers = hyper::HeaderMap::take_from(&mut state);
if ws::requested(&headers) {
let (response, stream) = match ws::accept(&headers, body) {
Ok(res) => res,
Err(_) => {
log::error!("failed to accept websocket request");
return (
state,
hyper::Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(hyper::Body::empty())
.unwrap(),
);
}
};
let req_id = gotham::state::request_id(&state).to_owned();
let stream = stream
.map_err(|e| {
log::error!(
"error upgrading connection for websockets: {}",
e
)
})
.and_then(move |stream| handle_websocket_stream(req_id, stream));
tokio::spawn(stream);
(state, response)
} else {
(
state,
hyper::Response::new(hyper::Body::from(
"non-websocket request to websocket endpoint",
)),
)
}
}
fn handle_websocket_stream<S>(
req_id: String,
stream: S,
) -> impl futures::Future<Item = (), Error = ()>
where
S: futures::Stream<
Item = tokio_tungstenite::tungstenite::protocol::Message,
Error = tokio_tungstenite::tungstenite::Error,
> + futures::Sink<
SinkItem = tokio_tungstenite::tungstenite::protocol::Message,
SinkError = tokio_tungstenite::tungstenite::Error,
>,
{
let (sink, stream) = stream.split();
sink.send_all(stream.map(move |msg| {
handle_websocket_message(&req_id, &msg);
msg
}))
.map_err(|e| log::error!("error during websocket stream: {}", e))
.map(|_| log::info!("disconnect"))
}
fn handle_websocket_message(
req_id: &str,
msg: &tokio_tungstenite::tungstenite::protocol::Message,
) {
// TODO
log::info!("websocket stream message for {}: {:?}", req_id, msg);
}
|