summaryrefslogtreecommitdiffstats
path: root/src/shell/event.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/shell/event.rs')
-rw-r--r--src/shell/event.rs38
1 files changed, 20 insertions, 18 deletions
diff --git a/src/shell/event.rs b/src/shell/event.rs
index 025f3c4..ad14705 100644
--- a/src/shell/event.rs
+++ b/src/shell/event.rs
@@ -11,22 +11,23 @@ pub enum Event {
}
pub struct Reader {
- pending: async_std::sync::Mutex<Pending>,
- cvar: async_std::sync::Condvar,
+ pending: tokio::sync::Mutex<Pending>,
+ cvar: tokio::sync::Notify,
}
impl Reader {
pub fn new(
- input: async_std::channel::Receiver<Event>,
- ) -> async_std::sync::Arc<Self> {
- let this = async_std::sync::Arc::new(Self {
- pending: async_std::sync::Mutex::new(Pending::new()),
- cvar: async_std::sync::Condvar::new(),
- });
+ mut input: tokio::sync::mpsc::UnboundedReceiver<Event>,
+ ) -> std::sync::Arc<Self> {
+ let this = Self {
+ pending: tokio::sync::Mutex::new(Pending::new()),
+ cvar: tokio::sync::Notify::new(),
+ };
+ let this = std::sync::Arc::new(this);
{
- let this = async_std::sync::Arc::clone(&this);
- async_std::task::spawn(async move {
- while let Ok(event) = input.recv().await {
+ let this = this.clone();
+ tokio::task::spawn(async move {
+ while let Some(event) = input.recv().await {
this.new_event(Some(event)).await;
}
this.new_event(None).await;
@@ -36,13 +37,14 @@ impl Reader {
}
pub async fn recv(&self) -> Option<Event> {
- let mut pending = self
- .cvar
- .wait_until(self.pending.lock().await, |pending| {
- pending.has_event()
- })
- .await;
- pending.get_event()
+ loop {
+ let mut pending = self.pending.lock().await;
+ if pending.has_event() {
+ return pending.get_event();
+ }
+ drop(pending);
+ self.cvar.notified().await;
+ }
}
async fn new_event(&self, event: Option<Event>) {