aboutsummaryrefslogtreecommitdiffstats
path: root/teleterm
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-11-15 13:11:07 -0500
committerJesse Luehrs <doy@tozt.net>2019-11-15 13:11:07 -0500
commitbbf15cfef8134da720a27bd71a93efcb8467025b (patch)
treeaa58a5d7c1862fcdd6c8629651f664aa12c70f66 /teleterm
parentfe4fa53dbbb6030beae2094e33d1db008532ae3c (diff)
downloadteleterm-bbf15cfef8134da720a27bd71a93efcb8467025b.tar.gz
teleterm-bbf15cfef8134da720a27bd71a93efcb8467025b.zip
use workspaces
Diffstat (limited to 'teleterm')
-rw-r--r--teleterm/Cargo.toml58
-rw-r--r--teleterm/src/async_stdin.rs88
-rw-r--r--teleterm/src/client.rs595
-rw-r--r--teleterm/src/cmd.rs138
-rw-r--r--teleterm/src/cmd/play.rs946
-rw-r--r--teleterm/src/cmd/record.rs259
-rw-r--r--teleterm/src/cmd/server.rs181
-rw-r--r--teleterm/src/cmd/stream.rs345
-rw-r--r--teleterm/src/cmd/watch.rs776
-rw-r--r--teleterm/src/cmd/web.rs46
-rw-r--r--teleterm/src/config.rs877
-rw-r--r--teleterm/src/config/wizard.rs159
-rw-r--r--teleterm/src/dirs.rs98
-rw-r--r--teleterm/src/error.rs415
-rw-r--r--teleterm/src/key_reader.rs68
-rw-r--r--teleterm/src/main.rs44
-rw-r--r--teleterm/src/oauth.rs181
-rw-r--r--teleterm/src/oauth/recurse_center.rs87
-rw-r--r--teleterm/src/prelude.rs9
-rw-r--r--teleterm/src/protocol.rs947
-rw-r--r--teleterm/src/server.rs1073
-rw-r--r--teleterm/src/server/tls.rs131
-rw-r--r--teleterm/src/session_list.rs342
-rw-r--r--teleterm/src/term.rs25
-rw-r--r--teleterm/src/web.rs87
-rw-r--r--teleterm/src/web/ws.rs61
26 files changed, 8036 insertions, 0 deletions
diff --git a/teleterm/Cargo.toml b/teleterm/Cargo.toml
new file mode 100644
index 0000000..f1e0ed0
--- /dev/null
+++ b/teleterm/Cargo.toml
@@ -0,0 +1,58 @@
+[package]
+name = "teleterm"
+version = "0.2.0"
+authors = ["Jesse Luehrs <doy@tozt.net>"]
+edition = "2018"
+
+description = "share your terminals!"
+repository = "https://git.tozt.net/teleterm"
+readme = "README.md"
+keywords = ["terminal", "streaming"]
+categories = ["command-line-utilities"]
+license = "MIT"
+
+[dependencies]
+base64 = "0.11"
+bytes = "0.4"
+clap = { version = "2", features = ["wrap_help"] }
+component-future = "0.1"
+config = { version = "0.9", features = ["toml"], default_features = false }
+crossterm = { version = "0.13", features = ["terminal", "input", "screen"], default_features = false }
+directories = "2"
+env_logger = "0.7"
+futures = "0.1.29"
+# for websocket support - should be able to go back to released version in 0.5
+gotham = { git = "https://github.com/gotham-rs/gotham", rev = "d2395926b93710832f8d72b49c9bd3e77516e386" }
+hyper = "0.12"
+lazy_static = "1"
+log = { version = "0.4", features = ["release_max_level_info"] }
+mio = "0.6.19"
+native-tls = "0.2"
+oauth2 = "3.0.0-alpha.4" # need the alpha for async support
+open = "1.1"
+rand = "0.7"
+ratelimit_meter = "5"
+regex = { version = "1", features = ["std", "perf"], default_features = false }
+reqwest = "0.9.22"
+serde = "1"
+sha1 = "0.6"
+snafu = { version = "0.5", features = ["rust_1_30", "futures-01"], default_features = false }
+tokio = { version = "0.1.22", features = ["codec", "fs", "io", "reactor", "rt-full", "sync", "tcp", "timer"], default_features = false }
+tokio-pty-process-stream = "0.2"
+tokio-signal = "0.2"
+tokio-terminal-resize = "0.1"
+tokio-tls = "0.2"
+tokio-tungstenite = "0.9"
+ttyrec = "0.2"
+twoway = "0.2"
+url = "2"
+users = "0.9"
+uuid = { version = "0.8", features = ["v4"] }
+vt100 = "0.6"
+
+[[bin]]
+name = "tt"
+path = "src/main.rs"
+
+[package.metadata.deb]
+depends = "openssl"
diff --git a/teleterm/src/async_stdin.rs b/teleterm/src/async_stdin.rs
new file mode 100644
index 0000000..e3b0ead
--- /dev/null
+++ b/teleterm/src/async_stdin.rs
@@ -0,0 +1,88 @@
+struct EventedStdin;
+
+const STDIN: i32 = 0;
+
+impl std::io::Read for EventedStdin {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ let stdin = std::io::stdin();
+ let mut stdin = stdin.lock();
+ stdin.read(buf)
+ }
+}
+
+impl mio::Evented for EventedStdin {
+ fn register(
+ &self,
+ poll: &mio::Poll,
+ token: mio::Token,
+ interest: mio::Ready,
+ opts: mio::PollOpt,
+ ) -> std::io::Result<()> {
+ let fd = STDIN as std::os::unix::io::RawFd;
+ let eventedfd = mio::unix::EventedFd(&fd);
+ eventedfd.register(poll, token, interest, opts)
+ }
+
+ fn reregister(
+ &self,
+ poll: &mio::Poll,
+ token: mio::Token,
+ interest: mio::Ready,
+ opts: mio::PollOpt,
+ ) -> std::io::Result<()> {
+ let fd = STDIN as std::os::unix::io::RawFd;
+ let eventedfd = mio::unix::EventedFd(&fd);
+ eventedfd.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &mio::Poll) -> std::io::Result<()> {
+ let fd = STDIN as std::os::unix::io::RawFd;
+ let eventedfd = mio::unix::EventedFd(&fd);
+ eventedfd.deregister(poll)
+ }
+}
+
+pub struct Stdin {
+ input: tokio::reactor::PollEvented2<EventedStdin>,
+}
+
+impl Stdin {
+ pub fn new() -> Self {
+ Self {
+ input: tokio::reactor::PollEvented2::new(EventedStdin),
+ }
+ }
+}
+
+impl std::io::Read for Stdin {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ self.input.read(buf)
+ }
+}
+
+impl tokio::io::AsyncRead for Stdin {
+ fn poll_read(
+ &mut self,
+ buf: &mut [u8],
+ ) -> std::result::Result<futures::Async<usize>, tokio::io::Error> {
+ // XXX this is why i had to do the EventedFd thing - poll_read on its
+ // own will block reading from stdin, so i need a way to explicitly
+ // check readiness before doing the read
+ let ready = mio::Ready::readable();
+ match self.input.poll_read_ready(ready)? {
+ futures::Async::Ready(_) => {
+ let res = self.input.poll_read(buf);
+
+ // XXX i'm pretty sure this is wrong (if the single poll_read
+ // call didn't return all waiting data, clearing read ready
+ // state means that we won't get the rest until some more data
+ // beyond that appears), but i don't know that there's a way
+ // to do it correctly given that poll_read blocks
+ self.input.clear_read_ready(ready)?;
+
+ res
+ }
+ futures::Async::NotReady => Ok(futures::Async::NotReady),
+ }
+ }
+}
diff --git a/teleterm/src/client.rs b/teleterm/src/client.rs
new file mode 100644
index 0000000..254940c
--- /dev/null
+++ b/teleterm/src/client.rs
@@ -0,0 +1,595 @@
+use crate::prelude::*;
+use rand::Rng as _;
+
+const HEARTBEAT_DURATION: std::time::Duration =
+ std::time::Duration::from_secs(30);
+const RECONNECT_BACKOFF_BASE: std::time::Duration =
+ std::time::Duration::from_secs(1);
+const RECONNECT_BACKOFF_FACTOR: f32 = 2.0;
+const RECONNECT_BACKOFF_MAX: std::time::Duration =
+ std::time::Duration::from_secs(60);
+
+const OAUTH_LISTEN_ADDRESS: &str = "127.0.0.1:44141";
+const OAUTH_BROWSER_SUCCESS_MESSAGE: &str = "authenticated successfully! now close this page and return to your terminal.";
+
+enum ReadSocket<
+ S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
+> {
+ NotConnected,
+ Connected(crate::protocol::FramedReadHalf<S>),
+ Reading(
+ Box<
+ dyn futures::future::Future<
+ Item = (
+ crate::protocol::Message,
+ crate::protocol::FramedReadHalf<S>,
+ ),
+ Error = Error,
+ > + Send,
+ >,
+ ),
+ Processing(
+ crate::protocol::FramedReadHalf<S>,
+ Box<
+ dyn futures::future::Future<
+ Item = crate::protocol::Message,
+ Error = Error,
+ > + Send,
+ >,
+ ),
+}
+
+enum WriteSocket<
+ S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
+> {
+ NotConnected,
+ Connecting(
+ Box<
+ dyn futures::future::Future<Item = S, Error = crate::error::Error>
+ + Send,
+ >,
+ ),
+ Connected(crate::protocol::FramedWriteHalf<S>),
+ Writing(
+ Box<
+ dyn futures::future::Future<
+ Item = crate::protocol::FramedWriteHalf<S>,
+ Error = Error,
+ > + Send,
+ >,
+ ),
+}
+
+pub enum Event {
+ ServerMessage(crate::protocol::Message),
+ Disconnect,
+ Connect,
+}
+
+pub type Connector<S> = Box<
+ dyn Fn() -> Box<
+ dyn futures::future::Future<Item = S, Error = crate::error::Error>
+ + Send,
+ > + Send,
+>;
+
+pub struct Client<
+ S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
+> {
+ connect: Connector<S>,
+ auth: crate::protocol::Auth,
+
+ term_type: String,
+
+ heartbeat_timer: tokio::timer::Interval,
+ reconnect_timer: Option<tokio::timer::Delay>,
+ reconnect_backoff_amount: std::time::Duration,
+ last_server_time: std::time::Instant,
+
+ rsock: ReadSocket<S>,
+ wsock: WriteSocket<S>,
+
+ on_login: Vec<crate::protocol::Message>,
+ to_send: std::collections::VecDeque<crate::protocol::Message>,
+
+ last_error: Option<String>,
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ Client<S>
+{
+ pub fn stream(
+ connect: Connector<S>,
+ auth: &crate::protocol::Auth,
+ ) -> Self {
+ Self::new(
+ connect,
+ auth,
+ &[crate::protocol::Message::start_streaming()],
+ )
+ }
+
+ pub fn watch(
+ connect: Connector<S>,
+ auth: &crate::protocol::Auth,
+ id: &str,
+ ) -> Self {
+ Self::new(
+ connect,
+ auth,
+ &[crate::protocol::Message::start_watching(id)],
+ )
+ }
+
+ pub fn list(connect: Connector<S>, auth: &crate::protocol::Auth) -> Self {
+ Self::new(connect, auth, &[])
+ }
+
+ fn new(
+ connect: Connector<S>,
+ auth: &crate::protocol::Auth,
+ on_login: &[crate::protocol::Message],
+ ) -> Self {
+ let term_type =
+ std::env::var("TERM").unwrap_or_else(|_| "".to_string());
+ let heartbeat_timer =
+ tokio::timer::Interval::new_interval(HEARTBEAT_DURATION);
+
+ Self {
+ connect,
+ auth: auth.clone(),
+
+ term_type,
+
+ heartbeat_timer,
+ reconnect_timer: None,
+ reconnect_backoff_amount: RECONNECT_BACKOFF_BASE,
+ last_server_time: std::time::Instant::now(),
+
+ rsock: ReadSocket::NotConnected,
+ wsock: WriteSocket::NotConnected,
+
+ on_login: on_login.to_vec(),
+ to_send: std::collections::VecDeque::new(),
+
+ last_error: None,
+ }
+ }
+
+ pub fn send_message(&mut self, msg: crate::protocol::Message) {
+ self.to_send.push_back(msg);
+ }
+
+ pub fn reconnect(&mut self) {
+ self.rsock = ReadSocket::NotConnected;
+ self.wsock = WriteSocket::NotConnected;
+ }
+
+ pub fn last_error(&self) -> Option<&str> {
+ self.last_error.as_ref().map(std::string::String::as_str)
+ }
+
+ fn set_reconnect_timer(&mut self) {
+ let delay = rand::thread_rng().gen_range(
+ self.reconnect_backoff_amount / 2,
+ self.reconnect_backoff_amount,
+ );
+ let delay = delay.max(RECONNECT_BACKOFF_BASE);
+ self.reconnect_timer =
+ Some(tokio::timer::Delay::new(std::time::Instant::now() + delay));
+ self.reconnect_backoff_amount = self
+ .reconnect_backoff_amount
+ .mul_f32(RECONNECT_BACKOFF_FACTOR);
+ self.reconnect_backoff_amount =
+ self.reconnect_backoff_amount.min(RECONNECT_BACKOFF_MAX);
+ }
+
+ fn reset_reconnect_timer(&mut self) {
+ self.reconnect_timer = None;
+ self.reconnect_backoff_amount = RECONNECT_BACKOFF_BASE;
+ }
+
+ fn has_seen_server_recently(&self) -> bool {
+ let since_last_server =
+ std::time::Instant::now().duration_since(self.last_server_time);
+ if since_last_server > HEARTBEAT_DURATION * 2 {
+ return false;
+ }
+
+ true
+ }
+
+ fn handle_successful_connection(&mut self, s: S) -> Result<()> {
+ self.last_server_time = std::time::Instant::now();
+
+ log::info!("connected to server");
+
+ let (rs, ws) = s.split();
+ self.rsock =
+ ReadSocket::Connected(crate::protocol::FramedReader::new(rs));
+ self.wsock =
+ WriteSocket::Connected(crate::protocol::FramedWriter::new(ws));
+
+ self.to_send.clear();
+ self.send_message(crate::protocol::Message::login(
+ &self.auth,
+ &self.term_type,
+ crate::term::Size::get()?,
+ ));
+
+ Ok(())
+ }
+
+ fn handle_message(
+ &mut self,
+ msg: crate::protocol::Message,
+ ) -> Result<(
+ component_future::Async<Option<Event>>,
+ Option<
+ Box<
+ dyn futures::future::Future<
+ Item = crate::protocol::Message,
+ Error = Error,
+ > + Send,
+ >,
+ >,
+ )> {
+ log::debug!("recv_message({})", msg.format_log());
+
+ match msg {
+ crate::protocol::Message::OauthRequest { url, id } => {
+ let mut state = None;
+ let parsed_url = url::Url::parse(&url).unwrap();
+ for (k, v) in parsed_url.query_pairs() {
+ if k == "state" {
+ state = Some(v);
+ }
+ }
+ open::that(url).context(crate::error::OpenLink)?;
+ Ok((
+ component_future::Async::DidWork,
+ Some(self.wait_for_oauth_response(
+ state.map(|s| s.to_string()),
+ &id,
+ )?),
+ ))
+ }
+ crate::protocol::Message::LoggedIn { username } => {
+ log::info!("successfully logged into server as {}", username);
+ self.reset_reconnect_timer();
+ for msg in &self.on_login {
+ self.to_send.push_back(msg.clone());
+ }
+ self.last_error = None;
+ Ok((
+ component_future::Async::Ready(Some(Event::Connect)),
+ None,
+ ))
+ }
+ crate::protocol::Message::Heartbeat => {
+ Ok((component_future::Async::DidWork, None))
+ }
+ _ => Ok((
+ component_future::Async::Ready(Some(Event::ServerMessage(
+ msg,
+ ))),
+ None,
+ )),
+ }
+ }
+
+ fn wait_for_oauth_response(
+ &self,
+ state: Option<String>,
+ id: &str,
+ ) -> Result<
+ Box<
+ dyn futures::future::Future<
+ Item = crate::protocol::Message,
+ Error = Error,
+ > + Send,
+ >,
+ > {
+ lazy_static::lazy_static! {
+ static ref RE: regex::Regex = regex::Regex::new(
+ r"^GET (/[^ ]*) HTTP/[0-9.]+$"
+ ).unwrap();
+ }
+
+ let auth_type = self.auth.auth_type();
+ let id = id.to_string();
+ let address = OAUTH_LISTEN_ADDRESS
+ .parse()
+ .context(crate::error::ParseAddr)?;
+ let listener = tokio::net::TcpListener::bind(&address)
+ .context(crate::error::Bind { address })?;
+ Ok(Box::new(
+ listener
+ .incoming()
+ .into_future()
+ .map_err(|(e, _)| e)
+ .context(crate::error::Acceptor)
+ .and_then(|(sock, _)| {
+ let sock = sock.unwrap();
+ tokio::io::lines(std::io::BufReader::new(sock))
+ .into_future()
+ .map_err(|(e, _)| e)
+ .context(crate::error::ReadSocket)
+ })
+ .and_then(move |(buf, lines)| {
+ let buf = buf.unwrap();
+ let path = &RE
+ .captures(&buf)
+ .context(crate::error::ParseHttpRequest)?[1];
+ let base = url::Url::parse(&format!(
+ "http://{}",
+ OAUTH_LISTEN_ADDRESS
+ ))
+ .unwrap();
+ let url = base
+ .join(path)
+ .context(crate::error::ParseHttpRequestPath)?;
+ let mut req_code = None;
+ let mut req_state = None;
+ for (k, v) in url.query_pairs() {
+ if k == "code" {
+ req_code = Some(v.to_string());
+ }
+ if k == "state" {
+ req_state = Some(v.to_string());
+ }
+ }
+ if state != req_state {
+ return Err(Error::ParseHttpRequestCsrf);
+ }
+ let code = if let Some(code) = req_code {
+ code
+ } else {
+ return Err(Error::ParseHttpRequestMissingCode);
+ };
+ Ok((
+ crate::protocol::Message::oauth_response(&code),
+ lines.into_inner().into_inner(),
+ ))
+ })
+ .and_then(move |(msg, sock)| {
+ crate::oauth::save_client_auth_id(auth_type, &id)
+ .map(|_| (msg, sock))
+ })
+ .and_then(|(msg, sock)| {
+ let response = format!(
+ "HTTP/1.1 200 OK\n\n{}",
+ OAUTH_BROWSER_SUCCESS_MESSAGE
+ );
+ tokio::io::write_all(sock, response)
+ .context(crate::error::WriteSocket)
+ .map(|_| msg)
+ }),
+ ))
+ }
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ Client<S>
+{
+ // XXX rustfmt does a terrible job here
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ Option<Event>,
+ Error,
+ >] = &[
+ &Self::poll_reconnect_server,
+ &Self::poll_read_server,
+ &Self::poll_write_server,
+ &Self::poll_heartbeat,
+ ];
+
+ fn poll_reconnect_server(
+ &mut self,
+ ) -> component_future::Poll<Option<Event>, Error> {
+ match &mut self.wsock {
+ WriteSocket::NotConnected => {
+ if let Some(timer) = &mut self.reconnect_timer {
+ component_future::try_ready!(timer
+ .poll()
+ .context(crate::error::TimerReconnect));
+ }
+
+ self.set_reconnect_timer();
+ self.wsock = WriteSocket::Connecting((self.connect)());
+ }
+ WriteSocket::Connecting(ref mut fut) => match fut.poll() {
+ Ok(futures::Async::Ready(s)) => {
+ self.handle_successful_connection(s)?;
+ }
+ Ok(futures::Async::NotReady) => {
+ return Ok(component_future::Async::NotReady);
+ }
+ Err(e) => {
+ log::warn!("error while connecting, reconnecting: {}", e);
+ self.reconnect();
+ self.last_error = Some(format!("{}", e));
+ return Ok(component_future::Async::Ready(Some(
+ Event::Disconnect,
+ )));
+ }
+ },
+ WriteSocket::Connected(..) | WriteSocket::Writing(..) => {
+ if self.has_seen_server_recently() {
+ return Ok(component_future::Async::NothingToDo);
+ } else {
+ log::warn!(
+ "haven't seen server in a while, reconnecting",
+ );
+ self.reconnect();
+ self.last_error =
+ Some("haven't seen server in a while".to_string());
+ return Ok(component_future::Async::Ready(Some(
+ Event::Disconnect,
+ )));
+ }
+ }
+ }
+
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_read_server(
+ &mut self,
+ ) -> component_future::Poll<Option<Event>, Error> {
+ match &mut self.rsock {
+ ReadSocket::NotConnected => {
+ Ok(component_future::Async::NothingToDo)
+ }
+ ReadSocket::Connected(..) => {
+ if let ReadSocket::Connected(s) = std::mem::replace(
+ &mut self.rsock,
+ ReadSocket::NotConnected,
+ ) {
+ let fut = crate::protocol::Message::read_async(s);
+ self.rsock = ReadSocket::Reading(Box::new(fut));
+ } else {
+ unreachable!()
+ }
+ Ok(component_future::Async::DidWork)
+ }
+ ReadSocket::Reading(ref mut fut) => match fut.poll() {
+ Ok(futures::Async::Ready((msg, s))) => {
+ self.last_server_time = std::time::Instant::now();
+ match self.handle_message(msg) {
+ Ok((poll, fut)) => {
+ if let Some(fut) = fut {
+ self.rsock = ReadSocket::Processing(s, fut);
+ } else {
+ self.rsock = ReadSocket::Connected(s);
+ }
+ Ok(poll)
+ }
+ Err(e) => {
+ log::warn!(
+ "error handling message, reconnecting: {}",
+ e
+ );
+ self.reconnect();
+ self.last_error = Some(format!("{}", e));
+ Ok(component_future::Async::Ready(Some(
+ Event::Disconnect,
+ )))
+ }
+ }
+ }
+ Ok(futures::Async::NotReady) => {
+ Ok(component_future::Async::NotReady)
+ }
+ Err(e) => {
+ log::warn!("error reading message, reconnecting: {}", e);
+ self.reconnect();
+ self.last_error = Some(format!("{}", e));
+ Ok(component_future::Async::Ready(Some(
+ Event::Disconnect,
+ )))
+ }
+ },
+ ReadSocket::Processing(_, fut) => match fut.poll() {
+ Ok(futures::Async::Ready(msg)) => {
+ if let ReadSocket::Processing(s, _) = std::mem::replace(
+ &mut self.rsock,
+ ReadSocket::NotConnected,
+ ) {
+ self.rsock = ReadSocket::Connected(s);
+ self.send_message(msg);
+ } else {
+ unreachable!()
+ }
+ Ok(component_future::Async::DidWork)
+ }
+ Ok(futures::Async::NotReady) => {
+ Ok(component_future::Async::NotReady)
+ }
+ Err(e) => {
+ log::warn!(
+ "error processing message, reconnecting: {}",
+ e
+ );
+ self.reconnect();
+ self.last_error = Some(format!("{}", e));
+ Ok(component_future::Async::Ready(Some(
+ Event::Disconnect,
+ )))
+ }
+ },
+ }
+ }
+
+ fn poll_write_server(
+ &mut self,
+ ) -> component_future::Poll<Option<Event>, Error> {
+ match &mut self.wsock {
+ WriteSocket::NotConnected | WriteSocket::Connecting(..) => {
+ Ok(component_future::Async::NothingToDo)
+ }
+ WriteSocket::Connected(..) => {
+ if self.to_send.is_empty() {
+ return Ok(component_future::Async::NothingToDo);
+ }
+
+ if let WriteSocket::Connected(s) = std::mem::replace(
+ &mut self.wsock,
+ WriteSocket::NotConnected,
+ ) {
+ let msg = self.to_send.pop_front().unwrap();
+ log::debug!("send_message({})", msg.format_log());
+ let fut = msg.write_async(s);
+ self.wsock = WriteSocket::Writing(Box::new(fut));
+ } else {
+ unreachable!()
+ }
+
+ Ok(component_future::Async::DidWork)
+ }
+ WriteSocket::Writing(ref mut fut) => match fut.poll() {
+ Ok(futures::Async::Ready(s)) => {
+ self.wsock = WriteSocket::Connected(s);
+ Ok(component_future::Async::DidWork)
+ }
+ Ok(futures::Async::NotReady) => {
+ Ok(component_future::Async::NotReady)
+ }
+ Err(e) => {
+ log::warn!("error writing message, reconnecting: {}", e);
+ self.reconnect();
+ self.last_error = Some(format!("{}", e));
+ Ok(component_future::Async::Ready(Some(
+ Event::Disconnect,
+ )))
+ }
+ },
+ }
+ }
+
+ fn poll_heartbeat(
+ &mut self,
+ ) -> component_future::Poll<Option<Event>, Error> {
+ let _ = component_future::try_ready!(self
+ .heartbeat_timer
+ .poll()
+ .context(crate::error::TimerHeartbeat));
+ self.send_message(crate::protocol::Message::heartbeat());
+ Ok(component_future::Async::DidWork)
+ }
+}
+
+#[must_use = "streams do nothing unless polled"]
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ futures::stream::Stream for Client<S>
+{
+ type Item = Event;
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
+ component_future::poll_stream(self, Self::POLL_FNS)
+ }
+}
diff --git a/teleterm/src/cmd.rs b/teleterm/src/cmd.rs
new file mode 100644
index 0000000..8d7faa7
--- /dev/null
+++ b/teleterm/src/cmd.rs
@@ -0,0 +1,138 @@
+use crate::prelude::*;
+
+mod play;
+mod record;
+mod server;
+mod stream;
+mod watch;
+mod web;
+
+struct Command {
+ name: &'static str,
+ cmd: &'static dyn for<'a, 'b> Fn(clap::App<'a, 'b>) -> clap::App<'a, 'b>,
+ config: &'static dyn Fn(
+ Option<config::Config>,
+ ) -> Result<Box<dyn crate::config::Config>>,
+ log_level: &'static str,
+}
+
+const COMMANDS: &[Command] = &[
+ Command {
+ name: "stream",
+ cmd: &stream::cmd,
+ config: &stream::config,
+ log_level: "error",
+ },
+ Command {
+ name: "server",
+ cmd: &server::cmd,
+ config: &server::config,
+ log_level: "info",
+ },
+ Command {
+ name: "web",
+ cmd: &web::cmd,
+ config: &web::config,
+ log_level: "info",
+ },
+ Command {
+ name: "watch",
+ cmd: &watch::cmd,
+ config: &watch::config,
+ log_level: "error",
+ },
+ Command {
+ name: "record",
+ cmd: &record::cmd,
+ config: &record::config,
+ log_level: "error",
+ },
+ Command {
+ name: "play",
+ cmd: &play::cmd,
+ config: &play::config,
+ log_level: "error",
+ },
+];
+
+pub fn parse<'a>() -> Result<clap::ArgMatches<'a>> {
+ let mut app = clap::App::new(program_name()?)
+ .about("Stream your terminal for other people to watch")
+ .author(clap::crate_authors!())
+ .version(clap::crate_version!())
+ .arg(
+ clap::Arg::with_name("config-file")
+ .long("config-file")
+ .takes_value(true)
+ .value_name("FILE")
+ .help("Read configuration from FILE"),
+ )
+ .global_setting(clap::AppSettings::DontCollapseArgsInUsage)
+ .global_setting(clap::AppSettings::GlobalVersion)
+ .global_setting(clap::AppSettings::UnifiedHelpMessage)
+ .global_setting(clap::AppSettings::VersionlessSubcommands);
+
+ for cmd in COMMANDS {
+ let subcommand = clap::SubCommand::with_name(cmd.name);
+ app = app.subcommand(
+ (cmd.cmd)(subcommand).setting(clap::AppSettings::NextLineHelp),
+ );
+ }
+
+ app.get_matches_safe().context(crate::error::ParseArgs)
+}
+
+pub fn run(matches: &clap::ArgMatches<'_>) -> Result<()> {
+ let mut chosen_cmd = &COMMANDS[0];
+ let mut chosen_submatches = &clap::ArgMatches::<'_>::default();
+ for cmd in COMMANDS {
+ if let Some(submatches) = matches.subcommand_matches(cmd.name) {
+ chosen_cmd = cmd;
+ chosen_submatches = submatches;
+ }
+ }
+
+ env_logger::from_env(
+ env_logger::Env::default().default_filter_or(chosen_cmd.log_level),
+ )
+ .init();
+
+ let config = crate::config::config(
+ matches.value_of("config-file").map(std::path::Path::new),
+ )?;
+ let mut cmd_config = (chosen_cmd.config)(config)?;
+ cmd_config.merge_args(chosen_submatches)?;
+ log::debug!("{:?}", cmd_config);
+
+ // XXX ideally we'd be able to run everything on the current_thread
+ // runtime, but this is blocked on
+ // https://github.com/tokio-rs/tokio/issues/1356 (fixed in the 0.2 branch
+ // which is not yet stable)
+
+ // let mut runtime = tokio::runtime::current_thread::Runtime::new()
+ // .unwrap();
+ // runtime
+ // .block_on(cmd_config.run().map_err(|e| {
+ // log::error!("{}", e);
+ // }))
+ // .unwrap();
+
+ tokio::run(cmd_config.run().map_err(|e| {
+ log::error!("{}", e);
+ }));
+
+ Ok(())
+}
+
+fn program_name() -> Result<String> {
+ let program =
+ std::env::args().next().context(crate::error::MissingArgv)?;
+ let path = std::path::Path::new(&program);
+ let filename = path.file_name();
+ Ok(filename
+ .ok_or_else(|| Error::NotAFileName {
+ path: path.to_string_lossy().to_string(),
+ })?
+ .to_string_lossy()
+ .to_string())
+}
diff --git a/teleterm/src/cmd/play.rs b/teleterm/src/cmd/play.rs
new file mode 100644
index 0000000..3d36d18
--- /dev/null
+++ b/teleterm/src/cmd/play.rs
@@ -0,0 +1,946 @@
+use crate::prelude::*;
+use std::io::Write as _;
+
+const PLAYBACK_RATIO_INCR: f32 = 1.5;
+
+#[derive(serde::Deserialize, Debug, Default)]
+pub struct Config {
+ #[serde(default)]
+ ttyrec: crate::config::Ttyrec,
+
+ #[serde(default)]
+ play: crate::config::Play,
+}
+
+impl crate::config::Config for Config {
+ fn merge_args<'a>(
+ &mut self,
+ matches: &clap::ArgMatches<'a>,
+ ) -> Result<()> {
+ self.ttyrec.merge_args(matches)?;
+ self.play.merge_args(matches)?;
+ Ok(())
+ }
+
+ fn run(
+ &self,
+ ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send>
+ {
+ Box::new(PlaySession::new(
+ &self.ttyrec.filename,
+ self.play.play_at_start,
+ self.play.playback_ratio,
+ self.play.max_frame_length,
+ ))
+ }
+}
+
+pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> {
+ crate::config::Ttyrec::cmd(crate::config::Play::cmd(
+ app.about("Play recorded terminal sessions"),
+ ))
+}
+
+pub fn config(
+ config: Option<config::Config>,
+) -> Result<Box<dyn crate::config::Config>> {
+ let config: Config = if let Some(config) = config {
+ config
+ .try_into()
+ .context(crate::error::CouldntParseConfig)?
+ } else {
+ Config::default()
+ };
+ Ok(Box::new(config))
+}
+
+struct Frame {
+ dur: std::time::Duration,
+ full: Vec<u8>,
+ diff: Vec<u8>,
+}
+
+impl Frame {
+ fn adjusted_dur(
+ &self,
+ scale: f32,
+ clamp: Option<std::time::Duration>,
+ ) -> std::time::Duration {
+ let scaled = self.dur.div_f32(scale);
+ clamp.map_or(scaled, |clamp| scaled.min(clamp))
+ }
+}
+
+#[derive(Default)]
+struct Ttyrec {
+ frames: Vec<Frame>,
+}
+
+impl Ttyrec {
+ fn new() -> Self {
+ Self::default()
+ }
+
+ fn add_frame(&mut self, frame: Frame) {
+ self.frames.push(frame);
+ }
+
+ fn frame(&self, idx: usize) -> Option<&Frame> {
+ self.frames.get(idx)
+ }
+
+ fn frames(
+ &self,
+ ) -> impl DoubleEndedIterator<Item = &Frame> + ExactSizeIterator<Item = &Frame>
+ {
+ self.frames.iter()
+ }
+
+ fn matches_from<'a>(
+ &'a self,
+ idx: usize,
+ re: &'a regex::bytes::Regex,
+ ) -> impl Iterator<Item = (usize, &Frame)> + 'a {
+ self.frames()
+ .enumerate()
+ .skip(idx)
+ .filter(move |(_, frame)| re.is_match(&frame.full))
+ }
+
+ fn rmatches_from<'a>(
+ &'a self,
+ idx: usize,
+ re: &'a regex::bytes::Regex,
+ ) -> impl Iterator<Item = (usize, &Frame)> + 'a {
+ self.frames()
+ .enumerate()
+ .rev()
+ .skip(self.frames.len() - idx)
+ .filter(move |(_, frame)| re.is_match(&frame.full))
+ }
+
+ fn count_matches_from(
+ &self,
+ idx: usize,
+ re: &regex::bytes::Regex,
+ ) -> usize {
+ self.matches_from(idx, re).count()
+ }
+
+ fn len(&self) -> usize {
+ self.frames.len()
+ }
+}
+
+struct SearchState {
+ query: regex::bytes::Regex,
+ count: usize,
+ total_frame_count: usize,
+ idx: Option<usize>,
+}
+
+struct Player {
+ playback_ratio: f32,
+ max_frame_length: Option<std::time::Duration>,
+ ttyrec: Ttyrec,
+ idx: usize,
+ timer: Option<tokio::timer::Delay>,
+ base_time: std::time::Instant,
+ played_amount: std::time::Duration,
+ paused: Option<std::time::Instant>,
+ search_state: Option<SearchState>,
+}
+
+impl Player {
+ fn new(
+ play_at_start: bool,
+ playback_ratio: f32,
+ max_frame_length: Option<std::time::Duration>,
+ ) -> Self {
+ let now = std::time::Instant::now();
+ Self {
+ playback_ratio,
+ max_frame_length,
+ ttyrec: Ttyrec::new(),
+ idx: 0,
+ timer: None,
+ base_time: now,
+ played_amount: std::time::Duration::default(),
+ paused: if play_at_start { None } else { Some(now) },
+ search_state: None,
+ }
+ }
+
+ fn current_frame_idx(&self) -> usize {
+ self.idx
+ }
+
+ fn current_frame(&self) -> Option<&Frame> {
+ self.ttyrec.frame(self.idx)
+ }
+
+ fn num_frames(&self) -> usize {
+ self.ttyrec.len()
+ }
+
+ fn base_time_incr(&mut self, incr: std::time::Duration) {
+ self.base_time += incr;
+ self.set_timer();
+ }
+
+ fn add_frame(&mut self, frame: Frame) {
+ self.ttyrec.add_frame(frame);
+ if self.timer.is_none() {
+ self.set_timer();
+ }
+ }
+
+ fn playback_ratio_incr(&mut self) {
+ self.playback_ratio *= PLAYBACK_RATIO_INCR;
+ self.set_timer();
+ }
+
+ fn playback_ratio_decr(&mut self) {
+ self.playback_ratio /= PLAYBACK_RATIO_INCR;
+ self.set_timer();
+ }
+
+ fn playback_ratio_reset(&mut self) {
+ self.playback_ratio = 1.0;
+ self.set_timer();
+ }
+
+ fn back(&mut self) {
+ self.idx = self.idx.saturating_sub(1);
+ self.recalculate_times();
+ self.set_timer();
+ self.clear_match_idx();
+ }
+
+ fn forward(&mut self) {
+ self.idx = self.idx.saturating_add(1);
+ if self.idx > self.ttyrec.len() - 1 {
+ self.idx = self.ttyrec.len() - 1;
+ }
+ self.recalculate_times();
+ self.set_timer();
+ self.clear_match_idx();
+ }
+
+ fn first(&mut self) {
+ self.idx = 0;
+ self.recalculate_times();
+ self.set_timer();
+ self.clear_match_idx();
+ }
+
+ fn last(&mut self) {
+ self.idx = self.ttyrec.len() - 1;
+ self.recalculate_times();
+ self.set_timer();
+ self.clear_match_idx();
+ }
+
+ fn next_match(&mut self) {
+ let idx = if let Some(state) = &self.search_state {
+ self.ttyrec
+ .matches_from(self.idx + 1, &state.query)
+ .next()
+ .map(|(idx, _)| idx)
+ } else {
+ return;
+ };
+ let idx = if let Some(idx) = idx {
+ idx
+ } else {
+ return;
+ };
+
+ self.idx = idx;
+ self.recalculate_times();
+ self.set_timer();
+
+ if let Some(state) = &mut self.search_state {
+ if let Some(idx) = &mut state.idx {
+ state.idx = Some(*idx + 1);
+ } else {
+ if state.total_frame_count != self.ttyrec.len() {
+ state.count =
+ self.ttyrec.count_matches_from(0, &state.query);
+ }
+ state.idx = Some(
+ state.count
+ - self
+ .ttyrec
+ .count_matches_from(self.idx, &state.query),
+ );
+ }
+ }
+ }
+
+ fn prev_match(&mut self) {
+ let idx = if let Some(state) = &self.search_state {
+ self.ttyrec
+ .rmatches_from(self.idx, &state.query)
+ .next()
+ .map(|(idx, _)| idx)
+ } else {
+ return;
+ };
+ let idx = if let Some(idx) = idx {
+ idx
+ } else {
+ return;
+ };
+
+ self.idx = idx;
+ self.recalculate_times();
+ self.set_timer();
+
+ if let Some(state) = &mut self.search_state {
+ if let Some(idx) = &mut state.idx {
+ state.idx = Some(*idx - 1);
+ } else {
+ if state.total_frame_count != self.ttyrec.len() {
+ state.count =
+ self.ttyrec.count_matches_from(0, &state.query);
+ }
+ state.idx = Some(
+ state.count
+ - self
+ .ttyrec
+ .count_matches_from(self.idx, &state.query),
+ );
+ }
+ }
+ }
+
+ fn toggle_pause(&mut self) {
+ let now = std::time::Instant::now();
+ if let Some(time) = self.paused.take() {
+ self.base_time_incr(now - time);
+ } else {
+ self.paused = Some(now);
+ }
+ }
+
+ fn paused(&self) -> bool {
+ self.paused.is_some()
+ }
+
+ fn recalculate_times(&mut self) {
+ let now = std::time::Instant::now();
+ self.played_amount = self
+ .ttyrec
+ .frames
+ .iter()
+ .map(|f| f.dur)
+ .take(self.idx)
+ .sum();
+ self.base_time = now - self.played_amount;
+ if let Some(paused) = &mut self.paused {
+ *paused = now;
+ }
+ }
+
+ fn set_timer(&mut self) {
+ if let Some(frame) = self.ttyrec.frame(self.idx) {
+ self.timer = Some(tokio::timer::Delay::new(
+ self.base_time
+ + self.played_amount
+ + frame.adjusted_dur(
+ self.playback_ratio,
+ self.max_frame_length,
+ ),
+ ));
+ } else {
+ self.timer = None;
+ }
+ }
+
+ fn set_search_query(&mut self, re: regex::bytes::Regex) {
+ let count = self.ttyrec.count_matches_from(0, &re);
+ self.search_state = Some(SearchState {
+ query: re,
+ count,
+ total_frame_count: self.ttyrec.len(),
+ idx: None,
+ });
+ self.next_match();
+ }
+
+ fn clear_match_idx(&mut self) {
+ if let Some(SearchState { idx, .. }) = &mut self.search_state {
+ *idx = None;
+ }
+ }
+
+ fn poll(&mut self) -> futures::Poll<Option<Vec<u8>>, Error> {
+ let frame = if let Some(frame) = self.ttyrec.frame(self.idx) {
+ frame
+ } else {
+ return Ok(futures::Async::Ready(None));
+ };
+ let timer = if let Some(timer) = &mut self.timer {
+ timer
+ } else {
+ return Ok(futures::Async::Ready(None));
+ };
+
+ futures::try_ready!(timer.poll().context(crate::error::Sleep));
+ let ret = frame.diff.clone();
+
+ self.idx += 1;
+ self.played_amount +=
+ frame.adjusted_dur(self.playback_ratio, self.max_frame_length);
+ self.set_timer();
+ self.clear_match_idx();
+
+ Ok(futures::Async::Ready(Some(ret)))
+ }
+}
+
+#[allow(clippy::large_enum_variant)]
+enum FileState {
+ Closed {
+ filename: String,
+ },
+ Opening {
+ filename: String,
+ fut: tokio::fs::file::OpenFuture<String>,
+ },
+ Open {
+ reader: ttyrec::Reader<tokio::fs::File>,
+ parser: vt100::Parser,
+ },
+ Eof,
+}
+
+enum InputState {
+ Normal,
+ Search { query: String },
+}
+
+struct PlaySession {
+ file: FileState,
+ player: Player,
+ raw_screen: Option<crossterm::screen::RawScreen>,
+ alternate_screen: Option<crossterm::screen::AlternateScreen>,
+ key_reader: crate::key_reader::KeyReader,
+ last_frame_time: std::time::Duration,
+ last_frame_screen: Option<vt100::Screen>,
+ input_state: InputState,
+ hide_ui: bool,
+}
+
+impl PlaySession {
+ fn new(
+ filename: &str,
+ play_at_start: bool,
+ playback_ratio: f32,
+ max_frame_length: Option<std::time::Duration>,
+ ) -> Self {
+ Self {
+ file: FileState::Closed {
+ filename: filename.to_string(),
+ },
+ player: Player::new(
+ play_at_start,
+ playback_ratio,
+ max_frame_length,
+ ),
+ raw_screen: None,
+ alternate_screen: None,
+ key_reader: crate::key_reader::KeyReader::new(),
+ last_frame_time: std::time::Duration::default(),
+ last_frame_screen: None,
+ input_state: InputState::Normal,
+ hide_ui: false,
+ }
+ }
+
+ fn normal_keypress(
+ &mut self,
+ e: &crossterm::input::InputEvent,
+ ) -> Result<bool> {
+ match e {
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('q'),
+ ) => return Ok(true),
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char(' '),
+ ) => {
+ self.player.toggle_pause();
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Backspace,
+ ) => {
+ if self.player.paused() {
+ self.hide_ui = !self.hide_ui;
+ }
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('+'),
+ ) => {
+ self.player.playback_ratio_incr();
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('-'),
+ ) => {
+ self.player.playback_ratio_decr();
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('='),
+ ) => {
+ self.player.playback_ratio_reset();
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('<'),
+ ) => {
+ self.player.back();
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('>'),
+ ) => {
+ self.player.forward();
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('0'),
+ ) => {
+ self.player.first();
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('$'),
+ ) => {
+ self.player.last();
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('/'),
+ ) => {
+ self.input_state = InputState::Search {
+ query: String::new(),
+ };
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('n'),
+ ) => {
+ self.player.next_match();
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('p'),
+ ) => {
+ self.player.prev_match();
+ }
+ _ => {}
+ }
+ Ok(false)
+ }
+
+ fn search_keypress(
+ &mut self,
+ e: &crossterm::input::InputEvent,
+ ) -> Result<bool> {
+ match e {
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Esc,
+ ) => {
+ self.input_state = InputState::Normal;
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char(c),
+ ) => match &mut self.input_state {
+ InputState::Search { query } => {
+ query.push(*c);
+ }
+ _ => unreachable!(),
+ },
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Backspace,
+ ) => match &mut self.input_state {
+ InputState::Search { query } => {
+ query.pop();
+ }
+ _ => unreachable!(),
+ },
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Enter,
+ ) => {
+ let query =
+ if let InputState::Search { query } = &self.input_state {
+ query.to_string()
+ } else {
+ unreachable!()
+ };
+ if let Ok(re) = regex::bytes::Regex::new(&query) {
+ self.input_state = InputState::Normal;
+ self.player.set_search_query(re);
+ }
+ }
+ _ => {}
+ }
+ Ok(false)
+ }
+
+ fn keypress(&mut self, e: &crossterm::input::InputEvent) -> Result<bool> {
+ match &mut self.input_state {
+ InputState::Normal => self.normal_keypress(e),
+ InputState::Search { .. } => self.search_keypress(e),
+ }
+ }
+
+ fn redraw(&self) -> Result<()> {
+ let frame = if let Some(frame) = self.player.current_frame() {
+ frame
+ } else {
+ return Ok(());
+ };
+ self.write(&frame.full)?;
+ self.draw_ui()?;
+ Ok(())
+ }
+
+ fn write(&self, data: &[u8]) -> Result<()> {
+ // TODO async
+ let stdout = std::io::stdout();
+ let mut stdout = stdout.lock();
+ stdout.write(data).context(crate::error::WriteTerminal)?;
+ stdout.flush().context(crate::error::FlushTerminal)?;
+ Ok(())
+ }
+
+ fn draw_ui(&self) -> Result<()> {
+ let size = crate::term::Size::get()?;
+
+ if self.player.paused() && !self.hide_ui {
+ self.write(b"\x1b7\x1b[37;44m\x1b[?25l")?;
+
+ self.draw_status()?;
+ self.draw_help(size)?;
+
+ self.write(b"\x1b8")?;
+ }
+
+ self.draw_search(size)?;
+
+ Ok(())
+ }
+
+ fn draw_status(&self) -> Result<()> {
+ let msg = format!(
+ "paused (frame {}/{})",
+ self.player.current_frame_idx() + 1,
+ self.player.num_frames()
+ );
+
+ self.write(b"\x1b[2;2H")?;
+ self.write("â•­".as_bytes())?;
+ self.write("─".repeat(2 + msg.len()).as_bytes())?;
+ self.write("â•®".as_bytes())?;
+
+ self.write(b"\x1b[3;2H")?;
+ self.write(format!("│ {} │", msg).as_bytes())?;
+
+ self.write(b"\x1b[4;2H")?;
+ self.write("â•°".as_bytes())?;
+ self.write("─".repeat(2 + msg.len()).as_bytes())?;
+ self.write("╯".as_bytes())?;
+
+ Ok(())
+ }
+
+ fn draw_help(&self, size: crate::term::Size) -> Result<()> {
+ self.write(
+ format!("\x1b[{};{}H", size.rows - 12, size.cols - 32).as_bytes(),
+ )?;
+ self.write("â•­".as_bytes())?;
+ self.write("─".repeat(30).as_bytes())?;
+ self.write("â•®".as_bytes())?;
+
+ self.write(
+ format!("\x1b[{};{}H", size.rows - 11, size.cols - 32).as_bytes(),
+ )?;
+ self.write("│ Keys │".as_bytes())?;
+ self.write(
+ format!("\x1b[{};{}H", size.rows - 10, size.cols - 32).as_bytes(),
+ )?;
+ self.write("│ q: quit │".as_bytes())?;
+ self.write(
+ format!("\x1b[{};{}H", size.rows - 9, size.cols - 32).as_bytes(),
+ )?;
+ self.write("│ Space: pause/unpause │".as_bytes())?;
+ self.write(
+ format!("\x1b[{};{}H", size.rows - 8, size.cols - 32).as_bytes(),
+ )?;
+ self.write("│ Backspace: hide/show ui │".as_bytes())?;
+ self.write(
+ format!("\x1b[{};{}H", size.rows - 7, size.cols - 32).as_bytes(),
+ )?;
+ self.write("│ </>: previous/next frame │".as_bytes())?;
+ self.write(
+ format!("\x1b[{};{}H", size.rows - 6, size.cols - 32).as_bytes(),
+ )?;
+ self.write("│ 0/$: first/last frame │".as_bytes())?;
+ self.write(
+ format!("\x1b[{};{}H", size.rows - 5, size.cols - 32).as_bytes(),
+ )?;
+ self.write("│ +/-: increase/decrease speed │".as_bytes())?;
+ self.write(
+ format!("\x1b[{};{}H", size.rows - 4, size.cols - 32).as_bytes(),
+ )?;
+ self.write("│ =: normal speed │".as_bytes())?;
+ self.write(
+ format!("\x1b[{};{}H", size.rows - 3, size.cols - 32).as_bytes(),
+ )?;
+ self.write("│ /: search │".as_bytes())?;
+ self.write(
+ format!("\x1b[{};{}H", size.rows - 2, size.cols - 32).as_bytes(),
+ )?;
+ self.write("│ n/p: next/previous match │".as_bytes())?;
+
+ self.write(
+ format!("\x1b[{};{}H", size.rows - 1, size.cols - 32).as_bytes(),
+ )?;
+ self.write("â•°".as_bytes())?;
+ self.write("─".repeat(30).as_bytes())?;
+ self.write("╯".as_bytes())?;
+
+ Ok(())
+ }
+
+ fn draw_search(&self, size: crate::term::Size) -> Result<()> {
+ match &self.input_state {
+ InputState::Normal => {
+ if !self.player.paused() {
+ return Ok(());
+ }
+
+ if let Some(state) = &self.player.search_state {
+ self.write(b"\x1b7\x1b[37;44m")?;
+ self.write(
+ format!("\x1b[{};{}H", 2, size.cols - 32).as_bytes(),
+ )?;
+ self.write("â•­".as_bytes())?;
+ self.write("─".repeat(30).as_bytes())?;
+ self.write("â•®".as_bytes())?;
+
+ let msg = if let Some(idx) = state.idx {
+ format!("match ({}/{})", idx + 1, state.count)
+ } else {
+ format!("match (-/{})", state.count)
+ };
+ self.write(
+ format!("\x1b[{};{}H", 3, size.cols - 32).as_bytes(),
+ )?;
+ self.write(
+ format!("│ {}:{} │", msg, " ".repeat(27 - msg.len()))
+ .as_bytes(),
+ )?;
+
+ self.write(
+ format!("\x1b[{};{}H", 4, size.cols - 32).as_bytes(),
+ )?;
+ let query_str = state.query.as_str();
+ if query_str.len() > 26 {
+ self.write(
+ format!("│ /{:24}... │", &query_str[0..24])
+ .as_bytes(),
+ )?;
+ } else {
+ let regex_str = format!("/{}/", query_str);
+ self.write(
+ format!("│ {:28} │", regex_str).as_bytes(),
+ )?;
+ }
+
+ self.write(
+ format!("\x1b[{};{}H", 5, size.cols - 32).as_bytes(),
+ )?;
+ self.write("â•°".as_bytes())?;
+ self.write("─".repeat(30).as_bytes())?;
+ self.write("╯".as_bytes())?;
+
+ self.write(b"\x1b8")?;
+ }
+ }
+ InputState::Search { query } => {
+ self.write(b"\x1b7\x1b[37;44m")?;
+ self.write(
+ format!("\x1b[{};{}H", 2, size.cols - 32).as_bytes(),
+ )?;
+ self.write("â•­".as_bytes())?;
+ self.write("─".repeat(30).as_bytes())?;
+ self.write("â•®".as_bytes())?;
+
+ self.write(
+ format!("\x1b[{};{}H", 3, size.cols - 32).as_bytes(),
+ )?;
+ self.write("│ search: │".as_bytes())?;
+
+ self.write(
+ format!("\x1b[{};{}H", 4, size.cols - 32).as_bytes(),
+ )?;
+ self.write(
+ format!(
+ "│ {:28} │",
+ if query.len() > 28 {
+ &query[query.len() - 28..]
+ } else {
+ query
+ }
+ )
+ .as_bytes(),
+ )?;
+
+ self.write(
+ format!("\x1b[{};{}H", 5, size.cols - 32).as_bytes(),
+ )?;
+ self.write("â•°".as_bytes())?;
+ self.write("─".repeat(30).as_bytes())?;
+ self.write("╯".as_bytes())?;
+
+ self.write(
+ format!(
+ "\x1b8\x1b[{};{}H\x1b[?25h",
+ 4,
+ size.cols as usize - 32 + 2 + query.len().min(28)
+ )
+ .as_bytes(),
+ )?;
+ }
+ }
+ Ok(())
+ }
+}
+
+impl PlaySession {
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ (),
+ Error,
+ >] = &[
+ &Self::poll_open_file,
+ &Self::poll_read_file,
+ &Self::poll_input,
+ &Self::poll_write_terminal,
+ ];
+
+ fn poll_open_file(&mut self) -> component_future::Poll<(), Error> {
+ match &mut self.file {
+ FileState::Closed { filename } => {
+ self.file = FileState::Opening {
+ filename: filename.to_string(),
+ fut: tokio::fs::File::open(filename.to_string()),
+ };
+ Ok(component_future::Async::DidWork)
+ }
+ FileState::Opening { filename, fut } => {
+ let file = component_future::try_ready!(fut
+ .poll()
+ .with_context(|| {
+ crate::error::OpenFile {
+ filename: filename.to_string(),
+ }
+ }));
+ let size = crate::term::Size::get()?;
+ let reader = ttyrec::Reader::new(file);
+ let parser = vt100::Parser::new(size.rows, size.cols, 0);
+ self.file = FileState::Open { reader, parser };
+ Ok(component_future::Async::DidWork)
+ }
+ _ => Ok(component_future::Async::NothingToDo),
+ }
+ }
+
+ fn poll_read_file(&mut self) -> component_future::Poll<(), Error> {
+ if let FileState::Open { reader, parser } = &mut self.file {
+ if let Some(frame) = component_future::try_ready!(reader
+ .poll_read()
+ .context(crate::error::ReadTtyrec))
+ {
+ parser.process(&frame.data);
+
+ let frame_time = frame.time - reader.offset().unwrap();
+ let frame_dur = frame_time - self.last_frame_time;
+ self.last_frame_time = frame_time;
+
+ let full = parser.screen().contents_formatted();
+ let diff = if let Some(last_frame_screen) =
+ &self.last_frame_screen
+ {
+ parser.screen().contents_diff(last_frame_screen)
+ } else {
+ full.clone()
+ };
+
+ self.last_frame_screen = Some(parser.screen().clone());
+ self.player.add_frame(Frame {
+ dur: frame_dur,
+ full,
+ diff,
+ });
+ if self.player.paused() {
+ self.draw_ui()?;
+ }
+ } else {
+ self.file = FileState::Eof;
+ }
+ Ok(component_future::Async::DidWork)
+ } else {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+
+ fn poll_input(&mut self) -> component_future::Poll<(), Error> {
+ if self.raw_screen.is_none() {
+ self.raw_screen = Some(
+ crossterm::screen::RawScreen::into_raw_mode()
+ .context(crate::error::ToRawMode)?,
+ );
+ }
+ if self.alternate_screen.is_none() {
+ self.alternate_screen = Some(
+ crossterm::screen::AlternateScreen::to_alternate(false)
+ .context(crate::error::ToAlternateScreen)?,
+ );
+ }
+
+ let e = component_future::try_ready!(self.key_reader.poll()).unwrap();
+ let quit = self.keypress(&e)?;
+ if quit {
+ self.write(b"\x1b[?25h")?;
+ Ok(component_future::Async::Ready(()))
+ } else {
+ self.redraw()?;
+ Ok(component_future::Async::DidWork)
+ }
+ }
+
+ fn poll_write_terminal(&mut self) -> component_future::Poll<(), Error> {
+ if self.player.paused() {
+ return Ok(component_future::Async::NothingToDo);
+ }
+
+ if let Some(data) = component_future::try_ready!(self.player.poll()) {
+ self.write(&data)?;
+ self.draw_ui()?;
+ Ok(component_future::Async::DidWork)
+ } else if let FileState::Eof = self.file {
+ Ok(component_future::Async::Ready(()))
+ } else {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+}
+
+#[must_use = "futures do nothing unless polled"]
+impl futures::future::Future for PlaySession {
+ type Item = ();
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ component_future::poll_future(self, Self::POLL_FNS)
+ }
+}
diff --git a/teleterm/src/cmd/record.rs b/teleterm/src/cmd/record.rs
new file mode 100644
index 0000000..7cfe52a
--- /dev/null
+++ b/teleterm/src/cmd/record.rs
@@ -0,0 +1,259 @@
+use crate::prelude::*;
+use tokio::io::AsyncWrite as _;
+
+#[derive(serde::Deserialize, Debug, Default)]
+pub struct Config {
+ #[serde(default)]
+ command: crate::config::Command,
+
+ #[serde(default)]
+ ttyrec: crate::config::Ttyrec,
+}
+
+impl crate::config::Config for Config {
+ fn merge_args<'a>(
+ &mut self,
+ matches: &clap::ArgMatches<'a>,
+ ) -> Result<()> {
+ self.command.merge_args(matches)?;
+ self.ttyrec.merge_args(matches)?;
+ Ok(())
+ }
+
+ fn run(
+ &self,
+ ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send>
+ {
+ Box::new(RecordSession::new(
+ &self.ttyrec.filename,
+ &self.command.command,
+ &self.command.args,
+ ))
+ }
+}
+
+pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> {
+ crate::config::Command::cmd(crate::config::Ttyrec::cmd(
+ app.about("Record a terminal session to a file"),
+ ))
+}
+
+pub fn config(
+ config: Option<config::Config>,
+) -> Result<Box<dyn crate::config::Config>> {
+ let config: Config = if let Some(config) = config {
+ config
+ .try_into()
+ .context(crate::error::CouldntParseConfig)?
+ } else {
+ Config::default()
+ };
+ Ok(Box::new(config))
+}
+
+#[allow(clippy::large_enum_variant)]
+enum FileState {
+ Closed {
+ filename: String,
+ },
+ Opening {
+ filename: String,
+ fut: tokio::fs::file::CreateFuture<String>,
+ },
+ Open {
+ writer: ttyrec::Writer<tokio::fs::File>,
+ },
+}
+
+struct RecordSession {
+ file: FileState,
+ frame_data: Vec<u8>,
+
+ process:
+ tokio_pty_process_stream::ResizingProcess<crate::async_stdin::Stdin>,
+ raw_screen: Option<crossterm::screen::RawScreen>,
+ done: bool,
+
+ stdout: tokio::io::Stdout,
+ to_write_stdout: std::collections::VecDeque<u8>,
+ needs_flush: bool,
+}
+
+impl RecordSession {
+ fn new(filename: &str, cmd: &str, args: &[String]) -> Self {
+ let input = crate::async_stdin::Stdin::new();
+ let process = tokio_pty_process_stream::ResizingProcess::new(
+ tokio_pty_process_stream::Process::new(cmd, args, input),
+ );
+
+ Self {
+ file: FileState::Closed {
+ filename: filename.to_string(),
+ },
+ frame_data: vec![],
+
+ process,
+ raw_screen: None,
+ done: false,
+
+ stdout: tokio::io::stdout(),
+ to_write_stdout: std::collections::VecDeque::new(),
+ needs_flush: false,
+ }
+ }
+
+ fn record_bytes(&mut self, buf: &[u8]) {
+ self.frame_data.extend(buf);
+ self.to_write_stdout.extend(buf);
+ }
+}
+
+impl RecordSession {
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ (),
+ Error,
+ >] = &[
+ &Self::poll_open_file,
+ &Self::poll_read_process,
+ &Self::poll_write_terminal,
+ &Self::poll_flush_terminal,
+ &Self::poll_write_file,
+ ];
+
+ fn poll_open_file(&mut self) -> component_future::Poll<(), Error> {
+ match &mut self.file {
+ FileState::Closed { filename } => {
+ self.file = FileState::Opening {
+ filename: filename.to_string(),
+ fut: tokio::fs::File::create(filename.to_string()),
+ };
+ Ok(component_future::Async::DidWork)
+ }
+ FileState::Opening { filename, fut } => {
+ let file = component_future::try_ready!(fut
+ .poll()
+ .with_context(|| {
+ crate::error::OpenFile {
+ filename: filename.clone(),
+ }
+ }));
+ self.file = FileState::Open {
+ writer: ttyrec::Writer::new(file),
+ };
+ Ok(component_future::Async::DidWork)
+ }
+ FileState::Open { .. } => {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+ }
+
+ fn poll_read_process(&mut self) -> component_future::Poll<(), Error> {
+ match component_future::try_ready!(self
+ .process
+ .poll()
+ .context(crate::error::Subprocess))
+ {
+ Some(tokio_pty_process_stream::Event::CommandStart {
+ ..
+ }) => {
+ if self.raw_screen.is_none() {
+ self.raw_screen = Some(
+ crossterm::screen::RawScreen::into_raw_mode()
+ .context(crate::error::ToRawMode)?,
+ );
+ }
+ }
+ Some(tokio_pty_process_stream::Event::CommandExit { .. }) => {
+ self.done = true;
+ }
+ Some(tokio_pty_process_stream::Event::Output { data }) => {
+ self.record_bytes(&data);
+ }
+ Some(tokio_pty_process_stream::Event::Resize { .. }) => {}
+ None => {
+ if !self.done {
+ unreachable!()
+ }
+ // don't return final event here - wait until we are done
+ // writing all data to the file (see poll_write_file)
+ }
+ }
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_write_terminal(&mut self) -> component_future::Poll<(), Error> {
+ if self.to_write_stdout.is_empty() {
+ return Ok(component_future::Async::NothingToDo);
+ }
+
+ let (a, b) = self.to_write_stdout.as_slices();
+ let buf = if a.is_empty() { b } else { a };
+ let n = component_future::try_ready!(self
+ .stdout
+ .poll_write(buf)
+ .context(crate::error::WriteTerminal));
+ for _ in 0..n {
+ self.to_write_stdout.pop_front();
+ }
+ self.needs_flush = true;
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_flush_terminal(&mut self) -> component_future::Poll<(), Error> {
+ if !self.needs_flush {
+ return Ok(component_future::Async::NothingToDo);
+ }
+
+ component_future::try_ready!(self
+ .stdout
+ .poll_flush()
+ .context(crate::error::FlushTerminal));
+ self.needs_flush = false;
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_write_file(&mut self) -> component_future::Poll<(), Error> {
+ let writer = match &mut self.file {
+ FileState::Open { writer } => writer,
+ _ => {
+ return Ok(component_future::Async::NothingToDo);
+ }
+ };
+
+ if !self.frame_data.is_empty() {
+ writer
+ .frame(&self.frame_data)
+ .context(crate::error::WriteTtyrec)?;
+ self.frame_data.clear();
+ }
+
+ if writer.needs_write() {
+ component_future::try_ready!(writer
+ .poll_write()
+ .context(crate::error::WriteTtyrec));
+ Ok(component_future::Async::DidWork)
+ } else {
+ // finish writing to the file before actually ending
+ if self.done {
+ Ok(component_future::Async::Ready(()))
+ } else {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+ }
+}
+
+#[must_use = "futures do nothing unless polled"]
+impl futures::future::Future for RecordSession {
+ type Item = ();
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ component_future::poll_future(self, Self::POLL_FNS)
+ }
+}
diff --git a/teleterm/src/cmd/server.rs b/teleterm/src/cmd/server.rs
new file mode 100644
index 0000000..c4d2075
--- /dev/null
+++ b/teleterm/src/cmd/server.rs
@@ -0,0 +1,181 @@
+use crate::prelude::*;
+use std::io::Read as _;
+
+#[derive(serde::Deserialize, Debug, Default)]
+pub struct Config {
+ #[serde(default)]
+ server: crate::config::Server,
+
+ #[serde(
+ rename = "oauth",
+ deserialize_with = "crate::config::oauth_configs",
+ default
+ )]
+ oauth_configs: std::collections::HashMap<
+ crate::protocol::AuthType,
+ crate::oauth::Config,
+ >,
+}
+
+impl crate::config::Config for Config {
+ fn merge_args<'a>(
+ &mut self,
+ matches: &clap::ArgMatches<'a>,
+ ) -> Result<()> {
+ self.server.merge_args(matches)
+ }
+
+ fn run(
+ &self,
+ ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send>
+ {
+ if let Some(tls_identity_file) = &self.server.tls_identity_file {
+ create_server_tls(
+ self.server.listen_address,
+ self.server.read_timeout,
+ tls_identity_file,
+ self.server.allowed_login_methods.clone(),
+ self.oauth_configs.clone(),
+ self.server.uid,
+ self.server.gid,
+ )
+ } else {
+ create_server(
+ self.server.listen_address,
+ self.server.read_timeout,
+ self.server.allowed_login_methods.clone(),
+ self.oauth_configs.clone(),
+ self.server.uid,
+ self.server.gid,
+ )
+ }
+ }
+}
+
+pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> {
+ crate::config::Server::cmd(app.about("Run a teleterm server"))
+}
+
+pub fn config(
+ config: Option<config::Config>,
+) -> Result<Box<dyn crate::config::Config>> {
+ let config: Config = if let Some(config) = config {
+ config
+ .try_into()
+ .context(crate::error::CouldntParseConfig)?
+ } else {
+ Config::default()
+ };
+ Ok(Box::new(config))
+}
+
+fn create_server(
+ address: std::net::SocketAddr,
+ read_timeout: std::time::Duration,
+ allowed_login_methods: std::collections::HashSet<
+ crate::protocol::AuthType,
+ >,
+ oauth_configs: std::collections::HashMap<
+ crate::protocol::AuthType,
+ crate::oauth::Config,
+ >,
+ uid: Option<users::uid_t>,
+ gid: Option<users::gid_t>,
+) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send> {
+ let listener = match listen(address, uid, gid) {
+ Ok(listener) => listener,
+ Err(e) => return Box::new(futures::future::err(e)),
+ };
+
+ let acceptor = listener.incoming().context(crate::error::Acceptor);
+ let server = crate::server::Server::new(
+ Box::new(acceptor),
+ read_timeout,
+ allowed_login_methods,
+ oauth_configs,
+ );
+
+ Box::new(server)
+}
+
+fn create_server_tls(
+ address: std::net::SocketAddr,
+ read_timeout: std::time::Duration,
+ tls_identity_file: &str,
+ allowed_login_methods: std::collections::HashSet<
+ crate::protocol::AuthType,
+ >,
+ oauth_configs: std::collections::HashMap<
+ crate::protocol::AuthType,
+ crate::oauth::Config,
+ >,
+ uid: Option<users::uid_t>,
+ gid: Option<users::gid_t>,
+) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send> {
+ let tls_acceptor = match accept_tls(tls_identity_file) {
+ Ok(acceptor) => acceptor,
+ Err(e) => return Box::new(futures::future::err(e)),
+ };
+
+ let listener = match listen(address, uid, gid) {
+ Ok(listener) => listener,
+ Err(e) => return Box::new(futures::future::err(e)),
+ };
+
+ let acceptor = listener
+ .incoming()
+ .context(crate::error::Acceptor)
+ .map(move |sock| tls_acceptor.accept(sock));
+ let server = crate::server::tls::Server::new(
+ Box::new(acceptor),
+ read_timeout,
+ allowed_login_methods,
+ oauth_configs,
+ );
+
+ Box::new(server)
+}
+
+fn listen(
+ address: std::net::SocketAddr,
+ uid: Option<users::uid_t>,
+ gid: Option<users::gid_t>,
+) -> Result<tokio::net::TcpListener> {
+ let listener = tokio::net::TcpListener::bind(&address)
+ .context(crate::error::Bind { address })?;
+ drop_privs(uid, gid)?;
+ log::info!("Listening on {}", address);
+ Ok(listener)
+}
+
+fn accept_tls(tls_identity_file: &str) -> Result<tokio_tls::TlsAcceptor> {
+ let mut file = std::fs::File::open(tls_identity_file).context(
+ crate::error::OpenFileSync {
+ filename: tls_identity_file,
+ },
+ )?;
+ let mut identity = vec![];
+ file.read_to_end(&mut identity)
+ .context(crate::error::ReadFileSync)?;
+ let identity = native_tls::Identity::from_pkcs12(&identity, "")
+ .context(crate::error::ParseIdentity)?;
+ let acceptor = native_tls::TlsAcceptor::new(identity)
+ .context(crate::error::CreateAcceptor)?;
+
+ Ok(tokio_tls::TlsAcceptor::from(acceptor))
+}
+
+fn drop_privs(
+ uid: Option<users::uid_t>,
+ gid: Option<users::gid_t>,
+) -> Result<()> {
+ if let Some(gid) = gid {
+ users::switch::set_both_gid(gid, gid)
+ .context(crate::error::SwitchGid)?;
+ }
+ if let Some(uid) = uid {
+ users::switch::set_both_uid(uid, uid)
+ .context(crate::error::SwitchUid)?;
+ }
+ Ok(())
+}
diff --git a/teleterm/src/cmd/stream.rs b/teleterm/src/cmd/stream.rs
new file mode 100644
index 0000000..11d3873
--- /dev/null
+++ b/teleterm/src/cmd/stream.rs
@@ -0,0 +1,345 @@
+use crate::prelude::*;
+use tokio::io::AsyncWrite as _;
+
+#[derive(serde::Deserialize, Debug, Default)]
+pub struct Config {
+ #[serde(default)]
+ client: crate::config::Client,
+
+ #[serde(default)]
+ command: crate::config::Command,
+}
+
+impl crate::config::Config for Config {
+ fn merge_args<'a>(
+ &mut self,
+ matches: &clap::ArgMatches<'a>,
+ ) -> Result<()> {
+ self.client.merge_args(matches)?;
+ self.command.merge_args(matches)?;
+ Ok(())
+ }
+
+ fn run(
+ &self,
+ ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send>
+ {
+ let auth = match self.client.auth {
+ crate::protocol::AuthType::Plain => {
+ let username = self
+ .client
+ .username
+ .clone()
+ .context(crate::error::CouldntFindUsername);
+ match username {
+ Ok(username) => crate::protocol::Auth::plain(&username),
+ Err(e) => return Box::new(futures::future::err(e)),
+ }
+ }
+ crate::protocol::AuthType::RecurseCenter => {
+ let id = crate::oauth::load_client_auth_id(self.client.auth);
+ crate::protocol::Auth::recurse_center(
+ id.as_ref().map(std::string::String::as_str),
+ )
+ }
+ };
+
+ let host = self.client.host().to_string();
+ let address = *self.client.addr();
+ if self.client.tls {
+ let connector = match native_tls::TlsConnector::new()
+ .context(crate::error::CreateConnector)
+ {
+ Ok(connector) => connector,
+ Err(e) => return Box::new(futures::future::err(e)),
+ };
+ let connect: crate::client::Connector<_> = Box::new(move || {
+ let host = host.clone();
+ let connector = connector.clone();
+ let connector = tokio_tls::TlsConnector::from(connector);
+ let stream = tokio::net::tcp::TcpStream::connect(&address);
+ Box::new(
+ stream
+ .context(crate::error::Connect { address })
+ .and_then(move |stream| {
+ connector
+ .connect(&host, stream)
+ .context(crate::error::ConnectTls { host })
+ }),
+ )
+ });
+ Box::new(StreamSession::new(
+ &self.command.command,
+ &self.command.args,
+ connect,
+ &auth,
+ ))
+ } else {
+ let connect: crate::client::Connector<_> = Box::new(move || {
+ Box::new(
+ tokio::net::tcp::TcpStream::connect(&address)
+ .context(crate::error::Connect { address }),
+ )
+ });
+ Box::new(StreamSession::new(
+ &self.command.command,
+ &self.command.args,
+ connect,
+ &auth,
+ ))
+ }
+ }
+}
+
+pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> {
+ crate::config::Client::cmd(crate::config::Command::cmd(
+ app.about("Stream your terminal"),
+ ))
+}
+
+pub fn config(
+ mut config: Option<config::Config>,
+) -> Result<Box<dyn crate::config::Config>> {
+ if config.is_none() {
+ config = crate::config::wizard::run()?;
+ }
+ let config: Config = if let Some(config) = config {
+ config
+ .try_into()
+ .context(crate::error::CouldntParseConfig)?
+ } else {
+ Config::default()
+ };
+ Ok(Box::new(config))
+}
+
+struct StreamSession<
+ S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
+> {
+ client: crate::client::Client<S>,
+ connected: bool,
+
+ process:
+ tokio_pty_process_stream::ResizingProcess<crate::async_stdin::Stdin>,
+ raw_screen: Option<crossterm::screen::RawScreen>,
+ done: bool,
+
+ term: vt100::Parser,
+ last_screen: vt100::Screen,
+ needs_screen_update: bool,
+
+ stdout: tokio::io::Stdout,
+ to_print: std::collections::VecDeque<u8>,
+ needs_flush: bool,
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ StreamSession<S>
+{
+ fn new(
+ cmd: &str,
+ args: &[String],
+ connect: crate::client::Connector<S>,
+ auth: &crate::protocol::Auth,
+ ) -> Self {
+ let client = crate::client::Client::stream(connect, auth);
+
+ // TODO: tokio::io::stdin is broken (it's blocking)
+ // see https://github.com/tokio-rs/tokio/issues/589
+ // let input = tokio::io::stdin();
+ let input = crate::async_stdin::Stdin::new();
+
+ let process = tokio_pty_process_stream::ResizingProcess::new(
+ tokio_pty_process_stream::Process::new(cmd, args, input),
+ );
+
+ let term = vt100::Parser::default();
+ let screen = term.screen().clone();
+
+ Self {
+ client,
+ connected: false,
+
+ process,
+ raw_screen: None,
+ done: false,
+
+ term,
+ last_screen: screen,
+ needs_screen_update: false,
+
+ stdout: tokio::io::stdout(),
+ to_print: std::collections::VecDeque::new(),
+ needs_flush: false,
+ }
+ }
+
+ fn record_bytes(&mut self, buf: &[u8]) {
+ self.to_print.extend(buf);
+ self.term.process(buf);
+ self.needs_screen_update = true;
+ }
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ StreamSession<S>
+{
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ (),
+ Error,
+ >] = &[
+ &Self::poll_read_client,
+ &Self::poll_read_process,
+ &Self::poll_write_terminal,
+ &Self::poll_flush_terminal,
+ &Self::poll_write_server,
+ ];
+
+ // this should never return Err, because we don't want server
+ // communication issues to ever interrupt a running process
+ fn poll_read_client(&mut self) -> component_future::Poll<(), Error> {
+ match self.client.poll() {
+ Ok(futures::Async::Ready(Some(e))) => match e {
+ crate::client::Event::Disconnect => {
+ self.connected = false;
+ Ok(component_future::Async::DidWork)
+ }
+ crate::client::Event::Connect => {
+ self.connected = true;
+ self.client.send_message(
+ crate::protocol::Message::terminal_output(
+ &self.last_screen.contents_formatted(),
+ ),
+ );
+ Ok(component_future::Async::DidWork)
+ }
+ crate::client::Event::ServerMessage(..) => {
+ // we don't expect to ever see a server message once we
+ // start streaming, so if one comes through, assume
+ // something is messed up and try again
+ self.client.reconnect();
+ Ok(component_future::Async::DidWork)
+ }
+ },
+ Ok(futures::Async::Ready(None)) => {
+ // the client should never exit on its own
+ unreachable!()
+ }
+ Ok(futures::Async::NotReady) => {
+ Ok(component_future::Async::NotReady)
+ }
+ Err(..) => {
+ self.client.reconnect();
+ Ok(component_future::Async::DidWork)
+ }
+ }
+ }
+
+ fn poll_read_process(&mut self) -> component_future::Poll<(), Error> {
+ match component_future::try_ready!(self
+ .process
+ .poll()
+ .context(crate::error::Subprocess))
+ {
+ Some(tokio_pty_process_stream::Event::CommandStart {
+ ..
+ }) => {
+ if self.raw_screen.is_none() {
+ self.raw_screen = Some(
+ crossterm::screen::RawScreen::into_raw_mode()
+ .context(crate::error::ToRawMode)?,
+ );
+ }
+ }
+ Some(tokio_pty_process_stream::Event::CommandExit { .. }) => {
+ self.done = true;
+ }
+ Some(tokio_pty_process_stream::Event::Output { data }) => {
+ self.record_bytes(&data);
+ }
+ Some(tokio_pty_process_stream::Event::Resize {
+ size: (rows, cols),
+ }) => {
+ self.client.send_message(crate::protocol::Message::resize(
+ crate::term::Size { rows, cols },
+ ));
+ }
+ None => {
+ if !self.done {
+ unreachable!()
+ }
+ // don't return final event here - wait until we are done
+ // sending all data to the server (see poll_write_server)
+ }
+ }
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_write_terminal(&mut self) -> component_future::Poll<(), Error> {
+ if self.to_print.is_empty() {
+ return Ok(component_future::Async::NothingToDo);
+ }
+
+ let (a, b) = self.to_print.as_slices();
+ let buf = if a.is_empty() { b } else { a };
+ let n = component_future::try_ready!(self
+ .stdout
+ .poll_write(buf)
+ .context(crate::error::WriteTerminal));
+ for _ in 0..n {
+ self.to_print.pop_front();
+ }
+ self.needs_flush = true;
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_flush_terminal(&mut self) -> component_future::Poll<(), Error> {
+ if !self.needs_flush {
+ return Ok(component_future::Async::NothingToDo);
+ }
+
+ component_future::try_ready!(self
+ .stdout
+ .poll_flush()
+ .context(crate::error::FlushTerminal));
+ self.needs_flush = false;
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_write_server(&mut self) -> component_future::Poll<(), Error> {
+ if !self.connected || !self.needs_screen_update {
+ // ship all data to the server before actually ending
+ if self.done {
+ return Ok(component_future::Async::Ready(()));
+ } else {
+ return Ok(component_future::Async::NothingToDo);
+ }
+ }
+
+ let screen = self.term.screen().clone();
+ self.client
+ .send_message(crate::protocol::Message::terminal_output(
+ &screen.contents_diff(&self.last_screen),
+ ));
+ self.last_screen = screen;
+ self.needs_screen_update = false;
+
+ Ok(component_future::Async::DidWork)
+ }
+}
+
+#[must_use = "futures do nothing unless polled"]
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ futures::future::Future for StreamSession<S>
+{
+ type Item = ();
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ component_future::poll_future(self, Self::POLL_FNS)
+ }
+}
diff --git a/teleterm/src/cmd/watch.rs b/teleterm/src/cmd/watch.rs
new file mode 100644
index 0000000..b872706
--- /dev/null
+++ b/teleterm/src/cmd/watch.rs
@@ -0,0 +1,776 @@
+use crate::prelude::*;
+use std::io::Write as _;
+
+#[derive(serde::Deserialize, Debug, Default)]
+pub struct Config {
+ #[serde(default)]
+ client: crate::config::Client,
+}
+
+impl crate::config::Config for Config {
+ fn merge_args<'a>(
+ &mut self,
+ matches: &clap::ArgMatches<'a>,
+ ) -> Result<()> {
+ self.client.merge_args(matches)
+ }
+
+ fn run(
+ &self,
+ ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send>
+ {
+ let auth = match self.client.auth {
+ crate::protocol::AuthType::Plain => {
+ let username = self
+ .client
+ .username
+ .clone()
+ .context(crate::error::CouldntFindUsername);
+ match username {
+ Ok(username) => crate::protocol::Auth::plain(&username),
+ Err(e) => return Box::new(futures::future::err(e)),
+ }
+ }
+ crate::protocol::AuthType::RecurseCenter => {
+ let id = crate::oauth::load_client_auth_id(self.client.auth);
+ crate::protocol::Auth::recurse_center(
+ id.as_ref().map(std::string::String::as_str),
+ )
+ }
+ };
+
+ let host = self.client.host().to_string();
+ let address = *self.client.addr();
+ if self.client.tls {
+ let connector = match native_tls::TlsConnector::new()
+ .context(crate::error::CreateConnector)
+ {
+ Ok(connector) => connector,
+ Err(e) => return Box::new(futures::future::err(e)),
+ };
+ let make_connector: Box<
+ dyn Fn() -> crate::client::Connector<_> + Send,
+ > = Box::new(move || {
+ let host = host.clone();
+ let connector = connector.clone();
+ Box::new(move || {
+ let host = host.clone();
+ let connector = connector.clone();
+ let connector = tokio_tls::TlsConnector::from(connector);
+ let stream =
+ tokio::net::tcp::TcpStream::connect(&address);
+ Box::new(
+ stream
+ .context(crate::error::Connect { address })
+ .and_then(move |stream| {
+ connector.connect(&host, stream).context(
+ crate::error::ConnectTls { host },
+ )
+ }),
+ )
+ })
+ });
+ Box::new(WatchSession::new(make_connector, &auth))
+ } else {
+ let make_connector: Box<
+ dyn Fn() -> crate::client::Connector<_> + Send,
+ > = Box::new(move || {
+ Box::new(move || {
+ Box::new(
+ tokio::net::tcp::TcpStream::connect(&address)
+ .context(crate::error::Connect { address }),
+ )
+ })
+ });
+ Box::new(WatchSession::new(make_connector, &auth))
+ }
+ }
+}
+
+pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> {
+ crate::config::Client::cmd(app.about("Watch teleterm streams"))
+}
+
+pub fn config(
+ mut config: Option<config::Config>,
+) -> Result<Box<dyn crate::config::Config>> {
+ if config.is_none() {
+ config = crate::config::wizard::run()?;
+ }
+ let config: Config = if let Some(config) = config {
+ config
+ .try_into()
+ .context(crate::error::CouldntParseConfig)?
+ } else {
+ Config::default()
+ };
+ Ok(Box::new(config))
+}
+
+// XXX https://github.com/rust-lang/rust/issues/64362
+#[allow(dead_code)]
+enum State<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static> {
+ Temporary,
+ LoggingIn {
+ alternate_screen: crossterm::screen::AlternateScreen,
+ },
+ Choosing {
+ sessions: crate::session_list::SessionList,
+ alternate_screen: crossterm::screen::AlternateScreen,
+ },
+ Watching {
+ client: Box<crate::client::Client<S>>,
+ },
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ State<S>
+{
+ fn new() -> Self {
+ Self::Temporary
+ }
+
+ fn logging_in(&mut self) -> Result<()> {
+ let prev_state = std::mem::replace(self, Self::Temporary);
+ *self = match prev_state {
+ Self::Temporary => unreachable!(),
+ Self::LoggingIn { alternate_screen } => {
+ Self::LoggingIn { alternate_screen }
+ }
+ Self::Choosing {
+ alternate_screen, ..
+ } => Self::LoggingIn { alternate_screen },
+ _ => Self::LoggingIn {
+ alternate_screen: new_alternate_screen()?,
+ },
+ };
+ Ok(())
+ }
+
+ fn choosing(
+ &mut self,
+ sessions: crate::session_list::SessionList,
+ ) -> Result<()> {
+ let prev_state = std::mem::replace(self, Self::Temporary);
+ *self = match prev_state {
+ Self::Temporary => unreachable!(),
+ Self::LoggingIn { alternate_screen } => Self::Choosing {
+ alternate_screen,
+ sessions,
+ },
+ Self::Choosing {
+ alternate_screen, ..
+ } => Self::Choosing {
+ alternate_screen,
+ sessions,
+ },
+ _ => Self::Choosing {
+ alternate_screen: new_alternate_screen()?,
+ sessions,
+ },
+ };
+ Ok(())
+ }
+
+ fn watching(&mut self, client: crate::client::Client<S>) {
+ if let Self::Temporary = self {
+ unreachable!()
+ }
+ *self = Self::Watching {
+ client: Box::new(client),
+ }
+ }
+}
+
+struct WatchSession<
+ S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
+> {
+ make_connector: Box<dyn Fn() -> crate::client::Connector<S> + Send>,
+ auth: crate::protocol::Auth,
+
+ key_reader: crate::key_reader::KeyReader,
+ list_client: crate::client::Client<S>,
+ resizer: Box<
+ dyn futures::stream::Stream<
+ Item = (u16, u16),
+ Error = crate::error::Error,
+ > + Send,
+ >,
+ state: State<S>,
+ raw_screen: Option<crossterm::screen::RawScreen>,
+ needs_redraw: bool,
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ WatchSession<S>
+{
+ fn new(
+ make_connector: Box<dyn Fn() -> crate::client::Connector<S> + Send>,
+ auth: &crate::protocol::Auth,
+ ) -> Self {
+ let list_client = crate::client::Client::list(make_connector(), auth);
+
+ Self {
+ make_connector,
+ auth: auth.clone(),
+
+ key_reader: crate::key_reader::KeyReader::new(),
+ list_client,
+ resizer: Box::new(
+ tokio_terminal_resize::resizes()
+ .flatten_stream()
+ .context(crate::error::Resize),
+ ),
+ state: State::new(),
+ raw_screen: None,
+ needs_redraw: true,
+ }
+ }
+
+ fn reconnect(&mut self, hard: bool) -> Result<()> {
+ self.state.logging_in()?;
+ self.needs_redraw = true;
+ if hard {
+ self.list_client.reconnect();
+ } else {
+ self.list_client
+ .send_message(crate::protocol::Message::list_sessions());
+ }
+ Ok(())
+ }
+
+ fn loading_keypress(
+ &mut self,
+ e: &crossterm::input::InputEvent,
+ ) -> Result<bool> {
+ match e {
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('q'),
+ ) => {
+ return Ok(true);
+ }
+ _ => {}
+ }
+ Ok(false)
+ }
+
+ fn list_server_message(
+ &mut self,
+ msg: crate::protocol::Message,
+ ) -> Result<()> {
+ match msg {
+ crate::protocol::Message::Sessions { sessions } => {
+ self.state.choosing(
+ crate::session_list::SessionList::new(
+ sessions,
+ crate::term::Size::get()?,
+ ),
+ )?;
+ self.needs_redraw = true;
+ }
+ crate::protocol::Message::Disconnected => {
+ self.reconnect(true)?;
+ }
+ crate::protocol::Message::Error { msg } => {
+ return Err(Error::Server { message: msg });
+ }
+ msg => {
+ return Err(crate::error::Error::UnexpectedMessage {
+ message: msg,
+ });
+ }
+ }
+ Ok(())
+ }
+
+ fn list_keypress(
+ &mut self,
+ e: &crossterm::input::InputEvent,
+ ) -> Result<bool> {
+ let sessions =
+ if let State::Choosing { sessions, .. } = &mut self.state {
+ sessions
+ } else {
+ unreachable!()
+ };
+
+ match e {
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char(' '),
+ ) => {
+ self.list_client
+ .send_message(crate::protocol::Message::list_sessions());
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('q'),
+ ) => {
+ return Ok(true);
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('<'),
+ ) => {
+ sessions.prev_page();
+ self.needs_redraw = true;
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('>'),
+ ) => {
+ sessions.next_page();
+ self.needs_redraw = true;
+ }
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char(c),
+ ) => {
+ if let Some(id) = sessions.id_for(*c) {
+ let client = crate::client::Client::watch(
+ (self.make_connector)(),
+ &self.auth,
+ id,
+ );
+ self.state.watching(client);
+ clear()?;
+ }
+ }
+ _ => {}
+ }
+ Ok(false)
+ }
+
+ fn watch_server_message(
+ &mut self,
+ msg: crate::protocol::Message,
+ ) -> Result<()> {
+ match msg {
+ crate::protocol::Message::TerminalOutput { data } => {
+ let data: Vec<_> = data
+ .iter()
+ // replace \n with \r\n since we're writing to a
+ // raw terminal
+ .fold(vec![], |mut acc, &c| {
+ if c == b'\n' {
+ acc.push(b'\r');
+ acc.push(b'\n');
+ } else {
+ acc.push(c);
+ }
+ acc
+ });
+ // TODO async
+ let stdout = std::io::stdout();
+ let mut stdout = stdout.lock();
+ stdout.write(&data).context(crate::error::WriteTerminal)?;
+ stdout.flush().context(crate::error::FlushTerminal)?;
+ }
+ crate::protocol::Message::Disconnected => {
+ self.reconnect(false)?;
+ }
+ crate::protocol::Message::Error { msg } => {
+ return Err(Error::Server { message: msg });
+ }
+ msg => {
+ return Err(crate::error::Error::UnexpectedMessage {
+ message: msg,
+ });
+ }
+ }
+ Ok(())
+ }
+
+ fn watch_keypress(
+ &mut self,
+ e: &crossterm::input::InputEvent,
+ ) -> Result<bool> {
+ match e {
+ crossterm::input::InputEvent::Keyboard(
+ crossterm::input::KeyEvent::Char('q'),
+ ) => {
+ self.reconnect(false)?;
+ }
+ _ => {}
+ }
+ Ok(false)
+ }
+
+ fn resize(&mut self, size: crate::term::Size) -> Result<()> {
+ if let State::Choosing { sessions, .. } = &mut self.state {
+ sessions.resize(size);
+ self.needs_redraw = true;
+ }
+ Ok(())
+ }
+
+ fn redraw(&self) -> Result<()> {
+ match &self.state {
+ State::Temporary => unreachable!(),
+ State::LoggingIn { .. } => {
+ self.display_loading_screen()?;
+ }
+ State::Choosing { .. } => {
+ self.display_choosing_screen()?;
+ }
+ State::Watching { .. } => {}
+ }
+ Ok(())
+ }
+
+ fn display_loading_screen(&self) -> Result<()> {
+ clear()?;
+
+ println!("loading...\r");
+ if let Some(err) = self.list_client.last_error() {
+ println!("error: {}\r", err);
+ }
+ print!("q: quit --> ");
+
+ std::io::stdout()
+ .flush()
+ .context(crate::error::FlushTerminal)?;
+
+ Ok(())
+ }
+
+ fn display_choosing_screen(&self) -> Result<()> {
+ let sessions = if let State::Choosing { sessions, .. } = &self.state {
+ sessions
+ } else {
+ unreachable!()
+ };
+
+ let char_width = 2;
+
+ let max_name_width = (sessions.size().cols / 3) as usize;
+ let name_width = sessions
+ .visible_sessions()
+ .iter()
+ .map(|s| s.username.len())
+ .max()
+ .unwrap_or(4);
+ // XXX unstable
+ // let name_width = name_width.clamp(4, max_name_width);
+ let name_width = if name_width < 4 {
+ 4
+ } else if name_width > max_name_width {
+ max_name_width
+ } else {
+ name_width
+ };
+
+ let size_width = 7;
+
+ let max_idle_time = sessions
+ .visible_sessions()
+ .iter()
+ .map(|s| s.idle_time)
+ .max()
+ .unwrap_or(4);
+ let idle_width = format_time(max_idle_time).len();
+ let idle_width = if idle_width < 4 { 4 } else { idle_width };
+
+ let watch_width = 5;
+
+ let max_title_width = (sessions.size().cols as usize)
+ - char_width
+ - 3
+ - name_width
+ - 3
+ - size_width
+ - 3
+ - idle_width
+ - 3
+ - watch_width
+ - 3;
+
+ clear()?;
+ println!("welcome to teleterm\r");
+ println!("available sessions:\r");
+ println!("\r");
+ println!(
+ "{:5$} | {:6$} | {:7$} | {:8$} | {:9$} | title\r",
+ "",
+ "name",
+ "size",
+ "idle",
+ "watch",
+ char_width,
+ name_width,
+ size_width,
+ idle_width,
+ watch_width,
+ );
+ println!(
+ "{}+{}+{}+{}+{}+{}\r",
+ "-".repeat(char_width + 1),
+ "-".repeat(name_width + 2),
+ "-".repeat(size_width + 2),
+ "-".repeat(idle_width + 2),
+ "-".repeat(watch_width + 2),
+ "-".repeat(max_title_width + 1)
+ );
+
+ let mut prev_name: Option<&str> = None;
+ for (c, session) in sessions.visible_sessions_with_chars() {
+ let first = if let Some(name) = prev_name {
+ name != session.username
+ } else {
+ true
+ };
+
+ let display_char = format!("{})", c);
+ let display_name = if first {
+ truncate(&session.username, max_name_width)
+ } else {
+ "".to_string()
+ };
+ let display_size_plain = format!("{}", &session.size);
+ let display_size_full = if session.size == sessions.size() {
+ // XXX i should be able to use crossterm::style here, but
+ // it has bugs
+ format!("\x1b[32m{}\x1b[m", display_size_plain)
+ } else if session.size.fits_in(sessions.size()) {
+ display_size_plain.clone()
+ } else {
+ // XXX i should be able to use crossterm::style here, but
+ // it has bugs
+ format!("\x1b[31m{}\x1b[m", display_size_plain)
+ };
+ let display_idle = format_time(session.idle_time);
+ let display_title = truncate(&session.title, max_title_width);
+ let display_watch = session.watchers;
+
+ println!(
+ "{:6$} | {:7$} | {:8$} | {:9$} | {:10$} | {}\r",
+ display_char,
+ display_name,
+ display_size_full,
+ display_idle,
+ display_watch,
+ display_title,
+ char_width,
+ name_width,
+ size_width
+ + (display_size_full.len() - display_size_plain.len()),
+ idle_width,
+ watch_width,
+ );
+
+ prev_name = Some(&session.username);
+ }
+ print!(
+ "({}/{}) space: refresh, q: quit, <: prev page, >: next page --> ",
+ sessions.current_page(),
+ sessions.total_pages(),
+ );
+ std::io::stdout()
+ .flush()
+ .context(crate::error::FlushTerminal)?;
+
+ Ok(())
+ }
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ WatchSession<S>
+{
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ (),
+ Error,
+ >] = &[
+ &Self::poll_resizer,
+ &Self::poll_input,
+ &Self::poll_list_client,
+ &Self::poll_watch_client,
+ ];
+
+ fn poll_resizer(&mut self) -> component_future::Poll<(), Error> {
+ let (rows, cols) =
+ component_future::try_ready!(self.resizer.poll()).unwrap();
+ self.resize(crate::term::Size { rows, cols })?;
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_input(&mut self) -> component_future::Poll<(), Error> {
+ if self.raw_screen.is_none() {
+ self.raw_screen = Some(
+ crossterm::screen::RawScreen::into_raw_mode()
+ .context(crate::error::ToRawMode)?,
+ );
+ }
+ if let State::Temporary = self.state {
+ self.state = State::LoggingIn {
+ alternate_screen: new_alternate_screen()?,
+ }
+ }
+
+ let e = component_future::try_ready!(self.key_reader.poll()).unwrap();
+ let quit = match &mut self.state {
+ State::Temporary => unreachable!(),
+ State::LoggingIn { .. } => self.loading_keypress(&e)?,
+ State::Choosing { .. } => self.list_keypress(&e)?,
+ State::Watching { .. } => self.watch_keypress(&e)?,
+ };
+ if quit {
+ Ok(component_future::Async::Ready(()))
+ } else {
+ Ok(component_future::Async::DidWork)
+ }
+ }
+
+ fn poll_list_client(&mut self) -> component_future::Poll<(), Error> {
+ match component_future::try_ready!(self.list_client.poll()).unwrap() {
+ crate::client::Event::Disconnect => {
+ self.reconnect(true)?;
+ }
+ crate::client::Event::Connect => {
+ self.list_client
+ .send_message(crate::protocol::Message::list_sessions());
+ }
+ crate::client::Event::ServerMessage(msg) => {
+ self.list_server_message(msg)?;
+ }
+ }
+ Ok(component_future::Async::DidWork)
+ }
+
+ fn poll_watch_client(&mut self) -> component_future::Poll<(), Error> {
+ let client = if let State::Watching { client } = &mut self.state {
+ client
+ } else {
+ return Ok(component_future::Async::NothingToDo);
+ };
+
+ match component_future::try_ready!(client.poll()).unwrap() {
+ crate::client::Event::Disconnect => {
+ self.reconnect(true)?;
+ }
+ crate::client::Event::Connect => {}
+ crate::client::Event::ServerMessage(msg) => {
+ self.watch_server_message(msg)?;
+ }
+ }
+ Ok(component_future::Async::DidWork)
+ }
+}
+
+#[must_use = "futures do nothing unless polled"]
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ futures::future::Future for WatchSession<S>
+{
+ type Item = ();
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ let res = component_future::poll_future(self, Self::POLL_FNS);
+ if res.is_err() {
+ self.state = State::Temporary; // drop alternate screen
+ self.raw_screen = None;
+ } else if self.needs_redraw {
+ self.redraw()?;
+ self.needs_redraw = false;
+ }
+ res
+ }
+}
+
+fn new_alternate_screen() -> Result<crossterm::screen::AlternateScreen> {
+ crossterm::screen::AlternateScreen::to_alternate(false)
+ .context(crate::error::ToAlternateScreen)
+}
+
+fn format_time(dur: u32) -> String {
+ let secs = dur % 60;
+ let dur = dur / 60;
+ if dur == 0 {
+ return format!("{}s", secs);
+ }
+
+ let mins = dur % 60;
+ let dur = dur / 60;
+ if dur == 0 {
+ return format!("{}m{:02}s", mins, secs);
+ }
+
+ let hours = dur % 24;
+ let dur = dur / 24;
+ if dur == 0 {
+ return format!("{}h{:02}m{:02}s", hours, mins, secs);
+ }
+
+ let days = dur;
+ format!("{}d{:02}h{:02}m{:02}s", days, hours, mins, secs)
+}
+
+fn truncate(s: &str, len: usize) -> String {
+ if s.len() <= len {
+ s.to_string()
+ } else {
+ format!("{}...", &s[..(len - 3)])
+ }
+}
+
+fn clear() -> Result<()> {
+ crossterm::execute!(
+ std::io::stdout(),
+ crossterm::cursor::MoveTo(0, 0),
+ crossterm::terminal::Clear(crossterm::terminal::ClearType::All)
+ )
+ .context(crate::error::WriteTerminalCrossterm)
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_truncate() {
+ assert_eq!(truncate("abcdefghij", 12), "abcdefghij");
+ assert_eq!(truncate("abcdefghij", 11), "abcdefghij");
+ assert_eq!(truncate("abcdefghij", 10), "abcdefghij");
+ assert_eq!(truncate("abcdefghij", 9), "abcdef...");
+ assert_eq!(truncate("abcdefghij", 8), "abcde...");
+ assert_eq!(truncate("abcdefghij", 7), "abcd...");
+
+ assert_eq!(truncate("", 7), "");
+ assert_eq!(truncate("a", 7), "a");
+ assert_eq!(truncate("ab", 7), "ab");
+ assert_eq!(truncate("abc", 7), "abc");
+ assert_eq!(truncate("abcd", 7), "abcd");
+ assert_eq!(truncate("abcde", 7), "abcde");
+ assert_eq!(truncate("abcdef", 7), "abcdef");
+ assert_eq!(truncate("abcdefg", 7), "abcdefg");
+ assert_eq!(truncate("abcdefgh", 7), "abcd...");
+ assert_eq!(truncate("abcdefghi", 7), "abcd...");
+ assert_eq!(truncate("abcdefghij", 7), "abcd...");
+ }
+
+ #[test]
+ fn test_format_time() {
+ assert_eq!(format_time(0), "0s");
+ assert_eq!(format_time(5), "5s");
+ assert_eq!(format_time(10), "10s");
+ assert_eq!(format_time(60), "1m00s");
+ assert_eq!(format_time(61), "1m01s");
+ assert_eq!(format_time(601), "10m01s");
+ assert_eq!(format_time(610), "10m10s");
+ assert_eq!(format_time(3599), "59m59s");
+ assert_eq!(format_time(3600), "1h00m00s");
+ assert_eq!(format_time(3601), "1h00m01s");
+ assert_eq!(format_time(3610), "1h00m10s");
+ assert_eq!(format_time(3660), "1h01m00s");
+ assert_eq!(format_time(3661), "1h01m01s");
+ assert_eq!(format_time(3670), "1h01m10s");
+ assert_eq!(format_time(4200), "1h10m00s");
+ assert_eq!(format_time(4201), "1h10m01s");
+ assert_eq!(format_time(4210), "1h10m10s");
+ assert_eq!(format_time(36000), "10h00m00s");
+ assert_eq!(format_time(86399), "23h59m59s");
+ assert_eq!(format_time(86400), "1d00h00m00s");
+ assert_eq!(format_time(86401), "1d00h00m01s");
+ assert_eq!(format_time(864_000), "10d00h00m00s");
+ assert_eq!(format_time(8_640_000), "100d00h00m00s");
+ assert_eq!(format_time(86_400_000), "1000d00h00m00s");
+ }
+}
diff --git a/teleterm/src/cmd/web.rs b/teleterm/src/cmd/web.rs
new file mode 100644
index 0000000..a37468b
--- /dev/null
+++ b/teleterm/src/cmd/web.rs
@@ -0,0 +1,46 @@
+use crate::prelude::*;
+
+#[derive(serde::Deserialize, Debug, Default)]
+pub struct Config {
+ #[serde(default)]
+ web: crate::config::Web,
+}
+
+impl crate::config::Config for Config {
+ fn merge_args<'a>(
+ &mut self,
+ matches: &clap::ArgMatches<'a>,
+ ) -> Result<()> {
+ self.web.merge_args(matches)
+ }
+
+ fn run(
+ &self,
+ ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send>
+ {
+ Box::new(
+ gotham::init_server(
+ self.web.listen_address,
+ crate::web::router(),
+ )
+ .map_err(|_| unreachable!()),
+ )
+ }
+}
+
+pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> {
+ crate::config::Web::cmd(app.about("Run a teleterm web server"))
+}
+
+pub fn config(
+ config: Option<config::Config>,
+) -> Result<Box<dyn crate::config::Config>> {
+ let config: Config = if let Some(config) = config {
+ config
+ .try_into()
+ .context(crate::error::CouldntParseConfig)?
+ } else {
+ Config::default()
+ };
+ Ok(Box::new(config))
+}
diff --git a/teleterm/src/config.rs b/teleterm/src/config.rs
new file mode 100644
index 0000000..20528c9
--- /dev/null
+++ b/teleterm/src/config.rs
@@ -0,0 +1,877 @@
+use crate::prelude::*;
+use serde::de::Deserialize as _;
+use std::convert::TryFrom as _;
+use std::net::ToSocketAddrs as _;
+
+pub mod wizard;
+
+const CONFIG_FILENAME: &str = "config.toml";
+
+const ALLOWED_LOGIN_METHODS_OPTION: &str = "allowed-login-methods";
+const ARGS_OPTION: &str = "args";
+const COMMAND_OPTION: &str = "command";
+const CONNECT_ADDRESS_OPTION: &str = "connect-address";
+const FILENAME_OPTION: &str = "filename";
+const LISTEN_ADDRESS_OPTION: &str = "listen-address";
+const LOGIN_PLAIN_OPTION: &str = "login-plain";
+const LOGIN_RECURSE_CENTER_OPTION: &str = "login-recurse-center";
+const MAX_FRAME_LENGTH_OPTION: &str = "max-frame-length";
+const PLAY_AT_START_OPTION: &str = "play-at-start";
+const PLAYBACK_RATIO_OPTION: &str = "playback-ratio";
+const READ_TIMEOUT_OPTION: &str = "read-timeout-secs";
+const TLS_IDENTITY_FILE_OPTION: &str = "tls-identity-file";
+const TLS_OPTION: &str = "tls";
+
+const DEFAULT_LISTEN_ADDRESS: &str = "127.0.0.1:4144";
+const DEFAULT_CONNECT_ADDRESS: &str = "127.0.0.1:4144";
+const DEFAULT_WEB_LISTEN_ADDRESS: &str = "127.0.0.1:4145";
+const DEFAULT_READ_TIMEOUT: std::time::Duration =
+ std::time::Duration::from_secs(120);
+const DEFAULT_AUTH_TYPE: crate::protocol::AuthType =
+ crate::protocol::AuthType::Plain;
+const DEFAULT_TLS: bool = false;
+const DEFAULT_TTYREC_FILENAME: &str = "teleterm.ttyrec";
+
+pub trait Config: std::fmt::Debug {
+ fn merge_args<'a>(
+ &mut self,
+ matches: &clap::ArgMatches<'a>,
+ ) -> Result<()>;
+ fn run(
+ &self,
+ ) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send>;
+}
+
+pub fn config(
+ filename: Option<&std::path::Path>,
+) -> Result<Option<config::Config>> {
+ let config_filename = if let Some(filename) = filename {
+ if !filename.exists() {
+ return Err(Error::ConfigFileDoesntExist {
+ name: filename.to_string_lossy().to_string(),
+ });
+ }
+ Some(filename.to_path_buf())
+ } else {
+ crate::dirs::Dirs::new().config_file(CONFIG_FILENAME, true)
+ };
+ config_filename
+ .map(|config_filename| config_from_filename(&config_filename))
+ .transpose()
+}
+
+fn config_from_filename(
+ filename: &std::path::Path,
+) -> Result<config::Config> {
+ let mut config = config::Config::default();
+ config
+ .merge(config::File::from(filename))
+ .context(crate::error::ParseConfigFile)?;
+ Ok(config)
+}
+
+#[derive(serde::Deserialize, Debug)]
+pub struct Client {
+ #[serde(deserialize_with = "auth_type", default = "default_auth_type")]
+ pub auth: crate::protocol::AuthType,
+
+ #[serde(default = "default_username")]
+ pub username: Option<String>,
+
+ #[serde(
+ deserialize_with = "connect_address",
+ default = "default_connect_address"
+ )]
+ pub connect_address: (String, std::net::SocketAddr),
+
+ #[serde(default = "default_tls")]
+ pub tls: bool,
+}
+
+impl Client {
+ pub fn host(&self) -> &str {
+ &self.connect_address.0
+ }
+
+ pub fn addr(&self) -> &std::net::SocketAddr {
+ &self.connect_address.1
+ }
+
+ pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> {
+ let login_plain_help = "Use the 'plain' authentication method (default), with username USERNAME (defaults to $USER)";
+ let login_recurse_center_help =
+ "Use the 'recurse_center' authentication method";
+ let connect_address_help =
+ "Host and port to connect to (defaults to localhost:4144)";
+ let tls_help = "Connect to the server using TLS";
+
+ app.arg(
+ clap::Arg::with_name(LOGIN_PLAIN_OPTION)
+ .long(LOGIN_PLAIN_OPTION)
+ .takes_value(true)
+ .value_name("USERNAME")
+ .help(login_plain_help),
+ )
+ .arg(
+ clap::Arg::with_name(LOGIN_RECURSE_CENTER_OPTION)
+ .long(LOGIN_RECURSE_CENTER_OPTION)
+ .conflicts_with(LOGIN_PLAIN_OPTION)
+ .help(login_recurse_center_help),
+ )
+ .arg(
+ clap::Arg::with_name(CONNECT_ADDRESS_OPTION)
+ .long(CONNECT_ADDRESS_OPTION)
+ .takes_value(true)
+ .value_name("HOST:PORT")
+ .help(connect_address_help),
+ )
+ .arg(
+ clap::Arg::with_name(TLS_OPTION)
+ .long(TLS_OPTION)
+ .help(tls_help),
+ )
+ }
+
+ pub fn merge_args<'a>(
+ &mut self,
+ matches: &clap::ArgMatches<'a>,
+ ) -> Result<()> {
+ if matches.is_present(LOGIN_RECURSE_CENTER_OPTION) {
+ self.auth = crate::protocol::AuthType::RecurseCenter;
+ }
+ if matches.is_present(LOGIN_PLAIN_OPTION) {
+ let username = matches
+ .value_of(LOGIN_PLAIN_OPTION)
+ .map(std::string::ToString::to_string);
+ self.auth = crate::protocol::AuthType::Plain;
+ self.username = username;
+ }
+ if matches.is_present(CONNECT_ADDRESS_OPTION) {
+ let address = matches.value_of(CONNECT_ADDRESS_OPTION).unwrap();
+ self.connect_address = to_connect_address(address)?;
+ }
+ if matches.is_present(TLS_OPTION) {
+ self.tls = true;
+ }
+ Ok(())
+ }
+}
+
+impl Default for Client {
+ fn default() -> Self {
+ Self {
+ auth: default_auth_type(),
+ username: default_username(),
+ connect_address: default_connect_address(),
+ tls: default_tls(),
+ }
+ }
+}
+
+fn auth_type<'a, D>(
+ deserializer: D,
+) -> std::result::Result<crate::protocol::AuthType, D::Error>
+where
+ D: serde::de::Deserializer<'a>,
+{
+ crate::protocol::AuthType::try_from(
+ <String>::deserialize(deserializer)?.as_ref(),
+ )
+ .map_err(serde::de::Error::custom)
+}
+
+fn default_auth_type() -> crate::protocol::AuthType {
+ DEFAULT_AUTH_TYPE
+}
+
+fn default_username() -> Option<String> {
+ std::env::var("USER").ok()
+}
+
+fn connect_address<'a, D>(
+ deserializer: D,
+) -> std::result::Result<(String, std::net::SocketAddr), D::Error>
+where
+ D: serde::de::Deserializer<'a>,
+{
+ to_connect_address(&<String>::deserialize(deserializer)?)
+ .map_err(serde::de::Error::custom)
+}
+
+fn default_connect_address() -> (String, std::net::SocketAddr) {
+ to_connect_address(DEFAULT_CONNECT_ADDRESS).unwrap()
+}
+
+// XXX this does a blocking dns lookup - should try to find an async version
+fn to_connect_address(
+ address: &str,
+) -> Result<(String, std::net::SocketAddr)> {
+ let mut address_parts = address.split(':');
+ let host = address_parts.next().context(crate::error::ParseAddress)?;
+ let port_str =
+ address_parts.next().context(crate::error::ParseAddress)?;
+ let port: u16 = port_str
+ .parse()
+ .context(crate::error::ParsePort { string: port_str })?;
+ let socket_addr = (host, port)
+ .to_socket_addrs()
+ .context(crate::error::ResolveAddress { host, port })?
+ .next()
+ .context(crate::error::HasResolvedAddr)?;
+ Ok((host.to_string(), socket_addr))
+}
+
+fn default_tls() -> bool {
+ DEFAULT_TLS
+}
+
+#[derive(serde::Deserialize, Debug)]
+pub struct Server {
+ #[serde(
+ deserialize_with = "listen_address",
+ default = "default_listen_address"
+ )]
+ pub listen_address: std::net::SocketAddr,
+
+ #[serde(
+ rename = "read_timeout_secs",
+ deserialize_with = "read_timeout",
+ default = "default_read_timeout"
+ )]
+ pub read_timeout: std::time::Duration,
+
+ pub tls_identity_file: Option<String>,
+
+ #[serde(
+ deserialize_with = "allowed_login_methods",
+ default = "default_allowed_login_methods"
+ )]
+ pub allowed_login_methods:
+ std::collections::HashSet<crate::protocol::AuthType>,
+
+ #[serde(deserialize_with = "uid", default)]
+ pub uid: Option<users::uid_t>,
+
+ #[serde(deserialize_with = "gid", default)]
+ pub gid: Option<users::gid_t>,
+}
+
+impl Server {
+ pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> {
+ let listen_address_help =
+ "Host and port to listen on (defaults to localhost:4144)";
+ let read_timeout_help = "Number of idle seconds to wait before disconnecting a client (defaults to 30)";
+ let tls_identity_file_help = "File containing the TLS certificate and private key to use for accepting TLS connections. Must be in pfx format. The server will only allow connections over TLS if this option is set.";
+ let allowed_login_methods_help = "Comma separated list containing the auth methods this server should allow. Allows everything by default, valid values are plain, recurse_center";
+ app.arg(
+ clap::Arg::with_name(LISTEN_ADDRESS_OPTION)
+ .long(LISTEN_ADDRESS_OPTION)
+ .takes_value(true)
+ .value_name("HOST:PORT")
+ .help(listen_address_help),
+ )
+ .arg(
+ clap::Arg::with_name(READ_TIMEOUT_OPTION)
+ .long(READ_TIMEOUT_OPTION)
+ .takes_value(true)
+ .value_name("SECS")
+ .help(read_timeout_help),
+ )
+ .arg(
+ clap::Arg::with_name(TLS_IDENTITY_FILE_OPTION)
+ .long(TLS_IDENTITY_FILE_OPTION)
+ .takes_value(true)
+ .value_name("FILE")
+ .help(tls_identity_file_help),
+ )
+ .arg(
+ clap::Arg::with_name(ALLOWED_LOGIN_METHODS_OPTION)
+ .long(ALLOWED_LOGIN_METHODS_OPTION)
+ .use_delimiter(true)
+ .takes_value(true)
+ .value_name("AUTH_METHODS")
+ .help(allowed_login_methods_help),
+ )
+ }
+
+ pub fn merge_args<'a>(
+ &mut self,
+ matches: &clap::ArgMatches<'a>,
+ ) -> Result<()> {
+ if matches.is_present(LISTEN_ADDRESS_OPTION) {
+ self.listen_address = matches
+ .value_of(LISTEN_ADDRESS_OPTION)
+ .unwrap()
+ .parse()
+ .context(crate::error::ParseAddr)?;
+ }
+ if matches.is_present(READ_TIMEOUT_OPTION) {
+ let s = matches.value_of(READ_TIMEOUT_OPTION).unwrap();
+ self.read_timeout = s
+ .parse()
+ .map(std::time::Duration::from_secs)
+ .context(crate::error::ParseReadTimeout { input: s })?;
+ }
+ if matches.is_present(TLS_IDENTITY_FILE_OPTION) {
+ self.tls_identity_file = Some(
+ matches
+ .value_of(TLS_IDENTITY_FILE_OPTION)
+ .unwrap()
+ .to_string(),
+ );
+ }
+ if matches.is_present(ALLOWED_LOGIN_METHODS_OPTION) {
+ self.allowed_login_methods = matches
+ .values_of(ALLOWED_LOGIN_METHODS_OPTION)
+ .unwrap()
+ .map(crate::protocol::AuthType::try_from)
+ .collect::<Result<
+ std::collections::HashSet<crate::protocol::AuthType>,
+ >>()?;
+ }
+ Ok(())
+ }
+}
+
+impl Default for Server {
+ fn default() -> Self {
+ Self {
+ listen_address: default_listen_address(),
+ read_timeout: default_read_timeout(),
+ tls_identity_file: None,
+ allowed_login_methods: default_allowed_login_methods(),
+ uid: None,
+ gid: None,
+ }
+ }
+}
+
+fn listen_address<'a, D>(
+ deserializer: D,
+) -> std::result::Result<std::net::SocketAddr, D::Error>
+where
+ D: serde::de::Deserializer<'a>,
+{
+ to_listen_address(&<String>::deserialize(deserializer)?)
+ .map_err(serde::de::Error::custom)
+}
+
+fn default_listen_address() -> std::net::SocketAddr {
+ to_listen_address(DEFAULT_LISTEN_ADDRESS).unwrap()
+}
+
+fn to_listen_address(address: &str) -> Result<std::net::SocketAddr> {
+ address.parse().context(crate::error::ParseAddr)
+}
+
+fn read_timeout<'a, D>(
+ deserializer: D,
+) -> std::result::Result<std::time::Duration, D::Error>
+where
+ D: serde::de::Deserializer<'a>,
+{
+ Ok(std::time::Duration::from_secs(u64::deserialize(
+ deserializer,
+ )?))
+}
+
+fn default_read_timeout() -> std::time::Duration {
+ DEFAULT_READ_TIMEOUT
+}
+
+fn allowed_login_methods<'a, D>(
+ deserializer: D,
+) -> std::result::Result<
+ std::collections::HashSet<crate::protocol::AuthType>,
+ D::Error,
+>
+where
+ D: serde::de::Deserializer<'a>,
+{
+ struct StringOrVec;
+
+ impl<'a> serde::de::Visitor<'a> for StringOrVec {
+ type Value = Vec<String>;
+
+ fn expecting(
+ &self,
+ formatter: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ formatter.write_str("string or list")
+ }
+
+ fn visit_str<E>(
+ self,
+ value: &str,
+ ) -> std::result::Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ Ok(value
+ .split(',')
+ .map(std::string::ToString::to_string)
+ .collect())
+ }
+
+ fn visit_seq<A>(
+ self,
+ seq: A,
+ ) -> std::result::Result<Self::Value, A::Error>
+ where
+ A: serde::de::SeqAccess<'a>,
+ {
+ serde::de::Deserialize::deserialize(
+ serde::de::value::SeqAccessDeserializer::new(seq),
+ )
+ }
+ }
+
+ deserializer
+ .deserialize_any(StringOrVec)?
+ .iter()
+ .map(|s| {
+ crate::protocol::AuthType::try_from(s.as_str())
+ .map_err(serde::de::Error::custom)
+ })
+ .collect()
+}
+
+fn default_allowed_login_methods(
+) -> std::collections::HashSet<crate::protocol::AuthType> {
+ crate::protocol::AuthType::iter().collect()
+}
+
+fn uid<'a, D>(
+ deserializer: D,
+) -> std::result::Result<Option<users::uid_t>, D::Error>
+where
+ D: serde::de::Deserializer<'a>,
+{
+ struct StringOrInt;
+
+ impl<'a> serde::de::Visitor<'a> for StringOrInt {
+ type Value = Option<u32>;
+
+ fn expecting(
+ &self,
+ formatter: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ formatter.write_str("string or int")
+ }
+
+ fn visit_str<E>(
+ self,
+ value: &str,
+ ) -> std::result::Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ Ok(Some(
+ users::get_user_by_name(value)
+ .context(crate::error::UnknownUser { name: value })
+ .map_err(serde::de::Error::custom)?
+ .uid(),
+ ))
+ }
+
+ fn visit_u32<E>(
+ self,
+ value: u32,
+ ) -> std::result::Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ if users::get_user_by_uid(value).is_none() {
+ return Err(serde::de::Error::custom(Error::UnknownUid {
+ uid: value,
+ }));
+ }
+ Ok(Some(value))
+ }
+ }
+
+ deserializer.deserialize_any(StringOrInt)
+}
+
+fn gid<'a, D>(
+ deserializer: D,
+) -> std::result::Result<Option<users::gid_t>, D::Error>
+where
+ D: serde::de::Deserializer<'a>,
+{
+ struct StringOrInt;
+
+ impl<'a> serde::de::Visitor<'a> for StringOrInt {
+ type Value = Option<u32>;
+
+ fn expecting(
+ &self,
+ formatter: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ formatter.write_str("string or int")
+ }
+
+ fn visit_none<E>(self) -> std::result::Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ Ok(None)
+ }
+
+ fn visit_str<E>(
+ self,
+ value: &str,
+ ) -> std::result::Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ Ok(Some(
+ users::get_group_by_name(value)
+ .context(crate::error::UnknownGroup { name: value })
+ .map_err(serde::de::Error::custom)?
+ .gid(),
+ ))
+ }
+
+ fn visit_u32<E>(
+ self,
+ value: u32,
+ ) -> std::result::Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ if users::get_group_by_gid(value).is_none() {
+ return Err(serde::de::Error::custom(Error::UnknownGid {
+ gid: value,
+ }));
+ }
+ Ok(Some(value))
+ }
+ }
+
+ deserializer.deserialize_any(StringOrInt)
+}
+
+#[derive(serde::Deserialize, Debug)]
+pub struct Web {
+ #[serde(
+ deserialize_with = "listen_address",
+ default = "default_web_listen_address"
+ )]
+ pub listen_address: std::net::SocketAddr,
+}
+
+impl Web {
+ pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> {
+ let listen_address_help =
+ "Host and port to listen on (defaults to localhost:4144)";
+ app.arg(
+ clap::Arg::with_name(LISTEN_ADDRESS_OPTION)
+ .long(LISTEN_ADDRESS_OPTION)
+ .takes_value(true)
+ .value_name("HOST:PORT")
+ .help(listen_address_help),
+ )
+ }
+ pub fn merge_args<'a>(
+ &mut self,
+ matches: &clap::ArgMatches<'a>,
+ ) -> Result<()> {
+ if matches.is_present(LISTEN_ADDRESS_OPTION) {
+ self.listen_address = matches
+ .value_of(LISTEN_ADDRESS_OPTION)
+ .unwrap()
+ .parse()
+ .context(crate::error::ParseAddr)?;
+ }
+ Ok(())
+ }
+}
+
+impl Default for Web {
+ fn default() -> Self {
+ Self {
+ listen_address: default_web_listen_address(),
+ }
+ }
+}
+
+fn default_web_listen_address() -> std::net::SocketAddr {
+ to_listen_address(DEFAULT_WEB_LISTEN_ADDRESS).unwrap()
+}
+
+#[derive(serde::Deserialize, Debug)]
+pub struct Command {
+ #[serde(default = "default_command")]
+ pub command: String,
+
+ #[serde(default = "default_args")]
+ pub args: Vec<String>,
+}
+
+impl Command {
+ pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> {
+ let command_help = "Command to run";
+ let args_help = "Arguments for the command";
+
+ app.arg(
+ clap::Arg::with_name(COMMAND_OPTION)
+ .index(1)
+ .help(command_help),
+ )
+ .arg(
+ clap::Arg::with_name(ARGS_OPTION)
+ .index(2)
+ .multiple(true)
+ .help(args_help),
+ )
+ }
+ pub fn merge_args<'a>(
+ &mut self,
+ matches: &clap::ArgMatches<'a>,
+ ) -> Result<()> {
+ if matches.is_present(COMMAND_OPTION) {
+ self.command =
+ matches.value_of(COMMAND_OPTION).unwrap().to_string();
+ }
+ if matches.is_present(ARGS_OPTION) {
+ self.args = matches
+ .values_of(ARGS_OPTION)
+ .unwrap()
+ .map(std::string::ToString::to_string)
+ .collect();
+ }
+ Ok(())
+ }
+}
+
+impl Default for Command {
+ fn default() -> Self {
+ Self {
+ command: default_command(),
+ args: default_args(),
+ }
+ }
+}
+
+fn default_command() -> String {
+ std::env::var("SHELL").unwrap_or_else(|_| "/bin/bash".to_string())
+}
+
+fn default_args() -> Vec<String> {
+ vec![]
+}
+
+#[derive(serde::Deserialize, Debug)]
+pub struct Ttyrec {
+ #[serde(default = "default_ttyrec_filename")]
+ pub filename: String,
+}
+
+impl Ttyrec {
+ pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> {
+ let filename_help =
+ "TTYrec file to use (defaults to teleterm.ttyrec)";
+ app.arg(
+ clap::Arg::with_name(FILENAME_OPTION)
+ .long(FILENAME_OPTION)
+ .takes_value(true)
+ .value_name("FILE")
+ .help(filename_help),
+ )
+ }
+
+ pub fn merge_args<'a>(
+ &mut self,
+ matches: &clap::ArgMatches<'a>,
+ ) -> Result<()> {
+ if matches.is_present(FILENAME_OPTION) {
+ self.filename =
+ matches.value_of(FILENAME_OPTION).unwrap().to_string();
+ }
+ Ok(())
+ }
+}
+
+impl Default for Ttyrec {
+ fn default() -> Self {
+ Self {
+ filename: default_ttyrec_filename(),
+ }
+ }
+}
+
+fn default_ttyrec_filename() -> String {
+ DEFAULT_TTYREC_FILENAME.to_string()
+}
+
+#[derive(serde::Deserialize, Debug)]
+pub struct Play {
+ #[serde(default)]
+ pub play_at_start: bool,
+
+ #[serde(default = "default_playback_ratio")]
+ pub playback_ratio: f32,
+
+ #[serde(default, deserialize_with = "max_frame_length")]
+ pub max_frame_length: Option<std::time::Duration>,
+}
+
+impl Play {
+ pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> {
+ let play_at_start_help = "Start the player unpaused";
+ let playback_ratio_help =
+ "Speed to play back the ttyrec at (defaults to 1.0)";
+ let max_frame_length_help =
+ "Clamp frame duration at this number of seconds";
+ app.arg(
+ clap::Arg::with_name(PLAY_AT_START_OPTION)
+ .long(PLAY_AT_START_OPTION)
+ .help(play_at_start_help),
+ )
+ .arg(
+ clap::Arg::with_name(PLAYBACK_RATIO_OPTION)
+ .long(PLAYBACK_RATIO_OPTION)
+ .takes_value(true)
+ .value_name("RATIO")
+ .help(playback_ratio_help),
+ )
+ .arg(
+ clap::Arg::with_name(MAX_FRAME_LENGTH_OPTION)
+ .long(MAX_FRAME_LENGTH_OPTION)
+ .takes_value(true)
+ .value_name("SECS")
+ .help(max_frame_length_help),
+ )
+ }
+
+ pub fn merge_args<'a>(
+ &mut self,
+ matches: &clap::ArgMatches<'a>,
+ ) -> Result<()> {
+ self.play_at_start = matches.is_present(PLAY_AT_START_OPTION);
+ if matches.is_present(PLAYBACK_RATIO_OPTION) {
+ self.playback_ratio = matches
+ .value_of(PLAYBACK_RATIO_OPTION)
+ .unwrap()
+ .to_string()
+ .parse()
+ .context(crate::error::ParseFloat {
+ name: PLAYBACK_RATIO_OPTION,
+ })?;
+ }
+ self.max_frame_length = matches
+ .value_of(MAX_FRAME_LENGTH_OPTION)
+ .map(|len| len.parse().map(std::time::Duration::from_secs))
+ .transpose()
+ .context(crate::error::ParseMaxFrameLength)?;
+ Ok(())
+ }
+}
+
+impl Default for Play {
+ fn default() -> Self {
+ Self {
+ play_at_start: false,
+ playback_ratio: default_playback_ratio(),
+ max_frame_length: None,
+ }
+ }
+}
+
+fn default_playback_ratio() -> f32 {
+ 1.0
+}
+
+fn max_frame_length<'a, D>(
+ deserializer: D,
+) -> std::result::Result<Option<std::time::Duration>, D::Error>
+where
+ D: serde::de::Deserializer<'a>,
+{
+ Ok(Some(std::time::Duration::from_secs(u64::deserialize(
+ deserializer,
+ )?)))
+}
+
+pub fn oauth_configs<'a, D>(
+ deserializer: D,
+) -> std::result::Result<
+ std::collections::HashMap<
+ crate::protocol::AuthType,
+ crate::oauth::Config,
+ >,
+ D::Error,
+>
+where
+ D: serde::de::Deserializer<'a>,
+{
+ let configs =
+ <std::collections::HashMap<String, OauthConfig>>::deserialize(
+ deserializer,
+ )?;
+ let mut ret = std::collections::HashMap::new();
+ for (key, config) in configs {
+ let auth_type = crate::protocol::AuthType::try_from(key.as_str())
+ .map_err(serde::de::Error::custom)?;
+ let real_config = match auth_type {
+ crate::protocol::AuthType::RecurseCenter => {
+ let client_id = config
+ .client_id
+ .context(crate::error::OauthMissingConfiguration {
+ field: "client_id",
+ auth_type,
+ })
+ .map_err(serde::de::Error::custom)?;
+ let client_secret = config
+ .client_secret
+ .context(crate::error::OauthMissingConfiguration {
+ field: "client_secret",
+ auth_type,
+ })
+ .map_err(serde::de::Error::custom)?;
+ crate::oauth::RecurseCenter::config(
+ &client_id,
+ &client_secret,
+ )
+ }
+ ty if !ty.is_oauth() => {
+ return Err(Error::AuthTypeNotOauth { ty: auth_type })
+ .map_err(serde::de::Error::custom);
+ }
+ _ => unreachable!(),
+ };
+ ret.insert(auth_type, real_config);
+ }
+ Ok(ret)
+}
+
+#[derive(serde::Deserialize, Debug)]
+struct OauthConfig {
+ #[serde(default)]
+ client_id: Option<String>,
+
+ #[serde(default)]
+ client_secret: Option<String>,
+
+ #[serde(deserialize_with = "url", default)]
+ auth_url: Option<url::Url>,
+
+ #[serde(deserialize_with = "url", default)]
+ token_url: Option<url::Url>,
+
+ #[serde(deserialize_with = "url", default)]
+ redirect_url: Option<url::Url>,
+}
+
+fn url<'a, D>(
+ deserializer: D,
+) -> std::result::Result<Option<url::Url>, D::Error>
+where
+ D: serde::de::Deserializer<'a>,
+{
+ Ok(<Option<String>>::deserialize(deserializer)?
+ .map(|s| url::Url::parse(&s))
+ .transpose()
+ .map_err(serde::de::Error::custom)?)
+}
diff --git a/teleterm/src/config/wizard.rs b/teleterm/src/config/wizard.rs
new file mode 100644
index 0000000..b13d0e0
--- /dev/null
+++ b/teleterm/src/config/wizard.rs
@@ -0,0 +1,159 @@
+use crate::prelude::*;
+use std::io::Write as _;
+
+pub fn run() -> Result<Option<config::Config>> {
+ println!("No configuration file found.");
+ let run_wizard = prompt(
+ "Would you like me to ask you some questions to generate one?",
+ )?;
+ if !run_wizard {
+ let shouldnt_touch = prompt(
+ "Would you like me to ask this question again in the future?",
+ )?;
+ if !shouldnt_touch {
+ touch_config_file()?;
+ }
+ return Ok(None);
+ }
+
+ let connect_address =
+ prompt_addr("Which server would you like to connect to?")?;
+ let tls = prompt("Does this server require a TLS connection?")?;
+ let auth_type = prompt_auth_type(
+ "How would you like to authenticate to this server?",
+ )?;
+
+ write_config_file(&connect_address, tls, &auth_type).and_then(
+ |config_filename| {
+ Some(super::config_from_filename(&config_filename)).transpose()
+ },
+ )
+}
+
+fn touch_config_file() -> Result<()> {
+ let config_filename = crate::dirs::Dirs::new()
+ .config_file(super::CONFIG_FILENAME, false)
+ .unwrap();
+ std::fs::File::create(config_filename.clone()).context(
+ crate::error::CreateFileSync {
+ filename: config_filename.to_string_lossy(),
+ },
+ )?;
+ Ok(())
+}
+
+fn write_config_file(
+ connect_address: &str,
+ tls: bool,
+ auth_type: &str,
+) -> Result<std::path::PathBuf> {
+ let contents = format!(
+ r#"[client]
+connect_address = "{}"
+tls = {}
+auth = "{}"
+"#,
+ connect_address, tls, auth_type
+ );
+ let config_filename = crate::dirs::Dirs::new()
+ .config_file(super::CONFIG_FILENAME, false)
+ .unwrap();
+ let mut file = std::fs::File::create(config_filename.clone()).context(
+ crate::error::CreateFileSync {
+ filename: config_filename.to_string_lossy(),
+ },
+ )?;
+ file.write_all(contents.as_bytes())
+ .context(crate::error::WriteFileSync)?;
+ Ok(config_filename)
+}
+
+fn prompt(msg: &str) -> Result<bool> {
+ print!("{} [y/n]: ", msg);
+ std::io::stdout()
+ .flush()
+ .context(crate::error::FlushTerminal)?;
+ let mut response = String::new();
+ std::io::stdin()
+ .read_line(&mut response)
+ .context(crate::error::ReadTerminal)?;
+
+ loop {
+ match response.trim() {
+ "y" | "yes" => {
+ return Ok(true);
+ }
+ "n" | "no" => {
+ return Ok(false);
+ }
+ _ => {
+ print!("Please answer [y]es or [n]o: ");
+ std::io::stdout()
+ .flush()
+ .context(crate::error::FlushTerminal)?;
+ std::io::stdin()
+ .read_line(&mut response)
+ .context(crate::error::ReadTerminal)?;
+ }
+ }
+ }
+}
+
+fn prompt_addr(msg: &str) -> Result<String> {
+ loop {
+ print!("{} [addr:port]: ", msg);
+ std::io::stdout()
+ .flush()
+ .context(crate::error::FlushTerminal)?;
+ let mut response = String::new();
+ std::io::stdin()
+ .read_line(&mut response)
+ .context(crate::error::ReadTerminal)?;
+
+ match response.trim() {
+ addr if addr.contains(':') => {
+ match super::to_connect_address(addr) {
+ Ok(..) => return Ok(addr.to_string()),
+ _ => {
+ println!("Couldn't parse '{}'.", addr);
+ }
+ };
+ }
+ _ => {
+ println!("Please include a port number.");
+ }
+ }
+ }
+}
+
+fn prompt_auth_type(msg: &str) -> Result<String> {
+ let auth_type_names: Vec<_> = crate::protocol::AuthType::iter()
+ .map(crate::protocol::AuthType::name)
+ .collect();
+
+ loop {
+ println!("{}", msg);
+ println!("Options are:");
+ for (i, name) in auth_type_names.iter().enumerate() {
+ println!("{}: {}", i + 1, name);
+ }
+ print!("Choose [1-{}]: ", auth_type_names.len());
+ std::io::stdout()
+ .flush()
+ .context(crate::error::FlushTerminal)?;
+ let mut response = String::new();
+ std::io::stdin()
+ .read_line(&mut response)
+ .context(crate::error::ReadTerminal)?;
+
+ let num: Option<usize> = response.trim().parse().ok();
+ if let Some(num) = num {
+ if num > 0 && num <= auth_type_names.len() {
+ let name = auth_type_names[num - 1];
+ return Ok(name.to_string());
+ }
+ }
+
+ println!("Invalid response '{}'", response.trim());
+ }
+}
diff --git a/teleterm/src/dirs.rs b/teleterm/src/dirs.rs
new file mode 100644
index 0000000..bd4741d
--- /dev/null
+++ b/teleterm/src/dirs.rs
@@ -0,0 +1,98 @@
+use crate::prelude::*;
+
+pub struct Dirs {
+ project_dirs: Option<directories::ProjectDirs>,
+}
+
+impl Dirs {
+ pub fn new() -> Self {
+ Self {
+ project_dirs: directories::ProjectDirs::from("", "", "teleterm"),
+ }
+ }
+
+ pub fn create_all(&self) -> Result<()> {
+ if let Some(filename) = self.data_dir() {
+ std::fs::create_dir_all(filename).with_context(|| {
+ crate::error::CreateDir {
+ filename: filename.to_string_lossy(),
+ }
+ })?;
+ }
+ Ok(())
+ }
+
+ fn has_home(&self) -> bool {
+ directories::BaseDirs::new().map_or(false, |dirs| {
+ dirs.home_dir() != std::path::Path::new("/")
+ })
+ }
+
+ fn global_config_dir(&self) -> &std::path::Path {
+ std::path::Path::new("/etc/teleterm")
+ }
+
+ fn config_dir(&self) -> Option<&std::path::Path> {
+ if self.has_home() {
+ self.project_dirs
+ .as_ref()
+ .map(directories::ProjectDirs::config_dir)
+ } else {
+ None
+ }
+ }
+
+ pub fn config_file(
+ &self,
+ name: &str,
+ must_exist: bool,
+ ) -> Option<std::path::PathBuf> {
+ if let Some(config_dir) = self.config_dir() {
+ let file = config_dir.join(name);
+ if !must_exist || file.exists() {
+ return Some(file);
+ }
+ }
+
+ let file = self.global_config_dir().join(name);
+ if !must_exist || file.exists() {
+ return Some(file);
+ }
+
+ None
+ }
+
+ fn global_data_dir(&self) -> &std::path::Path {
+ std::path::Path::new("/var/lib/teleterm")
+ }
+
+ fn data_dir(&self) -> Option<&std::path::Path> {
+ if self.has_home() {
+ self.project_dirs
+ .as_ref()
+ .map(directories::ProjectDirs::data_dir)
+ } else {
+ None
+ }
+ }
+
+ pub fn data_file(
+ &self,
+ name: &str,
+ must_exist: bool,
+ ) -> Option<std::path::PathBuf> {
+ if let Some(data_dir) = self.data_dir() {
+ let file = data_dir.join(name);
+ if !must_exist || file.exists() {
+ return Some(file);
+ }
+ }
+
+ let file = self.global_data_dir().join(name);
+ if !must_exist || file.exists() {
+ return Some(file);
+ }
+
+ None
+ }
+}
diff --git a/teleterm/src/error.rs b/teleterm/src/error.rs
new file mode 100644
index 0000000..e3c5206
--- /dev/null
+++ b/teleterm/src/error.rs
@@ -0,0 +1,415 @@
+#[derive(Debug, snafu::Snafu)]
+#[snafu(visibility = "pub")]
+pub enum Error {
+ #[snafu(display("failed to accept: {}", source))]
+ Acceptor { source: tokio::io::Error },
+
+ #[snafu(display(
+ "oauth configuration for auth type {:?} not found",
+ ty
+ ))]
+ AuthTypeMissingOauthConfig { ty: crate::protocol::AuthType },
+
+ #[snafu(display("auth type {:?} not allowed", ty))]
+ AuthTypeNotAllowed { ty: crate::protocol::AuthType },
+
+ #[snafu(display("auth type {:?} does not use oauth", ty))]
+ AuthTypeNotOauth { ty: crate::protocol::AuthType },
+
+ #[snafu(display("failed to bind to {}: {}", address, source))]
+ Bind {
+ address: std::net::SocketAddr,
+ source: tokio::io::Error,
+ },
+
+ #[snafu(display("config file {} doesn't exist", name))]
+ ConfigFileDoesntExist { name: String },
+
+ #[snafu(display("failed to connect to {}: {}", address, source))]
+ Connect {
+ address: std::net::SocketAddr,
+ source: std::io::Error,
+ },
+
+ #[snafu(display(
+ "failed to make tls connection to {}: {}",
+ host,
+ source
+ ))]
+ ConnectTls {
+ host: String,
+ source: native_tls::Error,
+ },
+
+ #[snafu(display("couldn't determine the current username"))]
+ CouldntFindUsername,
+
+ #[snafu(display("failed to parse configuration: {}", source))]
+ CouldntParseConfig { source: config::ConfigError },
+
+ #[snafu(display("failed to create tls acceptor: {}", source))]
+ CreateAcceptor { source: native_tls::Error },
+
+ #[snafu(display("failed to create tls connector: {}", source))]
+ CreateConnector { source: native_tls::Error },
+
+ #[snafu(display("failed to create directory {}: {}", filename, source))]
+ CreateDir {
+ filename: String,
+ source: std::io::Error,
+ },
+
+ #[snafu(display("failed to create file {}: {}", filename, source))]
+ CreateFile {
+ filename: String,
+ source: tokio::io::Error,
+ },
+
+ #[snafu(display("failed to create file {}: {}", filename, source))]
+ CreateFileSync {
+ filename: String,
+ source: std::io::Error,
+ },
+
+ #[snafu(display("received EOF from server"))]
+ EOF,
+
+ #[snafu(display("failed to retrieve access token: {:?}", msg))]
+ ExchangeCode {
+ msg: String,
+ // XXX RequestTokenError doesn't implement the right traits
+ // source: oauth2::RequestTokenError<
+ // oauth2::reqwest::Error,
+ // oauth2::StandardErrorResponse<
+ // oauth2::basic::BasicErrorResponseType,
+ // >,
+ // >
+ },
+
+ #[snafu(display(
+ "failed to parse string {:?}: unexpected trailing data",
+ data
+ ))]
+ ExtraMessageData { data: Vec<u8> },
+
+ #[snafu(display("failed to write to stdout: {}", source))]
+ FlushTerminal { source: tokio::io::Error },
+
+ #[snafu(display(
+ "failed to get recurse center profile data: {}",
+ source
+ ))]
+ GetRecurseCenterProfile { source: reqwest::Error },
+
+ #[snafu(display("failed to get terminal size: {}", source))]
+ GetTerminalSize { source: crossterm::ErrorKind },
+
+ #[snafu(display("failed to find any resolvable addresses"))]
+ HasResolvedAddr,
+
+ #[snafu(display("invalid auth type {}", ty))]
+ InvalidAuthType { ty: u8 },
+
+ #[snafu(display("invalid auth type {}", ty))]
+ InvalidAuthTypeStr { ty: String },
+
+ #[snafu(display("invalid message type {}", ty))]
+ InvalidMessageType { ty: u8 },
+
+ #[snafu(display("invalid watch id {}", id))]
+ InvalidWatchId { id: String },
+
+ #[snafu(display(
+ "packet length must be at least {} bytes (got {})",
+ expected,
+ len
+ ))]
+ LenTooSmall { len: u32, expected: usize },
+
+ #[snafu(display(
+ "packet length must be at most {} bytes (got {})",
+ expected,
+ len
+ ))]
+ LenTooBig { len: u32, expected: usize },
+
+ #[snafu(display("couldn't find name in argv"))]
+ MissingArgv,
+
+ #[snafu(display(
+ "detected argv path {} was not a valid filename",
+ path
+ ))]
+ NotAFileName { path: String },
+
+ #[snafu(display(
+ "missing oauth configuration item {} for auth type {}",
+ field,
+ auth_type.name(),
+ ))]
+ OauthMissingConfiguration {
+ field: String,
+ auth_type: crate::protocol::AuthType,
+ },
+
+ #[snafu(display("failed to open file {}: {}", filename, source))]
+ OpenFile {
+ filename: String,
+ source: tokio::io::Error,
+ },
+
+ #[snafu(display("failed to open file {}: {}", filename, source))]
+ OpenFileSync {
+ filename: String,
+ source: std::io::Error,
+ },
+
+ #[snafu(display("failed to open link in browser: {}", source))]
+ OpenLink { source: std::io::Error },
+
+ #[snafu(display("failed to parse address"))]
+ ParseAddress,
+
+ #[snafu(display("failed to parse address: {}", source))]
+ ParseAddr { source: std::net::AddrParseError },
+
+ #[snafu(display("{}", source))]
+ ParseArgs { source: clap::Error },
+
+ #[snafu(display("failed to parse buffer size {}: {}", input, source))]
+ ParseBufferSize {
+ input: String,
+ source: std::num::ParseIntError,
+ },
+
+ #[snafu(display("failed to parse config file: {}", source))]
+ ParseConfigFile { source: config::ConfigError },
+
+ #[snafu(display("failed to parse incoming http request"))]
+ ParseHttpRequest,
+
+ #[snafu(display(
+ "failed to validate csrf token on incoming http request"
+ ))]
+ ParseHttpRequestCsrf,
+
+ #[snafu(display(
+ "incoming http request had no code in the query parameters"
+ ))]
+ ParseHttpRequestMissingCode,
+
+ #[snafu(display(
+ "failed to parse path from incoming http request: {}",
+ source
+ ))]
+ ParseHttpRequestPath { source: url::ParseError },
+
+ #[snafu(display("failed to parse identity file: {}", source))]
+ ParseIdentity { source: native_tls::Error },
+
+ #[snafu(display(
+ "failed to parse int from buffer {:?}: {}",
+ buf,
+ source
+ ))]
+ ParseInt {
+ buf: Vec<u8>,
+ source: std::array::TryFromSliceError,
+ },
+
+ #[snafu(display("failed to parse float option {}: {}", name, source))]
+ ParseFloat {
+ name: String,
+ source: std::num::ParseFloatError,
+ },
+
+ #[snafu(display("failed to parse response json: {}", source))]
+ ParseJson { source: reqwest::Error },
+
+ #[snafu(display("failed to parse max frame length: {}", source))]
+ ParseMaxFrameLength { source: std::num::ParseIntError },
+
+ #[snafu(display(
+ "failed to parse port {} from address: {}",
+ string,
+ source
+ ))]
+ ParsePort {
+ string: String,
+ source: std::num::ParseIntError,
+ },
+
+ #[snafu(display("failed to parse read timeout {}: {}", input, source))]
+ ParseReadTimeout {
+ input: String,
+ source: std::num::ParseIntError,
+ },
+
+ #[snafu(display("failed to parse string {:?}: {}", string, source))]
+ ParseString {
+ string: Vec<u8>,
+ source: std::string::FromUtf8Error,
+ },
+
+ #[snafu(display("rate limit exceeded"))]
+ RateLimited,
+
+ #[snafu(display("failed to read from event channel: {}", source))]
+ ReadChannel {
+ source: tokio::sync::mpsc::error::UnboundedRecvError,
+ },
+
+ #[snafu(display("failed to read from file: {}", source))]
+ ReadFile { source: tokio::io::Error },
+
+ #[snafu(display("failed to read from file: {}", source))]
+ ReadFileSync { source: std::io::Error },
+
+ #[snafu(display("{}", source))]
+ ReadMessageWithTimeout {
+ #[snafu(source(from(tokio::timer::timeout::Error<Error>, Box::new)))]
+ source: Box<tokio::timer::timeout::Error<Error>>,
+ },
+
+ #[snafu(display("failed to read packet: {}", source))]
+ ReadPacket { source: tokio::io::Error },
+
+ #[snafu(display("failed to read from socket: {}", source))]
+ ReadSocket { source: tokio::io::Error },
+
+ #[snafu(display("failed to read from terminal: {}", source))]
+ ReadTerminal { source: std::io::Error },
+
+ #[snafu(display("failed to read ttyrec: {}", source))]
+ ReadTtyrec { source: ttyrec::Error },
+
+ #[snafu(display("failed to poll for terminal resizing: {}", source))]
+ Resize {
+ source: tokio_terminal_resize::Error,
+ },
+
+ #[snafu(display(
+ "failed to resolve address {}:{}: {}",
+ host,
+ port,
+ source
+ ))]
+ ResolveAddress {
+ host: String,
+ port: u16,
+ source: std::io::Error,
+ },
+
+ #[snafu(display("received error from server: {}", message))]
+ Server { message: String },
+
+ #[snafu(display("SIGWINCH handler failed: {}", source))]
+ SigWinchHandler { source: std::io::Error },
+
+ #[snafu(display("failed to sleep until next frame: {}", source))]
+ Sleep { source: tokio::timer::Error },
+
+ #[snafu(display(
+ "failed to receive new socket over channel: channel closed"
+ ))]
+ SocketChannelClosed,
+
+ #[snafu(display(
+ "failed to receive new socket over channel: {}",
+ source
+ ))]
+ SocketChannelReceive {
+ source: tokio::sync::mpsc::error::RecvError,
+ },
+
+ #[snafu(display("poll subprocess failed: {}", source))]
+ Subprocess {
+ source: tokio_pty_process_stream::Error,
+ },
+
+ #[snafu(display("failed to switch gid: {}", source))]
+ SwitchGid { source: std::io::Error },
+
+ #[snafu(display("failed to switch uid: {}", source))]
+ SwitchUid { source: std::io::Error },
+
+ #[snafu(display(
+ "failed to spawn a background thread to read terminal input: {}",
+ source
+ ))]
+ TerminalInputReadingThread { source: std::io::Error },
+
+ #[snafu(display(
+ "terminal must be smaller than 1000 rows or columns (got {})",
+ size
+ ))]
+ TermTooBig { size: crate::term::Size },
+
+ #[snafu(display("timeout"))]
+ Timeout,
+
+ #[snafu(display("heartbeat timer failed: {}", source))]
+ TimerHeartbeat { source: tokio::timer::Error },
+
+ #[snafu(display("read timeout timer failed: {}", source))]
+ TimerReadTimeout { source: tokio::timer::Error },
+
+ #[snafu(display("reconnect timer failed: {}", source))]
+ TimerReconnect { source: tokio::timer::Error },
+
+ #[snafu(display("failed to switch to alternate screen: {}", source))]
+ ToAlternateScreen { source: crossterm::ErrorKind },
+
+ #[snafu(display(
+ "failed to put the terminal into raw mode: {}",
+ source
+ ))]
+ ToRawMode { source: crossterm::ErrorKind },
+
+ #[snafu(display("unauthenticated message: {:?}", message))]
+ UnauthenticatedMessage { message: crate::protocol::Message },
+
+ #[snafu(display("unexpected message: {:?}", message))]
+ UnexpectedMessage { message: crate::protocol::Message },
+
+ #[snafu(display("failed to find group with gid {}", gid))]
+ UnknownGid { gid: users::gid_t },
+
+ #[snafu(display("failed to find group with group name {}", name))]
+ UnknownGroup { name: String },
+
+ #[snafu(display("failed to find user with uid {}", uid))]
+ UnknownUid { uid: users::uid_t },
+
+ #[snafu(display("failed to find user with username {}", name))]
+ UnknownUser { name: String },
+
+ #[snafu(display("failed to write to file: {}", source))]
+ WriteFile { source: tokio::io::Error },
+
+ #[snafu(display("failed to write to file: {}", source))]
+ WriteFileSync { source: std::io::Error },
+
+ #[snafu(display("{}", source))]
+ WriteMessageWithTimeout {
+ #[snafu(source(from(tokio::timer::timeout::Error<Error>, Box::new)))]
+ source: Box<tokio::timer::timeout::Error<Error>>,
+ },
+
+ #[snafu(display("failed to write packet: {}", source))]
+ WritePacket { source: tokio::io::Error },
+
+ #[snafu(display("failed to write to socket: {}", source))]
+ WriteSocket { source: tokio::io::Error },
+
+ #[snafu(display("failed to write to stdout: {}", source))]
+ WriteTerminal { source: tokio::io::Error },
+
+ #[snafu(display("failed to write to terminal: {}", source))]
+ WriteTerminalCrossterm { source: crossterm::ErrorKind },
+
+ #[snafu(display("failed to write ttyrec: {}", source))]
+ WriteTtyrec { source: ttyrec::Error },
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
diff --git a/teleterm/src/key_reader.rs b/teleterm/src/key_reader.rs
new file mode 100644
index 0000000..9a35e2d
--- /dev/null
+++ b/teleterm/src/key_reader.rs
@@ -0,0 +1,68 @@
+use crate::prelude::*;
+
+pub struct KeyReader {
+ events: Option<
+ tokio::sync::mpsc::UnboundedReceiver<crossterm::input::InputEvent>,
+ >,
+ quit: Option<tokio::sync::oneshot::Sender<()>>,
+}
+
+impl KeyReader {
+ pub fn new() -> Self {
+ Self {
+ events: None,
+ quit: None,
+ }
+ }
+}
+
+impl futures::stream::Stream for KeyReader {
+ type Item = crossterm::input::InputEvent;
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
+ if self.events.is_none() {
+ let task = futures::task::current();
+ let reader = crossterm::input::input().read_sync();
+ let (events_tx, events_rx) =
+ tokio::sync::mpsc::unbounded_channel();
+ let mut events_tx = events_tx.wait();
+ let (quit_tx, mut quit_rx) = tokio::sync::oneshot::channel();
+ // TODO: this is pretty janky - it'd be better to build in more
+ // useful support to crossterm directly
+ std::thread::Builder::new()
+ .spawn(move || {
+ for event in reader {
+ // unwrap is unpleasant, but so is figuring out how to
+ // propagate the error back to the main thread
+ events_tx.send(event).unwrap();
+ task.notify();
+ if quit_rx.try_recv().is_ok() {
+ break;
+ }
+ }
+ })
+ .context(crate::error::TerminalInputReadingThread)?;
+
+ self.events = Some(events_rx);
+ self.quit = Some(quit_tx);
+ }
+
+ self.events
+ .as_mut()
+ .unwrap()
+ .poll()
+ .context(crate::error::ReadChannel)
+ }
+}
+
+impl Drop for KeyReader {
+ fn drop(&mut self) {
+ if let Some(quit_tx) = self.quit.take() {
+ // don't care if it fails to send, this can happen if the thread
+ // terminates due to seeing a newline before the keyreader goes
+ // out of scope
+ let _ = quit_tx.send(());
+ }
+ }
+}
diff --git a/teleterm/src/main.rs b/teleterm/src/main.rs
new file mode 100644
index 0000000..2475981
--- /dev/null
+++ b/teleterm/src/main.rs
@@ -0,0 +1,44 @@
+// XXX this is broken with ale
+// #![warn(clippy::cargo)]
+#![warn(clippy::pedantic)]
+#![warn(clippy::nursery)]
+#![allow(clippy::match_same_arms)]
+#![allow(clippy::missing_const_for_fn)]
+#![allow(clippy::multiple_crate_versions)]
+#![allow(clippy::non_ascii_literal)]
+#![allow(clippy::similar_names)]
+#![allow(clippy::single_match)]
+#![allow(clippy::single_match_else)]
+#![allow(clippy::too_many_arguments)]
+#![allow(clippy::too_many_lines)]
+#![allow(clippy::type_complexity)]
+
+const _DUMMY_DEPENDENCY: &str = include_str!("../Cargo.toml");
+
+mod prelude;
+
+mod async_stdin;
+mod client;
+mod cmd;
+mod config;
+mod dirs;
+mod error;
+mod key_reader;
+mod oauth;
+mod protocol;
+mod server;
+mod session_list;
+mod term;
+mod web;
+
+fn main() {
+ dirs::Dirs::new().create_all().unwrap();
+ match crate::cmd::parse().and_then(|m| crate::cmd::run(&m)) {
+ Ok(_) => {}
+ Err(err) => {
+ // we don't know if the log crate has been initialized yet
+ eprintln!("{}", err);
+ std::process::exit(1);
+ }
+ }
+}
diff --git a/teleterm/src/oauth.rs b/teleterm/src/oauth.rs
new file mode 100644
index 0000000..26aecbc
--- /dev/null
+++ b/teleterm/src/oauth.rs
@@ -0,0 +1,181 @@
+use crate::prelude::*;
+use oauth2::TokenResponse as _;
+use std::io::Read as _;
+
+mod recurse_center;
+pub use recurse_center::RecurseCenter;
+
+// this needs to be fixed because we listen for it in a hardcoded place
+pub const REDIRECT_URL: &str = "http://localhost:44141/oauth";
+
+pub trait Oauth {
+ fn client(&self) -> &oauth2::basic::BasicClient;
+ fn user_id(&self) -> &str;
+ fn name(&self) -> &str;
+
+ fn server_token_file(
+ &self,
+ must_exist: bool,
+ ) -> Option<std::path::PathBuf> {
+ let name = format!("server-oauth-{}-{}", self.name(), self.user_id());
+ crate::dirs::Dirs::new().data_file(&name, must_exist)
+ }
+
+ fn generate_authorize_url(&self) -> String {
+ let (auth_url, _) = self
+ .client()
+ .authorize_url(oauth2::CsrfToken::new_random)
+ .url();
+ auth_url.to_string()
+ }
+
+ fn get_access_token_from_auth_code(
+ &self,
+ code: &str,
+ ) -> Box<dyn futures::future::Future<Item = String, Error = Error> + Send>
+ {
+ let token_cache_file = self.server_token_file(false).unwrap();
+ let fut = self
+ .client()
+ .exchange_code(oauth2::AuthorizationCode::new(code.to_string()))
+ .request_async(oauth2::reqwest::async_http_client)
+ .map_err(|e| {
+ let msg = stringify_oauth2_http_error(&e);
+ Error::ExchangeCode { msg }
+ })
+ .and_then(|token| {
+ cache_refresh_token(token_cache_file, &token)
+ .map(move |_| token.access_token().secret().to_string())
+ });
+ Box::new(fut)
+ }
+
+ fn get_access_token_from_refresh_token(
+ &self,
+ token: &str,
+ ) -> Box<dyn futures::future::Future<Item = String, Error = Error> + Send>
+ {
+ let token_cache_file = self.server_token_file(false).unwrap();
+ let fut = self
+ .client()
+ .exchange_refresh_token(&oauth2::RefreshToken::new(
+ token.to_string(),
+ ))
+ .request_async(oauth2::reqwest::async_http_client)
+ .map_err(|e| {
+ let msg = stringify_oauth2_http_error(&e);
+ Error::ExchangeCode { msg }
+ })
+ .and_then(|token| {
+ cache_refresh_token(token_cache_file, &token)
+ .map(move |_| token.access_token().secret().to_string())
+ });
+ Box::new(fut)
+ }
+
+ fn get_username_from_access_token(
+ self: Box<Self>,
+ token: &str,
+ ) -> Box<dyn futures::future::Future<Item = String, Error = Error> + Send>;
+}
+
+pub fn save_client_auth_id(
+ auth: crate::protocol::AuthType,
+ id: &str,
+) -> impl futures::future::Future<Item = (), Error = Error> {
+ let id_file = client_id_file(auth, false).unwrap();
+ let id = id.to_string();
+ tokio::fs::File::create(id_file.clone())
+ .with_context(move || crate::error::CreateFile {
+ filename: id_file.to_string_lossy().to_string(),
+ })
+ .and_then(|file| {
+ tokio::io::write_all(file, id).context(crate::error::WriteFile)
+ })
+ .map(|_| ())
+}
+
+pub fn load_client_auth_id(
+ auth: crate::protocol::AuthType,
+) -> Option<String> {
+ client_id_file(auth, true).and_then(|id_file| {
+ std::fs::File::open(id_file).ok().and_then(|mut file| {
+ let mut id = vec![];
+ file.read_to_end(&mut id).ok().map(|_| {
+ std::string::String::from_utf8_lossy(&id).to_string()
+ })
+ })
+ })
+}
+
+fn client_id_file(
+ auth: crate::protocol::AuthType,
+ must_exist: bool,
+) -> Option<std::path::PathBuf> {
+ let filename = format!("client-oauth-{}", auth.name());
+ crate::dirs::Dirs::new().data_file(&filename, must_exist)
+}
+
+fn cache_refresh_token(
+ token_cache_file: std::path::PathBuf,
+ token: &oauth2::basic::BasicTokenResponse,
+) -> Box<dyn futures::future::Future<Item = (), Error = Error> + Send> {
+ let token_data = format!(
+ "{}\n{}\n",
+ token.refresh_token().unwrap().secret(),
+ token.access_token().secret(),
+ );
+ let fut = tokio::fs::File::create(token_cache_file.clone())
+ .with_context(move || crate::error::CreateFile {
+ filename: token_cache_file.to_string_lossy().to_string(),
+ })
+ .and_then(|file| {
+ tokio::io::write_all(file, token_data)
+ .context(crate::error::WriteFile)
+ })
+ .map(|_| ());
+ Box::new(fut)
+}
+
+#[derive(Debug, Clone)]
+pub struct Config {
+ client_id: String,
+ client_secret: String,
+ auth_url: url::Url,
+ token_url: url::Url,
+ redirect_url: url::Url,
+}
+
+impl Config {
+ fn into_basic_client(self) -> oauth2::basic::BasicClient {
+ oauth2::basic::BasicClient::new(
+ oauth2::ClientId::new(self.client_id),
+ Some(oauth2::ClientSecret::new(self.client_secret)),
+ oauth2::AuthUrl::new(self.auth_url),
+ Some(oauth2::TokenUrl::new(self.token_url)),
+ )
+ .set_redirect_url(oauth2::RedirectUrl::new(self.redirect_url))
+ }
+}
+
+// make this actually give useful information, because the default
+// stringification is pretty useless
+fn stringify_oauth2_http_error(
+ e: &oauth2::RequestTokenError<
+ oauth2::reqwest::Error,
+ oauth2::StandardErrorResponse<oauth2::basic::BasicErrorResponseType>,
+ >,
+) -> String {
+ match e {
+ oauth2::RequestTokenError::ServerResponse(t) => {
+ format!("ServerResponse({})", t)
+ }
+ oauth2::RequestTokenError::Request(re) => format!("Request({})", re),
+ oauth2::RequestTokenError::Parse(se, b) => format!(
+ "Parse({}, {})",
+ se,
+ std::string::String::from_utf8_lossy(b)
+ ),
+ oauth2::RequestTokenError::Other(s) => format!("Other({})", s),
+ }
+}
diff --git a/teleterm/src/oauth/recurse_center.rs b/teleterm/src/oauth/recurse_center.rs
new file mode 100644
index 0000000..2b9f7f7
--- /dev/null
+++ b/teleterm/src/oauth/recurse_center.rs
@@ -0,0 +1,87 @@
+use crate::prelude::*;
+
+pub struct RecurseCenter {
+ client: oauth2::basic::BasicClient,
+ user_id: String,
+}
+
+impl RecurseCenter {
+ pub fn new(config: super::Config, user_id: &str) -> Self {
+ Self {
+ client: config.into_basic_client(),
+ user_id: user_id.to_string(),
+ }
+ }
+
+ pub fn config(client_id: &str, client_secret: &str) -> super::Config {
+ super::Config {
+ client_id: client_id.to_string(),
+ client_secret: client_secret.to_string(),
+ auth_url: url::Url::parse(
+ "https://www.recurse.com/oauth/authorize",
+ )
+ .unwrap(),
+ token_url: url::Url::parse("https://www.recurse.com/oauth/token")
+ .unwrap(),
+ redirect_url: url::Url::parse(super::REDIRECT_URL).unwrap(),
+ }
+ }
+}
+
+impl super::Oauth for RecurseCenter {
+ fn client(&self) -> &oauth2::basic::BasicClient {
+ &self.client
+ }
+
+ fn user_id(&self) -> &str {
+ &self.user_id
+ }
+
+ fn name(&self) -> &str {
+ crate::protocol::AuthType::RecurseCenter.name()
+ }
+
+ fn get_username_from_access_token(
+ self: Box<Self>,
+ token: &str,
+ ) -> Box<dyn futures::future::Future<Item = String, Error = Error> + Send>
+ {
+ let fut = reqwest::r#async::Client::new()
+ .get("https://www.recurse.com/api/v1/profiles/me")
+ .bearer_auth(token)
+ .send()
+ .context(crate::error::GetRecurseCenterProfile)
+ .and_then(|mut res| res.json().context(crate::error::ParseJson))
+ .map(|user: User| user.name());
+ Box::new(fut)
+ }
+}
+
+#[derive(serde::Deserialize)]
+struct User {
+ name: String,
+ stints: Vec<Stint>,
+}
+
+#[derive(serde::Deserialize)]
+struct Stint {
+ batch: Option<Batch>,
+ start_date: String,
+}
+
+#[derive(serde::Deserialize)]
+struct Batch {
+ short_name: String,
+}
+
+impl User {
+ fn name(&self) -> String {
+ let latest_stint =
+ self.stints.iter().max_by_key(|s| &s.start_date).unwrap();
+ if let Some(batch) = &latest_stint.batch {
+ format!("{} ({})", self.name, batch.short_name)
+ } else {
+ self.name.to_string()
+ }
+ }
+}
diff --git a/teleterm/src/prelude.rs b/teleterm/src/prelude.rs
new file mode 100644
index 0000000..1bd29b7
--- /dev/null
+++ b/teleterm/src/prelude.rs
@@ -0,0 +1,9 @@
+pub use futures::future::Future as _;
+pub use futures::sink::Sink as _;
+pub use futures::stream::Stream as _;
+pub use snafu::futures01::stream::StreamExt as _;
+pub use snafu::futures01::FutureExt as _;
+pub use snafu::{OptionExt as _, ResultExt as _};
+
+pub use crate::error::{Error, Result};
+pub use crate::oauth::Oauth as _;
diff --git a/teleterm/src/protocol.rs b/teleterm/src/protocol.rs
new file mode 100644
index 0000000..58fa396
--- /dev/null
+++ b/teleterm/src/protocol.rs
@@ -0,0 +1,947 @@
+use crate::prelude::*;
+use std::convert::{TryFrom as _, TryInto as _};
+
+pub type FramedReadHalf<S> = FramedReader<tokio::io::ReadHalf<S>>;
+pub type FramedWriteHalf<S> = FramedWriter<tokio::io::WriteHalf<S>>;
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Session {
+ pub id: String,
+ pub username: String,
+ pub term_type: String,
+ pub size: crate::term::Size,
+ pub idle_time: u32,
+ pub title: String,
+ pub watchers: u32,
+}
+
+pub struct FramedReader<T: tokio::io::AsyncRead>(
+ tokio::codec::FramedRead<
+ T,
+ tokio::codec::length_delimited::LengthDelimitedCodec,
+ >,
+);
+
+impl<T: tokio::io::AsyncRead> FramedReader<T> {
+ pub fn new(rs: T) -> Self {
+ Self(
+ tokio::codec::length_delimited::Builder::new()
+ .length_field_length(4)
+ .new_read(rs),
+ )
+ }
+}
+
+pub struct FramedWriter<T: tokio::io::AsyncWrite>(
+ tokio::codec::FramedWrite<
+ T,
+ tokio::codec::length_delimited::LengthDelimitedCodec,
+ >,
+);
+
+impl<T: tokio::io::AsyncWrite> FramedWriter<T> {
+ pub fn new(ws: T) -> Self {
+ Self(
+ tokio::codec::length_delimited::Builder::new()
+ .length_field_length(4)
+ .new_write(ws),
+ )
+ }
+}
+
+pub const PROTO_VERSION: u8 = 1;
+
+#[repr(u8)]
+#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
+pub enum AuthType {
+ Plain = 0,
+ RecurseCenter,
+}
+
+impl AuthType {
+ pub fn name(self) -> &'static str {
+ match self {
+ Self::Plain => "plain",
+ Self::RecurseCenter => "recurse_center",
+ }
+ }
+
+ pub fn is_oauth(self) -> bool {
+ match self {
+ Self::Plain => false,
+ Self::RecurseCenter => true,
+ }
+ }
+
+ pub fn iter() -> impl Iterator<Item = Self> {
+ (0..=255)
+ .map(Self::try_from)
+ .take_while(std::result::Result::is_ok)
+ .map(std::result::Result::unwrap)
+ }
+}
+
+impl std::convert::TryFrom<u8> for AuthType {
+ type Error = Error;
+
+ fn try_from(n: u8) -> Result<Self> {
+ Ok(match n {
+ 0 => Self::Plain,
+ 1 => Self::RecurseCenter,
+ _ => return Err(Error::InvalidAuthType { ty: n }),
+ })
+ }
+}
+
+impl std::convert::TryFrom<&str> for AuthType {
+ type Error = Error;
+
+ fn try_from(s: &str) -> Result<Self> {
+ Ok(match s {
+ s if Self::Plain.name() == s => Self::Plain,
+ s if Self::RecurseCenter.name() == s => Self::RecurseCenter,
+ _ => return Err(Error::InvalidAuthTypeStr { ty: s.to_string() }),
+ })
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Auth {
+ Plain { username: String },
+ RecurseCenter { id: Option<String> },
+}
+
+impl Auth {
+ pub fn plain(username: &str) -> Self {
+ Self::Plain {
+ username: username.to_string(),
+ }
+ }
+
+ pub fn recurse_center(id: Option<&str>) -> Self {
+ Self::RecurseCenter {
+ id: id.map(std::string::ToString::to_string),
+ }
+ }
+
+ pub fn is_oauth(&self) -> bool {
+ self.auth_type().is_oauth()
+ }
+
+ pub fn name(&self) -> &'static str {
+ self.auth_type().name()
+ }
+
+ pub fn auth_type(&self) -> AuthType {
+ match self {
+ Self::Plain { .. } => AuthType::Plain,
+ Self::RecurseCenter { .. } => AuthType::RecurseCenter,
+ }
+ }
+}
+
+#[repr(u8)]
+#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
+pub enum MessageType {
+ Login = 0,
+ StartStreaming,
+ StartWatching,
+ Heartbeat,
+ TerminalOutput,
+ ListSessions,
+ Sessions,
+ Disconnected,
+ Error,
+ Resize,
+ LoggedIn,
+ OauthRequest,
+ OauthResponse,
+}
+
+impl std::convert::TryFrom<u8> for MessageType {
+ type Error = Error;
+
+ fn try_from(n: u8) -> Result<Self> {
+ Ok(match n {
+ 0 => Self::Login,
+ 1 => Self::StartStreaming,
+ 2 => Self::StartWatching,
+ 3 => Self::Heartbeat,
+ 4 => Self::TerminalOutput,
+ 5 => Self::ListSessions,
+ 6 => Self::Sessions,
+ 7 => Self::Disconnected,
+ 8 => Self::Error,
+ 9 => Self::Resize,
+ 10 => Self::LoggedIn,
+ 11 => Self::OauthRequest,
+ 12 => Self::OauthResponse,
+ _ => return Err(Error::InvalidMessageType { ty: n }),
+ })
+ }
+}
+
+// XXX https://github.com/rust-lang/rust/issues/64362
+#[allow(dead_code)]
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Message {
+ Login {
+ proto_version: u8,
+ auth: Auth,
+ term_type: String,
+ size: crate::term::Size,
+ },
+ StartStreaming,
+ StartWatching {
+ id: String,
+ },
+ Heartbeat,
+ TerminalOutput {
+ data: Vec<u8>,
+ },
+ ListSessions,
+ Sessions {
+ sessions: Vec<Session>,
+ },
+ Disconnected,
+ Error {
+ msg: String,
+ },
+ Resize {
+ size: crate::term::Size,
+ },
+ LoggedIn {
+ username: String,
+ },
+ OauthRequest {
+ url: String,
+ id: String,
+ },
+ OauthResponse {
+ code: String,
+ },
+}
+
+impl Message {
+ pub fn login(
+ auth: &Auth,
+ term_type: &str,
+ size: crate::term::Size,
+ ) -> Self {
+ Self::Login {
+ proto_version: PROTO_VERSION,
+ auth: auth.clone(),
+ term_type: term_type.to_string(),
+ size,
+ }
+ }
+
+ pub fn start_streaming() -> Self {
+ Self::StartStreaming
+ }
+
+ pub fn start_watching(id: &str) -> Self {
+ Self::StartWatching { id: id.to_string() }
+ }
+
+ pub fn heartbeat() -> Self {
+ Self::Heartbeat
+ }
+
+ pub fn terminal_output(data: &[u8]) -> Self {
+ Self::TerminalOutput {
+ data: data.to_vec(),
+ }
+ }
+
+ pub fn list_sessions() -> Self {
+ Self::ListSessions
+ }
+
+ pub fn sessions(sessions: &[Session]) -> Self {
+ Self::Sessions {
+ sessions: sessions.to_vec(),
+ }
+ }
+
+ pub fn disconnected() -> Self {
+ Self::Disconnected
+ }
+
+ pub fn error(msg: &str) -> Self {
+ Self::Error {
+ msg: msg.to_string(),
+ }
+ }
+
+ pub fn resize(size: crate::term::Size) -> Self {
+ Self::Resize { size }
+ }
+
+ pub fn logged_in(username: &str) -> Self {
+ Self::LoggedIn {
+ username: username.to_string(),
+ }
+ }
+
+ pub fn oauth_request(url: &str, id: &str) -> Self {
+ Self::OauthRequest {
+ url: url.to_string(),
+ id: id.to_string(),
+ }
+ }
+
+ pub fn oauth_response(code: &str) -> Self {
+ Self::OauthResponse {
+ code: code.to_string(),
+ }
+ }
+
+ pub fn message_type(&self) -> MessageType {
+ match self {
+ Self::Login { .. } => MessageType::Login,
+ Self::StartStreaming { .. } => MessageType::StartStreaming,
+ Self::StartWatching { .. } => MessageType::StartWatching,
+ Self::Heartbeat { .. } => MessageType::Heartbeat,
+ Self::TerminalOutput { .. } => MessageType::TerminalOutput,
+ Self::ListSessions { .. } => MessageType::ListSessions,
+ Self::Sessions { .. } => MessageType::Sessions,
+ Self::Disconnected { .. } => MessageType::Disconnected,
+ Self::Error { .. } => MessageType::Error,
+ Self::Resize { .. } => MessageType::Resize,
+ Self::LoggedIn { .. } => MessageType::LoggedIn,
+ Self::OauthRequest { .. } => MessageType::OauthRequest,
+ Self::OauthResponse { .. } => MessageType::OauthResponse,
+ }
+ }
+
+ #[allow(dead_code)]
+ pub fn read<R: std::io::Read>(r: R) -> Result<Self> {
+ Packet::read(r).and_then(Self::try_from)
+ }
+
+ pub fn read_async<T: tokio::io::AsyncRead>(
+ r: FramedReader<T>,
+ ) -> impl futures::future::Future<Item = (Self, FramedReader<T>), Error = Error>
+ {
+ Packet::read_async(r).and_then(|(packet, r)| {
+ Self::try_from(packet).map(|msg| (msg, r))
+ })
+ }
+
+ #[allow(dead_code)]
+ pub fn write<W: std::io::Write>(&self, w: W) -> Result<()> {
+ Packet::from(self).write(w)
+ }
+
+ pub fn write_async<T: tokio::io::AsyncWrite>(
+ &self,
+ w: FramedWriter<T>,
+ ) -> impl futures::future::Future<Item = FramedWriter<T>, Error = Error>
+ {
+ Packet::from(self).write_async(w)
+ }
+
+ // it'd be nice if i could just override the Debug implementation for
+ // specific enum variants, but writing the whole impl Debug by hand just
+ // to make this one change would be super obnoxious
+ pub fn format_log(&self) -> String {
+ match self {
+ Self::TerminalOutput { data } => {
+ format!("TerminalOutput {{ data: ({} bytes) }}", data.len())
+ }
+
+ // these are security-sensitive, keep them out of logs
+ Self::OauthRequest { .. } => "OauthRequest {{ .. }}".to_string(),
+ Self::OauthResponse { .. } => {
+ "OauthResponse {{ .. }}".to_string()
+ }
+
+ _ => format!("{:?}", self),
+ }
+ }
+}
+
+struct Packet {
+ ty: u8,
+ data: Vec<u8>,
+}
+
+impl Packet {
+ fn read<R: std::io::Read>(mut r: R) -> Result<Self> {
+ let mut len_buf = [0_u8; std::mem::size_of::<u32>()];
+ r.read_exact(&mut len_buf)
+ .context(crate::error::ReadPacket)?;
+ let len = u32::from_be_bytes(len_buf.try_into().unwrap());
+ if (len as usize) < std::mem::size_of::<u8>() {
+ return Err(Error::LenTooSmall {
+ len,
+ expected: std::mem::size_of::<u8>(),
+ });
+ }
+
+ let mut data = vec![0_u8; len as usize];
+ r.read_exact(&mut data).context(crate::error::ReadPacket)?;
+ let (ty_buf, rest) = data.split_at(std::mem::size_of::<u8>());
+ let ty = u8::from_be_bytes(ty_buf.try_into().unwrap());
+
+ Ok(Self {
+ ty,
+ data: rest.to_vec(),
+ })
+ }
+
+ fn read_async<T: tokio::io::AsyncRead>(
+ r: FramedReader<T>,
+ ) -> impl futures::future::Future<Item = (Self, FramedReader<T>), Error = Error>
+ {
+ r.0.into_future()
+ .map_err(|(e, _)| Error::ReadPacket { source: e })
+ .and_then(|(data, r)| match data {
+ Some(data) => Ok((data, r)),
+ None => Err(Error::EOF),
+ })
+ .and_then(|(buf, r)| {
+ if buf.len() < std::mem::size_of::<u8>() {
+ return Err(Error::LenTooSmall {
+ len: buf.len().try_into().unwrap(),
+ expected: std::mem::size_of::<u8>(),
+ });
+ }
+ let (ty_buf, data_buf) =
+ buf.split_at(std::mem::size_of::<u8>());
+ let ty = u8::from_be_bytes(ty_buf.try_into().unwrap());
+ let data = data_buf.to_vec();
+ Ok((Self { ty, data }, FramedReader(r)))
+ })
+ }
+
+ fn write<W: std::io::Write>(&self, mut w: W) -> Result<()> {
+ let bytes = self.as_bytes();
+ let len: u32 = bytes.len().try_into().unwrap();
+ let len_buf = len.to_be_bytes();
+ let buf: Vec<u8> =
+ len_buf.iter().chain(bytes.iter()).copied().collect();
+ Ok(w.write_all(&buf).context(crate::error::WritePacket)?)
+ }
+
+ fn write_async<T: tokio::io::AsyncWrite>(
+ &self,
+ w: FramedWriter<T>,
+ ) -> impl futures::future::Future<Item = FramedWriter<T>, Error = Error>
+ {
+ w.0.send(bytes::Bytes::from(self.as_bytes()))
+ .map(FramedWriter)
+ .context(crate::error::WritePacket)
+ }
+
+ fn as_bytes(&self) -> Vec<u8> {
+ self.ty
+ .to_be_bytes()
+ .iter()
+ .chain(self.data.iter())
+ .cloned()
+ .collect()
+ }
+}
+
+impl From<&Message> for Packet {
+ fn from(msg: &Message) -> Self {
+ fn u32_from_usize(n: usize) -> u32 {
+ n.try_into().unwrap()
+ }
+ fn write_u32(val: u32, data: &mut Vec<u8>) {
+ data.extend_from_slice(&val.to_be_bytes());
+ }
+ fn write_u16(val: u16, data: &mut Vec<u8>) {
+ data.extend_from_slice(&val.to_be_bytes());
+ }
+ fn write_u8(val: u8, data: &mut Vec<u8>) {
+ data.extend_from_slice(&val.to_be_bytes());
+ }
+ fn write_bytes(val: &[u8], data: &mut Vec<u8>) {
+ write_u32(u32_from_usize(val.len()), data);
+ data.extend_from_slice(val);
+ }
+ fn write_str(val: &str, data: &mut Vec<u8>) {
+ write_bytes(val.as_bytes(), data);
+ }
+ fn write_size(val: crate::term::Size, data: &mut Vec<u8>) {
+ write_u16(val.rows, data);
+ write_u16(val.cols, data);
+ }
+ fn write_session(val: &Session, data: &mut Vec<u8>) {
+ write_str(&val.id, data);
+ write_str(&val.username, data);
+ write_str(&val.term_type, data);
+ write_size(val.size, data);
+ write_u32(val.idle_time, data);
+ write_str(&val.title, data);
+ write_u32(val.watchers, data);
+ }
+ fn write_sessions(val: &[Session], data: &mut Vec<u8>) {
+ write_u32(u32_from_usize(val.len()), data);
+ for s in val {
+ write_session(s, data);
+ }
+ }
+ fn write_auth(val: &Auth, data: &mut Vec<u8>) {
+ write_u8(val.auth_type() as u8, data);
+ match val {
+ Auth::Plain { username } => {
+ write_str(username, data);
+ }
+ Auth::RecurseCenter { id } => {
+ let id = id.as_ref().map_or("", |s| s.as_str());
+ write_str(id, data);
+ }
+ }
+ }
+
+ let ty = msg.message_type() as u8;
+ let mut data = vec![];
+
+ match msg {
+ Message::Login {
+ proto_version,
+ auth,
+ term_type,
+ size,
+ } => {
+ write_u8(*proto_version, &mut data);
+ write_auth(auth, &mut data);
+ write_str(term_type, &mut data);
+ write_size(*size, &mut data);
+ }
+ Message::StartStreaming => {}
+ Message::StartWatching { id } => {
+ write_str(id, &mut data);
+ }
+ Message::Heartbeat => {}
+ Message::TerminalOutput { data: output } => {
+ write_bytes(output, &mut data);
+ }
+ Message::ListSessions => {}
+ Message::Sessions { sessions } => {
+ write_sessions(sessions, &mut data);
+ }
+ Message::Disconnected => {}
+ Message::Error { msg } => {
+ write_str(msg, &mut data);
+ }
+ Message::Resize { size } => {
+ write_size(*size, &mut data);
+ }
+ Message::LoggedIn { username } => {
+ write_str(username, &mut data);
+ }
+ Message::OauthRequest { url, id } => {
+ write_str(url, &mut data);
+ write_str(id, &mut data);
+ }
+ Message::OauthResponse { code } => {
+ write_str(code, &mut data);
+ }
+ }
+
+ Self { ty, data }
+ }
+}
+
+impl std::convert::TryFrom<Packet> for Message {
+ type Error = Error;
+
+ fn try_from(packet: Packet) -> Result<Self> {
+ fn read_u32(data: &[u8]) -> Result<(u32, &[u8])> {
+ if std::mem::size_of::<u32>() > data.len() {
+ return Err(Error::LenTooBig {
+ len: std::mem::size_of::<u32>().try_into().unwrap(),
+ expected: data.len(),
+ });
+ }
+ let (buf, rest) = data.split_at(std::mem::size_of::<u32>());
+ let val = u32::from_be_bytes(
+ buf.try_into().context(crate::error::ParseInt { buf })?,
+ );
+ Ok((val, rest))
+ }
+ fn read_u16(data: &[u8]) -> Result<(u16, &[u8])> {
+ if std::mem::size_of::<u16>() > data.len() {
+ return Err(Error::LenTooBig {
+ len: std::mem::size_of::<u16>().try_into().unwrap(),
+ expected: data.len(),
+ });
+ }
+ let (buf, rest) = data.split_at(std::mem::size_of::<u16>());
+ let val = u16::from_be_bytes(
+ buf.try_into().context(crate::error::ParseInt { buf })?,
+ );
+ Ok((val, rest))
+ }
+ fn read_u8(data: &[u8]) -> Result<(u8, &[u8])> {
+ if std::mem::size_of::<u8>() > data.len() {
+ return Err(Error::LenTooBig {
+ len: std::mem::size_of::<u8>().try_into().unwrap(),
+ expected: data.len(),
+ });
+ }
+ let (buf, rest) = data.split_at(std::mem::size_of::<u8>());
+ let val = u8::from_be_bytes(
+ buf.try_into().context(crate::error::ParseInt { buf })?,
+ );
+ Ok((val, rest))
+ }
+ fn read_bytes(data: &[u8]) -> Result<(Vec<u8>, &[u8])> {
+ let (len, data) = read_u32(data)?;
+ if len as usize > data.len() {
+ return Err(Error::LenTooBig {
+ len,
+ expected: data.len(),
+ });
+ }
+ let (buf, rest) = data.split_at(len as usize);
+ let val = buf.to_vec();
+ Ok((val, rest))
+ }
+ fn read_str(data: &[u8]) -> Result<(String, &[u8])> {
+ let (bytes, rest) = read_bytes(data)?;
+ let val =
+ String::from_utf8(bytes).map_err(|e| Error::ParseString {
+ string: e.as_bytes().to_vec(),
+ source: e,
+ })?;
+ Ok((val, rest))
+ }
+ fn read_size(data: &[u8]) -> Result<(crate::term::Size, &[u8])> {
+ let (rows, data) = read_u16(data)?;
+ let (cols, data) = read_u16(data)?;
+ Ok((crate::term::Size { rows, cols }, data))
+ }
+ fn read_session(data: &[u8]) -> Result<(Session, &[u8])> {
+ let (id, data) = read_str(data)?;
+ let (username, data) = read_str(data)?;
+ let (term_type, data) = read_str(data)?;
+ let (size, data) = read_size(data)?;
+ let (idle_time, data) = read_u32(data)?;
+ let (title, data) = read_str(data)?;
+ let (watchers, data) = read_u32(data)?;
+ Ok((
+ Session {
+ id,
+ username,
+ term_type,
+ size,
+ idle_time,
+ title,
+ watchers,
+ },
+ data,
+ ))
+ }
+ fn read_sessions(data: &[u8]) -> Result<(Vec<Session>, &[u8])> {
+ let mut val = vec![];
+ let (len, mut data) = read_u32(data)?;
+ for _ in 0..len {
+ let (subval, subdata) = read_session(data)?;
+ val.push(subval);
+ data = subdata;
+ }
+ Ok((val, data))
+ }
+ fn read_auth(data: &[u8]) -> Result<(Auth, &[u8])> {
+ let (ty, data) = read_u8(data)?;
+ let ty = AuthType::try_from(ty)?;
+ let (auth, data) = match ty {
+ AuthType::Plain => {
+ let (username, data) = read_str(data)?;
+ let auth = Auth::Plain { username };
+ (auth, data)
+ }
+ AuthType::RecurseCenter => {
+ let (id, data) = read_str(data)?;
+ let id = if id == "" { None } else { Some(id) };
+ let auth = Auth::RecurseCenter { id };
+ (auth, data)
+ }
+ };
+ Ok((auth, data))
+ }
+
+ let ty = MessageType::try_from(packet.ty)?;
+ let data: &[u8] = packet.data.as_ref();
+ let (msg, rest) = match ty {
+ MessageType::Login => {
+ let (proto_version, data) = read_u8(data)?;
+ let (auth, data) = read_auth(data)?;
+ let (term_type, data) = read_str(data)?;
+ let (size, data) = read_size(data)?;
+
+ (
+ Self::Login {
+ proto_version,
+ auth,
+ term_type,
+ size,
+ },
+ data,
+ )
+ }
+ MessageType::StartStreaming => (Self::StartStreaming, data),
+ MessageType::StartWatching => {
+ let (id, data) = read_str(data)?;
+
+ (Self::StartWatching { id }, data)
+ }
+ MessageType::Heartbeat => (Self::Heartbeat, data),
+ MessageType::TerminalOutput => {
+ let (output, data) = read_bytes(data)?;
+
+ (Self::TerminalOutput { data: output }, data)
+ }
+ MessageType::ListSessions => (Self::ListSessions, data),
+ MessageType::Sessions => {
+ let (sessions, data) = read_sessions(data)?;
+
+ (Self::Sessions { sessions }, data)
+ }
+ MessageType::Disconnected => (Self::Disconnected, data),
+ MessageType::Error => {
+ let (msg, data) = read_str(data)?;
+
+ (Self::Error { msg }, data)
+ }
+ MessageType::Resize => {
+ let (size, data) = read_size(data)?;
+
+ (Self::Resize { size }, data)
+ }
+ MessageType::LoggedIn => {
+ let (username, data) = read_str(data)?;
+
+ (Self::LoggedIn { username }, data)
+ }
+ MessageType::OauthRequest => {
+ let (url, data) = read_str(data)?;
+ let (id, data) = read_str(data)?;
+
+ (Self::OauthRequest { url, id }, data)
+ }
+ MessageType::OauthResponse => {
+ let (code, data) = read_str(data)?;
+
+ (Self::OauthResponse { code }, data)
+ }
+ };
+
+ if !rest.is_empty() {
+ return Err(Error::ExtraMessageData {
+ data: rest.to_vec(),
+ });
+ }
+
+ Ok(msg)
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_serialize_deserialize() {
+ for msg in valid_messages() {
+ let packet = Packet::from(&msg);
+ let msg2 = Message::try_from(packet).unwrap();
+ assert_eq!(msg, msg2);
+ }
+ }
+
+ #[test]
+ fn test_read_write() {
+ for msg in valid_messages() {
+ let mut buf = vec![];
+ msg.write(&mut buf).unwrap();
+ let msg2 = Message::read(buf.as_slice()).unwrap();
+ assert_eq!(msg, msg2);
+ }
+ }
+
+ #[test]
+ fn test_read_write_async() {
+ for msg in valid_messages() {
+ let (wres, rres) = tokio::sync::mpsc::channel(1);
+ let wres2 = wres.clone();
+ let buf = std::io::Cursor::new(vec![]);
+ let fut = msg
+ .write_async(FramedWriter::new(buf))
+ .and_then(|w| {
+ let mut buf = w.0.into_inner();
+ buf.set_position(0);
+ Message::read_async(FramedReader::new(buf))
+ })
+ .and_then(move |(msg2, _)| {
+ wres.wait().send(Ok(msg2)).unwrap();
+ futures::future::ok(())
+ })
+ .map_err(|e| {
+ wres2.wait().send(Err(e)).unwrap();
+ });
+ tokio::run(fut);
+ let msg2 = rres.wait().next();
+ let msg2 = msg2.unwrap();
+ let msg2 = msg2.unwrap();
+ let msg2 = msg2.unwrap();
+ assert_eq!(msg, msg2);
+ }
+ }
+
+ #[test]
+ fn test_invalid_sync() {
+ for buf in invalid_messages() {
+ let res = Message::read(buf.as_slice());
+ assert!(res.is_err())
+ }
+ }
+
+ #[test]
+ fn test_invalid_async() {
+ for buf in invalid_messages() {
+ let (wres, rres) = tokio::sync::mpsc::channel(1);
+ let wres2 = wres.clone();
+ let buf = std::io::Cursor::new(buf);
+ let fut = Message::read_async(FramedReader::new(buf))
+ .and_then(move |(msg2, _)| {
+ wres.wait().send(Ok(msg2)).unwrap();
+ futures::future::ok(())
+ })
+ .map_err(|e| {
+ wres2.wait().send(Err(e)).unwrap();
+ });
+ tokio::run(fut);
+ let res = rres.wait().next();
+ let res = res.unwrap();
+ let res = res.unwrap();
+ assert!(res.is_err());
+ }
+ }
+
+ #[test]
+ fn test_auth_values() {
+ let mut set = std::collections::HashSet::new();
+ let mut seen_err = false;
+ for i in 0..=255 {
+ if seen_err {
+ assert!(AuthType::try_from(i).is_err());
+ } else {
+ match AuthType::try_from(i) {
+ Ok(ty) => {
+ assert!(!set.contains(&ty));
+ set.insert(ty);
+ }
+ Err(_) => {
+ seen_err = true;
+ }
+ }
+ }
+ }
+ }
+
+ #[test]
+ fn test_message_values() {
+ let mut set = std::collections::HashSet::new();
+ let mut seen_err = false;
+ for i in 0..=255 {
+ if seen_err {
+ assert!(MessageType::try_from(i).is_err());
+ } else {
+ match MessageType::try_from(i) {
+ Ok(ty) => {
+ assert!(!set.contains(&ty));
+ set.insert(ty);
+ }
+ Err(_) => {
+ seen_err = true;
+ }
+ }
+ }
+ }
+ }
+
+ fn valid_messages() -> Vec<Message> {
+ vec![
+ Message::login(
+ &Auth::Plain {
+ username: "doy".to_string(),
+ },
+ "screen",
+ crate::term::Size { rows: 24, cols: 80 },
+ ),
+ Message::login(
+ &Auth::RecurseCenter {
+ id: Some("some-random-id".to_string()),
+ },
+ "screen",
+ crate::term::Size { rows: 24, cols: 80 },
+ ),
+ Message::login(
+ &Auth::RecurseCenter { id: None },
+ "screen",
+ crate::term::Size { rows: 24, cols: 80 },
+ ),
+ Message::start_streaming(),
+ Message::start_watching("some-session-id"),
+ Message::heartbeat(),
+ Message::terminal_output(b"foobar"),
+ Message::terminal_output(b""),
+ Message::list_sessions(),
+ Message::sessions(&[]),
+ Message::sessions(&[Session {
+ id: "some-session-id".to_string(),
+ username: "doy".to_string(),
+ term_type: "screen".to_string(),
+ size: crate::term::Size { rows: 24, cols: 80 },
+ idle_time: 123,
+ title: "it's my terminal title".to_string(),
+ watchers: 0,
+ }]),
+ Message::sessions(&[
+ Session {
+ id: "some-session-id".to_string(),
+ username: "doy".to_string(),
+ term_type: "screen".to_string(),
+ size: crate::term::Size { rows: 24, cols: 80 },
+ idle_time: 123,
+ title: "it's my terminal title".to_string(),
+ watchers: 0,
+ },
+ Session {
+ id: "some-other-session-id".to_string(),
+ username: "sartak".to_string(),
+ term_type: "screen".to_string(),
+ size: crate::term::Size { rows: 24, cols: 80 },
+ idle_time: 68,
+ title: "some other terminal title".to_string(),
+ watchers: 0,
+ },
+ ]),
+ Message::disconnected(),
+ Message::error("error message"),
+ Message::resize(crate::term::Size { rows: 25, cols: 81 }),
+ Message::logged_in("doy"),
+ ]
+ }
+
+ fn invalid_messages() -> Vec<Vec<u8>> {
+ vec![
+ b"".to_vec(),
+ b"\x04".to_vec(),
+ b"\x00\x00\x00\x00".to_vec(),
+ b"\x00\x00\x00\x01\x00".to_vec(),
+ b"\x00\x00\x00\x01\xff".to_vec(),
+ b"\x00\x00\x00\x00\x01".to_vec(),
+ b"\x00\x00\x00\x02\x01".to_vec(),
+ b"\xee\xee\xee\xee\x01".to_vec(),
+ b"\x00\x00\x00\x06\x08\x00\x00\x00\x01\xff".to_vec(),
+ ]
+ }
+}
diff --git a/teleterm/src/server.rs b/teleterm/src/server.rs
new file mode 100644
index 0000000..d18fa95
--- /dev/null
+++ b/teleterm/src/server.rs
@@ -0,0 +1,1073 @@
+use crate::prelude::*;
+use tokio::util::FutureExt as _;
+
+pub mod tls;
+
+enum ReadSocket<
+ S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
+> {
+ Connected(crate::protocol::FramedReadHalf<S>),
+ Reading(
+ Box<
+ dyn futures::future::Future<
+ Item = (
+ crate::protocol::Message,
+ crate::protocol::FramedReadHalf<S>,
+ ),
+ Error = Error,
+ > + Send,
+ >,
+ ),
+ Processing(
+ crate::protocol::FramedReadHalf<S>,
+ Box<
+ dyn futures::future::Future<
+ Item = (ConnectionState, crate::protocol::Message),
+ Error = Error,
+ > + Send,
+ >,
+ ),
+}
+
+enum WriteSocket<
+ S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
+> {
+ Connected(crate::protocol::FramedWriteHalf<S>),
+ Writing(
+ Box<
+ dyn futures::future::Future<
+ Item = crate::protocol::FramedWriteHalf<S>,
+ Error = Error,
+ > + Send,
+ >,
+ ),
+}
+
+#[derive(Debug, Clone)]
+struct TerminalInfo {
+ term: String,
+ size: crate::term::Size,
+}
+
+#[allow(clippy::large_enum_variant)]
+// XXX https://github.com/rust-lang/rust/issues/64362
+#[allow(dead_code)]
+enum ConnectionState {
+ Accepted,
+ LoggingIn {
+ term_info: TerminalInfo,
+ },
+ LoggedIn {
+ username: String,
+ term_info: TerminalInfo,
+ },
+ Streaming {
+ username: String,
+ term_info: TerminalInfo,
+ term: vt100::Parser,
+ },
+ Watching {
+ username: String,
+ term_info: TerminalInfo,
+ watch_id: String,
+ },
+}
+
+impl ConnectionState {
+ fn new() -> Self {
+ Self::Accepted
+ }
+
+ fn username(&self) -> Option<&str> {
+ match self {
+ Self::Accepted => None,
+ Self::LoggingIn { .. } => None,
+ Self::LoggedIn { username, .. } => Some(username),
+ Self::Streaming { username, .. } => Some(username),
+ Self::Watching { username, .. } => Some(username),
+ }
+ }
+
+ fn term_info(&mut self) -> Option<&TerminalInfo> {
+ match self {
+ Self::Accepted => None,
+ Self::LoggingIn { term_info, .. } => Some(term_info),
+ Self::LoggedIn { term_info, .. } => Some(term_info),
+ Self::Streaming { term_info, .. } => Some(term_info),
+ Self::Watching { term_info, .. } => Some(term_info),
+ }
+ }
+
+ fn term_info_mut(&mut self) -> Option<&mut TerminalInfo> {
+ match self {
+ Self::Accepted => None,
+ Self::LoggingIn { term_info, .. } => Some(term_info),
+ Self::LoggedIn { term_info, .. } => Some(term_info),
+ Self::Streaming { term_info, .. } => Some(term_info),
+ Self::Watching { term_info, .. } => Some(term_info),
+ }
+ }
+
+ fn term(&self) -> Option<&vt100::Parser> {
+ match self {
+ Self::Accepted => None,
+ Self::LoggingIn { .. } => None,
+ Self::LoggedIn { .. } => None,
+ Self::Streaming { term, .. } => Some(term),
+ Self::Watching { .. } => None,
+ }
+ }
+
+ fn term_mut(&mut self) -> Option<&mut vt100::Parser> {
+ match self {
+ Self::Accepted => None,
+ Self::LoggingIn { .. } => None,
+ Self::LoggedIn { .. } => None,
+ Self::Streaming { term, .. } => Some(term),
+ Self::Watching { .. } => None,
+ }
+ }
+
+ fn watch_id(&self) -> Option<&str> {
+ match self {
+ Self::Accepted => None,
+ Self::LoggingIn { .. } => None,
+ Self::LoggedIn { .. } => None,
+ Self::Streaming { .. } => None,
+ Self::Watching { watch_id, .. } => Some(watch_id),
+ }
+ }
+
+ fn login_plain(
+ &mut self,
+ username: &str,
+ term_type: &str,
+ size: crate::term::Size,
+ ) {
+ if let Self::Accepted = self {
+ *self = Self::LoggedIn {
+ username: username.to_string(),
+ term_info: TerminalInfo {
+ term: term_type.to_string(),
+ size,
+ },
+ };
+ } else {
+ unreachable!()
+ }
+ }
+
+ fn login_oauth_start(
+ &mut self,
+ term_type: &str,
+ size: crate::term::Size,
+ ) {
+ if let Self::Accepted = self {
+ *self = Self::LoggingIn {
+ term_info: TerminalInfo {
+ term: term_type.to_string(),
+ size,
+ },
+ };
+ } else {
+ unreachable!()
+ }
+ }
+
+ fn stream(&mut self) {
+ if let Self::LoggedIn {
+ username,
+ term_info,
+ } = std::mem::replace(self, Self::Accepted)
+ {
+ *self = Self::Streaming {
+ username,
+ term_info,
+ term: vt100::Parser::default(),
+ };
+ } else {
+ unreachable!()
+ }
+ }
+
+ fn watch(&mut self, id: &str) {
+ if let Self::LoggedIn {
+ username,
+ term_info,
+ } = std::mem::replace(self, Self::Accepted)
+ {
+ *self = Self::Watching {
+ username,
+ term_info,
+ watch_id: id.to_string(),
+ };
+ } else {
+ unreachable!()
+ }
+ }
+}
+
+struct Connection<
+ S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
+> {
+ id: String,
+ rsock: Option<ReadSocket<S>>,
+ wsock: Option<WriteSocket<S>>,
+ to_send: std::collections::VecDeque<crate::protocol::Message>,
+ closed: bool,
+ state: ConnectionState,
+ last_activity: std::time::Instant,
+ oauth_client: Option<Box<dyn crate::oauth::Oauth + Send>>,
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ Connection<S>
+{
+ fn new(s: S) -> Self {
+ let (rs, ws) = s.split();
+ let id = format!("{}", uuid::Uuid::new_v4());
+ log::info!("{}: new connection", id);
+
+ Self {
+ id,
+ rsock: Some(ReadSocket::Connected(
+ crate::protocol::FramedReader::new(rs),
+ )),
+ wsock: Some(WriteSocket::Connected(
+ crate::protocol::FramedWriter::new(ws),
+ )),
+ to_send: std::collections::VecDeque::new(),
+ closed: false,
+ state: ConnectionState::new(),
+ last_activity: std::time::Instant::now(),
+ oauth_client: None,
+ }
+ }
+
+ fn session(&self, watchers: u32) -> Option<crate::protocol::Session> {
+ let (username, term_info) = match &self.state {
+ ConnectionState::Accepted => return None,
+ ConnectionState::LoggingIn { .. } => return None,
+ ConnectionState::LoggedIn {
+ username,
+ term_info,
+ } => (username, term_info),
+ ConnectionState::Streaming {
+ username,
+ term_info,
+ ..
+ } => (username, term_info),
+ ConnectionState::Watching {
+ username,
+ term_info,
+ ..
+ } => (username, term_info),
+ };
+ let title = self
+ .state
+ .term()
+ .map_or("", |parser| parser.screen().title());
+
+ // i don't really care if things break for a connection that has been
+ // idle for 136 years
+ #[allow(clippy::cast_possible_truncation)]
+ Some(crate::protocol::Session {
+ id: self.id.clone(),
+ username: username.clone(),
+ term_type: term_info.term.clone(),
+ size: term_info.size,
+ idle_time: std::time::Instant::now()
+ .duration_since(self.last_activity)
+ .as_secs() as u32,
+ title: title.to_string(),
+ watchers,
+ })
+ }
+
+ fn send_message(&mut self, message: crate::protocol::Message) {
+ self.to_send.push_back(message);
+ }
+
+ fn close(&mut self, res: Result<()>) {
+ let msg = match res {
+ Ok(()) => crate::protocol::Message::disconnected(),
+ Err(e) => crate::protocol::Message::error(&format!("{}", e)),
+ };
+ self.send_message(msg);
+ self.closed = true;
+ }
+}
+
+pub struct Server<
+ S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static,
+> {
+ read_timeout: std::time::Duration,
+ acceptor:
+ Box<dyn futures::stream::Stream<Item = S, Error = Error> + Send>,
+ connections: std::collections::HashMap<String, Connection<S>>,
+ rate_limiter: ratelimit_meter::KeyedRateLimiter<Option<String>>,
+ allowed_auth_types: std::collections::HashSet<crate::protocol::AuthType>,
+ oauth_configs: std::collections::HashMap<
+ crate::protocol::AuthType,
+ crate::oauth::Config,
+ >,
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ Server<S>
+{
+ pub fn new(
+ acceptor: Box<
+ dyn futures::stream::Stream<Item = S, Error = Error> + Send,
+ >,
+ read_timeout: std::time::Duration,
+ allowed_auth_types: std::collections::HashSet<
+ crate::protocol::AuthType,
+ >,
+ oauth_configs: std::collections::HashMap<
+ crate::protocol::AuthType,
+ crate::oauth::Config,
+ >,
+ ) -> Self {
+ Self {
+ read_timeout,
+ acceptor,
+ connections: std::collections::HashMap::new(),
+ rate_limiter: ratelimit_meter::KeyedRateLimiter::new(
+ std::num::NonZeroU32::new(300).unwrap(),
+ std::time::Duration::from_secs(60),
+ ),
+ allowed_auth_types,
+ oauth_configs,
+ }
+ }
+
+ fn handle_message_login(
+ &mut self,
+ conn: &mut Connection<S>,
+ auth: &crate::protocol::Auth,
+ term_type: &str,
+ size: crate::term::Size,
+ ) -> Result<
+ Option<
+ Box<
+ dyn futures::future::Future<
+ Item = (ConnectionState, crate::protocol::Message),
+ Error = Error,
+ > + Send,
+ >,
+ >,
+ > {
+ if size.rows >= 1000 || size.cols >= 1000 {
+ return Err(Error::TermTooBig { size });
+ }
+
+ let ty = auth.auth_type();
+ if !self.allowed_auth_types.contains(&ty) {
+ return Err(Error::AuthTypeNotAllowed { ty });
+ }
+
+ match &auth {
+ crate::protocol::Auth::Plain { username } => {
+ log::info!(
+ "{}: login({}, {})",
+ auth.name(),
+ conn.id,
+ username
+ );
+ conn.state.login_plain(username, term_type, size);
+ conn.send_message(crate::protocol::Message::logged_in(
+ username,
+ ));
+ }
+ oauth if oauth.is_oauth() => {
+ let config = self.oauth_configs.get(&ty).context(
+ crate::error::AuthTypeMissingOauthConfig { ty },
+ )?;
+ let (refresh, client) = match oauth {
+ crate::protocol::Auth::RecurseCenter { id } => (
+ id.is_some(),
+ Box::new(crate::oauth::RecurseCenter::new(
+ config.clone(),
+ &id.clone().unwrap_or_else(|| {
+ format!("{}", uuid::Uuid::new_v4())
+ }),
+ )),
+ ),
+ _ => unreachable!(),
+ };
+
+ conn.oauth_client = Some(client);
+ let client = conn.oauth_client.as_ref().unwrap();
+
+ log::info!(
+ "{}: login(oauth({}), {:?})",
+ conn.id,
+ auth.name(),
+ client.user_id()
+ );
+
+ let token_filename = client.server_token_file(true);
+ if let (Some(token_filename), true) =
+ (token_filename, refresh)
+ {
+ let term_type = term_type.to_string();
+ let client = conn.oauth_client.take().unwrap();
+ let fut = tokio::fs::File::open(token_filename.clone())
+ .with_context(move || crate::error::OpenFile {
+ filename: token_filename
+ .to_string_lossy()
+ .to_string(),
+ })
+ .and_then(|file| {
+ tokio::io::lines(std::io::BufReader::new(file))
+ .into_future()
+ .map_err(|(e, _)| e)
+ .context(crate::error::ReadFile)
+ })
+ .and_then(|(refresh_token, _)| {
+ // XXX unwrap here isn't super safe
+ let refresh_token = refresh_token.unwrap();
+ client
+ .get_access_token_from_refresh_token(
+ refresh_token.trim(),
+ )
+ .and_then(|access_token| {
+ client.get_username_from_access_token(
+ &access_token,
+ )
+ })
+ })
+ .map(move |username| {
+ (
+ ConnectionState::LoggedIn {
+ username: username.clone(),
+ term_info: TerminalInfo {
+ term: term_type,
+ size,
+ },
+ },
+ crate::protocol::Message::logged_in(
+ &username,
+ ),
+ )
+ });
+ return Ok(Some(Box::new(fut)));
+ } else {
+ conn.state.login_oauth_start(term_type, size);
+ let authorize_url = client.generate_authorize_url();
+ let user_id = client.user_id().to_string();
+ conn.send_message(
+ crate::protocol::Message::oauth_request(
+ &authorize_url,
+ &user_id,
+ ),
+ );
+ }
+ }
+ _ => unreachable!(),
+ }
+
+ Ok(None)
+ }
+
+ fn handle_message_start_streaming(
+ &mut self,
+ conn: &mut Connection<S>,
+ ) -> Result<()> {
+ let username = conn.state.username().unwrap();
+
+ log::info!("{}: stream({})", conn.id, username);
+ conn.state.stream();
+
+ Ok(())
+ }
+
+ fn handle_message_start_watching(
+ &mut self,
+ conn: &mut Connection<S>,
+ id: String,
+ ) -> Result<()> {
+ let username = conn.state.username().unwrap();
+
+ if let Some(stream_conn) = self.connections.get(&id) {
+ let data = stream_conn
+ .state
+ .term()
+ .map(|parser| parser.screen().contents_formatted())
+ .ok_or_else(|| Error::InvalidWatchId {
+ id: id.to_string(),
+ })?;
+
+ log::info!("{}: watch({}, {})", conn.id, username, id);
+ conn.state.watch(&id);
+ conn.send_message(crate::protocol::Message::terminal_output(
+ &data,
+ ));
+
+ Ok(())
+ } else {
+ Err(Error::InvalidWatchId { id })
+ }
+ }
+
+ fn handle_message_heartbeat(
+ &mut self,
+ conn: &mut Connection<S>,
+ ) -> Result<()> {
+ conn.send_message(crate::protocol::Message::heartbeat());
+
+ Ok(())
+ }
+
+ fn handle_message_terminal_output(
+ &mut self,
+ conn: &mut Connection<S>,
+ data: &[u8],
+ ) -> Result<()> {
+ let parser = conn.state.term_mut().unwrap();
+
+ let screen = parser.screen().clone();
+ parser.process(data);
+ let diff = parser.screen().contents_diff(&screen);
+ for watch_conn in self.watchers_mut() {
+ let watch_id = watch_conn.state.watch_id().unwrap();
+ if conn.id == watch_id {
+ watch_conn.send_message(
+ crate::protocol::Message::terminal_output(&diff),
+ );
+ }
+ }
+
+ conn.last_activity = std::time::Instant::now();
+
+ Ok(())
+ }
+
+ fn handle_message_list_sessions(
+ &mut self,
+ conn: &mut Connection<S>,
+ ) -> Result<()> {
+ let mut watcher_counts = std::collections::HashMap::new();
+ for watcher in self.watchers() {
+ let watch_id =
+ if let ConnectionState::Watching { watch_id, .. } =
+ &watcher.state
+ {
+ watch_id
+ } else {
+ unreachable!()
+ };
+ watcher_counts.insert(
+ watch_id,
+ *watcher_counts.get(&watch_id).unwrap_or(&0) + 1,
+ );
+ }
+ let sessions: Vec<_> = self
+ .streamers()
+ .flat_map(|streamer| {
+ streamer
+ .session(*watcher_counts.get(&streamer.id).unwrap_or(&0))
+ })
+ .collect();
+ conn.send_message(crate::protocol::Message::sessions(&sessions));
+
+ Ok(())
+ }
+
+ fn handle_message_resize(
+ &mut self,
+ conn: &mut Connection<S>,
+ size: crate::term::Size,
+ ) -> Result<()> {
+ let term_info = conn.state.term_info_mut().unwrap();
+ term_info.size = size;
+
+ if let Some(parser) = conn.state.term_mut() {
+ parser.set_size(size.rows, size.cols);
+ }
+
+ Ok(())
+ }
+
+ fn handle_message_oauth_response(
+ &mut self,
+ conn: &mut Connection<S>,
+ code: &str,
+ ) -> Result<
+ Option<
+ Box<
+ dyn futures::future::Future<
+ Item = (ConnectionState, crate::protocol::Message),
+ Error = Error,
+ > + Send,
+ >,
+ >,
+ > {
+ let client = conn.oauth_client.take().ok_or_else(|| {
+ Error::UnexpectedMessage {
+ message: crate::protocol::Message::oauth_response(code),
+ }
+ })?;
+
+ let term_info = conn.state.term_info().unwrap().clone();
+ let fut = client
+ .get_access_token_from_auth_code(code)
+ .and_then(|token| client.get_username_from_access_token(&token))
+ .map(|username| {
+ (
+ ConnectionState::LoggedIn {
+ term_info,
+ username: username.clone(),
+ },
+ crate::protocol::Message::logged_in(&username),
+ )
+ });
+
+ Ok(Some(Box::new(fut)))
+ }
+
+ fn handle_accepted_message(
+ &mut self,
+ conn: &mut Connection<S>,
+ message: crate::protocol::Message,
+ ) -> Result<
+ Option<
+ Box<
+ dyn futures::future::Future<
+ Item = (ConnectionState, crate::protocol::Message),
+ Error = Error,
+ > + Send,
+ >,
+ >,
+ > {
+ match message {
+ crate::protocol::Message::Login {
+ auth,
+ term_type,
+ size,
+ ..
+ } => self.handle_message_login(conn, &auth, &term_type, size),
+ m => Err(Error::UnauthenticatedMessage { message: m }),
+ }
+ }
+
+ fn handle_logging_in_message(
+ &mut self,
+ conn: &mut Connection<S>,
+ message: crate::protocol::Message,
+ ) -> Result<
+ Option<
+ Box<
+ dyn futures::future::Future<
+ Item = (ConnectionState, crate::protocol::Message),
+ Error = Error,
+ > + Send,
+ >,
+ >,
+ > {
+ match message {
+ crate::protocol::Message::OauthResponse { code } => {
+ self.handle_message_oauth_response(conn, &code)
+ }
+ m => Err(Error::UnauthenticatedMessage { message: m }),
+ }
+ }
+
+ fn handle_logged_in_message(
+ &mut self,
+ conn: &mut Connection<S>,
+ message: crate::protocol::Message,
+ ) -> Result<()> {
+ match message {
+ crate::protocol::Message::Heartbeat => {
+ self.handle_message_heartbeat(conn)
+ }
+ crate::protocol::Message::Resize { size } => {
+ self.handle_message_resize(conn, size)
+ }
+ crate::protocol::Message::ListSessions => {
+ self.handle_message_list_sessions(conn)
+ }
+ crate::protocol::Message::StartStreaming => {
+ self.handle_message_start_streaming(conn)
+ }
+ crate::protocol::Message::StartWatching { id } => {
+ self.handle_message_start_watching(conn, id)
+ }
+ m => Err(crate::error::Error::UnexpectedMessage { message: m }),
+ }
+ }
+
+ fn handle_streaming_message(
+ &mut self,
+ conn: &mut Connection<S>,
+ message: crate::protocol::Message,
+ ) -> Result<()> {
+ match message {
+ crate::protocol::Message::Heartbeat => {
+ self.handle_message_heartbeat(conn)
+ }
+ crate::protocol::Message::Resize { size } => {
+ self.handle_message_resize(conn, size)
+ }
+ crate::protocol::Message::TerminalOutput { data } => {
+ self.handle_message_terminal_output(conn, &data)
+ }
+ m => Err(crate::error::Error::UnexpectedMessage { message: m }),
+ }
+ }
+
+ fn handle_watching_message(
+ &mut self,
+ conn: &mut Connection<S>,
+ message: crate::protocol::Message,
+ ) -> Result<()> {
+ match message {
+ crate::protocol::Message::Heartbeat => {
+ self.handle_message_heartbeat(conn)
+ }
+ crate::protocol::Message::Resize { size } => {
+ self.handle_message_resize(conn, size)
+ }
+ m => Err(crate::error::Error::UnexpectedMessage { message: m }),
+ }
+ }
+
+ fn handle_disconnect(&mut self, conn: &mut Connection<S>) {
+ if let Some(username) = conn.state.username() {
+ log::info!("{}: disconnect({})", conn.id, username);
+ } else {
+ log::info!("{}: disconnect", conn.id);
+ }
+
+ for watch_conn in self.watchers_mut() {
+ let watch_id = watch_conn.state.watch_id().unwrap();
+ if conn.id == watch_id {
+ watch_conn.close(Ok(()));
+ }
+ }
+ }
+
+ fn handle_message(
+ &mut self,
+ conn: &mut Connection<S>,
+ message: crate::protocol::Message,
+ ) -> Result<
+ Option<
+ Box<
+ dyn futures::future::Future<
+ Item = (ConnectionState, crate::protocol::Message),
+ Error = Error,
+ > + Send,
+ >,
+ >,
+ > {
+ if let crate::protocol::Message::TerminalOutput { .. } = message {
+ // do nothing, we expect TerminalOutput spam
+ } else {
+ let username =
+ conn.state.username().map(std::string::ToString::to_string);
+ if self.rate_limiter.check(username).is_err() {
+ let display_name =
+ conn.state.username().unwrap_or("(non-logged-in users)");
+ log::info!("{}: ratelimit({})", conn.id, display_name);
+ return Err(Error::RateLimited);
+ }
+ }
+
+ log::debug!("{}: recv({})", conn.id, message.format_log());
+
+ match conn.state {
+ ConnectionState::Accepted { .. } => {
+ self.handle_accepted_message(conn, message)
+ }
+ ConnectionState::LoggingIn { .. } => {
+ self.handle_logging_in_message(conn, message)
+ }
+ ConnectionState::LoggedIn { .. } => {
+ self.handle_logged_in_message(conn, message).map(|_| None)
+ }
+ ConnectionState::Streaming { .. } => {
+ self.handle_streaming_message(conn, message).map(|_| None)
+ }
+ ConnectionState::Watching { .. } => {
+ self.handle_watching_message(conn, message).map(|_| None)
+ }
+ }
+ }
+
+ fn poll_read_connection(
+ &mut self,
+ conn: &mut Connection<S>,
+ ) -> component_future::Poll<(), Error> {
+ match &mut conn.rsock {
+ Some(ReadSocket::Connected(..)) => {
+ if let Some(ReadSocket::Connected(s)) = conn.rsock.take() {
+ let fut = Box::new(
+ crate::protocol::Message::read_async(s)
+ .timeout(self.read_timeout)
+ .context(crate::error::ReadMessageWithTimeout),
+ );
+ conn.rsock = Some(ReadSocket::Reading(fut));
+ } else {
+ unreachable!()
+ }
+ Ok(component_future::Async::DidWork)
+ }
+ Some(ReadSocket::Reading(fut)) => match fut.poll() {
+ Ok(futures::Async::Ready((msg, s))) => {
+ let res = self.handle_message(conn, msg);
+ match res {
+ Ok(Some(fut)) => {
+ conn.rsock = Some(ReadSocket::Processing(s, fut));
+ }
+ Ok(None) => {
+ conn.rsock = Some(ReadSocket::Connected(s));
+ }
+ e @ Err(..) => {
+ conn.close(e.map(|_| ()));
+ conn.rsock = Some(ReadSocket::Connected(s));
+ }
+ }
+ Ok(component_future::Async::DidWork)
+ }
+ Ok(futures::Async::NotReady) => {
+ Ok(component_future::Async::NotReady)
+ }
+ Err(e) => classify_connection_error(e),
+ },
+ Some(ReadSocket::Processing(_, fut)) => {
+ let (state, msg) = component_future::try_ready!(fut.poll());
+ if let Some(ReadSocket::Processing(s, _)) = conn.rsock.take()
+ {
+ conn.state = state;
+ conn.send_message(msg);
+ conn.rsock = Some(ReadSocket::Connected(s));
+ } else {
+ unreachable!()
+ }
+ Ok(component_future::Async::DidWork)
+ }
+ _ => Ok(component_future::Async::NothingToDo),
+ }
+ }
+
+ fn poll_write_connection(
+ &mut self,
+ conn: &mut Connection<S>,
+ ) -> component_future::Poll<(), Error> {
+ match &mut conn.wsock {
+ Some(WriteSocket::Connected(..)) => {
+ if let Some(msg) = conn.to_send.pop_front() {
+ if let Some(WriteSocket::Connected(s)) = conn.wsock.take()
+ {
+ log::debug!(
+ "{}: send({})",
+ conn.id,
+ msg.format_log()
+ );
+ let fut = msg
+ .write_async(s)
+ .timeout(self.read_timeout)
+ .context(crate::error::WriteMessageWithTimeout);
+ conn.wsock =
+ Some(WriteSocket::Writing(Box::new(fut)));
+ } else {
+ unreachable!()
+ }
+ Ok(component_future::Async::DidWork)
+ } else if conn.closed {
+ Ok(component_future::Async::Ready(()))
+ } else {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+ Some(WriteSocket::Writing(fut)) => match fut.poll() {
+ Ok(futures::Async::Ready(s)) => {
+ conn.wsock = Some(WriteSocket::Connected(s));
+ Ok(component_future::Async::DidWork)
+ }
+ Ok(futures::Async::NotReady) => {
+ Ok(component_future::Async::NotReady)
+ }
+ Err(e) => classify_connection_error(e),
+ },
+ _ => Ok(component_future::Async::NothingToDo),
+ }
+ }
+
+ fn streamers(&self) -> impl Iterator<Item = &Connection<S>> {
+ self.connections.values().filter(|conn| match conn.state {
+ ConnectionState::Streaming { .. } => true,
+ _ => false,
+ })
+ }
+
+ fn watchers(&self) -> impl Iterator<Item = &Connection<S>> {
+ self.connections.values().filter(|conn| match conn.state {
+ ConnectionState::Watching { .. } => true,
+ _ => false,
+ })
+ }
+
+ fn watchers_mut(&mut self) -> impl Iterator<Item = &mut Connection<S>> {
+ self.connections
+ .values_mut()
+ .filter(|conn| match conn.state {
+ ConnectionState::Watching { .. } => true,
+ _ => false,
+ })
+ }
+}
+
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ Server<S>
+{
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ (),
+ Error,
+ >] = &[&Self::poll_accept, &Self::poll_read, &Self::poll_write];
+
+ fn poll_accept(&mut self) -> component_future::Poll<(), Error> {
+ if let Some(sock) = component_future::try_ready!(self.acceptor.poll())
+ {
+ let conn = Connection::new(sock);
+ self.connections.insert(conn.id.to_string(), conn);
+ Ok(component_future::Async::DidWork)
+ } else {
+ unreachable!()
+ }
+ }
+
+ fn poll_read(&mut self) -> component_future::Poll<(), Error> {
+ let mut did_work = false;
+ let mut not_ready = false;
+
+ let keys: Vec<_> = self.connections.keys().cloned().collect();
+ for key in keys {
+ let mut conn = self.connections.remove(&key).unwrap();
+ match self.poll_read_connection(&mut conn) {
+ Ok(component_future::Async::Ready(())) => {
+ self.handle_disconnect(&mut conn);
+ continue;
+ }
+ Ok(component_future::Async::DidWork) => {
+ did_work = true;
+ }
+ Ok(component_future::Async::NotReady) => {
+ not_ready = true;
+ }
+ Err(e) => {
+ log::error!(
+ "error reading from active connection: {}",
+ e
+ );
+ continue;
+ }
+ _ => {}
+ }
+ self.connections.insert(key.to_string(), conn);
+ }
+
+ if did_work {
+ Ok(component_future::Async::DidWork)
+ } else if not_ready {
+ Ok(component_future::Async::NotReady)
+ } else {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+
+ fn poll_write(&mut self) -> component_future::Poll<(), Error> {
+ let mut did_work = false;
+ let mut not_ready = false;
+
+ let keys: Vec<_> = self.connections.keys().cloned().collect();
+ for key in keys {
+ let mut conn = self.connections.remove(&key).unwrap();
+ match self.poll_write_connection(&mut conn) {
+ Ok(component_future::Async::Ready(())) => {
+ self.handle_disconnect(&mut conn);
+ continue;
+ }
+ Ok(component_future::Async::DidWork) => {
+ did_work = true;
+ }
+ Ok(component_future::Async::NotReady) => {
+ not_ready = true;
+ }
+ Err(e) => {
+ log::error!(
+ "error reading from active connection: {}",
+ e
+ );
+ continue;
+ }
+ _ => {}
+ }
+ self.connections.insert(key.to_string(), conn);
+ }
+
+ if did_work {
+ Ok(component_future::Async::DidWork)
+ } else if not_ready {
+ Ok(component_future::Async::NotReady)
+ } else {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+}
+
+fn classify_connection_error(e: Error) -> component_future::Poll<(), Error> {
+ let source = match e {
+ Error::ReadMessageWithTimeout { source } => source,
+ Error::WriteMessageWithTimeout { source } => source,
+ _ => return Err(e),
+ };
+
+ if source.is_inner() {
+ let source = source.into_inner().unwrap();
+ let tokio_err = match source {
+ Error::ReadPacket {
+ source: ref tokio_err,
+ } => tokio_err,
+ Error::WritePacket {
+ source: ref tokio_err,
+ } => tokio_err,
+ Error::EOF => {
+ return Ok(component_future::Async::Ready(()));
+ }
+ _ => {
+ return Err(source);
+ }
+ };
+
+ if tokio_err.kind() == tokio::io::ErrorKind::UnexpectedEof {
+ Ok(component_future::Async::Ready(()))
+ } else {
+ Err(source)
+ }
+ } else if source.is_elapsed() {
+ Err(Error::Timeout)
+ } else {
+ let source = source.into_timer().unwrap();
+ Err(Error::TimerReadTimeout { source })
+ }
+}
+
+#[must_use = "futures do nothing unless polled"]
+impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + 'static>
+ futures::future::Future for Server<S>
+{
+ type Item = ();
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ component_future::poll_future(self, Self::POLL_FNS)
+ }
+}
diff --git a/teleterm/src/server/tls.rs b/teleterm/src/server/tls.rs
new file mode 100644
index 0000000..39e63ba
--- /dev/null
+++ b/teleterm/src/server/tls.rs
@@ -0,0 +1,131 @@
+use crate::prelude::*;
+
+pub struct Server {
+ server: super::Server<tokio_tls::TlsStream<tokio::net::TcpStream>>,
+ acceptor: Box<
+ dyn futures::stream::Stream<
+ Item = tokio_tls::Accept<tokio::net::TcpStream>,
+ Error = Error,
+ > + Send,
+ >,
+ sock_w: tokio::sync::mpsc::Sender<
+ tokio_tls::TlsStream<tokio::net::TcpStream>,
+ >,
+ accepting_sockets: Vec<tokio_tls::Accept<tokio::net::TcpStream>>,
+}
+
+impl Server {
+ pub fn new(
+ acceptor: Box<
+ dyn futures::stream::Stream<
+ Item = tokio_tls::Accept<tokio::net::TcpStream>,
+ Error = Error,
+ > + Send,
+ >,
+ read_timeout: std::time::Duration,
+ allowed_login_methods: std::collections::HashSet<
+ crate::protocol::AuthType,
+ >,
+ oauth_configs: std::collections::HashMap<
+ crate::protocol::AuthType,
+ crate::oauth::Config,
+ >,
+ ) -> Self {
+ let (tls_sock_w, tls_sock_r) = tokio::sync::mpsc::channel(100);
+ Self {
+ server: super::Server::new(
+ Box::new(
+ tls_sock_r.context(crate::error::SocketChannelReceive),
+ ),
+ read_timeout,
+ allowed_login_methods,
+ oauth_configs,
+ ),
+ acceptor,
+ sock_w: tls_sock_w,
+ accepting_sockets: vec![],
+ }
+ }
+}
+
+impl Server {
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ (),
+ Error,
+ >] = &[
+ &Self::poll_accept,
+ &Self::poll_handshake_connections,
+ &Self::poll_server,
+ ];
+
+ fn poll_accept(&mut self) -> component_future::Poll<(), Error> {
+ if let Some(sock) = component_future::try_ready!(self.acceptor.poll())
+ {
+ self.accepting_sockets.push(sock);
+ Ok(component_future::Async::DidWork)
+ } else {
+ Err(Error::SocketChannelClosed)
+ }
+ }
+
+ fn poll_handshake_connections(
+ &mut self,
+ ) -> component_future::Poll<(), Error> {
+ let mut did_work = false;
+ let mut not_ready = false;
+
+ let mut i = 0;
+ while i < self.accepting_sockets.len() {
+ let sock = self.accepting_sockets.get_mut(i).unwrap();
+ match sock.poll() {
+ Ok(futures::Async::Ready(sock)) => {
+ self.accepting_sockets.swap_remove(i);
+ self.sock_w.try_send(sock).unwrap_or_else(|e| {
+ log::warn!(
+ "failed to send connected tls socket: {}",
+ e
+ );
+ });
+ did_work = true;
+ continue;
+ }
+ Ok(futures::Async::NotReady) => {
+ not_ready = true;
+ }
+ Err(e) => {
+ log::warn!("failed to accept tls connection: {}", e);
+ self.accepting_sockets.swap_remove(i);
+ continue;
+ }
+ }
+ i += 1;
+ }
+
+ if did_work {
+ Ok(component_future::Async::DidWork)
+ } else if not_ready {
+ Ok(component_future::Async::NotReady)
+ } else {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+
+ fn poll_server(&mut self) -> component_future::Poll<(), Error> {
+ component_future::try_ready!(self.server.poll());
+ Ok(component_future::Async::Ready(()))
+ }
+}
+
+#[must_use = "futures do nothing unless polled"]
+impl futures::future::Future for Server {
+ type Item = ();
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ component_future::poll_future(self, Self::POLL_FNS)
+ }
+}
diff --git a/teleterm/src/session_list.rs b/teleterm/src/session_list.rs
new file mode 100644
index 0000000..7fd4d0a
--- /dev/null
+++ b/teleterm/src/session_list.rs
@@ -0,0 +1,342 @@
+pub struct SessionList {
+ sessions: Vec<crate::protocol::Session>,
+ offset: usize,
+ size: crate::term::Size,
+}
+
+impl SessionList {
+ pub fn new(
+ sessions: Vec<crate::protocol::Session>,
+ size: crate::term::Size,
+ ) -> Self {
+ let mut by_name = std::collections::HashMap::new();
+ for session in sessions {
+ if !by_name.contains_key(&session.username) {
+ by_name.insert(session.username.clone(), vec![]);
+ }
+ by_name.get_mut(&session.username).unwrap().push(session);
+ }
+ let mut names: Vec<_> = by_name.keys().cloned().collect();
+ names.sort_by(|a: &String, b: &String| {
+ let a_idle =
+ by_name[a].iter().min_by_key(|session| session.idle_time);
+ let b_idle =
+ by_name[b].iter().min_by_key(|session| session.idle_time);
+ // these unwraps are safe because we know that none of the vecs in
+ // the map can be empty
+ a_idle.unwrap().idle_time.cmp(&b_idle.unwrap().idle_time)
+ });
+ for name in &names {
+ if let Some(sessions) = by_name.get_mut(name) {
+ sessions.sort_by_key(|s| s.idle_time);
+ }
+ }
+
+ let mut sorted = vec![];
+ for name in names {
+ let sessions = by_name.remove(&name).unwrap();
+ for session in sessions {
+ sorted.push(session);
+ }
+ }
+
+ Self {
+ sessions: sorted,
+ offset: 0,
+ size,
+ }
+ }
+
+ pub fn visible_sessions(&self) -> &[crate::protocol::Session] {
+ let start = self.offset;
+ let end = self.offset + self.limit();
+ let end = end.min(self.sessions.len());
+ &self.sessions[start..end]
+ }
+
+ pub fn visible_sessions_with_chars(
+ &self,
+ ) -> impl Iterator<Item = (char, &crate::protocol::Session)> {
+ self.visible_sessions()
+ .iter()
+ .enumerate()
+ .map(move |(i, s)| (self.idx_to_char(i).unwrap(), s))
+ }
+
+ pub fn size(&self) -> crate::term::Size {
+ self.size
+ }
+
+ pub fn resize(&mut self, size: crate::term::Size) {
+ self.size = size;
+ }
+
+ pub fn id_for(&self, c: char) -> Option<&str> {
+ self.char_to_idx(c).and_then(|i| {
+ self.sessions.get(i + self.offset).map(|s| s.id.as_ref())
+ })
+ }
+
+ pub fn next_page(&mut self) {
+ let inc = self.limit();
+ if self.offset + inc < self.sessions.len() {
+ self.offset += inc;
+ }
+ }
+
+ pub fn prev_page(&mut self) {
+ let dec = self.limit();
+ if self.offset >= dec {
+ self.offset -= dec;
+ }
+ }
+
+ pub fn current_page(&self) -> usize {
+ self.offset / self.limit() + 1
+ }
+
+ pub fn total_pages(&self) -> usize {
+ if self.sessions.is_empty() {
+ 1
+ } else {
+ (self.sessions.len() - 1) / self.limit() + 1
+ }
+ }
+
+ fn idx_to_char(&self, mut i: usize) -> Option<char> {
+ if i >= self.limit() {
+ return None;
+ }
+
+ // 'q' shouldn't be a list option, since it is bound to quit
+ if i >= 16 {
+ i += 1;
+ }
+
+ #[allow(clippy::cast_possible_truncation)]
+ Some(std::char::from_u32(('a' as u32) + (i as u32)).unwrap())
+ }
+
+ fn char_to_idx(&self, c: char) -> Option<usize> {
+ if c == 'q' {
+ return None;
+ }
+
+ let i = ((c as i32) - ('a' as i32)) as isize;
+ if i < 0 {
+ return None;
+ }
+ #[allow(clippy::cast_sign_loss)]
+ let mut i = i as usize;
+
+ // 'q' shouldn't be a list option, since it is bound to quit
+ if i > 16 {
+ i -= 1;
+ }
+
+ if i >= self.limit() {
+ return None;
+ }
+
+ Some(i)
+ }
+
+ fn limit(&self) -> usize {
+ let limit = self.size.rows as usize - 6;
+
+ // enough for a-z except q - if we want to allow more than this, we'll
+ // need to come up with a better way of choosing streams
+ if limit > 25 {
+ 25
+ } else {
+ limit
+ }
+ }
+}
+
+#[cfg(test)]
+#[allow(clippy::cognitive_complexity)]
+#[allow(clippy::redundant_clone)]
+#[allow(clippy::shadow_unrelated)]
+mod test {
+ use super::*;
+
+ fn session(username: &str, idle: u32) -> crate::protocol::Session {
+ crate::protocol::Session {
+ id: format!("{}", uuid::Uuid::new_v4()),
+ username: username.to_string(),
+ term_type: "screen".to_string(),
+ size: crate::term::Size { rows: 24, cols: 80 },
+ idle_time: idle,
+ title: "title".to_string(),
+ watchers: 0,
+ }
+ }
+
+ #[test]
+ fn test_session_list_sorting() {
+ let size = crate::term::Size { rows: 24, cols: 80 };
+
+ let session1 = session("doy", 35);
+ let session2 = session("doy", 3);
+ let mut session3 = session("sartak", 12);
+ let session4 = session("sartak", 100);
+ let mut session5 = session("toft", 5);
+ let mut sessions = vec![
+ session1.clone(),
+ session2.clone(),
+ session3.clone(),
+ session4.clone(),
+ session5.clone(),
+ ];
+
+ assert_eq!(
+ SessionList::new(sessions.clone(), size.clone()).sessions,
+ vec![
+ session2.clone(),
+ session1.clone(),
+ session5.clone(),
+ session3.clone(),
+ session4.clone(),
+ ]
+ );
+
+ session3.idle_time = 2;
+ sessions[2].idle_time = 2;
+ assert_eq!(
+ SessionList::new(sessions.clone(), size.clone()).sessions,
+ vec![
+ session3.clone(),
+ session4.clone(),
+ session2.clone(),
+ session1.clone(),
+ session5.clone(),
+ ]
+ );
+
+ session5.idle_time = 1;
+ sessions[4].idle_time = 1;
+ assert_eq!(
+ SessionList::new(sessions.clone(), size.clone()).sessions,
+ vec![
+ session5.clone(),
+ session3.clone(),
+ session4.clone(),
+ session2.clone(),
+ session1.clone(),
+ ]
+ );
+ }
+
+ #[test]
+ fn test_session_list_pagination() {
+ let size = crate::term::Size { rows: 11, cols: 80 };
+ let sessions = vec![
+ session("doy", 0),
+ session("doy", 1),
+ session("doy", 2),
+ session("doy", 3),
+ session("doy", 4),
+ session("doy", 5),
+ session("doy", 6),
+ session("doy", 7),
+ session("doy", 8),
+ session("doy", 9),
+ session("doy", 10),
+ ];
+ let mut list = SessionList::new(sessions.clone(), size);
+ assert_eq!(list.limit(), 5);
+ assert_eq!(list.total_pages(), 3);
+ assert_eq!(list.current_page(), 1);
+
+ list.next_page();
+ assert_eq!(list.limit(), 5);
+ assert_eq!(list.total_pages(), 3);
+ assert_eq!(list.current_page(), 2);
+
+ list.next_page();
+ assert_eq!(list.limit(), 5);
+ assert_eq!(list.total_pages(), 3);
+ assert_eq!(list.current_page(), 3);
+
+ list.next_page();
+ assert_eq!(list.limit(), 5);
+ assert_eq!(list.total_pages(), 3);
+ assert_eq!(list.current_page(), 3);
+
+ list.prev_page();
+ assert_eq!(list.limit(), 5);
+ assert_eq!(list.total_pages(), 3);
+ assert_eq!(list.current_page(), 2);
+
+ list.prev_page();
+ assert_eq!(list.limit(), 5);
+ assert_eq!(list.total_pages(), 3);
+ assert_eq!(list.current_page(), 1);
+
+ list.prev_page();
+ assert_eq!(list.limit(), 5);
+ assert_eq!(list.total_pages(), 3);
+ assert_eq!(list.current_page(), 1);
+
+ let id = list.id_for('a').unwrap();
+ assert_eq!(id, sessions[0].id);
+ let id = list.id_for('e').unwrap();
+ assert_eq!(id, sessions[4].id);
+ let id = list.id_for('f');
+ assert!(id.is_none());
+
+ list.next_page();
+ let id = list.id_for('a').unwrap();
+ assert_eq!(id, sessions[5].id);
+
+ list.next_page();
+ let id = list.id_for('a').unwrap();
+ assert_eq!(id, sessions[10].id);
+ let id = list.id_for('b');
+ assert!(id.is_none());
+
+ let size = crate::term::Size { rows: 24, cols: 80 };
+ let sessions = vec![
+ session("doy", 0),
+ session("doy", 1),
+ session("doy", 2),
+ session("doy", 3),
+ session("doy", 4),
+ session("doy", 5),
+ session("doy", 6),
+ session("doy", 7),
+ session("doy", 8),
+ session("doy", 9),
+ session("doy", 10),
+ session("doy", 11),
+ session("doy", 12),
+ session("doy", 13),
+ session("doy", 14),
+ session("doy", 15),
+ session("doy", 16),
+ session("doy", 17),
+ session("doy", 18),
+ session("doy", 19),
+ session("doy", 20),
+ session("doy", 21),
+ ];
+ let list = SessionList::new(sessions.clone(), size);
+ assert_eq!(list.limit(), 18);
+ assert_eq!(list.total_pages(), 2);
+ assert_eq!(list.current_page(), 1);
+
+ let id = list.id_for('a').unwrap();
+ assert_eq!(id, sessions[0].id);
+ let id = list.id_for('p').unwrap();
+ assert_eq!(id, sessions[15].id);
+ let id = list.id_for('q');
+ assert!(id.is_none());
+ let id = list.id_for('r').unwrap();
+ assert_eq!(id, sessions[16].id);
+ let id = list.id_for('s').unwrap();
+ assert_eq!(id, sessions[17].id);
+ let id = list.id_for('t');
+ assert!(id.is_none());
+ }
+}
diff --git a/teleterm/src/term.rs b/teleterm/src/term.rs
new file mode 100644
index 0000000..02c21bb
--- /dev/null
+++ b/teleterm/src/term.rs
@@ -0,0 +1,25 @@
+use crate::prelude::*;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct Size {
+ pub rows: u16,
+ pub cols: u16,
+}
+
+impl Size {
+ pub fn get() -> Result<Self> {
+ let (cols, rows) = crossterm::terminal::size()
+ .context(crate::error::GetTerminalSize)?;
+ Ok(Self { rows, cols })
+ }
+
+ pub fn fits_in(self, other: Self) -> bool {
+ self.rows <= other.rows && self.cols <= other.cols
+ }
+}
+
+impl std::fmt::Display for Size {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ std::fmt::Display::fmt(&format!("{}x{}", self.cols, self.rows), f)
+ }
+}
diff --git a/teleterm/src/web.rs b/teleterm/src/web.rs
new file mode 100644
index 0000000..afc5128
--- /dev/null
+++ b/teleterm/src/web.rs
@@ -0,0 +1,87 @@
+mod ws;
+
+use futures::{Future as _, Sink as _, Stream as _};
+use gotham::router::builder::{DefineSingleRoute as _, DrawRoutes as _};
+use gotham::state::FromState as _;
+
+pub fn router() -> impl gotham::handler::NewHandler {
+ gotham::router::builder::build_simple_router(|route| {
+ route.get("/").to(root);
+ route.get("/ws").to(handle_websocket_connection);
+ })
+}
+
+fn root(state: gotham::state::State) -> (gotham::state::State, String) {
+ log::info!("request for /");
+ (state, "hello world".to_string())
+}
+
+fn handle_websocket_connection(
+ mut state: gotham::state::State,
+) -> (gotham::state::State, hyper::Response<hyper::Body>) {
+ let body = hyper::Body::take_from(&mut state);
+ let headers = hyper::HeaderMap::take_from(&mut state);
+ if ws::requested(&headers) {
+ let (response, stream) = match ws::accept(&headers, body) {
+ Ok(res) => res,
+ Err(_) => {
+ log::error!("failed to accept websocket request");
+ return (
+ state,
+ hyper::Response::builder()
+ .status(hyper::StatusCode::BAD_REQUEST)
+ .body(hyper::Body::empty())
+ .unwrap(),
+ );
+ }
+ };
+ let req_id = gotham::state::request_id(&state).to_owned();
+ let stream = stream
+ .map_err(|e| {
+ log::error!(
+ "error upgrading connection for websockets: {}",
+ e
+ )
+ })
+ .and_then(move |stream| handle_websocket_stream(req_id, stream));
+ tokio::spawn(stream);
+ (state, response)
+ } else {
+ (
+ state,
+ hyper::Response::new(hyper::Body::from(
+ "non-websocket request to websocket endpoint",
+ )),
+ )
+ }
+}
+
+fn handle_websocket_stream<S>(
+ req_id: String,
+ stream: S,
+) -> impl futures::Future<Item = (), Error = ()>
+where
+ S: futures::Stream<
+ Item = tokio_tungstenite::tungstenite::protocol::Message,
+ Error = tokio_tungstenite::tungstenite::Error,
+ > + futures::Sink<
+ SinkItem = tokio_tungstenite::tungstenite::protocol::Message,
+ SinkError = tokio_tungstenite::tungstenite::Error,
+ >,
+{
+ let (sink, stream) = stream.split();
+ sink.send_all(stream.map(move |msg| {
+ handle_websocket_message(&req_id, &msg);
+ msg
+ }))
+ .map_err(|e| log::error!("error during websocket stream: {}", e))
+ .map(|_| log::info!("disconnect"))
+}
+
+fn handle_websocket_message(
+ req_id: &str,
+ msg: &tokio_tungstenite::tungstenite::protocol::Message,
+) {
+ // TODO
+ log::info!("websocket stream message for {}: {:?}", req_id, msg);
+}
diff --git a/teleterm/src/web/ws.rs b/teleterm/src/web/ws.rs
new file mode 100644
index 0000000..ad47147
--- /dev/null
+++ b/teleterm/src/web/ws.rs
@@ -0,0 +1,61 @@
+// from https://github.com/gotham-rs/gotham/blob/master/examples/websocket/src/main.rs
+
+use futures::future::Future as _;
+
+const PROTO_WEBSOCKET: &str = "websocket";
+const SEC_WEBSOCKET_KEY: &str = "Sec-WebSocket-Key";
+const SEC_WEBSOCKET_ACCEPT: &str = "Sec-WebSocket-Accept";
+
+pub fn requested(headers: &hyper::HeaderMap) -> bool {
+ headers.get(hyper::header::UPGRADE)
+ == Some(&hyper::header::HeaderValue::from_static(PROTO_WEBSOCKET))
+}
+
+pub fn accept(
+ headers: &hyper::HeaderMap,
+ body: hyper::Body,
+) -> Result<
+ (
+ hyper::Response<hyper::Body>,
+ impl futures::Future<
+ Item = tokio_tungstenite::WebSocketStream<
+ hyper::upgrade::Upgraded,
+ >,
+ Error = hyper::Error,
+ >,
+ ),
+ (),
+> {
+ let res = response(headers)?;
+ let ws = body.on_upgrade().map(|upgraded| {
+ tokio_tungstenite::WebSocketStream::from_raw_socket(
+ upgraded,
+ tokio_tungstenite::tungstenite::protocol::Role::Server,
+ None,
+ )
+ });
+
+ Ok((res, ws))
+}
+
+fn response(
+ headers: &hyper::HeaderMap,
+) -> Result<hyper::Response<hyper::Body>, ()> {
+ let key = headers.get(SEC_WEBSOCKET_KEY).ok_or(())?;
+
+ Ok(hyper::Response::builder()
+ .header(hyper::header::UPGRADE, PROTO_WEBSOCKET)
+ .header(hyper::header::CONNECTION, "upgrade")
+ .header(SEC_WEBSOCKET_ACCEPT, accept_key(key.as_bytes()))
+ .status(hyper::StatusCode::SWITCHING_PROTOCOLS)
+ .body(hyper::Body::empty())
+ .unwrap())
+}
+
+fn accept_key(key: &[u8]) -> String {
+ const WS_GUID: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+ let mut sha1 = sha1::Sha1::default();
+ sha1.update(key);
+ sha1.update(WS_GUID);
+ base64::encode(&sha1.digest().bytes())
+}