aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBernd Schoolmann <mail@quexten.com>2023-04-17 02:46:38 +0200
committerBernd Schoolmann <mail@quexten.com>2023-04-17 02:46:38 +0200
commit8fbed0f3fbcac726fb73f2cd49cb7e4d36091c47 (patch)
tree1cc442185cb28b04b94dbd21838578a2291bce48
parent355e17dc29244856454db3bdaeed082cf33231e6 (diff)
downloadrbw-8fbed0f3fbcac726fb73f2cd49cb7e4d36091c47.tar.gz
rbw-8fbed0f3fbcac726fb73f2cd49cb7e4d36091c47.zip
Improve error handling
-rw-r--r--src/bin/rbw-agent/actions.rs25
-rw-r--r--src/bin/rbw-agent/notifications.rs16
2 files changed, 24 insertions, 17 deletions
diff --git a/src/bin/rbw-agent/actions.rs b/src/bin/rbw-agent/actions.rs
index 006f7ca..cc363dd 100644
--- a/src/bin/rbw-agent/actions.rs
+++ b/src/bin/rbw-agent/actions.rs
@@ -659,14 +659,23 @@ async fn config_pinentry() -> anyhow::Result<String> {
}
pub async fn subscribe_to_notifications(state: std::sync::Arc<tokio::sync::RwLock<crate::agent::State>>) -> anyhow::Result<()> {
- let config = rbw::config::Config::load_async().await.expect("Config is missing");
- let mut websocket_url = config.base_url.clone().expect("Config is missing base url").replace("https://", "wss://") + "/notifications/hub?access_token=";
- let email = config.email.clone().expect("Config is missing email");
- let db = rbw::db::Db::load_async(&config.server_name().as_str(), &email).await.expect("Error loading db");
- let access_token = db.access_token.expect("Error getting access token");
+ // access token might be out of date, so we do a sync to refresh it
+ sync(None).await?;
+
+ let config = rbw::config::Config::load_async().await.context("Config is missing")?;
+ let email = config.email.clone().context("Config is missing email")?;
+ let db = rbw::db::Db::load_async(&config.server_name().as_str(), &email).await?;
+ let access_token = db.access_token.context("Error getting access token")?;
+
+ let mut websocket_url = config.base_url.clone().expect("config is missing base url").replace("https://", "wss://") + "/notifications/hub?access_token=";
websocket_url = websocket_url + &access_token;
+
let mut state = state.write().await;
- state.notifications_handler.connect(websocket_url).await.expect("Error connecting to websocket");
-
- Ok(())
+ let err = state.notifications_handler.connect(websocket_url).await.err();
+
+ if let Some(err) = err {
+ return Err(anyhow::anyhow!(err.to_string()));
+ } else {
+ Ok(())
+ }
} \ No newline at end of file
diff --git a/src/bin/rbw-agent/notifications.rs b/src/bin/rbw-agent/notifications.rs
index c72fe38..ab24c42 100644
--- a/src/bin/rbw-agent/notifications.rs
+++ b/src/bin/rbw-agent/notifications.rs
@@ -74,7 +74,6 @@ impl NotificationsHandler {
self.disconnect().await?;
}
- //subscribe_to_notifications(url, self.sending_channels.clone()).await?;
let (write, read_handle) = subscribe_to_notifications(url, self.sending_channels.clone()).await?;
self.write = Some(write);
@@ -91,6 +90,7 @@ impl NotificationsHandler {
if let Some(mut write) = self.write.take() {
write.send(Message::Close(None)).await?;
write.close().await?;
+ self.read_handle.take().unwrap().await?;
}
Ok(())
}
@@ -105,8 +105,7 @@ impl NotificationsHandler {
async fn subscribe_to_notifications(url: String, sending_channels: std::sync::Arc<tokio::sync::RwLock<Vec<tokio::sync::mpsc::UnboundedSender<NotificationMessage>>>>) -> Result<(SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>, JoinHandle<()>), Box<dyn std::error::Error>> {
let url = url::Url::parse(url.as_str())?;
- println!("Connecting to {}", url);
- let (ws_stream, _response) = connect_async(url).await.expect("Failed to connect");
+ let (ws_stream, _response) = connect_async(url).await?;
let (mut write, read) = ws_stream.split();
write.send(Message::Text("{\"protocol\":\"messagepack\",\"version\":1}\n".to_string())).await.unwrap();
@@ -119,14 +118,13 @@ async fn subscribe_to_notifications(url: String, sending_channels: std::sync::Ar
match message {
Ok(Message::Binary(binary)) => {
- if binary.len() < 4 {
- return;
- }
-
- let msg1 = parse_messagepack(&binary);
- if let Some(msg) = msg1 {
+ let msgpack = parse_messagepack(&binary);
+ if let Some(msg) = msgpack {
for channel in a.iter() {
let res = channel.send(msg);
+ if res.is_err() {
+ println!("error sending websocket message to channel");
+ }
}
}
},