From 8fbed0f3fbcac726fb73f2cd49cb7e4d36091c47 Mon Sep 17 00:00:00 2001 From: Bernd Schoolmann Date: Mon, 17 Apr 2023 02:46:38 +0200 Subject: Improve error handling --- src/bin/rbw-agent/actions.rs | 25 +++++++++++++++++-------- src/bin/rbw-agent/notifications.rs | 16 +++++++--------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/bin/rbw-agent/actions.rs b/src/bin/rbw-agent/actions.rs index 006f7ca..cc363dd 100644 --- a/src/bin/rbw-agent/actions.rs +++ b/src/bin/rbw-agent/actions.rs @@ -659,14 +659,23 @@ async fn config_pinentry() -> anyhow::Result { } pub async fn subscribe_to_notifications(state: std::sync::Arc>) -> anyhow::Result<()> { - let config = rbw::config::Config::load_async().await.expect("Config is missing"); - let mut websocket_url = config.base_url.clone().expect("Config is missing base url").replace("https://", "wss://") + "/notifications/hub?access_token="; - let email = config.email.clone().expect("Config is missing email"); - let db = rbw::db::Db::load_async(&config.server_name().as_str(), &email).await.expect("Error loading db"); - let access_token = db.access_token.expect("Error getting access token"); + // access token might be out of date, so we do a sync to refresh it + sync(None).await?; + + let config = rbw::config::Config::load_async().await.context("Config is missing")?; + let email = config.email.clone().context("Config is missing email")?; + let db = rbw::db::Db::load_async(&config.server_name().as_str(), &email).await?; + let access_token = db.access_token.context("Error getting access token")?; + + let mut websocket_url = config.base_url.clone().expect("config is missing base url").replace("https://", "wss://") + "/notifications/hub?access_token="; websocket_url = websocket_url + &access_token; + let mut state = state.write().await; - state.notifications_handler.connect(websocket_url).await.expect("Error connecting to websocket"); - - Ok(()) + let err = state.notifications_handler.connect(websocket_url).await.err(); + + if let Some(err) = err { + return Err(anyhow::anyhow!(err.to_string())); + } else { + Ok(()) + } } \ No newline at end of file diff --git a/src/bin/rbw-agent/notifications.rs b/src/bin/rbw-agent/notifications.rs index c72fe38..ab24c42 100644 --- a/src/bin/rbw-agent/notifications.rs +++ b/src/bin/rbw-agent/notifications.rs @@ -74,7 +74,6 @@ impl NotificationsHandler { self.disconnect().await?; } - //subscribe_to_notifications(url, self.sending_channels.clone()).await?; let (write, read_handle) = subscribe_to_notifications(url, self.sending_channels.clone()).await?; self.write = Some(write); @@ -91,6 +90,7 @@ impl NotificationsHandler { if let Some(mut write) = self.write.take() { write.send(Message::Close(None)).await?; write.close().await?; + self.read_handle.take().unwrap().await?; } Ok(()) } @@ -105,8 +105,7 @@ impl NotificationsHandler { async fn subscribe_to_notifications(url: String, sending_channels: std::sync::Arc>>>) -> Result<(SplitSink>, Message>, JoinHandle<()>), Box> { let url = url::Url::parse(url.as_str())?; - println!("Connecting to {}", url); - let (ws_stream, _response) = connect_async(url).await.expect("Failed to connect"); + let (ws_stream, _response) = connect_async(url).await?; let (mut write, read) = ws_stream.split(); write.send(Message::Text("{\"protocol\":\"messagepack\",\"version\":1}\n".to_string())).await.unwrap(); @@ -119,14 +118,13 @@ async fn subscribe_to_notifications(url: String, sending_channels: std::sync::Ar match message { Ok(Message::Binary(binary)) => { - if binary.len() < 4 { - return; - } - - let msg1 = parse_messagepack(&binary); - if let Some(msg) = msg1 { + let msgpack = parse_messagepack(&binary); + if let Some(msg) = msgpack { for channel in a.iter() { let res = channel.send(msg); + if res.is_err() { + println!("error sending websocket message to channel"); + } } } }, -- cgit v1.2.3-54-g00ecf