aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-10-24 06:33:51 -0400
committerJesse Luehrs <doy@tozt.net>2019-10-24 06:33:51 -0400
commit6552f2d69eb0f19851dbebd21742b7bc9f37cc42 (patch)
tree164af15b47d0caaf274b51323ba1daab42ff8be4
parente9f475a611140d781bfb80fe18958374d7a54296 (diff)
downloadteleterm-6552f2d69eb0f19851dbebd21742b7bc9f37cc42.tar.gz
teleterm-6552f2d69eb0f19851dbebd21742b7bc9f37cc42.zip
move component_future to a separate crate
-rw-r--r--Cargo.lock10
-rw-r--r--Cargo.toml1
-rw-r--r--src/client.rs72
-rw-r--r--src/cmd/play.rs50
-rw-r--r--src/cmd/record.rs70
-rw-r--r--src/cmd/stream.rs60
-rw-r--r--src/cmd/watch.rs36
-rw-r--r--src/component_future.rs104
-rw-r--r--src/main.rs3
-rw-r--r--src/process.rs63
-rw-r--r--src/resize.rs19
-rw-r--r--src/server.rs75
-rw-r--r--src/server/tls.rs25
13 files changed, 235 insertions, 353 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 2455306..3bf4cea 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -177,6 +177,14 @@ dependencies = [
]
[[package]]
+name = "component-future"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "config"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1452,6 +1460,7 @@ version = "0.1.3"
dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "component-future 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"config 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)",
"crossterm 0.11.1 (registry+https://github.com/rust-lang/crates.io-index)",
"directories 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1996,6 +2005,7 @@ dependencies = [
"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
"checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9"
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
+"checksum component-future 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9e815962dcecc6faf0f0661924cfa240c5b4bccb73679542f00478555dd1cc2c"
"checksum config 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "f9107d78ed62b3fa5a86e7d18e647abed48cfd8f8fab6c72f4cdb982d196f7e6"
"checksum constant_time_eq 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "995a44c877f9212528ccc74b21a232f66ad69001e40ede5bcee2ac9ef2657120"
"checksum cookie 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "888604f00b3db336d2af898ec3c1d5d0ddf5e6d462220f2ededc33a87ac4bbd5"
diff --git a/Cargo.toml b/Cargo.toml
index b12d3cb..4c0c10d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,6 +14,7 @@ license = "MIT"
[dependencies]
bytes = "0.4"
clap = { version = "2", features = ["wrap_help"] }
+component-future = "0.1"
config = { version = "0.9", features = ["toml"], default_features = false }
crossterm = { version = "0.11", features = ["terminal", "input", "screen"], default_features = false }
directories = "2"
diff --git a/src/client.rs b/src/client.rs
index fc9c07a..f932904 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -241,7 +241,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
&mut self,
msg: crate::protocol::Message,
) -> Result<(
- crate::component_future::Async<Option<Event>>,
+ component_future::Async<Option<Event>>,
Option<
Box<
dyn futures::future::Future<
@@ -264,7 +264,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
open::that(url).context(crate::error::OpenLink)?;
Ok((
- crate::component_future::Async::DidWork,
+ component_future::Async::DidWork,
Some(self.wait_for_oauth_response(
state.map(|s| s.to_string()),
&id,
@@ -279,19 +279,17 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
self.last_error = None;
Ok((
- crate::component_future::Async::Ready(Some(
- Event::Connect,
- )),
+ component_future::Async::Ready(Some(Event::Connect)),
None,
))
}
crate::protocol::Message::Heartbeat => {
- Ok((crate::component_future::Async::DidWork, None))
+ Ok((component_future::Async::DidWork, None))
}
_ => Ok((
- crate::component_future::Async::Ready(Some(
- Event::ServerMessage(msg),
- )),
+ component_future::Async::Ready(Some(Event::ServerMessage(
+ msg,
+ ))),
None,
)),
}
@@ -396,7 +394,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
&'static [&'static dyn for<'a> Fn(
&'a mut Self,
)
- -> crate::component_future::Poll<
+ -> component_future::Poll<
Option<Event>,
Error,
>] = &[
@@ -408,11 +406,11 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
fn poll_reconnect_server(
&mut self,
- ) -> crate::component_future::Poll<Option<Event>, Error> {
+ ) -> component_future::Poll<Option<Event>, Error> {
match &mut self.wsock {
WriteSocket::NotConnected => {
if let Some(timer) = &mut self.reconnect_timer {
- try_ready!(timer
+ component_future::try_ready!(timer
.poll()
.context(crate::error::TimerReconnect));
}
@@ -425,20 +423,20 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
self.handle_successful_connection(s)?;
}
Ok(futures::Async::NotReady) => {
- return Ok(crate::component_future::Async::NotReady);
+ return Ok(component_future::Async::NotReady);
}
Err(e) => {
log::warn!("error while connecting, reconnecting: {}", e);
self.reconnect();
self.last_error = Some(format!("{}", e));
- return Ok(crate::component_future::Async::Ready(Some(
+ return Ok(component_future::Async::Ready(Some(
Event::Disconnect,
)));
}
},
WriteSocket::Connected(..) | WriteSocket::Writing(..) => {
if self.has_seen_server_recently() {
- return Ok(crate::component_future::Async::NothingToDo);
+ return Ok(component_future::Async::NothingToDo);
} else {
log::warn!(
"haven't seen server in a while, reconnecting",
@@ -446,22 +444,22 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
self.reconnect();
self.last_error =
Some("haven't seen server in a while".to_string());
- return Ok(crate::component_future::Async::Ready(Some(
+ return Ok(component_future::Async::Ready(Some(
Event::Disconnect,
)));
}
}
}
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
fn poll_read_server(
&mut self,
- ) -> crate::component_future::Poll<Option<Event>, Error> {
+ ) -> component_future::Poll<Option<Event>, Error> {
match &mut self.rsock {
ReadSocket::NotConnected => {
- Ok(crate::component_future::Async::NothingToDo)
+ Ok(component_future::Async::NothingToDo)
}
ReadSocket::Connected(..) => {
if let ReadSocket::Connected(s) = std::mem::replace(
@@ -473,7 +471,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
} else {
unreachable!()
}
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
ReadSocket::Reading(ref mut fut) => match fut.poll() {
Ok(futures::Async::Ready((msg, s))) => {
@@ -494,20 +492,20 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
);
self.reconnect();
self.last_error = Some(format!("{}", e));
- Ok(crate::component_future::Async::Ready(Some(
+ Ok(component_future::Async::Ready(Some(
Event::Disconnect,
)))
}
}
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Async::NotReady)
+ Ok(component_future::Async::NotReady)
}
Err(e) => {
log::warn!("error reading message, reconnecting: {}", e);
self.reconnect();
self.last_error = Some(format!("{}", e));
- Ok(crate::component_future::Async::Ready(Some(
+ Ok(component_future::Async::Ready(Some(
Event::Disconnect,
)))
}
@@ -523,10 +521,10 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
} else {
unreachable!()
}
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Async::NotReady)
+ Ok(component_future::Async::NotReady)
}
Err(e) => {
log::warn!(
@@ -535,7 +533,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
);
self.reconnect();
self.last_error = Some(format!("{}", e));
- Ok(crate::component_future::Async::Ready(Some(
+ Ok(component_future::Async::Ready(Some(
Event::Disconnect,
)))
}
@@ -545,14 +543,14 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
fn poll_write_server(
&mut self,
- ) -> crate::component_future::Poll<Option<Event>, Error> {
+ ) -> component_future::Poll<Option<Event>, Error> {
match &mut self.wsock {
WriteSocket::NotConnected | WriteSocket::Connecting(..) => {
- Ok(crate::component_future::Async::NothingToDo)
+ Ok(component_future::Async::NothingToDo)
}
WriteSocket::Connected(..) => {
if self.to_send.is_empty() {
- return Ok(crate::component_future::Async::NothingToDo);
+ return Ok(component_future::Async::NothingToDo);
}
if let WriteSocket::Connected(s) = std::mem::replace(
@@ -567,21 +565,21 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
unreachable!()
}
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
WriteSocket::Writing(ref mut fut) => match fut.poll() {
Ok(futures::Async::Ready(s)) => {
self.wsock = WriteSocket::Connected(s);
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Async::NotReady)
+ Ok(component_future::Async::NotReady)
}
Err(e) => {
log::warn!("error writing message, reconnecting: {}", e);
self.reconnect();
self.last_error = Some(format!("{}", e));
- Ok(crate::component_future::Async::Ready(Some(
+ Ok(component_future::Async::Ready(Some(
Event::Disconnect,
)))
}
@@ -591,13 +589,13 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
fn poll_heartbeat(
&mut self,
- ) -> crate::component_future::Poll<Option<Event>, Error> {
- let _ = try_ready!(self
+ ) -> component_future::Poll<Option<Event>, Error> {
+ let _ = component_future::try_ready!(self
.heartbeat_timer
.poll()
.context(crate::error::TimerHeartbeat));
self.send_message(crate::protocol::Message::heartbeat());
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
}
@@ -609,6 +607,6 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
type Error = Error;
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
- crate::component_future::poll_stream(self, Self::POLL_FNS)
+ component_future::poll_stream(self, Self::POLL_FNS)
}
}
diff --git a/src/cmd/play.rs b/src/cmd/play.rs
index 3dda60c..4e0048e 100644
--- a/src/cmd/play.rs
+++ b/src/cmd/play.rs
@@ -78,7 +78,7 @@ impl PlaySession {
&'static [&'static dyn for<'a> Fn(
&'a mut Self,
)
- -> crate::component_future::Poll<
+ -> component_future::Poll<
(),
Error,
>] = &[
@@ -87,58 +87,62 @@ impl PlaySession {
&Self::poll_write_terminal,
];
- fn poll_open_file(&mut self) -> crate::component_future::Poll<(), Error> {
+ fn poll_open_file(&mut self) -> component_future::Poll<(), Error> {
match &mut self.file {
FileState::Closed { filename } => {
self.file = FileState::Opening {
filename: filename.to_string(),
fut: tokio::fs::File::open(filename.to_string()),
};
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
FileState::Opening { filename, fut } => {
- let file = try_ready!(fut.poll().with_context(|| {
- crate::error::OpenFile {
- filename: filename.to_string(),
- }
- }));
+ let file = component_future::try_ready!(fut
+ .poll()
+ .with_context(|| {
+ crate::error::OpenFile {
+ filename: filename.to_string(),
+ }
+ }));
let file = crate::ttyrec::File::new(file);
self.file = FileState::Open { file };
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
- _ => Ok(crate::component_future::Async::NothingToDo),
+ _ => Ok(component_future::Async::NothingToDo),
}
}
- fn poll_read_file(&mut self) -> crate::component_future::Poll<(), Error> {
+ fn poll_read_file(&mut self) -> component_future::Poll<(), Error> {
if let FileState::Open { file } = &mut self.file {
- if let Some(frame) = try_ready!(file.poll_read()) {
+ if let Some(frame) =
+ component_future::try_ready!(file.poll_read())
+ {
self.to_write.insert_at(frame.data, frame.time);
} else {
self.file = FileState::Eof;
}
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
} else {
- Ok(crate::component_future::Async::NothingToDo)
+ Ok(component_future::Async::NothingToDo)
}
}
- fn poll_write_terminal(
- &mut self,
- ) -> crate::component_future::Poll<(), Error> {
- if let Some(data) =
- try_ready!(self.to_write.poll().context(crate::error::Sleep))
+ fn poll_write_terminal(&mut self) -> component_future::Poll<(), Error> {
+ if let Some(data) = component_future::try_ready!(self
+ .to_write
+ .poll()
+ .context(crate::error::Sleep))
{
// TODO async
let stdout = std::io::stdout();
let mut stdout = stdout.lock();
stdout.write(&data).context(crate::error::WriteTerminal)?;
stdout.flush().context(crate::error::FlushTerminal)?;
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
} else if let FileState::Eof = self.file {
- Ok(crate::component_future::Async::Ready(()))
+ Ok(component_future::Async::Ready(()))
} else {
- Ok(crate::component_future::Async::NothingToDo)
+ Ok(component_future::Async::NothingToDo)
}
}
}
@@ -149,7 +153,7 @@ impl futures::future::Future for PlaySession {
type Error = Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
- crate::component_future::poll_future(self, Self::POLL_FNS)
+ component_future::poll_future(self, Self::POLL_FNS)
}
}
diff --git a/src/cmd/record.rs b/src/cmd/record.rs
index f2e0aab..6a31c5d 100644
--- a/src/cmd/record.rs
+++ b/src/cmd/record.rs
@@ -113,7 +113,7 @@ impl RecordSession {
&'static [&'static dyn for<'a> Fn(
&'a mut Self,
)
- -> crate::component_future::Poll<
+ -> component_future::Poll<
(),
Error,
>] = &[
@@ -124,36 +124,36 @@ impl RecordSession {
&Self::poll_write_file,
];
- fn poll_open_file(&mut self) -> crate::component_future::Poll<(), Error> {
+ fn poll_open_file(&mut self) -> component_future::Poll<(), Error> {
match &mut self.file {
FileState::Closed { filename } => {
self.file = FileState::Opening {
filename: filename.to_string(),
fut: tokio::fs::File::create(filename.to_string()),
};
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
FileState::Opening { filename, fut } => {
- let file = try_ready!(fut.poll().with_context(|| {
- crate::error::OpenFile {
- filename: filename.clone(),
- }
- }));
+ let file = component_future::try_ready!(fut
+ .poll()
+ .with_context(|| {
+ crate::error::OpenFile {
+ filename: filename.clone(),
+ }
+ }));
let mut file = crate::ttyrec::File::new(file);
file.write_frame(self.buffer.contents())?;
self.file = FileState::Open { file };
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
FileState::Open { .. } => {
- Ok(crate::component_future::Async::NothingToDo)
+ Ok(component_future::Async::NothingToDo)
}
}
}
- fn poll_read_process(
- &mut self,
- ) -> crate::component_future::Poll<(), Error> {
- match try_ready!(self.process.poll()) {
+ fn poll_read_process(&mut self) -> component_future::Poll<(), Error> {
+ match component_future::try_ready!(self.process.poll()) {
Some(crate::resize::Event::Process(e)) => {
match e {
crate::process::Event::CommandStart(..) => {
@@ -174,10 +174,10 @@ impl RecordSession {
}
}
}
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
Some(crate::resize::Event::Resize(_)) => {
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
None => {
if !self.done {
@@ -185,62 +185,54 @@ impl RecordSession {
}
// don't return final event here - wait until we are done
// writing all data to the file (see poll_write_file)
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
}
}
- fn poll_write_terminal(
- &mut self,
- ) -> crate::component_future::Poll<(), Error> {
+ fn poll_write_terminal(&mut self) -> component_future::Poll<(), Error> {
if self.sent_local == self.buffer.len() {
- return Ok(crate::component_future::Async::NothingToDo);
+ return Ok(component_future::Async::NothingToDo);
}
- let n = try_ready!(self
+ let n = component_future::try_ready!(self
.stdout
.poll_write(&self.buffer.contents()[self.sent_local..])
.context(crate::error::WriteTerminal));
self.sent_local += n;
self.needs_flush = true;
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
- fn poll_flush_terminal(
- &mut self,
- ) -> crate::component_future::Poll<(), Error> {
+ fn poll_flush_terminal(&mut self) -> component_future::Poll<(), Error> {
if !self.needs_flush {
- return Ok(crate::component_future::Async::NothingToDo);
+ return Ok(component_future::Async::NothingToDo);
}
- try_ready!(self
+ component_future::try_ready!(self
.stdout
.poll_flush()
.context(crate::error::FlushTerminal));
self.needs_flush = false;
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
- fn poll_write_file(
- &mut self,
- ) -> crate::component_future::Poll<(), Error> {
+ fn poll_write_file(&mut self) -> component_future::Poll<(), Error> {
let file = match &mut self.file {
FileState::Open { file } => file,
_ => {
- return Ok(crate::component_future::Async::NothingToDo);
+ return Ok(component_future::Async::NothingToDo);
}
};
match file.poll_write()? {
- futures::Async::Ready(()) => {
- Ok(crate::component_future::Async::DidWork)
- }
+ futures::Async::Ready(()) => Ok(component_future::Async::DidWork),
futures::Async::NotReady => {
// ship all data to the server before actually ending
if self.done && file.is_empty() {
- Ok(crate::component_future::Async::Ready(()))
+ Ok(component_future::Async::Ready(()))
} else {
- Ok(crate::component_future::Async::NotReady)
+ Ok(component_future::Async::NotReady)
}
}
}
@@ -253,6 +245,6 @@ impl futures::future::Future for RecordSession {
type Error = Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
- crate::component_future::poll_future(self, Self::POLL_FNS)
+ component_future::poll_future(self, Self::POLL_FNS)
}
}
diff --git a/src/cmd/stream.rs b/src/cmd/stream.rs
index 9a0f149..823a4b1 100644
--- a/src/cmd/stream.rs
+++ b/src/cmd/stream.rs
@@ -187,7 +187,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
&'static [&'static dyn for<'a> Fn(
&'a mut Self,
)
- -> crate::component_future::Poll<
+ -> component_future::Poll<
(),
Error,
>] = &[
@@ -200,26 +200,24 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
// this should never return Err, because we don't want server
// communication issues to ever interrupt a running process
- fn poll_read_client(
- &mut self,
- ) -> crate::component_future::Poll<(), Error> {
+ fn poll_read_client(&mut self) -> component_future::Poll<(), Error> {
match self.client.poll() {
Ok(futures::Async::Ready(Some(e))) => match e {
crate::client::Event::Disconnect => {
self.connected = false;
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
crate::client::Event::Connect => {
self.connected = true;
self.sent_remote = 0;
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
crate::client::Event::ServerMessage(..) => {
// we don't expect to ever see a server message once we
// start streaming, so if one comes through, assume
// something is messed up and try again
self.client.reconnect();
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
},
Ok(futures::Async::Ready(None)) => {
@@ -227,19 +225,17 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
unreachable!()
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Async::NotReady)
+ Ok(component_future::Async::NotReady)
}
Err(..) => {
self.client.reconnect();
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
}
}
- fn poll_read_process(
- &mut self,
- ) -> crate::component_future::Poll<(), Error> {
- match try_ready!(self.process.poll()) {
+ fn poll_read_process(&mut self) -> component_future::Poll<(), Error> {
+ match component_future::try_ready!(self.process.poll()) {
Some(crate::resize::Event::Process(e)) => {
match e {
crate::process::Event::CommandStart(..) => {
@@ -257,12 +253,12 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
self.record_bytes(&output);
}
}
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
Some(crate::resize::Event::Resize(size)) => {
self.client
.send_message(crate::protocol::Message::resize(&size));
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
None => {
if !self.done {
@@ -270,51 +266,45 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
// don't return final event here - wait until we are done
// sending all data to the server (see poll_write_server)
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
}
}
- fn poll_write_terminal(
- &mut self,
- ) -> crate::component_future::Poll<(), Error> {
+ fn poll_write_terminal(&mut self) -> component_future::Poll<(), Error> {
if self.sent_local == self.buffer.len() {
- return Ok(crate::component_future::Async::NothingToDo);
+ return Ok(component_future::Async::NothingToDo);
}
- let n = try_ready!(self
+ let n = component_future::try_ready!(self
.stdout
.poll_write(&self.buffer.contents()[self.sent_local..])
.context(crate::error::WriteTerminal));
self.sent_local += n;
self.needs_flush = true;
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
- fn poll_flush_terminal(
- &mut self,
- ) -> crate::component_future::Poll<(), Error> {
+ fn poll_flush_terminal(&mut self) -> component_future::Poll<(), Error> {
if !self.needs_flush {
- return Ok(crate::component_future::Async::NothingToDo);
+ return Ok(component_future::Async::NothingToDo);
}
- try_ready!(self
+ component_future::try_ready!(self
.stdout
.poll_flush()
.context(crate::error::FlushTerminal));
self.needs_flush = false;
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
- fn poll_write_server(
- &mut self,
- ) -> crate::component_future::Poll<(), Error> {
+ fn poll_write_server(&mut self) -> component_future::Poll<(), Error> {
if self.sent_remote == self.buffer.len() || !self.connected {
// ship all data to the server before actually ending
if self.done {
- return Ok(crate::component_future::Async::Ready(()));
+ return Ok(component_future::Async::Ready(()));
} else {
- return Ok(crate::component_future::Async::NothingToDo);
+ return Ok(component_future::Async::NothingToDo);
}
}
@@ -323,7 +313,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
.send_message(crate::protocol::Message::terminal_output(buf));
self.sent_remote = self.buffer.len();
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
}
@@ -335,6 +325,6 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
type Error = Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
- crate::component_future::poll_future(self, Self::POLL_FNS)
+ component_future::poll_future(self, Self::POLL_FNS)
}
}
diff --git a/src/cmd/watch.rs b/src/cmd/watch.rs
index 011629d..67e3507 100644
--- a/src/cmd/watch.rs
+++ b/src/cmd/watch.rs
@@ -566,7 +566,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
&'static [&'static dyn for<'a> Fn(
&'a mut Self,
)
- -> crate::component_future::Poll<
+ -> component_future::Poll<
(),
Error,
>] = &[
@@ -576,13 +576,13 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
&Self::poll_watch_client,
];
- fn poll_resizer(&mut self) -> crate::component_future::Poll<(), Error> {
- let size = try_ready!(self.resizer.poll()).unwrap();
+ fn poll_resizer(&mut self) -> component_future::Poll<(), Error> {
+ let size = component_future::try_ready!(self.resizer.poll()).unwrap();
self.resize(size)?;
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
- fn poll_input(&mut self) -> crate::component_future::Poll<(), Error> {
+ fn poll_input(&mut self) -> component_future::Poll<(), Error> {
if self.raw_screen.is_none() {
self.raw_screen = Some(
crossterm::RawScreen::into_raw_mode()
@@ -595,7 +595,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
}
- let e = try_ready!(self.key_reader.poll()).unwrap();
+ let e = component_future::try_ready!(self.key_reader.poll()).unwrap();
let quit = match &mut self.state {
State::Temporary => unreachable!(),
State::LoggingIn { .. } => self.loading_keypress(&e)?,
@@ -603,16 +603,14 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
State::Watching { .. } => self.watch_keypress(&e)?,
};
if quit {
- Ok(crate::component_future::Async::Ready(()))
+ Ok(component_future::Async::Ready(()))
} else {
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
}
- fn poll_list_client(
- &mut self,
- ) -> crate::component_future::Poll<(), Error> {
- match try_ready!(self.list_client.poll()).unwrap() {
+ fn poll_list_client(&mut self) -> component_future::Poll<(), Error> {
+ match component_future::try_ready!(self.list_client.poll()).unwrap() {
crate::client::Event::Disconnect => {
self.reconnect(true)?;
}
@@ -624,19 +622,17 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
self.list_server_message(msg)?;
}
}
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
- fn poll_watch_client(
- &mut self,
- ) -> crate::component_future::Poll<(), Error> {
+ fn poll_watch_client(&mut self) -> component_future::Poll<(), Error> {
let client = if let State::Watching { client } = &mut self.state {
client
} else {
- return Ok(crate::component_future::Async::NothingToDo);
+ return Ok(component_future::Async::NothingToDo);
};
- match try_ready!(client.poll()).unwrap() {
+ match component_future::try_ready!(client.poll()).unwrap() {
crate::client::Event::Disconnect => {
self.reconnect(true)?;
}
@@ -645,7 +641,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
self.watch_server_message(msg)?;
}
}
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
}
@@ -657,7 +653,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
type Error = Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
- let res = crate::component_future::poll_future(self, Self::POLL_FNS);
+ let res = component_future::poll_future(self, Self::POLL_FNS);
if res.is_err() {
self.state = State::Temporary; // drop alternate screen
self.raw_screen = None;
diff --git a/src/component_future.rs b/src/component_future.rs
deleted file mode 100644
index 6d8449d..0000000
--- a/src/component_future.rs
+++ /dev/null
@@ -1,104 +0,0 @@
-pub type Poll<Item, Error> = Result<Async<Item>, Error>;
-
-pub enum Async<Item> {
- // we have a value for the main loop to return immediately.
- Ready(Item),
-
- // one of our inner futures returned futures::Async::NotReady. if all
- // of our other components return either NothingToDo or NotReady, then our
- // overall future should return NotReady and wait to be polled again.
- NotReady,
-
- // we did some work (moved our internal state closer to being ready to
- // return a value), but we aren't ready to return a value yet. we should
- // re-run all of the poll functions to see if the state modification made
- // any of them also able to make progress.
- DidWork,
-
- // we didn't poll any inner futures or otherwise change our internal state
- // at all, so rerunning is unlikely to make progress. if all components
- // return either NothingToDo or NotReady (and at least one returned
- // NotReady), then we should just return NotReady and wait to be polled
- // again. it is an error (panic) for all component poll methods to return
- // NothingToDo.
- NothingToDo,
-}
-
-macro_rules! try_ready {
- ($e:expr) => {
- match $e {
- Ok(futures::Async::Ready(t)) => t,
- Ok(futures::Async::NotReady) => {
- return Ok($crate::component_future::Async::NotReady)
- }
- Err(e) => return Err(From::from(e)),
- }
- };
-}
-
-pub fn poll_future<T, Item, Error>(
- future: &mut T,
- poll_fns: &'static [&'static dyn for<'a> Fn(
- &'a mut T,
- ) -> Poll<Item, Error>],
-) -> futures::Poll<Item, Error>
-where
- T: futures::future::Future<Item = Item, Error = Error>,
-{
- loop {
- let mut not_ready = false;
- let mut did_work = false;
-
- for f in poll_fns {
- match f(future)? {
- Async::Ready(e) => return Ok(futures::Async::Ready(e)),
- Async::NotReady => not_ready = true,
- Async::NothingToDo => {}
- Async::DidWork => did_work = true,
- }
- }
-
- if !did_work {
- if not_ready {
- return Ok(futures::Async::NotReady);
- } else {
- unreachable!()
- }
- }
- }
-}
-
-pub fn poll_stream<T, Item, Error>(
- stream: &mut T,
- poll_fns: &'static [&'static dyn for<'a> Fn(
- &'a mut T,
- ) -> Poll<
- Option<Item>,
- Error,
- >],
-) -> futures::Poll<Option<Item>, Error>
-where
- T: futures::stream::Stream<Item = Item, Error = Error>,
-{
- loop {
- let mut not_ready = false;
- let mut did_work = false;
-
- for f in poll_fns {
- match f(stream)? {
- Async::Ready(e) => return Ok(futures::Async::Ready(e)),
- Async::NotReady => not_ready = true,
- Async::NothingToDo => {}
- Async::DidWork => did_work = true,
- }
- }
-
- if !did_work {
- if not_ready {
- return Ok(futures::Async::NotReady);
- } else {
- unreachable!()
- }
- }
- }
-}
diff --git a/src/main.rs b/src/main.rs
index b9a7223..03cfc35 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -15,9 +15,6 @@ const _DUMMY_DEPENDENCY: &str = include_str!("../Cargo.toml");
mod prelude;
-#[macro_use]
-mod component_future;
-
mod async_stdin;
mod client;
mod cmd;
diff --git a/src/process.rs b/src/process.rs
index ebd55be..bbd1820 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -79,7 +79,7 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
&'static [&'static dyn for<'a> Fn(
&'a mut Self,
)
- -> crate::component_future::Poll<
+ -> component_future::Poll<
Option<Event>,
Error,
>] = &[
@@ -97,22 +97,22 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
fn poll_resize(
&mut self,
- ) -> crate::component_future::Poll<Option<Event>, Error> {
+ ) -> component_future::Poll<Option<Event>, Error> {
if let Some(size) = &self.needs_resize {
- try_ready!(size.resize_pty(self.state.pty()));
+ component_future::try_ready!(size.resize_pty(self.state.pty()));
log::debug!("resize({:?})", size);
self.needs_resize = None;
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
} else {
- Ok(crate::component_future::Async::NothingToDo)
+ Ok(component_future::Async::NothingToDo)
}
}
fn poll_command_start(
&mut self,
- ) -> crate::component_future::Poll<Option<Event>, Error> {
+ ) -> component_future::Poll<Option<Event>, Error> {
if self.started {
- return Ok(crate::component_future::Async::NothingToDo);
+ return Ok(component_future::Async::NothingToDo);
}
if self.state.pty.is_none() {
@@ -142,19 +142,20 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
}
self.started = true;
- Ok(crate::component_future::Async::Ready(Some(
- Event::CommandStart(self.cmd.clone(), self.args.clone()),
- )))
+ Ok(component_future::Async::Ready(Some(Event::CommandStart(
+ self.cmd.clone(),
+ self.args.clone(),
+ ))))
}
fn poll_read_stdin(
&mut self,
- ) -> crate::component_future::Poll<Option<Event>, Error> {
+ ) -> component_future::Poll<Option<Event>, Error> {
if self.exited || self.stdin_closed {
- return Ok(crate::component_future::Async::NothingToDo);
+ return Ok(component_future::Async::NothingToDo);
}
- let n = try_ready!(self
+ let n = component_future::try_ready!(self
.input
.poll_read(&mut self.buf)
.context(crate::error::ReadTerminal));
@@ -165,19 +166,19 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
self.input_buf.push_back(b'\x04');
self.stdin_closed = true;
}
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
fn poll_write_stdin(
&mut self,
- ) -> crate::component_future::Poll<Option<Event>, Error> {
+ ) -> component_future::Poll<Option<Event>, Error> {
if self.exited || self.input_buf.is_empty() {
- return Ok(crate::component_future::Async::NothingToDo);
+ return Ok(component_future::Async::NothingToDo);
}
let (a, b) = self.input_buf.as_slices();
let buf = if a.is_empty() { b } else { a };
- let n = try_ready!(self
+ let n = component_future::try_ready!(self
.state
.pty_mut()
.poll_write(buf)
@@ -186,12 +187,12 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
for _ in 0..n {
self.input_buf.pop_front();
}
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
fn poll_read_stdout(
&mut self,
- ) -> crate::component_future::Poll<Option<Event>, Error> {
+ ) -> component_future::Poll<Option<Event>, Error> {
match self
.state
.pty_mut()
@@ -201,12 +202,10 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
Ok(futures::Async::Ready(n)) => {
log::debug!("read_stdout({})", n);
let bytes = self.buf[..n].to_vec();
- Ok(crate::component_future::Async::Ready(Some(
- Event::Output(bytes),
- )))
+ Ok(component_future::Async::Ready(Some(Event::Output(bytes))))
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Async::NotReady)
+ Ok(component_future::Async::NotReady)
}
Err(e) => {
// XXX this seems to be how eof is returned, but this seems...
@@ -215,7 +214,7 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
if source.kind() == std::io::ErrorKind::Other {
log::debug!("read_stdout(eof)");
self.stdout_closed = true;
- return Ok(crate::component_future::Async::DidWork);
+ return Ok(component_future::Async::DidWork);
}
}
Err(e)
@@ -225,24 +224,24 @@ impl<R: tokio::io::AsyncRead + 'static> Process<R> {
fn poll_command_exit(
&mut self,
- ) -> crate::component_future::Poll<Option<Event>, Error> {
+ ) -> component_future::Poll<Option<Event>, Error> {
if self.exited {
- return Ok(crate::component_future::Async::Ready(None));
+ return Ok(component_future::Async::Ready(None));
}
if !self.stdout_closed {
- return Ok(crate::component_future::Async::NothingToDo);
+ return Ok(component_future::Async::NothingToDo);
}
- let status = try_ready!(self
+ let status = component_future::try_ready!(self
.state
.process()
.poll()
.context(crate::error::ProcessExitPoll));
log::debug!("exit({})", status);
self.exited = true;
- Ok(crate::component_future::Async::Ready(Some(
- Event::CommandExit(status),
- )))
+ Ok(component_future::Async::Ready(Some(Event::CommandExit(
+ status,
+ ))))
}
}
@@ -254,7 +253,7 @@ impl<R: tokio::io::AsyncRead + 'static> futures::stream::Stream
type Error = Error;
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
- crate::component_future::poll_stream(self, Self::POLL_FNS)
+ component_future::poll_stream(self, Self::POLL_FNS)
}
}
diff --git a/src/resize.rs b/src/resize.rs
index 9e357af..2dcf928 100644
--- a/src/resize.rs
+++ b/src/resize.rs
@@ -62,26 +62,25 @@ impl<R: tokio::io::AsyncRead + 'static> ResizingProcess<R> {
&'static [&'static dyn for<'a> Fn(
&'a mut Self,
)
- -> crate::component_future::Poll<
+ -> component_future::Poll<
Option<Event<R>>,
Error,
>] = &[&Self::poll_resize, &Self::poll_process];
fn poll_resize(
&mut self,
- ) -> crate::component_future::Poll<Option<Event<R>>, Error> {
- let size = try_ready!(self.resizer.poll()).unwrap();
+ ) -> component_future::Poll<Option<Event<R>>, Error> {
+ let size = component_future::try_ready!(self.resizer.poll()).unwrap();
self.process.resize(size.clone());
- Ok(crate::component_future::Async::Ready(Some(Event::Resize(
- size,
- ))))
+ Ok(component_future::Async::Ready(Some(Event::Resize(size))))
}
fn poll_process(
&mut self,
- ) -> crate::component_future::Poll<Option<Event<R>>, Error> {
- Ok(crate::component_future::Async::Ready(
- try_ready!(self.process.poll()).map(Event::Process),
+ ) -> component_future::Poll<Option<Event<R>>, Error> {
+ Ok(component_future::Async::Ready(
+ component_future::try_ready!(self.process.poll())
+ .map(Event::Process),
))
}
}
@@ -95,6 +94,6 @@ impl<R: tokio::io::AsyncRead + 'static> futures::stream::Stream
<crate::process::Process<R> as futures::stream::Stream>::Error;
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
- crate::component_future::poll_stream(self, Self::POLL_FNS)
+ component_future::poll_stream(self, Self::POLL_FNS)
}
}
diff --git a/src/server.rs b/src/server.rs
index 17c14e5..36195c5 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -810,7 +810,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
fn poll_read_connection(
&mut self,
conn: &mut Connection<S>,
- ) -> crate::component_future::Poll<(), Error> {
+ ) -> component_future::Poll<(), Error> {
match &mut conn.rsock {
Some(ReadSocket::Connected(..)) => {
if let Some(ReadSocket::Connected(s)) = conn.rsock.take() {
@@ -823,7 +823,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
} else {
unreachable!()
}
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
Some(ReadSocket::Reading(fut)) => match fut.poll() {
Ok(futures::Async::Ready((msg, s))) => {
@@ -840,15 +840,15 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
conn.rsock = Some(ReadSocket::Connected(s));
}
}
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Async::NotReady)
+ Ok(component_future::Async::NotReady)
}
Err(e) => classify_connection_error(e),
},
Some(ReadSocket::Processing(_, fut)) => {
- let (state, msg) = try_ready!(fut.poll());
+ let (state, msg) = component_future::try_ready!(fut.poll());
if let Some(ReadSocket::Processing(s, _)) = conn.rsock.take()
{
conn.state = state;
@@ -857,16 +857,16 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
} else {
unreachable!()
}
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
- _ => Ok(crate::component_future::Async::NothingToDo),
+ _ => Ok(component_future::Async::NothingToDo),
}
}
fn poll_write_connection(
&mut self,
conn: &mut Connection<S>,
- ) -> crate::component_future::Poll<(), Error> {
+ ) -> component_future::Poll<(), Error> {
match &mut conn.wsock {
Some(WriteSocket::Connected(..)) => {
if let Some(msg) = conn.to_send.pop_front() {
@@ -886,24 +886,24 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
} else {
unreachable!()
}
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
} else if conn.closed {
- Ok(crate::component_future::Async::Ready(()))
+ Ok(component_future::Async::Ready(()))
} else {
- Ok(crate::component_future::Async::NothingToDo)
+ Ok(component_future::Async::NothingToDo)
}
}
Some(WriteSocket::Writing(fut)) => match fut.poll() {
Ok(futures::Async::Ready(s)) => {
conn.wsock = Some(WriteSocket::Connected(s));
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Async::NotReady)
+ Ok(component_future::Async::NotReady)
}
Err(e) => classify_connection_error(e),
},
- _ => Ok(crate::component_future::Async::NothingToDo),
+ _ => Ok(component_future::Async::NothingToDo),
}
}
@@ -938,22 +938,23 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
&'static [&'static dyn for<'a> Fn(
&'a mut Self,
)
- -> crate::component_future::Poll<
+ -> component_future::Poll<
(),
Error,
>] = &[&Self::poll_accept, &Self::poll_read, &Self::poll_write];
- fn poll_accept(&mut self) -> crate::component_future::Poll<(), Error> {
- if let Some(sock) = try_ready!(self.acceptor.poll()) {
+ fn poll_accept(&mut self) -> component_future::Poll<(), Error> {
+ if let Some(sock) = component_future::try_ready!(self.acceptor.poll())
+ {
let conn = Connection::new(sock, self.buffer_size);
self.connections.insert(conn.id.to_string(), conn);
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
} else {
unreachable!()
}
}
- fn poll_read(&mut self) -> crate::component_future::Poll<(), Error> {
+ fn poll_read(&mut self) -> component_future::Poll<(), Error> {
let mut did_work = false;
let mut not_ready = false;
@@ -961,14 +962,14 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
for key in keys {
let mut conn = self.connections.remove(&key).unwrap();
match self.poll_read_connection(&mut conn) {
- Ok(crate::component_future::Async::Ready(())) => {
+ Ok(component_future::Async::Ready(())) => {
self.handle_disconnect(&mut conn);
continue;
}
- Ok(crate::component_future::Async::DidWork) => {
+ Ok(component_future::Async::DidWork) => {
did_work = true;
}
- Ok(crate::component_future::Async::NotReady) => {
+ Ok(component_future::Async::NotReady) => {
not_ready = true;
}
Err(e) => {
@@ -984,15 +985,15 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
if did_work {
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
} else if not_ready {
- Ok(crate::component_future::Async::NotReady)
+ Ok(component_future::Async::NotReady)
} else {
- Ok(crate::component_future::Async::NothingToDo)
+ Ok(component_future::Async::NothingToDo)
}
}
- fn poll_write(&mut self) -> crate::component_future::Poll<(), Error> {
+ fn poll_write(&mut self) -> component_future::Poll<(), Error> {
let mut did_work = false;
let mut not_ready = false;
@@ -1000,14 +1001,14 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
for key in keys {
let mut conn = self.connections.remove(&key).unwrap();
match self.poll_write_connection(&mut conn) {
- Ok(crate::component_future::Async::Ready(())) => {
+ Ok(component_future::Async::Ready(())) => {
self.handle_disconnect(&mut conn);
continue;
}
- Ok(crate::component_future::Async::DidWork) => {
+ Ok(component_future::Async::DidWork) => {
did_work = true;
}
- Ok(crate::component_future::Async::NotReady) => {
+ Ok(component_future::Async::NotReady) => {
not_ready = true;
}
Err(e) => {
@@ -1023,18 +1024,16 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
if did_work {
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
} else if not_ready {
- Ok(crate::component_future::Async::NotReady)
+ Ok(component_future::Async::NotReady)
} else {
- Ok(crate::component_future::Async::NothingToDo)
+ Ok(component_future::Async::NothingToDo)
}
}
}
-fn classify_connection_error(
- e: Error,
-) -> crate::component_future::Poll<(), Error> {
+fn classify_connection_error(e: Error) -> component_future::Poll<(), Error> {
let source = match e {
Error::ReadMessageWithTimeout { source } => source,
Error::WriteMessageWithTimeout { source } => source,
@@ -1051,7 +1050,7 @@ fn classify_connection_error(
source: ref tokio_err,
} => tokio_err,
Error::EOF => {
- return Ok(crate::component_future::Async::Ready(()));
+ return Ok(component_future::Async::Ready(()));
}
_ => {
return Err(source);
@@ -1059,7 +1058,7 @@ fn classify_connection_error(
};
if tokio_err.kind() == tokio::io::ErrorKind::UnexpectedEof {
- Ok(crate::component_future::Async::Ready(()))
+ Ok(component_future::Async::Ready(()))
} else {
Err(source)
}
@@ -1079,6 +1078,6 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
type Error = Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
- crate::component_future::poll_future(self, Self::POLL_FNS)
+ component_future::poll_future(self, Self::POLL_FNS)
}
}
diff --git a/src/server/tls.rs b/src/server/tls.rs
index 8bb105c..9db4603 100644
--- a/src/server/tls.rs
+++ b/src/server/tls.rs
@@ -55,7 +55,7 @@ impl Server {
&'static [&'static dyn for<'a> Fn(
&'a mut Self,
)
- -> crate::component_future::Poll<
+ -> component_future::Poll<
(),
Error,
>] = &[
@@ -64,10 +64,11 @@ impl Server {
&Self::poll_server,
];
- fn poll_accept(&mut self) -> crate::component_future::Poll<(), Error> {
- if let Some(sock) = try_ready!(self.acceptor.poll()) {
+ fn poll_accept(&mut self) -> component_future::Poll<(), Error> {
+ if let Some(sock) = component_future::try_ready!(self.acceptor.poll())
+ {
self.accepting_sockets.push(sock);
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
} else {
Err(Error::SocketChannelClosed)
}
@@ -75,7 +76,7 @@ impl Server {
fn poll_handshake_connections(
&mut self,
- ) -> crate::component_future::Poll<(), Error> {
+ ) -> component_future::Poll<(), Error> {
let mut did_work = false;
let mut not_ready = false;
@@ -107,17 +108,17 @@ impl Server {
}
if did_work {
- Ok(crate::component_future::Async::DidWork)
+ Ok(component_future::Async::DidWork)
} else if not_ready {
- Ok(crate::component_future::Async::NotReady)
+ Ok(component_future::Async::NotReady)
} else {
- Ok(crate::component_future::Async::NothingToDo)
+ Ok(component_future::Async::NothingToDo)
}
}
- fn poll_server(&mut self) -> crate::component_future::Poll<(), Error> {
- try_ready!(self.server.poll());
- Ok(crate::component_future::Async::Ready(()))
+ fn poll_server(&mut self) -> component_future::Poll<(), Error> {
+ component_future::try_ready!(self.server.poll());
+ Ok(component_future::Async::Ready(()))
}
}
@@ -127,6 +128,6 @@ impl futures::future::Future for Server {
type Error = Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
- crate::component_future::poll_future(self, Self::POLL_FNS)
+ component_future::poll_future(self, Self::POLL_FNS)
}
}