diff options
Diffstat (limited to 'src/shell')
-rw-r--r-- | src/shell/event.rs | 135 | ||||
-rw-r--r-- | src/shell/history/entry.rs | 344 | ||||
-rw-r--r-- | src/shell/history/mod.rs | 302 | ||||
-rw-r--r-- | src/shell/history/pty.rs | 244 | ||||
-rw-r--r-- | src/shell/inputs/clock.rs | 27 | ||||
-rw-r--r-- | src/shell/inputs/git.rs (renamed from src/shell/git.rs) | 73 | ||||
-rw-r--r-- | src/shell/inputs/mod.rs | 32 | ||||
-rw-r--r-- | src/shell/inputs/signals.rs | 30 | ||||
-rw-r--r-- | src/shell/inputs/stdin.rs | 17 | ||||
-rw-r--r-- | src/shell/mod.rs | 497 | ||||
-rw-r--r-- | src/shell/old_history.rs | 185 | ||||
-rw-r--r-- | src/shell/readline.rs | 12 |
12 files changed, 1028 insertions, 870 deletions
diff --git a/src/shell/event.rs b/src/shell/event.rs index 025f3c4..dc58e6f 100644 --- a/src/shell/event.rs +++ b/src/shell/event.rs @@ -1,53 +1,87 @@ +use crate::prelude::*; + #[derive(Debug)] pub enum Event { Key(textmode::Key), Resize((u16, u16)), PtyOutput, - PtyClose, ChildRunPipeline(usize, (usize, usize)), ChildSuspend(usize), - GitInfo(Option<super::git::Info>), + ChildExit(usize, super::history::ExitInfo, Option<Env>), + GitInfo(Option<super::inputs::GitInfo>), ClockTimer, } -pub struct Reader { - pending: async_std::sync::Mutex<Pending>, - cvar: async_std::sync::Condvar, +pub fn channel() -> (Writer, Reader) { + let (event_w, event_r) = tokio::sync::mpsc::unbounded_channel(); + (Writer::new(event_w), Reader::new(event_r)) +} + +#[derive(Clone)] +pub struct Writer(tokio::sync::mpsc::UnboundedSender<Event>); + +impl Writer { + pub fn new(event_w: tokio::sync::mpsc::UnboundedSender<Event>) -> Self { + Self(event_w) + } + + pub fn send(&self, event: Event) { + // the only time this should ever error is when the application is + // shutting down, at which point we don't actually care about any + // further dropped messages + #[allow(clippy::let_underscore_drop)] + let _ = self.0.send(event); + } } +pub struct Reader(std::sync::Arc<InnerReader>); + impl Reader { pub fn new( - input: async_std::channel::Receiver<Event>, - ) -> async_std::sync::Arc<Self> { - let this = async_std::sync::Arc::new(Self { - pending: async_std::sync::Mutex::new(Pending::new()), - cvar: async_std::sync::Condvar::new(), - }); + mut input: tokio::sync::mpsc::UnboundedReceiver<Event>, + ) -> Self { + let inner = std::sync::Arc::new(InnerReader::new()); { - let this = async_std::sync::Arc::clone(&this); - async_std::task::spawn(async move { - while let Ok(event) = input.recv().await { - this.new_event(Some(event)).await; + let inner = inner.clone(); + tokio::spawn(async move { + while let Some(event) = input.recv().await { + inner.new_event(Some(event)); } - this.new_event(None).await; + inner.new_event(None); }); } - this + Self(inner) } pub async fn recv(&self) -> Option<Event> { - let mut pending = self - .cvar - .wait_until(self.pending.lock().await, |pending| { - pending.has_event() - }) - .await; - pending.get_event() + self.0.recv().await + } +} + +struct InnerReader { + pending: std::sync::Mutex<Pending>, + cvar: tokio::sync::Notify, +} + +impl InnerReader { + fn new() -> Self { + Self { + pending: std::sync::Mutex::new(Pending::new()), + cvar: tokio::sync::Notify::new(), + } + } + + async fn recv(&self) -> Option<Event> { + loop { + if let Some(event) = self.pending.lock().unwrap().get_event() { + return event; + } + self.cvar.notified().await; + } } - async fn new_event(&self, event: Option<Event>) { - let mut pending = self.pending.lock().await; - pending.new_event(event); + fn new_event(&self, event: Option<Event>) { + self.pending.lock().unwrap().new_event(event); self.cvar.notify_one(); } } @@ -58,10 +92,10 @@ struct Pending { key: std::collections::VecDeque<textmode::Key>, size: Option<(u16, u16)>, pty_output: bool, - pty_close: bool, child_run_pipeline: std::collections::VecDeque<(usize, (usize, usize))>, child_suspend: std::collections::VecDeque<usize>, - git_info: Option<Option<super::git::Info>>, + child_exit: Option<(usize, super::history::ExitInfo, Option<Env>)>, + git_info: Option<Option<super::inputs::GitInfo>>, clock_timer: bool, done: bool, } @@ -71,53 +105,40 @@ impl Pending { Self::default() } - fn has_event(&self) -> bool { - self.done - || !self.key.is_empty() - || self.size.is_some() - || self.pty_output - || self.pty_close - || !self.child_run_pipeline.is_empty() - || !self.child_suspend.is_empty() - || self.git_info.is_some() - || self.clock_timer - } - - fn get_event(&mut self) -> Option<Event> { + fn get_event(&mut self) -> Option<Option<Event>> { if self.done { - return None; + return Some(None); } if let Some(key) = self.key.pop_front() { - return Some(Event::Key(key)); + return Some(Some(Event::Key(key))); } if let Some(size) = self.size.take() { - return Some(Event::Resize(size)); - } - if self.pty_close { - self.pty_close = false; - return Some(Event::PtyClose); + return Some(Some(Event::Resize(size))); } if let Some((idx, span)) = self.child_run_pipeline.pop_front() { - return Some(Event::ChildRunPipeline(idx, span)); + return Some(Some(Event::ChildRunPipeline(idx, span))); } if let Some(idx) = self.child_suspend.pop_front() { - return Some(Event::ChildSuspend(idx)); + return Some(Some(Event::ChildSuspend(idx))); + } + if let Some((idx, exit_info, env)) = self.child_exit.take() { + return Some(Some(Event::ChildExit(idx, exit_info, env))); } if let Some(info) = self.git_info.take() { - return Some(Event::GitInfo(info)); + return Some(Some(Event::GitInfo(info))); } if self.clock_timer { self.clock_timer = false; - return Some(Event::ClockTimer); + return Some(Some(Event::ClockTimer)); } // process_output should be last because it will often be the case // that there is ~always new process output (cat on large files, yes, // etc) and that shouldn't prevent other events from happening if self.pty_output { self.pty_output = false; - return Some(Event::PtyOutput); + return Some(Some(Event::PtyOutput)); } - unreachable!() + None } fn new_event(&mut self, event: Option<Event>) { @@ -125,13 +146,15 @@ impl Pending { Some(Event::Key(key)) => self.key.push_back(key), Some(Event::Resize(size)) => self.size = Some(size), Some(Event::PtyOutput) => self.pty_output = true, - Some(Event::PtyClose) => self.pty_close = true, Some(Event::ChildRunPipeline(idx, span)) => { self.child_run_pipeline.push_back((idx, span)); } Some(Event::ChildSuspend(idx)) => { self.child_suspend.push_back(idx); } + Some(Event::ChildExit(idx, exit_info, env)) => { + self.child_exit = Some((idx, exit_info, env)); + } Some(Event::GitInfo(info)) => self.git_info = Some(info), Some(Event::ClockTimer) => self.clock_timer = true, None => self.done = true, diff --git a/src/shell/history/entry.rs b/src/shell/history/entry.rs index a45d99d..0491bf7 100644 --- a/src/shell/history/entry.rs +++ b/src/shell/history/entry.rs @@ -1,25 +1,13 @@ use crate::shell::prelude::*; -enum State { - Running((usize, usize)), - Exited(ExitInfo), -} - pub struct Entry { cmdline: String, env: Env, - state: State, - vt: vt100::Parser, - audible_bell_state: usize, - visual_bell_state: usize, - audible_bell: bool, - visual_bell: bool, - real_bell_pending: bool, + pty: super::pty::Pty, fullscreen: Option<bool>, - input: async_std::channel::Sender<Vec<u8>>, - resize: async_std::channel::Sender<(u16, u16)>, - start_time: time::OffsetDateTime, start_instant: std::time::Instant, + start_time: time::OffsetDateTime, + state: State, } impl Entry { @@ -27,39 +15,37 @@ impl Entry { cmdline: String, env: Env, size: (u16, u16), - input: async_std::channel::Sender<Vec<u8>>, - resize: async_std::channel::Sender<(u16, u16)>, - ) -> Self { - let span = (0, cmdline.len()); - Self { + event_w: crate::shell::event::Writer, + ) -> Result<Self> { + let start_instant = std::time::Instant::now(); + let start_time = time::OffsetDateTime::now_utc(); + + let (pty, pts) = super::pty::Pty::new(size, event_w.clone()).unwrap(); + let (child, fh) = Self::spawn_command(&cmdline, &env, &pts)?; + tokio::spawn(Self::task(child, fh, env.idx(), event_w)); + Ok(Self { cmdline, env, - state: State::Running(span), - vt: vt100::Parser::new(size.0, size.1, 0), - audible_bell_state: 0, - visual_bell_state: 0, - audible_bell: false, - visual_bell: false, - real_bell_pending: false, - input, - resize, + pty, fullscreen: None, - start_time: time::OffsetDateTime::now_utc(), - start_instant: std::time::Instant::now(), - } + start_instant, + start_time, + state: State::Running((0, 0)), + }) } pub fn render( - &mut self, + &self, out: &mut impl textmode::Textmode, - idx: usize, entry_count: usize, - size: (u16, u16), + vt: &mut super::pty::Vt, focused: bool, scrolling: bool, offset: time::UtcOffset, ) { - let time = self.exit_info().map_or_else( + let idx = self.env.idx(); + let size = out.screen().size(); + let time = self.state.exit_info().map_or_else( || { format!( "[{}]", @@ -77,13 +63,11 @@ impl Entry { }, ); - self.bell(out); - if focused { - self.audible_bell = false; - self.visual_bell = false; + if vt.bell(focused) { + out.write(b"\x07"); } - set_bgcolor(out, idx, focused); + Self::set_bgcolor(out, idx, focused); out.set_fgcolor(textmode::color::YELLOW); let entry_count_width = format!("{}", entry_count + 1).len(); let idx_str = format!("{}", idx + 1); @@ -92,8 +76,8 @@ impl Entry { out.write_str(" "); out.reset_attributes(); - set_bgcolor(out, idx, focused); - if let Some(info) = self.exit_info() { + Self::set_bgcolor(out, idx, focused); + if let Some(info) = self.state.exit_info() { if info.status.signal().is_some() { out.set_fgcolor(textmode::color::MAGENTA); } else if info.status.success() { @@ -107,13 +91,13 @@ impl Entry { } out.reset_attributes(); - if self.audible_bell || self.visual_bell { + if vt.is_bell() { out.set_bgcolor(textmode::Color::Rgb(64, 16, 16)); } else { - set_bgcolor(out, idx, focused); + Self::set_bgcolor(out, idx, focused); } out.write_str("$ "); - set_bgcolor(out, idx, focused); + Self::set_bgcolor(out, idx, focused); let start = usize::from(out.screen().cursor_position().1); let end = usize::from(size.1) - time.len() - 2; let max_len = end - start; @@ -130,7 +114,7 @@ impl Entry { if !cmd[span.0..span.1].is_empty() { out.set_bgcolor(textmode::Color::Rgb(16, 64, 16)); out.write_str(&cmd[span.0..span.1]); - set_bgcolor(out, idx, focused); + Self::set_bgcolor(out, idx, focused); } if !cmd[span.1..].is_empty() { out.write_str(&cmd[span.1..]); @@ -155,7 +139,7 @@ impl Entry { } out.reset_attributes(); - set_bgcolor(out, idx, focused); + Self::set_bgcolor(out, idx, focused); let cur_pos = out.screen().cursor_position(); out.write_str(&" ".repeat( usize::from(size.1) - time.len() - 1 - usize::from(cur_pos.1), @@ -164,7 +148,7 @@ impl Entry { out.write_str(" "); out.reset_attributes(); - if self.binary() { + if vt.binary() { let msg = "This appears to be binary data. Fullscreen this entry to view anyway."; let len: u16 = msg.len().try_into().unwrap(); out.move_to( @@ -175,7 +159,8 @@ impl Entry { out.write_str(msg); out.hide_cursor(true); } else { - let last_row = self.output_lines(focused && !scrolling); + let last_row = + vt.output_lines(focused && !scrolling, self.state.running()); let mut max_lines = self.max_lines(entry_count); if last_row > max_lines { out.write(b"\r\n"); @@ -185,7 +170,7 @@ impl Entry { max_lines -= 1; } let mut out_row = out.screen().cursor_position().0 + 1; - let screen = self.vt.screen(); + let screen = vt.screen(); let pos = screen.cursor_position(); let mut wrapped = false; let mut cursor_found = None; @@ -216,66 +201,41 @@ impl Entry { } } } - out.reset_attributes(); - } - pub fn render_fullscreen(&mut self, out: &mut impl textmode::Textmode) { - out.write(&self.vt.screen().state_formatted()); - self.bell(out); - self.audible_bell = false; - self.visual_bell = false; out.reset_attributes(); } - pub async fn send_input(&self, bytes: Vec<u8>) { - if self.running() { - self.input.send(bytes).await.unwrap(); - } - } - - pub async fn resize(&mut self, size: (u16, u16)) { - if self.running() { - self.resize.send(size).await.unwrap(); - self.vt.set_size(size.0, size.1); - } + pub fn render_fullscreen(&self, out: &mut impl textmode::Textmode) { + self.pty.with_vt_mut(|vt| { + out.write(&vt.screen().state_formatted()); + if vt.bell(true) { + out.write(b"\x07"); + } + out.reset_attributes(); + }); } - pub fn size(&self) -> (u16, u16) { - self.vt.screen().size() + pub fn input(&self, bytes: Vec<u8>) { + self.pty.input(bytes); } - pub fn process(&mut self, input: &[u8]) { - self.vt.process(input); - let screen = self.vt.screen(); - - let new_audible_bell_state = screen.audible_bell_count(); - if new_audible_bell_state != self.audible_bell_state { - self.audible_bell = true; - self.real_bell_pending = true; - self.audible_bell_state = new_audible_bell_state; - } - - let new_visual_bell_state = screen.visual_bell_count(); - if new_visual_bell_state != self.visual_bell_state { - self.visual_bell = true; - self.real_bell_pending = true; - self.visual_bell_state = new_visual_bell_state; - } + pub fn resize(&self, size: (u16, u16)) { + self.pty.resize(size); } pub fn cmd(&self) -> &str { &self.cmdline } - pub fn env(&self) -> &Env { - &self.env + pub fn start_time(&self) -> time::OffsetDateTime { + self.start_time } pub fn toggle_fullscreen(&mut self) { if let Some(fullscreen) = self.fullscreen { self.fullscreen = Some(!fullscreen); } else { - self.fullscreen = Some(!self.vt.screen().alternate_screen()); + self.fullscreen = Some(!self.pty.fullscreen()); } } @@ -284,110 +244,186 @@ impl Entry { } pub fn running(&self) -> bool { - matches!(self.state, State::Running(_)) + self.state.running() } - pub fn binary(&self) -> bool { - self.vt.screen().errors() > 5 + pub fn exited(&mut self, exit_info: ExitInfo) { + self.state = State::Exited(exit_info); } pub fn lines(&self, entry_count: usize, focused: bool) -> usize { + let running = self.running(); 1 + std::cmp::min( - self.output_lines(focused), + self.pty.with_vt(|vt| vt.output_lines(focused, running)), self.max_lines(entry_count), ) } + pub fn should_fullscreen(&self) -> bool { + self.fullscreen.unwrap_or_else(|| self.pty.fullscreen()) + } + + pub fn lock_vt(&self) -> std::sync::MutexGuard<super::pty::Vt> { + self.pty.lock_vt() + } + + pub fn set_span(&mut self, new_span: (usize, usize)) { + if let State::Running(ref mut span) = self.state { + *span = new_span; + } + } + fn max_lines(&self, entry_count: usize) -> usize { if self.env.idx() == entry_count - 1 { - usize::from(self.size().0) * 2 / 3 + 15 } else { 5 } } - pub fn output_lines(&self, focused: bool) -> usize { - if self.binary() { - return 1; + fn set_bgcolor( + out: &mut impl textmode::Textmode, + idx: usize, + focus: bool, + ) { + if focus { + out.set_bgcolor(textmode::Color::Rgb(0x56, 0x1b, 0x8b)); + } else if idx % 2 == 0 { + out.set_bgcolor(textmode::Color::Rgb(0x24, 0x21, 0x00)); + } else { + out.set_bgcolor(textmode::Color::Rgb(0x20, 0x20, 0x20)); } + } - let screen = self.vt.screen(); - let mut last_row = 0; - for (idx, row) in screen.rows(0, self.size().1).enumerate() { - if !row.is_empty() { - last_row = idx + 1; - } + fn spawn_command( + cmdline: &str, + env: &Env, + pts: &pty_process::Pts, + ) -> Result<(tokio::process::Child, std::fs::File)> { + let mut cmd = pty_process::Command::new(crate::info::current_exe()?); + cmd.args(&["-c", cmdline, "--status-fd", "3"]); + env.apply(&mut cmd); + let (from_r, from_w) = + nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?; + // Safety: from_r was just opened above and is not used anywhere else + let fh = unsafe { std::fs::File::from_raw_fd(from_r) }; + // Safety: dup2 is an async-signal-safe function + unsafe { + cmd.pre_exec(move || { + nix::unistd::dup2(from_w, 3)?; + Ok(()) + }); } - if focused && self.running() { - last_row = std::cmp::max( - last_row, - usize::from(screen.cursor_position().0) + 1, - ); - } - last_row + let child = cmd.spawn(pts)?; + nix::unistd::close(from_w)?; + Ok((child, fh)) } - pub fn should_fullscreen(&self) -> bool { - self.fullscreen - .unwrap_or_else(|| self.vt.screen().alternate_screen()) - } + async fn task( + mut child: tokio::process::Child, + fh: std::fs::File, + idx: usize, + event_w: crate::shell::event::Writer, + ) { + enum Res { + Read(crate::runner::Event), + Exit(std::io::Result<std::process::ExitStatus>), + } - pub fn set_span(&mut self, span: (usize, usize)) { - if matches!(self.state, State::Running(_)) { - self.state = State::Running(span); + let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel(); + tokio::task::spawn_blocking(move || loop { + let event = bincode::deserialize_from(&fh); + match event { + Ok(event) => { + read_w.send(event).unwrap(); + } + Err(e) => { + match &*e { + bincode::ErrorKind::Io(io_e) => { + assert!( + io_e.kind() + == std::io::ErrorKind::UnexpectedEof + ); + } + e => { + panic!("{}", e); + } + } + break; + } + } + }); + + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_stream::wrappers::UnboundedReceiverStream::new(read_r) + .map(Res::Read) + .boxed(), + futures_util::stream::once(child.wait()) + .map(Res::Exit) + .boxed(), + ] + .into_iter() + .collect(); + let mut exit_status = None; + let mut new_env = None; + while let Some(res) = stream.next().await { + match res { + Res::Read(event) => match event { + crate::runner::Event::RunPipeline(new_span) => { + // we could just update the span in place here, but we + // do this as an event so that we can also trigger a + // refresh + event_w.send(Event::ChildRunPipeline(idx, new_span)); + } + crate::runner::Event::Suspend => { + event_w.send(Event::ChildSuspend(idx)); + } + crate::runner::Event::Exit(env) => { + new_env = Some(env); + } + }, + Res::Exit(status) => { + exit_status = Some(status.unwrap()); + } + } } + event_w.send(Event::ChildExit( + idx, + ExitInfo::new(exit_status.unwrap()), + new_env, + )); } +} - pub async fn finish( - &mut self, - env: Env, - event_w: async_std::channel::Sender<Event>, - ) { - self.state = State::Exited(ExitInfo::new(env.latest_status())); - self.env = env; - event_w.send(Event::PtyClose).await.unwrap(); - } +enum State { + Running((usize, usize)), + Exited(ExitInfo), +} +impl State { fn exit_info(&self) -> Option<&ExitInfo> { - match &self.state { - State::Running(..) => None, - State::Exited(exit_info) => Some(exit_info), + match self { + Self::Running(_) => None, + Self::Exited(exit_info) => Some(exit_info), } } - fn bell(&mut self, out: &mut impl textmode::Textmode) { - if self.real_bell_pending { - if self.audible_bell { - out.write(b"\x07"); - } - if self.visual_bell { - out.write(b"\x1bg"); - } - self.real_bell_pending = false; - } + fn running(&self) -> bool { + self.exit_info().is_none() } } -struct ExitInfo { - status: async_std::process::ExitStatus, +#[derive(Debug)] +pub struct ExitInfo { + status: std::process::ExitStatus, instant: std::time::Instant, } impl ExitInfo { - fn new(status: async_std::process::ExitStatus) -> Self { + fn new(status: std::process::ExitStatus) -> Self { Self { status, instant: std::time::Instant::now(), } } } - -fn set_bgcolor(out: &mut impl textmode::Textmode, idx: usize, focus: bool) { - if focus { - out.set_bgcolor(textmode::Color::Rgb(0x56, 0x1b, 0x8b)); - } else if idx % 2 == 0 { - out.set_bgcolor(textmode::Color::Rgb(0x24, 0x21, 0x00)); - } else { - out.set_bgcolor(textmode::Color::Rgb(0x20, 0x20, 0x20)); - } -} diff --git a/src/shell/history/mod.rs b/src/shell/history/mod.rs index 1bc4e62..91149c1 100644 --- a/src/shell/history/mod.rs +++ b/src/shell/history/mod.rs @@ -1,12 +1,12 @@ use crate::shell::prelude::*; mod entry; -pub use entry::Entry; +pub use entry::{Entry, ExitInfo}; mod pty; pub struct History { size: (u16, u16), - entries: Vec<crate::mutex::Mutex<Entry>>, + entries: Vec<Entry>, scroll_pos: usize, } @@ -19,31 +19,27 @@ impl History { } } - pub async fn render( + pub fn render( &self, out: &mut impl textmode::Textmode, repl_lines: usize, focus: Option<usize>, scrolling: bool, offset: time::UtcOffset, - ) -> anyhow::Result<()> { - let mut used_lines = repl_lines; + ) { let mut cursor = None; - for (idx, mut entry) in - self.visible(repl_lines, focus, scrolling).await.rev() + for (idx, used_lines, mut vt) in + self.visible(repl_lines, focus, scrolling).rev() { let focused = focus.map_or(false, |focus| idx == focus); - used_lines += - entry.lines(self.entry_count(), focused && !scrolling); out.move_to( (usize::from(self.size.0) - used_lines).try_into().unwrap(), 0, ); - entry.render( + self.entries[idx].render( out, - idx, self.entry_count(), - self.size, + &mut *vt, focused, scrolling, offset, @@ -59,67 +55,38 @@ impl History { out.move_to(pos.0, pos.1); out.hide_cursor(hide); } - Ok(()) } - pub async fn render_fullscreen( - &self, - out: &mut impl textmode::Textmode, - idx: usize, - ) { - let mut entry = self.entries[idx].lock_arc().await; - entry.render_fullscreen(out); + pub fn entry(&self, idx: usize) -> &Entry { + &self.entries[idx] } - pub async fn send_input(&mut self, idx: usize, input: Vec<u8>) { - self.entry(idx).await.send_input(input).await; + pub fn entry_mut(&mut self, idx: usize) -> &mut Entry { + &mut self.entries[idx] } - pub async fn resize(&mut self, size: (u16, u16)) { + pub fn resize(&mut self, size: (u16, u16)) { self.size = size; for entry in &self.entries { - entry.lock_arc().await.resize(size).await; + entry.resize(size); } } - pub async fn run( + pub fn run( &mut self, - cmdline: &str, - env: &Env, - event_w: async_std::channel::Sender<Event>, - ) -> anyhow::Result<usize> { - let (input_w, input_r) = async_std::channel::unbounded(); - let (resize_w, resize_r) = async_std::channel::unbounded(); - - let entry = crate::mutex::new(Entry::new( - cmdline.to_string(), - env.clone(), - self.size, - input_w, - resize_w, - )); - run_commands( - cmdline.to_string(), - crate::mutex::clone(&entry), - env.clone(), - input_r, - resize_r, - event_w, - ); - - self.entries.push(entry); - Ok(self.entries.len() - 1) - } - - pub async fn entry(&self, idx: usize) -> crate::mutex::Guard<Entry> { - self.entries[idx].lock_arc().await + cmdline: String, + env: Env, + event_w: crate::shell::event::Writer, + ) { + self.entries + .push(Entry::new(cmdline, env, self.size, event_w).unwrap()); } pub fn entry_count(&self) -> usize { self.entries.len() } - pub async fn make_focus_visible( + pub fn make_focus_visible( &mut self, repl_lines: usize, focus: Option<usize>, @@ -134,8 +101,7 @@ impl History { while focus < self .visible(repl_lines, Some(focus), scrolling) - .await - .map(|(idx, _)| idx) + .map(|(idx, ..)| idx) .next() .unwrap() { @@ -149,8 +115,7 @@ impl History { while focus > self .visible(repl_lines, Some(focus), scrolling) - .await - .map(|(idx, _)| idx) + .map(|(idx, ..)| idx) .last() .unwrap() { @@ -158,225 +123,86 @@ impl History { } } - async fn visible( + pub async fn save(&self) { + // TODO: we'll probably want some amount of flock or something here + let mut fh = tokio::fs::OpenOptions::new() + .append(true) + .open(crate::dirs::history_file()) + .await + .unwrap(); + for entry in &self.entries { + fh.write_all( + format!( + ": {}:0;{}\n", + entry.start_time().unix_timestamp(), + entry.cmd() + ) + .as_bytes(), + ) + .await + .unwrap(); + } + } + + fn visible( &self, repl_lines: usize, focus: Option<usize>, scrolling: bool, ) -> VisibleEntries { let mut iter = VisibleEntries::new(); - if self.entries.is_empty() { - return iter; - } - let mut used_lines = repl_lines; for (idx, entry) in self.entries.iter().enumerate().rev().skip(self.scroll_pos) { - let entry = entry.lock_arc().await; let focused = focus.map_or(false, |focus| idx == focus); used_lines += entry.lines(self.entry_count(), focused && !scrolling); if used_lines > usize::from(self.size.0) { break; } - iter.add(idx, entry); + iter.add(idx, used_lines, entry.lock_vt()); } iter } } -struct VisibleEntries { - entries: std::collections::VecDeque<(usize, crate::mutex::Guard<Entry>)>, +struct VisibleEntries<'a> { + entries: std::collections::VecDeque<( + usize, + usize, + std::sync::MutexGuard<'a, pty::Vt>, + )>, } -impl VisibleEntries { +impl<'a> VisibleEntries<'a> { fn new() -> Self { Self { entries: std::collections::VecDeque::new(), } } - fn add(&mut self, idx: usize, entry: crate::mutex::Guard<Entry>) { + fn add( + &mut self, + idx: usize, + offset: usize, + vt: std::sync::MutexGuard<'a, pty::Vt>, + ) { // push_front because we are adding them in reverse order - self.entries.push_front((idx, entry)); + self.entries.push_front((idx, offset, vt)); } } -impl std::iter::Iterator for VisibleEntries { - type Item = (usize, crate::mutex::Guard<Entry>); +impl<'a> std::iter::Iterator for VisibleEntries<'a> { + type Item = (usize, usize, std::sync::MutexGuard<'a, pty::Vt>); fn next(&mut self) -> Option<Self::Item> { self.entries.pop_front() } } -impl std::iter::DoubleEndedIterator for VisibleEntries { +impl<'a> std::iter::DoubleEndedIterator for VisibleEntries<'a> { fn next_back(&mut self) -> Option<Self::Item> { self.entries.pop_back() } } - -fn run_commands( - cmdline: String, - entry: crate::mutex::Mutex<Entry>, - mut env: Env, - input_r: async_std::channel::Receiver<Vec<u8>>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - event_w: async_std::channel::Sender<Event>, -) { - async_std::task::spawn(async move { - let pty = match pty::Pty::new( - entry.lock_arc().await.size(), - &entry, - input_r, - resize_r, - event_w.clone(), - ) { - Ok(pty) => pty, - Err(e) => { - let mut entry = entry.lock_arc().await; - entry.process( - format!("nbsh: failed to allocate pty: {}\r\n", e) - .as_bytes(), - ); - env.set_status(async_std::process::ExitStatus::from_raw( - 1 << 8, - )); - entry.finish(env, event_w).await; - return; - } - }; - - let status = - match spawn_commands(&cmdline, &pty, &mut env, event_w.clone()) - .await - { - Ok(status) => status, - Err(e) => { - let mut entry = entry.lock_arc().await; - entry.process( - format!( - "nbsh: failed to spawn {}: {}\r\n", - cmdline, e - ) - .as_bytes(), - ); - env.set_status(async_std::process::ExitStatus::from_raw( - 1 << 8, - )); - entry.finish(env, event_w).await; - return; - } - }; - env.set_status(status); - - entry.lock_arc().await.finish(env, event_w).await; - pty.close().await; - }); -} - -async fn spawn_commands( - cmdline: &str, - pty: &pty::Pty, - env: &mut Env, - event_w: async_std::channel::Sender<Event>, -) -> anyhow::Result<async_std::process::ExitStatus> { - let mut cmd = pty_process::Command::new(std::env::current_exe()?); - cmd.args(&["-c", cmdline, "--status-fd", "3"]); - env.apply(&mut cmd); - let (from_r, from_w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?; - // Safety: dup2 is an async-signal-safe function - unsafe { - cmd.pre_exec(move || { - nix::unistd::dup2(from_w, 3)?; - Ok(()) - }); - } - let child = pty.spawn(cmd)?; - nix::unistd::close(from_w)?; - - let (read_w, read_r) = async_std::channel::unbounded(); - let new_read = move || { - let read_w = read_w.clone(); - async_std::task::spawn(async move { - let event = blocking::unblock(move || { - // Safety: from_r was just opened above and is only - // referenced in this closure, which takes ownership of it - // at the start and returns ownership of it at the end - let fh = unsafe { std::fs::File::from_raw_fd(from_r) }; - let event = bincode::deserialize_from(&fh); - let _ = fh.into_raw_fd(); - event - }) - .await; - if read_w.is_closed() { - // we should never drop read_r while there are still valid - // things to read - assert!(event.is_err()); - } else { - read_w.send(event).await.unwrap(); - } - }); - }; - - new_read(); - let mut read_done = false; - let mut exit_done = None; - loop { - enum Res { - Read(bincode::Result<crate::runner::Event>), - Exit(std::io::Result<std::process::ExitStatus>), - } - - let read_r = read_r.clone(); - let read = async move { Res::Read(read_r.recv().await.unwrap()) }; - let exit = async { - Res::Exit(if exit_done.is_none() { - child.status_no_drop().await - } else { - std::future::pending().await - }) - }; - match read.or(exit).await { - Res::Read(Ok(event)) => match event { - crate::runner::Event::RunPipeline(idx, span) => { - event_w - .send(Event::ChildRunPipeline(idx, span)) - .await - .unwrap(); - new_read(); - } - crate::runner::Event::Suspend(idx) => { - event_w.send(Event::ChildSuspend(idx)).await.unwrap(); - new_read(); - } - crate::runner::Event::Exit(new_env) => { - *env = new_env; - read_done = true; - } - }, - Res::Read(Err(e)) => { - if let bincode::ErrorKind::Io(io_e) = &*e { - if io_e.kind() == std::io::ErrorKind::UnexpectedEof { - read_done = true; - } else { - anyhow::bail!(e); - } - } else { - anyhow::bail!(e); - } - } - Res::Exit(Ok(status)) => { - exit_done = Some(status); - } - Res::Exit(Err(e)) => { - anyhow::bail!(e); - } - } - if let (true, Some(status)) = (read_done, exit_done) { - nix::unistd::close(from_r)?; - return Ok(status); - } - } -} diff --git a/src/shell/history/pty.rs b/src/shell/history/pty.rs index 0fe0942..cef4ca9 100644 --- a/src/shell/history/pty.rs +++ b/src/shell/history/pty.rs @@ -1,106 +1,196 @@ use crate::shell::prelude::*; +#[derive(Debug)] +enum Request { + Input(Vec<u8>), + Resize(u16, u16), +} + pub struct Pty { - pty: async_std::sync::Arc<pty_process::Pty>, - close_w: async_std::channel::Sender<()>, + vt: std::sync::Arc<std::sync::Mutex<Vt>>, + request_w: tokio::sync::mpsc::UnboundedSender<Request>, } impl Pty { pub fn new( size: (u16, u16), - entry: &crate::mutex::Mutex<super::Entry>, - input_r: async_std::channel::Receiver<Vec<u8>>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - event_w: async_std::channel::Sender<Event>, - ) -> anyhow::Result<Self> { - let (close_w, close_r) = async_std::channel::unbounded(); + event_w: crate::shell::event::Writer, + ) -> Result<(Self, pty_process::Pts)> { + let (request_w, request_r) = tokio::sync::mpsc::unbounded_channel(); let pty = pty_process::Pty::new()?; pty.resize(pty_process::Size::new(size.0, size.1))?; - let pty = async_std::sync::Arc::new(pty); - - async_std::task::spawn(pty_task( - async_std::sync::Arc::clone(&pty), - crate::mutex::clone(entry), - input_r, - resize_r, - close_r, + let pts = pty.pts()?; + + let vt = std::sync::Arc::new(std::sync::Mutex::new(Vt::new(size))); + + tokio::spawn(Self::task( + pty, + std::sync::Arc::clone(&vt), + request_r, event_w, )); - Ok(Self { pty, close_w }) + Ok((Self { vt, request_w }, pts)) } - pub fn spawn( - &self, - mut cmd: pty_process::Command, - ) -> anyhow::Result<async_std::process::Child> { - Ok(cmd.spawn(&self.pty)?) + pub fn with_vt<T>(&self, f: impl FnOnce(&Vt) -> T) -> T { + let vt = self.vt.lock().unwrap(); + f(&*vt) } - pub async fn close(&self) { - self.close_w.send(()).await.unwrap(); + pub fn with_vt_mut<T>(&self, f: impl FnOnce(&mut Vt) -> T) -> T { + let mut vt = self.vt.lock().unwrap(); + f(&mut *vt) + } + + pub fn lock_vt(&self) -> std::sync::MutexGuard<Vt> { + self.vt.lock().unwrap() + } + + pub fn fullscreen(&self) -> bool { + self.with_vt(|vt| vt.screen().alternate_screen()) + } + + pub fn input(&self, bytes: Vec<u8>) { + #[allow(clippy::let_underscore_drop)] + let _ = self.request_w.send(Request::Input(bytes)); + } + + pub fn resize(&self, size: (u16, u16)) { + #[allow(clippy::let_underscore_drop)] + let _ = self.request_w.send(Request::Resize(size.0, size.1)); } -} -async fn pty_task( - pty: async_std::sync::Arc<pty_process::Pty>, - entry: crate::mutex::Mutex<super::Entry>, - input_r: async_std::channel::Receiver<Vec<u8>>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - close_r: async_std::channel::Receiver<()>, - event_w: async_std::channel::Sender<Event>, -) { - loop { + async fn task( + pty: pty_process::Pty, + vt: std::sync::Arc<std::sync::Mutex<Vt>>, + request_r: tokio::sync::mpsc::UnboundedReceiver<Request>, + event_w: crate::shell::event::Writer, + ) { enum Res { - Read(Result<usize, std::io::Error>), - Write(Result<Vec<u8>, async_std::channel::RecvError>), - Resize(Result<(u16, u16), async_std::channel::RecvError>), - Close(Result<(), async_std::channel::RecvError>), + Read(Result<bytes::Bytes, std::io::Error>), + Request(Request), } - let mut buf = [0_u8; 4096]; - let read = async { Res::Read((&*pty).read(&mut buf).await) }; - let write = async { Res::Write(input_r.recv().await) }; - let resize = async { Res::Resize(resize_r.recv().await) }; - let close = async { Res::Close(close_r.recv().await) }; - match read.race(write).race(resize).or(close).await { - Res::Read(res) => match res { - Ok(bytes) => { - entry.lock_arc().await.process(&buf[..bytes]); - event_w.send(Event::PtyOutput).await.unwrap(); - } - Err(e) => { - if e.raw_os_error() != Some(libc::EIO) { + + let (pty_r, mut pty_w) = pty.into_split(); + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_util::io::ReaderStream::new(pty_r) + .map(Res::Read) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(request_r) + .map(Res::Request) + .boxed(), + ] + .into_iter() + .collect(); + while let Some(res) = stream.next().await { + match res { + Res::Read(res) => match res { + Ok(bytes) => { + vt.lock().unwrap().process(&bytes); + event_w.send(Event::PtyOutput); + } + Err(e) => { + // this means that there are no longer any open pts + // fds. we could alternately signal this through an + // explicit channel at ChildExit time, but this seems + // reliable enough. + if e.raw_os_error() == Some(libc::EIO) { + return; + } panic!("pty read failed: {:?}", e); } + }, + Res::Request(Request::Input(bytes)) => { + pty_w.write(&bytes).await.unwrap(); } - }, - Res::Write(res) => match res { - Ok(bytes) => { - (&*pty).write(&bytes).await.unwrap(); - } - Err(e) => { - panic!("failed to read from input channel: {}", e); - } - }, - Res::Resize(res) => match res { - Ok(size) => { - pty.resize(pty_process::Size::new(size.0, size.1)) - .unwrap(); - } - Err(e) => { - panic!("failed to read from resize channel: {}", e); + Res::Request(Request::Resize(row, col)) => { + pty_w.resize(pty_process::Size::new(row, col)).unwrap(); + vt.lock().unwrap().set_size((row, col)); } - }, - Res::Close(res) => match res { - Ok(()) => { - event_w.send(Event::PtyClose).await.unwrap(); - return; - } - Err(e) => { - panic!("failed to read from close channel: {}", e); - } - }, + } + } + } +} + +pub struct Vt { + vt: vt100::Parser, + bell_state: usize, + bell: bool, + real_bell_pending: bool, +} + +impl Vt { + pub fn new(size: (u16, u16)) -> Self { + Self { + vt: vt100::Parser::new(size.0, size.1, 0), + bell_state: 0, + bell: false, + real_bell_pending: false, + } + } + + pub fn process(&mut self, bytes: &[u8]) { + self.vt.process(bytes); + let screen = self.vt.screen(); + + let new_bell_state = screen.audible_bell_count(); + if new_bell_state != self.bell_state { + self.bell = true; + self.real_bell_pending = true; + self.bell_state = new_bell_state; + } + } + + pub fn screen(&self) -> &vt100::Screen { + self.vt.screen() + } + + pub fn set_size(&mut self, size: (u16, u16)) { + self.vt.set_size(size.0, size.1); + } + + pub fn is_bell(&self) -> bool { + self.bell + } + + pub fn bell(&mut self, focused: bool) -> bool { + let mut should = false; + if self.real_bell_pending { + if self.bell { + should = true; + } + self.real_bell_pending = false; + } + if focused { + self.bell = false; + } + should + } + + pub fn binary(&self) -> bool { + self.vt.screen().errors() > 5 + } + + pub fn output_lines(&self, focused: bool, running: bool) -> usize { + if self.binary() { + return 1; + } + + let screen = self.vt.screen(); + let mut last_row = 0; + for (idx, row) in screen.rows(0, screen.size().1).enumerate() { + if !row.is_empty() { + last_row = idx + 1; + } + } + if focused && running { + last_row = std::cmp::max( + last_row, + usize::from(screen.cursor_position().0) + 1, + ); } + last_row } } diff --git a/src/shell/inputs/clock.rs b/src/shell/inputs/clock.rs new file mode 100644 index 0000000..250466e --- /dev/null +++ b/src/shell/inputs/clock.rs @@ -0,0 +1,27 @@ +use crate::shell::prelude::*; + +pub struct Handler; + +impl Handler { + pub fn new(event_w: crate::shell::event::Writer) -> Self { + tokio::spawn(Self::task(event_w)); + Self + } + + async fn task(event_w: crate::shell::event::Writer) { + let now_clock = time::OffsetDateTime::now_utc(); + let now_instant = tokio::time::Instant::now(); + let mut interval = tokio::time::interval_at( + now_instant + + std::time::Duration::from_nanos( + 1_000_000_000_u64 + .saturating_sub(now_clock.nanosecond().into()), + ), + std::time::Duration::from_secs(1), + ); + loop { + interval.tick().await; + event_w.send(Event::ClockTimer); + } + } +} diff --git a/src/shell/git.rs b/src/shell/inputs/git.rs index 48e5eea..dbae1c4 100644 --- a/src/shell/git.rs +++ b/src/shell/inputs/git.rs @@ -1,3 +1,76 @@ +use crate::shell::prelude::*; + +use notify::Watcher as _; + +pub struct Handler { + git_w: tokio::sync::mpsc::UnboundedSender<std::path::PathBuf>, +} + +impl Handler { + pub fn new(event_w: crate::shell::event::Writer) -> Self { + let (git_w, git_r) = tokio::sync::mpsc::unbounded_channel(); + tokio::spawn(Self::task(git_r, event_w)); + Self { git_w } + } + + pub fn new_dir(&self, path: std::path::PathBuf) { + self.git_w.send(path).unwrap(); + } + + async fn task( + mut git_r: tokio::sync::mpsc::UnboundedReceiver<std::path::PathBuf>, + event_w: crate::shell::event::Writer, + ) { + // clippy can't tell that we assign to this later + #[allow(clippy::no_effect_underscore_binding)] + let mut _active_watcher = None; + while let Some(mut dir) = git_r.recv().await { + while let Ok(newer_dir) = git_r.try_recv() { + dir = newer_dir; + } + let repo = git2::Repository::discover(&dir).ok(); + if repo.is_some() { + let (sync_watch_w, sync_watch_r) = std::sync::mpsc::channel(); + let (watch_w, mut watch_r) = + tokio::sync::mpsc::unbounded_channel(); + let mut watcher = + notify::recommended_watcher(sync_watch_w).unwrap(); + watcher + .watch(&dir, notify::RecursiveMode::Recursive) + .unwrap(); + tokio::task::spawn_blocking(move || { + while let Ok(event) = sync_watch_r.recv() { + if watch_w.send(event).is_err() { + break; + } + } + }); + let event_w = event_w.clone(); + tokio::spawn(async move { + while watch_r.recv().await.is_some() { + let repo = git2::Repository::discover(&dir).ok(); + let info = tokio::task::spawn_blocking(|| { + repo.map(|repo| Info::new(&repo)) + }) + .await + .unwrap(); + event_w.send(Event::GitInfo(info)); + } + }); + _active_watcher = Some(watcher); + } else { + _active_watcher = None; + } + let info = tokio::task::spawn_blocking(|| { + repo.map(|repo| Info::new(&repo)) + }) + .await + .unwrap(); + event_w.send(Event::GitInfo(info)); + } + } +} + #[derive(Debug)] pub struct Info { modified_files: bool, diff --git a/src/shell/inputs/mod.rs b/src/shell/inputs/mod.rs new file mode 100644 index 0000000..48590a2 --- /dev/null +++ b/src/shell/inputs/mod.rs @@ -0,0 +1,32 @@ +use crate::shell::prelude::*; + +mod clock; +mod git; +pub use git::Info as GitInfo; +mod signals; +mod stdin; + +pub struct Handler { + _clock: clock::Handler, + git: git::Handler, + _signals: signals::Handler, + _stdin: stdin::Handler, +} + +impl Handler { + pub fn new( + input: textmode::blocking::Input, + event_w: crate::shell::event::Writer, + ) -> Result<Self> { + Ok(Self { + _clock: clock::Handler::new(event_w.clone()), + git: git::Handler::new(event_w.clone()), + _signals: signals::Handler::new(event_w.clone())?, + _stdin: stdin::Handler::new(input, event_w), + }) + } + + pub fn new_dir(&self, path: std::path::PathBuf) { + self.git.new_dir(path); + } +} diff --git a/src/shell/inputs/signals.rs b/src/shell/inputs/signals.rs new file mode 100644 index 0000000..4b91273 --- /dev/null +++ b/src/shell/inputs/signals.rs @@ -0,0 +1,30 @@ +use crate::shell::prelude::*; + +pub struct Handler; + +impl Handler { + pub fn new(event_w: crate::shell::event::Writer) -> Result<Self> { + let signals = tokio::signal::unix::signal( + tokio::signal::unix::SignalKind::window_change(), + )?; + tokio::spawn(Self::task(signals, event_w)); + Ok(Self) + } + + async fn task( + mut signals: tokio::signal::unix::Signal, + event_w: crate::shell::event::Writer, + ) { + event_w.send(resize_event()); + while signals.recv().await.is_some() { + event_w.send(resize_event()); + } + } +} + +fn resize_event() -> Event { + Event::Resize(terminal_size::terminal_size().map_or( + (24, 80), + |(terminal_size::Width(w), terminal_size::Height(h))| (h, w), + )) +} diff --git a/src/shell/inputs/stdin.rs b/src/shell/inputs/stdin.rs new file mode 100644 index 0000000..b966307 --- /dev/null +++ b/src/shell/inputs/stdin.rs @@ -0,0 +1,17 @@ +use crate::shell::prelude::*; + +pub struct Handler; + +impl Handler { + pub fn new( + mut input: textmode::blocking::Input, + event_w: crate::shell::event::Writer, + ) -> Self { + std::thread::spawn(move || { + while let Some(key) = input.read_key().unwrap() { + event_w.send(Event::Key(key)); + } + }); + Self + } +} diff --git a/src/shell/mod.rs b/src/shell/mod.rs index f7080a4..fa7147b 100644 --- a/src/shell/mod.rs +++ b/src/shell/mod.rs @@ -1,16 +1,16 @@ use crate::shell::prelude::*; -use notify::Watcher as _; use textmode::Textmode as _; mod event; -mod git; mod history; +mod inputs; +mod old_history; mod prelude; mod readline; -pub async fn main() -> anyhow::Result<i32> { - let mut input = textmode::Input::new().await?; +pub async fn main() -> Result<i32> { + let mut input = textmode::blocking::Input::new()?; let mut output = textmode::Output::new().await?; // avoid the guards getting stuck in a task that doesn't run to @@ -18,163 +18,40 @@ pub async fn main() -> anyhow::Result<i32> { let _input_guard = input.take_raw_guard(); let _output_guard = output.take_screen_guard(); - let (event_w, event_r) = async_std::channel::unbounded(); + let (event_w, event_r) = event::channel(); - { - // nix::sys::signal::Signal is repr(i32) - #[allow(clippy::as_conversions)] - let signals = signal_hook_async_std::Signals::new(&[ - nix::sys::signal::Signal::SIGWINCH as i32, - ])?; - let event_w = event_w.clone(); - async_std::task::spawn(async move { - // nix::sys::signal::Signal is repr(i32) - #[allow(clippy::as_conversions)] - let mut signals = async_std::stream::once( - nix::sys::signal::Signal::SIGWINCH as i32, - ) - .chain(signals); - while signals.next().await.is_some() { - event_w - .send(Event::Resize( - terminal_size::terminal_size().map_or( - (24, 80), - |( - terminal_size::Width(w), - terminal_size::Height(h), - )| { (h, w) }, - ), - )) - .await - .unwrap(); - } - }); - } - - { - let event_w = event_w.clone(); - async_std::task::spawn(async move { - while let Some(key) = input.read_key().await.unwrap() { - event_w.send(Event::Key(key)).await.unwrap(); - } - }); - } - - // redraw the clock every second - { - let event_w = event_w.clone(); - async_std::task::spawn(async move { - let first_sleep = 1_000_000_000_u64.saturating_sub( - time::OffsetDateTime::now_utc().nanosecond().into(), - ); - async_std::task::sleep(std::time::Duration::from_nanos( - first_sleep, - )) - .await; - let mut interval = async_std::stream::interval( - std::time::Duration::from_secs(1), - ); - event_w.send(Event::ClockTimer).await.unwrap(); - while interval.next().await.is_some() { - event_w.send(Event::ClockTimer).await.unwrap(); - } - }); - } - - let (git_w, git_r): (async_std::channel::Sender<std::path::PathBuf>, _) = - async_std::channel::unbounded(); - { - let event_w = event_w.clone(); - let mut _active_watcher = None; - async_std::task::spawn(async move { - while let Ok(mut dir) = git_r.recv().await { - while let Ok(newer_dir) = git_r.try_recv() { - dir = newer_dir; - } - let repo = git2::Repository::discover(&dir).ok(); - if repo.is_some() { - let (sync_watch_w, sync_watch_r) = - std::sync::mpsc::channel(); - let (watch_w, watch_r) = async_std::channel::unbounded(); - let mut watcher = notify::RecommendedWatcher::new( - sync_watch_w, - std::time::Duration::from_millis(100), - ) - .unwrap(); - watcher - .watch(&dir, notify::RecursiveMode::Recursive) - .unwrap(); - async_std::task::spawn(blocking::unblock(move || { - while let Ok(event) = sync_watch_r.recv() { - let watch_w = watch_w.clone(); - let send_failed = - async_std::task::block_on(async move { - watch_w.send(event).await.is_err() - }); - if send_failed { - break; - } - } - })); - let event_w = event_w.clone(); - async_std::task::spawn(async move { - while watch_r.recv().await.is_ok() { - let repo = git2::Repository::discover(&dir).ok(); - let info = blocking::unblock(|| { - repo.map(|repo| git::Info::new(&repo)) - }) - .await; - if event_w - .send(Event::GitInfo(info)) - .await - .is_err() - { - break; - } - } - }); - _active_watcher = Some(watcher); - } else { - _active_watcher = None; - } - let info = blocking::unblock(|| { - repo.map(|repo| git::Info::new(&repo)) - }) - .await; - event_w.send(Event::GitInfo(info)).await.unwrap(); - } - }); - } + let inputs = inputs::Handler::new(input, event_w.clone()).unwrap(); let mut shell = Shell::new(crate::info::get_offset())?; let mut prev_dir = shell.env.pwd().to_path_buf(); - git_w.send(prev_dir.clone()).await.unwrap(); - let event_reader = event::Reader::new(event_r); - while let Some(event) = event_reader.recv().await { - let dir = shell.env().pwd(); - if dir != prev_dir { - prev_dir = dir.to_path_buf(); - git_w.send(dir.to_path_buf()).await.unwrap(); - } - match shell.handle_event(event, &event_w).await { + inputs.new_dir(prev_dir.clone()); + while let Some(event) = event_r.recv().await { + match shell.handle_event(event, &event_w) { Some(Action::Refresh) => { - shell.render(&mut output).await?; + shell.render(&mut output)?; output.refresh().await?; } Some(Action::HardRefresh) => { - shell.render(&mut output).await?; + shell.render(&mut output)?; output.hard_refresh().await?; } Some(Action::Resize(rows, cols)) => { output.set_size(rows, cols); - shell.render(&mut output).await?; + shell.render(&mut output)?; output.hard_refresh().await?; } Some(Action::Quit) => break, None => {} } + let dir = shell.env().pwd(); + if dir != prev_dir { + prev_dir = dir.to_path_buf(); + inputs.new_dir(dir.to_path_buf()); + } } + shell.history.save().await; + Ok(0) } @@ -201,8 +78,9 @@ pub enum Action { pub struct Shell { readline: readline::Readline, history: history::History, + old_history: old_history::History, env: Env, - git: Option<git::Info>, + git: Option<inputs::GitInfo>, focus: Focus, scene: Scene, escape: bool, @@ -211,13 +89,14 @@ pub struct Shell { } impl Shell { - pub fn new(offset: time::UtcOffset) -> anyhow::Result<Self> { + pub fn new(offset: time::UtcOffset) -> Result<Self> { let mut env = Env::new()?; env.set_var("SHELL", std::env::current_exe()?); env.set_var("TERM", "screen"); Ok(Self { readline: readline::Readline::new(), history: history::History::new(), + old_history: old_history::History::new(), env, git: None, focus: Focus::Readline, @@ -228,87 +107,76 @@ impl Shell { }) } - pub async fn render( - &self, - out: &mut impl textmode::Textmode, - ) -> anyhow::Result<()> { + pub fn render(&self, out: &mut impl textmode::Textmode) -> Result<()> { out.clear(); out.write(&vt100::Parser::default().screen().input_mode_formatted()); match self.scene { Scene::Readline => match self.focus { Focus::Readline => { - self.history - .render( + self.history.render( + out, + self.readline.lines(), + None, + false, + self.offset, + ); + self.readline.render( + out, + &self.env, + self.git.as_ref(), + true, + self.offset, + )?; + } + Focus::History(idx) => { + if self.hide_readline { + self.history.render( + out, + 0, + Some(idx), + false, + self.offset, + ); + } else { + self.history.render( out, self.readline.lines(), - None, + Some(idx), false, self.offset, - ) - .await?; - self.readline - .render( + ); + let pos = out.screen().cursor_position(); + self.readline.render( out, &self.env, self.git.as_ref(), - true, + false, self.offset, - ) - .await?; - } - Focus::History(idx) => { - if self.hide_readline { - self.history - .render(out, 0, Some(idx), false, self.offset) - .await?; - } else { - self.history - .render( - out, - self.readline.lines(), - Some(idx), - false, - self.offset, - ) - .await?; - let pos = out.screen().cursor_position(); - self.readline - .render( - out, - &self.env, - self.git.as_ref(), - false, - self.offset, - ) - .await?; + )?; out.move_to(pos.0, pos.1); } } Focus::Scrolling(idx) => { - self.history - .render( - out, - self.readline.lines(), - idx, - true, - self.offset, - ) - .await?; - self.readline - .render( - out, - &self.env, - self.git.as_ref(), - idx.is_none(), - self.offset, - ) - .await?; + self.history.render( + out, + self.readline.lines(), + idx, + true, + self.offset, + ); + self.readline.render( + out, + &self.env, + self.git.as_ref(), + idx.is_none(), + self.offset, + )?; out.hide_cursor(true); } }, Scene::Fullscreen => { if let Focus::History(idx) = self.focus { - self.history.render_fullscreen(out, idx).await; + self.history.entry(idx).render_fullscreen(out); } else { unreachable!(); } @@ -317,79 +185,72 @@ impl Shell { Ok(()) } - pub async fn handle_event( + pub fn handle_event( &mut self, event: Event, - event_w: &async_std::channel::Sender<Event>, + event_w: &crate::shell::event::Writer, ) -> Option<Action> { match event { Event::Key(key) => { return if self.escape { self.escape = false; - self.handle_key_escape(key, event_w.clone()).await + self.handle_key_escape(&key, event_w.clone()) } else if key == textmode::Key::Ctrl(b'e') { self.escape = true; None } else { match self.focus { Focus::Readline => { - self.handle_key_readline(key, event_w.clone()) - .await + self.handle_key_readline(&key, event_w.clone()) } Focus::History(idx) => { - self.handle_key_history(key, idx).await; + self.handle_key_history(key, idx); None } Focus::Scrolling(_) => { - self.handle_key_escape(key, event_w.clone()).await + self.handle_key_escape(&key, event_w.clone()) } } }; } Event::Resize(new_size) => { - self.readline.resize(new_size).await; - self.history.resize(new_size).await; + self.readline.resize(new_size); + self.history.resize(new_size); return Some(Action::Resize(new_size.0, new_size.1)); } Event::PtyOutput => { // the number of visible lines may have changed, so make sure // the focus is still visible - self.history - .make_focus_visible( - self.readline.lines(), - self.focus_idx(), - matches!(self.focus, Focus::Scrolling(_)), - ) - .await; - self.scene = self.default_scene(self.focus, None).await; - } - Event::PtyClose => { - if let Some(idx) = self.focus_idx() { - let entry = self.history.entry(idx).await; - if !entry.running() { + self.history.make_focus_visible( + self.readline.lines(), + self.focus_idx(), + matches!(self.focus, Focus::Scrolling(_)), + ); + self.scene = self.default_scene(self.focus); + } + Event::ChildExit(idx, exit_info, env) => { + self.history.entry_mut(idx).exited(exit_info); + if self.focus_idx() == Some(idx) { + if let Some(env) = env { if self.hide_readline { let idx = self.env.idx(); - self.env = entry.env().clone(); + self.env = env; self.env.set_idx(idx); } - self.set_focus( - if self.hide_readline { - Focus::Readline - } else { - Focus::Scrolling(Some(idx)) - }, - Some(entry), - ) - .await; } + self.set_focus(if self.hide_readline { + Focus::Readline + } else { + Focus::Scrolling(Some(idx)) + }); } } Event::ChildRunPipeline(idx, span) => { - self.history.entry(idx).await.set_span(span); + self.history.entry_mut(idx).set_span(span); } Event::ChildSuspend(idx) => { if self.focus_idx() == Some(idx) { - self.set_focus(Focus::Readline, None).await; + self.set_focus(Focus::Readline); } } Event::GitInfo(info) => { @@ -400,18 +261,17 @@ impl Shell { Some(Action::Refresh) } - async fn handle_key_escape( + fn handle_key_escape( &mut self, - key: textmode::Key, - event_w: async_std::channel::Sender<Event>, + key: &textmode::Key, + event_w: crate::shell::event::Writer, ) -> Option<Action> { match key { textmode::Key::Ctrl(b'd') => { return Some(Action::Quit); } textmode::Key::Ctrl(b'e') => { - self.set_focus(Focus::Scrolling(self.focus_idx()), None) - .await; + self.set_focus(Focus::Scrolling(self.focus_idx())); } textmode::Key::Ctrl(b'l') => { return Some(Action::HardRefresh); @@ -419,48 +279,37 @@ impl Shell { textmode::Key::Ctrl(b'm') => { if let Some(idx) = self.focus_idx() { self.readline.clear_input(); - let entry = self.history.entry(idx).await; - let input = entry.cmd(); - let idx = self - .history - .run(input, &self.env, event_w.clone()) - .await - .unwrap(); - self.set_focus(Focus::History(idx), Some(entry)).await; + self.history.run( + self.history.entry(idx).cmd().to_string(), + self.env.clone(), + event_w, + ); + let idx = self.history.entry_count() - 1; + self.set_focus(Focus::History(idx)); self.hide_readline = true; self.env.set_idx(idx + 1); } else { - self.set_focus(Focus::Readline, None).await; + self.set_focus(Focus::Readline); } } textmode::Key::Char(' ') => { - let idx = self.focus_idx(); - let (focus, entry) = if let Some(idx) = idx { - let entry = self.history.entry(idx).await; - (entry.running(), Some(entry)) + if let Some(idx) = self.focus_idx() { + if self.history.entry(idx).running() { + self.set_focus(Focus::History(idx)); + } } else { - (true, None) - }; - if focus { - self.set_focus( - idx.map_or(Focus::Readline, |idx| { - Focus::History(idx) - }), - entry, - ) - .await; + self.set_focus(Focus::Readline); } } textmode::Key::Char('e') => { if let Focus::History(idx) = self.focus { - self.handle_key_history(textmode::Key::Ctrl(b'e'), idx) - .await; + self.handle_key_history(textmode::Key::Ctrl(b'e'), idx); } } textmode::Key::Char('f') => { if let Some(idx) = self.focus_idx() { - let mut entry = self.history.entry(idx).await; let mut focus = Focus::History(idx); + let entry = self.history.entry_mut(idx); if let Focus::Scrolling(_) = self.focus { entry.set_fullscreen(true); } else { @@ -469,38 +318,30 @@ impl Shell { focus = Focus::Scrolling(Some(idx)); } } - self.set_focus(focus, Some(entry)).await; + self.set_focus(focus); } } textmode::Key::Char('i') => { if let Some(idx) = self.focus_idx() { - let entry = self.history.entry(idx).await; - self.readline.set_input(entry.cmd()); - self.set_focus(Focus::Readline, Some(entry)).await; + self.readline + .set_input(self.history.entry(idx).cmd().to_string()); + self.set_focus(Focus::Readline); } } textmode::Key::Char('j') | textmode::Key::Down => { - self.set_focus( - Focus::Scrolling(self.scroll_down(self.focus_idx())), - None, - ) - .await; + self.set_focus(Focus::Scrolling(self.scroll_down())); } textmode::Key::Char('k') | textmode::Key::Up => { - self.set_focus( - Focus::Scrolling(self.scroll_up(self.focus_idx())), - None, - ) - .await; + self.set_focus(Focus::Scrolling(self.scroll_up())); } textmode::Key::Char('n') => { - self.set_focus(self.next_running().await, None).await; + self.set_focus(self.next_running()); } textmode::Key::Char('p') => { - self.set_focus(self.prev_running().await, None).await; + self.set_focus(self.prev_running()); } textmode::Key::Char('r') => { - self.set_focus(Focus::Readline, None).await; + self.set_focus(Focus::Readline); } _ => { return None; @@ -509,10 +350,10 @@ impl Shell { Some(Action::Refresh) } - async fn handle_key_readline( + fn handle_key_readline( &mut self, - key: textmode::Key, - event_w: async_std::channel::Sender<Event>, + key: &textmode::Key, + event_w: crate::shell::event::Writer, ) -> Option<Action> { match key { textmode::Key::Char(c) => { @@ -528,12 +369,13 @@ impl Shell { textmode::Key::Ctrl(b'm') => { let input = self.readline.input(); if !input.is_empty() { - let idx = self - .history - .run(input, &self.env, event_w.clone()) - .await - .unwrap(); - self.set_focus(Focus::History(idx), None).await; + self.history.run( + input.to_string(), + self.env.clone(), + event_w, + ); + let idx = self.history.entry_count() - 1; + self.set_focus(Focus::History(idx)); self.hide_readline = true; self.env.set_idx(idx + 1); self.readline.clear_input(); @@ -546,11 +388,7 @@ impl Shell { textmode::Key::Up => { let entry_count = self.history.entry_count(); if entry_count > 0 { - self.set_focus( - Focus::Scrolling(Some(entry_count - 1)), - None, - ) - .await; + self.set_focus(Focus::Scrolling(Some(entry_count - 1))); } } _ => return None, @@ -558,24 +396,15 @@ impl Shell { Some(Action::Refresh) } - async fn handle_key_history(&mut self, key: textmode::Key, idx: usize) { - self.history.send_input(idx, key.into_bytes()).await; + fn handle_key_history(&mut self, key: textmode::Key, idx: usize) { + self.history.entry(idx).input(key.into_bytes()); } - async fn default_scene( - &self, - focus: Focus, - entry: Option<crate::mutex::Guard<history::Entry>>, - ) -> Scene { + fn default_scene(&self, focus: Focus) -> Scene { match focus { Focus::Readline | Focus::Scrolling(_) => Scene::Readline, Focus::History(idx) => { - let fullscreen = if let Some(entry) = entry { - entry.should_fullscreen() - } else { - self.history.entry(idx).await.should_fullscreen() - }; - if fullscreen { + if self.history.entry(idx).should_fullscreen() { Scene::Fullscreen } else { Scene::Readline @@ -584,25 +413,15 @@ impl Shell { } } - async fn set_focus( - &mut self, - new_focus: Focus, - entry: Option<crate::mutex::Guard<history::Entry>>, - ) { + fn set_focus(&mut self, new_focus: Focus) { self.focus = new_focus; self.hide_readline = false; - self.scene = self.default_scene(new_focus, entry).await; - // passing entry into default_scene above consumes it, which means - // that the mutex lock will be dropped before we call into - // make_focus_visible, which is important because otherwise we might - // get a deadlock depending on what is visible - self.history - .make_focus_visible( - self.readline.lines(), - self.focus_idx(), - matches!(self.focus, Focus::Scrolling(_)), - ) - .await; + self.scene = self.default_scene(new_focus); + self.history.make_focus_visible( + self.readline.lines(), + self.focus_idx(), + matches!(self.focus, Focus::Scrolling(_)), + ); } fn env(&self) -> &Env { @@ -617,8 +436,8 @@ impl Shell { } } - fn scroll_up(&self, idx: Option<usize>) -> Option<usize> { - idx.map_or_else( + fn scroll_up(&self) -> Option<usize> { + self.focus_idx().map_or_else( || { let count = self.history.entry_count(); if count == 0 { @@ -631,8 +450,8 @@ impl Shell { ) } - fn scroll_down(&self, idx: Option<usize>) -> Option<usize> { - idx.and_then(|idx| { + fn scroll_down(&self) -> Option<usize> { + self.focus_idx().and_then(|idx| { if idx >= self.history.entry_count() - 1 { None } else { @@ -641,25 +460,25 @@ impl Shell { }) } - async fn next_running(&self) -> Focus { + fn next_running(&self) -> Focus { let count = self.history.entry_count(); let cur = self.focus_idx().unwrap_or(count); for idx in ((cur + 1)..count).chain(0..cur) { - if self.history.entry(idx).await.running() { + if self.history.entry(idx).running() { return Focus::History(idx); } } - self.focus_idx().map_or(Focus::Readline, Focus::History) + self.focus } - async fn prev_running(&self) -> Focus { + fn prev_running(&self) -> Focus { let count = self.history.entry_count(); let cur = self.focus_idx().unwrap_or(count); for idx in ((cur + 1)..count).chain(0..cur).rev() { - if self.history.entry(idx).await.running() { + if self.history.entry(idx).running() { return Focus::History(idx); } } - self.focus_idx().map_or(Focus::Readline, Focus::History) + self.focus } } diff --git a/src/shell/old_history.rs b/src/shell/old_history.rs new file mode 100644 index 0000000..49fd1c2 --- /dev/null +++ b/src/shell/old_history.rs @@ -0,0 +1,185 @@ +use crate::shell::prelude::*; + +use tokio::io::AsyncBufReadExt as _; + +use pest::Parser as _; + +#[derive(pest_derive::Parser)] +#[grammar = "history.pest"] +struct HistoryLine; + +pub struct History { + entries: std::sync::Arc<std::sync::Mutex<Vec<Entry>>>, +} + +impl History { + pub fn new() -> Self { + let entries = std::sync::Arc::new(std::sync::Mutex::new(vec![])); + tokio::spawn(Self::task(std::sync::Arc::clone(&entries))); + Self { entries } + } + + pub fn entry_count(&self) -> usize { + self.entries.lock().unwrap().len() + } + + async fn task(entries: std::sync::Arc<std::sync::Mutex<Vec<Entry>>>) { + // TODO: we should actually read this in reverse order, because we + // want to populate the most recent entries first + let mut stream = tokio_stream::wrappers::LinesStream::new( + tokio::io::BufReader::new( + tokio::fs::File::open(crate::dirs::history_file()) + .await + .unwrap(), + ) + .lines(), + ); + while let Some(line) = stream.next().await { + let line = if let Ok(line) = line { + line + } else { + continue; + }; + let entry = if let Ok(entry) = line.parse() { + entry + } else { + continue; + }; + entries.lock().unwrap().push(entry); + } + } +} + +pub struct Entry { + cmdline: String, + start_time: Option<time::OffsetDateTime>, + duration: Option<std::time::Duration>, +} + +impl Entry { + pub fn render( + &self, + out: &mut impl textmode::Textmode, + offset: time::UtcOffset, + ) { + let size = out.screen().size(); + let mut time = "".to_string(); + if let Some(duration) = self.duration { + time.push_str(&crate::format::duration(duration)); + } + if let Some(start_time) = self.start_time { + time.push_str(&crate::format::time(start_time.to_offset(offset))); + } + + out.write_str(" $ "); + let start = usize::from(out.screen().cursor_position().1); + let end = usize::from(size.1) - time.len() - 2; + let max_len = end - start; + let cmd = if self.cmdline.len() > max_len { + &self.cmdline[..(max_len - 4)] + } else { + &self.cmdline + }; + out.write_str(cmd); + if self.cmdline.len() > max_len { + out.write_str(" "); + out.set_fgcolor(textmode::color::BLUE); + out.write_str("..."); + } + out.reset_attributes(); + + out.set_bgcolor(textmode::Color::Rgb(0x20, 0x20, 0x20)); + let cur_pos = out.screen().cursor_position(); + out.write_str(&" ".repeat( + usize::from(size.1) - time.len() - 1 - usize::from(cur_pos.1), + )); + out.write_str(&time); + out.write_str(" "); + out.reset_attributes(); + } + + pub fn cmd(&self) -> &str { + &self.cmdline + } +} + +impl std::str::FromStr for Entry { + type Err = anyhow::Error; + + fn from_str(line: &str) -> std::result::Result<Self, Self::Err> { + let mut parsed = + HistoryLine::parse(Rule::line, line).map_err(|e| anyhow!(e))?; + let line = parsed.next().unwrap(); + assert!(matches!(line.as_rule(), Rule::line)); + + let mut start_time = None; + let mut duration = None; + let mut cmdline = None; + for part in line.into_inner() { + match part.as_rule() { + Rule::time => { + start_time = + Some(time::OffsetDateTime::from_unix_timestamp( + part.as_str().parse()?, + )?); + } + Rule::duration => { + if part.as_str() == "0" { + continue; + } + let mut dur_parts = part.as_str().split('.'); + let secs: u64 = dur_parts.next().unwrap().parse()?; + let nsec_str = dur_parts.next().unwrap_or("0"); + let nsec_str = &nsec_str[..9.min(nsec_str.len())]; + let nsecs: u64 = nsec_str.parse()?; + duration = Some(std::time::Duration::from_nanos( + secs * 1_000_000_000 + + nsecs + * (10u64.pow( + (9 - nsec_str.len()).try_into().unwrap(), + )), + )); + } + Rule::command => { + cmdline = Some(part.as_str().to_string()); + } + Rule::line => unreachable!(), + Rule::EOI => break, + } + } + + Ok(Self { + cmdline: cmdline.unwrap(), + start_time, + duration, + }) + } +} + +#[test] +fn test_parse() { + let entry: Entry = + ": 1646779848:1234.56;vim ~/.zsh_history".parse().unwrap(); + assert_eq!(entry.cmdline, "vim ~/.zsh_history"); + assert_eq!( + entry.duration, + Some(std::time::Duration::from_nanos(1_234_560_000_000)) + ); + assert_eq!( + entry.start_time, + Some(time::macros::datetime!(2022-03-08 22:50:48).assume_utc()) + ); + + let entry: Entry = ": 1646779848:1;vim ~/.zsh_history".parse().unwrap(); + assert_eq!(entry.cmdline, "vim ~/.zsh_history"); + assert_eq!(entry.duration, Some(std::time::Duration::from_secs(1))); + assert_eq!( + entry.start_time, + Some(time::macros::datetime!(2022-03-08 22:50:48).assume_utc()) + ); + + let entry: Entry = "vim ~/.zsh_history".parse().unwrap(); + assert_eq!(entry.cmdline, "vim ~/.zsh_history"); + assert_eq!(entry.duration, None); + assert_eq!(entry.start_time, None); +} diff --git a/src/shell/readline.rs b/src/shell/readline.rs index f0fb950..654d264 100644 --- a/src/shell/readline.rs +++ b/src/shell/readline.rs @@ -19,14 +19,14 @@ impl Readline { } } - pub async fn render( + pub fn render( &self, out: &mut impl textmode::Textmode, env: &Env, - git: Option<&super::git::Info>, + git: Option<&super::inputs::GitInfo>, focus: bool, offset: time::UtcOffset, - ) -> anyhow::Result<()> { + ) -> Result<()> { let pwd = env.pwd(); let user = crate::info::user()?; let hostname = crate::info::hostname()?; @@ -83,7 +83,7 @@ impl Readline { Ok(()) } - pub async fn resize(&mut self, size: (u16, u16)) { + pub fn resize(&mut self, size: (u16, u16)) { self.size = size; } @@ -102,9 +102,9 @@ impl Readline { self.inc_pos(s.chars().count()); } - pub fn set_input(&mut self, s: &str) { - self.input_line = s.to_string(); + pub fn set_input(&mut self, s: String) { self.set_pos(s.chars().count()); + self.input_line = s; } pub fn backspace(&mut self) { |