From ea36c04d25636384f29f6b25bf9bf0b55bdd9c0e Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Thu, 21 Nov 2019 14:08:08 -0500 Subject: add an endpoint that is able to list available streams --- teleterm/src/config.rs | 3 +- teleterm/src/error.rs | 3 ++ teleterm/src/web.rs | 139 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 144 insertions(+), 1 deletion(-) (limited to 'teleterm') diff --git a/teleterm/src/config.rs b/teleterm/src/config.rs index 42b4ec4..8e555d8 100644 --- a/teleterm/src/config.rs +++ b/teleterm/src/config.rs @@ -203,7 +203,8 @@ fn default_connect_address() -> (String, std::net::SocketAddr) { } // XXX this does a blocking dns lookup - should try to find an async version -fn to_connect_address( +// XXX shouldn't need to be pub +pub fn to_connect_address( address: &str, ) -> Result<(String, std::net::SocketAddr)> { let mut address_parts = address.split(':'); diff --git a/teleterm/src/error.rs b/teleterm/src/error.rs index cb5feca..2e4b57a 100644 --- a/teleterm/src/error.rs +++ b/teleterm/src/error.rs @@ -303,6 +303,9 @@ pub enum Error { #[snafu(display("received error from server: {}", message))] Server { message: String }, + #[snafu(display("couldn't connect to server"))] + ServerDisconnected, + #[snafu(display("SIGWINCH handler failed: {}", source))] SigWinchHandler { source: std::io::Error }, diff --git a/teleterm/src/web.rs b/teleterm/src/web.rs index bcf055f..5754bcd 100644 --- a/teleterm/src/web.rs +++ b/teleterm/src/web.rs @@ -69,6 +69,7 @@ pub fn router() -> impl gotham::handler::NewHandler { route .get("/teleterm_web_bg.wasm") .to(serve_static("application/wasm", &TELETERM_WEB_WASM)); + route.get("/list").to(handle_list); route.get("/watch").to(handle_watch); }) } @@ -86,6 +87,144 @@ fn serve_static( } } +fn handle_list( + state: gotham::state::State, +) -> (gotham::state::State, hyper::Response) { + let address = "127.0.0.1:4144"; + let (_, address) = crate::config::to_connect_address(address).unwrap(); + let connector: crate::client::Connector<_> = Box::new(move || { + Box::new( + tokio::net::tcp::TcpStream::connect(&address) + .context(crate::error::Connect { address }), + ) + }); + let client = crate::client::Client::list( + "teleterm-web", + connector, + &crate::protocol::Auth::plain("test"), + ); + let (w_sessions, r_sessions) = tokio::sync::oneshot::channel(); + let lister = Lister::new(client, w_sessions); + tokio::spawn(lister.map_err(|e| log::warn!("error listing: {}", e))); + match r_sessions.wait().unwrap() { + Ok(sessions) => { + let mut body = String::new(); + for session in sessions { + log::debug!( + "found session with id {} and name {}", + session.id, + session.username + ); + body.push_str(&format!( + "{}: {}\n", + session.id, session.username + )); + } + (state, hyper::Response::new(hyper::Body::from(body))) + } + Err(e) => { + log::warn!("error retrieving sessions: {}", e); + ( + state, + hyper::Response::new(hyper::Body::from(format!( + "error retrieving sessions: {}", + e + ))), + ) + } + } +} + +struct Lister< + S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static, +> { + client: crate::client::Client, + w_sessions: Option< + tokio::sync::oneshot::Sender>>, + >, +} + +impl + Lister +{ + fn new( + client: crate::client::Client, + w_sessions: tokio::sync::oneshot::Sender< + Result>, + >, + ) -> Self { + Self { + client, + w_sessions: Some(w_sessions), + } + } + + fn server_message( + &mut self, + msg: crate::protocol::Message, + ) -> Option>> { + match msg { + crate::protocol::Message::Sessions { sessions } => { + Some(Ok(sessions)) + } + crate::protocol::Message::Disconnected => { + Some(Err(Error::ServerDisconnected)) + } + crate::protocol::Message::Error { msg } => { + Some(Err(Error::Server { message: msg })) + } + msg => Some(Err(crate::error::Error::UnexpectedMessage { + message: msg, + })), + } + } +} + +impl + Lister +{ + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> component_future::Poll< + (), + Error, + >] = &[&Self::poll_client]; + + fn poll_client(&mut self) -> component_future::Poll<(), Error> { + match component_future::try_ready!(self.client.poll()).unwrap() { + crate::client::Event::Disconnect => { + let res = Err(Error::ServerDisconnected); + self.w_sessions.take().unwrap().send(res).unwrap(); + return Ok(component_future::Async::Ready(())); + } + crate::client::Event::Connect => { + self.client + .send_message(crate::protocol::Message::list_sessions()); + } + crate::client::Event::ServerMessage(msg) => { + if let Some(res) = self.server_message(msg) { + self.w_sessions.take().unwrap().send(res).unwrap(); + return Ok(component_future::Async::Ready(())); + } + } + } + Ok(component_future::Async::DidWork) + } +} + +impl + futures::Future for Lister +{ + type Item = (); + type Error = Error; + + fn poll(&mut self) -> futures::Poll { + component_future::poll_future(self, Self::POLL_FNS) + } +} + fn handle_watch( mut state: gotham::state::State, ) -> (gotham::state::State, hyper::Response) { -- cgit v1.2.3-54-g00ecf