diff options
Diffstat (limited to 'src/server.rs')
-rw-r--r-- | src/server.rs | 30 |
1 files changed, 11 insertions, 19 deletions
diff --git a/src/server.rs b/src/server.rs index cbc410f..17c14e5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -323,10 +323,8 @@ pub struct Server< > { buffer_size: usize, read_timeout: std::time::Duration, - sock_stream: Box< - dyn futures::stream::Stream<Item = Connection<S>, Error = Error> - + Send, - >, + acceptor: + Box<dyn futures::stream::Stream<Item = S, Error = Error> + Send>, connections: std::collections::HashMap<String, Connection<S>>, rate_limiter: ratelimit_meter::KeyedRateLimiter<Option<String>>, allowed_auth_types: std::collections::HashSet<crate::protocol::AuthType>, @@ -340,9 +338,11 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> Server<S> { pub fn new( + acceptor: Box< + dyn futures::stream::Stream<Item = S, Error = Error> + Send, + >, buffer_size: usize, read_timeout: std::time::Duration, - sock_r: tokio::sync::mpsc::Receiver<S>, allowed_auth_types: std::collections::HashSet< crate::protocol::AuthType, >, @@ -351,13 +351,10 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> crate::oauth::Config, >, ) -> Self { - let sock_stream = sock_r - .map(move |s| Connection::new(s, buffer_size)) - .context(crate::error::SocketChannelReceive); Self { buffer_size, read_timeout, - sock_stream: Box::new(sock_stream), + acceptor, connections: std::collections::HashMap::new(), rate_limiter: ratelimit_meter::KeyedRateLimiter::new( std::num::NonZeroU32::new(300).unwrap(), @@ -944,20 +941,15 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> -> crate::component_future::Poll< (), Error, - >] = &[ - &Self::poll_new_connections, - &Self::poll_read, - &Self::poll_write, - ]; + >] = &[&Self::poll_accept, &Self::poll_read, &Self::poll_write]; - fn poll_new_connections( - &mut self, - ) -> crate::component_future::Poll<(), Error> { - if let Some(conn) = try_ready!(self.sock_stream.poll()) { + fn poll_accept(&mut self) -> crate::component_future::Poll<(), Error> { + if let Some(sock) = try_ready!(self.acceptor.poll()) { + let conn = Connection::new(sock, self.buffer_size); self.connections.insert(conn.id.to_string(), conn); Ok(crate::component_future::Async::DidWork) } else { - Err(Error::SocketChannelClosed) + unreachable!() } } |