diff options
author | Jesse Luehrs <doy@tozt.net> | 2023-03-25 18:29:04 -0400 |
---|---|---|
committer | Jesse Luehrs <doy@tozt.net> | 2023-03-25 23:14:16 -0400 |
commit | b659cc500476a7b4b94bc6659d46922be9465b99 (patch) | |
tree | cdf83f293d4951bf2565cc8f92e73413d7e81819 /src/bin/rbw-agent/timeout.rs | |
parent | ad0a078a5a2c4c9efd16b20ff47cc4e1ef922dab (diff) | |
download | rbw-b659cc500476a7b4b94bc6659d46922be9465b99.tar.gz rbw-b659cc500476a7b4b94bc6659d46922be9465b99.zip |
stop using tokio::select!
Diffstat (limited to 'src/bin/rbw-agent/timeout.rs')
-rw-r--r-- | src/bin/rbw-agent/timeout.rs | 66 |
1 files changed, 66 insertions, 0 deletions
diff --git a/src/bin/rbw-agent/timeout.rs b/src/bin/rbw-agent/timeout.rs new file mode 100644 index 0000000..e613ff0 --- /dev/null +++ b/src/bin/rbw-agent/timeout.rs @@ -0,0 +1,66 @@ +use futures_util::StreamExt as _; + +#[derive(Debug, Hash, Eq, PartialEq, Copy, Clone)] +enum Streams { + Requests, + Timer, +} + +#[derive(Debug)] +enum Action { + Set(std::time::Duration), + Clear, +} + +pub struct Timeout { + req_w: tokio::sync::mpsc::UnboundedSender<Action>, +} + +impl Timeout { + pub fn new() -> (Self, tokio::sync::mpsc::UnboundedReceiver<()>) { + let (req_w, req_r) = tokio::sync::mpsc::unbounded_channel(); + let (timer_w, timer_r) = tokio::sync::mpsc::unbounded_channel(); + tokio::spawn(async move { + enum Event { + Request(Action), + Timer, + } + let mut stream = tokio_stream::StreamMap::new(); + stream.insert( + Streams::Requests, + tokio_stream::wrappers::UnboundedReceiverStream::new(req_r) + .map(Event::Request) + .boxed(), + ); + while let Some(event) = stream.next().await { + match event { + (_, Event::Request(Action::Set(dur))) => { + stream.insert( + Streams::Timer, + futures_util::stream::once(tokio::time::sleep( + dur, + )) + .map(|_| Event::Timer) + .boxed(), + ); + } + (_, Event::Request(Action::Clear)) => { + stream.remove(&Streams::Timer); + } + (_, Event::Timer) => { + timer_w.send(()).unwrap(); + } + } + } + }); + (Self { req_w }, timer_r) + } + + pub fn set(&self, dur: std::time::Duration) { + self.req_w.send(Action::Set(dur)).unwrap(); + } + + pub fn clear(&self) { + self.req_w.send(Action::Clear).unwrap(); + } +} |