diff options
-rw-r--r-- | src/client.rs | 24 | ||||
-rw-r--r-- | src/cmd/watch.rs | 392 | ||||
-rw-r--r-- | src/keyreader.rs | 70 | ||||
-rw-r--r-- | src/main.rs | 1 | ||||
-rw-r--r-- | src/protocol.rs | 2 | ||||
-rw-r--r-- | src/server.rs | 13 |
6 files changed, 432 insertions, 70 deletions
diff --git a/src/client.rs b/src/client.rs index f1a7622..f24c9a6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -135,6 +135,30 @@ impl Client { } } + pub fn list( + address: &str, + username: &str, + heartbeat_duration: std::time::Duration, + ) -> Self { + let heartbeat_timer = + tokio::timer::Interval::new_interval(heartbeat_duration); + Self { + address: address.to_string(), + username: username.to_string(), + heartbeat_duration, + + heartbeat_timer, + reconnect_timer: None, + last_server_time: std::time::Instant::now(), + + rsock: ReadSocket::NotConnected, + wsock: WriteSocket::NotConnected, + + on_connect: vec![], + to_send: std::collections::VecDeque::new(), + } + } + pub fn send_message(&mut self, msg: crate::protocol::Message) { self.to_send.push_back(msg); } diff --git a/src/cmd/watch.rs b/src/cmd/watch.rs index 1df6311..337e2f0 100644 --- a/src/cmd/watch.rs +++ b/src/cmd/watch.rs @@ -11,14 +11,35 @@ pub enum Error { #[snafu(display("failed to read message: {}", source))] Read { source: crate::protocol::Error }, + #[snafu(display("failed to read key from terminal: {}", source))] + ReadKey { source: crate::keyreader::Error }, + #[snafu(display("failed to write message: {}", source))] Write { source: crate::protocol::Error }, #[snafu(display("failed to write to terminal: {}", source))] WriteTerminal { source: std::io::Error }, + #[snafu(display("failed to flush writes to terminal: {}", source))] + FlushTerminal { source: std::io::Error }, + + #[snafu(display("failed to write to terminal: {}", source))] + WriteTerminalCrossterm { source: crossterm::ErrorKind }, + #[snafu(display("communication with server failed: {}", source))] Client { source: crate::client::Error }, + + #[snafu(display("received error from server: {}", message))] + Server { message: String }, + + #[snafu(display( + "failed to put the terminal into raw mode: {}", + source + ))] + IntoRawMode { source: crossterm::ErrorKind }, + + #[snafu(display("failed to create key reader: {}", source))] + KeyReader { source: crate::keyreader::Error }, } pub type Result<T> = std::result::Result<T, Error>; @@ -35,7 +56,6 @@ pub fn cmd<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> { .long("address") .takes_value(true), ) - .arg(clap::Arg::with_name("id")) } pub fn run<'a>(matches: &clap::ArgMatches<'a>) -> super::Result<()> { @@ -48,103 +68,178 @@ pub fn run<'a>(matches: &clap::ArgMatches<'a>) -> super::Result<()> { .context(Common) .context(super::Watch)?, matches.value_of("address").unwrap_or("127.0.0.1:4144"), - matches.value_of("id"), ) .context(super::Watch) } -fn run_impl(username: &str, address: &str, id: Option<&str>) -> Result<()> { - if let Some(id) = id { - watch(username, address, id) - } else { - list(username, address) - } +fn run_impl(username: &str, address: &str) -> Result<()> { + let username = username.to_string(); + let address = address.to_string(); + tokio::run(futures::lazy(move || { + futures::future::result(WatchSession::new( + &address, + &username, + std::time::Duration::from_secs(5), + )) + .flatten() + .map_err(|e| { + eprintln!("{}", e); + }) + })); + + Ok(()) } -fn list(username: &str, address: &str) -> Result<()> { - let sock = std::net::TcpStream::connect(address) - .context(crate::error::Connect) - .context(Common)?; - let term = std::env::var("TERM").unwrap_or_else(|_| "".to_string()); - let size = crossterm::terminal() - .size() - .context(crate::error::GetTerminalSize) - .context(Common)?; - let msg = crate::protocol::Message::login( - username, - &term, - (u32::from(size.0), u32::from(size.1)), - ); - msg.write(&sock).context(Write)?; +struct SortedSessions { + sessions: std::collections::HashMap<char, crate::protocol::Session>, +} - let msg = crate::protocol::Message::list_sessions(); - msg.write(&sock).context(Write)?; +impl SortedSessions { + fn new(sessions: Vec<crate::protocol::Session>) -> Self { + let mut by_name = std::collections::HashMap::new(); + for session in sessions { + if !by_name.contains_key(&session.username) { + by_name.insert(session.username.clone(), vec![]); + } + by_name.get_mut(&session.username).unwrap().push(session); + } + let mut names: Vec<_> = by_name.keys().cloned().collect(); + names.sort_by(|a: &String, b: &String| { + let a_idle = + by_name[a].iter().min_by_key(|session| session.idle_time); + let b_idle = + by_name[b].iter().min_by_key(|session| session.idle_time); + a_idle.unwrap().idle_time.cmp(&b_idle.unwrap().idle_time) + }); + for name in &names { + if let Some(sessions) = by_name.get_mut(name) { + sessions.sort_by_key(|s| s.idle_time); + } + } - let res = crate::protocol::Message::read(&sock).context(Read)?; - match res { - crate::protocol::Message::Sessions { sessions } => { - println!("available sessions:"); + let mut keymap = std::collections::HashMap::new(); + let mut offset = 0; + for name in names { + let sessions = by_name.remove(&name).unwrap(); for session in sessions { - println!( - "{}: {}, {}x{}, TERM={}, idle {}s: {}", - session.id, - session.username, - session.size.0, - session.size.1, - session.term_type, - session.idle_time, - session.title, - ); + let c = std::char::from_u32(('a' as u32) + offset).unwrap(); + offset += 1; + if offset == 16 { + // 'q' + offset += 1; + } + keymap.insert(c, session); } } - crate::protocol::Message::Error { msg } => { - eprintln!("server error: {}", msg); - } - _ => { - return Err(crate::error::Error::UnexpectedMessage { - message: res, - }) - .context(Common); + + Self { sessions: keymap } + } + + fn print(&self) -> Result<()> { + let term = crossterm::terminal(); + term.clear(crossterm::ClearType::All) + .context(WriteTerminalCrossterm)?; + + let name_width = + self.sessions.iter().map(|(_, s)| s.username.len()).max(); + let name_width = if let Some(width) = name_width { + if width < 4 { + 4 + } else { + width + } + } else { + 4 + }; + let (cols, _) = crossterm::terminal() + .size() + .context(crate::error::GetTerminalSize) + .context(Common)?; + + println!("welcome to shellshare\r"); + println!("available sessions:\r"); + println!("\r"); + println!( + " | {:3$} | {:7} | {:13} | title\r", + "name", "size", "idle", name_width + ); + println!("{}\r", "-".repeat(cols as usize)); + + let mut prev_name: Option<&str> = None; + let mut chars: Vec<_> = self.sessions.keys().collect(); + chars.sort(); + for c in chars { + let session = self.sessions.get(c).unwrap(); + let first = if let Some(name) = prev_name { + name != session.username + } else { + true + }; + print!( + "{}) {:2$} ", + c, + if first { &session.username } else { "" }, + name_width + 2, + ); + print_session(session); + + println!("\r"); + prev_name = Some(&session.username); } + print!(" --> "); + std::io::stdout().flush().context(FlushTerminal)?; + + Ok(()) } - Ok(()) + fn id_for(&self, c: char) -> Option<&str> { + self.sessions.get(&c).map(|s| s.id.as_ref()) + } } -fn watch(username: &str, address: &str, id: &str) -> Result<()> { - tokio::run( - WatchSession::new( - id, - address, - username, - std::time::Duration::from_secs(5), - )? - .map_err(|e| { - eprintln!("{}", e); - }), - ); - - Ok(()) +enum State { + LoggingIn, + Choosing { sessions: SortedSessions }, + Watching { client: Box<crate::client::Client> }, } struct WatchSession { - client: crate::client::Client, + address: String, + username: String, + heartbeat_duration: std::time::Duration, + + key_reader: crate::keyreader::KeyReader, + list_client: crate::client::Client, + state: State, + _raw_screen: crossterm::RawScreen, } impl WatchSession { fn new( - id: &str, address: &str, username: &str, heartbeat_duration: std::time::Duration, ) -> Result<Self> { - let client = crate::client::Client::watch( + let list_client = crate::client::Client::list( address, username, heartbeat_duration, - id, ); - Ok(Self { client }) + + Ok(Self { + address: address.to_string(), + username: username.to_string(), + heartbeat_duration, + + key_reader: crate::keyreader::KeyReader::new( + futures::task::current(), + ) + .context(KeyReader)?, + list_client, + state: State::LoggingIn, + _raw_screen: crossterm::RawScreen::into_raw_mode() + .context(IntoRawMode)?, + }) } } @@ -153,12 +248,136 @@ impl WatchSession { &'a mut Self, ) -> Result< crate::component_future::Poll<()>, - >] = &[&Self::poll_read_client]; + >] = &[ + &Self::poll_input, + &Self::poll_list_client, + &Self::poll_watch_client, + ]; + + fn poll_input(&mut self) -> Result<crate::component_future::Poll<()>> { + match &self.state { + State::LoggingIn => { + Ok(crate::component_future::Poll::NothingToDo) + } + State::Choosing { sessions } => { + match self.key_reader.poll().context(ReadKey)? { + futures::Async::Ready(Some(e)) => { + match e { + crossterm::InputEvent::Keyboard( + crossterm::KeyEvent::Char(' '), + ) => { + self.list_client.send_message( + crate::protocol::Message::list_sessions(), + ); + } + crossterm::InputEvent::Keyboard( + crossterm::KeyEvent::Char('q'), + ) => { + println!("\r"); + return Ok( + crate::component_future::Poll::Event(()), + ); + } + crossterm::InputEvent::Keyboard( + crossterm::KeyEvent::Char(c), + ) => { + if let Some(id) = sessions.id_for(c) { + let term = crossterm::terminal(); + term.clear(crossterm::ClearType::All) + .context(WriteTerminalCrossterm)?; + let client = crate::client::Client::watch( + &self.address, + &self.username, + self.heartbeat_duration, + id, + ); + self.state = State::Watching { + client: Box::new(client), + }; + } + } + _ => {} + } + Ok(crate::component_future::Poll::DidWork) + } + futures::Async::Ready(None) => unreachable!(), + futures::Async::NotReady => { + Ok(crate::component_future::Poll::NotReady) + } + } + } + State::Watching { .. } => { + match self.key_reader.poll().context(ReadKey)? { + futures::Async::Ready(Some(e)) => { + #[allow(clippy::single_match)] + match e { + crossterm::InputEvent::Keyboard( + crossterm::KeyEvent::Char('q'), + ) => { + self.state = State::LoggingIn; + self.list_client.send_message( + crate::protocol::Message::list_sessions(), + ); + } + _ => {} + } + Ok(crate::component_future::Poll::DidWork) + } + futures::Async::Ready(None) => unreachable!(), + futures::Async::NotReady => { + Ok(crate::component_future::Poll::NotReady) + } + } + } + } + } + + fn poll_list_client( + &mut self, + ) -> Result<crate::component_future::Poll<()>> { + match self.list_client.poll().context(Client)? { + futures::Async::Ready(Some(e)) => match e { + crate::client::Event::Reconnect => { + self.state = State::LoggingIn; + self.list_client.send_message( + crate::protocol::Message::list_sessions(), + ); + Ok(crate::component_future::Poll::DidWork) + } + crate::client::Event::ServerMessage(msg) => match msg { + crate::protocol::Message::Sessions { sessions } => { + let sorted = SortedSessions::new(sessions); + // TODO: async + sorted.print()?; + self.state = State::Choosing { sessions: sorted }; + Ok(crate::component_future::Poll::DidWork) + } + msg => Err(crate::error::Error::UnexpectedMessage { + message: msg, + }) + .context(Common), + }, + }, + futures::Async::Ready(None) => { + // the client should never exit on its own + unreachable!() + } + futures::Async::NotReady => { + Ok(crate::component_future::Poll::NotReady) + } + } + } - fn poll_read_client( + fn poll_watch_client( &mut self, ) -> Result<crate::component_future::Poll<()>> { - match self.client.poll().context(Client)? { + let client = if let State::Watching { client } = &mut self.state { + client + } else { + return Ok(crate::component_future::Poll::NothingToDo); + }; + + match client.poll().context(Client)? { futures::Async::Ready(Some(e)) => match e { crate::client::Event::Reconnect => { Ok(crate::component_future::Poll::DidWork) @@ -195,6 +414,39 @@ impl WatchSession { } } +fn print_session(session: &crate::protocol::Session) { + let size = format!("{}x{}", session.size.0, session.size.1); + print!( + "{:7} {:13} {}", + size, + format_time(session.idle_time), + session.title + ); +} + +fn format_time(dur: u32) -> String { + let secs = dur % 60; + let dur = dur / 60; + if dur == 0 { + return format!("{}s", secs); + } + + let mins = dur % 60; + let dur = dur / 60; + if dur == 0 { + return format!("{}m{}s", mins, secs); + } + + let hours = dur % 24; + let dur = dur / 24; + if dur == 0 { + return format!("{}h{}m{}s", hours, mins, secs); + } + + let days = dur; + format!("{}d{}h{}m{}s", days, hours, mins, secs) +} + #[must_use = "futures do nothing unless polled"] impl futures::future::Future for WatchSession { type Item = (); diff --git a/src/keyreader.rs b/src/keyreader.rs new file mode 100644 index 0000000..2fab2d6 --- /dev/null +++ b/src/keyreader.rs @@ -0,0 +1,70 @@ +use snafu::ResultExt as _; + +#[derive(Debug, snafu::Snafu)] +pub enum Error { + #[snafu(display( + "failed to spawn a background thread to read terminal input: {}", + source + ))] + TerminalInputReadingThread { source: std::io::Error }, + + #[snafu(display("failed to read from event channel: {}", source))] + ReadChannel { + source: tokio::sync::mpsc::error::UnboundedRecvError, + }, +} + +pub type Result<T> = std::result::Result<T, Error>; + +pub struct KeyReader { + events: tokio::sync::mpsc::UnboundedReceiver<crossterm::InputEvent>, + quit: Option<tokio::sync::oneshot::Sender<()>>, +} + +impl KeyReader { + pub fn new(task: futures::task::Task) -> Result<Self> { + let reader = crossterm::input().read_sync(); + let (mut events_tx, events_rx) = + tokio::sync::mpsc::unbounded_channel(); + let (quit_tx, mut quit_rx) = tokio::sync::oneshot::channel(); + // TODO: this is pretty janky - it'd be better to build in more useful + // support to crossterm directly + std::thread::Builder::new() + .spawn(move || { + for event in reader { + // unwrap is unpleasant, but so is figuring out how to + // propagate the error back to the main thread + events_tx.try_send(event).unwrap(); + task.notify(); + if quit_rx.try_recv().is_ok() { + break; + } + } + }) + .context(TerminalInputReadingThread)?; + + Ok(Self { + events: events_rx, + quit: Some(quit_tx), + }) + } +} + +impl futures::stream::Stream for KeyReader { + type Item = crossterm::InputEvent; + type Error = Error; + + fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> { + self.events.poll().context(ReadChannel) + } +} + +impl Drop for KeyReader { + fn drop(&mut self) { + // don't care if it fails to send, this can happen if the thread + // terminates due to seeing a newline before the keyreader goes out of + // scope + let quit_tx = self.quit.take(); + let _ = quit_tx.unwrap().send(()); + } +} diff --git a/src/main.rs b/src/main.rs index 9360b84..fac7f7d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ mod client; mod cmd; mod component_future; mod error; +mod keyreader; mod process; mod protocol; mod server; diff --git a/src/protocol.rs b/src/protocol.rs index 8b2ce0b..892c476 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -179,6 +179,7 @@ impl Message { Self::Resize { size } } + #[allow(dead_code)] pub fn read<R: std::io::Read>(r: R) -> Result<Self> { Packet::read(r).and_then(Self::try_from) } @@ -192,6 +193,7 @@ impl Message { }) } + #[allow(dead_code)] pub fn write<W: std::io::Write>(&self, w: W) -> Result<()> { Packet::from(self).write(w) } diff --git a/src/server.rs b/src/server.rs index 26b6f2d..7735ced 100644 --- a/src/server.rs +++ b/src/server.rs @@ -374,7 +374,20 @@ impl Server { conn: &mut Connection, message: crate::protocol::Message, ) -> Result<()> { + let username = if let ConnectionState::LoggedIn { username, .. } = + &mut conn.state + { + username + } else { + unreachable!() + }; match message { + crate::protocol::Message::Heartbeat => { + println!("got a heartbeat from {}", username); + conn.to_send + .push_back(crate::protocol::Message::heartbeat()); + Ok(()) + } crate::protocol::Message::ListSessions => { let sessions: Vec<_> = self.casters().flat_map(Connection::session).collect(); |