aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-10-22 07:40:41 -0400
committerJesse Luehrs <doy@tozt.net>2019-10-22 08:02:20 -0400
commitfb8fbbaa4587bb89d65849518d38125406c0c2af (patch)
treea46d2fde392d63112ab5adfd7a88e70865922fcc
parent036f51ad6deddaf149685e6fd4945b1ed30543e3 (diff)
downloadteleterm-fb8fbbaa4587bb89d65849518d38125406c0c2af.tar.gz
teleterm-fb8fbbaa4587bb89d65849518d38125406c0c2af.zip
remove unnecessary tokio::spawn use
-rw-r--r--src/cmd/play.rs3
-rw-r--r--src/cmd/record.rs3
-rw-r--r--src/cmd/server.rs148
-rw-r--r--src/cmd/stream.rs3
-rw-r--r--src/cmd/watch.rs3
-rw-r--r--src/server.rs30
-rw-r--r--src/server/tls.rs35
7 files changed, 107 insertions, 118 deletions
diff --git a/src/cmd/play.rs b/src/cmd/play.rs
index 2f2f4ba..3dda60c 100644
--- a/src/cmd/play.rs
+++ b/src/cmd/play.rs
@@ -17,7 +17,8 @@ impl crate::config::Config for Config {
fn run(
&self,
- ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send> {
+ ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send>
+ {
Box::new(PlaySession::new(&self.ttyrec.filename))
}
}
diff --git a/src/cmd/record.rs b/src/cmd/record.rs
index 44d01f3..f2e0aab 100644
--- a/src/cmd/record.rs
+++ b/src/cmd/record.rs
@@ -22,7 +22,8 @@ impl crate::config::Config for Config {
fn run(
&self,
- ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send> {
+ ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send>
+ {
Box::new(RecordSession::new(
&self.ttyrec.filename,
self.command.buffer_size,
diff --git a/src/cmd/server.rs b/src/cmd/server.rs
index bd1b4f4..8acd011 100644
--- a/src/cmd/server.rs
+++ b/src/cmd/server.rs
@@ -27,43 +27,30 @@ impl crate::config::Config for Config {
fn run(
&self,
- ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send> {
- let (acceptor, server) =
- if let Some(tls_identity_file) = &self.server.tls_identity_file {
- match create_server_tls(
- self.server.listen_address,
- self.server.buffer_size,
- self.server.read_timeout,
- tls_identity_file,
- self.server.allowed_login_methods.clone(),
- self.oauth_configs.clone(),
- self.server.uid,
- self.server.gid,
- ) {
- Ok(futs) => futs,
- Err(e) => return Box::new(futures::future::err(e)),
- }
- } else {
- match create_server(
- self.server.listen_address,
- self.server.buffer_size,
- self.server.read_timeout,
- self.server.allowed_login_methods.clone(),
- self.oauth_configs.clone(),
- self.server.uid,
- self.server.gid,
- ) {
- Ok(futs) => futs,
- Err(e) => return Box::new(futures::future::err(e)),
- }
- };
- Box::new(futures::future::lazy(move || {
- tokio::spawn(server.map_err(|e| {
- log::error!("{}", e);
- }));
-
- acceptor
- }))
+ ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send>
+ {
+ if let Some(tls_identity_file) = &self.server.tls_identity_file {
+ create_server_tls(
+ self.server.listen_address,
+ self.server.buffer_size,
+ self.server.read_timeout,
+ tls_identity_file,
+ self.server.allowed_login_methods.clone(),
+ self.oauth_configs.clone(),
+ self.server.uid,
+ self.server.gid,
+ )
+ } else {
+ create_server(
+ self.server.listen_address,
+ self.server.buffer_size,
+ self.server.read_timeout,
+ self.server.allowed_login_methods.clone(),
+ self.oauth_configs.clone(),
+ self.server.uid,
+ self.server.gid,
+ )
+ }
}
}
@@ -97,31 +84,22 @@ fn create_server(
>,
uid: Option<users::uid_t>,
gid: Option<users::gid_t>,
-) -> Result<(
- Box<dyn futures::future::Future<Item = (), Error = Error> + Send>,
- Box<dyn futures::future::Future<Item = (), Error = Error> + Send>,
-)> {
- let (mut sock_w, sock_r) = tokio::sync::mpsc::channel(100);
- let listener = tokio::net::TcpListener::bind(&address)
- .context(crate::error::Bind { address })?;
- drop_privs(uid, gid)?;
- log::info!("Listening on {}", address);
- let acceptor = listener
- .incoming()
- .context(crate::error::Acceptor)
- .for_each(move |sock| {
- sock_w
- .try_send(sock)
- .context(crate::error::SendSocketChannel)
- });
+) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send> {
+ let listener = match listen(address, uid, gid) {
+ Ok(listener) => listener,
+ Err(e) => return Box::new(futures::future::err(e)),
+ };
+
+ let acceptor = listener.incoming().context(crate::error::Acceptor);
let server = crate::server::Server::new(
+ Box::new(acceptor),
buffer_size,
read_timeout,
- sock_r,
allowed_login_methods,
oauth_configs,
);
- Ok((Box::new(acceptor), Box::new(server)))
+
+ Box::new(server)
}
fn create_server_tls(
@@ -138,16 +116,45 @@ fn create_server_tls(
>,
uid: Option<users::uid_t>,
gid: Option<users::gid_t>,
-) -> Result<(
- Box<dyn futures::future::Future<Item = (), Error = Error> + Send>,
- Box<dyn futures::future::Future<Item = (), Error = Error> + Send>,
-)> {
- let (mut sock_w, sock_r) = tokio::sync::mpsc::channel(100);
+) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send> {
+ let listener = match listen(address, uid, gid) {
+ Ok(listener) => listener,
+ Err(e) => return Box::new(futures::future::err(e)),
+ };
+
+ let tls_acceptor = match accept_tls(tls_identity_file) {
+ Ok(acceptor) => acceptor,
+ Err(e) => return Box::new(futures::future::err(e)),
+ };
+
+ let acceptor = listener
+ .incoming()
+ .context(crate::error::Acceptor)
+ .map(move |sock| tls_acceptor.accept(sock));
+ let server = crate::server::tls::Server::new(
+ Box::new(acceptor),
+ buffer_size,
+ read_timeout,
+ allowed_login_methods,
+ oauth_configs,
+ );
+
+ Box::new(server)
+}
+
+fn listen(
+ address: std::net::SocketAddr,
+ uid: Option<users::uid_t>,
+ gid: Option<users::gid_t>,
+) -> Result<tokio::net::TcpListener> {
let listener = tokio::net::TcpListener::bind(&address)
.context(crate::error::Bind { address })?;
drop_privs(uid, gid)?;
log::info!("Listening on {}", address);
+ Ok(listener)
+}
+fn accept_tls(tls_identity_file: &str) -> Result<tokio_tls::TlsAcceptor> {
let mut file = std::fs::File::open(tls_identity_file).context(
crate::error::OpenFileSync {
filename: tls_identity_file,
@@ -160,25 +167,8 @@ fn create_server_tls(
.context(crate::error::ParseIdentity)?;
let acceptor = native_tls::TlsAcceptor::new(identity)
.context(crate::error::CreateAcceptor)?;
- let acceptor = tokio_tls::TlsAcceptor::from(acceptor);
- let acceptor = listener
- .incoming()
- .context(crate::error::Acceptor)
- .for_each(move |sock| {
- let sock = acceptor.accept(sock);
- sock_w
- .try_send(sock)
- .map_err(|_| Error::SendSocketChannelTls {})
- });
- let server = crate::server::tls::Server::new(
- buffer_size,
- read_timeout,
- sock_r,
- allowed_login_methods,
- oauth_configs,
- );
- Ok((Box::new(acceptor), Box::new(server)))
+ Ok(tokio_tls::TlsAcceptor::from(acceptor))
}
fn drop_privs(
diff --git a/src/cmd/stream.rs b/src/cmd/stream.rs
index 0b60925..9a0f149 100644
--- a/src/cmd/stream.rs
+++ b/src/cmd/stream.rs
@@ -22,7 +22,8 @@ impl crate::config::Config for Config {
fn run(
&self,
- ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send> {
+ ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send>
+ {
let auth = match self.client.auth {
crate::protocol::AuthType::Plain => {
let username = self
diff --git a/src/cmd/watch.rs b/src/cmd/watch.rs
index f73457e..011629d 100644
--- a/src/cmd/watch.rs
+++ b/src/cmd/watch.rs
@@ -17,7 +17,8 @@ impl crate::config::Config for Config {
fn run(
&self,
- ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send> {
+ ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send>
+ {
let auth = match self.client.auth {
crate::protocol::AuthType::Plain => {
let username = self
diff --git a/src/server.rs b/src/server.rs
index cbc410f..17c14e5 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -323,10 +323,8 @@ pub struct Server<
> {
buffer_size: usize,
read_timeout: std::time::Duration,
- sock_stream: Box<
- dyn futures::stream::Stream<Item = Connection<S>, Error = Error>
- + Send,
- >,
+ acceptor:
+ Box<dyn futures::stream::Stream<Item = S, Error = Error> + Send>,
connections: std::collections::HashMap<String, Connection<S>>,
rate_limiter: ratelimit_meter::KeyedRateLimiter<Option<String>>,
allowed_auth_types: std::collections::HashSet<crate::protocol::AuthType>,
@@ -340,9 +338,11 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
Server<S>
{
pub fn new(
+ acceptor: Box<
+ dyn futures::stream::Stream<Item = S, Error = Error> + Send,
+ >,
buffer_size: usize,
read_timeout: std::time::Duration,
- sock_r: tokio::sync::mpsc::Receiver<S>,
allowed_auth_types: std::collections::HashSet<
crate::protocol::AuthType,
>,
@@ -351,13 +351,10 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
crate::oauth::Config,
>,
) -> Self {
- let sock_stream = sock_r
- .map(move |s| Connection::new(s, buffer_size))
- .context(crate::error::SocketChannelReceive);
Self {
buffer_size,
read_timeout,
- sock_stream: Box::new(sock_stream),
+ acceptor,
connections: std::collections::HashMap::new(),
rate_limiter: ratelimit_meter::KeyedRateLimiter::new(
std::num::NonZeroU32::new(300).unwrap(),
@@ -944,20 +941,15 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
-> crate::component_future::Poll<
(),
Error,
- >] = &[
- &Self::poll_new_connections,
- &Self::poll_read,
- &Self::poll_write,
- ];
+ >] = &[&Self::poll_accept, &Self::poll_read, &Self::poll_write];
- fn poll_new_connections(
- &mut self,
- ) -> crate::component_future::Poll<(), Error> {
- if let Some(conn) = try_ready!(self.sock_stream.poll()) {
+ fn poll_accept(&mut self) -> crate::component_future::Poll<(), Error> {
+ if let Some(sock) = try_ready!(self.acceptor.poll()) {
+ let conn = Connection::new(sock, self.buffer_size);
self.connections.insert(conn.id.to_string(), conn);
Ok(crate::component_future::Async::DidWork)
} else {
- Err(Error::SocketChannelClosed)
+ unreachable!()
}
}
diff --git a/src/server/tls.rs b/src/server/tls.rs
index d8eb088..8bb105c 100644
--- a/src/server/tls.rs
+++ b/src/server/tls.rs
@@ -2,8 +2,12 @@ use crate::prelude::*;
pub struct Server {
server: super::Server<tokio_tls::TlsStream<tokio::net::TcpStream>>,
- sock_r:
- tokio::sync::mpsc::Receiver<tokio_tls::Accept<tokio::net::TcpStream>>,
+ acceptor: Box<
+ dyn futures::stream::Stream<
+ Item = tokio_tls::Accept<tokio::net::TcpStream>,
+ Error = Error,
+ > + Send,
+ >,
sock_w: tokio::sync::mpsc::Sender<
tokio_tls::TlsStream<tokio::net::TcpStream>,
>,
@@ -12,11 +16,14 @@ pub struct Server {
impl Server {
pub fn new(
+ acceptor: Box<
+ dyn futures::stream::Stream<
+ Item = tokio_tls::Accept<tokio::net::TcpStream>,
+ Error = Error,
+ > + Send,
+ >,
buffer_size: usize,
read_timeout: std::time::Duration,
- sock_r: tokio::sync::mpsc::Receiver<
- tokio_tls::Accept<tokio::net::TcpStream>,
- >,
allowed_login_methods: std::collections::HashSet<
crate::protocol::AuthType,
>,
@@ -28,13 +35,15 @@ impl Server {
let (tls_sock_w, tls_sock_r) = tokio::sync::mpsc::channel(100);
Self {
server: super::Server::new(
+ Box::new(
+ tls_sock_r.context(crate::error::SocketChannelReceive),
+ ),
buffer_size,
read_timeout,
- tls_sock_r,
allowed_login_methods,
oauth_configs,
),
- sock_r,
+ acceptor,
sock_w: tls_sock_w,
accepting_sockets: vec![],
}
@@ -50,19 +59,13 @@ impl Server {
(),
Error,
>] = &[
- &Self::poll_new_connections,
+ &Self::poll_accept,
&Self::poll_handshake_connections,
&Self::poll_server,
];
- fn poll_new_connections(
- &mut self,
- ) -> crate::component_future::Poll<(), Error> {
- if let Some(sock) = try_ready!(self
- .sock_r
- .poll()
- .context(crate::error::SocketChannelReceive))
- {
+ fn poll_accept(&mut self) -> crate::component_future::Poll<(), Error> {
+ if let Some(sock) = try_ready!(self.acceptor.poll()) {
self.accepting_sockets.push(sock);
Ok(crate::component_future::Async::DidWork)
} else {