aboutsummaryrefslogtreecommitdiffstats
path: root/src/bin/rbw-agent
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/rbw-agent')
-rw-r--r--src/bin/rbw-agent/actions.rs14
-rw-r--r--src/bin/rbw-agent/agent.rs44
-rw-r--r--src/bin/rbw-agent/notifications.rs21
3 files changed, 36 insertions, 43 deletions
diff --git a/src/bin/rbw-agent/actions.rs b/src/bin/rbw-agent/actions.rs
index 4d77133..1974736 100644
--- a/src/bin/rbw-agent/actions.rs
+++ b/src/bin/rbw-agent/actions.rs
@@ -1,5 +1,3 @@
-use std::f32::consts::E;
-
use anyhow::Context as _;
pub async fn register(
@@ -210,7 +208,7 @@ pub async fn login(
let err = subscribe_to_notifications(state.clone()).await.err();
if let Some(e) = err {
- eprintln!("failed to subscribe to notifications: {}", e)
+ eprintln!("failed to subscribe to notifications: {e}");
}
respond_ack(sock).await?;
@@ -674,7 +672,7 @@ pub async fn subscribe_to_notifications(
.await
.context("Config is missing")?;
let email = config.email.clone().context("Config is missing email")?;
- let db = rbw::db::Db::load_async(&config.server_name().as_str(), &email)
+ let db = rbw::db::Db::load_async(config.server_name().as_str(), &email)
.await?;
let access_token =
db.access_token.context("Error getting access token")?;
@@ -685,7 +683,7 @@ pub async fn subscribe_to_notifications(
.expect("config is missing base url")
.replace("https://", "wss://")
+ "/notifications/hub?access_token=";
- websocket_url = websocket_url + &access_token;
+ websocket_url.push_str(&access_token);
let mut state = state.write().await;
let err = state
@@ -694,9 +692,5 @@ pub async fn subscribe_to_notifications(
.await
.err();
- if let Some(err) = err {
- return Err(anyhow::anyhow!(err.to_string()));
- } else {
- Ok(())
- }
+ err.map_or_else(|| Ok(()), |err| Err(anyhow::anyhow!(err.to_string())))
}
diff --git a/src/bin/rbw-agent/agent.rs b/src/bin/rbw-agent/agent.rs
index b88121d..d4b3341 100644
--- a/src/bin/rbw-agent/agent.rs
+++ b/src/bin/rbw-agent/agent.rs
@@ -1,4 +1,3 @@
-use aes::cipher::typenum::private::IsNotEqualPrivate;
use anyhow::Context as _;
use futures_util::StreamExt as _;
@@ -12,7 +11,7 @@ pub struct State {
pub timeout_duration: std::time::Duration,
pub sync_timeout: crate::timeout::Timeout,
pub sync_timeout_duration: std::time::Duration,
- pub notifications_handler: crate::notifications::NotificationsHandler,
+ pub notifications_handler: crate::notifications::Handler,
}
impl State {
@@ -59,8 +58,7 @@ impl Agent {
if sync_timeout_duration > std::time::Duration::ZERO {
sync_timeout.set(sync_timeout_duration);
}
- let notifications_handler =
- crate::notifications::NotificationsHandler::new();
+ let notifications_handler = crate::notifications::Handler::new();
Ok(Self {
timer_r,
sync_timer_r,
@@ -80,19 +78,19 @@ impl Agent {
self,
listener: tokio::net::UnixListener,
) -> anyhow::Result<()> {
- let err =
- crate::actions::subscribe_to_notifications(self.state.clone())
- .await;
- if let Err(e) = err {
- eprintln!("failed to subscribe to notifications: {e:#}")
- }
-
enum Event {
Request(std::io::Result<tokio::net::UnixStream>),
Timeout(()),
Sync(()),
}
+ let err =
+ crate::actions::subscribe_to_notifications(self.state.clone())
+ .await;
+ if let Err(e) = err {
+ eprintln!("failed to subscribe to notifications: {e:#}");
+ }
+
let c: tokio::sync::mpsc::UnboundedReceiver<
notifications::NotificationMessage,
> = {
@@ -160,17 +158,19 @@ impl Agent {
let result = crate::actions::sync(None).await;
if let Err(e) = result {
eprintln!("failed to sync: {e:#}");
- } else {
- if !state
- .write()
- .await
- .notifications_handler
- .is_connected()
- {
- let err = crate::actions::subscribe_to_notifications(state).await;
- if let Err(e) = err {
- eprintln!("failed to subscribe to notifications: {e:#}")
- }
+ } else if !state
+ .write()
+ .await
+ .notifications_handler
+ .is_connected()
+ {
+ let err =
+ crate::actions::subscribe_to_notifications(
+ state,
+ )
+ .await;
+ if let Err(e) = err {
+ eprintln!("failed to subscribe to notifications: {e:#}");
}
}
});
diff --git a/src/bin/rbw-agent/notifications.rs b/src/bin/rbw-agent/notifications.rs
index e8f84b0..69ebda5 100644
--- a/src/bin/rbw-agent/notifications.rs
+++ b/src/bin/rbw-agent/notifications.rs
@@ -38,7 +38,7 @@ fn parse_messagepack(data: &[u8]) -> Option<NotificationMessage> {
let message_type =
unpacked_message.iter().next().unwrap().as_u64().unwrap();
- let message = match message_type {
+ match message_type {
0 => Some(NotificationMessage::SyncCipherUpdate),
1 => Some(NotificationMessage::SyncCipherCreate),
2 => Some(NotificationMessage::SyncLoginDelete),
@@ -52,12 +52,10 @@ fn parse_messagepack(data: &[u8]) -> Option<NotificationMessage> {
10 => Some(NotificationMessage::SyncSettings),
11 => Some(NotificationMessage::Logout),
_ => None,
- };
-
- return message;
+ }
}
-pub struct NotificationsHandler {
+pub struct Handler {
write: Option<
futures::stream::SplitSink<
tokio_tungstenite::WebSocketStream<
@@ -74,7 +72,7 @@ pub struct NotificationsHandler {
>,
}
-impl NotificationsHandler {
+impl Handler {
pub fn new() -> Self {
Self {
write: None,
@@ -99,7 +97,7 @@ impl NotificationsHandler {
self.write = Some(write);
self.read_handle = Some(read_handle);
- return Ok(());
+ Ok(())
}
pub fn is_connected(&self) -> bool {
@@ -128,7 +126,7 @@ impl NotificationsHandler {
let (tx, rx) =
tokio::sync::mpsc::unbounded_channel::<NotificationMessage>();
self.sending_channels.write().await.push(tx);
- return rx;
+ rx
}
}
@@ -167,7 +165,8 @@ async fn subscribe_to_notifications(
Ok(Message::Binary(binary)) => {
let msgpack = parse_messagepack(&binary);
if let Some(msg) = msgpack {
- for channel in a.iter() {
+ let channels = a.as_slice();
+ for channel in channels {
let res = channel.send(msg);
if res.is_err() {
eprintln!("error sending websocket message to channel");
@@ -176,12 +175,12 @@ async fn subscribe_to_notifications(
}
},
Err(e) => {
- eprintln!("websocket error: {:?}", e);
+ eprintln!("websocket error: {e:?}");
},
_ => {}
}
}).await;
};
- return Ok((write, tokio::spawn(read_future)));
+ Ok((write, tokio::spawn(read_future)))
}