From b8f61109f7d22a09458d78681155150f39a12269 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Fri, 25 Feb 2022 20:59:40 -0500 Subject: don't error when sending events during application shutdown --- src/shell/event.rs | 42 +++++++++++++++++++++++++++++++++++---- src/shell/history/entry.rs | 4 ++-- src/shell/history/mod.rs | 10 +++++----- src/shell/history/pty.rs | 8 ++++---- src/shell/mod.rs | 49 +++++++++++++++++++++------------------------- 5 files changed, 71 insertions(+), 42 deletions(-) (limited to 'src') diff --git a/src/shell/event.rs b/src/shell/event.rs index ad14705..8f21081 100644 --- a/src/shell/event.rs +++ b/src/shell/event.rs @@ -10,13 +10,47 @@ pub enum Event { ClockTimer, } -pub struct Reader { +pub fn channel() -> (Writer, Reader) { + let (event_w, event_r) = tokio::sync::mpsc::unbounded_channel(); + (Writer::new(event_w), Reader::new(event_r)) +} + +#[derive(Clone)] +pub struct Writer(tokio::sync::mpsc::UnboundedSender); + +impl Writer { + pub fn new(event_w: tokio::sync::mpsc::UnboundedSender) -> Self { + Self(event_w) + } + + pub fn send(&self, event: Event) { + // the only time this should ever error is when the application is + // shutting down, at which point we don't actually care about any + // further dropped messages + #[allow(clippy::let_underscore_drop)] + let _ = self.0.send(event); + } +} + +pub struct Reader(std::sync::Arc); + +impl Reader { + pub fn new(input: tokio::sync::mpsc::UnboundedReceiver) -> Self { + Self(InnerReader::new(input)) + } + + pub async fn recv(&self) -> Option { + self.0.recv().await + } +} + +struct InnerReader { pending: tokio::sync::Mutex, cvar: tokio::sync::Notify, } -impl Reader { - pub fn new( +impl InnerReader { + fn new( mut input: tokio::sync::mpsc::UnboundedReceiver, ) -> std::sync::Arc { let this = Self { @@ -36,7 +70,7 @@ impl Reader { this } - pub async fn recv(&self) -> Option { + async fn recv(&self) -> Option { loop { let mut pending = self.pending.lock().await; if pending.has_event() { diff --git a/src/shell/history/entry.rs b/src/shell/history/entry.rs index 97e8a7b..ac3a279 100644 --- a/src/shell/history/entry.rs +++ b/src/shell/history/entry.rs @@ -341,11 +341,11 @@ impl Entry { pub async fn finish( &mut self, env: Env, - event_w: tokio::sync::mpsc::UnboundedSender, + event_w: crate::shell::event::Writer, ) { self.state = State::Exited(ExitInfo::new(env.latest_status())); self.env = env; - event_w.send(Event::PtyClose).unwrap(); + event_w.send(Event::PtyClose); } fn exit_info(&self) -> Option<&ExitInfo> { diff --git a/src/shell/history/mod.rs b/src/shell/history/mod.rs index 2eeab0b..6d38891 100644 --- a/src/shell/history/mod.rs +++ b/src/shell/history/mod.rs @@ -86,7 +86,7 @@ impl History { &mut self, cmdline: &str, env: &Env, - event_w: tokio::sync::mpsc::UnboundedSender, + event_w: crate::shell::event::Writer, ) -> anyhow::Result { let (input_w, input_r) = tokio::sync::mpsc::unbounded_channel(); let (resize_w, resize_r) = tokio::sync::mpsc::unbounded_channel(); @@ -223,7 +223,7 @@ fn run_commands( mut env: Env, input_r: tokio::sync::mpsc::UnboundedReceiver>, resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, - event_w: tokio::sync::mpsc::UnboundedSender, + event_w: crate::shell::event::Writer, ) { tokio::task::spawn(async move { let pty = match pty::Pty::new( @@ -278,7 +278,7 @@ async fn spawn_commands( cmdline: &str, pty: &pty::Pty, env: &mut Env, - event_w: tokio::sync::mpsc::UnboundedSender, + event_w: crate::shell::event::Writer, ) -> anyhow::Result { enum Res { Read(crate::runner::Event), @@ -339,10 +339,10 @@ async fn spawn_commands( match res { Res::Read(event) => match event { crate::runner::Event::RunPipeline(idx, span) => { - event_w.send(Event::ChildRunPipeline(idx, span)).unwrap(); + event_w.send(Event::ChildRunPipeline(idx, span)); } crate::runner::Event::Suspend(idx) => { - event_w.send(Event::ChildSuspend(idx)).unwrap(); + event_w.send(Event::ChildSuspend(idx)); } crate::runner::Event::Exit(new_env) => { *env = new_env; diff --git a/src/shell/history/pty.rs b/src/shell/history/pty.rs index acfe500..8825f12 100644 --- a/src/shell/history/pty.rs +++ b/src/shell/history/pty.rs @@ -11,7 +11,7 @@ impl Pty { entry: &crate::mutex::Mutex, input_r: tokio::sync::mpsc::UnboundedReceiver>, resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, - event_w: tokio::sync::mpsc::UnboundedSender, + event_w: crate::shell::event::Writer, ) -> anyhow::Result { let (close_w, close_r) = tokio::sync::mpsc::unbounded_channel(); @@ -49,7 +49,7 @@ async fn pty_task( input_r: tokio::sync::mpsc::UnboundedReceiver>, resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, close_r: tokio::sync::mpsc::UnboundedReceiver<()>, - event_w: tokio::sync::mpsc::UnboundedSender, + event_w: crate::shell::event::Writer, ) { enum Res { Read(Result), @@ -80,7 +80,7 @@ async fn pty_task( Res::Read(res) => match res { Ok(bytes) => { entry.clone().lock_owned().await.process(&bytes); - event_w.send(Event::PtyOutput).unwrap(); + event_w.send(Event::PtyOutput); } Err(e) => { panic!("pty read failed: {:?}", e); @@ -93,7 +93,7 @@ async fn pty_task( .resize(pty_process::Size::new(size.0, size.1)) .unwrap(), Res::Close(()) => { - event_w.send(Event::PtyClose).unwrap(); + event_w.send(Event::PtyClose); return; } } diff --git a/src/shell/mod.rs b/src/shell/mod.rs index 82d2021..c187272 100644 --- a/src/shell/mod.rs +++ b/src/shell/mod.rs @@ -18,7 +18,7 @@ pub async fn main() -> anyhow::Result { let _input_guard = input.take_raw_guard(); let _output_guard = output.take_screen_guard(); - let (event_w, event_r) = tokio::sync::mpsc::unbounded_channel(); + let (event_w, event_r) = event::channel(); { let mut signals = tokio::signal::unix::signal( @@ -26,26 +26,24 @@ pub async fn main() -> anyhow::Result { )?; let event_w = event_w.clone(); tokio::task::spawn(async move { - event_w - .send(Event::Resize(terminal_size::terminal_size().map_or( + event_w.send(Event::Resize( + terminal_size::terminal_size().map_or( (24, 80), |(terminal_size::Width(w), terminal_size::Height(h))| { (h, w) }, - ))) - .unwrap(); + ), + )); while signals.recv().await.is_some() { - event_w - .send(Event::Resize( - terminal_size::terminal_size().map_or( - (24, 80), - |( - terminal_size::Width(w), - terminal_size::Height(h), - )| { (h, w) }, - ), - )) - .unwrap(); + event_w.send(Event::Resize( + terminal_size::terminal_size().map_or( + (24, 80), + |( + terminal_size::Width(w), + terminal_size::Height(h), + )| { (h, w) }, + ), + )); } }); } @@ -54,7 +52,7 @@ pub async fn main() -> anyhow::Result { let event_w = event_w.clone(); std::thread::spawn(move || { while let Some(key) = input.read_key().unwrap() { - event_w.send(Event::Key(key)).unwrap(); + event_w.send(Event::Key(key)); } }); } @@ -75,7 +73,7 @@ pub async fn main() -> anyhow::Result { ); loop { interval.tick().await; - event_w.send(Event::ClockTimer).unwrap(); + event_w.send(Event::ClockTimer); } }); } @@ -126,9 +124,7 @@ pub async fn main() -> anyhow::Result { }) .await .unwrap(); - if event_w.send(Event::GitInfo(info)).is_err() { - break; - } + event_w.send(Event::GitInfo(info)); } }); _active_watcher = Some(watcher); @@ -140,7 +136,7 @@ pub async fn main() -> anyhow::Result { }) .await .unwrap(); - event_w.send(Event::GitInfo(info)).unwrap(); + event_w.send(Event::GitInfo(info)); } }); } @@ -148,8 +144,7 @@ pub async fn main() -> anyhow::Result { let mut shell = Shell::new(crate::info::get_offset())?; let mut prev_dir = shell.env.pwd().to_path_buf(); git_w.send(prev_dir.clone()).unwrap(); - let event_reader = event::Reader::new(event_r); - while let Some(event) = event_reader.recv().await { + while let Some(event) = event_r.recv().await { let dir = shell.env().pwd(); if dir != prev_dir { prev_dir = dir.to_path_buf(); @@ -319,7 +314,7 @@ impl Shell { pub async fn handle_event( &mut self, event: Event, - event_w: &tokio::sync::mpsc::UnboundedSender, + event_w: &crate::shell::event::Writer, ) -> Option { match event { Event::Key(key) => { @@ -402,7 +397,7 @@ impl Shell { async fn handle_key_escape( &mut self, key: textmode::Key, - event_w: tokio::sync::mpsc::UnboundedSender, + event_w: crate::shell::event::Writer, ) -> Option { match key { textmode::Key::Ctrl(b'd') => { @@ -511,7 +506,7 @@ impl Shell { async fn handle_key_readline( &mut self, key: textmode::Key, - event_w: tokio::sync::mpsc::UnboundedSender, + event_w: crate::shell::event::Writer, ) -> Option { match key { textmode::Key::Char(c) => { -- cgit v1.2.3-54-g00ecf