aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-10-17 11:22:12 -0400
committerJesse Luehrs <doy@tozt.net>2019-10-17 11:22:12 -0400
commit61c8fb0a8d6460aff45ed454cfb2b0a5e37a1a89 (patch)
treebfa9bd2887e647713ea939798c06ff51f81c09d3
parent0ff2ba5d413834d04a24ce3f29a0172c82a31c35 (diff)
downloadteleterm-61c8fb0a8d6460aff45ed454cfb2b0a5e37a1a89.tar.gz
teleterm-61c8fb0a8d6460aff45ed454cfb2b0a5e37a1a89.zip
improve the interface of component_future a bit
make it feel more like the standard futures::Poll/futures::Async
-rw-r--r--src/client.rs101
-rw-r--r--src/cmd/play.rs47
-rw-r--r--src/cmd/record.rs59
-rw-r--r--src/cmd/stream.rs61
-rw-r--r--src/cmd/watch.rs35
-rw-r--r--src/component_future.rs61
-rw-r--r--src/process.rs68
-rw-r--r--src/server.rs81
-rw-r--r--src/server/tls.rs33
9 files changed, 286 insertions, 260 deletions
diff --git a/src/client.rs b/src/client.rs
index 0fa0254..be8061c 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -270,7 +270,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
&mut self,
msg: crate::protocol::Message,
) -> Result<(
- crate::component_future::Poll<Event>,
+ crate::component_future::Async<Option<Event>>,
Option<
Box<
dyn futures::future::Future<
@@ -293,7 +293,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
open::that(url).context(crate::error::OpenLink)?;
Ok((
- crate::component_future::Poll::DidWork,
+ crate::component_future::Async::DidWork,
Some(self.wait_for_oauth_response(
state.map(|s| s.to_string()),
&id,
@@ -307,16 +307,18 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
self.to_send.push_back(msg.clone());
}
Ok((
- crate::component_future::Poll::Event(Event::Connect()),
+ crate::component_future::Async::Ready(Some(
+ Event::Connect(),
+ )),
None,
))
}
crate::protocol::Message::Heartbeat => {
- Ok((crate::component_future::Poll::DidWork, None))
+ Ok((crate::component_future::Async::DidWork, None))
}
_ => Ok((
- crate::component_future::Poll::Event(Event::ServerMessage(
- msg,
+ crate::component_future::Async::Ready(Some(
+ Event::ServerMessage(msg),
)),
None,
)),
@@ -418,11 +420,14 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
Client<S>
{
// XXX rustfmt does a terrible job here
- const POLL_FNS: &'static [&'static dyn for<'a> Fn(
- &'a mut Self,
- ) -> Result<
- crate::component_future::Poll<Event>,
- >] = &[
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> crate::component_future::Poll<
+ Option<Event>,
+ Error,
+ >] = &[
&Self::poll_reconnect_server,
&Self::poll_read_server,
&Self::poll_write_server,
@@ -432,11 +437,11 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
fn poll_reconnect_server(
&mut self,
- ) -> Result<crate::component_future::Poll<Event>> {
+ ) -> crate::component_future::Poll<Option<Event>, Error> {
match &mut self.wsock {
WriteSocket::NotConnected => {
if self.should_wait_to_reconnect()? {
- return Ok(crate::component_future::Poll::NotReady);
+ return Ok(crate::component_future::Async::NotReady);
}
self.set_reconnect_timer();
@@ -447,7 +452,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
self.handle_successful_connection(s)?;
}
Ok(futures::Async::NotReady) => {
- return Ok(crate::component_future::Poll::NotReady);
+ return Ok(crate::component_future::Async::NotReady);
}
Err(e) => {
log::warn!("error while connecting, reconnecting: {}", e);
@@ -458,28 +463,28 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
},
WriteSocket::Connected(..) | WriteSocket::Writing(..) => {
if self.has_seen_server_recently() {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
} else {
log::warn!(
"haven't seen server in a while, reconnecting",
);
self.reconnect();
- return Ok(crate::component_future::Poll::Event(
+ return Ok(crate::component_future::Async::Ready(Some(
Event::Disconnect,
- ));
+ )));
}
}
}
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
fn poll_read_server(
&mut self,
- ) -> Result<crate::component_future::Poll<Event>> {
+ ) -> crate::component_future::Poll<Option<Event>, Error> {
match &mut self.rsock {
ReadSocket::NotConnected => {
- Ok(crate::component_future::Poll::NothingToDo)
+ Ok(crate::component_future::Async::NothingToDo)
}
ReadSocket::Connected(..) => {
if let ReadSocket::Connected(s) = std::mem::replace(
@@ -491,7 +496,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
} else {
unreachable!()
}
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
ReadSocket::Reading(ref mut fut) => match fut.poll() {
Ok(futures::Async::Ready((msg, s))) => {
@@ -511,21 +516,21 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
e
);
self.reconnect();
- Ok(crate::component_future::Poll::Event(
+ Ok(crate::component_future::Async::Ready(Some(
Event::Disconnect,
- ))
+ )))
}
}
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
Err(e) => {
log::warn!("error reading message, reconnecting: {}", e);
self.reconnect();
- Ok(crate::component_future::Poll::Event(
+ Ok(crate::component_future::Async::Ready(Some(
Event::Disconnect,
- ))
+ )))
}
},
ReadSocket::Processing(_, fut) => match fut.poll() {
@@ -539,10 +544,10 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
} else {
unreachable!()
}
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
Err(e) => {
log::warn!(
@@ -550,9 +555,9 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
e
);
self.reconnect();
- Ok(crate::component_future::Poll::Event(
+ Ok(crate::component_future::Async::Ready(Some(
Event::Disconnect,
- ))
+ )))
}
},
}
@@ -560,14 +565,14 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
fn poll_write_server(
&mut self,
- ) -> Result<crate::component_future::Poll<Event>> {
+ ) -> crate::component_future::Poll<Option<Event>, Error> {
match &mut self.wsock {
WriteSocket::NotConnected | WriteSocket::Connecting(..) => {
- Ok(crate::component_future::Poll::NothingToDo)
+ Ok(crate::component_future::Async::NothingToDo)
}
WriteSocket::Connected(..) => {
if self.to_send.is_empty() {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
}
if let WriteSocket::Connected(s) = std::mem::replace(
@@ -582,22 +587,22 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
unreachable!()
}
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
WriteSocket::Writing(ref mut fut) => match fut.poll() {
Ok(futures::Async::Ready(s)) => {
self.wsock = WriteSocket::Connected(s);
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
Err(e) => {
log::warn!("error writing message, reconnecting: {}", e);
self.reconnect();
- Ok(crate::component_future::Poll::Event(
+ Ok(crate::component_future::Async::Ready(Some(
Event::Disconnect,
- ))
+ )))
}
},
}
@@ -605,7 +610,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
fn poll_heartbeat(
&mut self,
- ) -> Result<crate::component_future::Poll<Event>> {
+ ) -> crate::component_future::Poll<Option<Event>, Error> {
match self
.heartbeat_timer
.poll()
@@ -613,23 +618,23 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
{
futures::Async::Ready(..) => {
self.send_message(crate::protocol::Message::heartbeat());
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
fn poll_sigwinch(
&mut self,
- ) -> Result<crate::component_future::Poll<Event>> {
+ ) -> crate::component_future::Poll<Option<Event>, Error> {
if let Some(winches) = &mut self.winches {
if !self.started {
self.started = true;
- return Ok(crate::component_future::Poll::Event(
+ return Ok(crate::component_future::Async::Ready(Some(
Event::Start(crate::term::Size::get()?),
- ));
+ )));
}
match winches.poll()? {
@@ -638,17 +643,17 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
self.send_message(crate::protocol::Message::resize(
&size,
));
- Ok(crate::component_future::Poll::Event(Event::Resize(
- size,
+ Ok(crate::component_future::Async::Ready(Some(
+ Event::Resize(size),
)))
}
futures::Async::Ready(None) => unreachable!(),
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
} else {
- Ok(crate::component_future::Poll::NothingToDo)
+ Ok(crate::component_future::Async::NothingToDo)
}
}
}
diff --git a/src/cmd/play.rs b/src/cmd/play.rs
index 08d911f..e6afe2f 100644
--- a/src/cmd/play.rs
+++ b/src/cmd/play.rs
@@ -80,67 +80,66 @@ impl PlaySession {
}
impl PlaySession {
- const POLL_FNS: &'static [&'static dyn for<'a> Fn(
- &'a mut Self,
- ) -> Result<
- crate::component_future::Poll<()>,
- >] = &[
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> crate::component_future::Poll<
+ (),
+ Error,
+ >] = &[
&Self::poll_open_file,
&Self::poll_read_file,
&Self::poll_write_terminal,
];
- fn poll_open_file(
- &mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ fn poll_open_file(&mut self) -> crate::component_future::Poll<(), Error> {
match &mut self.file {
FileState::Closed { filename } => {
self.file = FileState::Opening {
fut: tokio::fs::File::open(filename.to_string()),
};
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
FileState::Opening { fut } => {
match fut.poll().context(crate::error::OpenFile)? {
futures::Async::Ready(file) => {
let file = crate::ttyrec::File::new(file);
self.file = FileState::Open { file };
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
- _ => Ok(crate::component_future::Poll::NothingToDo),
+ _ => Ok(crate::component_future::Async::NothingToDo),
}
}
- fn poll_read_file(
- &mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ fn poll_read_file(&mut self) -> crate::component_future::Poll<(), Error> {
if let FileState::Open { file } = &mut self.file {
match file.poll_read()? {
futures::Async::Ready(Some(frame)) => {
self.to_write.insert_at(frame.data, frame.time);
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::Ready(None) => {
self.file = FileState::Eof;
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
} else {
- Ok(crate::component_future::Poll::NothingToDo)
+ Ok(crate::component_future::Async::NothingToDo)
}
}
fn poll_write_terminal(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
match self.to_write.poll().context(crate::error::Sleep)? {
futures::Async::Ready(Some(data)) => {
// TODO async
@@ -148,17 +147,17 @@ impl PlaySession {
let mut stdout = stdout.lock();
stdout.write(&data).context(crate::error::WriteTerminal)?;
stdout.flush().context(crate::error::FlushTerminal)?;
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::Ready(None) => {
if let FileState::Eof = self.file {
- Ok(crate::component_future::Poll::Event(()))
+ Ok(crate::component_future::Async::Ready(()))
} else {
- Ok(crate::component_future::Poll::NothingToDo)
+ Ok(crate::component_future::Async::NothingToDo)
}
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
diff --git a/src/cmd/record.rs b/src/cmd/record.rs
index 83651f5..7fbfb4b 100644
--- a/src/cmd/record.rs
+++ b/src/cmd/record.rs
@@ -142,11 +142,14 @@ impl RecordSession {
}
impl RecordSession {
- const POLL_FNS: &'static [&'static dyn for<'a> Fn(
- &'a mut Self,
- ) -> Result<
- crate::component_future::Poll<()>,
- >] = &[
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> crate::component_future::Poll<
+ (),
+ Error,
+ >] = &[
&Self::poll_open_file,
&Self::poll_read_process,
&Self::poll_write_terminal,
@@ -154,15 +157,13 @@ impl RecordSession {
&Self::poll_write_file,
];
- fn poll_open_file(
- &mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ fn poll_open_file(&mut self) -> crate::component_future::Poll<(), Error> {
match &mut self.file {
FileState::Closed { filename } => {
self.file = FileState::Opening {
fut: tokio::fs::File::create(filename.to_string()),
};
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
FileState::Opening { fut } => {
match fut.poll().context(crate::error::OpenFile)? {
@@ -170,22 +171,22 @@ impl RecordSession {
let mut file = crate::ttyrec::File::new(file);
file.write_frame(self.buffer.contents())?;
self.file = FileState::Open { file };
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
FileState::Open { .. } => {
- Ok(crate::component_future::Poll::NothingToDo)
+ Ok(crate::component_future::Async::NothingToDo)
}
}
}
fn poll_read_process(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
match self.process.poll()? {
futures::Async::Ready(Some(e)) => {
match e {
@@ -207,7 +208,7 @@ impl RecordSession {
}
}
}
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::Ready(None) => {
if !self.done {
@@ -215,19 +216,19 @@ impl RecordSession {
}
// don't return final event here - wait until we are done
// writing all data to the file (see poll_write_file)
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
fn poll_write_terminal(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
if self.sent_local == self.buffer.len() {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
}
match self
@@ -238,19 +239,19 @@ impl RecordSession {
futures::Async::Ready(n) => {
self.sent_local += n;
self.needs_flush = true;
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
fn poll_flush_terminal(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
if !self.needs_flush {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
}
match self
@@ -260,34 +261,34 @@ impl RecordSession {
{
futures::Async::Ready(()) => {
self.needs_flush = false;
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
fn poll_write_file(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
let file = match &mut self.file {
FileState::Open { file } => file,
_ => {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
}
};
match file.poll_write()? {
futures::Async::Ready(()) => {
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
// ship all data to the server before actually ending
if self.done && file.is_empty() {
- Ok(crate::component_future::Poll::Event(()))
+ Ok(crate::component_future::Async::Ready(()))
} else {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
diff --git a/src/cmd/stream.rs b/src/cmd/stream.rs
index 805ed8e..eba26bb 100644
--- a/src/cmd/stream.rs
+++ b/src/cmd/stream.rs
@@ -242,11 +242,14 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
StreamSession<S>
{
- const POLL_FNS: &'static [&'static dyn for<'a> Fn(
- &'a mut Self,
- ) -> Result<
- crate::component_future::Poll<()>,
- >] = &[
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> crate::component_future::Poll<
+ (),
+ Error,
+ >] = &[
&Self::poll_read_client,
&Self::poll_read_process,
&Self::poll_write_terminal,
@@ -258,32 +261,32 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
// communication issues to ever interrupt a running process
fn poll_read_client(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
match self.client.poll() {
Ok(futures::Async::Ready(Some(e))) => match e {
crate::client::Event::Disconnect => {
self.connected = false;
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
crate::client::Event::Start(size) => {
self.process.resize(size);
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
crate::client::Event::Connect() => {
self.connected = true;
self.sent_remote = 0;
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
crate::client::Event::ServerMessage(..) => {
// we don't expect to ever see a server message once we
// start streaming, so if one comes through, assume
// something is messed up and try again
self.client.reconnect();
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
crate::client::Event::Resize(size) => {
self.process.resize(size);
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
},
Ok(futures::Async::Ready(None)) => {
@@ -291,18 +294,18 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
unreachable!()
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
Err(..) => {
self.client.reconnect();
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
}
}
fn poll_read_process(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
match self.process.poll()? {
futures::Async::Ready(Some(e)) => {
match e {
@@ -322,7 +325,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
self.record_bytes(&output);
}
}
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::Ready(None) => {
if !self.done {
@@ -330,19 +333,19 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
// don't return final event here - wait until we are done
// sending all data to the server (see poll_write_server)
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
fn poll_write_terminal(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
if self.sent_local == self.buffer.len() {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
}
match self
@@ -353,19 +356,19 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
futures::Async::Ready(n) => {
self.sent_local += n;
self.needs_flush = true;
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
fn poll_flush_terminal(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
if !self.needs_flush {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
}
match self
@@ -375,23 +378,23 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
{
futures::Async::Ready(()) => {
self.needs_flush = false;
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
fn poll_write_server(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
if self.sent_remote == self.buffer.len() || !self.connected {
// ship all data to the server before actually ending
if self.done {
- return Ok(crate::component_future::Poll::Event(()));
+ return Ok(crate::component_future::Async::Ready(()));
} else {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
}
}
@@ -400,7 +403,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
.send_message(crate::protocol::Message::terminal_output(buf));
self.sent_remote = self.buffer.len();
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
}
diff --git a/src/cmd/watch.rs b/src/cmd/watch.rs
index 75eba03..f4d4948 100644
--- a/src/cmd/watch.rs
+++ b/src/cmd/watch.rs
@@ -580,17 +580,20 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
WatchSession<S>
{
- const POLL_FNS: &'static [&'static dyn for<'a> Fn(
- &'a mut Self,
- ) -> Result<
- crate::component_future::Poll<()>,
- >] = &[
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> crate::component_future::Poll<
+ (),
+ Error,
+ >] = &[
&Self::poll_input,
&Self::poll_list_client,
&Self::poll_watch_client,
];
- fn poll_input(&mut self) -> Result<crate::component_future::Poll<()>> {
+ fn poll_input(&mut self) -> crate::component_future::Poll<(), Error> {
if self.raw_screen.is_none() {
self.raw_screen = Some(
crossterm::RawScreen::into_raw_mode()
@@ -612,21 +615,21 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
State::Watching { .. } => self.watch_keypress(&e)?,
};
if quit {
- Ok(crate::component_future::Poll::Event(()))
+ Ok(crate::component_future::Async::Ready(()))
} else {
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
}
futures::Async::Ready(None) => unreachable!(),
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
fn poll_list_client(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
match self.list_client.poll()? {
futures::Async::Ready(Some(e)) => {
match e {
@@ -648,25 +651,25 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
self.resize(size)?;
}
}
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::Ready(None) => {
// the client should never exit on its own
unreachable!()
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
fn poll_watch_client(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
let client = if let State::Watching { client } = &mut self.state {
client
} else {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
};
match client.poll()? {
@@ -688,14 +691,14 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
unreachable!();
}
}
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::Ready(None) => {
// the client should never exit on its own
unreachable!()
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
diff --git a/src/component_future.rs b/src/component_future.rs
index 2bc9ddc..4613b71 100644
--- a/src/component_future.rs
+++ b/src/component_future.rs
@@ -1,27 +1,34 @@
-pub enum Poll<T> {
- // something happened that we want to report
- Event(T),
- // underlying future/stream returned NotReady, so it's safe for us to also
- // return NotReady
+pub type Poll<Item, Error> = Result<Async<Item>, Error>;
+
+pub enum Async<Item> {
+ // we have a value for the main loop to return immediately.
+ Ready(Item),
+
+ // one of our inner futures returned futures::Async::NotReady. if all
+ // of our other components return either NothingToDo or NotReady, then our
+ // overall future should return NotReady and wait to be polled again.
NotReady,
- // didn't do any work, so we want to return NotReady assuming at least one
- // other poll method returned NotReady (if every poll method returns
- // NothingToDo, something is broken)
- NothingToDo,
- // did some work, so we want to loop
+
+ // we did some work (moved our internal state closer to being ready to
+ // return a value), but we aren't ready to return a value yet. we should
+ // re-run all of the poll functions to see if the state modification made
+ // any of them also able to make progress.
DidWork,
- // the stream has ended
- Done,
+
+ // we didn't poll any inner futures or otherwise change our internal state
+ // at all, so rerunning is unlikely to make progress. if all components
+ // return either NothingToDo or NotReady (and at least one returned
+ // NotReady), then we should just return NotReady and wait to be polled
+ // again. it is an error (panic) for all component poll methods to return
+ // NothingToDo.
+ NothingToDo,
}
pub fn poll_future<T, Item, Error>(
future: &mut T,
poll_fns: &'static [&'static dyn for<'a> Fn(
&'a mut T,
- ) -> Result<
- Poll<Item>,
- Error,
- >],
+ ) -> Poll<Item, Error>],
) -> futures::Poll<Item, Error>
where
T: futures::future::Future<Item = Item, Error = Error>,
@@ -32,11 +39,10 @@ where
for f in poll_fns {
match f(future)? {
- Poll::Event(e) => return Ok(futures::Async::Ready(e)),
- Poll::NotReady => not_ready = true,
- Poll::NothingToDo => {}
- Poll::DidWork => did_work = true,
- Poll::Done => unreachable!(),
+ Async::Ready(e) => return Ok(futures::Async::Ready(e)),
+ Async::NotReady => not_ready = true,
+ Async::NothingToDo => {}
+ Async::DidWork => did_work = true,
}
}
@@ -54,8 +60,8 @@ pub fn poll_stream<T, Item, Error>(
stream: &mut T,
poll_fns: &'static [&'static dyn for<'a> Fn(
&'a mut T,
- ) -> Result<
- Poll<Item>,
+ ) -> Poll<
+ Option<Item>,
Error,
>],
) -> futures::Poll<Option<Item>, Error>
@@ -68,11 +74,10 @@ where
for f in poll_fns {
match f(stream)? {
- Poll::Event(e) => return Ok(futures::Async::Ready(Some(e))),
- Poll::NotReady => not_ready = true,
- Poll::NothingToDo => {}
- Poll::DidWork => did_work = true,
- Poll::Done => return Ok(futures::Async::Ready(None)),
+ Async::Ready(e) => return Ok(futures::Async::Ready(e)),
+ Async::NotReady => not_ready = true,
+ Async::NothingToDo => {}
+ Async::DidWork => did_work = true,
}
}
diff --git a/src/process.rs b/src/process.rs
index d70b207..a099042 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -75,11 +75,14 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
}
impl<R: tokio::io::AsyncRead + 'static> Process<R> {
- const POLL_FNS: &'static [&'static dyn for<'a> Fn(
- &'a mut Self,
- ) -> Result<
- crate::component_future::Poll<Event>,
- >] = &[
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> crate::component_future::Poll<
+ Option<Event>,
+ Error,
+ >] = &[
// order is important here - checking command_exit first so that we
// don't try to read from a process that has already exited, which
// causes an error. also, poll_resize needs to happen after
@@ -94,28 +97,28 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
fn poll_resize(
&mut self,
- ) -> Result<crate::component_future::Poll<Event>> {
+ ) -> crate::component_future::Poll<Option<Event>, Error> {
if let Some(size) = &self.needs_resize {
match size.resize_pty(self.state.pty())? {
futures::Async::Ready(()) => {
log::debug!("resize({:?})", size);
self.needs_resize = None;
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
} else {
- Ok(crate::component_future::Poll::NothingToDo)
+ Ok(crate::component_future::Async::NothingToDo)
}
}
fn poll_command_start(
&mut self,
- ) -> Result<crate::component_future::Poll<Event>> {
+ ) -> crate::component_future::Poll<Option<Event>, Error> {
if self.started {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
}
if self.state.pty.is_none() {
@@ -145,17 +148,16 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
}
self.started = true;
- Ok(crate::component_future::Poll::Event(Event::CommandStart(
- self.cmd.clone(),
- self.args.clone(),
+ Ok(crate::component_future::Async::Ready(Some(
+ Event::CommandStart(self.cmd.clone(), self.args.clone()),
)))
}
fn poll_read_stdin(
&mut self,
- ) -> Result<crate::component_future::Poll<Event>> {
+ ) -> crate::component_future::Poll<Option<Event>, Error> {
if self.exited || self.stdin_closed {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
}
match self
@@ -171,19 +173,19 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
self.input_buf.push_back(b'\x04');
self.stdin_closed = true;
}
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
fn poll_write_stdin(
&mut self,
- ) -> Result<crate::component_future::Poll<Event>> {
+ ) -> crate::component_future::Poll<Option<Event>, Error> {
if self.exited || self.input_buf.is_empty() {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
}
let (a, b) = self.input_buf.as_slices();
@@ -199,17 +201,17 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
for _ in 0..n {
self.input_buf.pop_front();
}
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
fn poll_read_stdout(
&mut self,
- ) -> Result<crate::component_future::Poll<Event>> {
+ ) -> crate::component_future::Poll<Option<Event>, Error> {
match self
.state
.pty_mut()
@@ -219,10 +221,12 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
Ok(futures::Async::Ready(n)) => {
log::debug!("read_stdout({})", n);
let bytes = self.buf[..n].to_vec();
- Ok(crate::component_future::Poll::Event(Event::Output(bytes)))
+ Ok(crate::component_future::Async::Ready(Some(
+ Event::Output(bytes),
+ )))
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
Err(e) => {
// XXX this seems to be how eof is returned, but this seems...
@@ -231,7 +235,7 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
if source.kind() == std::io::ErrorKind::Other {
log::debug!("read_stdout(eof)");
self.stdout_closed = true;
- return Ok(crate::component_future::Poll::DidWork);
+ return Ok(crate::component_future::Async::DidWork);
}
}
Err(e)
@@ -241,12 +245,12 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
fn poll_command_exit(
&mut self,
- ) -> Result<crate::component_future::Poll<Event>> {
+ ) -> crate::component_future::Poll<Option<Event>, Error> {
if self.exited {
- return Ok(crate::component_future::Poll::Done);
+ return Ok(crate::component_future::Async::Ready(None));
}
if !self.stdout_closed {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
}
match self
@@ -258,12 +262,12 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
futures::Async::Ready(status) => {
log::debug!("exit({})", status);
self.exited = true;
- Ok(crate::component_future::Poll::Event(Event::CommandExit(
- status,
+ Ok(crate::component_future::Async::Ready(Some(
+ Event::CommandExit(status),
)))
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
diff --git a/src/server.rs b/src/server.rs
index b760812..45f5a4b 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -816,7 +816,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
fn poll_read_connection(
&mut self,
conn: &mut Connection<S>,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
match &mut conn.rsock {
Some(ReadSocket::Connected(..)) => {
if let Some(ReadSocket::Connected(s)) = conn.rsock.take() {
@@ -829,7 +829,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
} else {
unreachable!()
}
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
Some(ReadSocket::Reading(fut)) => match fut.poll() {
Ok(futures::Async::Ready((msg, s))) => {
@@ -846,10 +846,10 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
conn.rsock = Some(ReadSocket::Connected(s));
}
}
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
Err(e) => classify_connection_error(e),
},
@@ -864,20 +864,20 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
} else {
unreachable!()
}
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
},
- _ => Ok(crate::component_future::Poll::NothingToDo),
+ _ => Ok(crate::component_future::Async::NothingToDo),
}
}
fn poll_write_connection(
&mut self,
conn: &mut Connection<S>,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
match &mut conn.wsock {
Some(WriteSocket::Connected(..)) => {
if let Some(msg) = conn.to_send.pop_front() {
@@ -897,24 +897,24 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
} else {
unreachable!()
}
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
} else if conn.closed {
- Ok(crate::component_future::Poll::Event(()))
+ Ok(crate::component_future::Async::Ready(()))
} else {
- Ok(crate::component_future::Poll::NothingToDo)
+ Ok(crate::component_future::Async::NothingToDo)
}
}
Some(WriteSocket::Writing(fut)) => match fut.poll() {
Ok(futures::Async::Ready(s)) => {
conn.wsock = Some(WriteSocket::Connected(s));
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
Err(e) => classify_connection_error(e),
},
- _ => Ok(crate::component_future::Poll::NothingToDo),
+ _ => Ok(crate::component_future::Async::NothingToDo),
}
}
@@ -945,11 +945,14 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
Server<S>
{
- const POLL_FNS: &'static [&'static dyn for<'a> Fn(
- &'a mut Self,
- ) -> Result<
- crate::component_future::Poll<()>,
- >] = &[
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> crate::component_future::Poll<
+ (),
+ Error,
+ >] = &[
&Self::poll_new_connections,
&Self::poll_read,
&Self::poll_write,
@@ -957,20 +960,20 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
fn poll_new_connections(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
match self.sock_stream.poll()? {
futures::Async::Ready(Some(conn)) => {
self.connections.insert(conn.id.to_string(), conn);
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::Ready(None) => Err(Error::SocketChannelClosed),
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
- fn poll_read(&mut self) -> Result<crate::component_future::Poll<()>> {
+ fn poll_read(&mut self) -> crate::component_future::Poll<(), Error> {
let mut did_work = false;
let mut not_ready = false;
@@ -978,14 +981,14 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
for key in keys {
let mut conn = self.connections.remove(&key).unwrap();
match self.poll_read_connection(&mut conn) {
- Ok(crate::component_future::Poll::Event(())) => {
+ Ok(crate::component_future::Async::Ready(())) => {
self.handle_disconnect(&mut conn);
continue;
}
- Ok(crate::component_future::Poll::DidWork) => {
+ Ok(crate::component_future::Async::DidWork) => {
did_work = true;
}
- Ok(crate::component_future::Poll::NotReady) => {
+ Ok(crate::component_future::Async::NotReady) => {
not_ready = true;
}
Err(e) => {
@@ -1001,15 +1004,15 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
if did_work {
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
} else if not_ready {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
} else {
- Ok(crate::component_future::Poll::NothingToDo)
+ Ok(crate::component_future::Async::NothingToDo)
}
}
- fn poll_write(&mut self) -> Result<crate::component_future::Poll<()>> {
+ fn poll_write(&mut self) -> crate::component_future::Poll<(), Error> {
let mut did_work = false;
let mut not_ready = false;
@@ -1017,14 +1020,14 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
for key in keys {
let mut conn = self.connections.remove(&key).unwrap();
match self.poll_write_connection(&mut conn) {
- Ok(crate::component_future::Poll::Event(())) => {
+ Ok(crate::component_future::Async::Ready(())) => {
self.handle_disconnect(&mut conn);
continue;
}
- Ok(crate::component_future::Poll::DidWork) => {
+ Ok(crate::component_future::Async::DidWork) => {
did_work = true;
}
- Ok(crate::component_future::Poll::NotReady) => {
+ Ok(crate::component_future::Async::NotReady) => {
not_ready = true;
}
Err(e) => {
@@ -1040,18 +1043,18 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
if did_work {
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
} else if not_ready {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
} else {
- Ok(crate::component_future::Poll::NothingToDo)
+ Ok(crate::component_future::Async::NothingToDo)
}
}
}
fn classify_connection_error(
e: Error,
-) -> Result<crate::component_future::Poll<()>> {
+) -> crate::component_future::Poll<(), Error> {
let source = match e {
Error::ReadMessageWithTimeout { source } => source,
Error::WriteMessageWithTimeout { source } => source,
@@ -1068,7 +1071,7 @@ fn classify_connection_error(
source: ref tokio_err,
} => tokio_err,
Error::EOF => {
- return Ok(crate::component_future::Poll::Event(()));
+ return Ok(crate::component_future::Async::Ready(()));
}
_ => {
return Err(source);
@@ -1076,7 +1079,7 @@ fn classify_connection_error(
};
if tokio_err.kind() == tokio::io::ErrorKind::UnexpectedEof {
- Ok(crate::component_future::Poll::Event(()))
+ Ok(crate::component_future::Async::Ready(()))
} else {
Err(source)
}
diff --git a/src/server/tls.rs b/src/server/tls.rs
index 14effbc..f26d632 100644
--- a/src/server/tls.rs
+++ b/src/server/tls.rs
@@ -37,11 +37,14 @@ impl Server {
}
impl Server {
- const POLL_FNS: &'static [&'static dyn for<'a> Fn(
- &'a mut Self,
- ) -> Result<
- crate::component_future::Poll<()>,
- >] = &[
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> crate::component_future::Poll<
+ (),
+ Error,
+ >] = &[
&Self::poll_new_connections,
&Self::poll_handshake_connections,
&Self::poll_server,
@@ -49,7 +52,7 @@ impl Server {
fn poll_new_connections(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
match self
.sock_r
.poll()
@@ -57,18 +60,18 @@ impl Server {
{
futures::Async::Ready(Some(sock)) => {
self.accepting_sockets.push(sock);
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::Ready(None) => Err(Error::SocketChannelClosed),
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
fn poll_handshake_connections(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
let mut did_work = false;
let mut not_ready = false;
@@ -100,21 +103,21 @@ impl Server {
}
if did_work {
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
} else if not_ready {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
} else {
- Ok(crate::component_future::Poll::NothingToDo)
+ Ok(crate::component_future::Async::NothingToDo)
}
}
- fn poll_server(&mut self) -> Result<crate::component_future::Poll<()>> {
+ fn poll_server(&mut self) -> crate::component_future::Poll<(), Error> {
match self.server.poll()? {
futures::Async::Ready(()) => {
- Ok(crate::component_future::Poll::Event(()))
+ Ok(crate::component_future::Async::Ready(()))
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}