aboutsummaryrefslogtreecommitdiffstats
path: root/src/bin/rbw-agent/agent.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/rbw-agent/agent.rs')
-rw-r--r--src/bin/rbw-agent/agent.rs54
1 files changed, 53 insertions, 1 deletions
diff --git a/src/bin/rbw-agent/agent.rs b/src/bin/rbw-agent/agent.rs
index 7dcab16..b88121d 100644
--- a/src/bin/rbw-agent/agent.rs
+++ b/src/bin/rbw-agent/agent.rs
@@ -1,6 +1,9 @@
+use aes::cipher::typenum::private::IsNotEqualPrivate;
use anyhow::Context as _;
use futures_util::StreamExt as _;
+use crate::notifications;
+
pub struct State {
pub priv_key: Option<rbw::locked::Keys>,
pub org_keys:
@@ -9,6 +12,7 @@ pub struct State {
pub timeout_duration: std::time::Duration,
pub sync_timeout: crate::timeout::Timeout,
pub sync_timeout_duration: std::time::Duration,
+ pub notifications_handler: crate::notifications::NotificationsHandler,
}
impl State {
@@ -55,6 +59,8 @@ impl Agent {
if sync_timeout_duration > std::time::Duration::ZERO {
sync_timeout.set(sync_timeout_duration);
}
+ let notifications_handler =
+ crate::notifications::NotificationsHandler::new();
Ok(Self {
timer_r,
sync_timer_r,
@@ -65,6 +71,7 @@ impl Agent {
timeout_duration,
sync_timeout,
sync_timeout_duration,
+ notifications_handler,
})),
})
}
@@ -73,11 +80,39 @@ impl Agent {
self,
listener: tokio::net::UnixListener,
) -> anyhow::Result<()> {
+ let err =
+ crate::actions::subscribe_to_notifications(self.state.clone())
+ .await;
+ if let Err(e) = err {
+ eprintln!("failed to subscribe to notifications: {e:#}")
+ }
+
enum Event {
Request(std::io::Result<tokio::net::UnixStream>),
Timeout(()),
Sync(()),
}
+
+ let c: tokio::sync::mpsc::UnboundedReceiver<
+ notifications::NotificationMessage,
+ > = {
+ self.state
+ .write()
+ .await
+ .notifications_handler
+ .get_channel()
+ .await
+ };
+ let notifications =
+ tokio_stream::wrappers::UnboundedReceiverStream::new(c)
+ .map(|message| match message {
+ notifications::NotificationMessage::Logout => {
+ Event::Timeout(())
+ }
+ _ => Event::Sync(()),
+ })
+ .boxed();
+
let mut stream = futures_util::stream::select_all([
tokio_stream::wrappers::UnixListenerStream::new(listener)
.map(Event::Request)
@@ -92,6 +127,7 @@ impl Agent {
)
.map(Event::Sync)
.boxed(),
+ notifications,
]);
while let Some(event) = stream.next().await {
match event {
@@ -119,8 +155,24 @@ impl Agent {
Event::Sync(()) => {
// this could fail if we aren't logged in, but we don't
// care about that
+ let state = self.state.clone();
tokio::spawn(async move {
- let _ = crate::actions::sync(None).await;
+ let result = crate::actions::sync(None).await;
+ if let Err(e) = result {
+ eprintln!("failed to sync: {e:#}");
+ } else {
+ if !state
+ .write()
+ .await
+ .notifications_handler
+ .is_connected()
+ {
+ let err = crate::actions::subscribe_to_notifications(state).await;
+ if let Err(e) = err {
+ eprintln!("failed to subscribe to notifications: {e:#}")
+ }
+ }
+ }
});
self.state.write().await.set_sync_timeout();
}