diff options
Diffstat (limited to 'src/bin/rbw-agent/agent.rs')
-rw-r--r-- | src/bin/rbw-agent/agent.rs | 54 |
1 files changed, 53 insertions, 1 deletions
diff --git a/src/bin/rbw-agent/agent.rs b/src/bin/rbw-agent/agent.rs index 7dcab16..b88121d 100644 --- a/src/bin/rbw-agent/agent.rs +++ b/src/bin/rbw-agent/agent.rs @@ -1,6 +1,9 @@ +use aes::cipher::typenum::private::IsNotEqualPrivate; use anyhow::Context as _; use futures_util::StreamExt as _; +use crate::notifications; + pub struct State { pub priv_key: Option<rbw::locked::Keys>, pub org_keys: @@ -9,6 +12,7 @@ pub struct State { pub timeout_duration: std::time::Duration, pub sync_timeout: crate::timeout::Timeout, pub sync_timeout_duration: std::time::Duration, + pub notifications_handler: crate::notifications::NotificationsHandler, } impl State { @@ -55,6 +59,8 @@ impl Agent { if sync_timeout_duration > std::time::Duration::ZERO { sync_timeout.set(sync_timeout_duration); } + let notifications_handler = + crate::notifications::NotificationsHandler::new(); Ok(Self { timer_r, sync_timer_r, @@ -65,6 +71,7 @@ impl Agent { timeout_duration, sync_timeout, sync_timeout_duration, + notifications_handler, })), }) } @@ -73,11 +80,39 @@ impl Agent { self, listener: tokio::net::UnixListener, ) -> anyhow::Result<()> { + let err = + crate::actions::subscribe_to_notifications(self.state.clone()) + .await; + if let Err(e) = err { + eprintln!("failed to subscribe to notifications: {e:#}") + } + enum Event { Request(std::io::Result<tokio::net::UnixStream>), Timeout(()), Sync(()), } + + let c: tokio::sync::mpsc::UnboundedReceiver< + notifications::NotificationMessage, + > = { + self.state + .write() + .await + .notifications_handler + .get_channel() + .await + }; + let notifications = + tokio_stream::wrappers::UnboundedReceiverStream::new(c) + .map(|message| match message { + notifications::NotificationMessage::Logout => { + Event::Timeout(()) + } + _ => Event::Sync(()), + }) + .boxed(); + let mut stream = futures_util::stream::select_all([ tokio_stream::wrappers::UnixListenerStream::new(listener) .map(Event::Request) @@ -92,6 +127,7 @@ impl Agent { ) .map(Event::Sync) .boxed(), + notifications, ]); while let Some(event) = stream.next().await { match event { @@ -119,8 +155,24 @@ impl Agent { Event::Sync(()) => { // this could fail if we aren't logged in, but we don't // care about that + let state = self.state.clone(); tokio::spawn(async move { - let _ = crate::actions::sync(None).await; + let result = crate::actions::sync(None).await; + if let Err(e) = result { + eprintln!("failed to sync: {e:#}"); + } else { + if !state + .write() + .await + .notifications_handler + .is_connected() + { + let err = crate::actions::subscribe_to_notifications(state).await; + if let Err(e) = err { + eprintln!("failed to subscribe to notifications: {e:#}") + } + } + } }); self.state.write().await.set_sync_timeout(); } |