aboutsummaryrefslogtreecommitdiffstats
path: root/src/bin/rbw-agent/notifications.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/rbw-agent/notifications.rs')
-rw-r--r--src/bin/rbw-agent/notifications.rs16
1 files changed, 7 insertions, 9 deletions
diff --git a/src/bin/rbw-agent/notifications.rs b/src/bin/rbw-agent/notifications.rs
index c72fe38..ab24c42 100644
--- a/src/bin/rbw-agent/notifications.rs
+++ b/src/bin/rbw-agent/notifications.rs
@@ -74,7 +74,6 @@ impl NotificationsHandler {
self.disconnect().await?;
}
- //subscribe_to_notifications(url, self.sending_channels.clone()).await?;
let (write, read_handle) = subscribe_to_notifications(url, self.sending_channels.clone()).await?;
self.write = Some(write);
@@ -91,6 +90,7 @@ impl NotificationsHandler {
if let Some(mut write) = self.write.take() {
write.send(Message::Close(None)).await?;
write.close().await?;
+ self.read_handle.take().unwrap().await?;
}
Ok(())
}
@@ -105,8 +105,7 @@ impl NotificationsHandler {
async fn subscribe_to_notifications(url: String, sending_channels: std::sync::Arc<tokio::sync::RwLock<Vec<tokio::sync::mpsc::UnboundedSender<NotificationMessage>>>>) -> Result<(SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>, JoinHandle<()>), Box<dyn std::error::Error>> {
let url = url::Url::parse(url.as_str())?;
- println!("Connecting to {}", url);
- let (ws_stream, _response) = connect_async(url).await.expect("Failed to connect");
+ let (ws_stream, _response) = connect_async(url).await?;
let (mut write, read) = ws_stream.split();
write.send(Message::Text("{\"protocol\":\"messagepack\",\"version\":1}\n".to_string())).await.unwrap();
@@ -119,14 +118,13 @@ async fn subscribe_to_notifications(url: String, sending_channels: std::sync::Ar
match message {
Ok(Message::Binary(binary)) => {
- if binary.len() < 4 {
- return;
- }
-
- let msg1 = parse_messagepack(&binary);
- if let Some(msg) = msg1 {
+ let msgpack = parse_messagepack(&binary);
+ if let Some(msg) = msgpack {
for channel in a.iter() {
let res = channel.send(msg);
+ if res.is_err() {
+ println!("error sending websocket message to channel");
+ }
}
}
},