From d6339933d54974952721659c3de2b2871a086c1a Mon Sep 17 00:00:00 2001 From: Bernd Schoolmann Date: Sat, 8 Apr 2023 02:55:02 +0200 Subject: Implement basic websocket support --- Cargo.lock | 235 ++++++++++++++++++++++++++++++++++--- Cargo.toml | 4 + src/bin/rbw-agent/agent.rs | 11 ++ src/bin/rbw-agent/main.rs | 1 + src/bin/rbw-agent/notifications.rs | 166 ++++++++++++++++++++++++++ 5 files changed, 399 insertions(+), 18 deletions(-) create mode 100644 src/bin/rbw-agent/notifications.rs diff --git a/Cargo.lock b/Cargo.lock index 1d8112b..c484789 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,6 +79,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23ce669cd6c8588f79e15cf450314f9638f967fc5770ff1c7c1deb0925ea7cfa" +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.0" @@ -370,6 +376,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.1.0" @@ -379,59 +400,88 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "164713a5a0dcc3e7b4b1ed7d3b433cabc18025386f9339346e8daf15963cf7ac" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", + "futures-sink", ] [[package]] name = "futures-core" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86d7a0c1aa76363dac491de0ee99faf6941128376f1cf96f07db7603b7de69dd" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" + +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] [[package]] name = "futures-io" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89d422fa3cbe3b40dca574ab087abb5bc98258ea57eea3fd6f1fa7162c778b91" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" [[package]] name = "futures-macro" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3eb14ed937631bd8b8b8977f2c198443447a8355b6e3ca599f38c975e5a963b6" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.10", ] [[package]] name = "futures-sink" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec93083a4aecafb2a80a885c9de1f0ccae9dbd32c2bb54b0c3a65690e0b8d2f2" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" [[package]] name = "futures-task" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd65540d33b37b16542a0438c12e6aeead10d4ac5d05bd3f805b8f35ab592879" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" [[package]] name = "futures-util" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ef6b17e481503ec85211fed8f39d1970f128935ca1f814cd32ac4a6842e84ab" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", + "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -775,6 +825,24 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nix" version = "0.26.2" @@ -853,12 +921,50 @@ version = "1.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" +[[package]] +name = "openssl" +version = "0.10.49" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d2f106ab837a24e03672c59b1239669a0596406ff657c3c0835b6b7f0f35a33" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.10", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.84" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a20eace9dc2d82904039cb76dcf50fb1a0bba071cfd1629720b5d6f1ddba0fa" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "os_str_bytes" version = "6.5.0" @@ -899,6 +1005,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "paste" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79" + [[package]] name = "pbkdf2" version = "0.12.1" @@ -958,6 +1070,12 @@ dependencies = [ "spki", ] +[[package]] +name = "pkg-config" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1022,7 +1140,7 @@ dependencies = [ "arrayvec", "async-trait", "base32", - "base64", + "base64 0.21.0", "block-padding", "cbc", "clap", @@ -1030,6 +1148,8 @@ dependencies = [ "daemonize", "directories", "env_logger", + "futures", + "futures-channel", "futures-util", "hkdf", "hmac", @@ -1043,6 +1163,7 @@ dependencies = [ "rand", "region", "reqwest", + "rmpv", "rsa", "serde", "serde_json", @@ -1056,6 +1177,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-tungstenite", "totp-lite", "url", "uuid", @@ -1117,7 +1239,7 @@ version = "0.11.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ba30cc2c0cd02af1222ed216ba659cdb2f879dfe3181852fe7c50b1d0005949" dependencies = [ - "base64", + "base64 0.21.0", "bytes", "encoding_rs", "futures-core", @@ -1165,6 +1287,27 @@ dependencies = [ "winapi", ] +[[package]] +name = "rmp" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44519172358fd6d58656c86ab8e7fbc9e1490c3e8f14d35ed78ca0dd07403c9f" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "rmpv" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de8813b3a2f95c5138fe5925bfb8784175d88d6bff059ba8ce090aa891319754" +dependencies = [ + "num-traits", + "rmp", +] + [[package]] name = "rsa" version = "0.8.2" @@ -1229,7 +1372,7 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" dependencies = [ - "base64", + "base64 0.21.0", ] [[package]] @@ -1597,6 +1740,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.23.4" @@ -1619,6 +1772,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd" +dependencies = [ + "futures-util", + "log", + "native-tls", + "tokio", + "tokio-native-tls", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.7" @@ -1677,6 +1844,26 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "tungstenite" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788" +dependencies = [ + "base64 0.13.1", + "byteorder", + "bytes", + "http", + "httparse", + "log", + "native-tls", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.16.0" @@ -1737,6 +1924,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "uuid" version = "1.3.0" @@ -1746,6 +1939,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.4" diff --git a/Cargo.toml b/Cargo.toml index 778e9d4..f2925f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,8 @@ clap_complete = "4.1.5" daemonize = "0.5.0" directories = "5.0.0" env_logger = "0.10.0" +futures = "0.3.28" +futures-channel = "0.3.28" futures-util = "0.3.27" hkdf = "0.12.3" hmac = { version = "0.12.1", features = ["std"] } @@ -57,6 +59,8 @@ totp-lite = "2.0.0" url = "2.3.1" uuid = { version = "1.3.0", features = ["v4"] } zeroize = "1.5.7" +rmpv = "1.0.0" +tokio-tungstenite = { version = "*", features = ["native-tls"] } [package.metadata.deb] depends = "pinentry" diff --git a/src/bin/rbw-agent/agent.rs b/src/bin/rbw-agent/agent.rs index 7dcab16..9523c78 100644 --- a/src/bin/rbw-agent/agent.rs +++ b/src/bin/rbw-agent/agent.rs @@ -73,6 +73,17 @@ impl Agent { self, listener: tokio::net::UnixListener, ) -> anyhow::Result<()> { + tokio::spawn(async move { + let config = rbw::config::Config::load_async().await.expect("Error loading config"); + let mut websocket_url = config.base_url.clone().expect("Config is missing base url").replace("https://", "wss://") + "/notifications/hub?access_token="; + if let Some(email) = &config.email { + let db = rbw::db::Db::load_async(&config.server_name().as_str(), email).await.expect("Error loading db"); + let access_token = db.access_token.expect("Error getting access token"); + websocket_url = websocket_url + &access_token; + crate::notifications::subscribe_to_notifications(websocket_url).await; + } + }); + enum Event { Request(std::io::Result), Timeout(()), diff --git a/src/bin/rbw-agent/main.rs b/src/bin/rbw-agent/main.rs index 81eee3a..5e0fa61 100644 --- a/src/bin/rbw-agent/main.rs +++ b/src/bin/rbw-agent/main.rs @@ -21,6 +21,7 @@ mod daemon; mod debugger; mod sock; mod timeout; +mod notifications; async fn tokio_main( startup_ack: Option, diff --git a/src/bin/rbw-agent/notifications.rs b/src/bin/rbw-agent/notifications.rs new file mode 100644 index 0000000..ffdefe9 --- /dev/null +++ b/src/bin/rbw-agent/notifications.rs @@ -0,0 +1,166 @@ +use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; +use futures_util::{StreamExt, SinkExt}; + +struct SyncCipherUpdate { + id: String +} + +struct SyncCipherCreate { + id: String +} + +enum NotificationMessage { + SyncCipherUpdate(SyncCipherUpdate), + SyncCipherCreate(SyncCipherCreate), + SyncLoginDelete, + SyncFolderDelete, + SyncCiphers, + + SyncVault, + SyncOrgKeys, + SyncFolderCreate, + SyncFolderUpdate, + SyncCipherDelete, + SyncSettings, + + Logout, + + SyncSendCreate, + SyncSendUpdate, + SyncSendDelete, + + AuthRequest, + AuthRequestResponse, + + None, +} + +fn parse_messagepack(data: &[u8]) -> Option { + if data.len() < 2 { + return None; + } + + // the first few bytes with th 0x80 bit set, plus one byte terminating the length contain the length of the message + let len_buffer_length = data.iter().position(|&x| (x & 0x80) == 0 )? + 1; + + println!("len_buffer_length: {:?}", len_buffer_length); + println!("data: {:?}", data); + let unpacked_messagepack = rmpv::decode::read_value(&mut &data[len_buffer_length..]).ok().unwrap(); + println!("unpacked_messagepack: {:?}", unpacked_messagepack); + if !unpacked_messagepack.is_array() { + return None; + } + let unpacked_message = unpacked_messagepack.as_array().unwrap(); + println!("unpacked_message: {:?}", unpacked_message); + let message_type = unpacked_message.iter().next()?.as_u64()?; + let message = unpacked_message.iter().skip(4).next()?.as_array()?.first()?.as_map()?; + let payload = message.iter().filter(|x| x.0.as_str().unwrap() == "Payload").next()?.1.as_map()?; + println!("message_type: {:?}", message_type); + println!("payload: {:?}", payload); + + let message = match message_type { + 0 => { + let id = payload.iter().filter(|x| x.0.as_str().unwrap() == "Id").next()?.1.as_str()?; + + Some(NotificationMessage::SyncCipherUpdate( + SyncCipherUpdate { + id: id.to_string() + } + )) + }, + 1 => { + let id = payload.iter().filter(|x| x.0.as_str().unwrap() == "Id").next()?.1.as_str()?; + + Some(NotificationMessage::SyncCipherCreate( + SyncCipherCreate { + id: id.to_string() + } + )) + }, + 2 => Some(NotificationMessage::SyncLoginDelete), + 3 => Some(NotificationMessage::SyncFolderDelete), + 4 => Some(NotificationMessage::SyncCiphers), + 5 => Some(NotificationMessage::SyncVault), + 6 => Some(NotificationMessage::SyncOrgKeys), + 7 => Some(NotificationMessage::SyncFolderCreate), + 8 => Some(NotificationMessage::SyncFolderUpdate), + 9 => Some(NotificationMessage::SyncCipherDelete), + 10 => Some(NotificationMessage::SyncSettings), + 11 => Some(NotificationMessage::Logout), + 12 => Some(NotificationMessage::SyncSendCreate), + 13 => Some(NotificationMessage::SyncSendUpdate), + 14 => Some(NotificationMessage::SyncSendDelete), + 15 => Some(NotificationMessage::AuthRequest), + 16 => Some(NotificationMessage::AuthRequestResponse), + 100 => Some(NotificationMessage::None), + _ => None + }; + + return message; +} + +pub async fn subscribe_to_notifications(url: String) { + let url = url::Url::parse(url.as_str()).unwrap(); + + let (ws_stream, _response) = connect_async(url).await.expect("Failed to connect"); + + let (mut write, read) = ws_stream.split(); + + write.send(Message::Text("{\"protocol\":\"messagepack\",\"version\":1}\n".to_string())).await.unwrap(); + + let read_future = read.for_each(|message| async { + match message { + Ok(Message::Binary(binary)) => { + let msg = parse_messagepack(&binary); + match msg { + Some(NotificationMessage::SyncCipherUpdate(update)) => { + println!("Websocket sent SyncCipherUpdate for id: {:?}", update.id); + crate::actions::sync(None).await.unwrap(); + println!("Synced") + }, + Some(NotificationMessage::SyncCipherCreate(update)) => { + println!("Websocket sent SyncCipherUpdate for id: {:?}", update.id); + crate::actions::sync(None).await.unwrap(); + println!("Synced") + }, + Some(NotificationMessage::SyncLoginDelete) => { + crate::actions::sync(None).await.unwrap(); + }, + Some(NotificationMessage::SyncFolderDelete) => { + crate::actions::sync(None).await.unwrap(); + }, + Some(NotificationMessage::SyncCiphers) => { + crate::actions::sync(None).await.unwrap(); + }, + Some(NotificationMessage::SyncVault) => { + crate::actions::sync(None).await.unwrap(); + }, + Some(NotificationMessage::SyncOrgKeys) => { + crate::actions::sync(None).await.unwrap(); + }, + Some(NotificationMessage::SyncFolderCreate) => { + crate::actions::sync(None).await.unwrap(); + }, + Some(NotificationMessage::SyncFolderUpdate) => { + crate::actions::sync(None).await.unwrap(); + }, + Some(NotificationMessage::SyncCipherDelete) => { + crate::actions::sync(None).await.unwrap(); + }, + Some(NotificationMessage::Logout) => { + println!("Websocket sent Logout"); + // todo: proper logout? + std::process::exit(0); + }, + _ => {} + } + }, + Err(e) => { + println!("websocket error: {:?}", e); + }, + _ => {} + } + }); + + read_future.await; +} -- cgit v1.2.3-54-g00ecf From 355e17dc29244856454db3bdaeed082cf33231e6 Mon Sep 17 00:00:00 2001 From: Bernd Schoolmann Date: Sun, 16 Apr 2023 13:41:52 +0200 Subject: Restructure code --- src/bin/rbw-agent/actions.rs | 19 +++- src/bin/rbw-agent/agent.rs | 36 +++++-- src/bin/rbw-agent/notifications.rs | 210 ++++++++++++++++--------------------- 3 files changed, 136 insertions(+), 129 deletions(-) diff --git a/src/bin/rbw-agent/actions.rs b/src/bin/rbw-agent/actions.rs index 7b5dc58..006f7ca 100644 --- a/src/bin/rbw-agent/actions.rs +++ b/src/bin/rbw-agent/actions.rs @@ -130,7 +130,7 @@ pub async fn login( protected_key, )) => { login_success( - state, + state.clone(), access_token, refresh_token, kdf, @@ -169,7 +169,7 @@ pub async fn login( ) .await?; login_success( - state, + state.clone(), access_token, refresh_token, kdf, @@ -205,6 +205,8 @@ pub async fn login( } } + subscribe_to_notifications(state.clone()).await.expect("could not subscribe"); + respond_ack(sock).await?; Ok(()) @@ -655,3 +657,16 @@ async fn config_pinentry() -> anyhow::Result { let config = rbw::config::Config::load_async().await?; Ok(config.pinentry) } + +pub async fn subscribe_to_notifications(state: std::sync::Arc>) -> anyhow::Result<()> { + let config = rbw::config::Config::load_async().await.expect("Config is missing"); + let mut websocket_url = config.base_url.clone().expect("Config is missing base url").replace("https://", "wss://") + "/notifications/hub?access_token="; + let email = config.email.clone().expect("Config is missing email"); + let db = rbw::db::Db::load_async(&config.server_name().as_str(), &email).await.expect("Error loading db"); + let access_token = db.access_token.expect("Error getting access token"); + websocket_url = websocket_url + &access_token; + let mut state = state.write().await; + state.notifications_handler.connect(websocket_url).await.expect("Error connecting to websocket"); + + Ok(()) +} \ No newline at end of file diff --git a/src/bin/rbw-agent/agent.rs b/src/bin/rbw-agent/agent.rs index 9523c78..c025880 100644 --- a/src/bin/rbw-agent/agent.rs +++ b/src/bin/rbw-agent/agent.rs @@ -1,6 +1,8 @@ use anyhow::Context as _; use futures_util::StreamExt as _; +use crate::notifications; + pub struct State { pub priv_key: Option, pub org_keys: @@ -9,6 +11,7 @@ pub struct State { pub timeout_duration: std::time::Duration, pub sync_timeout: crate::timeout::Timeout, pub sync_timeout_duration: std::time::Duration, + pub notifications_handler: crate::notifications::NotificationsHandler, } impl State { @@ -55,6 +58,7 @@ impl Agent { if sync_timeout_duration > std::time::Duration::ZERO { sync_timeout.set(sync_timeout_duration); } + let notifications_handler = crate::notifications::NotificationsHandler::new(); Ok(Self { timer_r, sync_timer_r, @@ -65,6 +69,7 @@ impl Agent { timeout_duration, sync_timeout, sync_timeout_duration, + notifications_handler, })), }) } @@ -73,22 +78,32 @@ impl Agent { self, listener: tokio::net::UnixListener, ) -> anyhow::Result<()> { - tokio::spawn(async move { - let config = rbw::config::Config::load_async().await.expect("Error loading config"); - let mut websocket_url = config.base_url.clone().expect("Config is missing base url").replace("https://", "wss://") + "/notifications/hub?access_token="; - if let Some(email) = &config.email { - let db = rbw::db::Db::load_async(&config.server_name().as_str(), email).await.expect("Error loading db"); - let access_token = db.access_token.expect("Error getting access token"); - websocket_url = websocket_url + &access_token; - crate::notifications::subscribe_to_notifications(websocket_url).await; - } - }); + crate::actions::subscribe_to_notifications(self.state.clone()).await.expect("could not subscribe"); enum Event { Request(std::io::Result), Timeout(()), Sync(()), } + + let c: tokio::sync::mpsc::UnboundedReceiver = { + self.state.write().await.notifications_handler.get_channel().await + }; + let notifications = tokio_stream::wrappers::UnboundedReceiverStream::new( + c, + ) + .map(|message| { + match message { + notifications::NotificationMessage::Logout => { + Event::Timeout(()) + } + _ => { + Event::Sync(()) + } + } + }) + .boxed(); + let mut stream = futures_util::stream::select_all([ tokio_stream::wrappers::UnixListenerStream::new(listener) .map(Event::Request) @@ -103,6 +118,7 @@ impl Agent { ) .map(Event::Sync) .boxed(), + notifications, ]); while let Some(event) = stream.next().await { match event { diff --git a/src/bin/rbw-agent/notifications.rs b/src/bin/rbw-agent/notifications.rs index ffdefe9..c72fe38 100644 --- a/src/bin/rbw-agent/notifications.rs +++ b/src/bin/rbw-agent/notifications.rs @@ -1,17 +1,12 @@ -use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; +use futures::{stream::SplitSink}; +use tokio::{net::{TcpStream}, task::JoinHandle}; +use tokio_tungstenite::{connect_async, tungstenite::protocol::Message, WebSocketStream, MaybeTlsStream}; use futures_util::{StreamExt, SinkExt}; -struct SyncCipherUpdate { - id: String -} - -struct SyncCipherCreate { - id: String -} - -enum NotificationMessage { - SyncCipherUpdate(SyncCipherUpdate), - SyncCipherCreate(SyncCipherCreate), +#[derive(Copy, Clone)] +pub enum NotificationMessage { + SyncCipherUpdate, + SyncCipherCreate, SyncLoginDelete, SyncFolderDelete, SyncCiphers, @@ -24,59 +19,25 @@ enum NotificationMessage { SyncSettings, Logout, +} - SyncSendCreate, - SyncSendUpdate, - SyncSendDelete, - - AuthRequest, - AuthRequestResponse, - None, -} fn parse_messagepack(data: &[u8]) -> Option { - if data.len() < 2 { - return None; - } - - // the first few bytes with th 0x80 bit set, plus one byte terminating the length contain the length of the message + // the first few bytes with the 0x80 bit set, plus one byte terminating the length contain the length of the message let len_buffer_length = data.iter().position(|&x| (x & 0x80) == 0 )? + 1; - println!("len_buffer_length: {:?}", len_buffer_length); - println!("data: {:?}", data); - let unpacked_messagepack = rmpv::decode::read_value(&mut &data[len_buffer_length..]).ok().unwrap(); - println!("unpacked_messagepack: {:?}", unpacked_messagepack); + let unpacked_messagepack = rmpv::decode::read_value(&mut &data[len_buffer_length..]).ok()?; if !unpacked_messagepack.is_array() { return None; } + let unpacked_message = unpacked_messagepack.as_array().unwrap(); - println!("unpacked_message: {:?}", unpacked_message); - let message_type = unpacked_message.iter().next()?.as_u64()?; - let message = unpacked_message.iter().skip(4).next()?.as_array()?.first()?.as_map()?; - let payload = message.iter().filter(|x| x.0.as_str().unwrap() == "Payload").next()?.1.as_map()?; - println!("message_type: {:?}", message_type); - println!("payload: {:?}", payload); + let message_type = unpacked_message.iter().next().unwrap().as_u64().unwrap(); let message = match message_type { - 0 => { - let id = payload.iter().filter(|x| x.0.as_str().unwrap() == "Id").next()?.1.as_str()?; - - Some(NotificationMessage::SyncCipherUpdate( - SyncCipherUpdate { - id: id.to_string() - } - )) - }, - 1 => { - let id = payload.iter().filter(|x| x.0.as_str().unwrap() == "Id").next()?.1.as_str()?; - - Some(NotificationMessage::SyncCipherCreate( - SyncCipherCreate { - id: id.to_string() - } - )) - }, + 0 => Some(NotificationMessage::SyncCipherUpdate), + 1 => Some(NotificationMessage::SyncCipherCreate), 2 => Some(NotificationMessage::SyncLoginDelete), 3 => Some(NotificationMessage::SyncFolderDelete), 4 => Some(NotificationMessage::SyncCiphers), @@ -87,80 +48,95 @@ fn parse_messagepack(data: &[u8]) -> Option { 9 => Some(NotificationMessage::SyncCipherDelete), 10 => Some(NotificationMessage::SyncSettings), 11 => Some(NotificationMessage::Logout), - 12 => Some(NotificationMessage::SyncSendCreate), - 13 => Some(NotificationMessage::SyncSendUpdate), - 14 => Some(NotificationMessage::SyncSendDelete), - 15 => Some(NotificationMessage::AuthRequest), - 16 => Some(NotificationMessage::AuthRequestResponse), - 100 => Some(NotificationMessage::None), _ => None }; return message; } -pub async fn subscribe_to_notifications(url: String) { - let url = url::Url::parse(url.as_str()).unwrap(); +pub struct NotificationsHandler { + write: Option>, Message>>, + read_handle: Option>, + sending_channels : std::sync::Arc>>>, +} - let (ws_stream, _response) = connect_async(url).await.expect("Failed to connect"); +impl NotificationsHandler { + pub fn new() -> Self { + Self { + write: None, + read_handle: None, + sending_channels: std::sync::Arc::new(tokio::sync::RwLock::new(Vec::new())), + } + } - let (mut write, read) = ws_stream.split(); + pub async fn connect(&mut self, url: String) -> Result<(), Box> { + if self.is_connected() { + self.disconnect().await?; + } - write.send(Message::Text("{\"protocol\":\"messagepack\",\"version\":1}\n".to_string())).await.unwrap(); + //subscribe_to_notifications(url, self.sending_channels.clone()).await?; + let (write, read_handle) = subscribe_to_notifications(url, self.sending_channels.clone()).await?; + + self.write = Some(write); + self.read_handle = Some(read_handle); + return Ok(()); + } + + pub fn is_connected(&self) -> bool { + self.write.is_some() + } - let read_future = read.for_each(|message| async { - match message { - Ok(Message::Binary(binary)) => { - let msg = parse_messagepack(&binary); - match msg { - Some(NotificationMessage::SyncCipherUpdate(update)) => { - println!("Websocket sent SyncCipherUpdate for id: {:?}", update.id); - crate::actions::sync(None).await.unwrap(); - println!("Synced") - }, - Some(NotificationMessage::SyncCipherCreate(update)) => { - println!("Websocket sent SyncCipherUpdate for id: {:?}", update.id); - crate::actions::sync(None).await.unwrap(); - println!("Synced") - }, - Some(NotificationMessage::SyncLoginDelete) => { - crate::actions::sync(None).await.unwrap(); - }, - Some(NotificationMessage::SyncFolderDelete) => { - crate::actions::sync(None).await.unwrap(); - }, - Some(NotificationMessage::SyncCiphers) => { - crate::actions::sync(None).await.unwrap(); - }, - Some(NotificationMessage::SyncVault) => { - crate::actions::sync(None).await.unwrap(); - }, - Some(NotificationMessage::SyncOrgKeys) => { - crate::actions::sync(None).await.unwrap(); - }, - Some(NotificationMessage::SyncFolderCreate) => { - crate::actions::sync(None).await.unwrap(); - }, - Some(NotificationMessage::SyncFolderUpdate) => { - crate::actions::sync(None).await.unwrap(); - }, - Some(NotificationMessage::SyncCipherDelete) => { - crate::actions::sync(None).await.unwrap(); - }, - Some(NotificationMessage::Logout) => { - println!("Websocket sent Logout"); - // todo: proper logout? - std::process::exit(0); - }, - _ => {} - } - }, - Err(e) => { - println!("websocket error: {:?}", e); - }, - _ => {} + pub async fn disconnect(&mut self) -> Result<(), Box> { + self.sending_channels.write().await.clear(); + if let Some(mut write) = self.write.take() { + write.send(Message::Close(None)).await?; + write.close().await?; } - }); + Ok(()) + } + + pub async fn get_channel(&mut self) -> tokio::sync::mpsc::UnboundedReceiver { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel::(); + self.sending_channels.write().await.push(tx); + return rx; + } - read_future.await; } + +async fn subscribe_to_notifications(url: String, sending_channels: std::sync::Arc>>>) -> Result<(SplitSink>, Message>, JoinHandle<()>), Box> { + let url = url::Url::parse(url.as_str())?; + println!("Connecting to {}", url); + let (ws_stream, _response) = connect_async(url).await.expect("Failed to connect"); + let (mut write, read) = ws_stream.split(); + + write.send(Message::Text("{\"protocol\":\"messagepack\",\"version\":1}\n".to_string())).await.unwrap(); + + let read_future = async move { + read.map(|message| { + (message, sending_channels.clone()) + }).for_each(|(message, a)| async move { + let a = a.read().await; + + match message { + Ok(Message::Binary(binary)) => { + if binary.len() < 4 { + return; + } + + let msg1 = parse_messagepack(&binary); + if let Some(msg) = msg1 { + for channel in a.iter() { + let res = channel.send(msg); + } + } + }, + Err(e) => { + println!("websocket error: {:?}", e); + }, + _ => {} + } + }).await; + }; + + return Ok((write, tokio::spawn(read_future))); +} \ No newline at end of file -- cgit v1.2.3-54-g00ecf From 8fbed0f3fbcac726fb73f2cd49cb7e4d36091c47 Mon Sep 17 00:00:00 2001 From: Bernd Schoolmann Date: Mon, 17 Apr 2023 02:46:38 +0200 Subject: Improve error handling --- src/bin/rbw-agent/actions.rs | 25 +++++++++++++++++-------- src/bin/rbw-agent/notifications.rs | 16 +++++++--------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/bin/rbw-agent/actions.rs b/src/bin/rbw-agent/actions.rs index 006f7ca..cc363dd 100644 --- a/src/bin/rbw-agent/actions.rs +++ b/src/bin/rbw-agent/actions.rs @@ -659,14 +659,23 @@ async fn config_pinentry() -> anyhow::Result { } pub async fn subscribe_to_notifications(state: std::sync::Arc>) -> anyhow::Result<()> { - let config = rbw::config::Config::load_async().await.expect("Config is missing"); - let mut websocket_url = config.base_url.clone().expect("Config is missing base url").replace("https://", "wss://") + "/notifications/hub?access_token="; - let email = config.email.clone().expect("Config is missing email"); - let db = rbw::db::Db::load_async(&config.server_name().as_str(), &email).await.expect("Error loading db"); - let access_token = db.access_token.expect("Error getting access token"); + // access token might be out of date, so we do a sync to refresh it + sync(None).await?; + + let config = rbw::config::Config::load_async().await.context("Config is missing")?; + let email = config.email.clone().context("Config is missing email")?; + let db = rbw::db::Db::load_async(&config.server_name().as_str(), &email).await?; + let access_token = db.access_token.context("Error getting access token")?; + + let mut websocket_url = config.base_url.clone().expect("config is missing base url").replace("https://", "wss://") + "/notifications/hub?access_token="; websocket_url = websocket_url + &access_token; + let mut state = state.write().await; - state.notifications_handler.connect(websocket_url).await.expect("Error connecting to websocket"); - - Ok(()) + let err = state.notifications_handler.connect(websocket_url).await.err(); + + if let Some(err) = err { + return Err(anyhow::anyhow!(err.to_string())); + } else { + Ok(()) + } } \ No newline at end of file diff --git a/src/bin/rbw-agent/notifications.rs b/src/bin/rbw-agent/notifications.rs index c72fe38..ab24c42 100644 --- a/src/bin/rbw-agent/notifications.rs +++ b/src/bin/rbw-agent/notifications.rs @@ -74,7 +74,6 @@ impl NotificationsHandler { self.disconnect().await?; } - //subscribe_to_notifications(url, self.sending_channels.clone()).await?; let (write, read_handle) = subscribe_to_notifications(url, self.sending_channels.clone()).await?; self.write = Some(write); @@ -91,6 +90,7 @@ impl NotificationsHandler { if let Some(mut write) = self.write.take() { write.send(Message::Close(None)).await?; write.close().await?; + self.read_handle.take().unwrap().await?; } Ok(()) } @@ -105,8 +105,7 @@ impl NotificationsHandler { async fn subscribe_to_notifications(url: String, sending_channels: std::sync::Arc>>>) -> Result<(SplitSink>, Message>, JoinHandle<()>), Box> { let url = url::Url::parse(url.as_str())?; - println!("Connecting to {}", url); - let (ws_stream, _response) = connect_async(url).await.expect("Failed to connect"); + let (ws_stream, _response) = connect_async(url).await?; let (mut write, read) = ws_stream.split(); write.send(Message::Text("{\"protocol\":\"messagepack\",\"version\":1}\n".to_string())).await.unwrap(); @@ -119,14 +118,13 @@ async fn subscribe_to_notifications(url: String, sending_channels: std::sync::Ar match message { Ok(Message::Binary(binary)) => { - if binary.len() < 4 { - return; - } - - let msg1 = parse_messagepack(&binary); - if let Some(msg) = msg1 { + let msgpack = parse_messagepack(&binary); + if let Some(msg) = msgpack { for channel in a.iter() { let res = channel.send(msg); + if res.is_err() { + println!("error sending websocket message to channel"); + } } } }, -- cgit v1.2.3-54-g00ecf From e49ca91ea1607a39257fc28b58598482eff4338a Mon Sep 17 00:00:00 2001 From: Bernd Schoolmann Date: Mon, 17 Apr 2023 02:59:23 +0200 Subject: Fix panic when websocket endpoint is not available --- src/bin/rbw-agent/actions.rs | 4 ++-- src/bin/rbw-agent/agent.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/bin/rbw-agent/actions.rs b/src/bin/rbw-agent/actions.rs index cc363dd..5ee23d9 100644 --- a/src/bin/rbw-agent/actions.rs +++ b/src/bin/rbw-agent/actions.rs @@ -205,7 +205,7 @@ pub async fn login( } } - subscribe_to_notifications(state.clone()).await.expect("could not subscribe"); + let _ = subscribe_to_notifications(state.clone()).await; respond_ack(sock).await?; @@ -669,7 +669,7 @@ pub async fn subscribe_to_notifications(state: std::sync::Arc), -- cgit v1.2.3-54-g00ecf From 7c77e2b47cee8bcda3bbe87af8979b6880356acd Mon Sep 17 00:00:00 2001 From: Bernd Schoolmann Date: Mon, 17 Apr 2023 03:19:40 +0200 Subject: Improve websocket disconnect handling --- src/bin/rbw-agent/notifications.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/bin/rbw-agent/notifications.rs b/src/bin/rbw-agent/notifications.rs index ab24c42..334466f 100644 --- a/src/bin/rbw-agent/notifications.rs +++ b/src/bin/rbw-agent/notifications.rs @@ -82,7 +82,7 @@ impl NotificationsHandler { } pub fn is_connected(&self) -> bool { - self.write.is_some() + self.write.is_some() && self.read_handle.is_some() && !self.read_handle.as_ref().unwrap().is_finished() } pub async fn disconnect(&mut self) -> Result<(), Box> { @@ -92,6 +92,8 @@ impl NotificationsHandler { write.close().await?; self.read_handle.take().unwrap().await?; } + self.write = None; + self.read_handle = None; Ok(()) } -- cgit v1.2.3-54-g00ecf From 7729f1c13c3c19423bdc3e92f1e8cc7057667fe8 Mon Sep 17 00:00:00 2001 From: Bernd Schoolmann Date: Thu, 27 Apr 2023 02:07:59 +0200 Subject: Fix tokio-tungstenite dependency --- Cargo.lock | 102 ++++--------------------------------------------------------- Cargo.toml | 2 +- 2 files changed, 7 insertions(+), 97 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c484789..8acee10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -376,21 +376,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "form_urlencoded" version = "1.1.0" @@ -825,24 +810,6 @@ dependencies = [ "windows-sys 0.45.0", ] -[[package]] -name = "native-tls" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" -dependencies = [ - "lazy_static", - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "nix" version = "0.26.2" @@ -921,50 +888,12 @@ version = "1.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" -[[package]] -name = "openssl" -version = "0.10.49" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d2f106ab837a24e03672c59b1239669a0596406ff657c3c0835b6b7f0f35a33" -dependencies = [ - "bitflags", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.10", -] - [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" -[[package]] -name = "openssl-sys" -version = "0.9.84" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a20eace9dc2d82904039cb76dcf50fb1a0bba071cfd1629720b5d6f1ddba0fa" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "os_str_bytes" version = "6.5.0" @@ -1070,12 +999,6 @@ dependencies = [ "spki", ] -[[package]] -name = "pkg-config" -version = "0.3.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" - [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1740,16 +1663,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "tokio-native-tls" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" -dependencies = [ - "native-tls", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.23.4" @@ -1780,10 +1693,12 @@ checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd" dependencies = [ "futures-util", "log", - "native-tls", + "rustls", + "rustls-native-certs", "tokio", - "tokio-native-tls", + "tokio-rustls", "tungstenite", + "webpki", ] [[package]] @@ -1856,12 +1771,13 @@ dependencies = [ "http", "httparse", "log", - "native-tls", "rand", + "rustls", "sha1", "thiserror", "url", "utf-8", + "webpki", ] [[package]] @@ -1939,12 +1855,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "version_check" version = "0.9.4" diff --git a/Cargo.toml b/Cargo.toml index f2925f0..ef700e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,7 +60,7 @@ url = "2.3.1" uuid = { version = "1.3.0", features = ["v4"] } zeroize = "1.5.7" rmpv = "1.0.0" -tokio-tungstenite = { version = "*", features = ["native-tls"] } +tokio-tungstenite = { version = "0.18.0", features = ["rustls-tls-native-roots"] } [package.metadata.deb] depends = "pinentry" -- cgit v1.2.3-54-g00ecf From d6bbc46089b519eb82ae009bd48fa6d449f07941 Mon Sep 17 00:00:00 2001 From: Bernd Schoolmann Date: Thu, 27 Apr 2023 02:14:54 +0200 Subject: Improve error logging for websockets --- src/bin/rbw-agent/actions.rs | 8 +++++++- src/bin/rbw-agent/agent.rs | 5 ++++- src/bin/rbw-agent/notifications.rs | 4 ++-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/bin/rbw-agent/actions.rs b/src/bin/rbw-agent/actions.rs index 5ee23d9..f5b9dc0 100644 --- a/src/bin/rbw-agent/actions.rs +++ b/src/bin/rbw-agent/actions.rs @@ -1,3 +1,5 @@ +use std::f32::consts::E; + use anyhow::Context as _; pub async fn register( @@ -205,7 +207,11 @@ pub async fn login( } } - let _ = subscribe_to_notifications(state.clone()).await; + let err = subscribe_to_notifications(state.clone()).await.err(); + if let Some(e) = err { + eprintln!("failed to subscribe to notifications: {}", e) + } + respond_ack(sock).await?; diff --git a/src/bin/rbw-agent/agent.rs b/src/bin/rbw-agent/agent.rs index 0d672ee..fb21728 100644 --- a/src/bin/rbw-agent/agent.rs +++ b/src/bin/rbw-agent/agent.rs @@ -78,7 +78,10 @@ impl Agent { self, listener: tokio::net::UnixListener, ) -> anyhow::Result<()> { - let _ = crate::actions::subscribe_to_notifications(self.state.clone()).await; + let err = crate::actions::subscribe_to_notifications(self.state.clone()).await; + if let Err(e) = err { + eprintln!("failed to subscribe to notifications: {e:#}") + } enum Event { Request(std::io::Result), diff --git a/src/bin/rbw-agent/notifications.rs b/src/bin/rbw-agent/notifications.rs index 334466f..b575cf9 100644 --- a/src/bin/rbw-agent/notifications.rs +++ b/src/bin/rbw-agent/notifications.rs @@ -125,13 +125,13 @@ async fn subscribe_to_notifications(url: String, sending_channels: std::sync::Ar for channel in a.iter() { let res = channel.send(msg); if res.is_err() { - println!("error sending websocket message to channel"); + eprintln!("error sending websocket message to channel"); } } } }, Err(e) => { - println!("websocket error: {:?}", e); + eprintln!("websocket error: {:?}", e); }, _ => {} } -- cgit v1.2.3-54-g00ecf From 66cf6aea2d2fc355543470dab762211d9c8ad306 Mon Sep 17 00:00:00 2001 From: Bernd Schoolmann Date: Thu, 27 Apr 2023 02:38:29 +0200 Subject: Cargo format and reconnect websocket on sync --- src/bin/rbw-agent/actions.rs | 34 +++++++---- src/bin/rbw-agent/agent.rs | 66 ++++++++++++++-------- src/bin/rbw-agent/main.rs | 2 +- src/bin/rbw-agent/notifications.rs | 113 ++++++++++++++++++++++++++----------- 4 files changed, 148 insertions(+), 67 deletions(-) diff --git a/src/bin/rbw-agent/actions.rs b/src/bin/rbw-agent/actions.rs index f5b9dc0..2f34c6b 100644 --- a/src/bin/rbw-agent/actions.rs +++ b/src/bin/rbw-agent/actions.rs @@ -212,7 +212,6 @@ pub async fn login( eprintln!("failed to subscribe to notifications: {}", e) } - respond_ack(sock).await?; Ok(()) @@ -664,24 +663,39 @@ async fn config_pinentry() -> anyhow::Result { Ok(config.pinentry) } -pub async fn subscribe_to_notifications(state: std::sync::Arc>) -> anyhow::Result<()> { +pub async fn subscribe_to_notifications( + state: std::sync::Arc>, +) -> anyhow::Result<()> { // access token might be out of date, so we do a sync to refresh it sync(None).await?; - let config = rbw::config::Config::load_async().await.context("Config is missing")?; + let config = rbw::config::Config::load_async() + .await + .context("Config is missing")?; let email = config.email.clone().context("Config is missing email")?; - let db = rbw::db::Db::load_async(&config.server_name().as_str(), &email).await?; - let access_token = db.access_token.context("Error getting access token")?; - - let mut websocket_url = config.base_url.clone().expect("config is missing base url").replace("https://", "wss://") + "/notifications/hub?access_token="; + let db = rbw::db::Db::load_async(&config.server_name().as_str(), &email) + .await?; + let access_token = + db.access_token.context("Error getting access token")?; + + let mut websocket_url = config + .base_url + .clone() + .expect("config is missing base url") + .replace("https://", "wss://") + + "/notifications/hub?access_token="; websocket_url = websocket_url + &access_token; let mut state = state.write().await; - let err = state.notifications_handler.connect(websocket_url).await.err(); - + let err = state + .notifications_handler + .connect(websocket_url) + .await + .err(); + if let Some(err) = err { return Err(anyhow::anyhow!(err.to_string())); } else { Ok(()) } -} \ No newline at end of file +} diff --git a/src/bin/rbw-agent/agent.rs b/src/bin/rbw-agent/agent.rs index fb21728..b88121d 100644 --- a/src/bin/rbw-agent/agent.rs +++ b/src/bin/rbw-agent/agent.rs @@ -1,3 +1,4 @@ +use aes::cipher::typenum::private::IsNotEqualPrivate; use anyhow::Context as _; use futures_util::StreamExt as _; @@ -58,7 +59,8 @@ impl Agent { if sync_timeout_duration > std::time::Duration::ZERO { sync_timeout.set(sync_timeout_duration); } - let notifications_handler = crate::notifications::NotificationsHandler::new(); + let notifications_handler = + crate::notifications::NotificationsHandler::new(); Ok(Self { timer_r, sync_timer_r, @@ -78,7 +80,9 @@ impl Agent { self, listener: tokio::net::UnixListener, ) -> anyhow::Result<()> { - let err = crate::actions::subscribe_to_notifications(self.state.clone()).await; + let err = + crate::actions::subscribe_to_notifications(self.state.clone()) + .await; if let Err(e) = err { eprintln!("failed to subscribe to notifications: {e:#}") } @@ -88,25 +92,27 @@ impl Agent { Timeout(()), Sync(()), } - - let c: tokio::sync::mpsc::UnboundedReceiver = { - self.state.write().await.notifications_handler.get_channel().await + + let c: tokio::sync::mpsc::UnboundedReceiver< + notifications::NotificationMessage, + > = { + self.state + .write() + .await + .notifications_handler + .get_channel() + .await }; - let notifications = tokio_stream::wrappers::UnboundedReceiverStream::new( - c, - ) - .map(|message| { - match message { - notifications::NotificationMessage::Logout => { - Event::Timeout(()) - } - _ => { - Event::Sync(()) - } - } - }) - .boxed(); - + let notifications = + tokio_stream::wrappers::UnboundedReceiverStream::new(c) + .map(|message| match message { + notifications::NotificationMessage::Logout => { + Event::Timeout(()) + } + _ => Event::Sync(()), + }) + .boxed(); + let mut stream = futures_util::stream::select_all([ tokio_stream::wrappers::UnixListenerStream::new(listener) .map(Event::Request) @@ -121,7 +127,7 @@ impl Agent { ) .map(Event::Sync) .boxed(), - notifications, + notifications, ]); while let Some(event) = stream.next().await { match event { @@ -149,8 +155,24 @@ impl Agent { Event::Sync(()) => { // this could fail if we aren't logged in, but we don't // care about that + let state = self.state.clone(); tokio::spawn(async move { - let _ = crate::actions::sync(None).await; + let result = crate::actions::sync(None).await; + if let Err(e) = result { + eprintln!("failed to sync: {e:#}"); + } else { + if !state + .write() + .await + .notifications_handler + .is_connected() + { + let err = crate::actions::subscribe_to_notifications(state).await; + if let Err(e) = err { + eprintln!("failed to subscribe to notifications: {e:#}") + } + } + } }); self.state.write().await.set_sync_timeout(); } diff --git a/src/bin/rbw-agent/main.rs b/src/bin/rbw-agent/main.rs index 5e0fa61..a9477df 100644 --- a/src/bin/rbw-agent/main.rs +++ b/src/bin/rbw-agent/main.rs @@ -19,9 +19,9 @@ mod actions; mod agent; mod daemon; mod debugger; +mod notifications; mod sock; mod timeout; -mod notifications; async fn tokio_main( startup_ack: Option, diff --git a/src/bin/rbw-agent/notifications.rs b/src/bin/rbw-agent/notifications.rs index b575cf9..e8f84b0 100644 --- a/src/bin/rbw-agent/notifications.rs +++ b/src/bin/rbw-agent/notifications.rs @@ -1,7 +1,10 @@ -use futures::{stream::SplitSink}; -use tokio::{net::{TcpStream}, task::JoinHandle}; -use tokio_tungstenite::{connect_async, tungstenite::protocol::Message, WebSocketStream, MaybeTlsStream}; -use futures_util::{StreamExt, SinkExt}; +use futures::stream::SplitSink; +use futures_util::{SinkExt, StreamExt}; +use tokio::{net::TcpStream, task::JoinHandle}; +use tokio_tungstenite::{ + connect_async, tungstenite::protocol::Message, MaybeTlsStream, + WebSocketStream, +}; #[derive(Copy, Clone)] pub enum NotificationMessage { @@ -21,43 +24,54 @@ pub enum NotificationMessage { Logout, } - - fn parse_messagepack(data: &[u8]) -> Option { // the first few bytes with the 0x80 bit set, plus one byte terminating the length contain the length of the message - let len_buffer_length = data.iter().position(|&x| (x & 0x80) == 0 )? + 1; + let len_buffer_length = data.iter().position(|&x| (x & 0x80) == 0)? + 1; - let unpacked_messagepack = rmpv::decode::read_value(&mut &data[len_buffer_length..]).ok()?; + let unpacked_messagepack = + rmpv::decode::read_value(&mut &data[len_buffer_length..]).ok()?; if !unpacked_messagepack.is_array() { return None; } let unpacked_message = unpacked_messagepack.as_array().unwrap(); - let message_type = unpacked_message.iter().next().unwrap().as_u64().unwrap(); + let message_type = + unpacked_message.iter().next().unwrap().as_u64().unwrap(); let message = match message_type { - 0 => Some(NotificationMessage::SyncCipherUpdate), - 1 => Some(NotificationMessage::SyncCipherCreate), - 2 => Some(NotificationMessage::SyncLoginDelete), - 3 => Some(NotificationMessage::SyncFolderDelete), - 4 => Some(NotificationMessage::SyncCiphers), - 5 => Some(NotificationMessage::SyncVault), - 6 => Some(NotificationMessage::SyncOrgKeys), - 7 => Some(NotificationMessage::SyncFolderCreate), - 8 => Some(NotificationMessage::SyncFolderUpdate), - 9 => Some(NotificationMessage::SyncCipherDelete), + 0 => Some(NotificationMessage::SyncCipherUpdate), + 1 => Some(NotificationMessage::SyncCipherCreate), + 2 => Some(NotificationMessage::SyncLoginDelete), + 3 => Some(NotificationMessage::SyncFolderDelete), + 4 => Some(NotificationMessage::SyncCiphers), + 5 => Some(NotificationMessage::SyncVault), + 6 => Some(NotificationMessage::SyncOrgKeys), + 7 => Some(NotificationMessage::SyncFolderCreate), + 8 => Some(NotificationMessage::SyncFolderUpdate), + 9 => Some(NotificationMessage::SyncCipherDelete), 10 => Some(NotificationMessage::SyncSettings), 11 => Some(NotificationMessage::Logout), - _ => None + _ => None, }; return message; } pub struct NotificationsHandler { - write: Option>, Message>>, + write: Option< + futures::stream::SplitSink< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + Message, + >, + >, read_handle: Option>, - sending_channels : std::sync::Arc>>>, + sending_channels: std::sync::Arc< + tokio::sync::RwLock< + Vec>, + >, + >, } impl NotificationsHandler { @@ -65,27 +79,38 @@ impl NotificationsHandler { Self { write: None, read_handle: None, - sending_channels: std::sync::Arc::new(tokio::sync::RwLock::new(Vec::new())), + sending_channels: std::sync::Arc::new(tokio::sync::RwLock::new( + Vec::new(), + )), } } - pub async fn connect(&mut self, url: String) -> Result<(), Box> { + pub async fn connect( + &mut self, + url: String, + ) -> Result<(), Box> { if self.is_connected() { self.disconnect().await?; } - let (write, read_handle) = subscribe_to_notifications(url, self.sending_channels.clone()).await?; - + let (write, read_handle) = + subscribe_to_notifications(url, self.sending_channels.clone()) + .await?; + self.write = Some(write); self.read_handle = Some(read_handle); return Ok(()); } pub fn is_connected(&self) -> bool { - self.write.is_some() && self.read_handle.is_some() && !self.read_handle.as_ref().unwrap().is_finished() + self.write.is_some() + && self.read_handle.is_some() + && !self.read_handle.as_ref().unwrap().is_finished() } - pub async fn disconnect(&mut self) -> Result<(), Box> { + pub async fn disconnect( + &mut self, + ) -> Result<(), Box> { self.sending_channels.write().await.clear(); if let Some(mut write) = self.write.take() { write.send(Message::Close(None)).await?; @@ -97,20 +122,40 @@ impl NotificationsHandler { Ok(()) } - pub async fn get_channel(&mut self) -> tokio::sync::mpsc::UnboundedReceiver { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel::(); + pub async fn get_channel( + &mut self, + ) -> tokio::sync::mpsc::UnboundedReceiver { + let (tx, rx) = + tokio::sync::mpsc::unbounded_channel::(); self.sending_channels.write().await.push(tx); return rx; } - } -async fn subscribe_to_notifications(url: String, sending_channels: std::sync::Arc>>>) -> Result<(SplitSink>, Message>, JoinHandle<()>), Box> { +async fn subscribe_to_notifications( + url: String, + sending_channels: std::sync::Arc< + tokio::sync::RwLock< + Vec>, + >, + >, +) -> Result< + ( + SplitSink>, Message>, + JoinHandle<()>, + ), + Box, +> { let url = url::Url::parse(url.as_str())?; let (ws_stream, _response) = connect_async(url).await?; let (mut write, read) = ws_stream.split(); - write.send(Message::Text("{\"protocol\":\"messagepack\",\"version\":1}\n".to_string())).await.unwrap(); + write + .send(Message::Text( + "{\"protocol\":\"messagepack\",\"version\":1}\n".to_string(), + )) + .await + .unwrap(); let read_future = async move { read.map(|message| { @@ -139,4 +184,4 @@ async fn subscribe_to_notifications(url: String, sending_channels: std::sync::Ar }; return Ok((write, tokio::spawn(read_future))); -} \ No newline at end of file +} -- cgit v1.2.3-54-g00ecf