From 6552f2d69eb0f19851dbebd21742b7bc9f37cc42 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Thu, 24 Oct 2019 06:33:51 -0400 Subject: move component_future to a separate crate --- Cargo.lock | 10 +++++ Cargo.toml | 1 + src/client.rs | 72 ++++++++++++++++----------------- src/cmd/play.rs | 50 ++++++++++++----------- src/cmd/record.rs | 70 +++++++++++++++----------------- src/cmd/stream.rs | 60 ++++++++++++---------------- src/cmd/watch.rs | 36 ++++++++--------- src/component_future.rs | 104 ------------------------------------------------ src/main.rs | 3 -- src/process.rs | 63 +++++++++++++++-------------- src/resize.rs | 19 +++++---- src/server.rs | 75 +++++++++++++++++----------------- src/server/tls.rs | 25 ++++++------ 13 files changed, 235 insertions(+), 353 deletions(-) delete mode 100644 src/component_future.rs diff --git a/Cargo.lock b/Cargo.lock index 2455306..3bf4cea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -176,6 +176,14 @@ dependencies = [ "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "component-future" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "config" version = "0.9.3" @@ -1452,6 +1460,7 @@ version = "0.1.3" dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", + "component-future 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "config 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)", "crossterm 0.11.1 (registry+https://github.com/rust-lang/crates.io-index)", "directories 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1996,6 +2005,7 @@ dependencies = [ "checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" "checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +"checksum component-future 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9e815962dcecc6faf0f0661924cfa240c5b4bccb73679542f00478555dd1cc2c" "checksum config 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "f9107d78ed62b3fa5a86e7d18e647abed48cfd8f8fab6c72f4cdb982d196f7e6" "checksum constant_time_eq 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "995a44c877f9212528ccc74b21a232f66ad69001e40ede5bcee2ac9ef2657120" "checksum cookie 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "888604f00b3db336d2af898ec3c1d5d0ddf5e6d462220f2ededc33a87ac4bbd5" diff --git a/Cargo.toml b/Cargo.toml index b12d3cb..4c0c10d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ license = "MIT" [dependencies] bytes = "0.4" clap = { version = "2", features = ["wrap_help"] } +component-future = "0.1" config = { version = "0.9", features = ["toml"], default_features = false } crossterm = { version = "0.11", features = ["terminal", "input", "screen"], default_features = false } directories = "2" diff --git a/src/client.rs b/src/client.rs index fc9c07a..f932904 100644 --- a/src/client.rs +++ b/src/client.rs @@ -241,7 +241,7 @@ impl &mut self, msg: crate::protocol::Message, ) -> Result<( - crate::component_future::Async>, + component_future::Async>, Option< Box< dyn futures::future::Future< @@ -264,7 +264,7 @@ impl } open::that(url).context(crate::error::OpenLink)?; Ok(( - crate::component_future::Async::DidWork, + component_future::Async::DidWork, Some(self.wait_for_oauth_response( state.map(|s| s.to_string()), &id, @@ -279,19 +279,17 @@ impl } self.last_error = None; Ok(( - crate::component_future::Async::Ready(Some( - Event::Connect, - )), + component_future::Async::Ready(Some(Event::Connect)), None, )) } crate::protocol::Message::Heartbeat => { - Ok((crate::component_future::Async::DidWork, None)) + Ok((component_future::Async::DidWork, None)) } _ => Ok(( - crate::component_future::Async::Ready(Some( - Event::ServerMessage(msg), - )), + component_future::Async::Ready(Some(Event::ServerMessage( + msg, + ))), None, )), } @@ -396,7 +394,7 @@ impl &'static [&'static dyn for<'a> Fn( &'a mut Self, ) - -> crate::component_future::Poll< + -> component_future::Poll< Option, Error, >] = &[ @@ -408,11 +406,11 @@ impl fn poll_reconnect_server( &mut self, - ) -> crate::component_future::Poll, Error> { + ) -> component_future::Poll, Error> { match &mut self.wsock { WriteSocket::NotConnected => { if let Some(timer) = &mut self.reconnect_timer { - try_ready!(timer + component_future::try_ready!(timer .poll() .context(crate::error::TimerReconnect)); } @@ -425,20 +423,20 @@ impl self.handle_successful_connection(s)?; } Ok(futures::Async::NotReady) => { - return Ok(crate::component_future::Async::NotReady); + return Ok(component_future::Async::NotReady); } Err(e) => { log::warn!("error while connecting, reconnecting: {}", e); self.reconnect(); self.last_error = Some(format!("{}", e)); - return Ok(crate::component_future::Async::Ready(Some( + return Ok(component_future::Async::Ready(Some( Event::Disconnect, ))); } }, WriteSocket::Connected(..) | WriteSocket::Writing(..) => { if self.has_seen_server_recently() { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); } else { log::warn!( "haven't seen server in a while, reconnecting", @@ -446,22 +444,22 @@ impl self.reconnect(); self.last_error = Some("haven't seen server in a while".to_string()); - return Ok(crate::component_future::Async::Ready(Some( + return Ok(component_future::Async::Ready(Some( Event::Disconnect, ))); } } } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } fn poll_read_server( &mut self, - ) -> crate::component_future::Poll, Error> { + ) -> component_future::Poll, Error> { match &mut self.rsock { ReadSocket::NotConnected => { - Ok(crate::component_future::Async::NothingToDo) + Ok(component_future::Async::NothingToDo) } ReadSocket::Connected(..) => { if let ReadSocket::Connected(s) = std::mem::replace( @@ -473,7 +471,7 @@ impl } else { unreachable!() } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } ReadSocket::Reading(ref mut fut) => match fut.poll() { Ok(futures::Async::Ready((msg, s))) => { @@ -494,20 +492,20 @@ impl ); self.reconnect(); self.last_error = Some(format!("{}", e)); - Ok(crate::component_future::Async::Ready(Some( + Ok(component_future::Async::Ready(Some( Event::Disconnect, ))) } } } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Async::NotReady) + Ok(component_future::Async::NotReady) } Err(e) => { log::warn!("error reading message, reconnecting: {}", e); self.reconnect(); self.last_error = Some(format!("{}", e)); - Ok(crate::component_future::Async::Ready(Some( + Ok(component_future::Async::Ready(Some( Event::Disconnect, ))) } @@ -523,10 +521,10 @@ impl } else { unreachable!() } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Async::NotReady) + Ok(component_future::Async::NotReady) } Err(e) => { log::warn!( @@ -535,7 +533,7 @@ impl ); self.reconnect(); self.last_error = Some(format!("{}", e)); - Ok(crate::component_future::Async::Ready(Some( + Ok(component_future::Async::Ready(Some( Event::Disconnect, ))) } @@ -545,14 +543,14 @@ impl fn poll_write_server( &mut self, - ) -> crate::component_future::Poll, Error> { + ) -> component_future::Poll, Error> { match &mut self.wsock { WriteSocket::NotConnected | WriteSocket::Connecting(..) => { - Ok(crate::component_future::Async::NothingToDo) + Ok(component_future::Async::NothingToDo) } WriteSocket::Connected(..) => { if self.to_send.is_empty() { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); } if let WriteSocket::Connected(s) = std::mem::replace( @@ -567,21 +565,21 @@ impl unreachable!() } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } WriteSocket::Writing(ref mut fut) => match fut.poll() { Ok(futures::Async::Ready(s)) => { self.wsock = WriteSocket::Connected(s); - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Async::NotReady) + Ok(component_future::Async::NotReady) } Err(e) => { log::warn!("error writing message, reconnecting: {}", e); self.reconnect(); self.last_error = Some(format!("{}", e)); - Ok(crate::component_future::Async::Ready(Some( + Ok(component_future::Async::Ready(Some( Event::Disconnect, ))) } @@ -591,13 +589,13 @@ impl fn poll_heartbeat( &mut self, - ) -> crate::component_future::Poll, Error> { - let _ = try_ready!(self + ) -> component_future::Poll, Error> { + let _ = component_future::try_ready!(self .heartbeat_timer .poll() .context(crate::error::TimerHeartbeat)); self.send_message(crate::protocol::Message::heartbeat()); - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } } @@ -609,6 +607,6 @@ impl type Error = Error; fn poll(&mut self) -> futures::Poll, Self::Error> { - crate::component_future::poll_stream(self, Self::POLL_FNS) + component_future::poll_stream(self, Self::POLL_FNS) } } diff --git a/src/cmd/play.rs b/src/cmd/play.rs index 3dda60c..4e0048e 100644 --- a/src/cmd/play.rs +++ b/src/cmd/play.rs @@ -78,7 +78,7 @@ impl PlaySession { &'static [&'static dyn for<'a> Fn( &'a mut Self, ) - -> crate::component_future::Poll< + -> component_future::Poll< (), Error, >] = &[ @@ -87,58 +87,62 @@ impl PlaySession { &Self::poll_write_terminal, ]; - fn poll_open_file(&mut self) -> crate::component_future::Poll<(), Error> { + fn poll_open_file(&mut self) -> component_future::Poll<(), Error> { match &mut self.file { FileState::Closed { filename } => { self.file = FileState::Opening { filename: filename.to_string(), fut: tokio::fs::File::open(filename.to_string()), }; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } FileState::Opening { filename, fut } => { - let file = try_ready!(fut.poll().with_context(|| { - crate::error::OpenFile { - filename: filename.to_string(), - } - })); + let file = component_future::try_ready!(fut + .poll() + .with_context(|| { + crate::error::OpenFile { + filename: filename.to_string(), + } + })); let file = crate::ttyrec::File::new(file); self.file = FileState::Open { file }; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } - _ => Ok(crate::component_future::Async::NothingToDo), + _ => Ok(component_future::Async::NothingToDo), } } - fn poll_read_file(&mut self) -> crate::component_future::Poll<(), Error> { + fn poll_read_file(&mut self) -> component_future::Poll<(), Error> { if let FileState::Open { file } = &mut self.file { - if let Some(frame) = try_ready!(file.poll_read()) { + if let Some(frame) = + component_future::try_ready!(file.poll_read()) + { self.to_write.insert_at(frame.data, frame.time); } else { self.file = FileState::Eof; } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } else { - Ok(crate::component_future::Async::NothingToDo) + Ok(component_future::Async::NothingToDo) } } - fn poll_write_terminal( - &mut self, - ) -> crate::component_future::Poll<(), Error> { - if let Some(data) = - try_ready!(self.to_write.poll().context(crate::error::Sleep)) + fn poll_write_terminal(&mut self) -> component_future::Poll<(), Error> { + if let Some(data) = component_future::try_ready!(self + .to_write + .poll() + .context(crate::error::Sleep)) { // TODO async let stdout = std::io::stdout(); let mut stdout = stdout.lock(); stdout.write(&data).context(crate::error::WriteTerminal)?; stdout.flush().context(crate::error::FlushTerminal)?; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } else if let FileState::Eof = self.file { - Ok(crate::component_future::Async::Ready(())) + Ok(component_future::Async::Ready(())) } else { - Ok(crate::component_future::Async::NothingToDo) + Ok(component_future::Async::NothingToDo) } } } @@ -149,7 +153,7 @@ impl futures::future::Future for PlaySession { type Error = Error; fn poll(&mut self) -> futures::Poll { - crate::component_future::poll_future(self, Self::POLL_FNS) + component_future::poll_future(self, Self::POLL_FNS) } } diff --git a/src/cmd/record.rs b/src/cmd/record.rs index f2e0aab..6a31c5d 100644 --- a/src/cmd/record.rs +++ b/src/cmd/record.rs @@ -113,7 +113,7 @@ impl RecordSession { &'static [&'static dyn for<'a> Fn( &'a mut Self, ) - -> crate::component_future::Poll< + -> component_future::Poll< (), Error, >] = &[ @@ -124,36 +124,36 @@ impl RecordSession { &Self::poll_write_file, ]; - fn poll_open_file(&mut self) -> crate::component_future::Poll<(), Error> { + fn poll_open_file(&mut self) -> component_future::Poll<(), Error> { match &mut self.file { FileState::Closed { filename } => { self.file = FileState::Opening { filename: filename.to_string(), fut: tokio::fs::File::create(filename.to_string()), }; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } FileState::Opening { filename, fut } => { - let file = try_ready!(fut.poll().with_context(|| { - crate::error::OpenFile { - filename: filename.clone(), - } - })); + let file = component_future::try_ready!(fut + .poll() + .with_context(|| { + crate::error::OpenFile { + filename: filename.clone(), + } + })); let mut file = crate::ttyrec::File::new(file); file.write_frame(self.buffer.contents())?; self.file = FileState::Open { file }; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } FileState::Open { .. } => { - Ok(crate::component_future::Async::NothingToDo) + Ok(component_future::Async::NothingToDo) } } } - fn poll_read_process( - &mut self, - ) -> crate::component_future::Poll<(), Error> { - match try_ready!(self.process.poll()) { + fn poll_read_process(&mut self) -> component_future::Poll<(), Error> { + match component_future::try_ready!(self.process.poll()) { Some(crate::resize::Event::Process(e)) => { match e { crate::process::Event::CommandStart(..) => { @@ -174,10 +174,10 @@ impl RecordSession { } } } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } Some(crate::resize::Event::Resize(_)) => { - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } None => { if !self.done { @@ -185,62 +185,54 @@ impl RecordSession { } // don't return final event here - wait until we are done // writing all data to the file (see poll_write_file) - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } } } - fn poll_write_terminal( - &mut self, - ) -> crate::component_future::Poll<(), Error> { + fn poll_write_terminal(&mut self) -> component_future::Poll<(), Error> { if self.sent_local == self.buffer.len() { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); } - let n = try_ready!(self + let n = component_future::try_ready!(self .stdout .poll_write(&self.buffer.contents()[self.sent_local..]) .context(crate::error::WriteTerminal)); self.sent_local += n; self.needs_flush = true; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } - fn poll_flush_terminal( - &mut self, - ) -> crate::component_future::Poll<(), Error> { + fn poll_flush_terminal(&mut self) -> component_future::Poll<(), Error> { if !self.needs_flush { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); } - try_ready!(self + component_future::try_ready!(self .stdout .poll_flush() .context(crate::error::FlushTerminal)); self.needs_flush = false; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } - fn poll_write_file( - &mut self, - ) -> crate::component_future::Poll<(), Error> { + fn poll_write_file(&mut self) -> component_future::Poll<(), Error> { let file = match &mut self.file { FileState::Open { file } => file, _ => { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); } }; match file.poll_write()? { - futures::Async::Ready(()) => { - Ok(crate::component_future::Async::DidWork) - } + futures::Async::Ready(()) => Ok(component_future::Async::DidWork), futures::Async::NotReady => { // ship all data to the server before actually ending if self.done && file.is_empty() { - Ok(crate::component_future::Async::Ready(())) + Ok(component_future::Async::Ready(())) } else { - Ok(crate::component_future::Async::NotReady) + Ok(component_future::Async::NotReady) } } } @@ -253,6 +245,6 @@ impl futures::future::Future for RecordSession { type Error = Error; fn poll(&mut self) -> futures::Poll { - crate::component_future::poll_future(self, Self::POLL_FNS) + component_future::poll_future(self, Self::POLL_FNS) } } diff --git a/src/cmd/stream.rs b/src/cmd/stream.rs index 9a0f149..823a4b1 100644 --- a/src/cmd/stream.rs +++ b/src/cmd/stream.rs @@ -187,7 +187,7 @@ impl &'static [&'static dyn for<'a> Fn( &'a mut Self, ) - -> crate::component_future::Poll< + -> component_future::Poll< (), Error, >] = &[ @@ -200,26 +200,24 @@ impl // this should never return Err, because we don't want server // communication issues to ever interrupt a running process - fn poll_read_client( - &mut self, - ) -> crate::component_future::Poll<(), Error> { + fn poll_read_client(&mut self) -> component_future::Poll<(), Error> { match self.client.poll() { Ok(futures::Async::Ready(Some(e))) => match e { crate::client::Event::Disconnect => { self.connected = false; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } crate::client::Event::Connect => { self.connected = true; self.sent_remote = 0; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } crate::client::Event::ServerMessage(..) => { // we don't expect to ever see a server message once we // start streaming, so if one comes through, assume // something is messed up and try again self.client.reconnect(); - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } }, Ok(futures::Async::Ready(None)) => { @@ -227,19 +225,17 @@ impl unreachable!() } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Async::NotReady) + Ok(component_future::Async::NotReady) } Err(..) => { self.client.reconnect(); - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } } } - fn poll_read_process( - &mut self, - ) -> crate::component_future::Poll<(), Error> { - match try_ready!(self.process.poll()) { + fn poll_read_process(&mut self) -> component_future::Poll<(), Error> { + match component_future::try_ready!(self.process.poll()) { Some(crate::resize::Event::Process(e)) => { match e { crate::process::Event::CommandStart(..) => { @@ -257,12 +253,12 @@ impl self.record_bytes(&output); } } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } Some(crate::resize::Event::Resize(size)) => { self.client .send_message(crate::protocol::Message::resize(&size)); - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } None => { if !self.done { @@ -270,51 +266,45 @@ impl } // don't return final event here - wait until we are done // sending all data to the server (see poll_write_server) - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } } } - fn poll_write_terminal( - &mut self, - ) -> crate::component_future::Poll<(), Error> { + fn poll_write_terminal(&mut self) -> component_future::Poll<(), Error> { if self.sent_local == self.buffer.len() { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); } - let n = try_ready!(self + let n = component_future::try_ready!(self .stdout .poll_write(&self.buffer.contents()[self.sent_local..]) .context(crate::error::WriteTerminal)); self.sent_local += n; self.needs_flush = true; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } - fn poll_flush_terminal( - &mut self, - ) -> crate::component_future::Poll<(), Error> { + fn poll_flush_terminal(&mut self) -> component_future::Poll<(), Error> { if !self.needs_flush { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); } - try_ready!(self + component_future::try_ready!(self .stdout .poll_flush() .context(crate::error::FlushTerminal)); self.needs_flush = false; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } - fn poll_write_server( - &mut self, - ) -> crate::component_future::Poll<(), Error> { + fn poll_write_server(&mut self) -> component_future::Poll<(), Error> { if self.sent_remote == self.buffer.len() || !self.connected { // ship all data to the server before actually ending if self.done { - return Ok(crate::component_future::Async::Ready(())); + return Ok(component_future::Async::Ready(())); } else { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); } } @@ -323,7 +313,7 @@ impl .send_message(crate::protocol::Message::terminal_output(buf)); self.sent_remote = self.buffer.len(); - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } } @@ -335,6 +325,6 @@ impl type Error = Error; fn poll(&mut self) -> futures::Poll { - crate::component_future::poll_future(self, Self::POLL_FNS) + component_future::poll_future(self, Self::POLL_FNS) } } diff --git a/src/cmd/watch.rs b/src/cmd/watch.rs index 011629d..67e3507 100644 --- a/src/cmd/watch.rs +++ b/src/cmd/watch.rs @@ -566,7 +566,7 @@ impl &'static [&'static dyn for<'a> Fn( &'a mut Self, ) - -> crate::component_future::Poll< + -> component_future::Poll< (), Error, >] = &[ @@ -576,13 +576,13 @@ impl &Self::poll_watch_client, ]; - fn poll_resizer(&mut self) -> crate::component_future::Poll<(), Error> { - let size = try_ready!(self.resizer.poll()).unwrap(); + fn poll_resizer(&mut self) -> component_future::Poll<(), Error> { + let size = component_future::try_ready!(self.resizer.poll()).unwrap(); self.resize(size)?; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } - fn poll_input(&mut self) -> crate::component_future::Poll<(), Error> { + fn poll_input(&mut self) -> component_future::Poll<(), Error> { if self.raw_screen.is_none() { self.raw_screen = Some( crossterm::RawScreen::into_raw_mode() @@ -595,7 +595,7 @@ impl } } - let e = try_ready!(self.key_reader.poll()).unwrap(); + let e = component_future::try_ready!(self.key_reader.poll()).unwrap(); let quit = match &mut self.state { State::Temporary => unreachable!(), State::LoggingIn { .. } => self.loading_keypress(&e)?, @@ -603,16 +603,14 @@ impl State::Watching { .. } => self.watch_keypress(&e)?, }; if quit { - Ok(crate::component_future::Async::Ready(())) + Ok(component_future::Async::Ready(())) } else { - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } } - fn poll_list_client( - &mut self, - ) -> crate::component_future::Poll<(), Error> { - match try_ready!(self.list_client.poll()).unwrap() { + fn poll_list_client(&mut self) -> component_future::Poll<(), Error> { + match component_future::try_ready!(self.list_client.poll()).unwrap() { crate::client::Event::Disconnect => { self.reconnect(true)?; } @@ -624,19 +622,17 @@ impl self.list_server_message(msg)?; } } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } - fn poll_watch_client( - &mut self, - ) -> crate::component_future::Poll<(), Error> { + fn poll_watch_client(&mut self) -> component_future::Poll<(), Error> { let client = if let State::Watching { client } = &mut self.state { client } else { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); }; - match try_ready!(client.poll()).unwrap() { + match component_future::try_ready!(client.poll()).unwrap() { crate::client::Event::Disconnect => { self.reconnect(true)?; } @@ -645,7 +641,7 @@ impl self.watch_server_message(msg)?; } } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } } @@ -657,7 +653,7 @@ impl type Error = Error; fn poll(&mut self) -> futures::Poll { - let res = crate::component_future::poll_future(self, Self::POLL_FNS); + let res = component_future::poll_future(self, Self::POLL_FNS); if res.is_err() { self.state = State::Temporary; // drop alternate screen self.raw_screen = None; diff --git a/src/component_future.rs b/src/component_future.rs deleted file mode 100644 index 6d8449d..0000000 --- a/src/component_future.rs +++ /dev/null @@ -1,104 +0,0 @@ -pub type Poll = Result, Error>; - -pub enum Async { - // we have a value for the main loop to return immediately. - Ready(Item), - - // one of our inner futures returned futures::Async::NotReady. if all - // of our other components return either NothingToDo or NotReady, then our - // overall future should return NotReady and wait to be polled again. - NotReady, - - // we did some work (moved our internal state closer to being ready to - // return a value), but we aren't ready to return a value yet. we should - // re-run all of the poll functions to see if the state modification made - // any of them also able to make progress. - DidWork, - - // we didn't poll any inner futures or otherwise change our internal state - // at all, so rerunning is unlikely to make progress. if all components - // return either NothingToDo or NotReady (and at least one returned - // NotReady), then we should just return NotReady and wait to be polled - // again. it is an error (panic) for all component poll methods to return - // NothingToDo. - NothingToDo, -} - -macro_rules! try_ready { - ($e:expr) => { - match $e { - Ok(futures::Async::Ready(t)) => t, - Ok(futures::Async::NotReady) => { - return Ok($crate::component_future::Async::NotReady) - } - Err(e) => return Err(From::from(e)), - } - }; -} - -pub fn poll_future( - future: &mut T, - poll_fns: &'static [&'static dyn for<'a> Fn( - &'a mut T, - ) -> Poll], -) -> futures::Poll -where - T: futures::future::Future, -{ - loop { - let mut not_ready = false; - let mut did_work = false; - - for f in poll_fns { - match f(future)? { - Async::Ready(e) => return Ok(futures::Async::Ready(e)), - Async::NotReady => not_ready = true, - Async::NothingToDo => {} - Async::DidWork => did_work = true, - } - } - - if !did_work { - if not_ready { - return Ok(futures::Async::NotReady); - } else { - unreachable!() - } - } - } -} - -pub fn poll_stream( - stream: &mut T, - poll_fns: &'static [&'static dyn for<'a> Fn( - &'a mut T, - ) -> Poll< - Option, - Error, - >], -) -> futures::Poll, Error> -where - T: futures::stream::Stream, -{ - loop { - let mut not_ready = false; - let mut did_work = false; - - for f in poll_fns { - match f(stream)? { - Async::Ready(e) => return Ok(futures::Async::Ready(e)), - Async::NotReady => not_ready = true, - Async::NothingToDo => {} - Async::DidWork => did_work = true, - } - } - - if !did_work { - if not_ready { - return Ok(futures::Async::NotReady); - } else { - unreachable!() - } - } - } -} diff --git a/src/main.rs b/src/main.rs index b9a7223..03cfc35 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,9 +15,6 @@ const _DUMMY_DEPENDENCY: &str = include_str!("../Cargo.toml"); mod prelude; -#[macro_use] -mod component_future; - mod async_stdin; mod client; mod cmd; diff --git a/src/process.rs b/src/process.rs index ebd55be..bbd1820 100644 --- a/src/process.rs +++ b/src/process.rs @@ -79,7 +79,7 @@ impl Process { &'static [&'static dyn for<'a> Fn( &'a mut Self, ) - -> crate::component_future::Poll< + -> component_future::Poll< Option, Error, >] = &[ @@ -97,22 +97,22 @@ impl Process { fn poll_resize( &mut self, - ) -> crate::component_future::Poll, Error> { + ) -> component_future::Poll, Error> { if let Some(size) = &self.needs_resize { - try_ready!(size.resize_pty(self.state.pty())); + component_future::try_ready!(size.resize_pty(self.state.pty())); log::debug!("resize({:?})", size); self.needs_resize = None; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } else { - Ok(crate::component_future::Async::NothingToDo) + Ok(component_future::Async::NothingToDo) } } fn poll_command_start( &mut self, - ) -> crate::component_future::Poll, Error> { + ) -> component_future::Poll, Error> { if self.started { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); } if self.state.pty.is_none() { @@ -142,19 +142,20 @@ impl Process { } self.started = true; - Ok(crate::component_future::Async::Ready(Some( - Event::CommandStart(self.cmd.clone(), self.args.clone()), - ))) + Ok(component_future::Async::Ready(Some(Event::CommandStart( + self.cmd.clone(), + self.args.clone(), + )))) } fn poll_read_stdin( &mut self, - ) -> crate::component_future::Poll, Error> { + ) -> component_future::Poll, Error> { if self.exited || self.stdin_closed { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); } - let n = try_ready!(self + let n = component_future::try_ready!(self .input .poll_read(&mut self.buf) .context(crate::error::ReadTerminal)); @@ -165,19 +166,19 @@ impl Process { self.input_buf.push_back(b'\x04'); self.stdin_closed = true; } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } fn poll_write_stdin( &mut self, - ) -> crate::component_future::Poll, Error> { + ) -> component_future::Poll, Error> { if self.exited || self.input_buf.is_empty() { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); } let (a, b) = self.input_buf.as_slices(); let buf = if a.is_empty() { b } else { a }; - let n = try_ready!(self + let n = component_future::try_ready!(self .state .pty_mut() .poll_write(buf) @@ -186,12 +187,12 @@ impl Process { for _ in 0..n { self.input_buf.pop_front(); } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } fn poll_read_stdout( &mut self, - ) -> crate::component_future::Poll, Error> { + ) -> component_future::Poll, Error> { match self .state .pty_mut() @@ -201,12 +202,10 @@ impl Process { Ok(futures::Async::Ready(n)) => { log::debug!("read_stdout({})", n); let bytes = self.buf[..n].to_vec(); - Ok(crate::component_future::Async::Ready(Some( - Event::Output(bytes), - ))) + Ok(component_future::Async::Ready(Some(Event::Output(bytes)))) } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Async::NotReady) + Ok(component_future::Async::NotReady) } Err(e) => { // XXX this seems to be how eof is returned, but this seems... @@ -215,7 +214,7 @@ impl Process { if source.kind() == std::io::ErrorKind::Other { log::debug!("read_stdout(eof)"); self.stdout_closed = true; - return Ok(crate::component_future::Async::DidWork); + return Ok(component_future::Async::DidWork); } } Err(e) @@ -225,24 +224,24 @@ impl Process { fn poll_command_exit( &mut self, - ) -> crate::component_future::Poll, Error> { + ) -> component_future::Poll, Error> { if self.exited { - return Ok(crate::component_future::Async::Ready(None)); + return Ok(component_future::Async::Ready(None)); } if !self.stdout_closed { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); } - let status = try_ready!(self + let status = component_future::try_ready!(self .state .process() .poll() .context(crate::error::ProcessExitPoll)); log::debug!("exit({})", status); self.exited = true; - Ok(crate::component_future::Async::Ready(Some( - Event::CommandExit(status), - ))) + Ok(component_future::Async::Ready(Some(Event::CommandExit( + status, + )))) } } @@ -254,7 +253,7 @@ impl futures::stream::Stream type Error = Error; fn poll(&mut self) -> futures::Poll, Self::Error> { - crate::component_future::poll_stream(self, Self::POLL_FNS) + component_future::poll_stream(self, Self::POLL_FNS) } } diff --git a/src/resize.rs b/src/resize.rs index 9e357af..2dcf928 100644 --- a/src/resize.rs +++ b/src/resize.rs @@ -62,26 +62,25 @@ impl ResizingProcess { &'static [&'static dyn for<'a> Fn( &'a mut Self, ) - -> crate::component_future::Poll< + -> component_future::Poll< Option>, Error, >] = &[&Self::poll_resize, &Self::poll_process]; fn poll_resize( &mut self, - ) -> crate::component_future::Poll>, Error> { - let size = try_ready!(self.resizer.poll()).unwrap(); + ) -> component_future::Poll>, Error> { + let size = component_future::try_ready!(self.resizer.poll()).unwrap(); self.process.resize(size.clone()); - Ok(crate::component_future::Async::Ready(Some(Event::Resize( - size, - )))) + Ok(component_future::Async::Ready(Some(Event::Resize(size)))) } fn poll_process( &mut self, - ) -> crate::component_future::Poll>, Error> { - Ok(crate::component_future::Async::Ready( - try_ready!(self.process.poll()).map(Event::Process), + ) -> component_future::Poll>, Error> { + Ok(component_future::Async::Ready( + component_future::try_ready!(self.process.poll()) + .map(Event::Process), )) } } @@ -95,6 +94,6 @@ impl futures::stream::Stream as futures::stream::Stream>::Error; fn poll(&mut self) -> futures::Poll, Self::Error> { - crate::component_future::poll_stream(self, Self::POLL_FNS) + component_future::poll_stream(self, Self::POLL_FNS) } } diff --git a/src/server.rs b/src/server.rs index 17c14e5..36195c5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -810,7 +810,7 @@ impl fn poll_read_connection( &mut self, conn: &mut Connection, - ) -> crate::component_future::Poll<(), Error> { + ) -> component_future::Poll<(), Error> { match &mut conn.rsock { Some(ReadSocket::Connected(..)) => { if let Some(ReadSocket::Connected(s)) = conn.rsock.take() { @@ -823,7 +823,7 @@ impl } else { unreachable!() } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } Some(ReadSocket::Reading(fut)) => match fut.poll() { Ok(futures::Async::Ready((msg, s))) => { @@ -840,15 +840,15 @@ impl conn.rsock = Some(ReadSocket::Connected(s)); } } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Async::NotReady) + Ok(component_future::Async::NotReady) } Err(e) => classify_connection_error(e), }, Some(ReadSocket::Processing(_, fut)) => { - let (state, msg) = try_ready!(fut.poll()); + let (state, msg) = component_future::try_ready!(fut.poll()); if let Some(ReadSocket::Processing(s, _)) = conn.rsock.take() { conn.state = state; @@ -857,16 +857,16 @@ impl } else { unreachable!() } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } - _ => Ok(crate::component_future::Async::NothingToDo), + _ => Ok(component_future::Async::NothingToDo), } } fn poll_write_connection( &mut self, conn: &mut Connection, - ) -> crate::component_future::Poll<(), Error> { + ) -> component_future::Poll<(), Error> { match &mut conn.wsock { Some(WriteSocket::Connected(..)) => { if let Some(msg) = conn.to_send.pop_front() { @@ -886,24 +886,24 @@ impl } else { unreachable!() } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } else if conn.closed { - Ok(crate::component_future::Async::Ready(())) + Ok(component_future::Async::Ready(())) } else { - Ok(crate::component_future::Async::NothingToDo) + Ok(component_future::Async::NothingToDo) } } Some(WriteSocket::Writing(fut)) => match fut.poll() { Ok(futures::Async::Ready(s)) => { conn.wsock = Some(WriteSocket::Connected(s)); - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Async::NotReady) + Ok(component_future::Async::NotReady) } Err(e) => classify_connection_error(e), }, - _ => Ok(crate::component_future::Async::NothingToDo), + _ => Ok(component_future::Async::NothingToDo), } } @@ -938,22 +938,23 @@ impl &'static [&'static dyn for<'a> Fn( &'a mut Self, ) - -> crate::component_future::Poll< + -> component_future::Poll< (), Error, >] = &[&Self::poll_accept, &Self::poll_read, &Self::poll_write]; - fn poll_accept(&mut self) -> crate::component_future::Poll<(), Error> { - if let Some(sock) = try_ready!(self.acceptor.poll()) { + fn poll_accept(&mut self) -> component_future::Poll<(), Error> { + if let Some(sock) = component_future::try_ready!(self.acceptor.poll()) + { let conn = Connection::new(sock, self.buffer_size); self.connections.insert(conn.id.to_string(), conn); - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } else { unreachable!() } } - fn poll_read(&mut self) -> crate::component_future::Poll<(), Error> { + fn poll_read(&mut self) -> component_future::Poll<(), Error> { let mut did_work = false; let mut not_ready = false; @@ -961,14 +962,14 @@ impl for key in keys { let mut conn = self.connections.remove(&key).unwrap(); match self.poll_read_connection(&mut conn) { - Ok(crate::component_future::Async::Ready(())) => { + Ok(component_future::Async::Ready(())) => { self.handle_disconnect(&mut conn); continue; } - Ok(crate::component_future::Async::DidWork) => { + Ok(component_future::Async::DidWork) => { did_work = true; } - Ok(crate::component_future::Async::NotReady) => { + Ok(component_future::Async::NotReady) => { not_ready = true; } Err(e) => { @@ -984,15 +985,15 @@ impl } if did_work { - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } else if not_ready { - Ok(crate::component_future::Async::NotReady) + Ok(component_future::Async::NotReady) } else { - Ok(crate::component_future::Async::NothingToDo) + Ok(component_future::Async::NothingToDo) } } - fn poll_write(&mut self) -> crate::component_future::Poll<(), Error> { + fn poll_write(&mut self) -> component_future::Poll<(), Error> { let mut did_work = false; let mut not_ready = false; @@ -1000,14 +1001,14 @@ impl for key in keys { let mut conn = self.connections.remove(&key).unwrap(); match self.poll_write_connection(&mut conn) { - Ok(crate::component_future::Async::Ready(())) => { + Ok(component_future::Async::Ready(())) => { self.handle_disconnect(&mut conn); continue; } - Ok(crate::component_future::Async::DidWork) => { + Ok(component_future::Async::DidWork) => { did_work = true; } - Ok(crate::component_future::Async::NotReady) => { + Ok(component_future::Async::NotReady) => { not_ready = true; } Err(e) => { @@ -1023,18 +1024,16 @@ impl } if did_work { - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } else if not_ready { - Ok(crate::component_future::Async::NotReady) + Ok(component_future::Async::NotReady) } else { - Ok(crate::component_future::Async::NothingToDo) + Ok(component_future::Async::NothingToDo) } } } -fn classify_connection_error( - e: Error, -) -> crate::component_future::Poll<(), Error> { +fn classify_connection_error(e: Error) -> component_future::Poll<(), Error> { let source = match e { Error::ReadMessageWithTimeout { source } => source, Error::WriteMessageWithTimeout { source } => source, @@ -1051,7 +1050,7 @@ fn classify_connection_error( source: ref tokio_err, } => tokio_err, Error::EOF => { - return Ok(crate::component_future::Async::Ready(())); + return Ok(component_future::Async::Ready(())); } _ => { return Err(source); @@ -1059,7 +1058,7 @@ fn classify_connection_error( }; if tokio_err.kind() == tokio::io::ErrorKind::UnexpectedEof { - Ok(crate::component_future::Async::Ready(())) + Ok(component_future::Async::Ready(())) } else { Err(source) } @@ -1079,6 +1078,6 @@ impl type Error = Error; fn poll(&mut self) -> futures::Poll { - crate::component_future::poll_future(self, Self::POLL_FNS) + component_future::poll_future(self, Self::POLL_FNS) } } diff --git a/src/server/tls.rs b/src/server/tls.rs index 8bb105c..9db4603 100644 --- a/src/server/tls.rs +++ b/src/server/tls.rs @@ -55,7 +55,7 @@ impl Server { &'static [&'static dyn for<'a> Fn( &'a mut Self, ) - -> crate::component_future::Poll< + -> component_future::Poll< (), Error, >] = &[ @@ -64,10 +64,11 @@ impl Server { &Self::poll_server, ]; - fn poll_accept(&mut self) -> crate::component_future::Poll<(), Error> { - if let Some(sock) = try_ready!(self.acceptor.poll()) { + fn poll_accept(&mut self) -> component_future::Poll<(), Error> { + if let Some(sock) = component_future::try_ready!(self.acceptor.poll()) + { self.accepting_sockets.push(sock); - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } else { Err(Error::SocketChannelClosed) } @@ -75,7 +76,7 @@ impl Server { fn poll_handshake_connections( &mut self, - ) -> crate::component_future::Poll<(), Error> { + ) -> component_future::Poll<(), Error> { let mut did_work = false; let mut not_ready = false; @@ -107,17 +108,17 @@ impl Server { } if did_work { - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } else if not_ready { - Ok(crate::component_future::Async::NotReady) + Ok(component_future::Async::NotReady) } else { - Ok(crate::component_future::Async::NothingToDo) + Ok(component_future::Async::NothingToDo) } } - fn poll_server(&mut self) -> crate::component_future::Poll<(), Error> { - try_ready!(self.server.poll()); - Ok(crate::component_future::Async::Ready(())) + fn poll_server(&mut self) -> component_future::Poll<(), Error> { + component_future::try_ready!(self.server.poll()); + Ok(component_future::Async::Ready(())) } } @@ -127,6 +128,6 @@ impl futures::future::Future for Server { type Error = Error; fn poll(&mut self) -> futures::Poll { - crate::component_future::poll_future(self, Self::POLL_FNS) + component_future::poll_future(self, Self::POLL_FNS) } } -- cgit v1.2.3-54-g00ecf