aboutsummaryrefslogtreecommitdiffstats
path: root/src/bin/rbw-agent/agent.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/rbw-agent/agent.rs')
-rw-r--r--src/bin/rbw-agent/agent.rs66
1 files changed, 44 insertions, 22 deletions
diff --git a/src/bin/rbw-agent/agent.rs b/src/bin/rbw-agent/agent.rs
index fb21728..b88121d 100644
--- a/src/bin/rbw-agent/agent.rs
+++ b/src/bin/rbw-agent/agent.rs
@@ -1,3 +1,4 @@
+use aes::cipher::typenum::private::IsNotEqualPrivate;
use anyhow::Context as _;
use futures_util::StreamExt as _;
@@ -58,7 +59,8 @@ impl Agent {
if sync_timeout_duration > std::time::Duration::ZERO {
sync_timeout.set(sync_timeout_duration);
}
- let notifications_handler = crate::notifications::NotificationsHandler::new();
+ let notifications_handler =
+ crate::notifications::NotificationsHandler::new();
Ok(Self {
timer_r,
sync_timer_r,
@@ -78,7 +80,9 @@ impl Agent {
self,
listener: tokio::net::UnixListener,
) -> anyhow::Result<()> {
- let err = crate::actions::subscribe_to_notifications(self.state.clone()).await;
+ let err =
+ crate::actions::subscribe_to_notifications(self.state.clone())
+ .await;
if let Err(e) = err {
eprintln!("failed to subscribe to notifications: {e:#}")
}
@@ -88,25 +92,27 @@ impl Agent {
Timeout(()),
Sync(()),
}
-
- let c: tokio::sync::mpsc::UnboundedReceiver<notifications::NotificationMessage> = {
- self.state.write().await.notifications_handler.get_channel().await
+
+ let c: tokio::sync::mpsc::UnboundedReceiver<
+ notifications::NotificationMessage,
+ > = {
+ self.state
+ .write()
+ .await
+ .notifications_handler
+ .get_channel()
+ .await
};
- let notifications = tokio_stream::wrappers::UnboundedReceiverStream::new(
- c,
- )
- .map(|message| {
- match message {
- notifications::NotificationMessage::Logout => {
- Event::Timeout(())
- }
- _ => {
- Event::Sync(())
- }
- }
- })
- .boxed();
-
+ let notifications =
+ tokio_stream::wrappers::UnboundedReceiverStream::new(c)
+ .map(|message| match message {
+ notifications::NotificationMessage::Logout => {
+ Event::Timeout(())
+ }
+ _ => Event::Sync(()),
+ })
+ .boxed();
+
let mut stream = futures_util::stream::select_all([
tokio_stream::wrappers::UnixListenerStream::new(listener)
.map(Event::Request)
@@ -121,7 +127,7 @@ impl Agent {
)
.map(Event::Sync)
.boxed(),
- notifications,
+ notifications,
]);
while let Some(event) = stream.next().await {
match event {
@@ -149,8 +155,24 @@ impl Agent {
Event::Sync(()) => {
// this could fail if we aren't logged in, but we don't
// care about that
+ let state = self.state.clone();
tokio::spawn(async move {
- let _ = crate::actions::sync(None).await;
+ let result = crate::actions::sync(None).await;
+ if let Err(e) = result {
+ eprintln!("failed to sync: {e:#}");
+ } else {
+ if !state
+ .write()
+ .await
+ .notifications_handler
+ .is_connected()
+ {
+ let err = crate::actions::subscribe_to_notifications(state).await;
+ if let Err(e) = err {
+ eprintln!("failed to subscribe to notifications: {e:#}")
+ }
+ }
+ }
});
self.state.write().await.set_sync_timeout();
}