From b1c5f2f31874fc019b67ae981f66e0492b22c867 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Fri, 4 Mar 2022 16:31:38 -0500 Subject: another large refactor --- src/runner/mod.rs | 17 ++- src/shell/event.rs | 17 +-- src/shell/history/entry.rs | 160 ++++++++++------------------ src/shell/history/job.rs | 205 ++++++++++++++++++++++++++++++++++++ src/shell/history/mod.rs | 257 +++++++-------------------------------------- src/shell/history/pty.rs | 201 +++++++++++++++++++++++++++-------- src/shell/history/vt.rs | 97 ----------------- src/shell/mod.rs | 169 +++++++++++------------------ 8 files changed, 529 insertions(+), 594 deletions(-) create mode 100644 src/shell/history/job.rs delete mode 100644 src/shell/history/vt.rs (limited to 'src') diff --git a/src/runner/mod.rs b/src/runner/mod.rs index 09eb539..628b333 100644 --- a/src/runner/mod.rs +++ b/src/runner/mod.rs @@ -8,8 +8,8 @@ mod sys; #[derive(Debug, serde::Serialize, serde::Deserialize)] pub enum Event { - RunPipeline(usize, (usize, usize)), - Suspend(usize), + RunPipeline((usize, usize)), + Suspend, Exit(Env), } @@ -233,8 +233,7 @@ async fn run_pipeline( env: &mut Env, shell_write: &mut Option, ) -> Result<()> { - write_event(shell_write, Event::RunPipeline(env.idx(), pipeline.span())) - .await?; + write_event(shell_write, Event::RunPipeline(pipeline.span())).await?; // Safety: pipelines are run serially, so only one copy of these will ever // exist at once. note that reusing a single copy of these at the top // level would not be safe, because in the case of a command line like @@ -252,7 +251,7 @@ async fn run_pipeline( let pipeline = pipeline.eval(env).await?; let interactive = shell_write.is_some(); let (children, pg) = spawn_children(pipeline, env, &io, interactive)?; - let status = wait_children(children, pg, env, shell_write).await; + let status = wait_children(children, pg, shell_write).await; if interactive { sys::set_foreground_pg(nix::unistd::getpid())?; } @@ -320,7 +319,6 @@ fn spawn_children( async fn wait_children( children: Vec, pg: Option, - env: &Env, shell_write: &mut Option, ) -> std::process::ExitStatus { enum Res { @@ -420,11 +418,8 @@ async fn wait_children( } nix::sys::wait::WaitStatus::Stopped(pid, signal) => { if signal == nix::sys::signal::Signal::SIGTSTP { - if let Err(e) = write_event( - shell_write, - Event::Suspend(env.idx()), - ) - .await + if let Err(e) = + write_event(shell_write, Event::Suspend).await { bail!(e); } diff --git a/src/shell/event.rs b/src/shell/event.rs index e80cdef..2b12b05 100644 --- a/src/shell/event.rs +++ b/src/shell/event.rs @@ -1,11 +1,13 @@ +use crate::prelude::*; + #[derive(Debug)] pub enum Event { Key(textmode::Key), Resize((u16, u16)), PtyOutput, - PtyClose, ChildRunPipeline(usize, (usize, usize)), ChildSuspend(usize), + ChildExit(usize, Option), GitInfo(Option), ClockTimer, } @@ -90,9 +92,9 @@ struct Pending { key: std::collections::VecDeque, size: Option<(u16, u16)>, pty_output: bool, - pty_close: bool, child_run_pipeline: std::collections::VecDeque<(usize, (usize, usize))>, child_suspend: std::collections::VecDeque, + child_exit: Option<(usize, Option)>, git_info: Option>, clock_timer: bool, done: bool, @@ -113,16 +115,15 @@ impl Pending { if let Some(size) = self.size.take() { return Some(Some(Event::Resize(size))); } - if self.pty_close { - self.pty_close = false; - return Some(Some(Event::PtyClose)); - } if let Some((idx, span)) = self.child_run_pipeline.pop_front() { return Some(Some(Event::ChildRunPipeline(idx, span))); } if let Some(idx) = self.child_suspend.pop_front() { return Some(Some(Event::ChildSuspend(idx))); } + if let Some((idx, env)) = self.child_exit.take() { + return Some(Some(Event::ChildExit(idx, env))); + } if let Some(info) = self.git_info.take() { return Some(Some(Event::GitInfo(info))); } @@ -145,13 +146,15 @@ impl Pending { Some(Event::Key(key)) => self.key.push_back(key), Some(Event::Resize(size)) => self.size = Some(size), Some(Event::PtyOutput) => self.pty_output = true, - Some(Event::PtyClose) => self.pty_close = true, Some(Event::ChildRunPipeline(idx, span)) => { self.child_run_pipeline.push_back((idx, span)); } Some(Event::ChildSuspend(idx)) => { self.child_suspend.push_back(idx); } + Some(Event::ChildExit(idx, env)) => { + self.child_exit = Some((idx, env)); + } Some(Event::GitInfo(info)) => self.git_info = Some(info), Some(Event::ClockTimer) => self.clock_timer = true, None => self.done = true, diff --git a/src/shell/history/entry.rs b/src/shell/history/entry.rs index 90509d3..24b90cf 100644 --- a/src/shell/history/entry.rs +++ b/src/shell/history/entry.rs @@ -1,20 +1,11 @@ use crate::shell::prelude::*; -enum State { - Running((usize, usize)), - Exited(ExitInfo), -} - pub struct Entry { cmdline: String, env: Env, - state: State, - vt: super::vt::Vt, + pty: super::pty::Pty, + job: super::job::Job, fullscreen: Option, - input: tokio::sync::mpsc::UnboundedSender>, - resize: tokio::sync::mpsc::UnboundedSender<(u16, u16)>, - start_time: time::OffsetDateTime, - start_instant: std::time::Instant, } impl Entry { @@ -22,51 +13,56 @@ impl Entry { cmdline: String, env: Env, size: (u16, u16), - input: tokio::sync::mpsc::UnboundedSender>, - resize: tokio::sync::mpsc::UnboundedSender<(u16, u16)>, + event_w: crate::shell::event::Writer, ) -> Self { - let span = (0, cmdline.len()); + let (pty, pts) = super::pty::Pty::new(size, event_w.clone()).unwrap(); + let job = super::job::Job::new(&cmdline, env.clone(), &pts, event_w) + .unwrap(); Self { cmdline, env, - state: State::Running(span), - vt: super::vt::Vt::new(size), - input, - resize, + pty, + job, fullscreen: None, - start_time: time::OffsetDateTime::now_utc(), - start_instant: std::time::Instant::now(), } } pub fn render( - &mut self, + &self, out: &mut impl textmode::Textmode, idx: usize, entry_count: usize, + state: &super::job::State, + vt: &mut super::pty::Vt, size: (u16, u16), focused: bool, scrolling: bool, offset: time::UtcOffset, ) { - let time = self.exit_info().map_or_else( + let time = state.exit_info().map_or_else( || { format!( "[{}]", - crate::format::time(self.start_time.to_offset(offset)) + crate::format::time( + self.job.start_time().to_offset(offset) + ) ) }, |info| { format!( "({}) [{}]", crate::format::duration( - info.instant - self.start_instant + *info.instant() - *self.job.start_instant() + ), + crate::format::time( + self.job.start_time().to_offset(offset) ), - crate::format::time(self.start_time.to_offset(offset)), ) }, ); + vt.bell(out, focused); + set_bgcolor(out, idx, focused); out.set_fgcolor(textmode::color::YELLOW); let entry_count_width = format!("{}", entry_count + 1).len(); @@ -77,24 +73,21 @@ impl Entry { out.reset_attributes(); set_bgcolor(out, idx, focused); - if let Some(info) = self.exit_info() { - if info.status.signal().is_some() { + if let Some(info) = state.exit_info() { + let status = info.status(); + if status.signal().is_some() { out.set_fgcolor(textmode::color::MAGENTA); - } else if info.status.success() { + } else if status.success() { out.set_fgcolor(textmode::color::DARKGREY); } else { out.set_fgcolor(textmode::color::RED); } - out.write_str(&crate::format::exit_status(info.status)); + out.write_str(&crate::format::exit_status(status)); } else { out.write_str(" "); } out.reset_attributes(); - self.vt.bell(out, focused); - - let vt = &self.vt; - if vt.is_bell() { out.set_bgcolor(textmode::Color::Rgb(64, 16, 16)); } else { @@ -110,7 +103,7 @@ impl Entry { } else { self.cmd() }; - if let State::Running(span) = self.state { + if let super::job::State::Running(span) = state { let span = (span.0.min(cmd.len()), span.1.min(cmd.len())); if !cmd[..span.0].is_empty() { out.write_str(&cmd[..span.0]); @@ -127,13 +120,13 @@ impl Entry { out.write_str(cmd); } if self.cmd().len() > max_len { - if let State::Running(span) = self.state { + if let super::job::State::Running(span) = state { if span.0 < cmd.len() && span.1 > cmd.len() { out.set_bgcolor(textmode::Color::Rgb(16, 64, 16)); } } out.write_str(" "); - if let State::Running(span) = self.state { + if let super::job::State::Running(span) = state { if span.1 > cmd.len() { out.set_bgcolor(textmode::Color::Rgb(16, 64, 16)); } @@ -164,7 +157,7 @@ impl Entry { out.hide_cursor(true); } else { let last_row = - vt.output_lines(focused && !scrolling, self.running()); + vt.output_lines(focused && !scrolling, state.running()); let mut max_lines = self.max_lines(entry_count); if last_row > max_lines { out.write(b"\r\n"); @@ -209,47 +202,31 @@ impl Entry { out.reset_attributes(); } - pub fn render_fullscreen(&mut self, out: &mut impl textmode::Textmode) { - let vt = &mut self.vt; - out.write(&vt.screen().state_formatted()); - vt.bell(out, true); - out.reset_attributes(); - } - - pub fn send_input(&self, bytes: Vec) { - if self.running() { - self.input.send(bytes).unwrap(); - } + pub fn render_fullscreen(&self, out: &mut impl textmode::Textmode) { + self.pty.with_vt_mut(|vt| { + out.write(&vt.screen().state_formatted()); + vt.bell(out, true); + out.reset_attributes(); + }); } - pub fn resize(&mut self, size: (u16, u16)) { - if self.running() { - self.resize.send(size).unwrap(); - self.vt.set_size(size); - } + pub fn input(&self, bytes: Vec) { + self.pty.input(bytes); } - pub fn size(&self) -> (u16, u16) { - self.vt.size() - } - - pub fn process(&mut self, input: &[u8]) { - self.vt.process(input); + pub fn resize(&self, size: (u16, u16)) { + self.pty.resize(size); } pub fn cmd(&self) -> &str { &self.cmdline } - pub fn env(&self) -> &Env { - &self.env - } - pub fn toggle_fullscreen(&mut self) { if let Some(fullscreen) = self.fullscreen { self.fullscreen = Some(!fullscreen); } else { - self.fullscreen = Some(!self.vt.screen().alternate_screen()); + self.fullscreen = Some(!self.pty.fullscreen()); } } @@ -258,63 +235,38 @@ impl Entry { } pub fn running(&self) -> bool { - matches!(self.state, State::Running(_)) + self.job.running() } pub fn lines(&self, entry_count: usize, focused: bool) -> usize { + let running = self.running(); 1 + std::cmp::min( - self.vt.output_lines(focused, self.running()), + self.pty.with_vt(|vt| vt.output_lines(focused, running)), self.max_lines(entry_count), ) } - fn max_lines(&self, entry_count: usize) -> usize { - if self.env.idx() == entry_count - 1 { - 15 - } else { - 5 - } - } - pub fn should_fullscreen(&self) -> bool { - self.fullscreen - .unwrap_or_else(|| self.vt.screen().alternate_screen()) + self.fullscreen.unwrap_or_else(|| self.pty.fullscreen()) } - pub fn set_span(&mut self, span: (usize, usize)) { - if matches!(self.state, State::Running(_)) { - self.state = State::Running(span); - } + pub fn lock_vt(&self) -> std::sync::MutexGuard { + self.pty.lock_vt() } - pub fn finish( - &mut self, - env: Env, - event_w: &crate::shell::event::Writer, - ) { - self.state = State::Exited(ExitInfo::new(env.latest_status())); - self.env = env; - event_w.send(Event::PtyClose); + pub fn lock_state(&self) -> std::sync::MutexGuard { + self.job.lock_state() } - fn exit_info(&self) -> Option<&ExitInfo> { - match &self.state { - State::Running(..) => None, - State::Exited(exit_info) => Some(exit_info), - } + pub fn set_span(&self, span: (usize, usize)) { + self.job.set_span(span); } -} - -struct ExitInfo { - status: std::process::ExitStatus, - instant: std::time::Instant, -} -impl ExitInfo { - fn new(status: std::process::ExitStatus) -> Self { - Self { - status, - instant: std::time::Instant::now(), + fn max_lines(&self, entry_count: usize) -> usize { + if self.env.idx() == entry_count - 1 { + 15 + } else { + 5 } } } diff --git a/src/shell/history/job.rs b/src/shell/history/job.rs new file mode 100644 index 0000000..365a06d --- /dev/null +++ b/src/shell/history/job.rs @@ -0,0 +1,205 @@ +use crate::shell::prelude::*; + +pub struct Job { + state: std::sync::Arc>, + start_time: time::OffsetDateTime, + start_instant: std::time::Instant, +} + +impl Job { + pub fn new( + cmdline: &str, + env: Env, + pts: &pty_process::Pts, + event_w: crate::shell::event::Writer, + ) -> Result { + let start_time = time::OffsetDateTime::now_utc(); + let start_instant = std::time::Instant::now(); + let (child, fh) = spawn_command(cmdline, &env, pts)?; + let state = std::sync::Arc::new(std::sync::Mutex::new( + State::Running((0, 0)), + )); + tokio::task::spawn(job_task( + child, + fh, + std::sync::Arc::clone(&state), + env, + event_w, + )); + Ok(Self { + state, + start_time, + start_instant, + }) + } + + pub fn start_time(&self) -> &time::OffsetDateTime { + &self.start_time + } + + pub fn start_instant(&self) -> &std::time::Instant { + &self.start_instant + } + + pub fn with_state(&self, f: impl FnOnce(&State) -> T) -> T { + let state = self.state.lock().unwrap(); + f(&state) + } + + pub fn with_state_mut(&self, f: impl FnOnce(&mut State) -> T) -> T { + let mut state = self.state.lock().unwrap(); + f(&mut state) + } + + pub fn lock_state(&self) -> std::sync::MutexGuard { + self.state.lock().unwrap() + } + + pub fn running(&self) -> bool { + self.with_state(|state| matches!(state, State::Running(..))) + } + + pub fn set_span(&self, new_span: (usize, usize)) { + self.with_state_mut(|state| { + if let State::Running(span) = state { + *span = new_span; + } + }); + } +} + +pub enum State { + Running((usize, usize)), + Exited(ExitInfo), +} + +impl State { + pub fn exit_info(&self) -> Option<&ExitInfo> { + match self { + Self::Running(_) => None, + Self::Exited(exit_info) => Some(exit_info), + } + } + + pub fn running(&self) -> bool { + self.exit_info().is_none() + } +} + +pub struct ExitInfo { + status: std::process::ExitStatus, + instant: std::time::Instant, +} + +impl ExitInfo { + fn new(status: std::process::ExitStatus) -> Self { + Self { + status, + instant: std::time::Instant::now(), + } + } + + pub fn status(&self) -> std::process::ExitStatus { + self.status + } + + pub fn instant(&self) -> &std::time::Instant { + &self.instant + } +} + +async fn job_task( + mut child: tokio::process::Child, + fh: std::fs::File, + state: std::sync::Arc>, + env: Env, + event_w: crate::shell::event::Writer, +) { + enum Res { + Read(crate::runner::Event), + Exit(std::io::Result), + } + + let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel(); + tokio::task::spawn_blocking(move || loop { + let event = bincode::deserialize_from(&fh); + match event { + Ok(event) => { + read_w.send(event).unwrap(); + } + Err(e) => { + match &*e { + bincode::ErrorKind::Io(io_e) => { + assert!( + io_e.kind() == std::io::ErrorKind::UnexpectedEof + ); + } + e => { + panic!("{}", e); + } + } + break; + } + } + }); + + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_stream::wrappers::UnboundedReceiverStream::new(read_r) + .map(Res::Read) + .boxed(), + futures_util::stream::once(child.wait()) + .map(Res::Exit) + .boxed(), + ] + .into_iter() + .collect(); + let mut exit_status = None; + let mut new_env = None; + while let Some(res) = stream.next().await { + match res { + Res::Read(event) => match event { + crate::runner::Event::RunPipeline(new_span) => { + // we could just update the span in place here, but we do + // this as an event so that we can also trigger a refresh + event_w + .send(Event::ChildRunPipeline(env.idx(), new_span)); + } + crate::runner::Event::Suspend => { + event_w.send(Event::ChildSuspend(env.idx())); + } + crate::runner::Event::Exit(env) => { + new_env = Some(env); + } + }, + Res::Exit(status) => { + exit_status = Some(status.unwrap()); + } + } + } + *state.lock().unwrap() = + State::Exited(ExitInfo::new(exit_status.unwrap())); + event_w.send(Event::ChildExit(env.idx(), new_env)); +} + +fn spawn_command( + cmdline: &str, + env: &Env, + pts: &pty_process::Pts, +) -> Result<(tokio::process::Child, std::fs::File)> { + let mut cmd = pty_process::Command::new(std::env::current_exe()?); + cmd.args(&["-c", cmdline, "--status-fd", "3"]); + env.apply(&mut cmd); + let (from_r, from_w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?; + // Safety: from_r was just opened above and is not used anywhere else + let fh = unsafe { std::fs::File::from_raw_fd(from_r) }; + // Safety: dup2 is an async-signal-safe function + unsafe { + cmd.pre_exec(move || { + nix::unistd::dup2(from_w, 3)?; + Ok(()) + }); + } + let child = cmd.spawn(pts)?; + nix::unistd::close(from_w)?; + Ok((child, fh)) +} diff --git a/src/shell/history/mod.rs b/src/shell/history/mod.rs index 2e8b817..df995e6 100644 --- a/src/shell/history/mod.rs +++ b/src/shell/history/mod.rs @@ -2,12 +2,12 @@ use crate::shell::prelude::*; mod entry; pub use entry::Entry; +mod job; mod pty; -mod vt; pub struct History { size: (u16, u16), - entries: Vec>>, + entries: Vec, scroll_pos: usize, } @@ -28,22 +28,21 @@ impl History { scrolling: bool, offset: time::UtcOffset, ) { - let mut used_lines = repl_lines; let mut cursor = None; - for (idx, mut entry) in + for (idx, used_lines, mut vt, state) in self.visible(repl_lines, focus, scrolling).rev() { let focused = focus.map_or(false, |focus| idx == focus); - used_lines += - entry.lines(self.entry_count(), focused && !scrolling); out.move_to( (usize::from(self.size.0) - used_lines).try_into().unwrap(), 0, ); - entry.render( + self.entries[idx].render( out, idx, self.entry_count(), + &*state, + &mut *vt, self.size, focused, scrolling, @@ -62,30 +61,18 @@ impl History { } } - pub fn render_fullscreen( - &self, - out: &mut impl textmode::Textmode, - idx: usize, - ) { - self.with_entry_mut(idx, |entry| entry.render_fullscreen(out)); - } - - pub fn send_input(&mut self, idx: usize, input: Vec) { - self.with_entry(idx, |entry| entry.send_input(input)); + pub fn entry(&self, idx: usize) -> &Entry { + &self.entries[idx] } - pub fn should_fullscreen(&self, idx: usize) -> bool { - self.with_entry(idx, Entry::should_fullscreen) - } - - pub fn running(&self, idx: usize) -> bool { - self.with_entry(idx, Entry::running) + pub fn entry_mut(&mut self, idx: usize) -> &mut Entry { + &mut self.entries[idx] } pub fn resize(&mut self, size: (u16, u16)) { self.size = size; for entry in &self.entries { - entry.lock().unwrap().resize(size); + entry.resize(size); } } @@ -94,46 +81,9 @@ impl History { cmdline: String, env: Env, event_w: crate::shell::event::Writer, - ) -> usize { - let (input_w, input_r) = tokio::sync::mpsc::unbounded_channel(); - let (resize_w, resize_r) = tokio::sync::mpsc::unbounded_channel(); - - let entry = std::sync::Arc::new(std::sync::Mutex::new(Entry::new( - cmdline.clone(), - env.clone(), - self.size, - input_w, - resize_w, - ))); - run_commands( - cmdline, - std::sync::Arc::clone(&entry), - env, - input_r, - resize_r, - event_w, - ); - - self.entries.push(entry); - self.entries.len() - 1 - } - - pub fn with_entry( - &self, - idx: usize, - f: impl FnOnce(&Entry) -> T, - ) -> T { - let entry = self.entries[idx].lock().unwrap(); - f(&*entry) - } - - pub fn with_entry_mut( - &self, - idx: usize, - f: impl FnOnce(&mut Entry) -> T, - ) -> T { - let mut entry = self.entries[idx].lock().unwrap(); - f(&mut *entry) + ) { + self.entries + .push(Entry::new(cmdline, env, self.size, event_w)); } pub fn entry_count(&self) -> usize { @@ -155,7 +105,7 @@ impl History { while focus < self .visible(repl_lines, Some(focus), scrolling) - .map(|(idx, _)| idx) + .map(|(idx, ..)| idx) .next() .unwrap() { @@ -169,7 +119,7 @@ impl History { while focus > self .visible(repl_lines, Some(focus), scrolling) - .map(|(idx, _)| idx) + .map(|(idx, ..)| idx) .last() .unwrap() { @@ -184,30 +134,29 @@ impl History { scrolling: bool, ) -> VisibleEntries { let mut iter = VisibleEntries::new(); - if self.entries.is_empty() { - return iter; - } - let mut used_lines = repl_lines; for (idx, entry) in self.entries.iter().enumerate().rev().skip(self.scroll_pos) { - let entry = entry.lock().unwrap(); let focused = focus.map_or(false, |focus| idx == focus); used_lines += entry.lines(self.entry_count(), focused && !scrolling); if used_lines > usize::from(self.size.0) { break; } - iter.add(idx, entry); + iter.add(idx, used_lines, entry.lock_vt(), entry.lock_state()); } iter } } struct VisibleEntries<'a> { - entries: - std::collections::VecDeque<(usize, std::sync::MutexGuard<'a, Entry>)>, + entries: std::collections::VecDeque<( + usize, + usize, + std::sync::MutexGuard<'a, pty::Vt>, + std::sync::MutexGuard<'a, job::State>, + )>, } impl<'a> VisibleEntries<'a> { @@ -217,14 +166,25 @@ impl<'a> VisibleEntries<'a> { } } - fn add(&mut self, idx: usize, entry: std::sync::MutexGuard<'a, Entry>) { + fn add( + &mut self, + idx: usize, + offset: usize, + vt: std::sync::MutexGuard<'a, pty::Vt>, + state: std::sync::MutexGuard<'a, job::State>, + ) { // push_front because we are adding them in reverse order - self.entries.push_front((idx, entry)); + self.entries.push_front((idx, offset, vt, state)); } } impl<'a> std::iter::Iterator for VisibleEntries<'a> { - type Item = (usize, std::sync::MutexGuard<'a, Entry>); + type Item = ( + usize, + usize, + std::sync::MutexGuard<'a, pty::Vt>, + std::sync::MutexGuard<'a, job::State>, + ); fn next(&mut self) -> Option { self.entries.pop_front() @@ -236,146 +196,3 @@ impl<'a> std::iter::DoubleEndedIterator for VisibleEntries<'a> { self.entries.pop_back() } } - -fn run_commands( - cmdline: String, - entry: std::sync::Arc>, - mut env: Env, - input_r: tokio::sync::mpsc::UnboundedReceiver>, - resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, - event_w: crate::shell::event::Writer, -) { - tokio::task::spawn(async move { - let size = entry.lock().unwrap().size(); - let pty = match pty::Pty::new( - size, - &entry, - input_r, - resize_r, - event_w.clone(), - ) { - Ok(pty) => pty, - Err(e) => { - let mut entry = entry.lock().unwrap(); - entry.process( - format!("nbsh: failed to allocate pty: {}\r\n", e) - .as_bytes(), - ); - env.set_status(std::process::ExitStatus::from_raw(1 << 8)); - entry.finish(env, &event_w); - return; - } - }; - - let status = - match spawn_commands(&cmdline, &pty, &mut env, event_w.clone()) - .await - { - Ok(status) => status, - Err(e) => { - let mut entry = entry.lock().unwrap(); - entry.process( - format!( - "nbsh: failed to spawn {}: {}\r\n", - cmdline, e - ) - .as_bytes(), - ); - env.set_status(std::process::ExitStatus::from_raw( - 1 << 8, - )); - entry.finish(env, &event_w); - return; - } - }; - env.set_status(status); - - entry.lock().unwrap().finish(env, &event_w); - pty.close(); - }); -} - -async fn spawn_commands( - cmdline: &str, - pty: &pty::Pty, - env: &mut Env, - event_w: crate::shell::event::Writer, -) -> Result { - enum Res { - Read(crate::runner::Event), - Exit(std::io::Result), - } - - let mut cmd = pty_process::Command::new(std::env::current_exe()?); - cmd.args(&["-c", cmdline, "--status-fd", "3"]); - env.apply(&mut cmd); - let (from_r, from_w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?; - // Safety: from_r was just opened above and is not used anywhere else - let fh = unsafe { std::fs::File::from_raw_fd(from_r) }; - // Safety: dup2 is an async-signal-safe function - unsafe { - cmd.pre_exec(move || { - nix::unistd::dup2(from_w, 3)?; - Ok(()) - }); - } - let mut child = pty.spawn(cmd)?; - nix::unistd::close(from_w)?; - - let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel(); - tokio::task::spawn_blocking(move || loop { - let event = bincode::deserialize_from(&fh); - match event { - Ok(event) => { - read_w.send(event).unwrap(); - } - Err(e) => { - match &*e { - bincode::ErrorKind::Io(io_e) => { - assert!( - io_e.kind() == std::io::ErrorKind::UnexpectedEof - ); - } - e => { - panic!("{}", e); - } - } - break; - } - } - }); - - let mut stream: futures_util::stream::SelectAll<_> = [ - tokio_stream::wrappers::UnboundedReceiverStream::new(read_r) - .map(Res::Read) - .boxed(), - futures_util::stream::once(child.wait()) - .map(Res::Exit) - .boxed(), - ] - .into_iter() - .collect(); - let mut exit_status = None; - while let Some(res) = stream.next().await { - match res { - Res::Read(event) => match event { - crate::runner::Event::RunPipeline(idx, span) => { - event_w.send(Event::ChildRunPipeline(idx, span)); - } - crate::runner::Event::Suspend(idx) => { - event_w.send(Event::ChildSuspend(idx)); - } - crate::runner::Event::Exit(new_env) => { - *env = new_env; - } - }, - Res::Exit(Ok(status)) => { - exit_status = Some(status); - } - Res::Exit(Err(e)) => { - anyhow::bail!(e); - } - } - } - Ok(exit_status.unwrap()) -} diff --git a/src/shell/history/pty.rs b/src/shell/history/pty.rs index 2a33e40..49681d4 100644 --- a/src/shell/history/pty.rs +++ b/src/shell/history/pty.rs @@ -1,65 +1,176 @@ use crate::shell::prelude::*; +#[derive(Debug)] +enum Request { + Input(Vec), + Resize(u16, u16), +} + pub struct Pty { - pts: std::sync::Arc, - close_w: tokio::sync::mpsc::UnboundedSender<()>, + vt: std::sync::Arc>, + request_w: tokio::sync::mpsc::UnboundedSender, } impl Pty { pub fn new( size: (u16, u16), - entry: &std::sync::Arc>, - input_r: tokio::sync::mpsc::UnboundedReceiver>, - resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, event_w: crate::shell::event::Writer, - ) -> Result { - let (close_w, close_r) = tokio::sync::mpsc::unbounded_channel(); + ) -> Result<(Self, pty_process::Pts)> { + let (request_w, request_r) = tokio::sync::mpsc::unbounded_channel(); let pty = pty_process::Pty::new()?; pty.resize(pty_process::Size::new(size.0, size.1))?; - let pts = std::sync::Arc::new(pty.pts()?); + let pts = pty.pts()?; + + let vt = std::sync::Arc::new(std::sync::Mutex::new( + super::pty::Vt::new(size), + )); tokio::task::spawn(pty_task( pty, - std::sync::Arc::clone(&pts), - std::sync::Arc::clone(entry), - input_r, - resize_r, - close_r, + std::sync::Arc::clone(&vt), + request_r, event_w, )); - Ok(Self { pts, close_w }) + Ok((Self { vt, request_w }, pts)) + } + + pub fn with_vt(&self, f: impl FnOnce(&super::pty::Vt) -> T) -> T { + let vt = self.vt.lock().unwrap(); + f(&*vt) } - pub fn spawn( + pub fn with_vt_mut( &self, - mut cmd: pty_process::Command, - ) -> Result { - Ok(cmd.spawn(&*self.pts)?) + f: impl FnOnce(&mut super::pty::Vt) -> T, + ) -> T { + let mut vt = self.vt.lock().unwrap(); + f(&mut *vt) + } + + pub fn lock_vt(&self) -> std::sync::MutexGuard { + self.vt.lock().unwrap() + } + + pub fn fullscreen(&self) -> bool { + self.with_vt(|vt| vt.screen().alternate_screen()) + } + + pub fn input(&self, bytes: Vec) { + #[allow(clippy::let_underscore_drop)] + let _ = self.request_w.send(Request::Input(bytes)); } - pub fn close(&self) { - self.close_w.send(()).unwrap(); + pub fn resize(&self, size: (u16, u16)) { + #[allow(clippy::let_underscore_drop)] + let _ = self.request_w.send(Request::Resize(size.0, size.1)); + } +} + +pub struct Vt { + vt: vt100::Parser, + audible_bell_state: usize, + visual_bell_state: usize, + audible_bell: bool, + visual_bell: bool, + real_bell_pending: bool, +} + +impl Vt { + pub fn new(size: (u16, u16)) -> Self { + Self { + vt: vt100::Parser::new(size.0, size.1, 0), + audible_bell_state: 0, + visual_bell_state: 0, + audible_bell: false, + visual_bell: false, + real_bell_pending: false, + } + } + + pub fn process(&mut self, bytes: &[u8]) { + self.vt.process(bytes); + let screen = self.vt.screen(); + + let new_audible_bell_state = screen.audible_bell_count(); + if new_audible_bell_state != self.audible_bell_state { + self.audible_bell = true; + self.real_bell_pending = true; + self.audible_bell_state = new_audible_bell_state; + } + + let new_visual_bell_state = screen.visual_bell_count(); + if new_visual_bell_state != self.visual_bell_state { + self.visual_bell = true; + self.real_bell_pending = true; + self.visual_bell_state = new_visual_bell_state; + } + } + + pub fn screen(&self) -> &vt100::Screen { + self.vt.screen() + } + + pub fn set_size(&mut self, size: (u16, u16)) { + self.vt.set_size(size.0, size.1); + } + + pub fn is_bell(&self) -> bool { + self.audible_bell || self.visual_bell + } + + pub fn bell(&mut self, out: &mut impl textmode::Textmode, focused: bool) { + if self.real_bell_pending { + if self.audible_bell { + out.write(b"\x07"); + } + if self.visual_bell { + out.write(b"\x1bg"); + } + self.real_bell_pending = false; + } + if focused { + self.audible_bell = false; + self.visual_bell = false; + } + } + + pub fn binary(&self) -> bool { + self.vt.screen().errors() > 5 + } + + pub fn output_lines(&self, focused: bool, running: bool) -> usize { + if self.binary() { + return 1; + } + + let screen = self.vt.screen(); + let mut last_row = 0; + for (idx, row) in screen.rows(0, screen.size().1).enumerate() { + if !row.is_empty() { + last_row = idx + 1; + } + } + if focused && running { + last_row = std::cmp::max( + last_row, + usize::from(screen.cursor_position().0) + 1, + ); + } + last_row } } async fn pty_task( pty: pty_process::Pty, - // take the pts here just to ensure that we don't close it before this - // task finishes, otherwise the read call can return EIO - _pts: std::sync::Arc, - entry: std::sync::Arc>, - input_r: tokio::sync::mpsc::UnboundedReceiver>, - resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, - close_r: tokio::sync::mpsc::UnboundedReceiver<()>, + vt: std::sync::Arc>, + request_r: tokio::sync::mpsc::UnboundedReceiver, event_w: crate::shell::event::Writer, ) { enum Res { Read(Result), - Write(Vec), - Resize((u16, u16)), - Close(()), + Request(Request), } let (pty_r, mut pty_w) = pty.into_split(); @@ -67,14 +178,8 @@ async fn pty_task( tokio_util::io::ReaderStream::new(pty_r) .map(Res::Read) .boxed(), - tokio_stream::wrappers::UnboundedReceiverStream::new(input_r) - .map(Res::Write) - .boxed(), - tokio_stream::wrappers::UnboundedReceiverStream::new(resize_r) - .map(Res::Resize) - .boxed(), - tokio_stream::wrappers::UnboundedReceiverStream::new(close_r) - .map(Res::Close) + tokio_stream::wrappers::UnboundedReceiverStream::new(request_r) + .map(Res::Request) .boxed(), ] .into_iter() @@ -83,22 +188,26 @@ async fn pty_task( match res { Res::Read(res) => match res { Ok(bytes) => { - entry.lock().unwrap().process(&bytes); + vt.lock().unwrap().process(&bytes); event_w.send(Event::PtyOutput); } Err(e) => { + // this means that there are no longer any open pts fds. + // we could alternately signal this through an explicit + // channel at ChildExit time, but this seems reliable + // enough. + if e.raw_os_error() == Some(libc::EIO) { + return; + } panic!("pty read failed: {:?}", e); } }, - Res::Write(bytes) => { + Res::Request(Request::Input(bytes)) => { pty_w.write(&bytes).await.unwrap(); } - Res::Resize(size) => pty_w - .resize(pty_process::Size::new(size.0, size.1)) - .unwrap(), - Res::Close(()) => { - event_w.send(Event::PtyClose); - return; + Res::Request(Request::Resize(row, col)) => { + pty_w.resize(pty_process::Size::new(row, col)).unwrap(); + vt.lock().unwrap().set_size((row, col)); } } } diff --git a/src/shell/history/vt.rs b/src/shell/history/vt.rs deleted file mode 100644 index 511cab9..0000000 --- a/src/shell/history/vt.rs +++ /dev/null @@ -1,97 +0,0 @@ -pub struct Vt { - vt: vt100::Parser, - audible_bell_state: usize, - visual_bell_state: usize, - audible_bell: bool, - visual_bell: bool, - real_bell_pending: bool, -} - -impl Vt { - pub fn new(size: (u16, u16)) -> Self { - Self { - vt: vt100::Parser::new(size.0, size.1, 0), - audible_bell_state: 0, - visual_bell_state: 0, - audible_bell: false, - visual_bell: false, - real_bell_pending: false, - } - } - - pub fn process(&mut self, bytes: &[u8]) { - self.vt.process(bytes); - let screen = self.vt.screen(); - - let new_audible_bell_state = screen.audible_bell_count(); - if new_audible_bell_state != self.audible_bell_state { - self.audible_bell = true; - self.real_bell_pending = true; - self.audible_bell_state = new_audible_bell_state; - } - - let new_visual_bell_state = screen.visual_bell_count(); - if new_visual_bell_state != self.visual_bell_state { - self.visual_bell = true; - self.real_bell_pending = true; - self.visual_bell_state = new_visual_bell_state; - } - } - - pub fn screen(&self) -> &vt100::Screen { - self.vt.screen() - } - - pub fn size(&self) -> (u16, u16) { - self.vt.screen().size() - } - - pub fn set_size(&mut self, size: (u16, u16)) { - self.vt.set_size(size.0, size.1); - } - - pub fn is_bell(&self) -> bool { - self.audible_bell || self.visual_bell - } - - pub fn bell(&mut self, out: &mut impl textmode::Textmode, focused: bool) { - if self.real_bell_pending { - if self.audible_bell { - out.write(b"\x07"); - } - if self.visual_bell { - out.write(b"\x1bg"); - } - self.real_bell_pending = false; - } - if focused { - self.audible_bell = false; - self.visual_bell = false; - } - } - - pub fn binary(&self) -> bool { - self.vt.screen().errors() > 5 - } - - pub fn output_lines(&self, focused: bool, running: bool) -> usize { - if self.binary() { - return 1; - } - - let screen = self.vt.screen(); - let mut last_row = 0; - for (idx, row) in screen.rows(0, screen.size().1).enumerate() { - if !row.is_empty() { - last_row = idx + 1; - } - } - if focused && running { - last_row = std::cmp::max( - last_row, - usize::from(screen.cursor_position().0) + 1, - ); - } - last_row - } -} diff --git a/src/shell/mod.rs b/src/shell/mod.rs index a494752..b23ab1e 100644 --- a/src/shell/mod.rs +++ b/src/shell/mod.rs @@ -291,7 +291,7 @@ impl Shell { }, Scene::Fullscreen => { if let Focus::History(idx) = self.focus { - self.history.render_fullscreen(out, idx); + self.history.entry(idx).render_fullscreen(out); } else { unreachable!(); } @@ -342,46 +342,30 @@ impl Shell { idx, matches!(self.focus, Focus::Scrolling(_)), ); - self.scene = Self::default_scene( - self.focus, - idx.map_or(false, |idx| { - self.history.should_fullscreen(idx) - }), - ); + self.scene = self.default_scene(self.focus); } - Event::PtyClose => { - if let Some(idx) = self.focus_idx() { - let (running, env, fullscreen) = - self.history.with_entry(idx, |entry| { - ( - entry.running(), - entry.env().clone(), - entry.should_fullscreen(), - ) - }); - if !running { + Event::ChildExit(idx, env) => { + if self.focus_idx() == Some(idx) { + if let Some(env) = env { if self.hide_readline { let idx = self.env.idx(); self.env = env; self.env.set_idx(idx); } - self.set_focus( - if self.hide_readline { - Focus::Readline - } else { - Focus::Scrolling(Some(idx)) - }, - fullscreen, - ); } + self.set_focus(if self.hide_readline { + Focus::Readline + } else { + Focus::Scrolling(Some(idx)) + }); } } - Event::ChildRunPipeline(idx, span) => self - .history - .with_entry_mut(idx, |entry| entry.set_span(span)), + Event::ChildRunPipeline(idx, span) => { + self.history.entry_mut(idx).set_span(span); + } Event::ChildSuspend(idx) => { if self.focus_idx() == Some(idx) { - self.set_focus(Focus::Readline, false); + self.set_focus(Focus::Readline); } } Event::GitInfo(info) => { @@ -402,7 +386,7 @@ impl Shell { return Some(Action::Quit); } textmode::Key::Ctrl(b'e') => { - self.set_focus(Focus::Scrolling(self.focus_idx()), false); + self.set_focus(Focus::Scrolling(self.focus_idx())); } textmode::Key::Ctrl(b'l') => { return Some(Action::HardRefresh); @@ -410,34 +394,27 @@ impl Shell { textmode::Key::Ctrl(b'm') => { if let Some(idx) = self.focus_idx() { self.readline.clear_input(); - let (input, fullscreen) = - self.history.with_entry(idx, |entry| { - ( - entry.cmd().to_string(), - entry.should_fullscreen(), - ) - }); - let idx = - self.history.run(input, self.env.clone(), event_w); - self.set_focus(Focus::History(idx), fullscreen); + self.history.run( + self.history.entry(idx).cmd().to_string(), + self.env.clone(), + event_w, + ); + let idx = self.history.entry_count() - 1; + self.set_focus(Focus::History(idx)); self.hide_readline = true; self.env.set_idx(idx + 1); } else { - self.set_focus(Focus::Readline, false); + self.set_focus(Focus::Readline); } } textmode::Key::Char(' ') => { let idx = self.focus_idx(); if let Some(idx) = idx { - let (running, fullscreen) = - self.history.with_entry(idx, |entry| { - (entry.running(), entry.should_fullscreen()) - }); - if running { - self.set_focus(Focus::History(idx), fullscreen); + if self.history.entry(idx).running() { + self.set_focus(Focus::History(idx)); } } else { - self.set_focus(Focus::Readline, false); + self.set_focus(Focus::Readline); } } textmode::Key::Char('e') => { @@ -448,64 +425,43 @@ impl Shell { textmode::Key::Char('f') => { if let Some(idx) = self.focus_idx() { let mut focus = Focus::History(idx); - let fullscreen = - self.history.with_entry_mut(idx, |entry| { - if let Focus::Scrolling(_) = self.focus { - entry.set_fullscreen(true); - } else { - entry.toggle_fullscreen(); - if !entry.should_fullscreen() - && !entry.running() - { - focus = Focus::Scrolling(Some(idx)); - } - } - entry.should_fullscreen() - }); - self.set_focus(focus, fullscreen); + let entry = self.history.entry_mut(idx); + if let Focus::Scrolling(_) = self.focus { + entry.set_fullscreen(true); + } else { + entry.toggle_fullscreen(); + if !entry.should_fullscreen() && !entry.running() { + focus = Focus::Scrolling(Some(idx)); + } + } + self.set_focus(focus); } } textmode::Key::Char('i') => { if let Some(idx) = self.focus_idx() { - let input = self - .history - .with_entry(idx, |entry| entry.cmd().to_string()); - self.readline.set_input(input); - self.set_focus(Focus::Readline, false); + self.readline + .set_input(self.history.entry(idx).cmd().to_string()); + self.set_focus(Focus::Readline); } } textmode::Key::Char('j') | textmode::Key::Down => { - self.set_focus( - Focus::Scrolling(self.scroll_down(self.focus_idx())), - false, - ); + self.set_focus(Focus::Scrolling( + self.scroll_down(self.focus_idx()), + )); } textmode::Key::Char('k') | textmode::Key::Up => { - self.set_focus( - Focus::Scrolling(self.scroll_up(self.focus_idx())), - false, - ); + self.set_focus(Focus::Scrolling( + self.scroll_up(self.focus_idx()), + )); } textmode::Key::Char('n') => { - let focus = self.next_running(); - let fullscreen = if let Focus::History(idx) = focus { - self.history.should_fullscreen(idx) - } else { - false - }; - self.set_focus(focus, fullscreen); + self.set_focus(self.next_running()); } textmode::Key::Char('p') => { - let focus = self.prev_running(); - let fullscreen = if let Focus::History(idx) = focus { - self.history.should_fullscreen(idx) - } else { - false - }; - self.set_focus(focus, fullscreen); + self.set_focus(self.prev_running()); } textmode::Key::Char('r') => { - self.set_focus(Focus::Readline, false); + self.set_focus(Focus::Readline); } _ => { return None; @@ -533,15 +489,13 @@ impl Shell { textmode::Key::Ctrl(b'm') => { let input = self.readline.input(); if !input.is_empty() { - let idx = self.history.run( + self.history.run( input.to_string(), self.env.clone(), event_w, ); - self.set_focus( - Focus::History(idx), - self.history.should_fullscreen(idx), - ); + let idx = self.history.entry_count() - 1; + self.set_focus(Focus::History(idx)); self.hide_readline = true; self.env.set_idx(idx + 1); self.readline.clear_input(); @@ -554,10 +508,7 @@ impl Shell { textmode::Key::Up => { let entry_count = self.history.entry_count(); if entry_count > 0 { - self.set_focus( - Focus::Scrolling(Some(entry_count - 1)), - false, - ); + self.set_focus(Focus::Scrolling(Some(entry_count - 1))); } } _ => return None, @@ -566,14 +517,14 @@ impl Shell { } fn handle_key_history(&mut self, key: textmode::Key, idx: usize) { - self.history.send_input(idx, key.into_bytes()); + self.history.entry(idx).input(key.into_bytes()); } - fn default_scene(focus: Focus, fullscreen: bool) -> Scene { + fn default_scene(&self, focus: Focus) -> Scene { match focus { Focus::Readline | Focus::Scrolling(_) => Scene::Readline, - Focus::History(_) => { - if fullscreen { + Focus::History(idx) => { + if self.history.entry(idx).should_fullscreen() { Scene::Fullscreen } else { Scene::Readline @@ -582,10 +533,10 @@ impl Shell { } } - fn set_focus(&mut self, new_focus: Focus, fullscreen: bool) { + fn set_focus(&mut self, new_focus: Focus) { self.focus = new_focus; self.hide_readline = false; - self.scene = Self::default_scene(new_focus, fullscreen); + self.scene = self.default_scene(new_focus); // passing entry into default_scene above consumes it, which means // that the mutex lock will be dropped before we call into // make_focus_visible, which is important because otherwise we might @@ -637,7 +588,7 @@ impl Shell { let count = self.history.entry_count(); let cur = self.focus_idx().unwrap_or(count); for idx in ((cur + 1)..count).chain(0..cur) { - if self.history.running(idx) { + if self.history.entry(idx).running() { return Focus::History(idx); } } @@ -648,7 +599,7 @@ impl Shell { let count = self.history.entry_count(); let cur = self.focus_idx().unwrap_or(count); for idx in ((cur + 1)..count).chain(0..cur).rev() { - if self.history.running(idx) { + if self.history.entry(idx).running() { return Focus::History(idx); } } -- cgit v1.2.3-54-g00ecf