summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2022-02-25 20:59:40 -0500
committerJesse Luehrs <doy@tozt.net>2022-02-25 20:59:40 -0500
commitb8f61109f7d22a09458d78681155150f39a12269 (patch)
treedd2c67ae51413bd842dea9c86ac4ac3389d6e5e7 /src
parent31d2bd9dfc8da6cec159f38c28f3220c8b538d34 (diff)
downloadnbsh-b8f61109f7d22a09458d78681155150f39a12269.tar.gz
nbsh-b8f61109f7d22a09458d78681155150f39a12269.zip
don't error when sending events during application shutdown
Diffstat (limited to 'src')
-rw-r--r--src/shell/event.rs42
-rw-r--r--src/shell/history/entry.rs4
-rw-r--r--src/shell/history/mod.rs10
-rw-r--r--src/shell/history/pty.rs8
-rw-r--r--src/shell/mod.rs49
5 files changed, 71 insertions, 42 deletions
diff --git a/src/shell/event.rs b/src/shell/event.rs
index ad14705..8f21081 100644
--- a/src/shell/event.rs
+++ b/src/shell/event.rs
@@ -10,13 +10,47 @@ pub enum Event {
ClockTimer,
}
-pub struct Reader {
+pub fn channel() -> (Writer, Reader) {
+ let (event_w, event_r) = tokio::sync::mpsc::unbounded_channel();
+ (Writer::new(event_w), Reader::new(event_r))
+}
+
+#[derive(Clone)]
+pub struct Writer(tokio::sync::mpsc::UnboundedSender<Event>);
+
+impl Writer {
+ pub fn new(event_w: tokio::sync::mpsc::UnboundedSender<Event>) -> Self {
+ Self(event_w)
+ }
+
+ pub fn send(&self, event: Event) {
+ // the only time this should ever error is when the application is
+ // shutting down, at which point we don't actually care about any
+ // further dropped messages
+ #[allow(clippy::let_underscore_drop)]
+ let _ = self.0.send(event);
+ }
+}
+
+pub struct Reader(std::sync::Arc<InnerReader>);
+
+impl Reader {
+ pub fn new(input: tokio::sync::mpsc::UnboundedReceiver<Event>) -> Self {
+ Self(InnerReader::new(input))
+ }
+
+ pub async fn recv(&self) -> Option<Event> {
+ self.0.recv().await
+ }
+}
+
+struct InnerReader {
pending: tokio::sync::Mutex<Pending>,
cvar: tokio::sync::Notify,
}
-impl Reader {
- pub fn new(
+impl InnerReader {
+ fn new(
mut input: tokio::sync::mpsc::UnboundedReceiver<Event>,
) -> std::sync::Arc<Self> {
let this = Self {
@@ -36,7 +70,7 @@ impl Reader {
this
}
- pub async fn recv(&self) -> Option<Event> {
+ async fn recv(&self) -> Option<Event> {
loop {
let mut pending = self.pending.lock().await;
if pending.has_event() {
diff --git a/src/shell/history/entry.rs b/src/shell/history/entry.rs
index 97e8a7b..ac3a279 100644
--- a/src/shell/history/entry.rs
+++ b/src/shell/history/entry.rs
@@ -341,11 +341,11 @@ impl Entry {
pub async fn finish(
&mut self,
env: Env,
- event_w: tokio::sync::mpsc::UnboundedSender<Event>,
+ event_w: crate::shell::event::Writer,
) {
self.state = State::Exited(ExitInfo::new(env.latest_status()));
self.env = env;
- event_w.send(Event::PtyClose).unwrap();
+ event_w.send(Event::PtyClose);
}
fn exit_info(&self) -> Option<&ExitInfo> {
diff --git a/src/shell/history/mod.rs b/src/shell/history/mod.rs
index 2eeab0b..6d38891 100644
--- a/src/shell/history/mod.rs
+++ b/src/shell/history/mod.rs
@@ -86,7 +86,7 @@ impl History {
&mut self,
cmdline: &str,
env: &Env,
- event_w: tokio::sync::mpsc::UnboundedSender<Event>,
+ event_w: crate::shell::event::Writer,
) -> anyhow::Result<usize> {
let (input_w, input_r) = tokio::sync::mpsc::unbounded_channel();
let (resize_w, resize_r) = tokio::sync::mpsc::unbounded_channel();
@@ -223,7 +223,7 @@ fn run_commands(
mut env: Env,
input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>,
- event_w: tokio::sync::mpsc::UnboundedSender<Event>,
+ event_w: crate::shell::event::Writer,
) {
tokio::task::spawn(async move {
let pty = match pty::Pty::new(
@@ -278,7 +278,7 @@ async fn spawn_commands(
cmdline: &str,
pty: &pty::Pty,
env: &mut Env,
- event_w: tokio::sync::mpsc::UnboundedSender<Event>,
+ event_w: crate::shell::event::Writer,
) -> anyhow::Result<std::process::ExitStatus> {
enum Res {
Read(crate::runner::Event),
@@ -339,10 +339,10 @@ async fn spawn_commands(
match res {
Res::Read(event) => match event {
crate::runner::Event::RunPipeline(idx, span) => {
- event_w.send(Event::ChildRunPipeline(idx, span)).unwrap();
+ event_w.send(Event::ChildRunPipeline(idx, span));
}
crate::runner::Event::Suspend(idx) => {
- event_w.send(Event::ChildSuspend(idx)).unwrap();
+ event_w.send(Event::ChildSuspend(idx));
}
crate::runner::Event::Exit(new_env) => {
*env = new_env;
diff --git a/src/shell/history/pty.rs b/src/shell/history/pty.rs
index acfe500..8825f12 100644
--- a/src/shell/history/pty.rs
+++ b/src/shell/history/pty.rs
@@ -11,7 +11,7 @@ impl Pty {
entry: &crate::mutex::Mutex<super::Entry>,
input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>,
- event_w: tokio::sync::mpsc::UnboundedSender<Event>,
+ event_w: crate::shell::event::Writer,
) -> anyhow::Result<Self> {
let (close_w, close_r) = tokio::sync::mpsc::unbounded_channel();
@@ -49,7 +49,7 @@ async fn pty_task(
input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>,
close_r: tokio::sync::mpsc::UnboundedReceiver<()>,
- event_w: tokio::sync::mpsc::UnboundedSender<Event>,
+ event_w: crate::shell::event::Writer,
) {
enum Res {
Read(Result<bytes::Bytes, std::io::Error>),
@@ -80,7 +80,7 @@ async fn pty_task(
Res::Read(res) => match res {
Ok(bytes) => {
entry.clone().lock_owned().await.process(&bytes);
- event_w.send(Event::PtyOutput).unwrap();
+ event_w.send(Event::PtyOutput);
}
Err(e) => {
panic!("pty read failed: {:?}", e);
@@ -93,7 +93,7 @@ async fn pty_task(
.resize(pty_process::Size::new(size.0, size.1))
.unwrap(),
Res::Close(()) => {
- event_w.send(Event::PtyClose).unwrap();
+ event_w.send(Event::PtyClose);
return;
}
}
diff --git a/src/shell/mod.rs b/src/shell/mod.rs
index 82d2021..c187272 100644
--- a/src/shell/mod.rs
+++ b/src/shell/mod.rs
@@ -18,7 +18,7 @@ pub async fn main() -> anyhow::Result<i32> {
let _input_guard = input.take_raw_guard();
let _output_guard = output.take_screen_guard();
- let (event_w, event_r) = tokio::sync::mpsc::unbounded_channel();
+ let (event_w, event_r) = event::channel();
{
let mut signals = tokio::signal::unix::signal(
@@ -26,26 +26,24 @@ pub async fn main() -> anyhow::Result<i32> {
)?;
let event_w = event_w.clone();
tokio::task::spawn(async move {
- event_w
- .send(Event::Resize(terminal_size::terminal_size().map_or(
+ event_w.send(Event::Resize(
+ terminal_size::terminal_size().map_or(
(24, 80),
|(terminal_size::Width(w), terminal_size::Height(h))| {
(h, w)
},
- )))
- .unwrap();
+ ),
+ ));
while signals.recv().await.is_some() {
- event_w
- .send(Event::Resize(
- terminal_size::terminal_size().map_or(
- (24, 80),
- |(
- terminal_size::Width(w),
- terminal_size::Height(h),
- )| { (h, w) },
- ),
- ))
- .unwrap();
+ event_w.send(Event::Resize(
+ terminal_size::terminal_size().map_or(
+ (24, 80),
+ |(
+ terminal_size::Width(w),
+ terminal_size::Height(h),
+ )| { (h, w) },
+ ),
+ ));
}
});
}
@@ -54,7 +52,7 @@ pub async fn main() -> anyhow::Result<i32> {
let event_w = event_w.clone();
std::thread::spawn(move || {
while let Some(key) = input.read_key().unwrap() {
- event_w.send(Event::Key(key)).unwrap();
+ event_w.send(Event::Key(key));
}
});
}
@@ -75,7 +73,7 @@ pub async fn main() -> anyhow::Result<i32> {
);
loop {
interval.tick().await;
- event_w.send(Event::ClockTimer).unwrap();
+ event_w.send(Event::ClockTimer);
}
});
}
@@ -126,9 +124,7 @@ pub async fn main() -> anyhow::Result<i32> {
})
.await
.unwrap();
- if event_w.send(Event::GitInfo(info)).is_err() {
- break;
- }
+ event_w.send(Event::GitInfo(info));
}
});
_active_watcher = Some(watcher);
@@ -140,7 +136,7 @@ pub async fn main() -> anyhow::Result<i32> {
})
.await
.unwrap();
- event_w.send(Event::GitInfo(info)).unwrap();
+ event_w.send(Event::GitInfo(info));
}
});
}
@@ -148,8 +144,7 @@ pub async fn main() -> anyhow::Result<i32> {
let mut shell = Shell::new(crate::info::get_offset())?;
let mut prev_dir = shell.env.pwd().to_path_buf();
git_w.send(prev_dir.clone()).unwrap();
- let event_reader = event::Reader::new(event_r);
- while let Some(event) = event_reader.recv().await {
+ while let Some(event) = event_r.recv().await {
let dir = shell.env().pwd();
if dir != prev_dir {
prev_dir = dir.to_path_buf();
@@ -319,7 +314,7 @@ impl Shell {
pub async fn handle_event(
&mut self,
event: Event,
- event_w: &tokio::sync::mpsc::UnboundedSender<Event>,
+ event_w: &crate::shell::event::Writer,
) -> Option<Action> {
match event {
Event::Key(key) => {
@@ -402,7 +397,7 @@ impl Shell {
async fn handle_key_escape(
&mut self,
key: textmode::Key,
- event_w: tokio::sync::mpsc::UnboundedSender<Event>,
+ event_w: crate::shell::event::Writer,
) -> Option<Action> {
match key {
textmode::Key::Ctrl(b'd') => {
@@ -511,7 +506,7 @@ impl Shell {
async fn handle_key_readline(
&mut self,
key: textmode::Key,
- event_w: tokio::sync::mpsc::UnboundedSender<Event>,
+ event_w: crate::shell::event::Writer,
) -> Option<Action> {
match key {
textmode::Key::Char(c) => {