diff options
Diffstat (limited to 'src/cmd/stream.rs')
-rw-r--r-- | src/cmd/stream.rs | 60 |
1 files changed, 25 insertions, 35 deletions
diff --git a/src/cmd/stream.rs b/src/cmd/stream.rs index 9a0f149..823a4b1 100644 --- a/src/cmd/stream.rs +++ b/src/cmd/stream.rs @@ -187,7 +187,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> &'static [&'static dyn for<'a> Fn( &'a mut Self, ) - -> crate::component_future::Poll< + -> component_future::Poll< (), Error, >] = &[ @@ -200,26 +200,24 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> // this should never return Err, because we don't want server // communication issues to ever interrupt a running process - fn poll_read_client( - &mut self, - ) -> crate::component_future::Poll<(), Error> { + fn poll_read_client(&mut self) -> component_future::Poll<(), Error> { match self.client.poll() { Ok(futures::Async::Ready(Some(e))) => match e { crate::client::Event::Disconnect => { self.connected = false; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } crate::client::Event::Connect => { self.connected = true; self.sent_remote = 0; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } crate::client::Event::ServerMessage(..) => { // we don't expect to ever see a server message once we // start streaming, so if one comes through, assume // something is messed up and try again self.client.reconnect(); - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } }, Ok(futures::Async::Ready(None)) => { @@ -227,19 +225,17 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> unreachable!() } Ok(futures::Async::NotReady) => { - Ok(crate::component_future::Async::NotReady) + Ok(component_future::Async::NotReady) } Err(..) => { self.client.reconnect(); - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } } } - fn poll_read_process( - &mut self, - ) -> crate::component_future::Poll<(), Error> { - match try_ready!(self.process.poll()) { + fn poll_read_process(&mut self) -> component_future::Poll<(), Error> { + match component_future::try_ready!(self.process.poll()) { Some(crate::resize::Event::Process(e)) => { match e { crate::process::Event::CommandStart(..) => { @@ -257,12 +253,12 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> self.record_bytes(&output); } } - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } Some(crate::resize::Event::Resize(size)) => { self.client .send_message(crate::protocol::Message::resize(&size)); - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } None => { if !self.done { @@ -270,51 +266,45 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> } // don't return final event here - wait until we are done // sending all data to the server (see poll_write_server) - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } } } - fn poll_write_terminal( - &mut self, - ) -> crate::component_future::Poll<(), Error> { + fn poll_write_terminal(&mut self) -> component_future::Poll<(), Error> { if self.sent_local == self.buffer.len() { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); } - let n = try_ready!(self + let n = component_future::try_ready!(self .stdout .poll_write(&self.buffer.contents()[self.sent_local..]) .context(crate::error::WriteTerminal)); self.sent_local += n; self.needs_flush = true; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } - fn poll_flush_terminal( - &mut self, - ) -> crate::component_future::Poll<(), Error> { + fn poll_flush_terminal(&mut self) -> component_future::Poll<(), Error> { if !self.needs_flush { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); } - try_ready!(self + component_future::try_ready!(self .stdout .poll_flush() .context(crate::error::FlushTerminal)); self.needs_flush = false; - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } - fn poll_write_server( - &mut self, - ) -> crate::component_future::Poll<(), Error> { + fn poll_write_server(&mut self) -> component_future::Poll<(), Error> { if self.sent_remote == self.buffer.len() || !self.connected { // ship all data to the server before actually ending if self.done { - return Ok(crate::component_future::Async::Ready(())); + return Ok(component_future::Async::Ready(())); } else { - return Ok(crate::component_future::Async::NothingToDo); + return Ok(component_future::Async::NothingToDo); } } @@ -323,7 +313,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> .send_message(crate::protocol::Message::terminal_output(buf)); self.sent_remote = self.buffer.len(); - Ok(crate::component_future::Async::DidWork) + Ok(component_future::Async::DidWork) } } @@ -335,6 +325,6 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> type Error = Error; fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> { - crate::component_future::poll_future(self, Self::POLL_FNS) + component_future::poll_future(self, Self::POLL_FNS) } } |