aboutsummaryrefslogtreecommitdiffstats
path: root/teleterm/src/cmd/stream.rs
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-11-15 13:11:07 -0500
committerJesse Luehrs <doy@tozt.net>2019-11-15 13:11:07 -0500
commitbbf15cfef8134da720a27bd71a93efcb8467025b (patch)
treeaa58a5d7c1862fcdd6c8629651f664aa12c70f66 /teleterm/src/cmd/stream.rs
parentfe4fa53dbbb6030beae2094e33d1db008532ae3c (diff)
downloadteleterm-bbf15cfef8134da720a27bd71a93efcb8467025b.tar.gz
teleterm-bbf15cfef8134da720a27bd71a93efcb8467025b.zip
use workspaces
Diffstat (limited to 'teleterm/src/cmd/stream.rs')
-rw-r--r--teleterm/src/cmd/stream.rs345
1 files changed, 345 insertions, 0 deletions
diff --git a/teleterm/src/cmd/stream.rs b/teleterm/src/cmd/stream.rs
new file mode 100644
index 0000000..11d3873
--- /dev/null
+++ b/teleterm/src/cmd/stream.rs
@@ -0,0 +1,345 @@
+use crate::prelude::*;
+use tokio::io::AsyncWrite as _;
+
+#[derive(serde::Deserialize, Debug, Default)]
+pub struct Config {
+ #[serde(default)]
+ client: crate::config::Client,
+
+ #[serde(default)]
+ command: crate::config::Command,
+}
+
+impl crate::config::Config for Config {
+ fn merge_args<'a>(
+ &mut self,
+ matches: &clap::ArgMatches<'a>,
+ ) -> Result<()> {
+ self.client.merge_args(matches)?;
+ self.command.merge_args(matches)?;
+ Ok(())
+ }
+
+ fn run(
+ &self,
+ ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send>
+ {
+ let auth = match self.client.auth {
+ crate::protocol::AuthType::Plain => {
+ let username = self
+ .client
+ .username
+ .clone()
+ .context(crate::error::CouldntFindUsername);
+ match username {
+ Ok(username) => crate::protocol::Auth::plain(&username),
+ Err(e) => return Box::new(futures::future::err(e)),
+ }
+ }
+ crate::protocol::AuthType::RecurseCenter => {
+ let id = crate::oauth::load_client_auth_id(self.client.auth);
+ crate::protocol::Auth::recurse_center(
+ id.as_ref().map(std::string::String::as_str),
+ )
+ }
+ };
+
+ let host = self.client.host().to_string();
+ let address = *self.client.addr();
+ if self.client.tls {
+ let connector = match native_tls::TlsConnector::new()
+ .context(crate::error::CreateConnector)
+ {
+ Ok(connector) => connector,
+ Err(e) => return Box::new(futures::future::err(e)),
+ };
+ let connect: crate::client::Connector<_> = Box::new(move || {
+ let host = host.clone();
+ let connector = connector.clone();
+ let connector = tokio_tls::TlsConnector::from(connector);
+ let stream = tokio::net::tcp::TcpStream::connect(&address);
+ Box::new(
+ stream
+ .context(crate::error::Connect { address })
+ .and_then(move |stream| {
+ connector
+ .connect(&host, stream)
+ .context(crate::error::ConnectTls { host })
+ }),
+ )
+ });
+ Box::new(StreamSession::new(
+ &self.command.command,
+ &self.command.args,
+ connect,
+ &auth,
+ ))
+ } else {
+ let connect: crate::client::Connector<_> = Box::new(move || {
+ Box::new(
+ tokio::net::tcp::TcpStream::connect(&address)
+ .context(crate::error::Connect { address }),
+ )
+ });
+ Box::new(StreamSession::new(
+ &self.command.command,
+ &self.command.args,
+ connect,
+ &auth,
+ ))
+ }
+ }
+}
+
+pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> {
+ crate::config::Client::cmd(crate::config::Command::cmd(
+ app.about("Stream your terminal"),
+ ))
+}
+
+pub fn config(
+ mut config: Option<config::Config>,
+) -> Result<Box<dyn crate::config::Config>> {
+ if config.is_none() {
+ config = crate::config::wizard::run()?;
+ }
+ let config: Config = if let Some(config) = config {
+ config
+ .try_into()
+ .context(crate::error::CouldntParseConfig)?
+ } else {
+ Config::default()
+ };
+ Ok(Box::new(config))
+}
+
+struct StreamSession<
+ S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
+> {
+ client: crate::client::Client<S>,
+ connected: bool,
+
+ process:
+ tokio_pty_process_stream::ResizingProcess<crate::async_stdin::Stdin>,
+ raw_screen: Option<crossterm::screen::RawScreen>,
+ done: bool,
+
+ term: vt100::Parser,
+ last_screen: vt100::Screen,
+ needs_screen_update: bool,
+
+ stdout: tokio::io::Stdout,
+ to_print: std::collections::VecDeque<u8>,
+ needs_flush: bool,
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ StreamSession<S>
+{
+ fn new(
+ cmd: &str,
+ args: &[String],
+ connect: crate::client::Connector<S>,
+ auth: &crate::protocol::Auth,
+ ) -> Self {
+ let client = crate::client::Client::stream(connect, auth);
+
+ // TODO: tokio::io::stdin is broken (it's blocking)
+ // see https://github.com/tokio-rs/tokio/issues/589
+ // let input = tokio::io::stdin();
+ let input = crate::async_stdin::Stdin::new();
+
+ let process = tokio_pty_process_stream::ResizingProcess::new(
+ tokio_pty_process_stream::Process::new(cmd, args, input),
+ );
+
+ let term = vt100::Parser::default();
+ let screen = term.screen().clone();
+
+ Self {
+ client,
+ connected: false,
+
+ process,
+ raw_screen: None,
+ done: false,
+
+ term,
+ last_screen: screen,
+ needs_screen_update: false,
+
+ stdout: tokio::io::stdout(),
+ to_print: std::collections::VecDeque::new(),
+ needs_flush: false,
+ }
+ }
+
+ fn record_bytes(&mut self, buf: &[u8]) {
+ self.to_print.extend(buf);
+ self.term.process(buf);
+ self.needs_screen_update = true;
+ }
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ StreamSession<S>
+{
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ (),
+ Error,
+ >] = &[
+ &Self::poll_read_client,
+ &Self::poll_read_process,
+ &Self::poll_write_terminal,
+ &Self::poll_flush_terminal,
+ &Self::poll_write_server,
+ ];
+
+ // this should never return Err, because we don't want server
+ // communication issues to ever interrupt a running process
+ fn poll_read_client(&mut self) -> component_future::Poll<(), Error> {
+ match self.client.poll() {
+ Ok(futures::Async::Ready(Some(e))) => match e {
+ crate::client::Event::Disconnect => {
+ self.connected = false;
+ Ok(component_future::Async::DidWork)
+ }
+ crate::client::Event::Connect => {
+ self.connected = true;
+ self.client.send_message(
+ crate::protocol::Message::terminal_output(
+ &self.last_screen.contents_formatted(),
+ ),
+ );
+ Ok(component_future::Async::DidWork)
+ }
+ crate::client::Event::ServerMessage(..) => {
+ // we don't expect to ever see a server message once we
+ // start streaming, so if one comes through, assume
+ // something is messed up and try again
+ self.client.reconnect();
+ Ok(component_future::Async::DidWork)
+ }
+ },
+ Ok(futures::Async::Ready(None)) => {
+ // the client should never exit on its own
+ unreachable!()
+ }
+ Ok(futures::Async::NotReady) => {
+ Ok(component_future::Async::NotReady)
+ }
+ Err(..) => {
+ self.client.reconnect();
+ Ok(component_future::Async::DidWork)
+ }
+ }
+ }
+
+ fn poll_read_process(&mut self) -> component_future::Poll<(), Error> {
+ match component_future::try_ready!(self
+ .process
+ .poll()
+ .context(crate::error::Subprocess))
+ {
+ Some(tokio_pty_process_stream::Event::CommandStart {
+ ..
+ }) => {
+ if self.raw_screen.is_none() {
+ self.raw_screen = Some(
+ crossterm::screen::RawScreen::into_raw_mode()
+ .context(crate::error::ToRawMode)?,
+ );
+ }
+ }
+ Some(tokio_pty_process_stream::Event::CommandExit { .. }) => {
+ self.done = true;
+ }
+ Some(tokio_pty_process_stream::Event::Output { data }) => {
+ self.record_bytes(&data);
+ }
+ Some(tokio_pty_process_stream::Event::Resize {
+ size: (rows, cols),
+ }) => {
+ self.client.send_message(crate::protocol::Message::resize(
+ crate::term::Size { rows, cols },
+ ));
+ }
+ None => {
+ if !self.done {
+ unreachable!()
+ }
+ // don't return final event here - wait until we are done
+ // sending all data to the server (see poll_write_server)
+ }
+ }
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_write_terminal(&mut self) -> component_future::Poll<(), Error> {
+ if self.to_print.is_empty() {
+ return Ok(component_future::Async::NothingToDo);
+ }
+
+ let (a, b) = self.to_print.as_slices();
+ let buf = if a.is_empty() { b } else { a };
+ let n = component_future::try_ready!(self
+ .stdout
+ .poll_write(buf)
+ .context(crate::error::WriteTerminal));
+ for _ in 0..n {
+ self.to_print.pop_front();
+ }
+ self.needs_flush = true;
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_flush_terminal(&mut self) -> component_future::Poll<(), Error> {
+ if !self.needs_flush {
+ return Ok(component_future::Async::NothingToDo);
+ }
+
+ component_future::try_ready!(self
+ .stdout
+ .poll_flush()
+ .context(crate::error::FlushTerminal));
+ self.needs_flush = false;
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_write_server(&mut self) -> component_future::Poll<(), Error> {
+ if !self.connected || !self.needs_screen_update {
+ // ship all data to the server before actually ending
+ if self.done {
+ return Ok(component_future::Async::Ready(()));
+ } else {
+ return Ok(component_future::Async::NothingToDo);
+ }
+ }
+
+ let screen = self.term.screen().clone();
+ self.client
+ .send_message(crate::protocol::Message::terminal_output(
+ &screen.contents_diff(&self.last_screen),
+ ));
+ self.last_screen = screen;
+ self.needs_screen_update = false;
+
+ Ok(component_future::Async::DidWork)
+ }
+}
+
+#[must_use = "futures do nothing unless polled"]
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ futures::future::Future for StreamSession<S>
+{
+ type Item = ();
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ component_future::poll_future(self, Self::POLL_FNS)
+ }
+}