diff options
author | Jesse Luehrs <doy@tozt.net> | 2019-10-14 13:39:40 -0400 |
---|---|---|
committer | Jesse Luehrs <doy@tozt.net> | 2019-10-14 13:49:06 -0400 |
commit | 79e6424e43ea1549bb984941166ead4a96b7d8b9 (patch) | |
tree | c558820b0803687671ee131caba08346a84fba53 | |
parent | 4d502b5e296f8585b5143e8b854891063928af5b (diff) | |
download | teleterm-79e6424e43ea1549bb984941166ead4a96b7d8b9.tar.gz teleterm-79e6424e43ea1549bb984941166ead4a96b7d8b9.zip |
move some code around
-rw-r--r-- | src/cmd/server.rs | 7 | ||||
-rw-r--r-- | src/protocol.rs | 18 | ||||
-rw-r--r-- | src/server.rs | 138 | ||||
-rw-r--r-- | src/server/tls.rs | 145 |
4 files changed, 171 insertions, 137 deletions
diff --git a/src/cmd/server.rs b/src/cmd/server.rs index 3c40e4f..aca32d9 100644 --- a/src/cmd/server.rs +++ b/src/cmd/server.rs @@ -43,6 +43,9 @@ pub enum Error { #[snafu(display("failed to run server: {}", source))] Server { source: crate::server::Error }, + #[snafu(display("failed to run server: {}", source))] + TlsServer { source: crate::server::tls::Error }, + #[snafu(display("failed to open identity file: {}", source))] OpenIdentityFile { source: std::io::Error }, @@ -194,7 +197,7 @@ fn create_server_tls( .map_err(|_| Error::TlsSocketChannel {}) }); let server = - crate::server::TlsServer::new(buffer_size, read_timeout, sock_r) - .context(Server); + crate::server::tls::Server::new(buffer_size, read_timeout, sock_r) + .context(TlsServer); Ok((Box::new(acceptor), Box::new(server))) } diff --git a/src/protocol.rs b/src/protocol.rs index d673159..1221fc5 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -237,6 +237,24 @@ impl Message { { Packet::from(self).write_async(w) } + + // it'd be nice if i could just override the Debug implementation for + // specific enum variants, but writing the whole impl Debug by hand just + // to make this one change would be super obnoxious + pub fn log(&self, id: &str) { + match self { + Self::TerminalOutput { data } => { + log::debug!( + "{}: message(TerminalOutput {{ data: ({} bytes) }})", + id, + data.len() + ); + } + message => { + log::debug!("{}: message({:?})", id, message); + } + } + } } struct Packet { diff --git a/src/server.rs b/src/server.rs index 4141037..eeb4069 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,6 +5,8 @@ use snafu::futures01::FutureExt as _; use snafu::ResultExt as _; use tokio::util::FutureExt as _; +pub mod tls; + #[derive(Debug, snafu::Snafu)] pub enum Error { #[snafu(display("{}", source))] @@ -514,7 +516,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> } } - log_message(&conn.id, &message); + message.log(&conn.id); match conn.state { ConnectionState::Accepted { .. } => { @@ -804,137 +806,3 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> crate::component_future::poll_future(self, Self::POLL_FNS) } } - -#[allow(clippy::module_name_repetitions)] -pub struct TlsServer { - server: Server<tokio_tls::TlsStream<tokio::net::TcpStream>>, - sock_r: - tokio::sync::mpsc::Receiver<tokio_tls::Accept<tokio::net::TcpStream>>, - sock_w: tokio::sync::mpsc::Sender< - tokio_tls::TlsStream<tokio::net::TcpStream>, - >, - accepting_sockets: Vec<tokio_tls::Accept<tokio::net::TcpStream>>, -} - -impl TlsServer { - pub fn new( - buffer_size: usize, - read_timeout: std::time::Duration, - sock_r: tokio::sync::mpsc::Receiver< - tokio_tls::Accept<tokio::net::TcpStream>, - >, - ) -> Self { - let (tls_sock_w, tls_sock_r) = tokio::sync::mpsc::channel(100); - Self { - server: Server::new(buffer_size, read_timeout, tls_sock_r), - sock_r, - sock_w: tls_sock_w, - accepting_sockets: vec![], - } - } -} - -impl TlsServer { - const POLL_FNS: &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) -> Result< - crate::component_future::Poll<()>, - >] = &[ - &Self::poll_new_connections, - &Self::poll_handshake_connections, - &Self::poll_server, - ]; - - fn poll_new_connections( - &mut self, - ) -> Result<crate::component_future::Poll<()>> { - match self.sock_r.poll().context(SocketChannelReceive)? { - futures::Async::Ready(Some(sock)) => { - self.accepting_sockets.push(sock); - Ok(crate::component_future::Poll::DidWork) - } - futures::Async::Ready(None) => Err(Error::SocketChannelClosed), - futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) - } - } - } - - fn poll_handshake_connections( - &mut self, - ) -> Result<crate::component_future::Poll<()>> { - let mut did_work = false; - let mut not_ready = false; - - let mut i = 0; - while i < self.accepting_sockets.len() { - let sock = self.accepting_sockets.get_mut(i).unwrap(); - match sock.poll() { - Ok(futures::Async::Ready(sock)) => { - self.accepting_sockets.swap_remove(i); - self.sock_w.try_send(sock).unwrap_or_else(|e| { - log::warn!( - "failed to send connected tls socket: {}", - e - ); - }); - did_work = true; - continue; - } - Ok(futures::Async::NotReady) => { - not_ready = true; - } - Err(e) => { - log::warn!("failed to accept tls connection: {}", e); - self.accepting_sockets.swap_remove(i); - continue; - } - } - i += 1; - } - - if did_work { - Ok(crate::component_future::Poll::DidWork) - } else if not_ready { - Ok(crate::component_future::Poll::NotReady) - } else { - Ok(crate::component_future::Poll::NothingToDo) - } - } - - fn poll_server(&mut self) -> Result<crate::component_future::Poll<()>> { - match self.server.poll()? { - futures::Async::Ready(()) => { - Ok(crate::component_future::Poll::DidWork) - } - futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) - } - } - } -} - -#[must_use = "futures do nothing unless polled"] -impl futures::future::Future for TlsServer { - type Item = (); - type Error = Error; - - fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> { - crate::component_future::poll_future(self, Self::POLL_FNS) - } -} - -fn log_message(id: &str, message: &crate::protocol::Message) { - match message { - crate::protocol::Message::TerminalOutput { data } => { - log::debug!( - "{}: message(TerminalOutput {{ data: ({} bytes) }})", - id, - data.len() - ); - } - message => { - log::debug!("{}: message({:?})", id, message); - } - } -} diff --git a/src/server/tls.rs b/src/server/tls.rs new file mode 100644 index 0000000..8a4d132 --- /dev/null +++ b/src/server/tls.rs @@ -0,0 +1,145 @@ +use futures::future::Future as _; +use futures::stream::Stream as _; +use snafu::ResultExt as _; + +#[derive(Debug, snafu::Snafu)] +pub enum Error { + #[snafu(display("{}", source))] + Common { source: crate::error::Error }, + + #[snafu(display("{}", source))] + InnerServer { source: super::Error }, + + #[snafu(display( + "failed to receive new socket over channel: {}", + source + ))] + SocketChannelReceive { + source: tokio::sync::mpsc::error::RecvError, + }, + + #[snafu(display( + "failed to receive new socket over channel: channel closed" + ))] + SocketChannelClosed, +} + +pub type Result<T> = std::result::Result<T, Error>; + +pub struct Server { + server: super::Server<tokio_tls::TlsStream<tokio::net::TcpStream>>, + sock_r: + tokio::sync::mpsc::Receiver<tokio_tls::Accept<tokio::net::TcpStream>>, + sock_w: tokio::sync::mpsc::Sender< + tokio_tls::TlsStream<tokio::net::TcpStream>, + >, + accepting_sockets: Vec<tokio_tls::Accept<tokio::net::TcpStream>>, +} + +impl Server { + pub fn new( + buffer_size: usize, + read_timeout: std::time::Duration, + sock_r: tokio::sync::mpsc::Receiver< + tokio_tls::Accept<tokio::net::TcpStream>, + >, + ) -> Self { + let (tls_sock_w, tls_sock_r) = tokio::sync::mpsc::channel(100); + Self { + server: super::Server::new(buffer_size, read_timeout, tls_sock_r), + sock_r, + sock_w: tls_sock_w, + accepting_sockets: vec![], + } + } +} + +impl Server { + const POLL_FNS: &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) -> Result< + crate::component_future::Poll<()>, + >] = &[ + &Self::poll_new_connections, + &Self::poll_handshake_connections, + &Self::poll_server, + ]; + + fn poll_new_connections( + &mut self, + ) -> Result<crate::component_future::Poll<()>> { + match self.sock_r.poll().context(SocketChannelReceive)? { + futures::Async::Ready(Some(sock)) => { + self.accepting_sockets.push(sock); + Ok(crate::component_future::Poll::DidWork) + } + futures::Async::Ready(None) => Err(Error::SocketChannelClosed), + futures::Async::NotReady => { + Ok(crate::component_future::Poll::NotReady) + } + } + } + + fn poll_handshake_connections( + &mut self, + ) -> Result<crate::component_future::Poll<()>> { + let mut did_work = false; + let mut not_ready = false; + + let mut i = 0; + while i < self.accepting_sockets.len() { + let sock = self.accepting_sockets.get_mut(i).unwrap(); + match sock.poll() { + Ok(futures::Async::Ready(sock)) => { + self.accepting_sockets.swap_remove(i); + self.sock_w.try_send(sock).unwrap_or_else(|e| { + log::warn!( + "failed to send connected tls socket: {}", + e + ); + }); + did_work = true; + continue; + } + Ok(futures::Async::NotReady) => { + not_ready = true; + } + Err(e) => { + log::warn!("failed to accept tls connection: {}", e); + self.accepting_sockets.swap_remove(i); + continue; + } + } + i += 1; + } + + if did_work { + Ok(crate::component_future::Poll::DidWork) + } else if not_ready { + Ok(crate::component_future::Poll::NotReady) + } else { + Ok(crate::component_future::Poll::NothingToDo) + } + } + + fn poll_server(&mut self) -> Result<crate::component_future::Poll<()>> { + match self.server.poll().context(InnerServer)? { + futures::Async::Ready(()) => { + Ok(crate::component_future::Poll::DidWork) + } + futures::Async::NotReady => { + Ok(crate::component_future::Poll::NotReady) + } + } + } +} + +#[must_use = "futures do nothing unless polled"] +impl futures::future::Future for Server { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> { + crate::component_future::poll_future(self, Self::POLL_FNS) + } +} |