1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
use futures_util::StreamExt as _;
/// Keys under which the background task registers its input streams in the
/// `StreamMap` it polls.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
enum Streams {
    /// The stream of `Action` requests coming from the `Timeout` handle.
    Requests,
    /// The one-shot stream wrapping the currently armed sleep, if any.
    Timer,
}
/// Requests sent from a `Timeout` handle to its background task.
#[derive(Debug)]
enum Action {
    /// Arm the timer so that it fires once the given duration has elapsed,
    /// replacing any timer that is already pending.
    Set(std::time::Duration),
    /// Remove the pending timer, if there is one.
    Clear,
}
/// Handle to a resettable one-shot timer that is driven by a background task.
///
/// The handle only holds the sending half of the request channel; the task
/// spawned in `Timeout::new` owns the receiving half and the timer itself.
pub struct Timeout {
    /// Channel used to send `Action` requests to the background task.
    req_w: tokio::sync::mpsc::UnboundedSender<Action>,
}
impl Timeout {
    /// Creates a timeout handle together with the receiver on which timer
    /// expirations are delivered.
    ///
    /// A background task is spawned that listens for requests sent through
    /// [`Timeout::set`] and [`Timeout::clear`]. Every time an armed timer
    /// elapses, a single `()` is sent on the returned receiver.
    ///
    /// The task ends once every input stream is exhausted (the handle has
    /// been dropped and no timer is pending), or when a timer fires after
    /// the returned receiver has been dropped.
    ///
    /// # Panics
    ///
    /// Panics if called outside of a Tokio runtime, because the task is
    /// started with `tokio::spawn`.
    pub fn new() -> (Self, tokio::sync::mpsc::UnboundedReceiver<()>) {
        let (req_w, req_r) = tokio::sync::mpsc::unbounded_channel();
        let (timer_w, timer_r) = tokio::sync::mpsc::unbounded_channel();
        tokio::spawn(async move {
            // Unifies the items of both input streams so that they can live
            // in the same `StreamMap`.
            enum Event {
                Request(Action),
                Timer,
            }

            let mut stream = tokio_stream::StreamMap::new();
            stream.insert(
                Streams::Requests,
                tokio_stream::wrappers::UnboundedReceiverStream::new(req_r)
                    .map(Event::Request)
                    .boxed(),
            );

            while let Some((_, event)) = stream.next().await {
                match event {
                    Event::Request(Action::Set(dur)) => {
                        // Inserting under an existing key replaces the
                        // previous stream, so this also restarts a timer
                        // that is still pending.
                        stream.insert(
                            Streams::Timer,
                            futures_util::stream::once(tokio::time::sleep(dur))
                                .map(|()| Event::Timer)
                                .boxed(),
                        );
                    }
                    Event::Request(Action::Clear) => {
                        stream.remove(&Streams::Timer);
                    }
                    Event::Timer => {
                        // Sending only fails when the receiver is gone. In
                        // that case nobody can observe expirations anymore,
                        // so stop the task instead of panicking inside it.
                        if timer_w.send(()).is_err() {
                            break;
                        }
                    }
                }
            }
        });
        (Self { req_w }, timer_r)
    }

    /// Arms the timer to fire after `dur`, replacing any pending timer.
    ///
    /// # Panics
    ///
    /// Panics if the background task is no longer running, which happens
    /// after a timer has fired while the receiver returned by
    /// [`Timeout::new`] was already dropped.
    pub fn set(&self, dur: std::time::Duration) {
        self.req_w
            .send(Action::Set(dur))
            .expect("timeout background task is no longer running");
    }

    /// Cancels the pending timer, if there is one.
    ///
    /// # Panics
    ///
    /// Panics if the background task is no longer running, which happens
    /// after a timer has fired while the receiver returned by
    /// [`Timeout::new`] was already dropped.
    pub fn clear(&self) {
        self.req_w
            .send(Action::Clear)
            .expect("timeout background task is no longer running");
    }
}
|