aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-10-14 13:39:40 -0400
committerJesse Luehrs <doy@tozt.net>2019-10-14 13:49:06 -0400
commit79e6424e43ea1549bb984941166ead4a96b7d8b9 (patch)
treec558820b0803687671ee131caba08346a84fba53
parent4d502b5e296f8585b5143e8b854891063928af5b (diff)
downloadteleterm-79e6424e43ea1549bb984941166ead4a96b7d8b9.tar.gz
teleterm-79e6424e43ea1549bb984941166ead4a96b7d8b9.zip
move some code around
-rw-r--r--src/cmd/server.rs7
-rw-r--r--src/protocol.rs18
-rw-r--r--src/server.rs138
-rw-r--r--src/server/tls.rs145
4 files changed, 171 insertions, 137 deletions
diff --git a/src/cmd/server.rs b/src/cmd/server.rs
index 3c40e4f..aca32d9 100644
--- a/src/cmd/server.rs
+++ b/src/cmd/server.rs
@@ -43,6 +43,9 @@ pub enum Error {
#[snafu(display("failed to run server: {}", source))]
Server { source: crate::server::Error },
+ #[snafu(display("failed to run server: {}", source))]
+ TlsServer { source: crate::server::tls::Error },
+
#[snafu(display("failed to open identity file: {}", source))]
OpenIdentityFile { source: std::io::Error },
@@ -194,7 +197,7 @@ fn create_server_tls(
.map_err(|_| Error::TlsSocketChannel {})
});
let server =
- crate::server::TlsServer::new(buffer_size, read_timeout, sock_r)
- .context(Server);
+ crate::server::tls::Server::new(buffer_size, read_timeout, sock_r)
+ .context(TlsServer);
Ok((Box::new(acceptor), Box::new(server)))
}
diff --git a/src/protocol.rs b/src/protocol.rs
index d673159..1221fc5 100644
--- a/src/protocol.rs
+++ b/src/protocol.rs
@@ -237,6 +237,24 @@ impl Message {
{
Packet::from(self).write_async(w)
}
+
+ // it'd be nice if i could just override the Debug implementation for
+ // specific enum variants, but writing the whole impl Debug by hand just
+ // to make this one change would be super obnoxious
+ pub fn log(&self, id: &str) {
+ match self {
+ Self::TerminalOutput { data } => {
+ log::debug!(
+ "{}: message(TerminalOutput {{ data: ({} bytes) }})",
+ id,
+ data.len()
+ );
+ }
+ message => {
+ log::debug!("{}: message({:?})", id, message);
+ }
+ }
+ }
}
struct Packet {
diff --git a/src/server.rs b/src/server.rs
index 4141037..eeb4069 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -5,6 +5,8 @@ use snafu::futures01::FutureExt as _;
use snafu::ResultExt as _;
use tokio::util::FutureExt as _;
+pub mod tls;
+
#[derive(Debug, snafu::Snafu)]
pub enum Error {
#[snafu(display("{}", source))]
@@ -514,7 +516,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
}
- log_message(&conn.id, &message);
+ message.log(&conn.id);
match conn.state {
ConnectionState::Accepted { .. } => {
@@ -804,137 +806,3 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
crate::component_future::poll_future(self, Self::POLL_FNS)
}
}
-
-#[allow(clippy::module_name_repetitions)]
-pub struct TlsServer {
- server: Server<tokio_tls::TlsStream<tokio::net::TcpStream>>,
- sock_r:
- tokio::sync::mpsc::Receiver<tokio_tls::Accept<tokio::net::TcpStream>>,
- sock_w: tokio::sync::mpsc::Sender<
- tokio_tls::TlsStream<tokio::net::TcpStream>,
- >,
- accepting_sockets: Vec<tokio_tls::Accept<tokio::net::TcpStream>>,
-}
-
-impl TlsServer {
- pub fn new(
- buffer_size: usize,
- read_timeout: std::time::Duration,
- sock_r: tokio::sync::mpsc::Receiver<
- tokio_tls::Accept<tokio::net::TcpStream>,
- >,
- ) -> Self {
- let (tls_sock_w, tls_sock_r) = tokio::sync::mpsc::channel(100);
- Self {
- server: Server::new(buffer_size, read_timeout, tls_sock_r),
- sock_r,
- sock_w: tls_sock_w,
- accepting_sockets: vec![],
- }
- }
-}
-
-impl TlsServer {
- const POLL_FNS: &'static [&'static dyn for<'a> Fn(
- &'a mut Self,
- ) -> Result<
- crate::component_future::Poll<()>,
- >] = &[
- &Self::poll_new_connections,
- &Self::poll_handshake_connections,
- &Self::poll_server,
- ];
-
- fn poll_new_connections(
- &mut self,
- ) -> Result<crate::component_future::Poll<()>> {
- match self.sock_r.poll().context(SocketChannelReceive)? {
- futures::Async::Ready(Some(sock)) => {
- self.accepting_sockets.push(sock);
- Ok(crate::component_future::Poll::DidWork)
- }
- futures::Async::Ready(None) => Err(Error::SocketChannelClosed),
- futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
- }
- }
- }
-
- fn poll_handshake_connections(
- &mut self,
- ) -> Result<crate::component_future::Poll<()>> {
- let mut did_work = false;
- let mut not_ready = false;
-
- let mut i = 0;
- while i < self.accepting_sockets.len() {
- let sock = self.accepting_sockets.get_mut(i).unwrap();
- match sock.poll() {
- Ok(futures::Async::Ready(sock)) => {
- self.accepting_sockets.swap_remove(i);
- self.sock_w.try_send(sock).unwrap_or_else(|e| {
- log::warn!(
- "failed to send connected tls socket: {}",
- e
- );
- });
- did_work = true;
- continue;
- }
- Ok(futures::Async::NotReady) => {
- not_ready = true;
- }
- Err(e) => {
- log::warn!("failed to accept tls connection: {}", e);
- self.accepting_sockets.swap_remove(i);
- continue;
- }
- }
- i += 1;
- }
-
- if did_work {
- Ok(crate::component_future::Poll::DidWork)
- } else if not_ready {
- Ok(crate::component_future::Poll::NotReady)
- } else {
- Ok(crate::component_future::Poll::NothingToDo)
- }
- }
-
- fn poll_server(&mut self) -> Result<crate::component_future::Poll<()>> {
- match self.server.poll()? {
- futures::Async::Ready(()) => {
- Ok(crate::component_future::Poll::DidWork)
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
- }
- }
- }
-}
-
-#[must_use = "futures do nothing unless polled"]
-impl futures::future::Future for TlsServer {
- type Item = ();
- type Error = Error;
-
- fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
- crate::component_future::poll_future(self, Self::POLL_FNS)
- }
-}
-
-fn log_message(id: &str, message: &crate::protocol::Message) {
- match message {
- crate::protocol::Message::TerminalOutput { data } => {
- log::debug!(
- "{}: message(TerminalOutput {{ data: ({} bytes) }})",
- id,
- data.len()
- );
- }
- message => {
- log::debug!("{}: message({:?})", id, message);
- }
- }
-}
diff --git a/src/server/tls.rs b/src/server/tls.rs
new file mode 100644
index 0000000..8a4d132
--- /dev/null
+++ b/src/server/tls.rs
@@ -0,0 +1,145 @@
+use futures::future::Future as _;
+use futures::stream::Stream as _;
+use snafu::ResultExt as _;
+
+#[derive(Debug, snafu::Snafu)]
+pub enum Error {
+ #[snafu(display("{}", source))]
+ Common { source: crate::error::Error },
+
+ #[snafu(display("{}", source))]
+ InnerServer { source: super::Error },
+
+ #[snafu(display(
+ "failed to receive new socket over channel: {}",
+ source
+ ))]
+ SocketChannelReceive {
+ source: tokio::sync::mpsc::error::RecvError,
+ },
+
+ #[snafu(display(
+ "failed to receive new socket over channel: channel closed"
+ ))]
+ SocketChannelClosed,
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+pub struct Server {
+ server: super::Server<tokio_tls::TlsStream<tokio::net::TcpStream>>,
+ sock_r:
+ tokio::sync::mpsc::Receiver<tokio_tls::Accept<tokio::net::TcpStream>>,
+ sock_w: tokio::sync::mpsc::Sender<
+ tokio_tls::TlsStream<tokio::net::TcpStream>,
+ >,
+ accepting_sockets: Vec<tokio_tls::Accept<tokio::net::TcpStream>>,
+}
+
+impl Server {
+ pub fn new(
+ buffer_size: usize,
+ read_timeout: std::time::Duration,
+ sock_r: tokio::sync::mpsc::Receiver<
+ tokio_tls::Accept<tokio::net::TcpStream>,
+ >,
+ ) -> Self {
+ let (tls_sock_w, tls_sock_r) = tokio::sync::mpsc::channel(100);
+ Self {
+ server: super::Server::new(buffer_size, read_timeout, tls_sock_r),
+ sock_r,
+ sock_w: tls_sock_w,
+ accepting_sockets: vec![],
+ }
+ }
+}
+
+impl Server {
+ const POLL_FNS: &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ ) -> Result<
+ crate::component_future::Poll<()>,
+ >] = &[
+ &Self::poll_new_connections,
+ &Self::poll_handshake_connections,
+ &Self::poll_server,
+ ];
+
+ fn poll_new_connections(
+ &mut self,
+ ) -> Result<crate::component_future::Poll<()>> {
+ match self.sock_r.poll().context(SocketChannelReceive)? {
+ futures::Async::Ready(Some(sock)) => {
+ self.accepting_sockets.push(sock);
+ Ok(crate::component_future::Poll::DidWork)
+ }
+ futures::Async::Ready(None) => Err(Error::SocketChannelClosed),
+ futures::Async::NotReady => {
+ Ok(crate::component_future::Poll::NotReady)
+ }
+ }
+ }
+
+ fn poll_handshake_connections(
+ &mut self,
+ ) -> Result<crate::component_future::Poll<()>> {
+ let mut did_work = false;
+ let mut not_ready = false;
+
+ let mut i = 0;
+ while i < self.accepting_sockets.len() {
+ let sock = self.accepting_sockets.get_mut(i).unwrap();
+ match sock.poll() {
+ Ok(futures::Async::Ready(sock)) => {
+ self.accepting_sockets.swap_remove(i);
+ self.sock_w.try_send(sock).unwrap_or_else(|e| {
+ log::warn!(
+ "failed to send connected tls socket: {}",
+ e
+ );
+ });
+ did_work = true;
+ continue;
+ }
+ Ok(futures::Async::NotReady) => {
+ not_ready = true;
+ }
+ Err(e) => {
+ log::warn!("failed to accept tls connection: {}", e);
+ self.accepting_sockets.swap_remove(i);
+ continue;
+ }
+ }
+ i += 1;
+ }
+
+ if did_work {
+ Ok(crate::component_future::Poll::DidWork)
+ } else if not_ready {
+ Ok(crate::component_future::Poll::NotReady)
+ } else {
+ Ok(crate::component_future::Poll::NothingToDo)
+ }
+ }
+
+ fn poll_server(&mut self) -> Result<crate::component_future::Poll<()>> {
+ match self.server.poll().context(InnerServer)? {
+ futures::Async::Ready(()) => {
+ Ok(crate::component_future::Poll::DidWork)
+ }
+ futures::Async::NotReady => {
+ Ok(crate::component_future::Poll::NotReady)
+ }
+ }
+ }
+}
+
+#[must_use = "futures do nothing unless polled"]
+impl futures::future::Future for Server {
+ type Item = ();
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ crate::component_future::poll_future(self, Self::POLL_FNS)
+ }
+}