diff options
author | Bernd Schoolmann <mail@quexten.com> | 2023-04-16 13:41:52 +0200 |
---|---|---|
committer | Bernd Schoolmann <mail@quexten.com> | 2023-04-16 13:41:52 +0200 |
commit | 355e17dc29244856454db3bdaeed082cf33231e6 (patch) | |
tree | eca0a48f8816dd1c560ffd0b117f1332908b9ff8 /src/bin/rbw-agent/agent.rs | |
parent | d6339933d54974952721659c3de2b2871a086c1a (diff) | |
download | rbw-355e17dc29244856454db3bdaeed082cf33231e6.tar.gz rbw-355e17dc29244856454db3bdaeed082cf33231e6.zip |
Restructure code
Diffstat (limited to 'src/bin/rbw-agent/agent.rs')
-rw-r--r-- | src/bin/rbw-agent/agent.rs | 36 |
1 files changed, 26 insertions, 10 deletions
diff --git a/src/bin/rbw-agent/agent.rs b/src/bin/rbw-agent/agent.rs index 9523c78..c025880 100644 --- a/src/bin/rbw-agent/agent.rs +++ b/src/bin/rbw-agent/agent.rs @@ -1,6 +1,8 @@ use anyhow::Context as _; use futures_util::StreamExt as _; +use crate::notifications; + pub struct State { pub priv_key: Option<rbw::locked::Keys>, pub org_keys: @@ -9,6 +11,7 @@ pub struct State { pub timeout_duration: std::time::Duration, pub sync_timeout: crate::timeout::Timeout, pub sync_timeout_duration: std::time::Duration, + pub notifications_handler: crate::notifications::NotificationsHandler, } impl State { @@ -55,6 +58,7 @@ impl Agent { if sync_timeout_duration > std::time::Duration::ZERO { sync_timeout.set(sync_timeout_duration); } + let notifications_handler = crate::notifications::NotificationsHandler::new(); Ok(Self { timer_r, sync_timer_r, @@ -65,6 +69,7 @@ impl Agent { timeout_duration, sync_timeout, sync_timeout_duration, + notifications_handler, })), }) } @@ -73,22 +78,32 @@ impl Agent { self, listener: tokio::net::UnixListener, ) -> anyhow::Result<()> { - tokio::spawn(async move { - let config = rbw::config::Config::load_async().await.expect("Error loading config"); - let mut websocket_url = config.base_url.clone().expect("Config is missing base url").replace("https://", "wss://") + "/notifications/hub?access_token="; - if let Some(email) = &config.email { - let db = rbw::db::Db::load_async(&config.server_name().as_str(), email).await.expect("Error loading db"); - let access_token = db.access_token.expect("Error getting access token"); - websocket_url = websocket_url + &access_token; - crate::notifications::subscribe_to_notifications(websocket_url).await; - } - }); + crate::actions::subscribe_to_notifications(self.state.clone()).await.expect("could not subscribe"); enum Event { Request(std::io::Result<tokio::net::UnixStream>), Timeout(()), Sync(()), } + + let c: tokio::sync::mpsc::UnboundedReceiver<notifications::NotificationMessage> = { + self.state.write().await.notifications_handler.get_channel().await + }; + let notifications = tokio_stream::wrappers::UnboundedReceiverStream::new( + c, + ) + .map(|message| { + match message { + notifications::NotificationMessage::Logout => { + Event::Timeout(()) + } + _ => { + Event::Sync(()) + } + } + }) + .boxed(); + let mut stream = futures_util::stream::select_all([ tokio_stream::wrappers::UnixListenerStream::new(listener) .map(Event::Request) @@ -103,6 +118,7 @@ impl Agent { ) .map(Event::Sync) .boxed(), + notifications, ]); while let Some(event) = stream.next().await { match event { |