aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/client.rs24
-rw-r--r--src/cmd/watch.rs392
-rw-r--r--src/keyreader.rs70
-rw-r--r--src/main.rs1
-rw-r--r--src/protocol.rs2
-rw-r--r--src/server.rs13
6 files changed, 432 insertions, 70 deletions
diff --git a/src/client.rs b/src/client.rs
index f1a7622..f24c9a6 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -135,6 +135,30 @@ impl Client {
}
}
+ pub fn list(
+ address: &str,
+ username: &str,
+ heartbeat_duration: std::time::Duration,
+ ) -> Self {
+ let heartbeat_timer =
+ tokio::timer::Interval::new_interval(heartbeat_duration);
+ Self {
+ address: address.to_string(),
+ username: username.to_string(),
+ heartbeat_duration,
+
+ heartbeat_timer,
+ reconnect_timer: None,
+ last_server_time: std::time::Instant::now(),
+
+ rsock: ReadSocket::NotConnected,
+ wsock: WriteSocket::NotConnected,
+
+ on_connect: vec![],
+ to_send: std::collections::VecDeque::new(),
+ }
+ }
+
pub fn send_message(&mut self, msg: crate::protocol::Message) {
self.to_send.push_back(msg);
}
diff --git a/src/cmd/watch.rs b/src/cmd/watch.rs
index 1df6311..337e2f0 100644
--- a/src/cmd/watch.rs
+++ b/src/cmd/watch.rs
@@ -11,14 +11,35 @@ pub enum Error {
#[snafu(display("failed to read message: {}", source))]
Read { source: crate::protocol::Error },
+ #[snafu(display("failed to read key from terminal: {}", source))]
+ ReadKey { source: crate::keyreader::Error },
+
#[snafu(display("failed to write message: {}", source))]
Write { source: crate::protocol::Error },
#[snafu(display("failed to write to terminal: {}", source))]
WriteTerminal { source: std::io::Error },
+ #[snafu(display("failed to flush writes to terminal: {}", source))]
+ FlushTerminal { source: std::io::Error },
+
+ #[snafu(display("failed to write to terminal: {}", source))]
+ WriteTerminalCrossterm { source: crossterm::ErrorKind },
+
#[snafu(display("communication with server failed: {}", source))]
Client { source: crate::client::Error },
+
+ #[snafu(display("received error from server: {}", message))]
+ Server { message: String },
+
+ #[snafu(display(
+ "failed to put the terminal into raw mode: {}",
+ source
+ ))]
+ IntoRawMode { source: crossterm::ErrorKind },
+
+ #[snafu(display("failed to create key reader: {}", source))]
+ KeyReader { source: crate::keyreader::Error },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -35,7 +56,6 @@ pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> {
.long("address")
.takes_value(true),
)
- .arg(clap::Arg::with_name("id"))
}
pub fn run<'a>(matches: &clap::ArgMatches<'a>) -> super::Result<()> {
@@ -48,103 +68,178 @@ pub fn run<'a>(matches: &clap::ArgMatches<'a>) -> super::Result<()> {
.context(Common)
.context(super::Watch)?,
matches.value_of("address").unwrap_or("127.0.0.1:4144"),
- matches.value_of("id"),
)
.context(super::Watch)
}
-fn run_impl(username: &str, address: &str, id: Option<&str>) -> Result<()> {
- if let Some(id) = id {
- watch(username, address, id)
- } else {
- list(username, address)
- }
+fn run_impl(username: &str, address: &str) -> Result<()> {
+ let username = username.to_string();
+ let address = address.to_string();
+ tokio::run(futures::lazy(move || {
+ futures::future::result(WatchSession::new(
+ &address,
+ &username,
+ std::time::Duration::from_secs(5),
+ ))
+ .flatten()
+ .map_err(|e| {
+ eprintln!("{}", e);
+ })
+ }));
+
+ Ok(())
}
-fn list(username: &str, address: &str) -> Result<()> {
- let sock = std::net::TcpStream::connect(address)
- .context(crate::error::Connect)
- .context(Common)?;
- let term = std::env::var("TERM").unwrap_or_else(|_| "".to_string());
- let size = crossterm::terminal()
- .size()
- .context(crate::error::GetTerminalSize)
- .context(Common)?;
- let msg = crate::protocol::Message::login(
- username,
- &term,
- (u32::from(size.0), u32::from(size.1)),
- );
- msg.write(&sock).context(Write)?;
+struct SortedSessions {
+ sessions: std::collections::HashMap<char, crate::protocol::Session>,
+}
- let msg = crate::protocol::Message::list_sessions();
- msg.write(&sock).context(Write)?;
+impl SortedSessions {
+ fn new(sessions: Vec<crate::protocol::Session>) -> Self {
+ let mut by_name = std::collections::HashMap::new();
+ for session in sessions {
+ if !by_name.contains_key(&session.username) {
+ by_name.insert(session.username.clone(), vec![]);
+ }
+ by_name.get_mut(&session.username).unwrap().push(session);
+ }
+ let mut names: Vec<_> = by_name.keys().cloned().collect();
+ names.sort_by(|a: &String, b: &String| {
+ let a_idle =
+ by_name[a].iter().min_by_key(|session| session.idle_time);
+ let b_idle =
+ by_name[b].iter().min_by_key(|session| session.idle_time);
+ a_idle.unwrap().idle_time.cmp(&b_idle.unwrap().idle_time)
+ });
+ for name in &names {
+ if let Some(sessions) = by_name.get_mut(name) {
+ sessions.sort_by_key(|s| s.idle_time);
+ }
+ }
- let res = crate::protocol::Message::read(&sock).context(Read)?;
- match res {
- crate::protocol::Message::Sessions { sessions } => {
- println!("available sessions:");
+ let mut keymap = std::collections::HashMap::new();
+ let mut offset = 0;
+ for name in names {
+ let sessions = by_name.remove(&name).unwrap();
for session in sessions {
- println!(
- "{}: {}, {}x{}, TERM={}, idle {}s: {}",
- session.id,
- session.username,
- session.size.0,
- session.size.1,
- session.term_type,
- session.idle_time,
- session.title,
- );
+ let c = std::char::from_u32(('a' as u32) + offset).unwrap();
+ offset += 1;
+ if offset == 16 {
+ // 'q'
+ offset += 1;
+ }
+ keymap.insert(c, session);
}
}
- crate::protocol::Message::Error { msg } => {
- eprintln!("server error: {}", msg);
- }
- _ => {
- return Err(crate::error::Error::UnexpectedMessage {
- message: res,
- })
- .context(Common);
+
+ Self { sessions: keymap }
+ }
+
+ fn print(&self) -> Result<()> {
+ let term = crossterm::terminal();
+ term.clear(crossterm::ClearType::All)
+ .context(WriteTerminalCrossterm)?;
+
+ let name_width =
+ self.sessions.iter().map(|(_, s)| s.username.len()).max();
+ let name_width = if let Some(width) = name_width {
+ if width < 4 {
+ 4
+ } else {
+ width
+ }
+ } else {
+ 4
+ };
+ let (cols, _) = crossterm::terminal()
+ .size()
+ .context(crate::error::GetTerminalSize)
+ .context(Common)?;
+
+ println!("welcome to shellshare\r");
+ println!("available sessions:\r");
+ println!("\r");
+ println!(
+ " | {:3$} | {:7} | {:13} | title\r",
+ "name", "size", "idle", name_width
+ );
+ println!("{}\r", "-".repeat(cols as usize));
+
+ let mut prev_name: Option<&str> = None;
+ let mut chars: Vec<_> = self.sessions.keys().collect();
+ chars.sort();
+ for c in chars {
+ let session = self.sessions.get(c).unwrap();
+ let first = if let Some(name) = prev_name {
+ name != session.username
+ } else {
+ true
+ };
+ print!(
+ "{}) {:2$} ",
+ c,
+ if first { &session.username } else { "" },
+ name_width + 2,
+ );
+ print_session(session);
+
+ println!("\r");
+ prev_name = Some(&session.username);
}
+ print!(" --> ");
+ std::io::stdout().flush().context(FlushTerminal)?;
+
+ Ok(())
}
- Ok(())
+ fn id_for(&self, c: char) -> Option<&str> {
+ self.sessions.get(&c).map(|s| s.id.as_ref())
+ }
}
-fn watch(username: &str, address: &str, id: &str) -> Result<()> {
- tokio::run(
- WatchSession::new(
- id,
- address,
- username,
- std::time::Duration::from_secs(5),
- )?
- .map_err(|e| {
- eprintln!("{}", e);
- }),
- );
-
- Ok(())
+enum State {
+ LoggingIn,
+ Choosing { sessions: SortedSessions },
+ Watching { client: Box<crate::client::Client> },
}
struct WatchSession {
- client: crate::client::Client,
+ address: String,
+ username: String,
+ heartbeat_duration: std::time::Duration,
+
+ key_reader: crate::keyreader::KeyReader,
+ list_client: crate::client::Client,
+ state: State,
+ _raw_screen: crossterm::RawScreen,
}
impl WatchSession {
fn new(
- id: &str,
address: &str,
username: &str,
heartbeat_duration: std::time::Duration,
) -> Result<Self> {
- let client = crate::client::Client::watch(
+ let list_client = crate::client::Client::list(
address,
username,
heartbeat_duration,
- id,
);
- Ok(Self { client })
+
+ Ok(Self {
+ address: address.to_string(),
+ username: username.to_string(),
+ heartbeat_duration,
+
+ key_reader: crate::keyreader::KeyReader::new(
+ futures::task::current(),
+ )
+ .context(KeyReader)?,
+ list_client,
+ state: State::LoggingIn,
+ _raw_screen: crossterm::RawScreen::into_raw_mode()
+ .context(IntoRawMode)?,
+ })
}
}
@@ -153,12 +248,136 @@ impl WatchSession {
&'a mut Self,
) -> Result<
crate::component_future::Poll<()>,
- >] = &[&Self::poll_read_client];
+ >] = &[
+ &Self::poll_input,
+ &Self::poll_list_client,
+ &Self::poll_watch_client,
+ ];
+
+ fn poll_input(&mut self) -> Result<crate::component_future::Poll<()>> {
+ match &self.state {
+ State::LoggingIn => {
+ Ok(crate::component_future::Poll::NothingToDo)
+ }
+ State::Choosing { sessions } => {
+ match self.key_reader.poll().context(ReadKey)? {
+ futures::Async::Ready(Some(e)) => {
+ match e {
+ crossterm::InputEvent::Keyboard(
+ crossterm::KeyEvent::Char(' '),
+ ) => {
+ self.list_client.send_message(
+ crate::protocol::Message::list_sessions(),
+ );
+ }
+ crossterm::InputEvent::Keyboard(
+ crossterm::KeyEvent::Char('q'),
+ ) => {
+ println!("\r");
+ return Ok(
+ crate::component_future::Poll::Event(()),
+ );
+ }
+ crossterm::InputEvent::Keyboard(
+ crossterm::KeyEvent::Char(c),
+ ) => {
+ if let Some(id) = sessions.id_for(c) {
+ let term = crossterm::terminal();
+ term.clear(crossterm::ClearType::All)
+ .context(WriteTerminalCrossterm)?;
+ let client = crate::client::Client::watch(
+ &self.address,
+ &self.username,
+ self.heartbeat_duration,
+ id,
+ );
+ self.state = State::Watching {
+ client: Box::new(client),
+ };
+ }
+ }
+ _ => {}
+ }
+ Ok(crate::component_future::Poll::DidWork)
+ }
+ futures::Async::Ready(None) => unreachable!(),
+ futures::Async::NotReady => {
+ Ok(crate::component_future::Poll::NotReady)
+ }
+ }
+ }
+ State::Watching { .. } => {
+ match self.key_reader.poll().context(ReadKey)? {
+ futures::Async::Ready(Some(e)) => {
+ #[allow(clippy::single_match)]
+ match e {
+ crossterm::InputEvent::Keyboard(
+ crossterm::KeyEvent::Char('q'),
+ ) => {
+ self.state = State::LoggingIn;
+ self.list_client.send_message(
+ crate::protocol::Message::list_sessions(),
+ );
+ }
+ _ => {}
+ }
+ Ok(crate::component_future::Poll::DidWork)
+ }
+ futures::Async::Ready(None) => unreachable!(),
+ futures::Async::NotReady => {
+ Ok(crate::component_future::Poll::NotReady)
+ }
+ }
+ }
+ }
+ }
+
+ fn poll_list_client(
+ &mut self,
+ ) -> Result<crate::component_future::Poll<()>> {
+ match self.list_client.poll().context(Client)? {
+ futures::Async::Ready(Some(e)) => match e {
+ crate::client::Event::Reconnect => {
+ self.state = State::LoggingIn;
+ self.list_client.send_message(
+ crate::protocol::Message::list_sessions(),
+ );
+ Ok(crate::component_future::Poll::DidWork)
+ }
+ crate::client::Event::ServerMessage(msg) => match msg {
+ crate::protocol::Message::Sessions { sessions } => {
+ let sorted = SortedSessions::new(sessions);
+ // TODO: async
+ sorted.print()?;
+ self.state = State::Choosing { sessions: sorted };
+ Ok(crate::component_future::Poll::DidWork)
+ }
+ msg => Err(crate::error::Error::UnexpectedMessage {
+ message: msg,
+ })
+ .context(Common),
+ },
+ },
+ futures::Async::Ready(None) => {
+ // the client should never exit on its own
+ unreachable!()
+ }
+ futures::Async::NotReady => {
+ Ok(crate::component_future::Poll::NotReady)
+ }
+ }
+ }
- fn poll_read_client(
+ fn poll_watch_client(
&mut self,
) -> Result<crate::component_future::Poll<()>> {
- match self.client.poll().context(Client)? {
+ let client = if let State::Watching { client } = &mut self.state {
+ client
+ } else {
+ return Ok(crate::component_future::Poll::NothingToDo);
+ };
+
+ match client.poll().context(Client)? {
futures::Async::Ready(Some(e)) => match e {
crate::client::Event::Reconnect => {
Ok(crate::component_future::Poll::DidWork)
@@ -195,6 +414,39 @@ impl WatchSession {
}
}
+fn print_session(session: &crate::protocol::Session) {
+ let size = format!("{}x{}", session.size.0, session.size.1);
+ print!(
+ "{:7} {:13} {}",
+ size,
+ format_time(session.idle_time),
+ session.title
+ );
+}
+
+fn format_time(dur: u32) -> String {
+ let secs = dur % 60;
+ let dur = dur / 60;
+ if dur == 0 {
+ return format!("{}s", secs);
+ }
+
+ let mins = dur % 60;
+ let dur = dur / 60;
+ if dur == 0 {
+ return format!("{}m{}s", mins, secs);
+ }
+
+ let hours = dur % 24;
+ let dur = dur / 24;
+ if dur == 0 {
+ return format!("{}h{}m{}s", hours, mins, secs);
+ }
+
+ let days = dur;
+ format!("{}d{}h{}m{}s", days, hours, mins, secs)
+}
+
#[must_use = "futures do nothing unless polled"]
impl futures::future::Future for WatchSession {
type Item = ();
diff --git a/src/keyreader.rs b/src/keyreader.rs
new file mode 100644
index 0000000..2fab2d6
--- /dev/null
+++ b/src/keyreader.rs
@@ -0,0 +1,70 @@
+use snafu::ResultExt as _;
+
+#[derive(Debug, snafu::Snafu)]
+pub enum Error {
+ #[snafu(display(
+ "failed to spawn a background thread to read terminal input: {}",
+ source
+ ))]
+ TerminalInputReadingThread { source: std::io::Error },
+
+ #[snafu(display("failed to read from event channel: {}", source))]
+ ReadChannel {
+ source: tokio::sync::mpsc::error::UnboundedRecvError,
+ },
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+pub struct KeyReader {
+ events: tokio::sync::mpsc::UnboundedReceiver<crossterm::InputEvent>,
+ quit: Option<tokio::sync::oneshot::Sender<()>>,
+}
+
+impl KeyReader {
+ pub fn new(task: futures::task::Task) -> Result<Self> {
+ let reader = crossterm::input().read_sync();
+ let (mut events_tx, events_rx) =
+ tokio::sync::mpsc::unbounded_channel();
+ let (quit_tx, mut quit_rx) = tokio::sync::oneshot::channel();
+ // TODO: this is pretty janky - it'd be better to build in more useful
+ // support to crossterm directly
+ std::thread::Builder::new()
+ .spawn(move || {
+ for event in reader {
+ // unwrap is unpleasant, but so is figuring out how to
+ // propagate the error back to the main thread
+ events_tx.try_send(event).unwrap();
+ task.notify();
+ if quit_rx.try_recv().is_ok() {
+ break;
+ }
+ }
+ })
+ .context(TerminalInputReadingThread)?;
+
+ Ok(Self {
+ events: events_rx,
+ quit: Some(quit_tx),
+ })
+ }
+}
+
+impl futures::stream::Stream for KeyReader {
+ type Item = crossterm::InputEvent;
+ type Error = Error;
+
+ fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
+ self.events.poll().context(ReadChannel)
+ }
+}
+
+impl Drop for KeyReader {
+ fn drop(&mut self) {
+ // don't care if it fails to send, this can happen if the thread
+ // terminates due to seeing a newline before the keyreader goes out of
+ // scope
+ let quit_tx = self.quit.take();
+ let _ = quit_tx.unwrap().send(());
+ }
+}
diff --git a/src/main.rs b/src/main.rs
index 9360b84..fac7f7d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -10,6 +10,7 @@ mod client;
mod cmd;
mod component_future;
mod error;
+mod keyreader;
mod process;
mod protocol;
mod server;
diff --git a/src/protocol.rs b/src/protocol.rs
index 8b2ce0b..892c476 100644
--- a/src/protocol.rs
+++ b/src/protocol.rs
@@ -179,6 +179,7 @@ impl Message {
Self::Resize { size }
}
+ #[allow(dead_code)]
pub fn read<R: std::io::Read>(r: R) -> Result<Self> {
Packet::read(r).and_then(Self::try_from)
}
@@ -192,6 +193,7 @@ impl Message {
})
}
+ #[allow(dead_code)]
pub fn write<W: std::io::Write>(&self, w: W) -> Result<()> {
Packet::from(self).write(w)
}
diff --git a/src/server.rs b/src/server.rs
index 26b6f2d..7735ced 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -374,7 +374,20 @@ impl Server {
conn: &mut Connection,
message: crate::protocol::Message,
) -> Result<()> {
+ let username = if let ConnectionState::LoggedIn { username, .. } =
+ &mut conn.state
+ {
+ username
+ } else {
+ unreachable!()
+ };
match message {
+ crate::protocol::Message::Heartbeat => {
+ println!("got a heartbeat from {}", username);
+ conn.to_send
+ .push_back(crate::protocol::Message::heartbeat());
+ Ok(())
+ }
crate::protocol::Message::ListSessions => {
let sessions: Vec<_> =
self.casters().flat_map(Connection::session).collect();