From 61c8fb0a8d6460aff45ed454cfb2b0a5e37a1a89 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Thu, 17 Oct 2019 11:22:12 -0400 Subject: improve the interface of component_future a bit make it feel more like the standard futures::Poll/futures::Async --- src/client.rs | 101 +++++++++++++++++++++++++----------------------- src/cmd/play.rs | 47 +++++++++++----------- src/cmd/record.rs | 59 ++++++++++++++-------------- src/cmd/stream.rs | 61 +++++++++++++++-------------- src/cmd/watch.rs | 35 +++++++++-------- src/component_future.rs | 61 +++++++++++++++-------------- src/process.rs | 68 +++++++++++++++++--------------- src/server.rs | 81 +++++++++++++++++++------------------- src/server/tls.rs | 33 +++++++++------- 9 files changed, 286 insertions(+), 260 deletions(-) diff --git a/src/client.rs b/src/client.rs index 0fa0254..be8061c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -270,7 +270,7 @@ impl &mut self, msg: crate::protocol::Message, ) -> Result<( - crate::component_future::Poll, + crate::component_future::Async>, Option< Box< dyn futures::future::Future< @@ -293,7 +293,7 @@ impl } open::that(url).context(crate::error::OpenLink)?; Ok(( - crate::component_future::Poll::DidWork, + crate::component_future::Async::DidWork, Some(self.wait_for_oauth_response( state.map(|s| s.to_string()), &id, @@ -307,16 +307,18 @@ impl self.to_send.push_back(msg.clone()); } Ok(( - crate::component_future::Poll::Event(Event::Connect()), + crate::component_future::Async::Ready(Some( + Event::Connect(), + )), None, )) } crate::protocol::Message::Heartbeat => { - Ok((crate::component_future::Poll::DidWork, None)) + Ok((crate::component_future::Async::DidWork, None)) } _ => Ok(( - crate::component_future::Poll::Event(Event::ServerMessage( - msg, + crate::component_future::Async::Ready(Some( + Event::ServerMessage(msg), )), None, )), @@ -418,11 +420,14 @@ impl Client { // XXX rustfmt does a terrible job here - const POLL_FNS: &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) -> Result< - crate::component_future::Poll, - >] = &[ + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> crate::component_future::Poll< + Option, + Error, + >] = &[ &Self::poll_reconnect_server, &Self::poll_read_server, &Self::poll_write_server, @@ -432,11 +437,11 @@ impl fn poll_reconnect_server( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll, Error> { match &mut self.wsock { WriteSocket::NotConnected => { if self.should_wait_to_reconnect()? { - return Ok(crate::component_future::Poll::NotReady); + return Ok(crate::component_future::Async::NotReady); } self.set_reconnect_timer(); @@ -447,7 +452,7 @@ impl self.handle_successful_connection(s)?; } Ok(futures::Async::NotReady) => { - return Ok(crate::component_future::Poll::NotReady); + return Ok(crate::component_future::Async::NotReady); } Err(e) => { log::warn!("error while connecting, reconnecting: {}", e); @@ -458,28 +463,28 @@ impl }, WriteSocket::Connected(..) | WriteSocket::Writing(..) => { if self.has_seen_server_recently() { - return Ok(crate::component_future::Poll::NothingToDo); + return Ok(crate::component_future::Async::NothingToDo); } else { log::warn!( "haven't seen server in a while, reconnecting", ); self.reconnect(); - return Ok(crate::component_future::Poll::Event( + return Ok(crate::component_future::Async::Ready(Some( Event::Disconnect, - )); + ))); } } } - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } fn poll_read_server( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll, Error> { match &mut self.rsock { ReadSocket::NotConnected => { - Ok(crate::component_future::Poll::NothingToDo) + Ok(crate::component_future::Async::NothingToDo) } ReadSocket::Connected(..) => { if let ReadSocket::Connected(s) = std::mem::replace( @@ -491,7 +496,7 @@ impl } else { unreachable!() } - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } ReadSocket::Reading(ref mut fut) => match fut.poll() { Ok(futures::Async::Ready((msg, s))) => { @@ -511,21 +516,21 @@ impl e ); self.reconnect(); - Ok(crate::component_future::Poll::Event( + Ok(crate::component_future::Async::Ready(Some( Event::Disconnect, - )) + ))) } } } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } Err(e) => { log::warn!("error reading message, reconnecting: {}", e); self.reconnect(); - Ok(crate::component_future::Poll::Event( + Ok(crate::component_future::Async::Ready(Some( Event::Disconnect, - )) + ))) } }, ReadSocket::Processing(_, fut) => match fut.poll() { @@ -539,10 +544,10 @@ impl } else { unreachable!() } - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } Err(e) => { log::warn!( @@ -550,9 +555,9 @@ impl e ); self.reconnect(); - Ok(crate::component_future::Poll::Event( + Ok(crate::component_future::Async::Ready(Some( Event::Disconnect, - )) + ))) } }, } @@ -560,14 +565,14 @@ impl fn poll_write_server( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll, Error> { match &mut self.wsock { WriteSocket::NotConnected | WriteSocket::Connecting(..) => { - Ok(crate::component_future::Poll::NothingToDo) + Ok(crate::component_future::Async::NothingToDo) } WriteSocket::Connected(..) => { if self.to_send.is_empty() { - return Ok(crate::component_future::Poll::NothingToDo); + return Ok(crate::component_future::Async::NothingToDo); } if let WriteSocket::Connected(s) = std::mem::replace( @@ -582,22 +587,22 @@ impl unreachable!() } - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } WriteSocket::Writing(ref mut fut) => match fut.poll() { Ok(futures::Async::Ready(s)) => { self.wsock = WriteSocket::Connected(s); - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } Err(e) => { log::warn!("error writing message, reconnecting: {}", e); self.reconnect(); - Ok(crate::component_future::Poll::Event( + Ok(crate::component_future::Async::Ready(Some( Event::Disconnect, - )) + ))) } }, } @@ -605,7 +610,7 @@ impl fn poll_heartbeat( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll, Error> { match self .heartbeat_timer .poll() @@ -613,23 +618,23 @@ impl { futures::Async::Ready(..) => { self.send_message(crate::protocol::Message::heartbeat()); - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } fn poll_sigwinch( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll, Error> { if let Some(winches) = &mut self.winches { if !self.started { self.started = true; - return Ok(crate::component_future::Poll::Event( + return Ok(crate::component_future::Async::Ready(Some( Event::Start(crate::term::Size::get()?), - )); + ))); } match winches.poll()? { @@ -638,17 +643,17 @@ impl self.send_message(crate::protocol::Message::resize( &size, )); - Ok(crate::component_future::Poll::Event(Event::Resize( - size, + Ok(crate::component_future::Async::Ready(Some( + Event::Resize(size), ))) } futures::Async::Ready(None) => unreachable!(), futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } else { - Ok(crate::component_future::Poll::NothingToDo) + Ok(crate::component_future::Async::NothingToDo) } } } diff --git a/src/cmd/play.rs b/src/cmd/play.rs index 08d911f..e6afe2f 100644 --- a/src/cmd/play.rs +++ b/src/cmd/play.rs @@ -80,67 +80,66 @@ impl PlaySession { } impl PlaySession { - const POLL_FNS: &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) -> Result< - crate::component_future::Poll<()>, - >] = &[ + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> crate::component_future::Poll< + (), + Error, + >] = &[ &Self::poll_open_file, &Self::poll_read_file, &Self::poll_write_terminal, ]; - fn poll_open_file( - &mut self, - ) -> Result> { + fn poll_open_file(&mut self) -> crate::component_future::Poll<(), Error> { match &mut self.file { FileState::Closed { filename } => { self.file = FileState::Opening { fut: tokio::fs::File::open(filename.to_string()), }; - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } FileState::Opening { fut } => { match fut.poll().context(crate::error::OpenFile)? { futures::Async::Ready(file) => { let file = crate::ttyrec::File::new(file); self.file = FileState::Open { file }; - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } - _ => Ok(crate::component_future::Poll::NothingToDo), + _ => Ok(crate::component_future::Async::NothingToDo), } } - fn poll_read_file( - &mut self, - ) -> Result> { + fn poll_read_file(&mut self) -> crate::component_future::Poll<(), Error> { if let FileState::Open { file } = &mut self.file { match file.poll_read()? { futures::Async::Ready(Some(frame)) => { self.to_write.insert_at(frame.data, frame.time); - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::Ready(None) => { self.file = FileState::Eof; - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } else { - Ok(crate::component_future::Poll::NothingToDo) + Ok(crate::component_future::Async::NothingToDo) } } fn poll_write_terminal( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { match self.to_write.poll().context(crate::error::Sleep)? { futures::Async::Ready(Some(data)) => { // TODO async @@ -148,17 +147,17 @@ impl PlaySession { let mut stdout = stdout.lock(); stdout.write(&data).context(crate::error::WriteTerminal)?; stdout.flush().context(crate::error::FlushTerminal)?; - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::Ready(None) => { if let FileState::Eof = self.file { - Ok(crate::component_future::Poll::Event(())) + Ok(crate::component_future::Async::Ready(())) } else { - Ok(crate::component_future::Poll::NothingToDo) + Ok(crate::component_future::Async::NothingToDo) } } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } diff --git a/src/cmd/record.rs b/src/cmd/record.rs index 83651f5..7fbfb4b 100644 --- a/src/cmd/record.rs +++ b/src/cmd/record.rs @@ -142,11 +142,14 @@ impl RecordSession { } impl RecordSession { - const POLL_FNS: &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) -> Result< - crate::component_future::Poll<()>, - >] = &[ + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> crate::component_future::Poll< + (), + Error, + >] = &[ &Self::poll_open_file, &Self::poll_read_process, &Self::poll_write_terminal, @@ -154,15 +157,13 @@ impl RecordSession { &Self::poll_write_file, ]; - fn poll_open_file( - &mut self, - ) -> Result> { + fn poll_open_file(&mut self) -> crate::component_future::Poll<(), Error> { match &mut self.file { FileState::Closed { filename } => { self.file = FileState::Opening { fut: tokio::fs::File::create(filename.to_string()), }; - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } FileState::Opening { fut } => { match fut.poll().context(crate::error::OpenFile)? { @@ -170,22 +171,22 @@ impl RecordSession { let mut file = crate::ttyrec::File::new(file); file.write_frame(self.buffer.contents())?; self.file = FileState::Open { file }; - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } FileState::Open { .. } => { - Ok(crate::component_future::Poll::NothingToDo) + Ok(crate::component_future::Async::NothingToDo) } } } fn poll_read_process( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { match self.process.poll()? { futures::Async::Ready(Some(e)) => { match e { @@ -207,7 +208,7 @@ impl RecordSession { } } } - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::Ready(None) => { if !self.done { @@ -215,19 +216,19 @@ impl RecordSession { } // don't return final event here - wait until we are done // writing all data to the file (see poll_write_file) - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } fn poll_write_terminal( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { if self.sent_local == self.buffer.len() { - return Ok(crate::component_future::Poll::NothingToDo); + return Ok(crate::component_future::Async::NothingToDo); } match self @@ -238,19 +239,19 @@ impl RecordSession { futures::Async::Ready(n) => { self.sent_local += n; self.needs_flush = true; - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } fn poll_flush_terminal( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { if !self.needs_flush { - return Ok(crate::component_future::Poll::NothingToDo); + return Ok(crate::component_future::Async::NothingToDo); } match self @@ -260,34 +261,34 @@ impl RecordSession { { futures::Async::Ready(()) => { self.needs_flush = false; - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } fn poll_write_file( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { let file = match &mut self.file { FileState::Open { file } => file, _ => { - return Ok(crate::component_future::Poll::NothingToDo); + return Ok(crate::component_future::Async::NothingToDo); } }; match file.poll_write()? { futures::Async::Ready(()) => { - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::NotReady => { // ship all data to the server before actually ending if self.done && file.is_empty() { - Ok(crate::component_future::Poll::Event(())) + Ok(crate::component_future::Async::Ready(())) } else { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } diff --git a/src/cmd/stream.rs b/src/cmd/stream.rs index 805ed8e..eba26bb 100644 --- a/src/cmd/stream.rs +++ b/src/cmd/stream.rs @@ -242,11 +242,14 @@ impl impl StreamSession { - const POLL_FNS: &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) -> Result< - crate::component_future::Poll<()>, - >] = &[ + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> crate::component_future::Poll< + (), + Error, + >] = &[ &Self::poll_read_client, &Self::poll_read_process, &Self::poll_write_terminal, @@ -258,32 +261,32 @@ impl // communication issues to ever interrupt a running process fn poll_read_client( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { match self.client.poll() { Ok(futures::Async::Ready(Some(e))) => match e { crate::client::Event::Disconnect => { self.connected = false; - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } crate::client::Event::Start(size) => { self.process.resize(size); - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } crate::client::Event::Connect() => { self.connected = true; self.sent_remote = 0; - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } crate::client::Event::ServerMessage(..) => { // we don't expect to ever see a server message once we // start streaming, so if one comes through, assume // something is messed up and try again self.client.reconnect(); - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } crate::client::Event::Resize(size) => { self.process.resize(size); - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } }, Ok(futures::Async::Ready(None)) => { @@ -291,18 +294,18 @@ impl unreachable!() } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } Err(..) => { self.client.reconnect(); - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } } } fn poll_read_process( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { match self.process.poll()? { futures::Async::Ready(Some(e)) => { match e { @@ -322,7 +325,7 @@ impl self.record_bytes(&output); } } - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::Ready(None) => { if !self.done { @@ -330,19 +333,19 @@ impl } // don't return final event here - wait until we are done // sending all data to the server (see poll_write_server) - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } fn poll_write_terminal( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { if self.sent_local == self.buffer.len() { - return Ok(crate::component_future::Poll::NothingToDo); + return Ok(crate::component_future::Async::NothingToDo); } match self @@ -353,19 +356,19 @@ impl futures::Async::Ready(n) => { self.sent_local += n; self.needs_flush = true; - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } fn poll_flush_terminal( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { if !self.needs_flush { - return Ok(crate::component_future::Poll::NothingToDo); + return Ok(crate::component_future::Async::NothingToDo); } match self @@ -375,23 +378,23 @@ impl { futures::Async::Ready(()) => { self.needs_flush = false; - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } fn poll_write_server( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { if self.sent_remote == self.buffer.len() || !self.connected { // ship all data to the server before actually ending if self.done { - return Ok(crate::component_future::Poll::Event(())); + return Ok(crate::component_future::Async::Ready(())); } else { - return Ok(crate::component_future::Poll::NothingToDo); + return Ok(crate::component_future::Async::NothingToDo); } } @@ -400,7 +403,7 @@ impl .send_message(crate::protocol::Message::terminal_output(buf)); self.sent_remote = self.buffer.len(); - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } } diff --git a/src/cmd/watch.rs b/src/cmd/watch.rs index 75eba03..f4d4948 100644 --- a/src/cmd/watch.rs +++ b/src/cmd/watch.rs @@ -580,17 +580,20 @@ impl impl WatchSession { - const POLL_FNS: &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) -> Result< - crate::component_future::Poll<()>, - >] = &[ + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> crate::component_future::Poll< + (), + Error, + >] = &[ &Self::poll_input, &Self::poll_list_client, &Self::poll_watch_client, ]; - fn poll_input(&mut self) -> Result> { + fn poll_input(&mut self) -> crate::component_future::Poll<(), Error> { if self.raw_screen.is_none() { self.raw_screen = Some( crossterm::RawScreen::into_raw_mode() @@ -612,21 +615,21 @@ impl State::Watching { .. } => self.watch_keypress(&e)?, }; if quit { - Ok(crate::component_future::Poll::Event(())) + Ok(crate::component_future::Async::Ready(())) } else { - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } } futures::Async::Ready(None) => unreachable!(), futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } fn poll_list_client( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { match self.list_client.poll()? { futures::Async::Ready(Some(e)) => { match e { @@ -648,25 +651,25 @@ impl self.resize(size)?; } } - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::Ready(None) => { // the client should never exit on its own unreachable!() } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } fn poll_watch_client( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { let client = if let State::Watching { client } = &mut self.state { client } else { - return Ok(crate::component_future::Poll::NothingToDo); + return Ok(crate::component_future::Async::NothingToDo); }; match client.poll()? { @@ -688,14 +691,14 @@ impl unreachable!(); } } - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::Ready(None) => { // the client should never exit on its own unreachable!() } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } diff --git a/src/component_future.rs b/src/component_future.rs index 2bc9ddc..4613b71 100644 --- a/src/component_future.rs +++ b/src/component_future.rs @@ -1,27 +1,34 @@ -pub enum Poll { - // something happened that we want to report - Event(T), - // underlying future/stream returned NotReady, so it's safe for us to also - // return NotReady +pub type Poll = Result, Error>; + +pub enum Async { + // we have a value for the main loop to return immediately. + Ready(Item), + + // one of our inner futures returned futures::Async::NotReady. if all + // of our other components return either NothingToDo or NotReady, then our + // overall future should return NotReady and wait to be polled again. NotReady, - // didn't do any work, so we want to return NotReady assuming at least one - // other poll method returned NotReady (if every poll method returns - // NothingToDo, something is broken) - NothingToDo, - // did some work, so we want to loop + + // we did some work (moved our internal state closer to being ready to + // return a value), but we aren't ready to return a value yet. we should + // re-run all of the poll functions to see if the state modification made + // any of them also able to make progress. DidWork, - // the stream has ended - Done, + + // we didn't poll any inner futures or otherwise change our internal state + // at all, so rerunning is unlikely to make progress. if all components + // return either NothingToDo or NotReady (and at least one returned + // NotReady), then we should just return NotReady and wait to be polled + // again. it is an error (panic) for all component poll methods to return + // NothingToDo. + NothingToDo, } pub fn poll_future( future: &mut T, poll_fns: &'static [&'static dyn for<'a> Fn( &'a mut T, - ) -> Result< - Poll, - Error, - >], + ) -> Poll], ) -> futures::Poll where T: futures::future::Future, @@ -32,11 +39,10 @@ where for f in poll_fns { match f(future)? { - Poll::Event(e) => return Ok(futures::Async::Ready(e)), - Poll::NotReady => not_ready = true, - Poll::NothingToDo => {} - Poll::DidWork => did_work = true, - Poll::Done => unreachable!(), + Async::Ready(e) => return Ok(futures::Async::Ready(e)), + Async::NotReady => not_ready = true, + Async::NothingToDo => {} + Async::DidWork => did_work = true, } } @@ -54,8 +60,8 @@ pub fn poll_stream( stream: &mut T, poll_fns: &'static [&'static dyn for<'a> Fn( &'a mut T, - ) -> Result< - Poll, + ) -> Poll< + Option, Error, >], ) -> futures::Poll, Error> @@ -68,11 +74,10 @@ where for f in poll_fns { match f(stream)? { - Poll::Event(e) => return Ok(futures::Async::Ready(Some(e))), - Poll::NotReady => not_ready = true, - Poll::NothingToDo => {} - Poll::DidWork => did_work = true, - Poll::Done => return Ok(futures::Async::Ready(None)), + Async::Ready(e) => return Ok(futures::Async::Ready(e)), + Async::NotReady => not_ready = true, + Async::NothingToDo => {} + Async::DidWork => did_work = true, } } diff --git a/src/process.rs b/src/process.rs index d70b207..a099042 100644 --- a/src/process.rs +++ b/src/process.rs @@ -75,11 +75,14 @@ impl Process { } impl Process { - const POLL_FNS: &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) -> Result< - crate::component_future::Poll, - >] = &[ + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> crate::component_future::Poll< + Option, + Error, + >] = &[ // order is important here - checking command_exit first so that we // don't try to read from a process that has already exited, which // causes an error. also, poll_resize needs to happen after @@ -94,28 +97,28 @@ impl Process { fn poll_resize( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll, Error> { if let Some(size) = &self.needs_resize { match size.resize_pty(self.state.pty())? { futures::Async::Ready(()) => { log::debug!("resize({:?})", size); self.needs_resize = None; - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } else { - Ok(crate::component_future::Poll::NothingToDo) + Ok(crate::component_future::Async::NothingToDo) } } fn poll_command_start( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll, Error> { if self.started { - return Ok(crate::component_future::Poll::NothingToDo); + return Ok(crate::component_future::Async::NothingToDo); } if self.state.pty.is_none() { @@ -145,17 +148,16 @@ impl Process { } self.started = true; - Ok(crate::component_future::Poll::Event(Event::CommandStart( - self.cmd.clone(), - self.args.clone(), + Ok(crate::component_future::Async::Ready(Some( + Event::CommandStart(self.cmd.clone(), self.args.clone()), ))) } fn poll_read_stdin( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll, Error> { if self.exited || self.stdin_closed { - return Ok(crate::component_future::Poll::NothingToDo); + return Ok(crate::component_future::Async::NothingToDo); } match self @@ -171,19 +173,19 @@ impl Process { self.input_buf.push_back(b'\x04'); self.stdin_closed = true; } - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } fn poll_write_stdin( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll, Error> { if self.exited || self.input_buf.is_empty() { - return Ok(crate::component_future::Poll::NothingToDo); + return Ok(crate::component_future::Async::NothingToDo); } let (a, b) = self.input_buf.as_slices(); @@ -199,17 +201,17 @@ impl Process { for _ in 0..n { self.input_buf.pop_front(); } - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } fn poll_read_stdout( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll, Error> { match self .state .pty_mut() @@ -219,10 +221,12 @@ impl Process { Ok(futures::Async::Ready(n)) => { log::debug!("read_stdout({})", n); let bytes = self.buf[..n].to_vec(); - Ok(crate::component_future::Poll::Event(Event::Output(bytes))) + Ok(crate::component_future::Async::Ready(Some( + Event::Output(bytes), + ))) } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } Err(e) => { // XXX this seems to be how eof is returned, but this seems... @@ -231,7 +235,7 @@ impl Process { if source.kind() == std::io::ErrorKind::Other { log::debug!("read_stdout(eof)"); self.stdout_closed = true; - return Ok(crate::component_future::Poll::DidWork); + return Ok(crate::component_future::Async::DidWork); } } Err(e) @@ -241,12 +245,12 @@ impl Process { fn poll_command_exit( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll, Error> { if self.exited { - return Ok(crate::component_future::Poll::Done); + return Ok(crate::component_future::Async::Ready(None)); } if !self.stdout_closed { - return Ok(crate::component_future::Poll::NothingToDo); + return Ok(crate::component_future::Async::NothingToDo); } match self @@ -258,12 +262,12 @@ impl Process { futures::Async::Ready(status) => { log::debug!("exit({})", status); self.exited = true; - Ok(crate::component_future::Poll::Event(Event::CommandExit( - status, + Ok(crate::component_future::Async::Ready(Some( + Event::CommandExit(status), ))) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } diff --git a/src/server.rs b/src/server.rs index b760812..45f5a4b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -816,7 +816,7 @@ impl fn poll_read_connection( &mut self, conn: &mut Connection, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { match &mut conn.rsock { Some(ReadSocket::Connected(..)) => { if let Some(ReadSocket::Connected(s)) = conn.rsock.take() { @@ -829,7 +829,7 @@ impl } else { unreachable!() } - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } Some(ReadSocket::Reading(fut)) => match fut.poll() { Ok(futures::Async::Ready((msg, s))) => { @@ -846,10 +846,10 @@ impl conn.rsock = Some(ReadSocket::Connected(s)); } } - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } Err(e) => classify_connection_error(e), }, @@ -864,20 +864,20 @@ impl } else { unreachable!() } - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } }, - _ => Ok(crate::component_future::Poll::NothingToDo), + _ => Ok(crate::component_future::Async::NothingToDo), } } fn poll_write_connection( &mut self, conn: &mut Connection, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { match &mut conn.wsock { Some(WriteSocket::Connected(..)) => { if let Some(msg) = conn.to_send.pop_front() { @@ -897,24 +897,24 @@ impl } else { unreachable!() } - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } else if conn.closed { - Ok(crate::component_future::Poll::Event(())) + Ok(crate::component_future::Async::Ready(())) } else { - Ok(crate::component_future::Poll::NothingToDo) + Ok(crate::component_future::Async::NothingToDo) } } Some(WriteSocket::Writing(fut)) => match fut.poll() { Ok(futures::Async::Ready(s)) => { conn.wsock = Some(WriteSocket::Connected(s)); - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } Err(e) => classify_connection_error(e), }, - _ => Ok(crate::component_future::Poll::NothingToDo), + _ => Ok(crate::component_future::Async::NothingToDo), } } @@ -945,11 +945,14 @@ impl impl Server { - const POLL_FNS: &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) -> Result< - crate::component_future::Poll<()>, - >] = &[ + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> crate::component_future::Poll< + (), + Error, + >] = &[ &Self::poll_new_connections, &Self::poll_read, &Self::poll_write, @@ -957,20 +960,20 @@ impl fn poll_new_connections( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { match self.sock_stream.poll()? { futures::Async::Ready(Some(conn)) => { self.connections.insert(conn.id.to_string(), conn); - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::Ready(None) => Err(Error::SocketChannelClosed), futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } - fn poll_read(&mut self) -> Result> { + fn poll_read(&mut self) -> crate::component_future::Poll<(), Error> { let mut did_work = false; let mut not_ready = false; @@ -978,14 +981,14 @@ impl for key in keys { let mut conn = self.connections.remove(&key).unwrap(); match self.poll_read_connection(&mut conn) { - Ok(crate::component_future::Poll::Event(())) => { + Ok(crate::component_future::Async::Ready(())) => { self.handle_disconnect(&mut conn); continue; } - Ok(crate::component_future::Poll::DidWork) => { + Ok(crate::component_future::Async::DidWork) => { did_work = true; } - Ok(crate::component_future::Poll::NotReady) => { + Ok(crate::component_future::Async::NotReady) => { not_ready = true; } Err(e) => { @@ -1001,15 +1004,15 @@ impl } if did_work { - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } else if not_ready { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } else { - Ok(crate::component_future::Poll::NothingToDo) + Ok(crate::component_future::Async::NothingToDo) } } - fn poll_write(&mut self) -> Result> { + fn poll_write(&mut self) -> crate::component_future::Poll<(), Error> { let mut did_work = false; let mut not_ready = false; @@ -1017,14 +1020,14 @@ impl for key in keys { let mut conn = self.connections.remove(&key).unwrap(); match self.poll_write_connection(&mut conn) { - Ok(crate::component_future::Poll::Event(())) => { + Ok(crate::component_future::Async::Ready(())) => { self.handle_disconnect(&mut conn); continue; } - Ok(crate::component_future::Poll::DidWork) => { + Ok(crate::component_future::Async::DidWork) => { did_work = true; } - Ok(crate::component_future::Poll::NotReady) => { + Ok(crate::component_future::Async::NotReady) => { not_ready = true; } Err(e) => { @@ -1040,18 +1043,18 @@ impl } if did_work { - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } else if not_ready { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } else { - Ok(crate::component_future::Poll::NothingToDo) + Ok(crate::component_future::Async::NothingToDo) } } } fn classify_connection_error( e: Error, -) -> Result> { +) -> crate::component_future::Poll<(), Error> { let source = match e { Error::ReadMessageWithTimeout { source } => source, Error::WriteMessageWithTimeout { source } => source, @@ -1068,7 +1071,7 @@ fn classify_connection_error( source: ref tokio_err, } => tokio_err, Error::EOF => { - return Ok(crate::component_future::Poll::Event(())); + return Ok(crate::component_future::Async::Ready(())); } _ => { return Err(source); @@ -1076,7 +1079,7 @@ fn classify_connection_error( }; if tokio_err.kind() == tokio::io::ErrorKind::UnexpectedEof { - Ok(crate::component_future::Poll::Event(())) + Ok(crate::component_future::Async::Ready(())) } else { Err(source) } diff --git a/src/server/tls.rs b/src/server/tls.rs index 14effbc..f26d632 100644 --- a/src/server/tls.rs +++ b/src/server/tls.rs @@ -37,11 +37,14 @@ impl Server { } impl Server { - const POLL_FNS: &'static [&'static dyn for<'a> Fn( - &'a mut Self, - ) -> Result< - crate::component_future::Poll<()>, - >] = &[ + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> crate::component_future::Poll< + (), + Error, + >] = &[ &Self::poll_new_connections, &Self::poll_handshake_connections, &Self::poll_server, @@ -49,7 +52,7 @@ impl Server { fn poll_new_connections( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { match self .sock_r .poll() @@ -57,18 +60,18 @@ impl Server { { futures::Async::Ready(Some(sock)) => { self.accepting_sockets.push(sock); - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } futures::Async::Ready(None) => Err(Error::SocketChannelClosed), futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } fn poll_handshake_connections( &mut self, - ) -> Result> { + ) -> crate::component_future::Poll<(), Error> { let mut did_work = false; let mut not_ready = false; @@ -100,21 +103,21 @@ impl Server { } if did_work { - Ok(crate::component_future::Poll::DidWork) + Ok(crate::component_future::Async::DidWork) } else if not_ready { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } else { - Ok(crate::component_future::Poll::NothingToDo) + Ok(crate::component_future::Async::NothingToDo) } } - fn poll_server(&mut self) -> Result> { + fn poll_server(&mut self) -> crate::component_future::Poll<(), Error> { match self.server.poll()? { futures::Async::Ready(()) => { - Ok(crate::component_future::Poll::Event(())) + Ok(crate::component_future::Async::Ready(())) } futures::Async::NotReady => { - Ok(crate::component_future::Poll::NotReady) + Ok(crate::component_future::Async::NotReady) } } } -- cgit v1.2.3-54-g00ecf