diff options
Diffstat (limited to 'src/bin/rbw-agent/notifications.rs')
-rw-r--r-- | src/bin/rbw-agent/notifications.rs | 16 |
1 files changed, 7 insertions, 9 deletions
diff --git a/src/bin/rbw-agent/notifications.rs b/src/bin/rbw-agent/notifications.rs index c72fe38..ab24c42 100644 --- a/src/bin/rbw-agent/notifications.rs +++ b/src/bin/rbw-agent/notifications.rs @@ -74,7 +74,6 @@ impl NotificationsHandler { self.disconnect().await?; } - //subscribe_to_notifications(url, self.sending_channels.clone()).await?; let (write, read_handle) = subscribe_to_notifications(url, self.sending_channels.clone()).await?; self.write = Some(write); @@ -91,6 +90,7 @@ impl NotificationsHandler { if let Some(mut write) = self.write.take() { write.send(Message::Close(None)).await?; write.close().await?; + self.read_handle.take().unwrap().await?; } Ok(()) } @@ -105,8 +105,7 @@ impl NotificationsHandler { async fn subscribe_to_notifications(url: String, sending_channels: std::sync::Arc<tokio::sync::RwLock<Vec<tokio::sync::mpsc::UnboundedSender<NotificationMessage>>>>) -> Result<(SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>, JoinHandle<()>), Box<dyn std::error::Error>> { let url = url::Url::parse(url.as_str())?; - println!("Connecting to {}", url); - let (ws_stream, _response) = connect_async(url).await.expect("Failed to connect"); + let (ws_stream, _response) = connect_async(url).await?; let (mut write, read) = ws_stream.split(); write.send(Message::Text("{\"protocol\":\"messagepack\",\"version\":1}\n".to_string())).await.unwrap(); @@ -119,14 +118,13 @@ async fn subscribe_to_notifications(url: String, sending_channels: std::sync::Ar match message { Ok(Message::Binary(binary)) => { - if binary.len() < 4 { - return; - } - - let msg1 = parse_messagepack(&binary); - if let Some(msg) = msg1 { + let msgpack = parse_messagepack(&binary); + if let Some(msg) = msgpack { for channel in a.iter() { let res = channel.send(msg); + if res.is_err() { + println!("error sending websocket message to channel"); + } } } }, |