aboutsummaryrefslogtreecommitdiffstats
path: root/src/server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs30
1 files changed, 11 insertions, 19 deletions
diff --git a/src/server.rs b/src/server.rs
index cbc410f..17c14e5 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -323,10 +323,8 @@ pub struct Server<
> {
buffer_size: usize,
read_timeout: std::time::Duration,
- sock_stream: Box<
- dyn futures::stream::Stream<Item = Connection<S>, Error = Error>
- + Send,
- >,
+ acceptor:
+ Box<dyn futures::stream::Stream<Item = S, Error = Error> + Send>,
connections: std::collections::HashMap<String, Connection<S>>,
rate_limiter: ratelimit_meter::KeyedRateLimiter<Option<String>>,
allowed_auth_types: std::collections::HashSet<crate::protocol::AuthType>,
@@ -340,9 +338,11 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
Server<S>
{
pub fn new(
+ acceptor: Box<
+ dyn futures::stream::Stream<Item = S, Error = Error> + Send,
+ >,
buffer_size: usize,
read_timeout: std::time::Duration,
- sock_r: tokio::sync::mpsc::Receiver<S>,
allowed_auth_types: std::collections::HashSet<
crate::protocol::AuthType,
>,
@@ -351,13 +351,10 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
crate::oauth::Config,
>,
) -> Self {
- let sock_stream = sock_r
- .map(move |s| Connection::new(s, buffer_size))
- .context(crate::error::SocketChannelReceive);
Self {
buffer_size,
read_timeout,
- sock_stream: Box::new(sock_stream),
+ acceptor,
connections: std::collections::HashMap::new(),
rate_limiter: ratelimit_meter::KeyedRateLimiter::new(
std::num::NonZeroU32::new(300).unwrap(),
@@ -944,20 +941,15 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
-> crate::component_future::Poll<
(),
Error,
- >] = &[
- &Self::poll_new_connections,
- &Self::poll_read,
- &Self::poll_write,
- ];
+ >] = &[&Self::poll_accept, &Self::poll_read, &Self::poll_write];
- fn poll_new_connections(
- &mut self,
- ) -> crate::component_future::Poll<(), Error> {
- if let Some(conn) = try_ready!(self.sock_stream.poll()) {
+ fn poll_accept(&mut self) -> crate::component_future::Poll<(), Error> {
+ if let Some(sock) = try_ready!(self.acceptor.poll()) {
+ let conn = Connection::new(sock, self.buffer_size);
self.connections.insert(conn.id.to_string(), conn);
Ok(crate::component_future::Async::DidWork)
} else {
- Err(Error::SocketChannelClosed)
+ unreachable!()
}
}