aboutsummaryrefslogtreecommitdiffstats
path: root/src/cmd/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/cmd/stream.rs')
-rw-r--r--src/cmd/stream.rs61
1 files changed, 32 insertions, 29 deletions
diff --git a/src/cmd/stream.rs b/src/cmd/stream.rs
index 805ed8e..eba26bb 100644
--- a/src/cmd/stream.rs
+++ b/src/cmd/stream.rs
@@ -242,11 +242,14 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
StreamSession<S>
{
- const POLL_FNS: &'static [&'static dyn for<'a> Fn(
- &'a mut Self,
- ) -> Result<
- crate::component_future::Poll<()>,
- >] = &[
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> crate::component_future::Poll<
+ (),
+ Error,
+ >] = &[
&Self::poll_read_client,
&Self::poll_read_process,
&Self::poll_write_terminal,
@@ -258,32 +261,32 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
// communication issues to ever interrupt a running process
fn poll_read_client(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
match self.client.poll() {
Ok(futures::Async::Ready(Some(e))) => match e {
crate::client::Event::Disconnect => {
self.connected = false;
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
crate::client::Event::Start(size) => {
self.process.resize(size);
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
crate::client::Event::Connect() => {
self.connected = true;
self.sent_remote = 0;
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
crate::client::Event::ServerMessage(..) => {
// we don't expect to ever see a server message once we
// start streaming, so if one comes through, assume
// something is messed up and try again
self.client.reconnect();
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
crate::client::Event::Resize(size) => {
self.process.resize(size);
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
},
Ok(futures::Async::Ready(None)) => {
@@ -291,18 +294,18 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
unreachable!()
}
Ok(futures::Async::NotReady) => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
Err(..) => {
self.client.reconnect();
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
}
}
fn poll_read_process(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
match self.process.poll()? {
futures::Async::Ready(Some(e)) => {
match e {
@@ -322,7 +325,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
self.record_bytes(&output);
}
}
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::Ready(None) => {
if !self.done {
@@ -330,19 +333,19 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
// don't return final event here - wait until we are done
// sending all data to the server (see poll_write_server)
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
fn poll_write_terminal(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
if self.sent_local == self.buffer.len() {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
}
match self
@@ -353,19 +356,19 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
futures::Async::Ready(n) => {
self.sent_local += n;
self.needs_flush = true;
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
fn poll_flush_terminal(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
if !self.needs_flush {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
}
match self
@@ -375,23 +378,23 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
{
futures::Async::Ready(()) => {
self.needs_flush = false;
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
futures::Async::NotReady => {
- Ok(crate::component_future::Poll::NotReady)
+ Ok(crate::component_future::Async::NotReady)
}
}
}
fn poll_write_server(
&mut self,
- ) -> Result<crate::component_future::Poll<()>> {
+ ) -> crate::component_future::Poll<(), Error> {
if self.sent_remote == self.buffer.len() || !self.connected {
// ship all data to the server before actually ending
if self.done {
- return Ok(crate::component_future::Poll::Event(()));
+ return Ok(crate::component_future::Async::Ready(()));
} else {
- return Ok(crate::component_future::Poll::NothingToDo);
+ return Ok(crate::component_future::Async::NothingToDo);
}
}
@@ -400,7 +403,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
.send_message(crate::protocol::Message::terminal_output(buf));
self.sent_remote = self.buffer.len();
- Ok(crate::component_future::Poll::DidWork)
+ Ok(crate::component_future::Async::DidWork)
}
}