From 089653fbf49b7dfbade9da9264c69908a6224b08 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Wed, 13 Nov 2019 15:53:33 -0500 Subject: remove client side buffer size configuration --- src/client.rs | 37 ++++++++++--------------------------- src/cmd/stream.rs | 6 +----- src/cmd/watch.rs | 4 +--- src/config.rs | 18 ------------------ src/protocol.rs | 12 +++++------- src/server.rs | 8 ++++---- 6 files changed, 21 insertions(+), 64 deletions(-) diff --git a/src/client.rs b/src/client.rs index f932904..a6e897f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -45,10 +45,8 @@ enum WriteSocket< NotConnected, Connecting( Box< - dyn futures::future::Future< - Item = S, - Error = crate::error::Error, - > + Send, + dyn futures::future::Future + + Send, >, ), Connected(crate::protocol::FramedWriteHalf), @@ -70,10 +68,8 @@ pub enum Event { pub type Connector = Box< dyn Fn() -> Box< - dyn futures::future::Future< - Item = S, - Error = crate::error::Error, - > + Send, + dyn futures::future::Future + + Send, > + Send, >; @@ -82,7 +78,6 @@ pub struct Client< > { connect: Connector, auth: crate::protocol::Auth, - buffer_size: usize, term_type: String, @@ -106,12 +101,10 @@ impl pub fn stream( connect: Connector, auth: &crate::protocol::Auth, - buffer_size: usize, ) -> Self { Self::new( connect, auth, - buffer_size, &[crate::protocol::Message::start_streaming()], ) } @@ -119,29 +112,22 @@ impl pub fn watch( connect: Connector, auth: &crate::protocol::Auth, - buffer_size: usize, id: &str, ) -> Self { Self::new( connect, auth, - buffer_size, &[crate::protocol::Message::start_watching(id)], ) } - pub fn list( - connect: Connector, - auth: &crate::protocol::Auth, - buffer_size: usize, - ) -> Self { - Self::new(connect, auth, buffer_size, &[]) + pub fn list(connect: Connector, auth: &crate::protocol::Auth) -> Self { + Self::new(connect, auth, &[]) } fn new( connect: Connector, auth: &crate::protocol::Auth, - buffer_size: usize, on_login: &[crate::protocol::Message], ) -> Self { let term_type = @@ -152,7 +138,6 @@ impl Self { connect, auth: auth.clone(), - buffer_size, term_type, @@ -220,12 +205,10 @@ impl log::info!("connected to server"); let (rs, ws) = s.split(); - self.rsock = ReadSocket::Connected( - crate::protocol::FramedReader::new(rs, self.buffer_size), - ); - self.wsock = WriteSocket::Connected( - crate::protocol::FramedWriter::new(ws, self.buffer_size), - ); + self.rsock = + ReadSocket::Connected(crate::protocol::FramedReader::new(rs)); + self.wsock = + WriteSocket::Connected(crate::protocol::FramedWriter::new(ws)); self.to_send.clear(); self.send_message(crate::protocol::Message::login( diff --git a/src/cmd/stream.rs b/src/cmd/stream.rs index 9d4605c..84af32d 100644 --- a/src/cmd/stream.rs +++ b/src/cmd/stream.rs @@ -72,7 +72,6 @@ impl crate::config::Config for Config { &self.command.command, &self.command.args, connect, - self.command.buffer_size, &auth, )) } else { @@ -86,7 +85,6 @@ impl crate::config::Config for Config { &self.command.command, &self.command.args, connect, - self.command.buffer_size, &auth, )) } @@ -142,11 +140,9 @@ impl cmd: &str, args: &[String], connect: crate::client::Connector, - buffer_size: usize, auth: &crate::protocol::Auth, ) -> Self { - let client = - crate::client::Client::stream(connect, auth, buffer_size); + let client = crate::client::Client::stream(connect, auth); // TODO: tokio::io::stdin is broken (it's blocking) // see https://github.com/tokio-rs/tokio/issues/589 diff --git a/src/cmd/watch.rs b/src/cmd/watch.rs index d36a614..f0d1929 100644 --- a/src/cmd/watch.rs +++ b/src/cmd/watch.rs @@ -208,8 +208,7 @@ impl make_connector: Box crate::client::Connector + Send>, auth: &crate::protocol::Auth, ) -> Self { - let list_client = - crate::client::Client::list(make_connector(), auth, 4_194_304); + let list_client = crate::client::Client::list(make_connector(), auth); Self { make_connector, @@ -326,7 +325,6 @@ impl let client = crate::client::Client::watch( (self.make_connector)(), &self.auth, - 4_194_304, id, ); self.state.watching(client); diff --git a/src/config.rs b/src/config.rs index a33022b..5a2b67b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -577,9 +577,6 @@ where #[derive(serde::Deserialize, Debug)] pub struct Command { - #[serde(default = "default_buffer_size")] - pub buffer_size: usize, - #[serde(default = "default_command")] pub command: String, @@ -589,18 +586,10 @@ pub struct Command { impl Command { pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> { - let buffer_size_help = "Max number of bytes to buffer in order to be able to resend them when reconnecting to the server (defaults to 4194304)"; let command_help = "Command to run"; let args_help = "Arguments for the command"; app.arg( - clap::Arg::with_name(BUFFER_SIZE_OPTION) - .long(BUFFER_SIZE_OPTION) - .takes_value(true) - .value_name("BYTES") - .help(buffer_size_help), - ) - .arg( clap::Arg::with_name(COMMAND_OPTION) .index(1) .help(command_help), @@ -616,12 +605,6 @@ impl Command { &mut self, matches: &clap::ArgMatches<'a>, ) -> Result<()> { - if matches.is_present(BUFFER_SIZE_OPTION) { - let buffer_size = matches.value_of(BUFFER_SIZE_OPTION).unwrap(); - self.buffer_size = buffer_size.parse().context( - crate::error::ParseBufferSize { input: buffer_size }, - )?; - } if matches.is_present(COMMAND_OPTION) { self.command = matches.value_of(COMMAND_OPTION).unwrap().to_string(); @@ -640,7 +623,6 @@ impl Command { impl Default for Command { fn default() -> Self { Self { - buffer_size: default_buffer_size(), command: default_command(), args: default_args(), } diff --git a/src/protocol.rs b/src/protocol.rs index f0b02fb..299812a 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -23,11 +23,10 @@ pub struct FramedReader( ); impl FramedReader { - pub fn new(rs: T, buffer_size: usize) -> Self { + pub fn new(rs: T) -> Self { Self( tokio::codec::length_delimited::Builder::new() .length_field_length(4) - .max_frame_length(buffer_size + 1024 * 1024) .new_read(rs), ) } @@ -41,11 +40,10 @@ pub struct FramedWriter( ); impl FramedWriter { - pub fn new(ws: T, buffer_size: usize) -> Self { + pub fn new(ws: T) -> Self { Self( tokio::codec::length_delimited::Builder::new() .length_field_length(4) - .max_frame_length(buffer_size + 1024 * 1024) .new_write(ws), ) } @@ -775,11 +773,11 @@ mod test { let wres2 = wres.clone(); let buf = std::io::Cursor::new(vec![]); let fut = msg - .write_async(FramedWriter::new(buf, 4_194_304)) + .write_async(FramedWriter::new(buf)) .and_then(|w| { let mut buf = w.0.into_inner(); buf.set_position(0); - Message::read_async(FramedReader::new(buf, 4_194_304)) + Message::read_async(FramedReader::new(buf)) }) .and_then(move |(msg2, _)| { wres.wait().send(Ok(msg2)).unwrap(); @@ -811,7 +809,7 @@ mod test { let (wres, rres) = tokio::sync::mpsc::channel(1); let wres2 = wres.clone(); let buf = std::io::Cursor::new(buf); - let fut = Message::read_async(FramedReader::new(buf, 4_194_304)) + let fut = Message::read_async(FramedReader::new(buf)) .and_then(move |(msg2, _)| { wres.wait().send(Ok(msg2)).unwrap(); futures::future::ok(()) diff --git a/src/server.rs b/src/server.rs index 36195c5..51ca6e4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -243,7 +243,7 @@ struct Connection< impl Connection { - fn new(s: S, buffer_size: usize) -> Self { + fn new(s: S) -> Self { let (rs, ws) = s.split(); let id = format!("{}", uuid::Uuid::new_v4()); log::info!("{}: new connection", id); @@ -251,10 +251,10 @@ impl Self { id, rsock: Some(ReadSocket::Connected( - crate::protocol::FramedReader::new(rs, buffer_size), + crate::protocol::FramedReader::new(rs), )), wsock: Some(WriteSocket::Connected( - crate::protocol::FramedWriter::new(ws, buffer_size), + crate::protocol::FramedWriter::new(ws), )), to_send: std::collections::VecDeque::new(), closed: false, @@ -946,7 +946,7 @@ impl fn poll_accept(&mut self) -> component_future::Poll<(), Error> { if let Some(sock) = component_future::try_ready!(self.acceptor.poll()) { - let conn = Connection::new(sock, self.buffer_size); + let conn = Connection::new(sock); self.connections.insert(conn.id.to_string(), conn); Ok(component_future::Async::DidWork) } else { -- cgit v1.2.3-54-g00ecf