aboutsummaryrefslogtreecommitdiffstats
path: root/src/bin/rbw-agent/agent.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/rbw-agent/agent.rs')
-rw-r--r--src/bin/rbw-agent/agent.rs36
1 files changed, 26 insertions, 10 deletions
diff --git a/src/bin/rbw-agent/agent.rs b/src/bin/rbw-agent/agent.rs
index 9523c78..c025880 100644
--- a/src/bin/rbw-agent/agent.rs
+++ b/src/bin/rbw-agent/agent.rs
@@ -1,6 +1,8 @@
use anyhow::Context as _;
use futures_util::StreamExt as _;
+use crate::notifications;
+
pub struct State {
pub priv_key: Option<rbw::locked::Keys>,
pub org_keys:
@@ -9,6 +11,7 @@ pub struct State {
pub timeout_duration: std::time::Duration,
pub sync_timeout: crate::timeout::Timeout,
pub sync_timeout_duration: std::time::Duration,
+ pub notifications_handler: crate::notifications::NotificationsHandler,
}
impl State {
@@ -55,6 +58,7 @@ impl Agent {
if sync_timeout_duration > std::time::Duration::ZERO {
sync_timeout.set(sync_timeout_duration);
}
+ let notifications_handler = crate::notifications::NotificationsHandler::new();
Ok(Self {
timer_r,
sync_timer_r,
@@ -65,6 +69,7 @@ impl Agent {
timeout_duration,
sync_timeout,
sync_timeout_duration,
+ notifications_handler,
})),
})
}
@@ -73,22 +78,32 @@ impl Agent {
self,
listener: tokio::net::UnixListener,
) -> anyhow::Result<()> {
- tokio::spawn(async move {
- let config = rbw::config::Config::load_async().await.expect("Error loading config");
- let mut websocket_url = config.base_url.clone().expect("Config is missing base url").replace("https://", "wss://") + "/notifications/hub?access_token=";
- if let Some(email) = &config.email {
- let db = rbw::db::Db::load_async(&config.server_name().as_str(), email).await.expect("Error loading db");
- let access_token = db.access_token.expect("Error getting access token");
- websocket_url = websocket_url + &access_token;
- crate::notifications::subscribe_to_notifications(websocket_url).await;
- }
- });
+ crate::actions::subscribe_to_notifications(self.state.clone()).await.expect("could not subscribe");
enum Event {
Request(std::io::Result<tokio::net::UnixStream>),
Timeout(()),
Sync(()),
}
+
+ let c: tokio::sync::mpsc::UnboundedReceiver<notifications::NotificationMessage> = {
+ self.state.write().await.notifications_handler.get_channel().await
+ };
+ let notifications = tokio_stream::wrappers::UnboundedReceiverStream::new(
+ c,
+ )
+ .map(|message| {
+ match message {
+ notifications::NotificationMessage::Logout => {
+ Event::Timeout(())
+ }
+ _ => {
+ Event::Sync(())
+ }
+ }
+ })
+ .boxed();
+
let mut stream = futures_util::stream::select_all([
tokio_stream::wrappers::UnixListenerStream::new(listener)
.map(Event::Request)
@@ -103,6 +118,7 @@ impl Agent {
)
.map(Event::Sync)
.boxed(),
+ notifications,
]);
while let Some(event) = stream.next().await {
match event {