aboutsummaryrefslogtreecommitdiffstats
path: root/src/bin/rbw-agent/agent.rs
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2023-07-19 01:05:42 -0400
committerJesse Luehrs <doy@tozt.net>2023-07-19 01:05:42 -0400
commit76ab1de92a36b151b3c817e16737fd703567a3f2 (patch)
tree688f604bce050f154ec35b7f6faf0c161a5c79b6 /src/bin/rbw-agent/agent.rs
parent7a0eae68c1f3496a1d421b61f66115a7889d7e92 (diff)
downloadrbw-76ab1de92a36b151b3c817e16737fd703567a3f2.tar.gz
rbw-76ab1de92a36b151b3c817e16737fd703567a3f2.zip
more correct websocket notification handling
the servers tend to be fairly chatty with messages, mostly pings and heartbeats of various sorts, and we don't want to sync on all of those. also, the message type in the first array element of the messagepack structure is not the same thing as the UpdateType - that is stored as an argument to the ReceiveMessage invocation, so we need to parse a bit further to get the actual UpdateType. this still just does a full sync on any changes, though.
Diffstat (limited to 'src/bin/rbw-agent/agent.rs')
-rw-r--r--src/bin/rbw-agent/agent.rs35
1 files changed, 16 insertions, 19 deletions
diff --git a/src/bin/rbw-agent/agent.rs b/src/bin/rbw-agent/agent.rs
index 29e400b..5769c0e 100644
--- a/src/bin/rbw-agent/agent.rs
+++ b/src/bin/rbw-agent/agent.rs
@@ -83,31 +83,28 @@ impl Agent {
self,
listener: tokio::net::UnixListener,
) -> anyhow::Result<()> {
- enum Event {
+ pub enum Event {
Request(std::io::Result<tokio::net::UnixStream>),
Timeout(()),
Sync(()),
}
- let c: tokio::sync::mpsc::UnboundedReceiver<
- notifications::NotificationMessage,
- > = {
- self.state
- .lock()
- .await
- .notifications_handler
- .get_channel()
- .await
- };
+ let notifications = self
+ .state
+ .lock()
+ .await
+ .notifications_handler
+ .get_channel()
+ .await;
let notifications =
- tokio_stream::wrappers::UnboundedReceiverStream::new(c)
- .map(|message| match message {
- notifications::NotificationMessage::Logout => {
- Event::Timeout(())
- }
- _ => Event::Sync(()),
- })
- .boxed();
+ tokio_stream::wrappers::UnboundedReceiverStream::new(
+ notifications,
+ )
+ .map(|message| match message {
+ notifications::Message::Logout => Event::Timeout(()),
+ notifications::Message::Sync => Event::Sync(()),
+ })
+ .boxed();
let mut stream = futures_util::stream::select_all([
tokio_stream::wrappers::UnixListenerStream::new(listener)