diff options
Diffstat (limited to 'src/shell/event.rs')
-rw-r--r-- | src/shell/event.rs | 135 |
1 files changed, 79 insertions, 56 deletions
diff --git a/src/shell/event.rs b/src/shell/event.rs index 025f3c4..dc58e6f 100644 --- a/src/shell/event.rs +++ b/src/shell/event.rs @@ -1,53 +1,87 @@ +use crate::prelude::*; + #[derive(Debug)] pub enum Event { Key(textmode::Key), Resize((u16, u16)), PtyOutput, - PtyClose, ChildRunPipeline(usize, (usize, usize)), ChildSuspend(usize), - GitInfo(Option<super::git::Info>), + ChildExit(usize, super::history::ExitInfo, Option<Env>), + GitInfo(Option<super::inputs::GitInfo>), ClockTimer, } -pub struct Reader { - pending: async_std::sync::Mutex<Pending>, - cvar: async_std::sync::Condvar, +pub fn channel() -> (Writer, Reader) { + let (event_w, event_r) = tokio::sync::mpsc::unbounded_channel(); + (Writer::new(event_w), Reader::new(event_r)) +} + +#[derive(Clone)] +pub struct Writer(tokio::sync::mpsc::UnboundedSender<Event>); + +impl Writer { + pub fn new(event_w: tokio::sync::mpsc::UnboundedSender<Event>) -> Self { + Self(event_w) + } + + pub fn send(&self, event: Event) { + // the only time this should ever error is when the application is + // shutting down, at which point we don't actually care about any + // further dropped messages + #[allow(clippy::let_underscore_drop)] + let _ = self.0.send(event); + } } +pub struct Reader(std::sync::Arc<InnerReader>); + impl Reader { pub fn new( - input: async_std::channel::Receiver<Event>, - ) -> async_std::sync::Arc<Self> { - let this = async_std::sync::Arc::new(Self { - pending: async_std::sync::Mutex::new(Pending::new()), - cvar: async_std::sync::Condvar::new(), - }); + mut input: tokio::sync::mpsc::UnboundedReceiver<Event>, + ) -> Self { + let inner = std::sync::Arc::new(InnerReader::new()); { - let this = async_std::sync::Arc::clone(&this); - async_std::task::spawn(async move { - while let Ok(event) = input.recv().await { - this.new_event(Some(event)).await; + let inner = inner.clone(); + tokio::spawn(async move { + while let Some(event) = input.recv().await { + inner.new_event(Some(event)); } - this.new_event(None).await; + inner.new_event(None); }); } - this + Self(inner) } pub async fn recv(&self) -> Option<Event> { - let mut pending = self - .cvar - .wait_until(self.pending.lock().await, |pending| { - pending.has_event() - }) - .await; - pending.get_event() + self.0.recv().await + } +} + +struct InnerReader { + pending: std::sync::Mutex<Pending>, + cvar: tokio::sync::Notify, +} + +impl InnerReader { + fn new() -> Self { + Self { + pending: std::sync::Mutex::new(Pending::new()), + cvar: tokio::sync::Notify::new(), + } + } + + async fn recv(&self) -> Option<Event> { + loop { + if let Some(event) = self.pending.lock().unwrap().get_event() { + return event; + } + self.cvar.notified().await; + } } - async fn new_event(&self, event: Option<Event>) { - let mut pending = self.pending.lock().await; - pending.new_event(event); + fn new_event(&self, event: Option<Event>) { + self.pending.lock().unwrap().new_event(event); self.cvar.notify_one(); } } @@ -58,10 +92,10 @@ struct Pending { key: std::collections::VecDeque<textmode::Key>, size: Option<(u16, u16)>, pty_output: bool, - pty_close: bool, child_run_pipeline: std::collections::VecDeque<(usize, (usize, usize))>, child_suspend: std::collections::VecDeque<usize>, - git_info: Option<Option<super::git::Info>>, + child_exit: Option<(usize, super::history::ExitInfo, Option<Env>)>, + git_info: Option<Option<super::inputs::GitInfo>>, clock_timer: bool, done: bool, } @@ -71,53 +105,40 @@ impl Pending { Self::default() } - fn has_event(&self) -> bool { - self.done - || !self.key.is_empty() - || self.size.is_some() - || self.pty_output - || self.pty_close - || !self.child_run_pipeline.is_empty() - || !self.child_suspend.is_empty() - || self.git_info.is_some() - || self.clock_timer - } - - fn get_event(&mut self) -> Option<Event> { + fn get_event(&mut self) -> Option<Option<Event>> { if self.done { - return None; + return Some(None); } if let Some(key) = self.key.pop_front() { - return Some(Event::Key(key)); + return Some(Some(Event::Key(key))); } if let Some(size) = self.size.take() { - return Some(Event::Resize(size)); - } - if self.pty_close { - self.pty_close = false; - return Some(Event::PtyClose); + return Some(Some(Event::Resize(size))); } if let Some((idx, span)) = self.child_run_pipeline.pop_front() { - return Some(Event::ChildRunPipeline(idx, span)); + return Some(Some(Event::ChildRunPipeline(idx, span))); } if let Some(idx) = self.child_suspend.pop_front() { - return Some(Event::ChildSuspend(idx)); + return Some(Some(Event::ChildSuspend(idx))); + } + if let Some((idx, exit_info, env)) = self.child_exit.take() { + return Some(Some(Event::ChildExit(idx, exit_info, env))); } if let Some(info) = self.git_info.take() { - return Some(Event::GitInfo(info)); + return Some(Some(Event::GitInfo(info))); } if self.clock_timer { self.clock_timer = false; - return Some(Event::ClockTimer); + return Some(Some(Event::ClockTimer)); } // process_output should be last because it will often be the case // that there is ~always new process output (cat on large files, yes, // etc) and that shouldn't prevent other events from happening if self.pty_output { self.pty_output = false; - return Some(Event::PtyOutput); + return Some(Some(Event::PtyOutput)); } - unreachable!() + None } fn new_event(&mut self, event: Option<Event>) { @@ -125,13 +146,15 @@ impl Pending { Some(Event::Key(key)) => self.key.push_back(key), Some(Event::Resize(size)) => self.size = Some(size), Some(Event::PtyOutput) => self.pty_output = true, - Some(Event::PtyClose) => self.pty_close = true, Some(Event::ChildRunPipeline(idx, span)) => { self.child_run_pipeline.push_back((idx, span)); } Some(Event::ChildSuspend(idx)) => { self.child_suspend.push_back(idx); } + Some(Event::ChildExit(idx, exit_info, env)) => { + self.child_exit = Some((idx, exit_info, env)); + } Some(Event::GitInfo(info)) => self.git_info = Some(info), Some(Event::ClockTimer) => self.clock_timer = true, None => self.done = true, |