diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 049d7ac78..0c9dd93fe 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -12,12 +12,12 @@ updates: schedule: interval: "weekly" # Maintain dependencies for yarn - - package-ecosystem: "yarn" + - package-ecosystem: "npm" directory: "/bindings_wasm" schedule: interval: "weekly" # Maintain dependencies for yarn - - package-ecosystem: "yarn" + - package-ecosystem: "npm" directory: "/bindings_node" schedule: interval: "weekly" diff --git a/Cargo.lock b/Cargo.lock index 68e3c710f..3957b86f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -450,12 +450,16 @@ dependencies = [ name = "bindings_wasm" version = "0.1.0" dependencies = [ + "console_error_panic_hook", "hex", "js-sys", "prost", "serde", "serde-wasm-bindgen", "tokio", + "tracing", + "tracing-subscriber", + "tracing-web", "wasm-bindgen", "wasm-bindgen-futures", "wasm-bindgen-test", @@ -5993,6 +5997,19 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "tracing-web" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e6a141feebd51f8d91ebfd785af50fca223c570b86852166caa3b141defe7c" +dependencies = [ + "js-sys", + "tracing-core", + "tracing-subscriber", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "trait-variant" version = "0.1.2" diff --git a/Cargo.toml b/Cargo.toml index 10e32e567..03b44e811 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,6 +85,7 @@ trait-variant = "0.1.2" url = "2.5.0" zeroize = "1.8" bincode = "1.3" +console_error_panic_hook = "0.1" # Internal Crate Dependencies xmtp_cryptography = { path = "xmtp_cryptography" } diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 25e879fe1..55a5e7a72 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -240,7 +240,7 @@ pub struct FfiXmtpClient { #[uniffi::export(async_runtime = "tokio")] impl FfiXmtpClient { pub fn inbox_id(&self) -> InboxId { - self.inner_client.inbox_id() + self.inner_client.inbox_id().to_string() } pub fn conversations(&self) -> Arc { @@ -1258,7 +1258,10 @@ impl FfiConversation { &self, inbox_ids: Vec, ) -> Result<(), GenericError> { - self.inner.remove_members_by_inbox_id(&inbox_ids).await?; + let ids = inbox_ids.iter().map(AsRef::as_ref).collect::>(); + self.inner + .remove_members_by_inbox_id(ids.as_slice()) + .await?; Ok(()) } diff --git a/bindings_node/Cargo.toml b/bindings_node/Cargo.toml index 8f60eafb2..e3beae204 100644 --- a/bindings_node/Cargo.toml +++ b/bindings_node/Cargo.toml @@ -16,7 +16,7 @@ napi = { version = "2.12.2", default-features = false, features = [ napi-derive = "2.12.2" prost.workspace = true tokio = { workspace = true, features = ["sync"] } -tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } +tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "json", "chrono"] } tracing.workspace = true xmtp_api_grpc = { path = "../xmtp_api_grpc" } xmtp_cryptography = { path = "../xmtp_cryptography" } diff --git a/bindings_node/src/client.rs b/bindings_node/src/client.rs index 9093ff66d..296da6480 100644 --- a/bindings_node/src/client.rs +++ b/bindings_node/src/client.rs @@ -6,9 +6,10 @@ use napi::bindgen_prelude::{Error, Result, Uint8Array}; use napi_derive::napi; use std::collections::HashMap; use std::ops::Deref; -use std::sync::{Arc, Once}; +use std::str::FromStr; +use std::sync::Arc; use tokio::sync::Mutex; -use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*}; +use tracing_subscriber::{fmt, prelude::*}; pub use xmtp_api_grpc::grpc_api_helper::Client as TonicApiClient; use xmtp_cryptography::signature::ed25519_public_key_to_address; use xmtp_id::associations::builder::SignatureRequest; @@ -20,7 +21,7 @@ use xmtp_mls::Client as MlsClient; use xmtp_proto::xmtp::mls::message_contents::DeviceSyncKind; pub type RustXmtpClient = MlsClient; -static LOGGER_INIT: Once = Once::new(); +static LOGGER_INIT: std::sync::OnceLock> = std::sync::OnceLock::new(); #[napi] pub struct Client { @@ -39,6 +40,76 @@ impl Client { } } +#[napi(string_enum)] +#[derive(Debug)] +#[allow(non_camel_case_types)] +pub enum Level { + off, + error, + warn, + info, + debug, + trace, +} + +impl std::fmt::Display for Level { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use Level::*; + let s = match self { + off => "off", + error => "error", + warn => "warn", + info => "info", + debug => "debug", + trace => "trace", + }; + write!(f, "{}", s) + } +} + +/// Specify options for the logger +#[napi(object)] +#[derive(Default)] +pub struct LogOptions { + /// enable structured JSON logging to stdout.Useful for third-party log viewers + /// an option so that it does not require being specified in js object. + pub structured: Option, + /// Filter logs by level + pub level: Option, +} + +fn init_logging(options: LogOptions) -> Result<()> { + LOGGER_INIT + .get_or_init(|| { + let filter = if let Some(f) = options.level { + tracing_subscriber::filter::LevelFilter::from_str(&f.to_string()) + } else { + Ok(tracing_subscriber::filter::LevelFilter::INFO) + } + .map_err(ErrorWrapper::from)?; + + if options.structured.unwrap_or_default() { + let fmt = tracing_subscriber::fmt::layer() + .json() + .flatten_event(true) + .with_level(true) + .with_timer(tracing_subscriber::fmt::time::ChronoLocal::rfc_3339()) + .with_target(true); + + tracing_subscriber::registry().with(filter).with(fmt).init(); + } else { + tracing_subscriber::registry() + .with(fmt::layer()) + .with(filter) + .init(); + } + Ok(()) + }) + .clone() + .map_err(ErrorWrapper::from)?; + Ok(()) +} + /** * Create a client * @@ -56,20 +127,9 @@ pub async fn create_client( account_address: String, encryption_key: Option, history_sync_url: Option, - #[napi(ts_arg_type = "\"debug\" | \"info\" | \"warn\" | \"error\" | \"off\" | undefined | null")] - env_filter: Option, + log_options: Option, ) -> Result { - LOGGER_INIT.call_once(|| { - let filter = EnvFilter::builder() - .with_regex(false) - .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) - .parse_lossy(env_filter.unwrap_or_default()); - - tracing_subscriber::registry() - .with(fmt::layer()) - .with(filter) - .init(); - }); + init_logging(log_options.unwrap_or_default())?; let api_client = TonicApiClient::create(host.clone(), is_secure) .await .map_err(|_| Error::from_reason("Error creating Tonic API client"))?; @@ -127,7 +187,7 @@ pub async fn create_client( impl Client { #[napi] pub fn inbox_id(&self) -> String { - self.inner_client.inbox_id() + self.inner_client.inbox_id().to_string() } #[napi] diff --git a/bindings_node/src/conversation.rs b/bindings_node/src/conversation.rs index ca6ffa191..e8b1c31df 100644 --- a/bindings_node/src/conversation.rs +++ b/bindings_node/src/conversation.rs @@ -410,7 +410,13 @@ impl Conversation { ); group - .remove_members_by_inbox_id(&inbox_ids) + .remove_members_by_inbox_id( + inbox_ids + .iter() + .map(AsRef::as_ref) + .collect::>() + .as_slice(), + ) .await .map_err(ErrorWrapper::from)?; diff --git a/bindings_node/test/Client.test.ts b/bindings_node/test/Client.test.ts index f6531a46f..063b0f603 100644 --- a/bindings_node/test/Client.test.ts +++ b/bindings_node/test/Client.test.ts @@ -241,4 +241,5 @@ describe('Client', () => { user2.account.address.toLowerCase(), ]) }) + it('should create client with structured logging', async () => {}) }) diff --git a/bindings_node/test/helpers.ts b/bindings_node/test/helpers.ts index 6f0917230..1d3260925 100644 --- a/bindings_node/test/helpers.ts +++ b/bindings_node/test/helpers.ts @@ -44,7 +44,7 @@ export const createClient = async (user: User) => { user.account.address, undefined, undefined, - 'error' + { level: 'info' } ) } diff --git a/bindings_wasm/Cargo.toml b/bindings_wasm/Cargo.toml index b5269b5fb..65f79084b 100644 --- a/bindings_wasm/Cargo.toml +++ b/bindings_wasm/Cargo.toml @@ -20,6 +20,10 @@ xmtp_cryptography = { path = "../xmtp_cryptography" } xmtp_id = { path = "../xmtp_id" } xmtp_mls = { path = "../xmtp_mls", features = ["test-utils", "http-api"] } xmtp_proto = { path = "../xmtp_proto", features = ["proto_full"] } +tracing-web = "0.1" +tracing.workspace = true +tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } +console_error_panic_hook.workspace = true [dev-dependencies] wasm-bindgen-test.workspace = true diff --git a/bindings_wasm/src/client.rs b/bindings_wasm/src/client.rs index 9774f12ee..c5a40878f 100644 --- a/bindings_wasm/src/client.rs +++ b/bindings_wasm/src/client.rs @@ -1,7 +1,11 @@ use js_sys::Uint8Array; use std::collections::HashMap; +use std::str::FromStr; use std::sync::Arc; use tokio::sync::Mutex; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{filter, fmt::format::Pretty}; use wasm_bindgen::prelude::{wasm_bindgen, JsError}; use wasm_bindgen::JsValue; use xmtp_api_http::XmtpHttpApiClient; @@ -36,6 +40,73 @@ impl Client { } } +static LOGGER_INIT: std::sync::OnceLock> = + std::sync::OnceLock::new(); + +#[wasm_bindgen] +#[derive(Copy, Clone, Debug)] +pub enum Level { + Off = "off", + Error = "error", + Warn = "warn", + Info = "info", + Debug = "debug", + Trace = "trace", +} + +/// Specify options for the logger +#[derive(Default)] +#[wasm_bindgen(getter_with_clone)] +pub struct LogOptions { + /// enable structured JSON logging to stdout.Useful for third-party log viewers + pub structured: bool, + /// enable performance metrics for libxmtp in the `performance` tab + pub performance: bool, + /// filter for logs + pub level: Option, +} + +fn init_logging(options: LogOptions) -> Result<(), JsError> { + LOGGER_INIT + .get_or_init(|| { + console_error_panic_hook::set_once(); + let filter = if let Some(f) = options.level { + tracing_subscriber::filter::LevelFilter::from_str(f.to_str()) + } else { + Ok(tracing_subscriber::filter::LevelFilter::INFO) + }?; + + if options.structured { + let fmt = tracing_subscriber::fmt::layer() + .json() + .flatten_event(true) + .with_level(true) + .without_time() // need to test whether this would break browsers + .with_target(true); + + tracing_subscriber::registry().with(filter).with(fmt).init(); + } else { + let fmt = tracing_subscriber::fmt::layer() + .with_ansi(false) // not supported by all browsers + .without_time() // std::time break things, but chrono might work + .with_writer(tracing_web::MakeWebConsoleWriter::new()); + + let subscriber = tracing_subscriber::registry().with(fmt).with(filter); + + if options.performance { + subscriber + .with(tracing_web::performance_layer().with_details_from_fields(Pretty::default())) + .init(); + } else { + subscriber.init(); + } + } + Ok(()) + }) + .clone()?; + Ok(()) +} + #[wasm_bindgen(js_name = createClient)] pub async fn create_client( host: String, @@ -44,8 +115,10 @@ pub async fn create_client( db_path: String, encryption_key: Option, history_sync_url: Option, + log_options: Option, ) -> Result { - xmtp_mls::utils::wasm::init().await; + init_logging(log_options.unwrap_or_default())?; + xmtp_mls::storage::init_sqlite().await; let api_client = XmtpHttpApiClient::new(host.clone()).unwrap(); let storage_option = StorageOption::Persistent(db_path); @@ -105,7 +178,7 @@ impl Client { #[wasm_bindgen(getter, js_name = inboxId)] pub fn inbox_id(&self) -> String { - self.inner_client.inbox_id() + self.inner_client.inbox_id().to_string() } #[wasm_bindgen(getter, js_name = isRegistered)] diff --git a/bindings_wasm/src/conversation.rs b/bindings_wasm/src/conversation.rs index 8e37f13bb..7bfc8e514 100644 --- a/bindings_wasm/src/conversation.rs +++ b/bindings_wasm/src/conversation.rs @@ -353,8 +353,9 @@ impl Conversation { pub async fn remove_members_by_inbox_id(&self, inbox_ids: Vec) -> Result<(), JsError> { let group = self.to_mls_group(); + let ids = inbox_ids.iter().map(AsRef::as_ref).collect::>(); group - .remove_members_by_inbox_id(&inbox_ids) + .remove_members_by_inbox_id(ids.as_slice()) .await .map_err(|e| JsError::new(&format!("{e}")))?; diff --git a/bindings_wasm/tests/web.rs b/bindings_wasm/tests/web.rs index 8158dde73..14c633d27 100644 --- a/bindings_wasm/tests/web.rs +++ b/bindings_wasm/tests/web.rs @@ -1,8 +1,13 @@ -use bindings_wasm::{client::create_client, inbox_id::get_inbox_id_for_address}; +use bindings_wasm::client::Level; +use bindings_wasm::{ + client::{create_client, LogOptions}, + inbox_id::get_inbox_id_for_address, +}; use wasm_bindgen::prelude::*; use wasm_bindgen_test::*; use xmtp_api_http::constants::ApiUrls; use xmtp_cryptography::utils::{rng, LocalWallet}; +use xmtp_id::associations::generate_inbox_id; use xmtp_id::InboxOwner; // Only run these tests in a browser. @@ -19,9 +24,7 @@ pub async fn test_create_client() { let wallet = LocalWallet::new(&mut rng()); let account_address = wallet.get_address(); let host = ApiUrls::LOCAL_ADDRESS.to_string(); - let inbox_id = get_inbox_id_for_address(host.clone(), account_address.clone()) - .await - .unwrap_or_else(|_| panic!("Error getting inbox ID")); + let inbox_id = generate_inbox_id(&account_address, &1); let client = create_client( host.clone(), inbox_id.unwrap(), @@ -29,6 +32,11 @@ pub async fn test_create_client() { "test".to_string(), None, None, + Some(LogOptions { + structured: false, + performance: false, + level: Some(Level::Info), + }), ) .await; diff --git a/examples/cli/cli-client.rs b/examples/cli/cli-client.rs index 746c3890d..ac4e55dda 100755 --- a/examples/cli/cli-client.rs +++ b/examples/cli/cli-client.rs @@ -24,6 +24,7 @@ use tracing_subscriber::EnvFilter; use tracing_subscriber::{ fmt::{format, time}, layer::SubscriberExt, + prelude::*, Registry, }; use valuable::Valuable; @@ -181,17 +182,17 @@ impl InboxOwner for Wallet { async fn main() -> color_eyre::eyre::Result<()> { color_eyre::install()?; let cli = Cli::parse(); + let crate_name = env!("CARGO_PKG_NAME"); + let filter = EnvFilter::builder().parse(format!("{crate_name}=INFO,xmtp_mls=INFO"))?; if cli.json { - let fmt = tracing_subscriber::fmt::format() + let fmt = tracing_subscriber::fmt::layer() .json() .flatten_event(true) - .with_thread_ids(true) .with_level(true) .with_timer(time::ChronoLocal::new("%s".into())); - tracing_subscriber::fmt().event_format(fmt).init(); + + tracing_subscriber::registry().with(filter).with(fmt).init(); } else { - let crate_name = env!("CARGO_PKG_NAME"); - let filter = EnvFilter::builder().parse(format!("{crate_name}=INFO,xmtp_mls=INFO"))?; let layer = tracing_subscriber::fmt::layer() .without_time() .map_event_format(|_| pretty::PrettyTarget) @@ -326,8 +327,8 @@ async fn main() -> color_eyre::eyre::Result<()> { "messages", ); } else { - let messages = - format_messages(messages, client.inbox_id()).expect("failed to get messages"); + let messages = format_messages(messages, client.inbox_id().to_string()) + .expect("failed to get messages"); info!( "====== Group {} ======\n{}", hex::encode(group.group_id), diff --git a/xmtp_id/src/associations/builder.rs b/xmtp_id/src/associations/builder.rs index 8cc0dcfd2..cc1af30d9 100644 --- a/xmtp_id/src/associations/builder.rs +++ b/xmtp_id/src/associations/builder.rs @@ -50,9 +50,9 @@ pub struct SignatureRequestBuilder { impl SignatureRequestBuilder { /// Create a new IdentityUpdateBuilder for the given `inbox_id` - pub fn new(inbox_id: String) -> Self { + pub fn new>(inbox_id: S) -> Self { Self { - inbox_id, + inbox_id: inbox_id.as_ref().to_string(), client_timestamp_ns: now_ns() as u64, actions: vec![], } @@ -311,8 +311,8 @@ impl SignatureRequest { )) } - pub fn inbox_id(&self) -> String { - self.inbox_id.clone() + pub fn inbox_id(&self) -> crate::InboxIdRef<'_> { + &self.inbox_id } } diff --git a/xmtp_id/src/associations/mod.rs b/xmtp_id/src/associations/mod.rs index ad84ab820..7cd29ba17 100644 --- a/xmtp_id/src/associations/mod.rs +++ b/xmtp_id/src/associations/mod.rs @@ -136,7 +136,7 @@ pub(crate) mod tests { pub fn new_test_inbox_with_installation() -> AssociationState { let initial_state = new_test_inbox(); - let inbox_id = initial_state.inbox_id().clone(); + let inbox_id = initial_state.inbox_id().to_string(); let initial_wallet_address: MemberIdentifier = initial_state.recovery_address().clone().into(); @@ -152,7 +152,7 @@ pub(crate) mod tests { apply_update( initial_state, - IdentityUpdate::new_test(vec![update], inbox_id.clone()), + IdentityUpdate::new_test(vec![update], inbox_id.to_string()), ) .unwrap() } @@ -175,7 +175,7 @@ pub(crate) mod tests { #[test] fn create_and_add_separately() { let initial_state = new_test_inbox(); - let inbox_id = initial_state.inbox_id().clone(); + let inbox_id = initial_state.inbox_id().to_string(); let new_installation_identifier: MemberIdentifier = rand_vec().into(); let first_member: MemberIdentifier = initial_state.recovery_address().clone().into(); @@ -197,7 +197,7 @@ pub(crate) mod tests { let new_state = apply_update( initial_state, - IdentityUpdate::new_test(vec![update], inbox_id.clone()), + IdentityUpdate::new_test(vec![update], inbox_id.to_string()), ) .unwrap(); assert_eq!(new_state.members().len(), 2); @@ -285,7 +285,7 @@ pub(crate) mod tests { #[test] fn add_wallet_from_installation_key() { let initial_state = new_test_inbox_with_installation(); - let inbox_id = initial_state.inbox_id().clone(); + let inbox_id = initial_state.inbox_id().to_string(); let installation_id = initial_state .members_by_kind(MemberKind::Installation) .first() @@ -312,7 +312,7 @@ pub(crate) mod tests { let new_state = apply_update( initial_state, - IdentityUpdate::new_test(vec![add_association], inbox_id.clone()), + IdentityUpdate::new_test(vec![add_association], inbox_id.to_string()), ) .expect("expected update to succeed"); assert_eq!(new_state.members().len(), 3); @@ -347,7 +347,7 @@ pub(crate) mod tests { #[test] fn reject_invalid_signature_on_update() { let initial_state = new_test_inbox(); - let inbox_id = initial_state.inbox_id().clone(); + let inbox_id = initial_state.inbox_id().to_string(); // Signature is from a random address let bad_signature = VerifiedSignature::new( rand_string().into(), @@ -363,7 +363,7 @@ pub(crate) mod tests { let update_result = apply_update( initial_state.clone(), - IdentityUpdate::new_test(vec![update_with_bad_existing_member], inbox_id.clone()), + IdentityUpdate::new_test(vec![update_with_bad_existing_member], inbox_id.to_string()), ); assert!(matches!( @@ -384,7 +384,7 @@ pub(crate) mod tests { let update_result_2 = apply_update( initial_state, - IdentityUpdate::new_test(vec![update_with_bad_new_member], inbox_id.clone()), + IdentityUpdate::new_test(vec![update_with_bad_new_member], inbox_id.to_string()), ); assert!(matches!( update_result_2, @@ -423,7 +423,7 @@ pub(crate) mod tests { #[test] fn reject_if_installation_adding_installation() { let existing_state = new_test_inbox_with_installation(); - let inbox_id = existing_state.inbox_id().clone(); + let inbox_id = existing_state.inbox_id().to_string(); let existing_installations = existing_state.members_by_kind(MemberKind::Installation); let existing_installation = existing_installations.first().unwrap(); let new_installation_id: MemberIdentifier = rand_vec().into(); @@ -446,7 +446,7 @@ pub(crate) mod tests { let update_result = apply_update( existing_state, - IdentityUpdate::new_test(vec![update], inbox_id.clone()), + IdentityUpdate::new_test(vec![update], inbox_id.to_string()), ); assert!(matches!( update_result, @@ -460,7 +460,7 @@ pub(crate) mod tests { #[test] fn revoke() { let initial_state = new_test_inbox_with_installation(); - let inbox_id = initial_state.inbox_id().clone(); + let inbox_id = initial_state.inbox_id().to_string(); let installation_id = initial_state .members_by_kind(MemberKind::Installation) .first() @@ -479,7 +479,7 @@ pub(crate) mod tests { let new_state = apply_update( initial_state, - IdentityUpdate::new_test(vec![update], inbox_id.clone()), + IdentityUpdate::new_test(vec![update], inbox_id.to_string()), ) .expect("expected update to succeed"); assert!(new_state.get(&installation_id).is_none()); @@ -488,7 +488,7 @@ pub(crate) mod tests { #[test] fn revoke_children() { let initial_state = new_test_inbox_with_installation(); - let inbox_id = initial_state.inbox_id().clone(); + let inbox_id = initial_state.inbox_id().to_string(); let wallet_address = initial_state .members_by_kind(MemberKind::Address) .first() @@ -508,7 +508,7 @@ pub(crate) mod tests { let new_state = apply_update( initial_state, - IdentityUpdate::new_test(vec![add_second_installation], inbox_id.clone()), + IdentityUpdate::new_test(vec![add_second_installation], inbox_id.to_string()), ) .expect("expected update to succeed"); assert_eq!(new_state.members().len(), 3); @@ -526,7 +526,7 @@ pub(crate) mod tests { // With this revocation the original wallet + both installations should be gone let new_state = apply_update( new_state, - IdentityUpdate::new_test(vec![revocation], inbox_id.clone()), + IdentityUpdate::new_test(vec![revocation], inbox_id.to_string()), ) .expect("expected update to succeed"); assert_eq!(new_state.members().len(), 0); @@ -542,7 +542,7 @@ pub(crate) mod tests { .unwrap() .identifier; - let inbox_id = initial_state.inbox_id().clone(); + let inbox_id = initial_state.inbox_id().to_string(); let second_wallet_address: MemberIdentifier = rand_string().into(); let add_second_wallet = Action::AddAssociation(AddAssociation { @@ -575,7 +575,7 @@ pub(crate) mod tests { initial_state, IdentityUpdate::new_test( vec![add_second_wallet, revoke_second_wallet], - inbox_id.clone(), + inbox_id.to_string(), ), ) .expect("expected update to succeed"); @@ -599,7 +599,7 @@ pub(crate) mod tests { let state_after_re_add = apply_update( state_after_remove, - IdentityUpdate::new_test(vec![add_second_wallet_again], inbox_id.clone()), + IdentityUpdate::new_test(vec![add_second_wallet_again], inbox_id.to_string()), ) .expect("expected update to succeed"); assert_eq!(state_after_re_add.members().len(), 2); @@ -608,7 +608,7 @@ pub(crate) mod tests { #[test] fn change_recovery_address() { let initial_state = new_test_inbox_with_installation(); - let inbox_id = initial_state.inbox_id().clone(); + let inbox_id = initial_state.inbox_id().to_string(); let initial_recovery_address: MemberIdentifier = initial_state.recovery_address().clone().into(); let new_recovery_address = rand_string(); @@ -624,7 +624,7 @@ pub(crate) mod tests { let new_state = apply_update( initial_state, - IdentityUpdate::new_test(vec![update_recovery], inbox_id.clone()), + IdentityUpdate::new_test(vec![update_recovery], inbox_id.to_string()), ) .expect("expected update to succeed"); assert_eq!(new_state.recovery_address(), &new_recovery_address); @@ -641,7 +641,7 @@ pub(crate) mod tests { let revoke_result = apply_update( new_state, - IdentityUpdate::new_test(vec![attempted_revoke], inbox_id.clone()), + IdentityUpdate::new_test(vec![attempted_revoke], inbox_id.to_string()), ); assert!(revoke_result.is_err()); assert!(matches!( @@ -672,7 +672,7 @@ pub(crate) mod tests { )]) .expect("initial state should be OK"); - let inbox_id = initial_state.inbox_id().clone(); + let inbox_id = initial_state.inbox_id(); let new_chain_id: u64 = 2; let new_member: MemberIdentifier = rand_vec().into(); @@ -710,7 +710,7 @@ pub(crate) mod tests { for action in actions { let apply_result = apply_update( initial_state.clone(), - IdentityUpdate::new_test(vec![action], inbox_id.clone()), + IdentityUpdate::new_test(vec![action], inbox_id.to_string()), ); assert!(matches!( diff --git a/xmtp_id/src/associations/state.rs b/xmtp_id/src/associations/state.rs index 97801dacd..a086945bb 100644 --- a/xmtp_id/src/associations/state.rs +++ b/xmtp_id/src/associations/state.rs @@ -8,6 +8,7 @@ use std::collections::{HashMap, HashSet}; use super::{ hashes::generate_inbox_id, member::Member, AssociationError, MemberIdentifier, MemberKind, }; +use crate::InboxIdRef; #[derive(Debug, Clone)] pub struct AssociationStateDiff { @@ -92,7 +93,7 @@ impl AssociationState { self.members.values().cloned().collect() } - pub fn inbox_id(&self) -> &String { + pub fn inbox_id(&self) -> InboxIdRef<'_> { &self.inbox_id } diff --git a/xmtp_id/src/lib.rs b/xmtp_id/src/lib.rs index 5134a4dea..409c320fa 100755 --- a/xmtp_id/src/lib.rs +++ b/xmtp_id/src/lib.rs @@ -31,7 +31,10 @@ pub enum IdentityError { Signing(#[from] xmtp_cryptography::SignerError), } -/// The global InboxID Type. +/// The global InboxID Reference Type. +pub type InboxIdRef<'a> = &'a str; + +/// Global InboxID Owned Type. pub type InboxId = String; pub type WalletAddress = String; diff --git a/xmtp_mls/Cargo.toml b/xmtp_mls/Cargo.toml index b04b4dc05..62c1c139f 100644 --- a/xmtp_mls/Cargo.toml +++ b/xmtp_mls/Cargo.toml @@ -70,7 +70,7 @@ xmtp_id = { path = "../xmtp_id" } xmtp_proto = { workspace = true, features = ["convert"] } # Optional/Features -console_error_panic_hook = { version = "0.1", optional = true } +console_error_panic_hook = { workspace = true, optional = true } toml = { version = "0.8.4", optional = true } tracing-wasm = { version = "0.2", optional = true } xmtp_api_http = { path = "../xmtp_api_http", optional = true } diff --git a/xmtp_mls/benches/group_limit.rs b/xmtp_mls/benches/group_limit.rs index 24acacee9..c2c5a6909 100755 --- a/xmtp_mls/benches/group_limit.rs +++ b/xmtp_mls/benches/group_limit.rs @@ -167,6 +167,7 @@ fn add_to_100_member_group_by_inbox_id(c: &mut Criterion) { benchmark_group.bench_with_input(BenchmarkId::from_parameter(size), size, |b, &size| { let ids = map.get(&size).unwrap(); let span = trace_span!(BENCH_ROOT_SPAN, size); + let id_slice = ids.iter().map(AsRef::as_ref).collect::>(); b.to_async(&runtime).iter_batched( || { bench_async_setup(|| async { @@ -181,13 +182,14 @@ fn add_to_100_member_group_by_inbox_id(c: &mut Criterion) { ) .await .unwrap(); + let id_slice = id_slice.clone(); - (group, span.clone(), ids.clone()) + (group, span.clone(), id_slice) }) }, - |(group, span, ids)| async move { + |(group, span, id_slice)| async move { group - .add_members_by_inbox_id(&ids) + .add_members_by_inbox_id(&id_slice) .instrument(span) .await .unwrap(); @@ -217,6 +219,7 @@ fn remove_all_members_from_group(c: &mut Criterion) { benchmark_group.throughput(Throughput::Elements(*size as u64)); benchmark_group.bench_with_input(BenchmarkId::from_parameter(size), size, |b, &size| { let ids = map.get(&size).unwrap(); + let id_slice = ids.iter().map(AsRef::as_ref).collect::>(); let span = trace_span!(BENCH_ROOT_SPAN, size); b.to_async(&runtime).iter_batched( || { @@ -225,7 +228,8 @@ fn remove_all_members_from_group(c: &mut Criterion) { .create_group(None, GroupMetadataOptions::default()) .unwrap(); group.add_members_by_inbox_id(ids).await.unwrap(); - (group, span.clone(), ids.clone()) + let ids = id_slice.clone(); + (group, span.clone(), ids) }) }, |(group, span, ids)| async move { @@ -268,12 +272,17 @@ fn remove_half_members_from_group(c: &mut Criterion) { .create_group(None, GroupMetadataOptions::default()) .unwrap(); group.add_members_by_inbox_id(ids).await.unwrap(); - (group, span.clone(), ids[0..(size / 2)].into()) + let ids = ids + .iter() + .map(AsRef::as_ref) + .take(size / 2) + .collect::>(); + (group, span.clone(), ids) }) }, |(group, span, ids)| async move { group - .remove_members_by_inbox_id(ids) + .remove_members_by_inbox_id(&ids) .instrument(span) .await .unwrap(); diff --git a/xmtp_mls/src/builder.rs b/xmtp_mls/src/builder.rs index f7c3672b1..d82d87b1b 100644 --- a/xmtp_mls/src/builder.rs +++ b/xmtp_mls/src/builder.rs @@ -196,7 +196,7 @@ where load_identity_updates( &api_client_wrapper, &store.conn()?, - vec![identity.clone().inbox_id], + vec![identity.inbox_id.as_str()].as_slice(), ) .await?; diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 2cf09f093..e51a24746 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -27,7 +27,7 @@ use xmtp_id::{ AssociationError, AssociationState, SignatureError, }, scw_verifier::{RemoteSignatureVerifier, SmartContractSignatureVerifier}, - InboxId, + InboxId, InboxIdRef, }; use xmtp_proto::xmtp::mls::api::v1::{ @@ -260,8 +260,8 @@ impl XmtpMlsLocalContext { } /// Get the account address of the blockchain account associated with this client - pub fn inbox_id(&self) -> InboxId { - self.identity.inbox_id().clone() + pub fn inbox_id(&self) -> InboxIdRef<'_> { + self.identity.inbox_id() } /// Get sequence id, may not be consistent with the backend @@ -338,7 +338,7 @@ where self.context.installation_public_key() } /// Retrieves the client's inbox ID - pub fn inbox_id(&self) -> String { + pub fn inbox_id(&self) -> InboxIdRef<'_> { self.context.inbox_id() } @@ -434,7 +434,7 @@ where let conn = self.store().conn()?; let inbox_id = self.inbox_id(); if refresh_from_network { - load_identity_updates(&self.api_client, &conn, vec![inbox_id.clone()]).await?; + load_identity_updates(&self.api_client, &conn, &[inbox_id]).await?; } let state = self.get_association_state(&conn, inbox_id, None).await?; Ok(state) @@ -451,7 +451,7 @@ where load_identity_updates( &self.api_client, &conn, - inbox_ids.iter().map(|s| String::from(s.as_ref())).collect(), + &inbox_ids.iter().map(|s| s.as_ref()).collect::>(), ) .await?; } @@ -884,8 +884,14 @@ where let active_group_count = Arc::clone(&active_group_count); async move { let mls_group = group.load_mls_group(provider_ref)?; - tracing::info!("[{}] syncing group", self.inbox_id()); tracing::info!( + inbox_id = self.inbox_id(), + "[{}] syncing group", + self.inbox_id() + ); + tracing::info!( + inbox_id = self.inbox_id(), + group_epoch = mls_group.epoch().as_u64(), "current epoch for [{}] in sync_all_groups() is Epoch: [{}]", self.inbox_id(), mls_group.epoch() @@ -1121,7 +1127,7 @@ pub(crate) mod tests { .find_inbox_id_from_address(wallet.get_address()) .await .unwrap(), - Some(client.inbox_id()) + Some(client.inbox_id().to_string()) ); } @@ -1315,7 +1321,7 @@ pub(crate) mod tests { ); alix.set_consent_states(&[record]).await.unwrap(); let inbox_consent = alix - .get_consent_state(ConsentType::InboxId, bo.inbox_id()) + .get_consent_state(ConsentType::InboxId, bo.inbox_id().to_string()) .await .unwrap(); let address_consent = alix diff --git a/xmtp_mls/src/groups/group_membership.rs b/xmtp_mls/src/groups/group_membership.rs index c8596524c..a3c4b6227 100644 --- a/xmtp_mls/src/groups/group_membership.rs +++ b/xmtp_mls/src/groups/group_membership.rs @@ -26,15 +26,15 @@ impl GroupMembership { self.members.get(inbox_id.as_ref()) } - pub fn inbox_ids(&self) -> Vec { - self.members.keys().cloned().collect() + pub fn inbox_ids(&self) -> Vec<&str> { + self.members.keys().map(AsRef::as_ref).collect() } // Convert the mapping to a vector of `inbox_id`/`sequence_id` tuples - pub fn to_filters(&self) -> Vec<(String, i64)> { + pub fn to_filters(&self) -> Vec<(&str, i64)> { self.members .iter() - .map(|(inbox_id, sequence_id)| (inbox_id.clone(), *sequence_id as i64)) + .map(|(inbox_id, sequence_id)| (inbox_id.as_str(), *sequence_id as i64)) .collect() } diff --git a/xmtp_mls/src/groups/node_sync.rs b/xmtp_mls/src/groups/mls_sync.rs similarity index 88% rename from xmtp_mls/src/groups/node_sync.rs rename to xmtp_mls/src/groups/mls_sync.rs index 5026a2217..bcf46d48f 100644 --- a/xmtp_mls/src/groups/node_sync.rs +++ b/xmtp_mls/src/groups/mls_sync.rs @@ -58,7 +58,7 @@ use openmls_traits::{signatures::Signer, OpenMlsProvider}; use prost::bytes::Bytes; use prost::Message; use tracing::debug; -use xmtp_id::InboxId; +use xmtp_id::{InboxId, InboxIdRef}; use xmtp_proto::xmtp::mls::{ api::v1::{ group_message::{Version as GroupMessageVersion, V1 as GroupMessageV1}, @@ -88,12 +88,21 @@ impl MlsGroup where ScopedClient: ScopedGroupClient, { + #[tracing::instrument(skip_all)] pub async fn sync(&self) -> Result<(), GroupError> { let conn = self.context().store().conn()?; let mls_provider = XmtpOpenMlsProvider::from(conn); - - tracing::info!("[{}] syncing group", self.client.inbox_id()); tracing::info!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + current_epoch = self.load_mls_group(&mls_provider)?.epoch().as_u64(), + "[{}] syncing group", + self.client.inbox_id() + ); + tracing::info!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + current_epoch = self.load_mls_group(&mls_provider)?.epoch().as_u64(), "current epoch for [{}] in sync() is Epoch: [{}]", self.client.inbox_id(), self.load_mls_group(&mls_provider)?.epoch() @@ -103,7 +112,7 @@ where self.sync_with_conn(&mls_provider).await } - #[tracing::instrument(level = "trace", skip(self, provider))] + #[tracing::instrument(skip_all)] pub(crate) async fn sync_with_conn( &self, provider: &XmtpOpenMlsProvider, @@ -115,20 +124,28 @@ where // Even if publish fails, continue to receiving if let Err(publish_error) = self.publish_intents(provider).await { - tracing::error!("Sync: error publishing intents {:?}", publish_error); + tracing::error!( + error = %publish_error, + "Sync: error publishing intents {:?}", + publish_error + ); errors.push(publish_error); } // Even if receiving fails, continue to post_commit if let Err(receive_error) = self.receive(provider).await { - tracing::error!("receive error {:?}", receive_error); + tracing::error!(error = %receive_error, "receive error {:?}", receive_error); // We don't return an error if receive fails, because it's possible this is caused // by malicious data sent over the network, or messages from before the user was // added to the group } if let Err(post_commit_err) = self.post_commit(conn).await { - tracing::error!("post commit error {:?}", post_commit_err); + tracing::error!( + error = %post_commit_err, + "post commit error {:?}", + post_commit_err + ); errors.push(post_commit_err); } @@ -139,6 +156,7 @@ where Ok(()) } + #[tracing::instrument(skip_all)] pub(super) async fn sync_until_last_intent_resolved( &self, provider: &XmtpOpenMlsProvider, @@ -164,7 +182,7 @@ where * * This method will retry up to `crate::configuration::MAX_GROUP_SYNC_RETRIES` times. */ - #[tracing::instrument(level = "trace", skip(self, provider))] + #[tracing::instrument(skip_all)] pub(super) async fn sync_until_intent_resolved( &self, provider: &XmtpOpenMlsProvider, @@ -210,7 +228,7 @@ where } fn is_valid_epoch( - inbox_id: InboxId, + inbox_id: InboxIdRef<'_>, intent_id: i32, group_epoch: GroupEpoch, message_epoch: GroupEpoch, @@ -218,17 +236,25 @@ where ) -> bool { if message_epoch.as_u64() + max_past_epochs as u64 <= group_epoch.as_u64() { tracing::warn!( + inbox_id, + message_epoch = message_epoch.as_u64(), + group_epoch = group_epoch.as_u64(), + intent_id, "[{}] own message epoch {} is {} or more less than group epoch {} for intent {}. Retrying message", inbox_id, message_epoch, max_past_epochs, - group_epoch, + group_epoch.as_u64(), intent_id ); return false; } else if message_epoch.as_u64() > group_epoch.as_u64() { // Should not happen, logging proactively tracing::error!( + inbox_id, + message_epoch = message_epoch.as_u64(), + group_epoch = group_epoch.as_u64(), + intent_id, "[{}] own message epoch {} is greater than group epoch {} for intent {}. Retrying message", inbox_id, message_epoch, @@ -248,14 +274,26 @@ where openmls_group: &mut OpenMlsGroup, provider: &XmtpOpenMlsProvider, message: ProtocolMessage, - envelope_timestamp_ns: u64, + envelope: &GroupMessageV1, ) -> Result { + let GroupMessageV1 { + created_ns: envelope_timestamp_ns, + id: ref msg_id, + .. + } = *envelope; + if intent.state == IntentState::Committed { return Ok(IntentState::Committed); } let message_epoch = message.epoch(); let group_epoch = openmls_group.epoch(); debug!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + current_epoch = openmls_group.epoch().as_u64(), + msg_id, + intent.id, + intent.kind = %intent.kind, "[{}]-[{}] processing own message for intent {} / {:?}, group epoch: {}, message_epoch: {}", self.context().inbox_id(), hex::encode(self.group_id.clone()), @@ -278,6 +316,12 @@ where if published_in_epoch_u64 != group_epoch_u64 { tracing::warn!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + current_epoch = openmls_group.epoch().as_u64(), + msg_id, + intent.id, + intent.kind = %intent.kind, "Intent was published in epoch {} but group is currently in epoch {}", published_in_epoch_u64, group_epoch_u64 @@ -358,21 +402,47 @@ where openmls_group: &mut OpenMlsGroup, provider: &XmtpOpenMlsProvider, message: PrivateMessageIn, - envelope_timestamp_ns: u64, + envelope: &GroupMessageV1, ) -> Result<(), MessageProcessingError> { + let GroupMessageV1 { + created_ns: envelope_timestamp_ns, + id: ref msg_id, + .. + } = *envelope; + let decrypted_message = openmls_group.process_message(provider, message)?; let (sender_inbox_id, sender_installation_id) = extract_message_sender(openmls_group, &decrypted_message, envelope_timestamp_ns)?; let sent_from_this_installation = sender_installation_id == self.context().installation_public_key(); tracing::info!( + inbox_id = self.client.inbox_id(), + sender_inbox_id = sender_inbox_id, + sender_installation_id = hex::encode(&sender_installation_id), + group_id = hex::encode(&self.group_id), + current_epoch = openmls_group.epoch().as_u64(), + msg_epoch = decrypted_message.epoch().as_u64(), + msg_group_id = hex::encode(decrypted_message.group_id().as_slice()), + msg_id, "[{}] extracted sender inbox id: {}", - self.context().inbox_id(), + self.client.inbox_id(), sender_inbox_id ); + + let (msg_epoch, msg_group_id) = ( + decrypted_message.epoch().as_u64(), + hex::encode(decrypted_message.group_id().as_slice()), + ); match decrypted_message.into_content() { ProcessedMessageContent::ApplicationMessage(application_message) => { tracing::info!( + inbox_id = self.client.inbox_id(), + sender_inbox_id = sender_inbox_id, + group_id = hex::encode(&self.group_id), + current_epoch = openmls_group.epoch().as_u64(), + msg_epoch, + msg_group_id, + msg_id, "[{}] decoding application message", self.context().inbox_id() ); @@ -481,6 +551,14 @@ where } ProcessedMessageContent::StagedCommitMessage(staged_commit) => { tracing::info!( + inbox_id = self.client.inbox_id(), + sender_inbox_id = sender_inbox_id, + sender_installation_id = hex::encode(&sender_installation_id), + group_id = hex::encode(&self.group_id), + current_epoch = openmls_group.epoch().as_u64(), + msg_epoch, + msg_group_id, + msg_id, "[{}] received staged commit. Merging and clearing any pending commits", self.context().inbox_id() ); @@ -496,6 +574,14 @@ where ) .await?; tracing::info!( + inbox_id = self.client.inbox_id(), + sender_inbox_id = sender_inbox_id, + sender_installation_id = hex::encode(&sender_installation_id), + group_id = hex::encode(&self.group_id), + current_epoch = openmls_group.epoch().as_u64(), + msg_epoch, + msg_group_id, + msg_id, "[{}] staged commit is valid, will attempt to merge", self.context().inbox_id() ); @@ -535,6 +621,10 @@ where .conn_ref() .find_group_intent_by_payload_hash(sha256(envelope.data.as_slice())); tracing::info!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + current_epoch = openmls_group.epoch().as_u64(), + msg_id = envelope.id, "Processing envelope with hash {:?}", hex::encode(sha256(envelope.data.as_slice())) ); @@ -544,19 +634,19 @@ where Ok(Some(intent)) => { let intent_id = intent.id; tracing::info!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + current_epoch = openmls_group.epoch().as_u64(), + msg_id = envelope.id, + intent_id, + intent.kind = %intent.kind, "client [{}] is about to process own envelope [{}] for intent [{}]", self.client.inbox_id(), envelope.id, intent_id ); match self - .process_own_message( - intent, - openmls_group, - provider, - message.into(), - envelope.created_ns, - ) + .process_own_message(intent, openmls_group, provider, message.into(), envelope) .await? { IntentState::ToPublish => { @@ -578,11 +668,15 @@ where // No matching intent found Ok(None) => { tracing::info!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + current_epoch = openmls_group.epoch().as_u64(), + msg_id = envelope.id, "client [{}] is about to process external envelope [{}]", self.client.inbox_id(), envelope.id ); - self.process_external_message(openmls_group, provider, message, envelope.created_ns) + self.process_external_message(openmls_group, provider, message, envelope) .await } Err(err) => Err(MessageProcessingError::Storage(err)), @@ -638,6 +732,7 @@ where // otherwise you can get into a forked group state. if is_retryable { tracing::error!( + error = %error_message, "Aborting message processing for retryable error: {}", error_message ); @@ -654,7 +749,7 @@ where } } - #[tracing::instrument(level = "trace", skip_all)] + #[tracing::instrument(skip_all)] pub(super) async fn receive(&self, provider: &XmtpOpenMlsProvider) -> Result<(), GroupError> { let messages = self .client @@ -735,9 +830,14 @@ where match result { Err(err) => { - tracing::error!("error getting publish intent data {:?}", err); + tracing::error!(error = %err, "error getting publish intent data {:?}", err); if (intent.publish_attempts + 1) as usize >= MAX_INTENT_PUBLISH_ATTEMPTS { - tracing::error!("intent {} has reached max publish attempts", intent.id); + tracing::error!( + intent.id, + intent.kind = %intent.kind, + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + "intent {} has reached max publish attempts", intent.id); // TODO: Eventually clean up errored attempts provider .conn_ref() @@ -765,6 +865,10 @@ where openmls_group.epoch().as_u64() as i64, )?; tracing::debug!( + intent.id, + intent.kind = %intent.kind, + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), "client [{}] set stored intent [{}] to state `published`", self.client.inbox_id(), intent.id @@ -776,6 +880,10 @@ where .await?; tracing::info!( + intent.id, + intent.kind = %intent.kind, + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), "[{}] published intent [{}] of type [{}]", self.client.inbox_id(), intent.id, @@ -914,7 +1022,7 @@ where } } - #[tracing::instrument(level = "trace", skip_all)] + #[tracing::instrument(skip_all)] pub(crate) async fn post_commit(&self, conn: &DbConnection) -> Result<(), GroupError> { let intents = conn.find_group_intents( self.group_id.clone(), @@ -924,6 +1032,8 @@ where for intent in intents { if let Some(post_commit_data) = intent.post_commit_data { + tracing::debug!(intent.id, intent.kind = %intent.kind, "taking post commit action"); + let post_commit_action = PostCommitAction::from_bytes(post_commit_data.as_slice())?; match post_commit_action { PostCommitAction::SendWelcomes(action) => { @@ -1006,8 +1116,8 @@ where pub(super) async fn get_membership_update_intent( &self, provider: &XmtpOpenMlsProvider, - inbox_ids_to_add: &[InboxId], - inbox_ids_to_remove: &[InboxId], + inbox_ids_to_add: &[InboxIdRef<'_>], + inbox_ids_to_remove: &[InboxIdRef<'_>], ) -> Result { let mls_group = self.load_mls_group(provider)?; let existing_group_membership = extract_group_membership(mls_group.extensions())?; @@ -1017,9 +1127,9 @@ where inbox_ids.extend_from_slice(inbox_ids_to_add); let conn = provider.conn_ref(); // Load any missing updates from the network - load_identity_updates(self.client.api(), conn, inbox_ids.clone()).await?; + load_identity_updates(self.client.api(), conn, &inbox_ids).await?; - let latest_sequence_id_map = conn.get_latest_sequence_id(&inbox_ids)?; + let latest_sequence_id_map = conn.get_latest_sequence_id(&inbox_ids as &[&str])?; // Get a list of all inbox IDs that have increased sequence_id for the group let changed_inbox_ids = @@ -1027,7 +1137,7 @@ where .iter() .try_fold(HashMap::new(), |mut updates, inbox_id| { match ( - latest_sequence_id_map.get(inbox_id), + latest_sequence_id_map.get(inbox_id as &str), existing_group_membership.get(inbox_id), ) { // This is an update. We have a new sequence ID and an existing one @@ -1056,7 +1166,10 @@ where Ok(UpdateGroupMembershipIntentData::new( changed_inbox_ids, - inbox_ids_to_remove.to_vec(), + inbox_ids_to_remove + .iter() + .map(|s| s.to_string()) + .collect::>(), )) } diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 664ba318d..272617bb9 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -7,7 +7,7 @@ pub mod intents; pub mod members; pub mod scoped_client; -pub(super) mod node_sync; +pub(super) mod mls_sync; pub(super) mod subscriptions; pub mod validated_commit; @@ -58,7 +58,7 @@ use self::{ }; use std::{collections::HashSet, sync::Arc}; use xmtp_cryptography::signature::{sanitize_evm_addresses, AddressValidationError}; -use xmtp_id::InboxId; +use xmtp_id::{InboxId, InboxIdRef}; use xmtp_proto::xmtp::mls::{ api::v1::{ group_message::{Version as GroupMessageVersion, V1 as GroupMessageV1}, @@ -313,9 +313,9 @@ impl MlsGroup { let provider = XmtpOpenMlsProvider::new(conn); let creator_inbox_id = context.inbox_id(); let protected_metadata = - build_protected_metadata_extension(creator_inbox_id.clone(), Purpose::Conversation)?; + build_protected_metadata_extension(creator_inbox_id, Purpose::Conversation)?; let mutable_metadata = build_mutable_metadata_extension_default(creator_inbox_id, opts)?; - let group_membership = build_starting_group_membership_extension(context.inbox_id(), 0); + let group_membership = build_starting_group_membership_extension(creator_inbox_id, 0); let mutable_permissions = build_mutable_permissions_extension(permissions_policy_set)?; let group_config = build_group_config( protected_metadata, @@ -339,7 +339,7 @@ impl MlsGroup { group_id.clone(), now_ns(), membership_state, - context.inbox_id(), + context.inbox_id().to_string(), None, ); @@ -390,7 +390,7 @@ impl MlsGroup { group_id.clone(), now_ns(), membership_state, - context.inbox_id(), + context.inbox_id().to_string(), Some(dm_target_inbox_id), ); @@ -513,13 +513,13 @@ impl MlsGroup { client: Arc, ) -> Result, GroupError> { let context = client.context(); - let creator_inbox_id = context.inbox_id() as String; + let creator_inbox_id = context.inbox_id(); let provider = client.mls_provider()?; let protected_metadata = - build_protected_metadata_extension(creator_inbox_id.clone(), Purpose::Sync)?; + build_protected_metadata_extension(creator_inbox_id, Purpose::Sync)?; let mutable_metadata = build_mutable_metadata_extension_default( - creator_inbox_id.clone(), + creator_inbox_id, GroupMetadataOptions::default(), )?; let group_membership = build_starting_group_membership_extension(creator_inbox_id, 0); @@ -643,7 +643,7 @@ impl MlsGroup { sent_at_ns: now, kind: GroupMessageKind::Application, sender_installation_id: self.context().installation_public_key(), - sender_inbox_id: self.context().inbox_id(), + sender_inbox_id: self.context().inbox_id().to_string(), delivery_status: DeliveryStatus::Unpublished, }; group_message.store(conn)?; @@ -705,10 +705,14 @@ impl MlsGroup { } #[tracing::instrument(level = "trace", skip_all)] - pub async fn add_members_by_inbox_id(&self, inbox_ids: &[String]) -> Result<(), GroupError> { + pub async fn add_members_by_inbox_id>( + &self, + inbox_ids: &[S], + ) -> Result<(), GroupError> { let provider = self.client.mls_provider()?; + let ids = inbox_ids.iter().map(AsRef::as_ref).collect::>(); let intent_data = self - .get_membership_update_intent(&provider, inbox_ids, &[]) + .get_membership_update_intent(&provider, ids.as_slice(), &[]) .await?; // TODO:nm this isn't the best test for whether the request is valid @@ -743,8 +747,11 @@ impl MlsGroup { let account_addresses = sanitize_evm_addresses(account_addresses_to_remove)?; let inbox_id_map = self.client.api().get_inbox_ids(account_addresses).await?; - self.remove_members_by_inbox_id(&inbox_id_map.into_values().collect::>()) - .await + let ids = inbox_id_map + .values() + .map(AsRef::as_ref) + .collect::>(); + self.remove_members_by_inbox_id(ids.as_slice()).await } /// Removes members from the group by their inbox IDs. @@ -757,7 +764,7 @@ impl MlsGroup { /// A `Result` indicating success or failure of the operation. pub async fn remove_members_by_inbox_id( &self, - inbox_ids: &[InboxId], + inbox_ids: &[InboxIdRef<'_>], ) -> Result<(), GroupError> { let provider = self.client.store().conn()?.into(); @@ -1120,7 +1127,7 @@ impl MlsGroup { group_id.clone(), now_ns(), GroupMembershipState::Allowed, // Use Allowed as default for tests - context.inbox_id(), + context.inbox_id().to_string(), Some(dm_target_inbox_id), ); @@ -1148,7 +1155,7 @@ pub fn extract_group_id(message: &GroupMessage) -> Result, MessageProces } fn build_protected_metadata_extension( - creator_inbox_id: String, + creator_inbox_id: &str, group_purpose: Purpose, ) -> Result { let group_type = match group_purpose { @@ -1156,22 +1163,26 @@ fn build_protected_metadata_extension( Purpose::Sync => ConversationType::Sync, }; - let metadata = GroupMetadata::new(group_type, creator_inbox_id, None); + let metadata = GroupMetadata::new(group_type, creator_inbox_id.to_string(), None); let protected_metadata = Metadata::new(metadata.try_into()?); Ok(Extension::ImmutableMetadata(protected_metadata)) } fn build_dm_protected_metadata_extension( - creator_inbox_id: String, + creator_inbox_id: &str, dm_inbox_id: InboxId, ) -> Result { let dm_members = Some(DmMembers { - member_one_inbox_id: creator_inbox_id.clone(), + member_one_inbox_id: creator_inbox_id.to_string(), member_two_inbox_id: dm_inbox_id, }); - let metadata = GroupMetadata::new(ConversationType::Dm, creator_inbox_id, dm_members); + let metadata = GroupMetadata::new( + ConversationType::Dm, + creator_inbox_id.to_string(), + dm_members, + ); let protected_metadata = Metadata::new(metadata.try_into()?); Ok(Extension::ImmutableMetadata(protected_metadata)) @@ -1188,11 +1199,11 @@ fn build_mutable_permissions_extension(policies: PolicySet) -> Result Result { let mutable_metadata: Vec = - GroupMutableMetadata::new_default(creator_inbox_id, opts).try_into()?; + GroupMutableMetadata::new_default(creator_inbox_id.to_string(), opts).try_into()?; let unknown_gc_extension = UnknownExtension(mutable_metadata); Ok(Extension::Unknown( @@ -1202,11 +1213,12 @@ pub fn build_mutable_metadata_extension_default( } pub fn build_dm_mutable_metadata_extension_default( - creator_inbox_id: String, + creator_inbox_id: &str, dm_target_inbox_id: &str, ) -> Result { let mutable_metadata: Vec = - GroupMutableMetadata::new_dm_default(creator_inbox_id, dm_target_inbox_id).try_into()?; + GroupMutableMetadata::new_dm_default(creator_inbox_id.to_string(), dm_target_inbox_id) + .try_into()?; let unknown_gc_extension = UnknownExtension(mutable_metadata); Ok(Extension::Unknown( @@ -1339,9 +1351,9 @@ pub fn build_extensions_for_admin_lists_update( Ok(extensions) } -pub fn build_starting_group_membership_extension(inbox_id: String, sequence_id: u64) -> Extension { +pub fn build_starting_group_membership_extension(inbox_id: &str, sequence_id: u64) -> Extension { let mut group_membership = GroupMembership::new(); - group_membership.add(inbox_id, sequence_id); + group_membership.add(inbox_id.to_string(), sequence_id); build_group_membership_extension(&group_membership) } @@ -1412,9 +1424,10 @@ async fn validate_initial_group_membership( ) -> Result<(), GroupError> { tracing::info!("Validating initial group membership"); let membership = extract_group_membership(mls_group.extensions())?; - let needs_update = conn.filter_inbox_ids_needing_updates(membership.to_filters())?; + let needs_update = conn.filter_inbox_ids_needing_updates(membership.to_filters().as_slice())?; if !needs_update.is_empty() { - load_identity_updates(client.api(), conn, needs_update).await?; + let ids = needs_update.iter().map(AsRef::as_ref).collect::>(); + load_identity_updates(client.api(), conn, ids.as_slice()).await?; } let mut expected_installation_ids = HashSet::>::new(); @@ -1462,7 +1475,7 @@ fn validate_dm_group( // Check if DmMembers are set and validate their contents if let Some(dm_members) = metadata.dm_members { - let our_inbox_id = client.context().identity.inbox_id().clone(); + let our_inbox_id = client.inbox_id(); if !((dm_members.member_one_inbox_id == added_by_inbox && dm_members.member_two_inbox_id == our_inbox_id) || (dm_members.member_one_inbox_id == our_inbox_id @@ -2621,7 +2634,7 @@ pub(crate) mod tests { drop(provider); // allow connection to be cleaned assert_eq!(admin_list.len(), 0); assert_eq!(super_admin_list.len(), 1); - assert!(super_admin_list.contains(&amal.inbox_id())); + assert!(super_admin_list.contains(&amal.inbox_id().to_string())); // Verify that bola can not add caro because they are not an admin bola.sync_welcomes(&bola.store().conn().unwrap()) @@ -2638,7 +2651,7 @@ pub(crate) mod tests { // Add bola as an admin amal_group - .update_admin_list(UpdateAdminListType::Add, bola.inbox_id()) + .update_admin_list(UpdateAdminListType::Add, bola.inbox_id().to_string()) .await .unwrap(); amal_group.sync().await.unwrap(); @@ -2653,7 +2666,7 @@ pub(crate) mod tests { assert!(bola_group .admin_list(bola_group.mls_provider().unwrap()) .unwrap() - .contains(&bola.inbox_id())); + .contains(&bola.inbox_id().to_string())); // Verify that bola can now add caro because they are an admin bola_group @@ -2666,13 +2679,16 @@ pub(crate) mod tests { // Verify that bola can not remove amal as a super admin, because // Remove admin is super admin only permissions bola_group - .update_admin_list(UpdateAdminListType::RemoveSuper, amal.inbox_id()) + .update_admin_list( + UpdateAdminListType::RemoveSuper, + amal.inbox_id().to_string(), + ) .await .expect_err("expected err"); // Now amal removes bola as an admin amal_group - .update_admin_list(UpdateAdminListType::Remove, bola.inbox_id()) + .update_admin_list(UpdateAdminListType::Remove, bola.inbox_id().to_string()) .await .unwrap(); amal_group.sync().await.unwrap(); @@ -2687,7 +2703,7 @@ pub(crate) mod tests { assert!(!bola_group .admin_list(bola_group.mls_provider().unwrap()) .unwrap() - .contains(&bola.inbox_id())); + .contains(&bola.inbox_id().to_string())); // Verify that bola can not add charlie because they are not an admin bola.sync_welcomes(&bola.store().conn().unwrap()) @@ -2736,7 +2752,7 @@ pub(crate) mod tests { drop(provider); // allow connection to be re-added to pool assert_eq!(admin_list.len(), 0); assert_eq!(super_admin_list.len(), 1); - assert!(super_admin_list.contains(&amal.inbox_id())); + assert!(super_admin_list.contains(&amal.inbox_id().to_string())); // Verify that bola can not add caro as an admin because they are not a super admin bola.sync_welcomes(&bola.store().conn().unwrap()) @@ -2748,13 +2764,13 @@ pub(crate) mod tests { let bola_group: &MlsGroup<_> = bola_groups.first().unwrap(); bola_group.sync().await.unwrap(); bola_group - .update_admin_list(UpdateAdminListType::Add, caro.inbox_id()) + .update_admin_list(UpdateAdminListType::Add, caro.inbox_id().to_string()) .await .expect_err("expected err"); // Add bola as a super admin amal_group - .update_admin_list(UpdateAdminListType::AddSuper, bola.inbox_id()) + .update_admin_list(UpdateAdminListType::AddSuper, bola.inbox_id().to_string()) .await .unwrap(); amal_group.sync().await.unwrap(); @@ -2764,12 +2780,12 @@ pub(crate) mod tests { assert!(bola_group .super_admin_list(&provider) .unwrap() - .contains(&bola.inbox_id())); + .contains(&bola.inbox_id().to_string())); drop(provider); // allow connection to be re-added to pool // Verify that bola can now add caro as an admin bola_group - .update_admin_list(UpdateAdminListType::Add, caro.inbox_id()) + .update_admin_list(UpdateAdminListType::Add, caro.inbox_id().to_string()) .await .unwrap(); bola_group.sync().await.unwrap(); @@ -2778,18 +2794,21 @@ pub(crate) mod tests { assert!(bola_group .admin_list(&provider) .unwrap() - .contains(&caro.inbox_id())); + .contains(&caro.inbox_id().to_string())); drop(provider); // allow connection to be re-added to pool // Verify that no one can remove a super admin from a group amal_group - .remove_members(&[bola.inbox_id()]) + .remove_members(&[bola.inbox_id().to_string()]) .await .expect_err("expected err"); // Verify that bola can now remove themself as a super admin bola_group - .update_admin_list(UpdateAdminListType::RemoveSuper, bola.inbox_id()) + .update_admin_list( + UpdateAdminListType::RemoveSuper, + bola.inbox_id().to_string(), + ) .await .unwrap(); bola_group.sync().await.unwrap(); @@ -2798,12 +2817,15 @@ pub(crate) mod tests { assert!(!bola_group .super_admin_list(&provider) .unwrap() - .contains(&bola.inbox_id())); + .contains(&bola.inbox_id().to_string())); drop(provider); // allow connection to be re-added to pool // Verify that amal can NOT remove themself as a super admin because they are the only remaining amal_group - .update_admin_list(UpdateAdminListType::RemoveSuper, amal.inbox_id()) + .update_admin_list( + UpdateAdminListType::RemoveSuper, + amal.inbox_id().to_string(), + ) .await .expect_err("expected err"); } @@ -2851,7 +2873,7 @@ pub(crate) mod tests { // Add Bola as an admin amal_group - .update_admin_list(UpdateAdminListType::Add, bola.inbox_id()) + .update_admin_list(UpdateAdminListType::Add, bola.inbox_id().to_string()) .await .unwrap(); amal_group.sync().await.unwrap(); @@ -2879,7 +2901,7 @@ pub(crate) mod tests { // Add Caro as a super admin amal_group - .update_admin_list(UpdateAdminListType::AddSuper, caro.inbox_id()) + .update_admin_list(UpdateAdminListType::AddSuper, caro.inbox_id().to_string()) .await .unwrap(); amal_group.sync().await.unwrap(); @@ -3014,7 +3036,7 @@ pub(crate) mod tests { // Step 4: Bola attempts an action that they do not have permissions for like add admin, fails as expected let result = bola_group - .update_admin_list(UpdateAdminListType::Add, bola.inbox_id()) + .update_admin_list(UpdateAdminListType::Add, bola.inbox_id().to_string()) .await; if let Err(e) = &result { eprintln!("Error updating admin list: {:?}", e); @@ -3208,7 +3230,10 @@ pub(crate) mod tests { let caro = ClientBuilder::new_test_client(&generate_local_wallet()).await; // Amal creates a dm group targetting bola - let amal_dm = amal.create_dm_by_inbox_id(bola.inbox_id()).await.unwrap(); + let amal_dm = amal + .create_dm_by_inbox_id(bola.inbox_id().to_string()) + .await + .unwrap(); // Amal can not add caro to the dm group let result = amal_dm.add_members_by_inbox_id(&[caro.inbox_id()]).await; @@ -3248,16 +3273,16 @@ pub(crate) mod tests { amal_dm.sync().await.unwrap(); bola_dm.sync().await.unwrap(); let is_amal_admin = amal_dm - .is_admin(amal.inbox_id(), amal.mls_provider().unwrap()) + .is_admin(amal.inbox_id().to_string(), amal.mls_provider().unwrap()) .unwrap(); let is_bola_admin = amal_dm - .is_admin(bola.inbox_id(), bola.mls_provider().unwrap()) + .is_admin(bola.inbox_id().to_string(), bola.mls_provider().unwrap()) .unwrap(); let is_amal_super_admin = amal_dm - .is_super_admin(amal.inbox_id(), amal.mls_provider().unwrap()) + .is_super_admin(amal.inbox_id().to_string(), amal.mls_provider().unwrap()) .unwrap(); let is_bola_super_admin = amal_dm - .is_super_admin(bola.inbox_id(), bola.mls_provider().unwrap()) + .is_super_admin(bola.inbox_id().to_string(), bola.mls_provider().unwrap()) .unwrap(); assert!(!is_amal_admin); assert!(!is_bola_admin); @@ -3672,7 +3697,7 @@ pub(crate) mod tests { async fn test_validate_dm_group() { let client = ClientBuilder::new_test_client(&generate_local_wallet()).await; let added_by_inbox = "added_by_inbox_id"; - let creator_inbox_id = client.context.identity.inbox_id().clone(); + let creator_inbox_id = client.context.identity.inbox_id(); let dm_target_inbox_id = added_by_inbox.to_string(); // Test case 1: Valid DM group @@ -3696,8 +3721,7 @@ pub(crate) mod tests { // Test case 2: Invalid conversation type let invalid_protected_metadata = - build_protected_metadata_extension(creator_inbox_id.clone(), Purpose::Conversation) - .unwrap(); + build_protected_metadata_extension(creator_inbox_id, Purpose::Conversation).unwrap(); let invalid_type_group = MlsGroup::::create_test_dm_group( client.clone().into(), dm_target_inbox_id.clone(), @@ -3716,11 +3740,9 @@ pub(crate) mod tests { // This case is not easily testable with the current structure, as DmMembers are set in the protected metadata // Test case 4: Mismatched DM members - let mismatched_dm_members = build_dm_protected_metadata_extension( - creator_inbox_id.clone(), - "wrong_inbox_id".to_string(), - ) - .unwrap(); + let mismatched_dm_members = + build_dm_protected_metadata_extension(creator_inbox_id, "wrong_inbox_id".to_string()) + .unwrap(); let mismatched_dm_members_group = MlsGroup::::create_test_dm_group( client.clone().into(), dm_target_inbox_id.clone(), @@ -3737,7 +3759,7 @@ pub(crate) mod tests { // Test case 5: Non-empty admin list let non_empty_admin_list = build_mutable_metadata_extension_default( - creator_inbox_id.clone(), + creator_inbox_id, GroupMetadataOptions::default(), ) .unwrap(); diff --git a/xmtp_mls/src/groups/scoped_client.rs b/xmtp_mls/src/groups/scoped_client.rs index 0808188ae..79e1bbb7c 100644 --- a/xmtp_mls/src/groups/scoped_client.rs +++ b/xmtp_mls/src/groups/scoped_client.rs @@ -1,6 +1,9 @@ use std::sync::Arc; + use tokio::sync::broadcast; -use xmtp_id::{associations::AssociationState, scw_verifier::SmartContractSignatureVerifier}; +use xmtp_id::{ + associations::AssociationState, scw_verifier::SmartContractSignatureVerifier, InboxIdRef, +}; use xmtp_proto::{api_client::trait_impls::XmtpApi, xmtp::mls::api::v1::GroupMessage}; use crate::{ @@ -31,8 +34,8 @@ pub trait LocalScopedGroupClient: Send + Sync + Sized { fn local_events(&self) -> &broadcast::Sender>; - fn inbox_id(&self) -> String { - self.context().inbox_id() + fn inbox_id(&self) -> InboxIdRef<'_> { + self.context_ref().inbox_id() } fn mls_provider(&self) -> Result { @@ -93,8 +96,8 @@ pub trait ScopedGroupClient: Sized { fn local_events(&self) -> &broadcast::Sender>; - fn inbox_id(&self) -> String { - self.context().inbox_id() + fn inbox_id(&self) -> InboxIdRef<'_> { + self.context_ref().inbox_id() } fn mls_provider(&self) -> Result { @@ -248,7 +251,7 @@ where (**self).intents() } - fn inbox_id(&self) -> String { + fn inbox_id(&self) -> InboxIdRef<'_> { (**self).inbox_id() } @@ -338,7 +341,7 @@ where (**self).intents() } - fn inbox_id(&self) -> String { + fn inbox_id(&self) -> InboxIdRef<'_> { (**self).inbox_id() } diff --git a/xmtp_mls/src/groups/subscriptions.rs b/xmtp_mls/src/groups/subscriptions.rs index 5e1a6adc3..724fe6df2 100644 --- a/xmtp_mls/src/groups/subscriptions.rs +++ b/xmtp_mls/src/groups/subscriptions.rs @@ -28,8 +28,11 @@ impl MlsGroup { let msg_id = msgv1.id; let client_id = self.client.inbox_id(); tracing::info!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + msg_id = msgv1.id, "client [{}] is about to process streamed envelope: [{}]", - &client_id.clone(), + &client_id, &msg_id ); let created_ns = msgv1.created_ns; @@ -48,29 +51,57 @@ impl MlsGroup { // Attempt processing immediately, but fail if the message is not an Application Message // Returning an error should roll back the DB tx tracing::info!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + current_epoch = openmls_group.epoch().as_u64(), + msg_id = msgv1.id, "current epoch for [{}] in process_stream_entry() is Epoch: [{}]", client_id, openmls_group.epoch() ); self.process_message(&mut openmls_group, &provider, msgv1, false) - .await?; - Ok::<_, SubscribeError>(()) + .await + // NOTE: We want to make sure we retry an error in process_message + .map_err(SubscribeError::Receive) }) .await }) ); if let Err(SubscribeError::Receive(_)) = process_result { + tracing::debug!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + msg_id = msgv1.id, + "attempting recovery sync" + ); // Swallow errors here, since another process may have successfully saved the message // to the DB if let Err(err) = self.sync_with_conn(&self.client.mls_provider()?).await { - tracing::warn!("Sync triggered by streamed message failed: {}", err); + tracing::warn!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + msg_id = msgv1.id, + err = %err, + "recovery sync triggered by streamed message failed: {}", err + ); } else { - tracing::debug!("Sync triggered by streamed message successful") + tracing::debug!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + msg_id = msgv1.id, + "recovery sync triggered by streamed message successful" + ) } } else if let Err(e) = process_result { - tracing::error!("Process stream entry {:?}", e); + tracing::error!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + msg_id = msgv1.id, + err = %e, + "process stream entry {:?}", e + ); } } @@ -104,8 +135,6 @@ impl MlsGroup { envelope_bytes: Vec, ) -> Result { let envelope = GroupMessage::decode(envelope_bytes.as_slice())?; - // .map_err(|e| GroupError::Generic(e.to_string()))?; - self.process_stream_entry(envelope).await } diff --git a/xmtp_mls/src/identity.rs b/xmtp_mls/src/identity.rs index 322b92d8c..d12d53081 100644 --- a/xmtp_mls/src/identity.rs +++ b/xmtp_mls/src/identity.rs @@ -40,7 +40,7 @@ use xmtp_id::{ builder::{SignatureRequest, SignatureRequestBuilder, SignatureRequestError}, generate_inbox_id, sign_with_legacy_key, MemberIdentifier, }, - InboxId, + InboxId, InboxIdRef, }; use xmtp_proto::xmtp::identity::MlsCredential; @@ -385,7 +385,7 @@ impl Identity { } } - pub fn inbox_id(&self) -> &InboxId { + pub fn inbox_id(&self) -> InboxIdRef<'_> { &self.inbox_id } diff --git a/xmtp_mls/src/identity_updates.rs b/xmtp_mls/src/identity_updates.rs index b3feb7da6..741f716af 100644 --- a/xmtp_mls/src/identity_updates.rs +++ b/xmtp_mls/src/identity_updates.rs @@ -19,6 +19,7 @@ use xmtp_id::{ SignatureError, }, scw_verifier::SmartContractSignatureVerifier, + InboxIdRef, }; use crate::{ @@ -58,30 +59,26 @@ impl RetryableError for InstallationDiffError { impl DbConnection { /// Take a list of inbox_id/sequence_id tuples and determine which `inbox_id`s have missing entries /// in the local DB - pub(crate) fn filter_inbox_ids_needing_updates + ToString>( + pub(crate) fn filter_inbox_ids_needing_updates<'a>( &self, - filters: Vec<(InboxId, i64)>, - ) -> Result, ClientError> { - let existing_sequence_ids = self.get_latest_sequence_id( - &filters - .iter() - .map(|f| f.0.to_string()) - .collect::>(), - )?; + filters: &[(InboxIdRef<'a>, i64)], + ) -> Result, ClientError> { + let existing_sequence_ids = + self.get_latest_sequence_id(&filters.iter().map(|f| f.0).collect::>())?; let needs_update = filters .iter() .filter_map(|filter| { - let existing_sequence_id = existing_sequence_ids.get(filter.0.as_ref()); + let existing_sequence_id = existing_sequence_ids.get(filter.0); if let Some(sequence_id) = existing_sequence_id { if sequence_id.ge(&filter.1) { return None; } } - Some(filter.0.to_string()) + Some(filter.0) }) - .collect::>(); + .collect::>(); Ok(needs_update) } @@ -119,7 +116,7 @@ where conn: &DbConnection, inbox_id: InboxId, ) -> Result { - load_identity_updates(&self.api_client, conn, vec![inbox_id.as_ref().to_string()]).await?; + load_identity_updates(&self.api_client, conn, &[inbox_id.as_ref()]).await?; self.get_association_state(conn, inbox_id, None).await } @@ -355,7 +352,7 @@ where &self, signature_request: SignatureRequest, ) -> Result<(), ClientError> { - let inbox_id = signature_request.inbox_id(); + let inbox_id = signature_request.inbox_id().to_string(); // If the signature request isn't completed, this will error let identity_update = signature_request .build_identity_update() @@ -375,7 +372,7 @@ where load_identity_updates( &self.api_client, &self.store().conn()?, - vec![inbox_id.clone()], + &[inbox_id.as_str()], ) .await }) @@ -401,23 +398,22 @@ where let added_and_updated_members = membership_diff .added_inboxes .iter() - .chain(membership_diff.updated_inboxes.iter()) - .cloned(); + .chain(membership_diff.updated_inboxes.iter()); let filters = added_and_updated_members .clone() .map(|i| { ( - i, + i.as_str(), new_group_membership.get(i).map(|i| *i as i64).unwrap_or(0), ) }) - .collect::>(); + .collect::>(); load_identity_updates( &self.api_client, conn, - conn.filter_inbox_ids_needing_updates(filters)?, + &conn.filter_inbox_ids_needing_updates(filters.as_slice())?, ) .await?; @@ -471,19 +467,19 @@ where pub async fn load_identity_updates( api_client: &ApiClientWrapper, conn: &DbConnection, - inbox_ids: Vec, + inbox_ids: &[&str], ) -> Result>, ClientError> { if inbox_ids.is_empty() { return Ok(HashMap::new()); } tracing::debug!("Fetching identity updates for: {:?}", inbox_ids); - let existing_sequence_ids = conn.get_latest_sequence_id(&inbox_ids)?; + let existing_sequence_ids = conn.get_latest_sequence_id(inbox_ids)?; let filters: Vec = inbox_ids - .into_iter() + .iter() .map(|inbox_id| GetIdentityUpdatesV2Filter { - sequence_id: existing_sequence_ids.get(&inbox_id).map(|i| *i as u64), - inbox_id, + sequence_id: existing_sequence_ids.get(*inbox_id).map(|i| *i as u64), + inbox_id: inbox_id.to_string(), }) .collect(); @@ -551,7 +547,7 @@ pub(crate) mod tests { Verifier: SmartContractSignatureVerifier, { let conn = client.store().conn().unwrap(); - load_identity_updates(&client.api_client, &conn, vec![inbox_id.clone()]) + load_identity_updates(&client.api_client, &conn, &[inbox_id.as_str()]) .await .unwrap(); @@ -580,7 +576,7 @@ pub(crate) mod tests { .create_inbox(wallet_address.clone(), None) .await .unwrap(); - let inbox_id = signature_request.inbox_id(); + let inbox_id = signature_request.inbox_id().to_string(); add_wallet_signature(&mut signature_request, &wallet).await; @@ -589,7 +585,7 @@ pub(crate) mod tests { .await .unwrap(); - let association_state = get_association_state(&client, inbox_id.clone()).await; + let association_state = get_association_state(&client, inbox_id.to_string()).await; assert_eq!(association_state.members().len(), 2); assert_eq!(association_state.recovery_address(), &wallet_address); @@ -617,7 +613,7 @@ pub(crate) mod tests { .await .unwrap(); - let association_state = get_association_state(&client, client.inbox_id()).await; + let association_state = get_association_state(&client, client.inbox_id().to_string()).await; let members = association_state.members_by_parent(&MemberIdentifier::Address(wallet_address.clone())); @@ -643,12 +639,12 @@ pub(crate) mod tests { let client = ClientBuilder::new_test_client(&wallet).await; let inbox_id = client.inbox_id(); - get_association_state(&client, inbox_id.clone()).await; + get_association_state(&client, inbox_id.to_string()).await; assert_logged!("Loaded association", 0); assert_logged!("Wrote association", 1); - let association_state = get_association_state(&client, inbox_id.clone()).await; + let association_state = get_association_state(&client, inbox_id.to_string()).await; assert_eq!(association_state.members().len(), 2); assert_eq!(association_state.recovery_address(), &wallet_address); @@ -671,12 +667,12 @@ pub(crate) mod tests { .await .unwrap(); - get_association_state(&client, inbox_id.clone()).await; + get_association_state(&client, inbox_id.to_string()).await; assert_logged!("Loaded association", 1); assert_logged!("Wrote association", 2); - let association_state = get_association_state(&client, inbox_id.clone()).await; + let association_state = get_association_state(&client, inbox_id.to_string()).await; assert_logged!("Loaded association", 2); assert_logged!("Wrote association", 2); @@ -701,7 +697,7 @@ pub(crate) mod tests { let filtered = // Inbox 1 is requesting an inbox ID higher than what is in the DB. Inbox 2 is requesting one that matches the DB. // Inbox 3 is requesting one lower than what is in the DB - conn.filter_inbox_ids_needing_updates(vec![("inbox_1", 3), ("inbox_2", 2), ("inbox_3", 2)]); + conn.filter_inbox_ids_needing_updates(&[("inbox_1", 3), ("inbox_2", 2), ("inbox_3", 2)]); assert_eq!(filtered.unwrap(), vec!["inbox_1"]); } @@ -731,8 +727,8 @@ pub(crate) mod tests { .create_inbox(wallet.get_address(), None) .await .unwrap(); - let inbox_id = signature_request.inbox_id(); - inbox_ids.push(inbox_id.clone()); + let inbox_id = signature_request.inbox_id().to_string(); + inbox_ids.push(inbox_id); add_wallet_signature(&mut signature_request, &wallet).await; client @@ -756,15 +752,14 @@ pub(crate) mod tests { // Create a new client to test group operations with let other_client = ClientBuilder::new_test_client(&generate_local_wallet()).await; let other_conn = other_client.store().conn().unwrap(); + let ids = inbox_ids.iter().map(AsRef::as_ref).collect::>(); // Load all the identity updates for the new inboxes - load_identity_updates(&other_client.api_client, &other_conn, inbox_ids.clone()) + load_identity_updates(&other_client.api_client, &other_conn, ids.as_slice()) .await .expect("load should succeed"); // Get the latest sequence IDs so we can construct the updates - let latest_sequence_ids = other_conn - .get_latest_sequence_id(&inbox_ids.clone()) - .unwrap(); + let latest_sequence_ids = other_conn.get_latest_sequence_id(ids.as_slice()).unwrap(); let inbox_1_first_sequence_id = other_conn .get_identity_updates(inbox_ids[0].clone(), None, None) @@ -774,20 +769,20 @@ pub(crate) mod tests { .sequence_id; let mut original_group_membership = GroupMembership::new(); - original_group_membership.add(inbox_ids[0].clone(), inbox_1_first_sequence_id as u64); + original_group_membership.add(inbox_ids[0].to_string(), inbox_1_first_sequence_id as u64); original_group_membership.add( - inbox_ids[1].clone(), + inbox_ids[1].to_string(), *latest_sequence_ids.get(&inbox_ids[1]).unwrap() as u64, ); let mut new_group_membership = original_group_membership.clone(); // Update the first inbox to have a higher sequence ID, but no new installations new_group_membership.add( - inbox_ids[0].clone(), + inbox_ids[0].to_string(), *latest_sequence_ids.get(&inbox_ids[0]).unwrap() as u64, ); new_group_membership.add( - inbox_ids[2].clone(), + inbox_ids[2].to_string(), *latest_sequence_ids.get(&inbox_ids[2]).unwrap() as u64, ); new_group_membership.remove(&inbox_ids[1]); @@ -833,7 +828,8 @@ pub(crate) mod tests { .await .unwrap(); - let association_state_after_add = get_association_state(&client, client.inbox_id()).await; + let association_state_after_add = + get_association_state(&client, client.inbox_id().to_string()).await; assert_eq!(association_state_after_add.account_addresses().len(), 2); // Make sure the inbox ID is correctly registered @@ -859,7 +855,7 @@ pub(crate) mod tests { // Make sure that the association state has removed the second wallet let association_state_after_revoke = - get_association_state(&client, client.inbox_id()).await; + get_association_state(&client, client.inbox_id().to_string()).await; assert_eq!(association_state_after_revoke.account_addresses().len(), 1); // Make sure the inbox ID is correctly unregistered @@ -878,7 +874,8 @@ pub(crate) mod tests { let client1: FullXmtpClient = ClientBuilder::new_test_client(&wallet).await; let client2: FullXmtpClient = ClientBuilder::new_test_client(&wallet).await; - let association_state = get_association_state(&client1, client1.inbox_id()).await; + let association_state = + get_association_state(&client1, client1.inbox_id().to_string()).await; // Ensure there are two installations on the inbox assert_eq!(association_state.installation_ids().len(), 2); @@ -894,7 +891,8 @@ pub(crate) mod tests { .unwrap(); // Make sure there is only one installation on the inbox - let association_state = get_association_state(&client1, client1.inbox_id()).await; + let association_state = + get_association_state(&client1, client1.inbox_id().to_string()).await; assert_eq!(association_state.installation_ids().len(), 1); } } diff --git a/xmtp_mls/src/lib.rs b/xmtp_mls/src/lib.rs index 16f10f586..e36193ddd 100644 --- a/xmtp_mls/src/lib.rs +++ b/xmtp_mls/src/lib.rs @@ -125,7 +125,7 @@ pub(crate) mod tests { .from_env_lossy(); tracing_subscriber::registry() - .with(fmt::layer()) + .with(fmt::layer().pretty()) .with(filter) .init(); } diff --git a/xmtp_mls/src/storage/encrypted_store/association_state.rs b/xmtp_mls/src/storage/encrypted_store/association_state.rs index e37f30c37..0eb194b26 100644 --- a/xmtp_mls/src/storage/encrypted_store/association_state.rs +++ b/xmtp_mls/src/storage/encrypted_store/association_state.rs @@ -137,7 +137,7 @@ pub(crate) mod tests { None, ) .unwrap(); - let inbox_id = association_state.inbox_id().clone(); + let inbox_id = association_state.inbox_id().to_string(); StoredAssociationState::write_to_cache( conn, inbox_id.to_string(), @@ -152,10 +152,10 @@ pub(crate) mod tests { None, ) .unwrap(); - let inbox_id_2 = association_state_2.inbox_id().clone(); + let inbox_id_2 = association_state_2.inbox_id().to_string(); StoredAssociationState::write_to_cache( conn, - association_state_2.inbox_id().clone(), + association_state_2.inbox_id().to_string(), 2, association_state_2, ) @@ -167,7 +167,7 @@ pub(crate) mod tests { ) .unwrap(); assert_eq!(first_association_state.len(), 1); - assert_eq!(first_association_state[0].inbox_id(), &inbox_id); + assert_eq!(&first_association_state[0].inbox_id(), &inbox_id); let both_association_states = StoredAssociationState::batch_read_from_cache( conn, diff --git a/xmtp_mls/src/storage/encrypted_store/identity_update.rs b/xmtp_mls/src/storage/encrypted_store/identity_update.rs index 9df363cb9..74eb878d3 100644 --- a/xmtp_mls/src/storage/encrypted_store/identity_update.rs +++ b/xmtp_mls/src/storage/encrypted_store/identity_update.rs @@ -105,7 +105,7 @@ impl DbConnection { #[tracing::instrument(level = "trace", skip_all)] pub fn get_latest_sequence_id( &self, - inbox_ids: &[String], + inbox_ids: &[&str], ) -> Result, StorageError> { // Query IdentityUpdates grouped by inbox_id, getting the max sequence_id let query = dsl::identity_updates @@ -221,14 +221,14 @@ pub(crate) mod tests { .expect("insert should succeed"); let latest_sequence_ids = conn - .get_latest_sequence_id(&[inbox_1.to_string(), inbox_2.to_string()]) + .get_latest_sequence_id(&[inbox_1, inbox_2]) .expect("query should work"); assert_eq!(latest_sequence_ids.get(inbox_1), Some(&3)); assert_eq!(latest_sequence_ids.get(inbox_2), Some(&6)); let latest_sequence_ids_with_missing_member = conn - .get_latest_sequence_id(&[inbox_1.to_string(), "missing_inbox".to_string()]) + .get_latest_sequence_id(&[inbox_1, "missing_inbox"]) .expect("should still succeed"); assert_eq!( diff --git a/xmtp_mls/src/storage/mod.rs b/xmtp_mls/src/storage/mod.rs index 772ed8c96..f2b3951fa 100644 --- a/xmtp_mls/src/storage/mod.rs +++ b/xmtp_mls/src/storage/mod.rs @@ -5,3 +5,11 @@ pub mod sql_key_store; pub use encrypted_store::*; pub use errors::{DuplicateItem, StorageError}; + +/// Initialize the SQLite WebAssembly Library +#[cfg(target_arch = "wasm32")] +pub async fn init_sqlite() { + diesel_wasm_sqlite::init_sqlite().await; +} +#[cfg(not(target_arch = "wasm32"))] +pub async fn init_sqlite() {} diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index a41e121e7..be4c98775 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -166,7 +166,10 @@ where let creation_result = retry_async!( Retry::default(), (async { - tracing::info!("Trying to process streamed welcome"); + tracing::info!( + installation_id = &welcome_v1.id, + "Trying to process streamed welcome" + ); let welcome_v1 = &welcome_v1; self.context .store() @@ -190,6 +193,8 @@ where match result { Ok(Some(group)) => { tracing::info!( + group_id = hex::encode(&group.id), + welcome_id = ?group.welcome_id, "Loading existing group for welcome_id: {:?}", group.welcome_id ); @@ -828,7 +833,9 @@ pub(crate) mod tests { }, ); - alix.create_dm_by_inbox_id(bo.inbox_id()).await.unwrap(); + alix.create_dm_by_inbox_id(bo.inbox_id().to_string()) + .await + .unwrap(); let result = notify.wait_for_delivery().await; assert!(result.is_err(), "Stream unexpectedly received a DM group"); @@ -876,7 +883,9 @@ pub(crate) mod tests { let result = notify.wait_for_delivery().await; assert!(result.is_err(), "Stream unexpectedly received a Group"); - alix.create_dm_by_inbox_id(bo.inbox_id()).await.unwrap(); + alix.create_dm_by_inbox_id(bo.inbox_id().to_string()) + .await + .unwrap(); notify.wait_for_delivery().await.unwrap(); { let grps = groups.lock(); @@ -897,14 +906,19 @@ pub(crate) mod tests { notify_pointer.notify_one(); }); - alix.create_dm_by_inbox_id(bo.inbox_id()).await.unwrap(); + alix.create_dm_by_inbox_id(bo.inbox_id().to_string()) + .await + .unwrap(); notify.wait_for_delivery().await.unwrap(); { let grps = groups.lock(); assert_eq!(grps.len(), 1); } - let dm = bo.create_dm_by_inbox_id(alix.inbox_id()).await.unwrap(); + let dm = bo + .create_dm_by_inbox_id(alix.inbox_id().to_string()) + .await + .unwrap(); dm.add_members_by_inbox_id(&[alix.inbox_id()]) .await .unwrap(); @@ -945,7 +959,10 @@ pub(crate) mod tests { .await .unwrap(); - let alix_dm = alix.create_dm_by_inbox_id(bo.inbox_id()).await.unwrap(); + let alix_dm = alix + .create_dm_by_inbox_id(bo.inbox_id().to_string()) + .await + .unwrap(); // Start a stream with only groups let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); diff --git a/xmtp_mls/src/utils/bench.rs b/xmtp_mls/src/utils/bench.rs index 3dffb4ba7..781e61bf8 100644 --- a/xmtp_mls/src/utils/bench.rs +++ b/xmtp_mls/src/utils/bench.rs @@ -132,7 +132,7 @@ async fn create_identity(is_dev_network: bool) -> Identity { } else { ClientBuilder::new_local_client(&wallet).await }; - Identity::new(client.inbox_id(), wallet.get_address()) + Identity::new(client.inbox_id().to_string(), wallet.get_address()) } async fn create_identities(n: usize, is_dev_network: bool) -> Vec {