diff options
Diffstat (limited to 'src/bin/rbw-agent/agent.rs')
-rw-r--r-- | src/bin/rbw-agent/agent.rs | 66 |
1 files changed, 44 insertions, 22 deletions
diff --git a/src/bin/rbw-agent/agent.rs b/src/bin/rbw-agent/agent.rs index fb21728..b88121d 100644 --- a/src/bin/rbw-agent/agent.rs +++ b/src/bin/rbw-agent/agent.rs @@ -1,3 +1,4 @@ +use aes::cipher::typenum::private::IsNotEqualPrivate; use anyhow::Context as _; use futures_util::StreamExt as _; @@ -58,7 +59,8 @@ impl Agent { if sync_timeout_duration > std::time::Duration::ZERO { sync_timeout.set(sync_timeout_duration); } - let notifications_handler = crate::notifications::NotificationsHandler::new(); + let notifications_handler = + crate::notifications::NotificationsHandler::new(); Ok(Self { timer_r, sync_timer_r, @@ -78,7 +80,9 @@ impl Agent { self, listener: tokio::net::UnixListener, ) -> anyhow::Result<()> { - let err = crate::actions::subscribe_to_notifications(self.state.clone()).await; + let err = + crate::actions::subscribe_to_notifications(self.state.clone()) + .await; if let Err(e) = err { eprintln!("failed to subscribe to notifications: {e:#}") } @@ -88,25 +92,27 @@ impl Agent { Timeout(()), Sync(()), } - - let c: tokio::sync::mpsc::UnboundedReceiver<notifications::NotificationMessage> = { - self.state.write().await.notifications_handler.get_channel().await + + let c: tokio::sync::mpsc::UnboundedReceiver< + notifications::NotificationMessage, + > = { + self.state + .write() + .await + .notifications_handler + .get_channel() + .await }; - let notifications = tokio_stream::wrappers::UnboundedReceiverStream::new( - c, - ) - .map(|message| { - match message { - notifications::NotificationMessage::Logout => { - Event::Timeout(()) - } - _ => { - Event::Sync(()) - } - } - }) - .boxed(); - + let notifications = + tokio_stream::wrappers::UnboundedReceiverStream::new(c) + .map(|message| match message { + notifications::NotificationMessage::Logout => { + Event::Timeout(()) + } + _ => Event::Sync(()), + }) + .boxed(); + let mut stream = futures_util::stream::select_all([ tokio_stream::wrappers::UnixListenerStream::new(listener) .map(Event::Request) @@ -121,7 +127,7 @@ impl Agent { ) .map(Event::Sync) .boxed(), - notifications, + notifications, ]); while let Some(event) = stream.next().await { match event { @@ -149,8 +155,24 @@ impl Agent { Event::Sync(()) => { // this could fail if we aren't logged in, but we don't // care about that + let state = self.state.clone(); tokio::spawn(async move { - let _ = crate::actions::sync(None).await; + let result = crate::actions::sync(None).await; + if let Err(e) = result { + eprintln!("failed to sync: {e:#}"); + } else { + if !state + .write() + .await + .notifications_handler + .is_connected() + { + let err = crate::actions::subscribe_to_notifications(state).await; + if let Err(e) = err { + eprintln!("failed to subscribe to notifications: {e:#}") + } + } + } }); self.state.write().await.set_sync_timeout(); } |