aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-10-17 13:32:44 -0400
committerJesse Luehrs <doy@tozt.net>2019-10-17 13:32:44 -0400
commita561217eca9f3076b097e6af3ee6cf229cf35668 (patch)
treea1e09d484c4b3de23ef5bc960effe69f47fd1696
parentefcd072f40b52be085e863df7f5421a5beada240 (diff)
downloadteleterm-a561217eca9f3076b097e6af3ee6cf229cf35668.tar.gz
teleterm-a561217eca9f3076b097e6af3ee6cf229cf35668.zip
give component_future its own try_ready! macro
this simplifies quite a lot of things
-rw-r--r--src/client.rs34
-rw-r--r--src/cmd/play.rs64
-rw-r--r--src/cmd/record.rs60
-rw-r--r--src/cmd/stream.rs45
-rw-r--r--src/cmd/watch.rs97
-rw-r--r--src/component_future.rs12
-rw-r--r--src/main.rs4
-rw-r--r--src/process.rs79
-rw-r--r--src/resize.rs33
-rw-r--r--src/server.rs41
-rw-r--r--src/server/tls.rs26
11 files changed, 166 insertions, 329 deletions
diff --git a/src/client.rs b/src/client.rs
index 584dd8e..ce7ce77 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -206,19 +206,6 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
true
}
- fn should_wait_to_reconnect(&mut self) -> Result<bool> {
- if let Some(timer) = &mut self.reconnect_timer {
- match timer.poll().context(crate::error::TimerReconnect)? {
- futures::Async::NotReady => {
- return Ok(true);
- }
- _ => {}
- }
- }
-
- Ok(false)
- }
-
fn handle_successful_connection(&mut self, s: S) -> Result<()> {
self.last_server_time = std::time::Instant::now();
@@ -415,8 +402,10 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
) -> crate::component_future::Poll<Option<Event>, Error> {
match &mut self.wsock {
WriteSocket::NotConnected => {
- if self.should_wait_to_reconnect()? {
- return Ok(crate::component_future::Async::NotReady);
+ if let Some(timer) = &mut self.reconnect_timer {
+ try_ready!(timer
+ .poll()
+ .context(crate::error::TimerReconnect));
}
self.set_reconnect_timer();
@@ -586,19 +575,12 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
fn poll_heartbeat(
&mut self,
) -> crate::component_future::Poll<Option<Event>, Error> {
- match self
+ let _ = try_ready!(self
.heartbeat_timer
.poll()
- .context(crate::error::TimerHeartbeat)?
- {
- futures::Async::Ready(..) => {
- self.send_message(crate::protocol::Message::heartbeat());
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
- }
+ .context(crate::error::TimerHeartbeat));
+ self.send_message(crate::protocol::Message::heartbeat());
+ Ok(crate::component_future::Async::DidWork)
}
}
diff --git a/src/cmd/play.rs b/src/cmd/play.rs
index e6afe2f..a5fd534 100644
--- a/src/cmd/play.rs
+++ b/src/cmd/play.rs
@@ -102,16 +102,11 @@ impl PlaySession {
Ok(crate::component_future::Async::DidWork)
}
FileState::Opening { fut } => {
- match fut.poll().context(crate::error::OpenFile)? {
- futures::Async::Ready(file) => {
- let file = crate::ttyrec::File::new(file);
- self.file = FileState::Open { file };
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
- }
+ let file =
+ try_ready!(fut.poll().context(crate::error::OpenFile));
+ let file = crate::ttyrec::File::new(file);
+ self.file = FileState::Open { file };
+ Ok(crate::component_future::Async::DidWork)
}
_ => Ok(crate::component_future::Async::NothingToDo),
}
@@ -119,19 +114,12 @@ impl PlaySession {
fn poll_read_file(&mut self) -> crate::component_future::Poll<(), Error> {
if let FileState::Open { file } = &mut self.file {
- match file.poll_read()? {
- futures::Async::Ready(Some(frame)) => {
- self.to_write.insert_at(frame.data, frame.time);
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::Ready(None) => {
- self.file = FileState::Eof;
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
+ if let Some(frame) = try_ready!(file.poll_read()) {
+ self.to_write.insert_at(frame.data, frame.time);
+ } else {
+ self.file = FileState::Eof;
}
+ Ok(crate::component_future::Async::DidWork)
} else {
Ok(crate::component_future::Async::NothingToDo)
}
@@ -140,25 +128,19 @@ impl PlaySession {
fn poll_write_terminal(
&mut self,
) -> crate::component_future::Poll<(), Error> {
- match self.to_write.poll().context(crate::error::Sleep)? {
- futures::Async::Ready(Some(data)) => {
- // TODO async
- let stdout = std::io::stdout();
- let mut stdout = stdout.lock();
- stdout.write(&data).context(crate::error::WriteTerminal)?;
- stdout.flush().context(crate::error::FlushTerminal)?;
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::Ready(None) => {
- if let FileState::Eof = self.file {
- Ok(crate::component_future::Async::Ready(()))
- } else {
- Ok(crate::component_future::Async::NothingToDo)
- }
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
+ if let Some(data) =
+ try_ready!(self.to_write.poll().context(crate::error::Sleep))
+ {
+ // TODO async
+ let stdout = std::io::stdout();
+ let mut stdout = stdout.lock();
+ stdout.write(&data).context(crate::error::WriteTerminal)?;
+ stdout.flush().context(crate::error::FlushTerminal)?;
+ Ok(crate::component_future::Async::DidWork)
+ } else if let FileState::Eof = self.file {
+ Ok(crate::component_future::Async::Ready(()))
+ } else {
+ Ok(crate::component_future::Async::NothingToDo)
}
}
}
diff --git a/src/cmd/record.rs b/src/cmd/record.rs
index 9092506..afdfcd6 100644
--- a/src/cmd/record.rs
+++ b/src/cmd/record.rs
@@ -168,17 +168,12 @@ impl RecordSession {
Ok(crate::component_future::Async::DidWork)
}
FileState::Opening { fut } => {
- match fut.poll().context(crate::error::OpenFile)? {
- futures::Async::Ready(file) => {
- let mut file = crate::ttyrec::File::new(file);
- file.write_frame(self.buffer.contents())?;
- self.file = FileState::Open { file };
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
- }
+ let file =
+ try_ready!(fut.poll().context(crate::error::OpenFile));
+ let mut file = crate::ttyrec::File::new(file);
+ file.write_frame(self.buffer.contents())?;
+ self.file = FileState::Open { file };
+ Ok(crate::component_future::Async::DidWork)
}
FileState::Open { .. } => {
Ok(crate::component_future::Async::NothingToDo)
@@ -189,8 +184,8 @@ impl RecordSession {
fn poll_read_process(
&mut self,
) -> crate::component_future::Poll<(), Error> {
- match self.process.poll()? {
- futures::Async::Ready(Some(crate::resize::Event::Process(e))) => {
+ match try_ready!(self.process.poll()) {
+ Some(crate::resize::Event::Process(e)) => {
match e {
crate::process::Event::CommandStart(..) => {
if self.raw_screen.is_none() {
@@ -212,10 +207,10 @@ impl RecordSession {
}
Ok(crate::component_future::Async::DidWork)
}
- futures::Async::Ready(Some(crate::resize::Event::Resize(_))) => {
+ Some(crate::resize::Event::Resize(_)) => {
Ok(crate::component_future::Async::DidWork)
}
- futures::Async::Ready(None) => {
+ None => {
if !self.done {
unreachable!()
}
@@ -223,9 +218,6 @@ impl RecordSession {
// writing all data to the file (see poll_write_file)
Ok(crate::component_future::Async::DidWork)
}
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
}
}
@@ -236,20 +228,13 @@ impl RecordSession {
return Ok(crate::component_future::Async::NothingToDo);
}
- match self
+ let n = try_ready!(self
.stdout
.poll_write(&self.buffer.contents()[self.sent_local..])
- .context(crate::error::WriteTerminal)?
- {
- futures::Async::Ready(n) => {
- self.sent_local += n;
- self.needs_flush = true;
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
- }
+ .context(crate::error::WriteTerminal));
+ self.sent_local += n;
+ self.needs_flush = true;
+ Ok(crate::component_future::Async::DidWork)
}
fn poll_flush_terminal(
@@ -259,19 +244,12 @@ impl RecordSession {
return Ok(crate::component_future::Async::NothingToDo);
}
- match self
+ try_ready!(self
.stdout
.poll_flush()
- .context(crate::error::FlushTerminal)?
- {
- futures::Async::Ready(()) => {
- self.needs_flush = false;
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
- }
+ .context(crate::error::FlushTerminal));
+ self.needs_flush = false;
+ Ok(crate::component_future::Async::DidWork)
}
fn poll_write_file(
diff --git a/src/cmd/stream.rs b/src/cmd/stream.rs
index a9eea1b..2e1ea03 100644
--- a/src/cmd/stream.rs
+++ b/src/cmd/stream.rs
@@ -300,8 +300,8 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
fn poll_read_process(
&mut self,
) -> crate::component_future::Poll<(), Error> {
- match self.process.poll()? {
- futures::Async::Ready(Some(crate::resize::Event::Process(e))) => {
+ match try_ready!(self.process.poll()) {
+ Some(crate::resize::Event::Process(e)) => {
match e {
crate::process::Event::CommandStart(..) => {
if self.raw_screen.is_none() {
@@ -320,14 +320,12 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
Ok(crate::component_future::Async::DidWork)
}
- futures::Async::Ready(Some(crate::resize::Event::Resize(
- size,
- ))) => {
+ Some(crate::resize::Event::Resize(size)) => {
self.client
.send_message(crate::protocol::Message::resize(&size));
Ok(crate::component_future::Async::DidWork)
}
- futures::Async::Ready(None) => {
+ None => {
if !self.done {
unreachable!()
}
@@ -335,9 +333,6 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
// sending all data to the server (see poll_write_server)
Ok(crate::component_future::Async::DidWork)
}
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
}
}
@@ -348,20 +343,13 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
return Ok(crate::component_future::Async::NothingToDo);
}
- match self
+ let n = try_ready!(self
.stdout
.poll_write(&self.buffer.contents()[self.sent_local..])
- .context(crate::error::WriteTerminal)?
- {
- futures::Async::Ready(n) => {
- self.sent_local += n;
- self.needs_flush = true;
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
- }
+ .context(crate::error::WriteTerminal));
+ self.sent_local += n;
+ self.needs_flush = true;
+ Ok(crate::component_future::Async::DidWork)
}
fn poll_flush_terminal(
@@ -371,19 +359,12 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
return Ok(crate::component_future::Async::NothingToDo);
}
- match self
+ try_ready!(self
.stdout
.poll_flush()
- .context(crate::error::FlushTerminal)?
- {
- futures::Async::Ready(()) => {
- self.needs_flush = false;
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
- }
+ .context(crate::error::FlushTerminal));
+ self.needs_flush = false;
+ Ok(crate::component_future::Async::DidWork)
}
fn poll_write_server(
diff --git a/src/cmd/watch.rs b/src/cmd/watch.rs
index 99819d8..cde2fae 100644
--- a/src/cmd/watch.rs
+++ b/src/cmd/watch.rs
@@ -597,16 +597,9 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
];
fn poll_resizer(&mut self) -> crate::component_future::Poll<(), Error> {
- match self.resizer.poll()? {
- futures::Async::Ready(Some(size)) => {
- self.resize(size)?;
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::Ready(None) => unreachable!(),
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
- }
+ let size = try_ready!(self.resizer.poll()).unwrap();
+ self.resize(size)?;
+ Ok(crate::component_future::Async::DidWork)
}
fn poll_input(&mut self) -> crate::component_future::Poll<(), Error> {
@@ -622,55 +615,36 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
}
- match self.key_reader.poll()? {
- futures::Async::Ready(Some(e)) => {
- let quit = match &mut self.state {
- State::Temporary => unreachable!(),
- State::LoggingIn { .. } => self.loading_keypress(&e)?,
- State::Choosing { .. } => self.list_keypress(&e)?,
- State::Watching { .. } => self.watch_keypress(&e)?,
- };
- if quit {
- Ok(crate::component_future::Async::Ready(()))
- } else {
- Ok(crate::component_future::Async::DidWork)
- }
- }
- futures::Async::Ready(None) => unreachable!(),
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
+ let e = try_ready!(self.key_reader.poll()).unwrap();
+ let quit = match &mut self.state {
+ State::Temporary => unreachable!(),
+ State::LoggingIn { .. } => self.loading_keypress(&e)?,
+ State::Choosing { .. } => self.list_keypress(&e)?,
+ State::Watching { .. } => self.watch_keypress(&e)?,
+ };
+ if quit {
+ Ok(crate::component_future::Async::Ready(()))
+ } else {
+ Ok(crate::component_future::Async::DidWork)
}
}
fn poll_list_client(
&mut self,
) -> crate::component_future::Poll<(), Error> {
- match self.list_client.poll()? {
- futures::Async::Ready(Some(e)) => {
- match e {
- crate::client::Event::Disconnect => {
- self.reconnect(true)?;
- }
- crate::client::Event::Connect => {
- self.list_client.send_message(
- crate::protocol::Message::list_sessions(),
- );
- }
- crate::client::Event::ServerMessage(msg) => {
- self.list_server_message(msg)?;
- }
- }
- Ok(crate::component_future::Async::DidWork)
+ match try_ready!(self.list_client.poll()).unwrap() {
+ crate::client::Event::Disconnect => {
+ self.reconnect(true)?;
}
- futures::Async::Ready(None) => {
- // the client should never exit on its own
- unreachable!()
+ crate::client::Event::Connect => {
+ self.list_client
+ .send_message(crate::protocol::Message::list_sessions());
}
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
+ crate::client::Event::ServerMessage(msg) => {
+ self.list_server_message(msg)?;
}
}
+ Ok(crate::component_future::Async::DidWork)
}
fn poll_watch_client(
@@ -682,27 +656,16 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
return Ok(crate::component_future::Async::NothingToDo);
};
- match client.poll()? {
- futures::Async::Ready(Some(e)) => {
- match e {
- crate::client::Event::Disconnect => {
- self.reconnect(true)?;
- }
- crate::client::Event::Connect => {}
- crate::client::Event::ServerMessage(msg) => {
- self.watch_server_message(msg)?;
- }
- }
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::Ready(None) => {
- // the client should never exit on its own
- unreachable!()
+ match try_ready!(client.poll()).unwrap() {
+ crate::client::Event::Disconnect => {
+ self.reconnect(true)?;
}
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
+ crate::client::Event::Connect => {}
+ crate::client::Event::ServerMessage(msg) => {
+ self.watch_server_message(msg)?;
}
}
+ Ok(crate::component_future::Async::DidWork)
}
}
diff --git a/src/component_future.rs b/src/component_future.rs
index 4613b71..6d8449d 100644
--- a/src/component_future.rs
+++ b/src/component_future.rs
@@ -24,6 +24,18 @@ pub enum Async<Item> {
NothingToDo,
}
+macro_rules! try_ready {
+ ($e:expr) => {
+ match $e {
+ Ok(futures::Async::Ready(t)) => t,
+ Ok(futures::Async::NotReady) => {
+ return Ok($crate::component_future::Async::NotReady)
+ }
+ Err(e) => return Err(From::from(e)),
+ }
+ };
+}
+
pub fn poll_future<T, Item, Error>(
future: &mut T,
poll_fns: &'static [&'static dyn for<'a> Fn(
diff --git a/src/main.rs b/src/main.rs
index 0f7b4d3..c088d6d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -9,10 +9,12 @@
mod prelude;
+#[macro_use]
+mod component_future;
+
mod async_stdin;
mod client;
mod cmd;
-mod component_future;
mod config;
mod dirs;
mod error;
diff --git a/src/process.rs b/src/process.rs
index a099042..ebd55be 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -99,16 +99,10 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
&mut self,
) -> crate::component_future::Poll<Option<Event>, Error> {
if let Some(size) = &self.needs_resize {
- match size.resize_pty(self.state.pty())? {
- futures::Async::Ready(()) => {
- log::debug!("resize({:?})", size);
- self.needs_resize = None;
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
- }
+ try_ready!(size.resize_pty(self.state.pty()));
+ log::debug!("resize({:?})", size);
+ self.needs_resize = None;
+ Ok(crate::component_future::Async::DidWork)
} else {
Ok(crate::component_future::Async::NothingToDo)
}
@@ -160,25 +154,18 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
return Ok(crate::component_future::Async::NothingToDo);
}
- match self
+ let n = try_ready!(self
.input
.poll_read(&mut self.buf)
- .context(crate::error::ReadTerminal)?
- {
- futures::Async::Ready(n) => {
- log::debug!("read_stdin({})", n);
- if n > 0 {
- self.input_buf.extend(self.buf[..n].iter());
- } else {
- self.input_buf.push_back(b'\x04');
- self.stdin_closed = true;
- }
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
+ .context(crate::error::ReadTerminal));
+ log::debug!("read_stdin({})", n);
+ if n > 0 {
+ self.input_buf.extend(self.buf[..n].iter());
+ } else {
+ self.input_buf.push_back(b'\x04');
+ self.stdin_closed = true;
}
+ Ok(crate::component_future::Async::DidWork)
}
fn poll_write_stdin(
@@ -190,23 +177,16 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
let (a, b) = self.input_buf.as_slices();
let buf = if a.is_empty() { b } else { a };
- match self
+ let n = try_ready!(self
.state
.pty_mut()
.poll_write(buf)
- .context(crate::error::WritePty)?
- {
- futures::Async::Ready(n) => {
- log::debug!("write_stdin({})", n);
- for _ in 0..n {
- self.input_buf.pop_front();
- }
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
+ .context(crate::error::WritePty));
+ log::debug!("write_stdin({})", n);
+ for _ in 0..n {
+ self.input_buf.pop_front();
}
+ Ok(crate::component_future::Async::DidWork)
}
fn poll_read_stdout(
@@ -253,23 +233,16 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
return Ok(crate::component_future::Async::NothingToDo);
}
- match self
+ let status = try_ready!(self
.state
.process()
.poll()
- .context(crate::error::ProcessExitPoll)?
- {
- futures::Async::Ready(status) => {
- log::debug!("exit({})", status);
- self.exited = true;
- Ok(crate::component_future::Async::Ready(Some(
- Event::CommandExit(status),
- )))
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
- }
+ .context(crate::error::ProcessExitPoll));
+ log::debug!("exit({})", status);
+ self.exited = true;
+ Ok(crate::component_future::Async::Ready(Some(
+ Event::CommandExit(status),
+ )))
}
}
diff --git a/src/resize.rs b/src/resize.rs
index 86dd386..9e357af 100644
--- a/src/resize.rs
+++ b/src/resize.rs
@@ -70,36 +70,19 @@ impl<R: tokio::io::AsyncRead + 'static> ResizingProcess<R> {
fn poll_resize(
&mut self,
) -> crate::component_future::Poll<Option<Event<R>>, Error> {
- match self.resizer.poll()? {
- futures::Async::Ready(Some(size)) => {
- self.process.resize(size.clone());
- Ok(crate::component_future::Async::Ready(Some(
- Event::Resize(size),
- )))
- }
- futures::Async::Ready(None) => unreachable!(),
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
- }
+ let size = try_ready!(self.resizer.poll()).unwrap();
+ self.process.resize(size.clone());
+ Ok(crate::component_future::Async::Ready(Some(Event::Resize(
+ size,
+ ))))
}
fn poll_process(
&mut self,
) -> crate::component_future::Poll<Option<Event<R>>, Error> {
- match self.process.poll()? {
- futures::Async::Ready(Some(e)) => {
- Ok(crate::component_future::Async::Ready(Some(
- Event::Process(e),
- )))
- }
- futures::Async::Ready(None) => {
- Ok(crate::component_future::Async::Ready(None))
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
- }
+ Ok(crate::component_future::Async::Ready(
+ try_ready!(self.process.poll()).map(Event::Process),
+ ))
}
}
diff --git a/src/server.rs b/src/server.rs
index 45f5a4b..9b48381 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -853,23 +853,18 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
Err(e) => classify_connection_error(e),
},
- Some(ReadSocket::Processing(_, fut)) => match fut.poll()? {
- futures::Async::Ready((state, msg)) => {
- if let Some(ReadSocket::Processing(s, _)) =
- conn.rsock.take()
- {
- conn.state = state;
- conn.send_message(msg);
- conn.rsock = Some(ReadSocket::Connected(s));
- } else {
- unreachable!()
- }
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
+ Some(ReadSocket::Processing(_, fut)) => {
+ let (state, msg) = try_ready!(fut.poll());
+ if let Some(ReadSocket::Processing(s, _)) = conn.rsock.take()
+ {
+ conn.state = state;
+ conn.send_message(msg);
+ conn.rsock = Some(ReadSocket::Connected(s));
+ } else {
+ unreachable!()
}
- },
+ Ok(crate::component_future::Async::DidWork)
+ }
_ => Ok(crate::component_future::Async::NothingToDo),
}
}
@@ -961,15 +956,11 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
fn poll_new_connections(
&mut self,
) -> crate::component_future::Poll<(), Error> {
- match self.sock_stream.poll()? {
- futures::Async::Ready(Some(conn)) => {
- self.connections.insert(conn.id.to_string(), conn);
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::Ready(None) => Err(Error::SocketChannelClosed),
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
+ if let Some(conn) = try_ready!(self.sock_stream.poll()) {
+ self.connections.insert(conn.id.to_string(), conn);
+ Ok(crate::component_future::Async::DidWork)
+ } else {
+ Err(Error::SocketChannelClosed)
}
}
diff --git a/src/server/tls.rs b/src/server/tls.rs
index f26d632..a05608a 100644
--- a/src/server/tls.rs
+++ b/src/server/tls.rs
@@ -53,19 +53,15 @@ impl Server {
fn poll_new_connections(
&mut self,
) -> crate::component_future::Poll<(), Error> {
- match self
+ if let Some(sock) = try_ready!(self
.sock_r
.poll()
- .context(crate::error::SocketChannelReceive)?
+ .context(crate::error::SocketChannelReceive))
{
- futures::Async::Ready(Some(sock)) => {
- self.accepting_sockets.push(sock);
- Ok(crate::component_future::Async::DidWork)
- }
- futures::Async::Ready(None) => Err(Error::SocketChannelClosed),
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
+ self.accepting_sockets.push(sock);
+ Ok(crate::component_future::Async::DidWork)
+ } else {
+ Err(Error::SocketChannelClosed)
}
}
@@ -112,14 +108,8 @@ impl Server {
}
fn poll_server(&mut self) -> crate::component_future::Poll<(), Error> {
- match self.server.poll()? {
- futures::Async::Ready(()) => {
- Ok(crate::component_future::Async::Ready(()))
- }
- futures::Async::NotReady => {
- Ok(crate::component_future::Async::NotReady)
- }
- }
+ try_ready!(self.server.poll());
+ Ok(crate::component_future::Async::Ready(()))
}
}