From fb8fbbaa4587bb89d65849518d38125406c0c2af Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Tue, 22 Oct 2019 07:40:41 -0400 Subject: remove unnecessary tokio::spawn use --- src/cmd/play.rs | 3 +- src/cmd/record.rs | 3 +- src/cmd/server.rs | 148 +++++++++++++++++++++++++----------------------------- src/cmd/stream.rs | 3 +- src/cmd/watch.rs | 3 +- src/server.rs | 30 ++++------- src/server/tls.rs | 35 +++++++------ 7 files changed, 107 insertions(+), 118 deletions(-) (limited to 'src') diff --git a/src/cmd/play.rs b/src/cmd/play.rs index 2f2f4ba..3dda60c 100644 --- a/src/cmd/play.rs +++ b/src/cmd/play.rs @@ -17,7 +17,8 @@ impl crate::config::Config for Config { fn run( &self, - ) -> Box + Send> { + ) -> Box + Send> + { Box::new(PlaySession::new(&self.ttyrec.filename)) } } diff --git a/src/cmd/record.rs b/src/cmd/record.rs index 44d01f3..f2e0aab 100644 --- a/src/cmd/record.rs +++ b/src/cmd/record.rs @@ -22,7 +22,8 @@ impl crate::config::Config for Config { fn run( &self, - ) -> Box + Send> { + ) -> Box + Send> + { Box::new(RecordSession::new( &self.ttyrec.filename, self.command.buffer_size, diff --git a/src/cmd/server.rs b/src/cmd/server.rs index bd1b4f4..8acd011 100644 --- a/src/cmd/server.rs +++ b/src/cmd/server.rs @@ -27,43 +27,30 @@ impl crate::config::Config for Config { fn run( &self, - ) -> Box + Send> { - let (acceptor, server) = - if let Some(tls_identity_file) = &self.server.tls_identity_file { - match create_server_tls( - self.server.listen_address, - self.server.buffer_size, - self.server.read_timeout, - tls_identity_file, - self.server.allowed_login_methods.clone(), - self.oauth_configs.clone(), - self.server.uid, - self.server.gid, - ) { - Ok(futs) => futs, - Err(e) => return Box::new(futures::future::err(e)), - } - } else { - match create_server( - self.server.listen_address, - self.server.buffer_size, - self.server.read_timeout, - self.server.allowed_login_methods.clone(), - self.oauth_configs.clone(), - self.server.uid, - self.server.gid, - ) { - Ok(futs) => futs, - Err(e) => return Box::new(futures::future::err(e)), - } - }; - Box::new(futures::future::lazy(move || { - tokio::spawn(server.map_err(|e| { - log::error!("{}", e); - })); - - acceptor - })) + ) -> Box + Send> + { + if let Some(tls_identity_file) = &self.server.tls_identity_file { + create_server_tls( + self.server.listen_address, + self.server.buffer_size, + self.server.read_timeout, + tls_identity_file, + self.server.allowed_login_methods.clone(), + self.oauth_configs.clone(), + self.server.uid, + self.server.gid, + ) + } else { + create_server( + self.server.listen_address, + self.server.buffer_size, + self.server.read_timeout, + self.server.allowed_login_methods.clone(), + self.oauth_configs.clone(), + self.server.uid, + self.server.gid, + ) + } } } @@ -97,31 +84,22 @@ fn create_server( >, uid: Option, gid: Option, -) -> Result<( - Box + Send>, - Box + Send>, -)> { - let (mut sock_w, sock_r) = tokio::sync::mpsc::channel(100); - let listener = tokio::net::TcpListener::bind(&address) - .context(crate::error::Bind { address })?; - drop_privs(uid, gid)?; - log::info!("Listening on {}", address); - let acceptor = listener - .incoming() - .context(crate::error::Acceptor) - .for_each(move |sock| { - sock_w - .try_send(sock) - .context(crate::error::SendSocketChannel) - }); +) -> Box + Send> { + let listener = match listen(address, uid, gid) { + Ok(listener) => listener, + Err(e) => return Box::new(futures::future::err(e)), + }; + + let acceptor = listener.incoming().context(crate::error::Acceptor); let server = crate::server::Server::new( + Box::new(acceptor), buffer_size, read_timeout, - sock_r, allowed_login_methods, oauth_configs, ); - Ok((Box::new(acceptor), Box::new(server))) + + Box::new(server) } fn create_server_tls( @@ -138,16 +116,45 @@ fn create_server_tls( >, uid: Option, gid: Option, -) -> Result<( - Box + Send>, - Box + Send>, -)> { - let (mut sock_w, sock_r) = tokio::sync::mpsc::channel(100); +) -> Box + Send> { + let listener = match listen(address, uid, gid) { + Ok(listener) => listener, + Err(e) => return Box::new(futures::future::err(e)), + }; + + let tls_acceptor = match accept_tls(tls_identity_file) { + Ok(acceptor) => acceptor, + Err(e) => return Box::new(futures::future::err(e)), + }; + + let acceptor = listener + .incoming() + .context(crate::error::Acceptor) + .map(move |sock| tls_acceptor.accept(sock)); + let server = crate::server::tls::Server::new( + Box::new(acceptor), + buffer_size, + read_timeout, + allowed_login_methods, + oauth_configs, + ); + + Box::new(server) +} + +fn listen( + address: std::net::SocketAddr, + uid: Option, + gid: Option, +) -> Result { let listener = tokio::net::TcpListener::bind(&address) .context(crate::error::Bind { address })?; drop_privs(uid, gid)?; log::info!("Listening on {}", address); + Ok(listener) +} +fn accept_tls(tls_identity_file: &str) -> Result { let mut file = std::fs::File::open(tls_identity_file).context( crate::error::OpenFileSync { filename: tls_identity_file, @@ -160,25 +167,8 @@ fn create_server_tls( .context(crate::error::ParseIdentity)?; let acceptor = native_tls::TlsAcceptor::new(identity) .context(crate::error::CreateAcceptor)?; - let acceptor = tokio_tls::TlsAcceptor::from(acceptor); - let acceptor = listener - .incoming() - .context(crate::error::Acceptor) - .for_each(move |sock| { - let sock = acceptor.accept(sock); - sock_w - .try_send(sock) - .map_err(|_| Error::SendSocketChannelTls {}) - }); - let server = crate::server::tls::Server::new( - buffer_size, - read_timeout, - sock_r, - allowed_login_methods, - oauth_configs, - ); - Ok((Box::new(acceptor), Box::new(server))) + Ok(tokio_tls::TlsAcceptor::from(acceptor)) } fn drop_privs( diff --git a/src/cmd/stream.rs b/src/cmd/stream.rs index 0b60925..9a0f149 100644 --- a/src/cmd/stream.rs +++ b/src/cmd/stream.rs @@ -22,7 +22,8 @@ impl crate::config::Config for Config { fn run( &self, - ) -> Box + Send> { + ) -> Box + Send> + { let auth = match self.client.auth { crate::protocol::AuthType::Plain => { let username = self diff --git a/src/cmd/watch.rs b/src/cmd/watch.rs index f73457e..011629d 100644 --- a/src/cmd/watch.rs +++ b/src/cmd/watch.rs @@ -17,7 +17,8 @@ impl crate::config::Config for Config { fn run( &self, - ) -> Box + Send> { + ) -> Box + Send> + { let auth = match self.client.auth { crate::protocol::AuthType::Plain => { let username = self diff --git a/src/server.rs b/src/server.rs index cbc410f..17c14e5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -323,10 +323,8 @@ pub struct Server< > { buffer_size: usize, read_timeout: std::time::Duration, - sock_stream: Box< - dyn futures::stream::Stream, Error = Error> - + Send, - >, + acceptor: + Box + Send>, connections: std::collections::HashMap>, rate_limiter: ratelimit_meter::KeyedRateLimiter>, allowed_auth_types: std::collections::HashSet, @@ -340,9 +338,11 @@ impl Server { pub fn new( + acceptor: Box< + dyn futures::stream::Stream + Send, + >, buffer_size: usize, read_timeout: std::time::Duration, - sock_r: tokio::sync::mpsc::Receiver, allowed_auth_types: std::collections::HashSet< crate::protocol::AuthType, >, @@ -351,13 +351,10 @@ impl crate::oauth::Config, >, ) -> Self { - let sock_stream = sock_r - .map(move |s| Connection::new(s, buffer_size)) - .context(crate::error::SocketChannelReceive); Self { buffer_size, read_timeout, - sock_stream: Box::new(sock_stream), + acceptor, connections: std::collections::HashMap::new(), rate_limiter: ratelimit_meter::KeyedRateLimiter::new( std::num::NonZeroU32::new(300).unwrap(), @@ -944,20 +941,15 @@ impl -> crate::component_future::Poll< (), Error, - >] = &[ - &Self::poll_new_connections, - &Self::poll_read, - &Self::poll_write, - ]; + >] = &[&Self::poll_accept, &Self::poll_read, &Self::poll_write]; - fn poll_new_connections( - &mut self, - ) -> crate::component_future::Poll<(), Error> { - if let Some(conn) = try_ready!(self.sock_stream.poll()) { + fn poll_accept(&mut self) -> crate::component_future::Poll<(), Error> { + if let Some(sock) = try_ready!(self.acceptor.poll()) { + let conn = Connection::new(sock, self.buffer_size); self.connections.insert(conn.id.to_string(), conn); Ok(crate::component_future::Async::DidWork) } else { - Err(Error::SocketChannelClosed) + unreachable!() } } diff --git a/src/server/tls.rs b/src/server/tls.rs index d8eb088..8bb105c 100644 --- a/src/server/tls.rs +++ b/src/server/tls.rs @@ -2,8 +2,12 @@ use crate::prelude::*; pub struct Server { server: super::Server>, - sock_r: - tokio::sync::mpsc::Receiver>, + acceptor: Box< + dyn futures::stream::Stream< + Item = tokio_tls::Accept, + Error = Error, + > + Send, + >, sock_w: tokio::sync::mpsc::Sender< tokio_tls::TlsStream, >, @@ -12,11 +16,14 @@ pub struct Server { impl Server { pub fn new( + acceptor: Box< + dyn futures::stream::Stream< + Item = tokio_tls::Accept, + Error = Error, + > + Send, + >, buffer_size: usize, read_timeout: std::time::Duration, - sock_r: tokio::sync::mpsc::Receiver< - tokio_tls::Accept, - >, allowed_login_methods: std::collections::HashSet< crate::protocol::AuthType, >, @@ -28,13 +35,15 @@ impl Server { let (tls_sock_w, tls_sock_r) = tokio::sync::mpsc::channel(100); Self { server: super::Server::new( + Box::new( + tls_sock_r.context(crate::error::SocketChannelReceive), + ), buffer_size, read_timeout, - tls_sock_r, allowed_login_methods, oauth_configs, ), - sock_r, + acceptor, sock_w: tls_sock_w, accepting_sockets: vec![], } @@ -50,19 +59,13 @@ impl Server { (), Error, >] = &[ - &Self::poll_new_connections, + &Self::poll_accept, &Self::poll_handshake_connections, &Self::poll_server, ]; - fn poll_new_connections( - &mut self, - ) -> crate::component_future::Poll<(), Error> { - if let Some(sock) = try_ready!(self - .sock_r - .poll() - .context(crate::error::SocketChannelReceive)) - { + fn poll_accept(&mut self) -> crate::component_future::Poll<(), Error> { + if let Some(sock) = try_ready!(self.acceptor.poll()) { self.accepting_sockets.push(sock); Ok(crate::component_future::Async::DidWork) } else { -- cgit v1.2.3-54-g00ecf