aboutsummaryrefslogtreecommitdiffstats
path: root/src/bin
diff options
context:
space:
mode:
authorBernd Schoolmann <mail@quexten.com>2023-04-08 02:55:02 +0200
committerBernd Schoolmann <mail@quexten.com>2023-04-08 02:56:17 +0200
commitd6339933d54974952721659c3de2b2871a086c1a (patch)
treef89610646bb7819573e8b3e60f2ed52815c7c43c /src/bin
parent8aa7e36a4f2746b314b0a582f3c59cc8b6b03ca2 (diff)
downloadrbw-d6339933d54974952721659c3de2b2871a086c1a.tar.gz
rbw-d6339933d54974952721659c3de2b2871a086c1a.zip
Implement basic websocket support
Diffstat (limited to 'src/bin')
-rw-r--r--src/bin/rbw-agent/agent.rs11
-rw-r--r--src/bin/rbw-agent/main.rs1
-rw-r--r--src/bin/rbw-agent/notifications.rs166
3 files changed, 178 insertions, 0 deletions
diff --git a/src/bin/rbw-agent/agent.rs b/src/bin/rbw-agent/agent.rs
index 7dcab16..9523c78 100644
--- a/src/bin/rbw-agent/agent.rs
+++ b/src/bin/rbw-agent/agent.rs
@@ -73,6 +73,17 @@ impl Agent {
self,
listener: tokio::net::UnixListener,
) -> anyhow::Result<()> {
+ tokio::spawn(async move {
+ let config = rbw::config::Config::load_async().await.expect("Error loading config");
+ let mut websocket_url = config.base_url.clone().expect("Config is missing base url").replace("https://", "wss://") + "/notifications/hub?access_token=";
+ if let Some(email) = &config.email {
+ let db = rbw::db::Db::load_async(&config.server_name().as_str(), email).await.expect("Error loading db");
+ let access_token = db.access_token.expect("Error getting access token");
+ websocket_url = websocket_url + &access_token;
+ crate::notifications::subscribe_to_notifications(websocket_url).await;
+ }
+ });
+
enum Event {
Request(std::io::Result<tokio::net::UnixStream>),
Timeout(()),
diff --git a/src/bin/rbw-agent/main.rs b/src/bin/rbw-agent/main.rs
index 81eee3a..5e0fa61 100644
--- a/src/bin/rbw-agent/main.rs
+++ b/src/bin/rbw-agent/main.rs
@@ -21,6 +21,7 @@ mod daemon;
mod debugger;
mod sock;
mod timeout;
+mod notifications;
async fn tokio_main(
startup_ack: Option<crate::daemon::StartupAck>,
diff --git a/src/bin/rbw-agent/notifications.rs b/src/bin/rbw-agent/notifications.rs
new file mode 100644
index 0000000..ffdefe9
--- /dev/null
+++ b/src/bin/rbw-agent/notifications.rs
@@ -0,0 +1,166 @@
+use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
+use futures_util::{StreamExt, SinkExt};
+
+struct SyncCipherUpdate {
+ id: String
+}
+
+struct SyncCipherCreate {
+ id: String
+}
+
+enum NotificationMessage {
+ SyncCipherUpdate(SyncCipherUpdate),
+ SyncCipherCreate(SyncCipherCreate),
+ SyncLoginDelete,
+ SyncFolderDelete,
+ SyncCiphers,
+
+ SyncVault,
+ SyncOrgKeys,
+ SyncFolderCreate,
+ SyncFolderUpdate,
+ SyncCipherDelete,
+ SyncSettings,
+
+ Logout,
+
+ SyncSendCreate,
+ SyncSendUpdate,
+ SyncSendDelete,
+
+ AuthRequest,
+ AuthRequestResponse,
+
+ None,
+}
+
+fn parse_messagepack(data: &[u8]) -> Option<NotificationMessage> {
+ if data.len() < 2 {
+ return None;
+ }
+
+ // the first few bytes with th 0x80 bit set, plus one byte terminating the length contain the length of the message
+ let len_buffer_length = data.iter().position(|&x| (x & 0x80) == 0 )? + 1;
+
+ println!("len_buffer_length: {:?}", len_buffer_length);
+ println!("data: {:?}", data);
+ let unpacked_messagepack = rmpv::decode::read_value(&mut &data[len_buffer_length..]).ok().unwrap();
+ println!("unpacked_messagepack: {:?}", unpacked_messagepack);
+ if !unpacked_messagepack.is_array() {
+ return None;
+ }
+ let unpacked_message = unpacked_messagepack.as_array().unwrap();
+ println!("unpacked_message: {:?}", unpacked_message);
+ let message_type = unpacked_message.iter().next()?.as_u64()?;
+ let message = unpacked_message.iter().skip(4).next()?.as_array()?.first()?.as_map()?;
+ let payload = message.iter().filter(|x| x.0.as_str().unwrap() == "Payload").next()?.1.as_map()?;
+ println!("message_type: {:?}", message_type);
+ println!("payload: {:?}", payload);
+
+ let message = match message_type {
+ 0 => {
+ let id = payload.iter().filter(|x| x.0.as_str().unwrap() == "Id").next()?.1.as_str()?;
+
+ Some(NotificationMessage::SyncCipherUpdate(
+ SyncCipherUpdate {
+ id: id.to_string()
+ }
+ ))
+ },
+ 1 => {
+ let id = payload.iter().filter(|x| x.0.as_str().unwrap() == "Id").next()?.1.as_str()?;
+
+ Some(NotificationMessage::SyncCipherCreate(
+ SyncCipherCreate {
+ id: id.to_string()
+ }
+ ))
+ },
+ 2 => Some(NotificationMessage::SyncLoginDelete),
+ 3 => Some(NotificationMessage::SyncFolderDelete),
+ 4 => Some(NotificationMessage::SyncCiphers),
+ 5 => Some(NotificationMessage::SyncVault),
+ 6 => Some(NotificationMessage::SyncOrgKeys),
+ 7 => Some(NotificationMessage::SyncFolderCreate),
+ 8 => Some(NotificationMessage::SyncFolderUpdate),
+ 9 => Some(NotificationMessage::SyncCipherDelete),
+ 10 => Some(NotificationMessage::SyncSettings),
+ 11 => Some(NotificationMessage::Logout),
+ 12 => Some(NotificationMessage::SyncSendCreate),
+ 13 => Some(NotificationMessage::SyncSendUpdate),
+ 14 => Some(NotificationMessage::SyncSendDelete),
+ 15 => Some(NotificationMessage::AuthRequest),
+ 16 => Some(NotificationMessage::AuthRequestResponse),
+ 100 => Some(NotificationMessage::None),
+ _ => None
+ };
+
+ return message;
+}
+
+pub async fn subscribe_to_notifications(url: String) {
+ let url = url::Url::parse(url.as_str()).unwrap();
+
+ let (ws_stream, _response) = connect_async(url).await.expect("Failed to connect");
+
+ let (mut write, read) = ws_stream.split();
+
+ write.send(Message::Text("{\"protocol\":\"messagepack\",\"version\":1}\n".to_string())).await.unwrap();
+
+ let read_future = read.for_each(|message| async {
+ match message {
+ Ok(Message::Binary(binary)) => {
+ let msg = parse_messagepack(&binary);
+ match msg {
+ Some(NotificationMessage::SyncCipherUpdate(update)) => {
+ println!("Websocket sent SyncCipherUpdate for id: {:?}", update.id);
+ crate::actions::sync(None).await.unwrap();
+ println!("Synced")
+ },
+ Some(NotificationMessage::SyncCipherCreate(update)) => {
+ println!("Websocket sent SyncCipherUpdate for id: {:?}", update.id);
+ crate::actions::sync(None).await.unwrap();
+ println!("Synced")
+ },
+ Some(NotificationMessage::SyncLoginDelete) => {
+ crate::actions::sync(None).await.unwrap();
+ },
+ Some(NotificationMessage::SyncFolderDelete) => {
+ crate::actions::sync(None).await.unwrap();
+ },
+ Some(NotificationMessage::SyncCiphers) => {
+ crate::actions::sync(None).await.unwrap();
+ },
+ Some(NotificationMessage::SyncVault) => {
+ crate::actions::sync(None).await.unwrap();
+ },
+ Some(NotificationMessage::SyncOrgKeys) => {
+ crate::actions::sync(None).await.unwrap();
+ },
+ Some(NotificationMessage::SyncFolderCreate) => {
+ crate::actions::sync(None).await.unwrap();
+ },
+ Some(NotificationMessage::SyncFolderUpdate) => {
+ crate::actions::sync(None).await.unwrap();
+ },
+ Some(NotificationMessage::SyncCipherDelete) => {
+ crate::actions::sync(None).await.unwrap();
+ },
+ Some(NotificationMessage::Logout) => {
+ println!("Websocket sent Logout");
+ // todo: proper logout?
+ std::process::exit(0);
+ },
+ _ => {}
+ }
+ },
+ Err(e) => {
+ println!("websocket error: {:?}", e);
+ },
+ _ => {}
+ }
+ });
+
+ read_future.await;
+}