From bce93433368825952dea613746e9d15052ffa5fe Mon Sep 17 00:00:00 2001 From: neonphog Date: Thu, 9 Jan 2025 20:52:52 -0700 Subject: [PATCH 1/5] wip --- Cargo.lock | 7 +- Cargo.toml | 2 +- rust/sbd-e2e-crypto-client/Cargo.toml | 1 + rust/sbd-e2e-crypto-client/src/lib.rs | 525 ++++++++---------- rust/sbd-e2e-crypto-client/src/protocol.rs | 300 ++++++++++ .../src/sodoken_crypto.rs | 96 ++-- rust/sbd-e2e-crypto-client/src/test.rs | 157 +----- 7 files changed, 618 insertions(+), 470 deletions(-) create mode 100644 rust/sbd-e2e-crypto-client/src/protocol.rs diff --git a/Cargo.lock b/Cargo.lock index 56fef88..21498fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -176,9 +176,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" +checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" [[package]] name = "cast" @@ -1461,6 +1461,7 @@ dependencies = [ name = "sbd-e2e-crypto-client" version = "0.0.8-alpha" dependencies = [ + "bytes", "sbd-client", "sbd-server", "sodoken", diff --git a/Cargo.toml b/Cargo.toml index dc74cc0..e13c5bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ sbd-server = { version = "0.0.8-alpha", path = "rust/sbd-server" } # crate deps anstyle = "1.0.6" base64 = "0.22.0" -bytes = "1.6.0" +bytes = "1.9.0" clap = "4.5.4" criterion = "0.5.1" ed25519-dalek = { version = "2.1.1", default-features = false } diff --git a/rust/sbd-e2e-crypto-client/Cargo.toml b/rust/sbd-e2e-crypto-client/Cargo.toml index 012c999..48568e0 100644 --- a/rust/sbd-e2e-crypto-client/Cargo.toml +++ b/rust/sbd-e2e-crypto-client/Cargo.toml @@ -11,6 +11,7 @@ version = "0.0.8-alpha" edition = "2021" [dependencies] +bytes = { workspace = true } sbd-client = { workspace = true } sodoken = { workspace = true } tokio = { workspace = true } diff --git a/rust/sbd-e2e-crypto-client/src/lib.rs b/rust/sbd-e2e-crypto-client/src/lib.rs index b3beceb..5cabfc8 100644 --- a/rust/sbd-e2e-crypto-client/src/lib.rs +++ b/rust/sbd-e2e-crypto-client/src/lib.rs @@ -1,13 +1,7 @@ -//! Sbd end to end encryption client. #![deny(missing_docs)] - -// Mutex strategy in this file is built on the assumption that this will -// largely be network bound. Since we only have the one rate-limited connection -// to the sbd server, it is okay to wrap it with a tokio Mutex and do the -// encryption / decryption while that mutex is locked. Without this top-level -// locking it is much easier to send secretstream headers out of order, -// especially on the receiving new connection side when a naive implementation -// trying to be clever might not lock the send side correctly. +//! Sbd end to end encryption client. +//! +//! See the [protocol] module documentation for spec details. use std::collections::HashMap; use std::io::{Error, Result}; @@ -15,6 +9,8 @@ use std::sync::{Arc, Mutex, Weak}; pub use sbd_client::PubKey; +pub mod protocol; + mod sodoken_crypto; pub use sodoken_crypto::*; @@ -28,9 +24,6 @@ pub struct Config { /// If `true` we will allow connecting to insecure plaintext servers. pub allow_plain_text: bool, - /// Cooldown time to prevent comms on "connection" close. - pub cooldown: std::time::Duration, - /// Max connection count. pub max_connections: usize, @@ -43,279 +36,56 @@ impl Default for Config { Self { listener: false, allow_plain_text: false, - cooldown: std::time::Duration::from_secs(10), max_connections: 4096, max_idle: std::time::Duration::from_secs(10), } } } -enum Conn { - Cooldown(tokio::time::Instant), - Active { - last_active: tokio::time::Instant, - enc: sodoken_crypto::Encryptor, - dec: sodoken_crypto::Decryptor, - }, -} - -struct Inner { - config: Arc, - crypto: sodoken_crypto::SodokenCrypto, - client: sbd_client::SbdClient, - map: HashMap, -} - -fn do_close_peer(pk: &PubKey, conn: &mut Conn, cooldown: std::time::Duration) { - tracing::debug!( - target: "NETAUDIT", - pub_key = ?pk, - cooldown_s = cooldown.as_secs_f64(), - m = "sbd-e2e-crypto-client", - a = "close_peer", - ); - *conn = Conn::Cooldown(tokio::time::Instant::now() + cooldown); -} - -impl Inner { - pub async fn close(&mut self) { - self.client.close().await; - } - - pub fn close_peer(&mut self, pk: &PubKey) { - if let Some(conn) = self.map.get_mut(pk) { - do_close_peer(pk, conn, self.config.cooldown); - } - } - - pub async fn assert(&mut self, pk: &PubKey) -> Result<()> { - let Self { - config, - crypto, - client, - map, - } = self; - - let (conn, hdr) = Self::priv_assert_con(pk, config, crypto, map, true)?; - - match conn { - Conn::Cooldown(_) => { - Err(Error::other("connection still cooling down")) - } - Conn::Active { .. } => { - if let Err(err) = async { - if let Some(hdr) = hdr { - client.send(pk, &hdr).await - } else { - Ok(()) - } - } - .await - { - do_close_peer(pk, conn, config.cooldown); - Err(err) - } else { - Ok(()) - } - } - } - } - - pub async fn recv( - &mut self, - msg: sbd_client::Msg, - ) -> Result)>> { - let Self { - config, - crypto, - client, - map, - } = self; - - let pk = msg.pub_key(); - - match Self::priv_assert_con(&pk, config, crypto, map, config.listener) { - Err(_) => Ok(None), - Ok((conn, hdr)) => { - if let Some(hdr) = hdr { - client.send(&pk, &hdr).await?; - } - - match conn { - Conn::Cooldown(_) => Ok(None), - Conn::Active { - last_active, dec, .. - } => { - *last_active = tokio::time::Instant::now(); - - match dec.decrypt(msg.message()) { - Err(_) => { - do_close_peer(&pk, conn, config.cooldown); - Ok(None) - } - Ok(None) => Ok(None), - Ok(Some(msg)) => Ok(Some((pk, msg))), - } - } - } - } - } - } - - pub async fn send(&mut self, pk: &PubKey, msg: &[u8]) -> Result<()> { - let Self { - config, - crypto, - client, - map, - } = self; - - let (conn, hdr) = Self::priv_assert_con(pk, config, crypto, map, true)?; - - match conn { - Conn::Cooldown(_) => { - Err(Error::other("connection still cooling down")) - } - Conn::Active { enc, .. } => { - if let Err(err) = async { - if let Some(hdr) = hdr { - client.send(pk, &hdr).await?; - } - let msg = enc.encrypt(msg)?; - client.send(pk, &msg).await - } - .await - { - do_close_peer(pk, conn, config.cooldown); - Err(err) - } else { - Ok(()) - } - } - } - } - - fn prune(config: &Config, map: &mut HashMap) { - let now = tokio::time::Instant::now(); - - map.retain(|pk, c| { - if let Conn::Active { last_active, .. } = c { - if now - *last_active > config.max_idle { - do_close_peer(pk, c, config.cooldown); - } - } - - if let Conn::Cooldown(at) = c { - now < *at - } else { - true - } - }) - } - - fn priv_assert_con<'a>( - pk: &PubKey, - config: &Config, - crypto: &sodoken_crypto::SodokenCrypto, - map: &'a mut HashMap, - do_create: bool, - ) -> Result<(&'a mut Conn, Option<[u8; 24]>)> { - use std::collections::hash_map::Entry; - - // TODO - more efficient to only prune if we need to - // but then, we'd need to manage expired cooldowns - // in-line, lest we keep denying connections - //if map.len() >= config.max_connections { - // Self::prune(config, map); - //} - // instead, for now, we just always prune - Self::prune(config, map); - - let len = map.len(); - - match map.entry(pk.clone()) { - Entry::Occupied(e) => Ok((e.into_mut(), None)), - Entry::Vacant(e) => { - if !do_create { - return Err(Error::other("ignore")); - } - if len >= config.max_connections { - tracing::debug!( - target: "NETAUDIT", - pub_key = ?pk, - m = "sbd-e2e-crypto-client", - "cannot open: too many connections", - ); - return Err(Error::other("too many connections")); - } - tracing::debug!( - target: "NETAUDIT", - pub_key = ?pk, - m = "sbd-e2e-crypto-client", - a = "open_peer", - ); - let (enc, hdr, dec) = crypto.new_enc(pk)?; - Ok(( - e.insert(Conn::Active { - last_active: tokio::time::Instant::now(), - enc, - dec, - }), - Some(hdr), - )) - } - } - } -} - -async fn close_inner(inner: &mut Option) { - if let Some(mut inner) = inner.take() { - inner.close().await; - } -} +// tokio mutex required to ensure ordering on new stream messages +// we can't send in parallel over the same sub-client anyways. +type ClientSync = tokio::sync::Mutex; /// Handle to receive data from the crypto connection. pub struct MsgRecv { - inner: Weak>>, + inner: Arc>, recv: sbd_client::MsgRecv, + client: Weak, } impl MsgRecv { /// Receive data from the crypto connection. - pub async fn recv(&mut self) -> Option<(PubKey, Vec)> { - loop { - let raw_msg = match self.recv.recv().await { - None => return None, - Some(raw_msg) => raw_msg, - }; - - if let Some(inner) = self.inner.upgrade() { - let mut lock = inner.lock().await; - - if let Some(inner) = &mut *lock { - match inner.recv(raw_msg).await { - Err(_) => (), - Ok(None) => continue, - Ok(Some(o)) => return Some(o), + pub async fn recv(&mut self) -> Option<(PubKey, bytes::Bytes)> { + while let Some(msg) = self.recv.recv().await { + let pk = msg.pub_key(); + match self.inner.lock().unwrap().dec(msg) { + DecRes::Ok(msg) => return Some((pk, msg)), + DecRes::Ignore => (), + DecRes::ReqNewStream => { + // error decoding, we need to request a new stream + if let Some(client) = self.client.upgrade() { + let msg = + protocol::Protocol::request_new_stream(&*pk.0); + if let Err(err) = + client.lock().await.send(&pk, msg.base_msg()).await + { + tracing::debug!(?err, "failure sending request_new_stream in message receive handler"); + } + } else { + return None; } - } else { - return None; } - - // the only code path leading out of the branches above - // is the error one where we need to close the connection - close_inner(&mut lock).await; - } else { - return None; } } + None } } /// An encrypted connection to peers through an Sbd server. pub struct SbdClientCrypto { pub_key: PubKey, - inner: Arc>>, + inner: Arc>, + client: Arc, } impl SbdClientCrypto { @@ -334,20 +104,23 @@ impl SbdClientCrypto { let (client, recv) = sbd_client::SbdClient::connect_config(url, &crypto, client_config) .await?; - let inner = Arc::new(tokio::sync::Mutex::new(Some(Inner { - config, - crypto, + + let client = Arc::new(tokio::sync::Mutex::new(client)); + let inner = Arc::new(Mutex::new(Inner::new(config, crypto))); + + let recv = MsgRecv { + inner: inner.clone(), + recv, + client: Arc::downgrade(&client), + }; + + let this = Self { + pub_key, + inner, client, - map: HashMap::default(), - }))); - let weak_inner = Arc::downgrade(&inner); - Ok(( - Self { pub_key, inner }, - MsgRecv { - inner: weak_inner, - recv, - }, - )) + }; + + Ok((this, recv)) } /// Get the public key of this node. @@ -355,45 +128,219 @@ impl SbdClientCrypto { &self.pub_key } + /// Get the current active "connected" peers. + pub fn active_peers(&self) -> Vec { + self.inner.lock().unwrap().c_map.keys().cloned().collect() + } + /// Assert that we are connected to a peer without sending any data. pub async fn assert(&self, pk: &PubKey) -> Result<()> { - let mut lock = self.inner.lock().await; - if let Some(inner) = &mut *lock { - inner.assert(pk).await - } else { - Err(Error::other("closed")) + let enc = self.inner.lock().unwrap().enc(pk, None)?; + + { + let client = self.client.lock().await; + for enc in enc { + client.send(pk, &enc).await?; + } } + + Ok(()) } /// Send a message to a peer. pub async fn send(&self, pk: &PubKey, msg: &[u8]) -> Result<()> { const SBD_MAX: usize = 20_000; const SBD_HDR: usize = 32; + const SBD_P_HDR: usize = 1; const SS_ABYTES: usize = sodoken::secretstream::ABYTES; - const MAX_MSG: usize = SBD_MAX - SBD_HDR - SS_ABYTES; + const MAX_MSG: usize = SBD_MAX - SBD_HDR - SBD_P_HDR - SS_ABYTES; if msg.len() > MAX_MSG { return Err(Error::other("message too long")); } - let mut lock = self.inner.lock().await; - if let Some(inner) = &mut *lock { - inner.send(pk, msg).await - } else { - Err(Error::other("closed")) + let enc = self.inner.lock().unwrap().enc(pk, Some(msg))?; + + { + let client = self.client.lock().await; + for enc in enc { + client.send(pk, &enc).await?; + } } + + Ok(()) } /// Close a connection to a specific peer. pub async fn close_peer(&self, pk: &PubKey) { - if let Some(inner) = self.inner.lock().await.as_mut() { - inner.close_peer(pk); - } + self.inner.lock().unwrap().close(pk); } /// Close the entire sbd client connection. pub async fn close(&self) { - close_inner(&mut *self.inner.lock().await).await; + self.client.lock().await.close().await; + } +} + +enum DecRes { + Ok(bytes::Bytes), + Ignore, + ReqNewStream, +} + +struct InnerRec { + enc: Option, + dec: Option, + last_active: std::time::Instant, +} + +impl InnerRec { + pub fn new() -> Self { + Self { + enc: None, + dec: None, + last_active: std::time::Instant::now(), + } + } +} + +struct Inner { + config: Arc, + crypto: SodokenCrypto, + c_map: HashMap, +} + +impl Inner { + fn new(config: Arc, crypto: SodokenCrypto) -> Self { + Self { + config, + crypto, + c_map: HashMap::new(), + } + } + + fn close(&mut self, pk: &PubKey) { + self.c_map.remove(pk); + } + + fn loc_assert<'a>( + config: &'a Config, + c_map: &'a mut HashMap, + pk: PubKey, + do_create: bool, + ) -> Result<&'a mut InnerRec> { + use std::collections::hash_map::Entry; + let tot = c_map.len(); + let now = std::time::Instant::now(); + c_map.retain(|_pk, r| now - r.last_active < config.max_idle); + match c_map.entry(pk.clone()) { + Entry::Vacant(e) => { + if do_create { + if tot >= config.max_connections { + return Err(Error::other("too many connections")); + } + Ok(e.insert(InnerRec::new())) + } else { + return Err(Error::other("ignore unsolicited")); + } + } + Entry::Occupied(e) => Ok(e.into_mut()), + } + } + + fn enc( + &mut self, + pk: &PubKey, + msg: Option<&[u8]>, + ) -> Result> { + let Self { + config, + crypto, + c_map, + } = self; + + let mut out = Vec::new(); + + // assert we have a record for the pubkey + let rec = Self::loc_assert(config, c_map, pk.clone(), true)?; + rec.last_active = std::time::Instant::now(); + + // assert we have an Encryptor, adding header to output as needed + if rec.enc.is_none() { + let (enc, hdr) = crypto.new_enc(&**pk)?; + rec.enc = Some(enc); + let msg = protocol::Protocol::new_stream(&**pk, &hdr); + out.push(msg.base_msg().clone()); + } + + if let Some(msg) = msg { + // encrypt our message + let msg = rec.enc.as_mut().unwrap().encrypt(&*pk.0, msg)?; + + out.push(msg.base_msg().clone()); + } + + Ok(out) + } + + fn dec(&mut self, msg: sbd_client::Msg) -> DecRes { + let Self { + config, + crypto, + c_map, + } = self; + let rec = match Self::loc_assert( + config, + c_map, + msg.pub_key(), + config.listener, + ) { + Ok(rec) => rec, + Err(_) => { + // too many connections, or unsolicited... ignore this message + return DecRes::Ignore; + } + }; + rec.last_active = std::time::Instant::now(); + let dec = match protocol::Protocol::from_full( + bytes::Bytes::copy_from_slice(&msg.0), + ) { + Some(dec) => dec, + None => { + rec.dec = None; + // cannot decode, request a new stream + // MAYBE track these too so we ban bad actors? + return DecRes::ReqNewStream; + } + }; + match dec { + protocol::Protocol::NewStream { header, .. } => { + let dec = + match crypto.new_dec(msg.pub_key_ref(), header.as_ref()) { + Ok(dec) => dec, + Err(_) => return DecRes::ReqNewStream, + }; + rec.dec = Some(dec); + DecRes::Ignore + } + protocol::Protocol::Message { message, .. } => { + match rec.dec.as_mut() { + Some(dec) => match dec.decrypt(message.as_ref()) { + Ok(message) => DecRes::Ok(message), + Err(_) => return DecRes::ReqNewStream, + }, + None => { + // we don't want to entertain peers that don't + // properly send us a new stream first + return DecRes::Ignore; + } + } + } + protocol::Protocol::RequestNewStream { .. } => { + rec.enc = None; + DecRes::Ignore + } + } } } diff --git a/rust/sbd-e2e-crypto-client/src/protocol.rs b/rust/sbd-e2e-crypto-client/src/protocol.rs new file mode 100644 index 0000000..2462a4c --- /dev/null +++ b/rust/sbd-e2e-crypto-client/src/protocol.rs @@ -0,0 +1,300 @@ +//! This adds end-to-end encryption for peer communications over the base +//! sbd communication protocol via +//! [libsodium secretstream](https://doc.libsodium.org/secret-key_cryptography/secretstream). +//! +//! ## Message Type Header +//! +//! Adds a single-byte header to messages sent. +//! +//! Messages with bytes other than the following three should be ignored +//! for future compatibility. +//! +//! - `0x10` - NewStream -- must be followed by 24 byte secret stream header. +//! - `0x11` - Message -- encrypted message including abytes. +//! - `0x12` - RequestNewStream -- only this single byte. +//! +//! ## Message Type Handling +//! +//! - When sending a message to a new (or not recent) peer, clients MUST +//! establish a new outgoing (encryption) secret stream state and send the +//! 24 byte header in a "NewStream" message. +//! - On receiving a "RequestNewStream" message, clients MUST establish a +//! new outgoing (encryption) secret stream state and send the 24 byte header +//! in a "NewStream" message. +//! - On receiving a "NewStream" message, clients MUST establish a new incoming +//! (decryption) secret stream state. +//! - On receiving a "Message" that cannot be decrypted, clients MUST +//! (1) ignore the message, (2) delete any incoming (decryption) state, and +//! (3) send a "RequestNewStream" message. Any message receipt tracking or +//! re-requesting will not be handled by this library, but may be added by +//! implementors as a layer on top of this. + +/// Start a new stream. +pub const T_NEW_STREAM: u8 = 0x10; + +/// Encrypted stream message. +pub const T_MESSAGE: u8 = 0x11; + +/// Request start of new stream. +pub const T_REQ_NEW_STREAM: u8 = 0x12; + +/// E2e crypto protocol enum. +/// +/// The enum variant fields are all shallow clone parts of the "full" field: +/// - `full` the entire message send/recv via sbd-client. +/// - `pub_key` the pub_key peer send/recv to/from. +/// - `base_msg` the base message send/recv to/from the peer. +/// - `...` all additional fields are broken out by enum variant. +#[derive(PartialEq)] +pub enum Protocol { + /// Message indicating a new stream state should be created + /// along with the secretstream header for doing so. + NewStream { + #[allow(missing_docs)] + full: bytes::Bytes, + #[allow(missing_docs)] + pub_key: bytes::Bytes, + #[allow(missing_docs)] + base_msg: bytes::Bytes, + #[allow(missing_docs)] + header: bytes::Bytes, + }, + + /// An encrypted message. + Message { + #[allow(missing_docs)] + full: bytes::Bytes, + #[allow(missing_docs)] + pub_key: bytes::Bytes, + #[allow(missing_docs)] + base_msg: bytes::Bytes, + #[allow(missing_docs)] + message: bytes::Bytes, + }, + + /// Request for a new descryption stream state. + RequestNewStream { + #[allow(missing_docs)] + full: bytes::Bytes, + #[allow(missing_docs)] + pub_key: bytes::Bytes, + #[allow(missing_docs)] + base_msg: bytes::Bytes, + }, +} + +impl std::fmt::Debug for Protocol { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::NewStream { .. } => { + f.debug_struct("Protocol::NewStream").finish() + } + Self::Message { .. } => { + f.debug_struct("Protocol::Message").finish() + } + Self::RequestNewStream { .. } => { + f.debug_struct("Protocol::RequestNewStream").finish() + } + } + } +} + +impl Protocol { + /// Parse a protocol message from complete message bytes. + /// If None, this message should be ignored. + pub fn from_full(full: bytes::Bytes) -> Option { + if full.len() < 33 { + return None; + } + let pub_key = full.slice(..32); + let base_msg = full.slice(32..); + Some(match full[32] { + T_NEW_STREAM => { + if base_msg.len() != 25 { + return None; + } + let header = full.slice(33..); + Self::NewStream { + full, + pub_key, + base_msg, + header, + } + } + T_MESSAGE => { + let message = full.slice(33..); + Self::Message { + full, + pub_key, + base_msg, + message, + } + } + T_REQ_NEW_STREAM => { + if base_msg.len() != 1 { + return None; + } + Self::RequestNewStream { + full, + pub_key, + base_msg, + } + } + _ => return None, + }) + } + + /// Create a "NewStream" message type. + /// Panics if the pub_key is not 32 bytes or the header is not 24. + /// Why not take a `&[u8; N]` you ask? It's just a lot harder to work + /// with in rust... + pub fn new_stream(pub_key: &[u8], header: &[u8]) -> Self { + let mut out = bytes::BytesMut::with_capacity(32 + 1 + 24); + out.extend_from_slice(&pub_key[..32]); + out.extend_from_slice(&[T_NEW_STREAM]); + out.extend_from_slice(&header[..24]); + // unwrap because we know the content + Self::from_full(out.freeze()).unwrap() + } + + /// Create a "Message" message type. + /// Panics if the pub_key is not 32 bytes. + /// Why not take a `&[u8; N]` you ask? It's just a lot harder to work + /// with in rust... + pub fn message(pub_key: &[u8], message: &[u8]) -> Self { + let mut out = bytes::BytesMut::with_capacity(32 + 1 + message.len()); + out.extend_from_slice(&pub_key[..32]); + out.extend_from_slice(&[T_MESSAGE]); + out.extend_from_slice(message); + // unwrap because we know the content + Self::from_full(out.freeze()).unwrap() + } + + /// Create a "RequestNewStream" message type. + /// Panics if the pub_key is not 32 bytes. + /// Why not take a `&[u8; N]` you ask? It's just a lot harder to work + /// with in rust... + pub fn request_new_stream(pub_key: &[u8]) -> Self { + let mut out = bytes::BytesMut::with_capacity(32 + 1); + out.extend_from_slice(&pub_key[..32]); + out.extend_from_slice(&[T_REQ_NEW_STREAM]); + // unwrap because we know the content + Self::from_full(out.freeze()).unwrap() + } + + /// Get the full bytes of this protocol message. + pub fn full(&self) -> &bytes::Bytes { + match self { + Self::NewStream { full, .. } => full, + Self::Message { full, .. } => full, + Self::RequestNewStream { full, .. } => full, + } + } + + /// Get the pub_key bytes of this protocol message. + pub fn pub_key(&self) -> &bytes::Bytes { + match self { + Self::NewStream { pub_key, .. } => pub_key, + Self::Message { pub_key, .. } => pub_key, + Self::RequestNewStream { pub_key, .. } => pub_key, + } + } + + /// Get the base_msg bytes of this protocol message. + pub fn base_msg(&self) -> &bytes::Bytes { + match self { + Self::NewStream { base_msg, .. } => base_msg, + Self::Message { base_msg, .. } => base_msg, + Self::RequestNewStream { base_msg, .. } => base_msg, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + const PUB_KEY: &[u8] = &[4; 32]; + const HEADER: &[u8] = &[5; 24]; + + #[inline(always)] + fn valid_roundtrip(orig: &Protocol) { + let new = Protocol::from_full(orig.full().clone()).unwrap(); + assert_eq!(orig, &new); + assert_eq!(orig.full(), new.full()); + } + + #[test] + #[should_panic] + fn bad_pk_size() { + Protocol::request_new_stream(&[4; 31]); + } + + #[test] + #[should_panic] + fn bad_hdr_size() { + Protocol::new_stream(PUB_KEY, &[5; 23]); + } + + #[test] + fn other() { + let mut exp_other = bytes::BytesMut::new(); + exp_other.extend_from_slice(PUB_KEY); + exp_other.extend_from_slice(&[0x42]); + exp_other.extend_from_slice(b"not a thing"); + let exp_other = exp_other.freeze(); + assert!(Protocol::from_full(exp_other.clone()).is_none()); + } + + #[test] + fn new_stream() { + let ns = Protocol::new_stream(PUB_KEY, HEADER); + valid_roundtrip(&ns); + + let mut exp_base_msg = Vec::new(); + exp_base_msg.push(T_NEW_STREAM); + exp_base_msg.extend_from_slice(HEADER); + + assert!(matches!(ns, Protocol::NewStream { + pub_key, + base_msg, + header, + .. + } if pub_key.as_ref() == PUB_KEY + && base_msg.as_ref() == exp_base_msg + && header.as_ref() == HEADER + )); + } + + #[test] + fn message() { + let ns = Protocol::message(PUB_KEY, b"hello"); + valid_roundtrip(&ns); + + let mut exp_base_msg = Vec::new(); + exp_base_msg.push(T_MESSAGE); + exp_base_msg.extend_from_slice(b"hello"); + + assert!(matches!(ns, Protocol::Message { + pub_key, + base_msg, + message, + .. + } if pub_key.as_ref() == PUB_KEY + && base_msg.as_ref() == exp_base_msg + && message.as_ref() == b"hello" + )); + } + + #[test] + fn req_new_stream() { + let ns = Protocol::request_new_stream(PUB_KEY); + valid_roundtrip(&ns); + + assert!(matches!(ns, Protocol::RequestNewStream { + pub_key, + base_msg, + .. + } if pub_key.as_ref() == PUB_KEY && base_msg.as_ref() == [0x12])); + } +} diff --git a/rust/sbd-e2e-crypto-client/src/sodoken_crypto.rs b/rust/sbd-e2e-crypto-client/src/sodoken_crypto.rs index 9f6d91d..950739c 100644 --- a/rust/sbd-e2e-crypto-client/src/sodoken_crypto.rs +++ b/rust/sbd-e2e-crypto-client/src/sodoken_crypto.rs @@ -18,50 +18,44 @@ impl Encryptor { Ok(header) } - /// Encrypt a new message. - pub fn encrypt(&mut self, msg: &[u8]) -> Result> { - let mut out = vec![0; msg.len() + sodoken::secretstream::ABYTES]; + /// Encrypt a new message. This version saves a copy by putting the + /// encrypted data directly into a protocol message. + pub fn encrypt( + &mut self, + pub_key: &[u8], + msg: &[u8], + ) -> Result { + let mut out = bytes::BytesMut::zeroed( + 32 + 1 + msg.len() + sodoken::secretstream::ABYTES, + ); + out[..32].copy_from_slice(&pub_key[..32]); + out[32] = protocol::T_MESSAGE; + sodoken::secretstream::push( &mut self.state, msg, None, sodoken::secretstream::Tag::Message, - &mut out, + &mut out[33..], )?; - Ok(out) + + // unwrap okay since we are constructing this + Ok(protocol::Protocol::from_full(out.freeze()).unwrap()) } } /// Secret stream decryptor. pub struct Decryptor { - sk: sodoken::LockedArray<32>, state: sodoken::secretstream::State, } impl Decryptor { - /// Decrypt a new message. - pub fn decrypt(&mut self, msg: &[u8]) -> Result>> { - let mut out = vec![0; msg.len() - sodoken::secretstream::ABYTES]; - match sodoken::secretstream::pull(&mut self.state, &mut out, msg, None) - { - Ok(_) => return Ok(Some(out)), - Err(_) => { - if msg.len() == 24 { - let mut header = [0; 24]; - header.copy_from_slice(msg); - if sodoken::secretstream::init_pull( - &mut self.state, - &header, - &self.sk.lock(), - ) - .is_ok() - { - return Ok(None); - } - } - } - } - Err(Error::other("decryption failure")) + /// Decrypt a new message into [bytes::Bytes]. + pub fn decrypt(&mut self, msg: &[u8]) -> Result { + let mut out = + bytes::BytesMut::zeroed(msg.len() - sodoken::secretstream::ABYTES); + sodoken::secretstream::pull(&mut self.state, &mut out[..], msg, None)?; + Ok(out.freeze()) } } @@ -104,11 +98,10 @@ impl SodokenCrypto { } } - /// Construct a new encryption / decryption pair for given remote peer. - pub fn new_enc( + fn session( &self, peer_sign_pk: &[u8; 32], - ) -> Result<(Encryptor, [u8; 24], Decryptor)> { + ) -> Result<(sodoken::LockedArray<32>, sodoken::LockedArray<32>)> { let mut peer_enc_pk = [0; 32]; sodoken::sign::pk_to_curve25519(&mut peer_enc_pk, peer_sign_pk)?; @@ -133,6 +126,16 @@ impl SodokenCrypto { )?; } + Ok((rx, tx)) + } + + /// Construct a new encryptor for a peer connection. + pub fn new_enc( + &self, + peer_sign_pk: &[u8; 32], + ) -> Result<(Encryptor, [u8; 24])> { + let (_rx, tx) = self.session(peer_sign_pk)?; + let mut enc = Encryptor { sk: tx, state: sodoken::secretstream::State::default(), @@ -140,14 +143,27 @@ impl SodokenCrypto { let hdr = enc.init()?; - Ok(( - enc, - hdr, - Decryptor { - sk: rx, - state: sodoken::secretstream::State::default(), - }, - )) + Ok((enc, hdr)) + } + + /// Construct a new decryptor for a peer connection. + pub fn new_dec( + &self, + peer_sign_pk: &[u8], + hdr: &[u8], + ) -> Result { + let mut pk = [0; 32]; + pk.copy_from_slice(&peer_sign_pk[..32]); + let (mut rx, _tx) = self.session(&pk)?; + + let mut state = sodoken::secretstream::State::default(); + + let mut header = [0; 24]; + header.copy_from_slice(&hdr[..24]); + + sodoken::secretstream::init_pull(&mut state, &header, &rx.lock())?; + + Ok(Decryptor { state }) } } diff --git a/rust/sbd-e2e-crypto-client/src/test.rs b/rust/sbd-e2e-crypto-client/src/test.rs index 3ed7899..b4a6528 100644 --- a/rust/sbd-e2e-crypto-client/src/test.rs +++ b/rust/sbd-e2e-crypto-client/src/test.rs @@ -8,7 +8,6 @@ impl Cfg { Self(Config { listener: true, allow_plain_text: true, - cooldown: tokio::time::Duration::from_secs(1), max_connections: 100, max_idle: tokio::time::Duration::from_secs(10), }) @@ -24,11 +23,6 @@ impl Cfg { self } - pub fn cool(mut self, cool: std::time::Duration) -> Self { - self.0.cooldown = cool; - self - } - pub fn idle(mut self, idle: std::time::Duration) -> Self { self.0.max_idle = idle; self @@ -77,11 +71,11 @@ async fn sanity() { let (rk, rm) = r1.recv().await.unwrap(); assert_eq!(c2.pub_key(), &rk); - assert_eq!(b"hello", rm.as_slice()); + assert_eq!(b"hello", rm.as_ref()); let (rk, rm) = r2.recv().await.unwrap(); assert_eq!(c1.pub_key(), &rk); - assert_eq!(b"world", rm.as_slice()); + assert_eq!(b"world", rm.as_ref()); } #[tokio::test(flavor = "multi_thread")] @@ -174,149 +168,38 @@ async fn max_msg_size() { (20_000, false), (19_968, false), (19_952, false), - (19_951, true), + (19_951, false), + (19_950, true), ] { - assert_eq!(is_ok, c2.send(c1.pub_key(), &MSG[..size]).await.is_ok()) + let res = c2.send(c1.pub_key(), &MSG[..size]).await; + assert_eq!(is_ok, res.is_ok()) } let (_, r) = r1.recv().await.unwrap(); - assert_eq!(&MSG[..19_951], r.as_slice()); + assert_eq!(&MSG[..19_950], r.as_ref()); } #[tokio::test(flavor = "multi_thread")] -async fn cooldown_config_works_from_send_side() { - let test = Test::new().await; - - let ((c1, mut r1), (c2, _r2), (c3, _r3)) = tokio::join!( - test.conn(Cfg::d().cool(std::time::Duration::from_millis(1))), - test.conn(Cfg::d().cool(std::time::Duration::from_millis(1))), - test.conn(Cfg::d().cool(std::time::Duration::from_secs(5000))), - ); - - tokio::try_join!( - c2.send(c1.pub_key(), b"msg-a"), - c3.send(c1.pub_key(), b"msg-b"), - ) - .unwrap(); - - let (_, r) = r1.recv().await.unwrap(); - assert!(r == b"msg-a" || r == b"msg-b"); - let (_, r) = r1.recv().await.unwrap(); - assert!(r == b"msg-a" || r == b"msg-b"); +async fn idle_close_connections() { + const DUR: std::time::Duration = std::time::Duration::from_millis(20); - c2.close_peer(c1.pub_key()).await; - c3.close_peer(c1.pub_key()).await; - - tokio::time::sleep(std::time::Duration::from_millis(2)).await; - - assert!(c3.send(c1.pub_key(), b"msg-c").await.is_err()); - assert!(c2.send(c1.pub_key(), b"msg-d").await.is_ok()); - - let (_, r) = r1.recv().await.unwrap(); - assert!(r == b"msg-d"); -} - -#[tokio::test(flavor = "multi_thread")] -async fn cooldown_config_works_from_recv_side() { - let test = Test::new().await; - - let ((c1, mut r1), (c2, mut r2), (c3, _r3)) = tokio::join!( - test.conn(Cfg::d().cool(std::time::Duration::from_secs(5000))), - test.conn(Cfg::d().cool(std::time::Duration::from_millis(1))), - test.conn(Cfg::d().cool(std::time::Duration::from_millis(1))), - ); - - tokio::try_join!( - c3.send(c1.pub_key(), b"msg-a"), - c3.send(c2.pub_key(), b"msg-b"), - ) - .unwrap(); - - let (_, r) = r1.recv().await.unwrap(); - assert_eq!(b"msg-a", r.as_slice()); - let (_, r) = r2.recv().await.unwrap(); - assert_eq!(b"msg-b", r.as_slice()); - - c1.close_peer(c3.pub_key()).await; - c2.close_peer(c3.pub_key()).await; - - // note we have to close the send side too, or it won't re-init crypto - c3.close_peer(c1.pub_key()).await; - c3.close_peer(c2.pub_key()).await; - - tokio::time::sleep(std::time::Duration::from_millis(2)).await; - - tokio::try_join!( - c3.send(c1.pub_key(), b"msg-c"), - c3.send(c2.pub_key(), b"msg-d"), - ) - .unwrap(); - - let (_, r) = r2.recv().await.unwrap(); - assert_eq!(b"msg-d", r.as_slice()); - - assert!( - tokio::time::timeout(std::time::Duration::from_secs(1), r1.recv()) - .await - .is_err() - ); -} - -#[tokio::test(flavor = "multi_thread")] -async fn idle_close_even_if_sending() { - let (took, exited_early) = - idle_close_even_if_sending_inner(std::time::Duration::from_secs(5000)) - .await; - assert!(!exited_early); - let iter_millis = took.as_millis() as u64 / 20; - println!("1 iter took: {} millis", iter_millis); - - // let's try to have it close right in the middle of the run - let (_took, exited_early) = idle_close_even_if_sending_inner( - std::time::Duration::from_millis(iter_millis * 10), - ) - .await; - assert!(exited_early); -} - -async fn idle_close_even_if_sending_inner( - idle_dur: std::time::Duration, -) -> (std::time::Duration, bool) { let test = Test::new().await; let ((c1, mut r1), (c2, _r2)) = tokio::join!( - test.conn(Cfg::d().idle(idle_dur)), - test.conn(Cfg::d().idle(idle_dur)), + test.conn(Cfg::d().idle(DUR)), + test.conn(Cfg::d().idle(DUR)), ); - c2.send(c1.pub_key(), b"").await.unwrap(); + println!("bla1"); + + c2.send(c1.pub_key(), b"wabonb").await.unwrap(); + println!("bla2"); let _ = r1.recv().await.unwrap(); - let mut exited_early = false; + println!("bla3"); + println!("counts: {} {}", c1.active_peers().len(), c2.active_peers().len()); - let start = tokio::time::Instant::now(); - for i in 0..20 { - let istart = tokio::time::Instant::now(); - if c2.send(c1.pub_key(), &[i]).await.is_err() { - exited_early = true; - break; - } - match tokio::time::timeout(std::time::Duration::from_secs(1), r1.recv()) - .await - { - Err(_) | Ok(None) => { - exited_early = true; - break; - } - Ok(Some((_, r))) => { - assert_eq!(&[i], r.as_slice()); - } - } - let ielapsed = istart.elapsed(); - let ten = std::time::Duration::from_millis(10); - if ielapsed < ten { - tokio::time::sleep(ten - ielapsed).await; - } - } - (start.elapsed(), exited_early) + tokio::time::sleep(DUR * 2).await; + + println!("counts: {} {}", c1.active_peers().len(), c2.active_peers().len()); } From 4ec49d55cb08d4c953b41e2fd1821ac19dc2e043 Mon Sep 17 00:00:00 2001 From: neonphog Date: Fri, 10 Jan 2025 13:27:49 -0700 Subject: [PATCH 2/5] cleanup --- Cargo.lock | 12 +++++------ Cargo.toml | 12 +++++------ kill.bash | 3 +++ rust/sbd-bench/Cargo.toml | 2 +- rust/sbd-client/Cargo.toml | 2 +- rust/sbd-e2e-crypto-client/Cargo.toml | 2 +- rust/sbd-e2e-crypto-client/src/lib.rs | 27 +++++++++++++++++------- rust/sbd-e2e-crypto-client/src/test.rs | 11 +++------- rust/sbd-o-bahn-client-tester/Cargo.toml | 2 +- rust/sbd-o-bahn-server-tester/Cargo.toml | 2 +- rust/sbd-server/Cargo.toml | 2 +- 11 files changed, 43 insertions(+), 34 deletions(-) create mode 100755 kill.bash diff --git a/Cargo.lock b/Cargo.lock index 21498fa..553cdba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1428,7 +1428,7 @@ dependencies = [ [[package]] name = "sbd-bench" -version = "0.0.8-alpha" +version = "0.0.9-alpha2" dependencies = [ "base64 0.22.1", "criterion", @@ -1439,7 +1439,7 @@ dependencies = [ [[package]] name = "sbd-client" -version = "0.0.8-alpha" +version = "0.0.9-alpha2" dependencies = [ "base64 0.22.1", "ed25519-dalek", @@ -1459,7 +1459,7 @@ dependencies = [ [[package]] name = "sbd-e2e-crypto-client" -version = "0.0.8-alpha" +version = "0.0.9-alpha2" dependencies = [ "bytes", "sbd-client", @@ -1471,7 +1471,7 @@ dependencies = [ [[package]] name = "sbd-o-bahn-client-tester" -version = "0.0.8-alpha" +version = "0.0.9-alpha2" dependencies = [ "hex", "sbd-server", @@ -1480,7 +1480,7 @@ dependencies = [ [[package]] name = "sbd-o-bahn-server-tester" -version = "0.0.8-alpha" +version = "0.0.9-alpha2" dependencies = [ "sbd-client", "tokio", @@ -1488,7 +1488,7 @@ dependencies = [ [[package]] name = "sbd-server" -version = "0.0.8-alpha" +version = "0.0.9-alpha2" dependencies = [ "anstyle", "base64 0.22.1", diff --git a/Cargo.toml b/Cargo.toml index e13c5bf..6d6f999 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,12 +14,12 @@ panic = "abort" [workspace.dependencies] # workspace member deps -sbd-bench = { version = "0.0.8-alpha", path = "rust/sbd-bench" } -sbd-client = { version = "0.0.8-alpha", path = "rust/sbd-client" } -sbd-e2e-crypto-client = { version = "0.0.8-alpha", path = "rust/sbd-e2e-crypto-client" } -sbd-o-bahn-client-tester = { version = "0.0.8-alpha", path = "rust/sbd-o-bahn-client-tester" } -sbd-o-bahn-server-tester = { version = "0.0.8-alpha", path = "rust/sbd-o-bahn-server-tester" } -sbd-server = { version = "0.0.8-alpha", path = "rust/sbd-server" } +sbd-bench = { version = "0.0.9-alpha2", path = "rust/sbd-bench" } +sbd-client = { version = "0.0.9-alpha2", path = "rust/sbd-client" } +sbd-e2e-crypto-client = { version = "0.0.9-alpha2", path = "rust/sbd-e2e-crypto-client" } +sbd-o-bahn-client-tester = { version = "0.0.9-alpha2", path = "rust/sbd-o-bahn-client-tester" } +sbd-o-bahn-server-tester = { version = "0.0.9-alpha2", path = "rust/sbd-o-bahn-server-tester" } +sbd-server = { version = "0.0.9-alpha2", path = "rust/sbd-server" } # crate deps anstyle = "1.0.6" base64 = "0.22.0" diff --git a/kill.bash b/kill.bash new file mode 100755 index 0000000..1cd6685 --- /dev/null +++ b/kill.bash @@ -0,0 +1,3 @@ +killall -s KILL workerd +killall -s KILL esbuild +killall -s KILL node diff --git a/rust/sbd-bench/Cargo.toml b/rust/sbd-bench/Cargo.toml index ac472d3..d3a487a 100644 --- a/rust/sbd-bench/Cargo.toml +++ b/rust/sbd-bench/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sbd-bench" -version = "0.0.8-alpha" +version = "0.0.9-alpha2" edition = "2021" [dependencies] diff --git a/rust/sbd-client/Cargo.toml b/rust/sbd-client/Cargo.toml index aff1f7a..5ae30c0 100644 --- a/rust/sbd-client/Cargo.toml +++ b/rust/sbd-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sbd-client" -version = "0.0.8-alpha" +version = "0.0.9-alpha2" description = "simple websocket-based message relay client" license = "MIT OR Apache-2.0" repository = "https://github.com/holochain/sbd" diff --git a/rust/sbd-e2e-crypto-client/Cargo.toml b/rust/sbd-e2e-crypto-client/Cargo.toml index 48568e0..cd66fb3 100644 --- a/rust/sbd-e2e-crypto-client/Cargo.toml +++ b/rust/sbd-e2e-crypto-client/Cargo.toml @@ -7,7 +7,7 @@ documentation = "https://docs.rs/sbd-e2e-crypto-client" authors = ["Holochain Core Dev Team "] keywords = ["holochain", "holo", "p2p", "networking"] categories = ["network-programming"] -version = "0.0.8-alpha" +version = "0.0.9-alpha2" edition = "2021" [dependencies] diff --git a/rust/sbd-e2e-crypto-client/src/lib.rs b/rust/sbd-e2e-crypto-client/src/lib.rs index 5cabfc8..faa8636 100644 --- a/rust/sbd-e2e-crypto-client/src/lib.rs +++ b/rust/sbd-e2e-crypto-client/src/lib.rs @@ -58,7 +58,8 @@ impl MsgRecv { pub async fn recv(&mut self) -> Option<(PubKey, bytes::Bytes)> { while let Some(msg) = self.recv.recv().await { let pk = msg.pub_key(); - match self.inner.lock().unwrap().dec(msg) { + let dec = self.inner.lock().unwrap().dec(msg); + match dec { DecRes::Ok(msg) => return Some((pk, msg)), DecRes::Ignore => (), DecRes::ReqNewStream => { @@ -130,7 +131,10 @@ impl SbdClientCrypto { /// Get the current active "connected" peers. pub fn active_peers(&self) -> Vec { - self.inner.lock().unwrap().c_map.keys().cloned().collect() + let mut inner = self.inner.lock().unwrap(); + let max_idle = inner.config.max_idle; + Inner::prune(&mut inner.c_map, max_idle); + inner.c_map.keys().cloned().collect() } /// Assert that we are connected to a peer without sending any data. @@ -223,6 +227,14 @@ impl Inner { self.c_map.remove(pk); } + fn prune( + c_map: &mut HashMap, + max_idle: std::time::Duration, + ) { + let now = std::time::Instant::now(); + c_map.retain(|_pk, r| now - r.last_active < max_idle); + } + fn loc_assert<'a>( config: &'a Config, c_map: &'a mut HashMap, @@ -231,8 +243,7 @@ impl Inner { ) -> Result<&'a mut InnerRec> { use std::collections::hash_map::Entry; let tot = c_map.len(); - let now = std::time::Instant::now(); - c_map.retain(|_pk, r| now - r.last_active < config.max_idle); + Self::prune(c_map, config.max_idle); match c_map.entry(pk.clone()) { Entry::Vacant(e) => { if do_create { @@ -241,7 +252,7 @@ impl Inner { } Ok(e.insert(InnerRec::new())) } else { - return Err(Error::other("ignore unsolicited")); + Err(Error::other("ignore unsolicited")) } } Entry::Occupied(e) => Ok(e.into_mut()), @@ -267,7 +278,7 @@ impl Inner { // assert we have an Encryptor, adding header to output as needed if rec.enc.is_none() { - let (enc, hdr) = crypto.new_enc(&**pk)?; + let (enc, hdr) = crypto.new_enc(pk)?; rec.enc = Some(enc); let msg = protocol::Protocol::new_stream(&**pk, &hdr); out.push(msg.base_msg().clone()); @@ -327,12 +338,12 @@ impl Inner { match rec.dec.as_mut() { Some(dec) => match dec.decrypt(message.as_ref()) { Ok(message) => DecRes::Ok(message), - Err(_) => return DecRes::ReqNewStream, + Err(_) => DecRes::ReqNewStream, }, None => { // we don't want to entertain peers that don't // properly send us a new stream first - return DecRes::Ignore; + DecRes::Ignore } } } diff --git a/rust/sbd-e2e-crypto-client/src/test.rs b/rust/sbd-e2e-crypto-client/src/test.rs index b4a6528..c8bf205 100644 --- a/rust/sbd-e2e-crypto-client/src/test.rs +++ b/rust/sbd-e2e-crypto-client/src/test.rs @@ -181,7 +181,7 @@ async fn max_msg_size() { #[tokio::test(flavor = "multi_thread")] async fn idle_close_connections() { - const DUR: std::time::Duration = std::time::Duration::from_millis(20); + const DUR: std::time::Duration = std::time::Duration::from_millis(500); let test = Test::new().await; @@ -190,16 +190,11 @@ async fn idle_close_connections() { test.conn(Cfg::d().idle(DUR)), ); - println!("bla1"); - c2.send(c1.pub_key(), b"wabonb").await.unwrap(); - println!("bla2"); let _ = r1.recv().await.unwrap(); - println!("bla3"); - println!("counts: {} {}", c1.active_peers().len(), c2.active_peers().len()); - tokio::time::sleep(DUR * 2).await; - println!("counts: {} {}", c1.active_peers().len(), c2.active_peers().len()); + assert_eq!(0, c1.active_peers().len()); + assert_eq!(0, c2.active_peers().len()); } diff --git a/rust/sbd-o-bahn-client-tester/Cargo.toml b/rust/sbd-o-bahn-client-tester/Cargo.toml index 0a05a19..4ffc7c6 100644 --- a/rust/sbd-o-bahn-client-tester/Cargo.toml +++ b/rust/sbd-o-bahn-client-tester/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sbd-o-bahn-client-tester" -version = "0.0.8-alpha" +version = "0.0.9-alpha2" description = "simple websocket-based message relay client tester" license = "MIT OR Apache-2.0" repository = "https://github.com/holochain/sbd" diff --git a/rust/sbd-o-bahn-server-tester/Cargo.toml b/rust/sbd-o-bahn-server-tester/Cargo.toml index 5e4dc52..db12c2c 100644 --- a/rust/sbd-o-bahn-server-tester/Cargo.toml +++ b/rust/sbd-o-bahn-server-tester/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sbd-o-bahn-server-tester" -version = "0.0.8-alpha" +version = "0.0.9-alpha2" description = "simple websocket-based message relay server tester" license = "MIT OR Apache-2.0" repository = "https://github.com/holochain/sbd" diff --git a/rust/sbd-server/Cargo.toml b/rust/sbd-server/Cargo.toml index 92e6777..1566ba4 100644 --- a/rust/sbd-server/Cargo.toml +++ b/rust/sbd-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sbd-server" -version = "0.0.8-alpha" +version = "0.0.9-alpha2" description = "simple websocket-based message relay server" license = "MIT OR Apache-2.0" repository = "https://github.com/holochain/sbd" From 939028770d3fd22cc3ff2d5dde38b6f472a974a7 Mon Sep 17 00:00:00 2001 From: neonphog Date: Fri, 10 Jan 2025 13:42:03 -0700 Subject: [PATCH 3/5] toolchain --- rust-toolchain.toml | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 rust-toolchain.toml diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..0127c34 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,4 @@ +[toolchain] +channel = "1.82.0" +components = ["rustfmt", "clippy"] +profile = "minimal" From 584e48af022fb2fe8ee4ca4e800f82772db2f208 Mon Sep 17 00:00:00 2001 From: David Braden Date: Tue, 14 Jan 2025 09:36:51 -0700 Subject: [PATCH 4/5] Apply suggestions from code review Co-authored-by: ThetaSinner Co-authored-by: Jost <28270981+jost-s@users.noreply.github.com> --- rust/sbd-e2e-crypto-client/src/lib.rs | 4 ++-- rust/sbd-e2e-crypto-client/src/protocol.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/sbd-e2e-crypto-client/src/lib.rs b/rust/sbd-e2e-crypto-client/src/lib.rs index faa8636..c6e3e37 100644 --- a/rust/sbd-e2e-crypto-client/src/lib.rs +++ b/rust/sbd-e2e-crypto-client/src/lib.rs @@ -42,8 +42,8 @@ impl Default for Config { } } -// tokio mutex required to ensure ordering on new stream messages -// we can't send in parallel over the same sub-client anyways. +// tokio mutex required to ensure ordering on new stream messages. +// We can't send in parallel over the same sub-client anyways. type ClientSync = tokio::sync::Mutex; /// Handle to receive data from the crypto connection. diff --git a/rust/sbd-e2e-crypto-client/src/protocol.rs b/rust/sbd-e2e-crypto-client/src/protocol.rs index 2462a4c..e6021a0 100644 --- a/rust/sbd-e2e-crypto-client/src/protocol.rs +++ b/rust/sbd-e2e-crypto-client/src/protocol.rs @@ -72,7 +72,7 @@ pub enum Protocol { message: bytes::Bytes, }, - /// Request for a new descryption stream state. + /// Request for a new decryption stream state. RequestNewStream { #[allow(missing_docs)] full: bytes::Bytes, @@ -237,7 +237,7 @@ mod test { } #[test] - fn other() { + fn invalid_type() { let mut exp_other = bytes::BytesMut::new(); exp_other.extend_from_slice(PUB_KEY); exp_other.extend_from_slice(&[0x42]); From c87d2cf38fac1c956469ccf8557f51b790dcb231 Mon Sep 17 00:00:00 2001 From: neonphog Date: Tue, 14 Jan 2025 09:40:24 -0700 Subject: [PATCH 5/5] review comment --- rust/sbd-e2e-crypto-client/src/lib.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rust/sbd-e2e-crypto-client/src/lib.rs b/rust/sbd-e2e-crypto-client/src/lib.rs index c6e3e37..c194d84 100644 --- a/rust/sbd-e2e-crypto-client/src/lib.rs +++ b/rust/sbd-e2e-crypto-client/src/lib.rs @@ -155,9 +155,11 @@ impl SbdClientCrypto { pub async fn send(&self, pk: &PubKey, msg: &[u8]) -> Result<()> { const SBD_MAX: usize = 20_000; const SBD_HDR: usize = 32; - const SBD_P_HDR: usize = 1; + // This is the internal "secretstream" header for distinguishing + // stream starts and messages, etc. + const SBD_SS_HDR: usize = 1; const SS_ABYTES: usize = sodoken::secretstream::ABYTES; - const MAX_MSG: usize = SBD_MAX - SBD_HDR - SBD_P_HDR - SS_ABYTES; + const MAX_MSG: usize = SBD_MAX - SBD_HDR - SBD_SS_HDR - SS_ABYTES; if msg.len() > MAX_MSG { return Err(Error::other("message too long"));