From 76ab1de92a36b151b3c817e16737fd703567a3f2 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Wed, 19 Jul 2023 01:05:42 -0400 Subject: more correct websocket notification handling the servers tend to be fairly chatty with messages, mostly pings and heartbeats of various sorts, and we don't want to sync on all of those. also, the message type in the first array element of the messagepack structure is not the same thing as the UpdateType - that is stored as an argument to the ReceiveMessage invocation, so we need to parse a bit further to get the actual UpdateType. this still just does a full sync on any changes, though. --- src/bin/rbw-agent/agent.rs | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) (limited to 'src/bin/rbw-agent/agent.rs') diff --git a/src/bin/rbw-agent/agent.rs b/src/bin/rbw-agent/agent.rs index 29e400b..5769c0e 100644 --- a/src/bin/rbw-agent/agent.rs +++ b/src/bin/rbw-agent/agent.rs @@ -83,31 +83,28 @@ impl Agent { self, listener: tokio::net::UnixListener, ) -> anyhow::Result<()> { - enum Event { + pub enum Event { Request(std::io::Result), Timeout(()), Sync(()), } - let c: tokio::sync::mpsc::UnboundedReceiver< - notifications::NotificationMessage, - > = { - self.state - .lock() - .await - .notifications_handler - .get_channel() - .await - }; + let notifications = self + .state + .lock() + .await + .notifications_handler + .get_channel() + .await; let notifications = - tokio_stream::wrappers::UnboundedReceiverStream::new(c) - .map(|message| match message { - notifications::NotificationMessage::Logout => { - Event::Timeout(()) - } - _ => Event::Sync(()), - }) - .boxed(); + tokio_stream::wrappers::UnboundedReceiverStream::new( + notifications, + ) + .map(|message| match message { + notifications::Message::Logout => Event::Timeout(()), + notifications::Message::Sync => Event::Sync(()), + }) + .boxed(); let mut stream = futures_util::stream::select_all([ tokio_stream::wrappers::UnixListenerStream::new(listener) -- cgit v1.2.3-54-g00ecf