aboutsummaryrefslogtreecommitdiffstats
path: root/src/cmd/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/cmd/stream.rs')
-rw-r--r--src/cmd/stream.rs111
1 files changed, 78 insertions, 33 deletions
diff --git a/src/cmd/stream.rs b/src/cmd/stream.rs
index 503a5ad..b55113f 100644
--- a/src/cmd/stream.rs
+++ b/src/cmd/stream.rs
@@ -115,6 +115,39 @@ pub fn config(
Ok(Box::new(config))
}
+struct ScreenState {
+ screen: Option<vt100::Screen>,
+ has_screen_data: bool,
+ write_buf: Vec<u8>,
+ write_buf_idx: usize,
+}
+
+impl ScreenState {
+ fn new() -> Self {
+ Self {
+ screen: None,
+ has_screen_data: true,
+ write_buf: vec![],
+ write_buf_idx: 0,
+ }
+ }
+
+ fn has_data(&self) -> bool {
+ self.has_screen_data || self.write_buf_idx != self.write_buf.len()
+ }
+
+ fn update_screen(&mut self, screen: &vt100::Screen) {
+ if let Some(prev_screen) = &self.screen {
+ self.write_buf = screen.contents_diff(prev_screen);
+ } else {
+ self.write_buf = screen.contents_formatted();
+ }
+ self.write_buf_idx = 0;
+ self.screen = Some(screen.clone());
+ self.has_screen_data = false;
+ }
+}
+
struct StreamSession<
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
> {
@@ -122,11 +155,10 @@ struct StreamSession<
process:
tokio_pty_process_stream::ResizingProcess<crate::async_stdin::Stdin>,
stdout: tokio::io::Stdout,
- buffer: crate::term::Buffer,
- sent_local: usize,
- sent_remote: usize,
+ term: vt100::Parser,
+ local: ScreenState,
+ remote: Option<ScreenState>,
needs_flush: bool,
- connected: bool,
done: bool,
raw_screen: Option<crossterm::screen::RawScreen>,
}
@@ -157,26 +189,20 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
client,
process,
stdout: tokio::io::stdout(),
- buffer: crate::term::Buffer::new(buffer_size),
- sent_local: 0,
- sent_remote: 0,
+ term: vt100::Parser::new(24, 80, 0),
+ local: ScreenState::new(),
+ remote: None,
needs_flush: false,
- connected: false,
done: false,
raw_screen: None,
}
}
fn record_bytes(&mut self, buf: &[u8]) {
- let written = if self.connected {
- self.sent_local.min(self.sent_remote)
- } else {
- self.sent_local
- };
- let truncated = self.buffer.append_client(buf, written);
- self.sent_local -= truncated;
- if self.connected {
- self.sent_remote -= truncated;
+ self.term.process(buf);
+ self.local.has_screen_data = true;
+ if let Some(remote) = &mut self.remote {
+ remote.has_screen_data = true;
}
}
}
@@ -205,18 +231,18 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
match self.client.poll() {
Ok(futures::Async::Ready(Some(e))) => match e {
crate::client::Event::Disconnect => {
- self.connected = false;
+ self.remote = None;
Ok(component_future::Async::DidWork)
}
crate::client::Event::Connect => {
- self.connected = true;
- self.sent_remote = 0;
+ self.remote = Some(ScreenState::new());
Ok(component_future::Async::DidWork)
}
crate::client::Event::ServerMessage(..) => {
// we don't expect to ever see a server message once we
// start streaming, so if one comes through, assume
// something is messed up and try again
+ self.remote = None;
self.client.reconnect();
Ok(component_future::Async::DidWork)
}
@@ -229,6 +255,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
Ok(component_future::Async::NotReady)
}
Err(..) => {
+ self.remote = None;
self.client.reconnect();
Ok(component_future::Async::DidWork)
}
@@ -260,6 +287,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
Some(tokio_pty_process_stream::Event::Resize {
size: (rows, cols),
}) => {
+ self.term.set_size(rows, cols);
self.client.send_message(crate::protocol::Message::resize(
&crate::term::Size { rows, cols },
));
@@ -276,15 +304,19 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
fn poll_write_terminal(&mut self) -> component_future::Poll<(), Error> {
- if self.sent_local == self.buffer.len() {
+ if !self.local.has_data() {
return Ok(component_future::Async::NothingToDo);
}
+ if self.local.write_buf_idx == self.local.write_buf.len() {
+ self.local.update_screen(self.term.screen());
+ }
+
let n = component_future::try_ready!(self
.stdout
- .poll_write(&self.buffer.contents()[self.sent_local..])
+ .poll_write(&self.local.write_buf[self.local.write_buf_idx..])
.context(crate::error::WriteTerminal));
- self.sent_local += n;
+ self.local.write_buf_idx += n;
self.needs_flush = true;
Ok(component_future::Async::DidWork)
}
@@ -303,21 +335,34 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
}
fn poll_write_server(&mut self) -> component_future::Poll<(), Error> {
- if self.sent_remote == self.buffer.len() || !self.connected {
+ if let Some(remote) = &mut self.remote {
+ if !remote.has_data() {
+ // ship all data to the server before actually ending
+ if self.done {
+ return Ok(component_future::Async::Ready(()));
+ } else {
+ return Ok(component_future::Async::NothingToDo);
+ }
+ }
+
+ if remote.write_buf_idx == remote.write_buf.len() {
+ remote.update_screen(self.term.screen());
+ }
+
+ let buf = &remote.write_buf[remote.write_buf_idx..];
+ self.client
+ .send_message(crate::protocol::Message::terminal_output(buf));
+ remote.write_buf_idx = remote.write_buf.len();
+
+ Ok(component_future::Async::DidWork)
+ } else {
// ship all data to the server before actually ending
if self.done {
- return Ok(component_future::Async::Ready(()));
+ Ok(component_future::Async::Ready(()))
} else {
- return Ok(component_future::Async::NothingToDo);
+ Ok(component_future::Async::NothingToDo)
}
}
-
- let buf = &self.buffer.contents()[self.sent_remote..];
- self.client
- .send_message(crate::protocol::Message::terminal_output(buf));
- self.sent_remote = self.buffer.len();
-
- Ok(component_future::Async::DidWork)
}
}