diff --git a/bindings_node/src/conversation.rs b/bindings_node/src/conversation.rs index d7df6a4fa..aeeffa6b1 100644 --- a/bindings_node/src/conversation.rs +++ b/bindings_node/src/conversation.rs @@ -1,5 +1,5 @@ use std::{ops::Deref, sync::Arc}; -use futures::TryFutureExt; + use napi::{ bindgen_prelude::{Result, Uint8Array}, threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode}, diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 82a317dc6..4410f8c63 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -845,8 +845,8 @@ where async move { tracing::info!( inbox_id = self.inbox_id(), - "current epoch for [{}] in sync_all_groups()", - self.inbox_id(), + "[{}] syncing group", + self.inbox_id() ); tracing::info!( inbox_id = self.inbox_id(), diff --git a/xmtp_mls/src/groups/intents.rs b/xmtp_mls/src/groups/intents.rs index bbb4e06c0..65fdd32f7 100644 --- a/xmtp_mls/src/groups/intents.rs +++ b/xmtp_mls/src/groups/intents.rs @@ -725,7 +725,6 @@ impl TryFrom> for PostCommitAction { pub(crate) mod tests { #[cfg(target_arch = "wasm32")] wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_dedicated_worker); - use openmls::prelude::{MlsMessageBodyIn, MlsMessageIn, ProcessedMessageContent}; use tls_codec::Deserialize; use xmtp_cryptography::utils::generate_local_wallet; @@ -866,9 +865,8 @@ pub(crate) mod tests { let provider = group.client.mls_provider().unwrap(); let decrypted_message = match group .load_mls_group_with_lock(&provider, |mut mls_group| { - mls_group - .process_message(&provider, mls_message) - .map_err(|e| GroupError::Generic(e.to_string())) + Ok(mls_group + .process_message(&provider, mls_message).unwrap()) }) { Ok(message) => message, Err(err) => panic!("Error: {:?}", err), diff --git a/xmtp_mls/src/groups/members.rs b/xmtp_mls/src/groups/members.rs index 730529fb3..cfdf56e28 100644 --- a/xmtp_mls/src/groups/members.rs +++ b/xmtp_mls/src/groups/members.rs @@ -41,7 +41,6 @@ where provider: &XmtpOpenMlsProvider, ) -> Result, GroupError> { let group_membership = self.load_mls_group_with_lock(provider, |mls_group| { - // Extract group membership from extensions Ok(extract_group_membership(mls_group.extensions())?) })?; let requests = group_membership diff --git a/xmtp_mls/src/groups/mls_sync.rs b/xmtp_mls/src/groups/mls_sync.rs index 1a1f50b2d..cd450f11f 100644 --- a/xmtp_mls/src/groups/mls_sync.rs +++ b/xmtp_mls/src/groups/mls_sync.rs @@ -360,9 +360,8 @@ where if intent.state == IntentState::Committed { return Ok(IntentState::Committed); } - let group_epoch = mls_group.epoch(); - let message_epoch = message.epoch(); + let group_epoch = mls_group.epoch(); debug!( inbox_id = self.client.inbox_id(), installation_id = hex::encode(self.client.installation_id()), @@ -702,6 +701,14 @@ where let intent = provider .conn_ref() .find_group_intent_by_payload_hash(sha256(envelope.data.as_slice())); + tracing::info!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + msg_id = envelope.id, + "Processing envelope with hash {:?}", + hex::encode(sha256(envelope.data.as_slice())) + ); + match intent { // Intent with the payload hash matches Ok(Some(intent)) => { @@ -729,6 +736,14 @@ where } // No matching intent found Ok(None) => { + tracing::info!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + msg_id = envelope.id, + "client [{}] is about to process external envelope [{}]", + self.client.inbox_id(), + envelope.id + ); self.process_external_message(provider, message, envelope) .await } @@ -792,7 +807,10 @@ where for message in messages.into_iter() { let result = retry_async!( Retry::default(), - (async { self.consume_message(&message, provider.conn_ref()).await }) + (async { + self.consume_message(&message, provider.conn_ref()) + .await + }) ); if let Err(e) = result { let is_retryable = e.is_retryable(); @@ -1139,10 +1157,7 @@ where return Ok(()); } // determine how long of an interval in time to use before updating list - let interval_ns = match update_interval_ns { - Some(val) => val, - None => SYNC_UPDATE_INSTALLATIONS_INTERVAL_NS, - }; + let interval_ns = update_interval_ns.unwrap_or_else(|| SYNC_UPDATE_INSTALLATIONS_INTERVAL_NS); let now_ns = crate::utils::time::now_ns(); let last_ns = provider diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 285832b89..87739dde9 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -332,19 +332,6 @@ impl MlsGroup { } // Load the stored OpenMLS group from the OpenMLS provider's keystore - #[tracing::instrument(level = "trace", skip_all)] - pub(crate) fn load_mls_group( - &self, - provider: impl OpenMlsProvider, - ) -> Result { - let mls_group = - OpenMlsGroup::load(provider.storage(), &GroupId::from_slice(&self.group_id)) - .map_err(|_| GroupError::GroupNotFound)? - .ok_or(GroupError::GroupNotFound)?; - - Ok(mls_group) - } - #[tracing::instrument(level = "trace", skip_all)] pub(crate) fn load_mls_group_with_lock( &self, @@ -358,7 +345,6 @@ impl MlsGroup { let group_id = self.group_id.clone(); // Acquire the lock synchronously using blocking_lock - let _lock = MLS_COMMIT_LOCK.get_lock_sync(group_id.clone())?; // Load the MLS group let mls_group = @@ -370,6 +356,7 @@ impl MlsGroup { operation(mls_group) } + // Load the stored OpenMLS group from the OpenMLS provider's keystore #[tracing::instrument(level = "trace", skip_all)] pub(crate) async fn load_mls_group_with_lock_async( &self, @@ -1173,14 +1160,16 @@ impl MlsGroup { /// /// If the current user has been kicked out of the group, `is_active` will return `false` pub fn is_active(&self, provider: impl OpenMlsProvider) -> Result { - self.load_mls_group_with_lock(provider, |mls_group| Ok(mls_group.is_active())) + self.load_mls_group_with_lock(provider, |mls_group| + Ok(mls_group.is_active()) + ) } /// Get the `GroupMetadata` of the group. pub fn metadata(&self, provider: impl OpenMlsProvider) -> Result { - self.load_mls_group_with_lock(provider, |mls_group| { + self.load_mls_group_with_lock(provider, |mls_group| Ok(extract_group_metadata(&mls_group)?) - }) + ) } /// Get the `GroupMutableMetadata` of the group. @@ -1188,18 +1177,18 @@ impl MlsGroup { &self, provider: impl OpenMlsProvider, ) -> Result { - self.load_mls_group_with_lock(provider, |mls_group| { + self.load_mls_group_with_lock(provider, |mls_group| Ok(GroupMutableMetadata::try_from(&mls_group)?) - }) + ) } pub fn permissions(&self) -> Result { let conn = self.context().store().conn()?; let provider = XmtpOpenMlsProvider::new(conn); - self.load_mls_group_with_lock(&provider, |mls_group| { + self.load_mls_group_with_lock(&provider, |mls_group| Ok(extract_group_permissions(&mls_group)?) - }) + ) } /// Used for testing that dm group validation works as expected. @@ -1920,17 +1909,17 @@ pub(crate) mod tests { // Check Amal's MLS group state. let amal_db = XmtpOpenMlsProvider::from(amal.context.store().conn().unwrap()); - let amal_members_len = amal_group.load_mls_group_with_lock(&amal_db, |amal_mls_group| { - Ok(amal_mls_group.members().count()) - }).unwrap(); + let amal_members_len = amal_group.load_mls_group_with_lock(&amal_db, |mls_group| + Ok(mls_group.members().count()) + ).unwrap(); assert_eq!(amal_members_len, 3); // Check Bola's MLS group state. let bola_db = XmtpOpenMlsProvider::from(bola.context.store().conn().unwrap()); - let bola_members_len = bola_group.load_mls_group_with_lock(&bola_db, |bola_mls_group| { - Ok(bola_mls_group.members().count()) - }).unwrap(); + let bola_members_len = bola_group.load_mls_group_with_lock(&bola_db, |mls_group| + Ok(mls_group.members().count()) + ).unwrap(); assert_eq!(bola_members_len, 3); @@ -2009,8 +1998,8 @@ pub(crate) mod tests { Ok(mls_group) // Return the updated group if necessary }).unwrap(); - force_add_member(&alix, &bo, &alix_group, &mut mls_group, &provider).await; // Now add bo to the group + force_add_member(&alix, &bo, &alix_group, &mut mls_group, &provider).await; // Bo should not be able to actually read this group bo.sync_welcomes(&bo.store().conn().unwrap()).await.unwrap(); @@ -2134,9 +2123,9 @@ pub(crate) mod tests { assert_eq!(messages.len(), 2); let provider: XmtpOpenMlsProvider = client.context.store().conn().unwrap().into(); - let pending_commit_is_none = group.load_mls_group_with_lock(&provider, |mls_group| { + let pending_commit_is_none = group.load_mls_group_with_lock(&provider, |mls_group| Ok(mls_group.pending_commit().is_none()) - }).unwrap(); + ).unwrap(); assert!(pending_commit_is_none); @@ -2317,9 +2306,9 @@ pub(crate) mod tests { assert!(new_installations_were_added.is_ok()); group.sync().await.unwrap(); - let num_members = group.load_mls_group_with_lock(&provider, |mls_group| { + let num_members = group.load_mls_group_with_lock(&provider, |mls_group| Ok(mls_group.members().collect::>().len()) - }).unwrap(); + ).unwrap(); assert_eq!(num_members, 3); } @@ -3899,9 +3888,9 @@ pub(crate) mod tests { ) .unwrap(); assert!(valid_dm_group - .load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group| { + .load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group| validate_dm_group(&client, &mls_group, added_by_inbox) - }) + ) .is_ok()); // Test case 2: Invalid conversation type @@ -3917,9 +3906,9 @@ pub(crate) mod tests { ) .unwrap(); assert!(matches!( - invalid_type_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group| { + invalid_type_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group| validate_dm_group(&client, &mls_group, added_by_inbox) - }), + ), Err(GroupError::Generic(msg)) if msg.contains("Invalid conversation type") )); // Test case 3: Missing DmMembers @@ -3939,9 +3928,9 @@ pub(crate) mod tests { ) .unwrap(); assert!(matches!( - mismatched_dm_members_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group| { + mismatched_dm_members_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group| validate_dm_group(&client, &mls_group, added_by_inbox) - }), + ), Err(GroupError::Generic(msg)) if msg.contains("DM members do not match expected inboxes") )); @@ -3961,9 +3950,9 @@ pub(crate) mod tests { ) .unwrap(); assert!(matches!( - non_empty_admin_list_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group| { + non_empty_admin_list_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group| validate_dm_group(&client, &mls_group, added_by_inbox) - }), + ), Err(GroupError::Generic(msg)) if msg.contains("DM group must have empty admin and super admin lists") )); @@ -3982,9 +3971,9 @@ pub(crate) mod tests { ) .unwrap(); assert!(matches!( - invalid_permissions_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group| { + invalid_permissions_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group| validate_dm_group(&client, &mls_group, added_by_inbox) - }), + ), Err(GroupError::Generic(msg)) if msg.contains("Invalid permissions for DM group") )); } diff --git a/xmtp_mls/src/groups/subscriptions.rs b/xmtp_mls/src/groups/subscriptions.rs index b99821487..64d0a6f86 100644 --- a/xmtp_mls/src/groups/subscriptions.rs +++ b/xmtp_mls/src/groups/subscriptions.rs @@ -41,6 +41,7 @@ impl MlsGroup { let process_result = retry_async!( Retry::default(), (async { + let client_id = &client_id; let msgv1 = &msgv1; self.context() .store() @@ -48,9 +49,16 @@ impl MlsGroup { let prov_ref = &provider; // Borrow provider instead of moving it self.load_mls_group_with_lock_async( prov_ref, - |mut mls_group| async move { + |mls_group| async move { // Attempt processing immediately, but fail if the message is not an Application Message // Returning an error should roll back the DB tx + tracing::info!( + inbox_id = self.client.inbox_id(), + group_id = hex::encode(&self.group_id), + msg_id = msgv1.id, + "current epoch for [{}] in process_stream_entry()", + client_id, + ); self.process_message(&prov_ref, msgv1, false) .await // NOTE: We want to make sure we retry an error in process_message