From facdea67e2646f4c1b86087c359eeb3b33a59a0d Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Wed, 21 Aug 2024 11:52:34 -0600 Subject: [PATCH 1/9] add code to batch group syncs --- bindings_ffi/src/mls.rs | 14 ++++++++++++++ xmtp_mls/src/client.rs | 27 +++++++++++++++++++++++++++ xmtp_mls/src/groups/mod.rs | 5 ++++- xmtp_mls/src/groups/sync.rs | 4 ++-- 4 files changed, 47 insertions(+), 3 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index ec827873e..3e1e53670 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -752,6 +752,20 @@ impl FfiConversations { Ok(()) } + pub async fn sync_all(&self, opts: FfiListConversationsOptions) -> Result<(), GenericError> { + let inner = self.inner_client.as_ref(); + let groups = inner.find_groups( + None, + opts.created_after_ns, + opts.created_before_ns, + opts.limit, + ); + + inner.sync_all(&self.inner_client, groups).await?; + + Ok(()) + } + pub async fn list( &self, opts: FfiListConversationsOptions, diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 7beb42e80..9e80a86eb 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -587,6 +587,33 @@ where Ok(groups) } + pub async fn sync_all(&self, groups: Vec) -> Result<(), GroupError> { + // Acquire a single connection to be reused + let conn = &self.store().conn()?; + + // Iterate over the groups and sync each one + for group in groups { + let mls_provider = &self.mls_provider(conn.clone()); + + log::info!("[{}] syncing group", &self.inbox_id()); + log::info!( + "current epoch for [{}] in sync_all() is Epoch: [{}]", + &self.inbox_id(), + group.load_mls_group(mls_provider.clone()).unwrap().epoch() + ); + + // Perform the necessary updates + group + .maybe_update_installations(conn.clone(), None, &self) + .await?; + + // Sync the group with the connection + group.sync_with_conn(conn.clone(), &self).await?; + } + + Ok(()) + } + /** * Validates a credential against the given installation public key * diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index b842aa2ca..1645769b2 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -253,7 +253,10 @@ impl MlsGroup { // Load the stored MLS group from the OpenMLS provider's keystore #[tracing::instrument(level = "trace", skip_all)] - fn load_mls_group(&self, provider: impl OpenMlsProvider) -> Result { + pub fn load_mls_group( + &self, + provider: impl OpenMlsProvider, + ) -> Result { let mls_group = OpenMlsGroup::load(provider.storage(), &GroupId::from_slice(&self.group_id)) .map_err(|_| GroupError::GroupNotFound)? diff --git a/xmtp_mls/src/groups/sync.rs b/xmtp_mls/src/groups/sync.rs index cf05295d0..0723de47c 100644 --- a/xmtp_mls/src/groups/sync.rs +++ b/xmtp_mls/src/groups/sync.rs @@ -98,7 +98,7 @@ impl MlsGroup { } #[tracing::instrument(level = "trace", skip(client, self, conn))] - pub(super) async fn sync_with_conn( + pub async fn sync_with_conn( &self, conn: DbConnection, client: &Client, @@ -1000,7 +1000,7 @@ impl MlsGroup { Ok(()) } - pub(super) async fn maybe_update_installations( + pub async fn maybe_update_installations( &self, conn: DbConnection, update_interval: Option, From f9bb0df456e28b97257d9e1e0cda9575d53403da Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Wed, 21 Aug 2024 11:57:54 -0600 Subject: [PATCH 2/9] small tweeks to sync all --- bindings_ffi/src/mls.rs | 12 +++--------- xmtp_mls/src/client.rs | 3 --- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 3e1e53670..6c9470310 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -752,17 +752,11 @@ impl FfiConversations { Ok(()) } - pub async fn sync_all(&self, opts: FfiListConversationsOptions) -> Result<(), GenericError> { + pub async fn sync_all(&self) -> Result<(), GenericError> { let inner = self.inner_client.as_ref(); - let groups = inner.find_groups( - None, - opts.created_after_ns, - opts.created_before_ns, - opts.limit, - ); - - inner.sync_all(&self.inner_client, groups).await?; + let groups = inner.find_groups(None,None,None,None)?; + inner.sync_all(groups).await?; Ok(()) } diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 9e80a86eb..adcb2cc6a 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -591,7 +591,6 @@ where // Acquire a single connection to be reused let conn = &self.store().conn()?; - // Iterate over the groups and sync each one for group in groups { let mls_provider = &self.mls_provider(conn.clone()); @@ -602,12 +601,10 @@ where group.load_mls_group(mls_provider.clone()).unwrap().epoch() ); - // Perform the necessary updates group .maybe_update_installations(conn.clone(), None, &self) .await?; - // Sync the group with the connection group.sync_with_conn(conn.clone(), &self).await?; } From 0c926dca86fca9a65c1a60441b98344f1f7bc130 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Wed, 21 Aug 2024 12:28:47 -0600 Subject: [PATCH 3/9] write a test for it --- xmtp_mls/src/client.rs | 56 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index adcb2cc6a..f2b842a3f 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -800,6 +800,62 @@ mod tests { assert_eq!(duplicate_received_groups.len(), 0); } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_sync_all_groups() { + let alix = ClientBuilder::new_test_client(&generate_local_wallet()).await; + let bo = ClientBuilder::new_test_client(&generate_local_wallet()).await; + + let alix_bo_group1 = alix + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + let alix_bo_group2 = alix + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + alix_bo_group1 + .add_members_by_inbox_id(&alix, vec![bo.inbox_id()]) + .await + .unwrap(); + alix_bo_group2 + .add_members_by_inbox_id(&alix, vec![bo.inbox_id()]) + .await + .unwrap(); + + let bob_received_groups = bo.sync_welcomes().await.unwrap(); + assert_eq!(bob_received_groups.len(), 2); + + let bo_groups = bo.find_groups(None, None, None, None).unwrap(); + let bo_group1 = bo.group(alix_bo_group1.clone().group_id).unwrap(); + let bo_messages1 = bo_group1 + .find_messages(None, None, None, None, None) + .unwrap(); + assert_eq!(bo_messages1.len(), 0); + let bo_group2 = bo.group(alix_bo_group2.clone().group_id).unwrap(); + let bo_messages2 = bo_group2 + .find_messages(None, None, None, None, None) + .unwrap(); + assert_eq!(bo_messages2.len(), 0); + alix_bo_group1 + .send_message(vec![1, 2, 3].as_slice(), &alix) + .await + .unwrap(); + alix_bo_group2 + .send_message(vec![1, 2, 3].as_slice(), &alix) + .await + .unwrap(); + + bo.sync_all(bo_groups).await.unwrap(); + + let bo_messages1 = bo_group1 + .find_messages(None, None, None, None, None) + .unwrap(); + assert_eq!(bo_messages1.len(), 1); + let bo_group2 = bo.group(alix_bo_group2.clone().group_id).unwrap(); + let bo_messages2 = bo_group2 + .find_messages(None, None, None, None, None) + .unwrap(); + assert_eq!(bo_messages2.len(), 1); + } + #[tokio::test] async fn test_can_message() { // let amal = ClientBuilder::new_test_client(&generate_local_wallet()).await; From 44be43be989258a58553b68dd1cfcdf638ec60ab Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Wed, 21 Aug 2024 12:36:44 -0600 Subject: [PATCH 4/9] fix up the lint --- xmtp_mls/src/client.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index f2b842a3f..d12cdf7a2 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -587,7 +587,7 @@ where Ok(groups) } - pub async fn sync_all(&self, groups: Vec) -> Result<(), GroupError> { + pub async fn sync_all_groups(&self, groups: Vec) -> Result<(), GroupError> { // Acquire a single connection to be reused let conn = &self.store().conn()?; @@ -596,16 +596,16 @@ where log::info!("[{}] syncing group", &self.inbox_id()); log::info!( - "current epoch for [{}] in sync_all() is Epoch: [{}]", + "current epoch for [{}] in sync_all_groups() is Epoch: [{}]", &self.inbox_id(), group.load_mls_group(mls_provider.clone()).unwrap().epoch() ); group - .maybe_update_installations(conn.clone(), None, &self) + .maybe_update_installations(conn.clone(), None, self) .await?; - group.sync_with_conn(conn.clone(), &self).await?; + group.sync_with_conn(conn.clone(), self).await?; } Ok(()) @@ -843,7 +843,7 @@ mod tests { .await .unwrap(); - bo.sync_all(bo_groups).await.unwrap(); + bo.sync_all_groups(bo_groups).await.unwrap(); let bo_messages1 = bo_group1 .find_messages(None, None, None, None, None) From 6888115491b1ad47bb9b87154fa676d1f89dd429 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Wed, 21 Aug 2024 12:38:34 -0600 Subject: [PATCH 5/9] rename --- bindings_ffi/src/mls.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 6c9470310..2aaace642 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -752,11 +752,11 @@ impl FfiConversations { Ok(()) } - pub async fn sync_all(&self) -> Result<(), GenericError> { + pub async fn sync_all_groups(&self) -> Result<(), GenericError> { let inner = self.inner_client.as_ref(); let groups = inner.find_groups(None,None,None,None)?; - inner.sync_all(groups).await?; + inner.sync_all_groups(groups).await?; Ok(()) } From d29e5ca09a2a75a6c4806ceadc02e79c5d6eb21f Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Wed, 21 Aug 2024 12:38:59 -0600 Subject: [PATCH 6/9] cargo fmt --- bindings_ffi/src/mls.rs | 2 +- xmtp_mls/src/client.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 2aaace642..830b8794c 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -754,7 +754,7 @@ impl FfiConversations { pub async fn sync_all_groups(&self) -> Result<(), GenericError> { let inner = self.inner_client.as_ref(); - let groups = inner.find_groups(None,None,None,None)?; + let groups = inner.find_groups(None, None, None, None)?; inner.sync_all_groups(groups).await?; Ok(()) diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index d12cdf7a2..b82071a2a 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -842,7 +842,7 @@ mod tests { .send_message(vec![1, 2, 3].as_slice(), &alix) .await .unwrap(); - + bo.sync_all_groups(bo_groups).await.unwrap(); let bo_messages1 = bo_group1 From 7a083d0a32c7bb9f5fbf66e0570a2b626cd0a040 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Wed, 21 Aug 2024 12:41:30 -0600 Subject: [PATCH 7/9] make it faster --- xmtp_mls/src/client.rs | 44 ++++++++++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index b82071a2a..225d33c2f 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, mem::Discriminant, sync::Arc}; use futures::{ + future::join_all, stream::{self, StreamExt}, Future, }; @@ -591,22 +592,35 @@ where // Acquire a single connection to be reused let conn = &self.store().conn()?; - for group in groups { - let mls_provider = &self.mls_provider(conn.clone()); - - log::info!("[{}] syncing group", &self.inbox_id()); - log::info!( - "current epoch for [{}] in sync_all_groups() is Epoch: [{}]", - &self.inbox_id(), - group.load_mls_group(mls_provider.clone()).unwrap().epoch() - ); - - group - .maybe_update_installations(conn.clone(), None, self) - .await?; + let sync_futures: Vec<_> = groups + .into_iter() + .map(|group| { + let conn = conn.clone(); + let mls_provider = self.mls_provider(conn.clone()); + + async move { + log::info!("[{}] syncing group", self.inbox_id()); + log::info!( + "current epoch for [{}] in sync_all_groups() is Epoch: [{}]", + self.inbox_id(), + group.load_mls_group(mls_provider.clone()).unwrap().epoch() + ); + + group + .maybe_update_installations(conn.clone(), None, self) + .await?; + + group.sync_with_conn(conn.clone(), self).await?; + Ok::<(), GroupError>(()) + } + }) + .collect(); - group.sync_with_conn(conn.clone(), self).await?; - } + // Run all sync operations concurrently + join_all(sync_futures) + .await + .into_iter() + .collect::>()?; Ok(()) } From 6789e50fdac2255ffed84ffe2fc1ec3a1130edd0 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Wed, 21 Aug 2024 12:51:59 -0600 Subject: [PATCH 8/9] add an involved test --- bindings_ffi/src/mls.rs | 52 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 830b8794c..8be21708e 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -2244,6 +2244,58 @@ mod tests { assert!(stream_messages.is_closed()); } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_can_sync_all_groups() { + let alix = new_test_client().await; + let bo = new_test_client().await; + + for _i in 0..30 { + alix + .conversations() + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + } + + bo.conversations().sync().await.unwrap(); + let alix_groups = alix + .conversations() + .list(FfiListConversationsOptions::default()) + .await + .unwrap(); + + let alix_group1 = alix_groups[0].clone(); + let alix_group5 = alix_groups[5].clone(); + let bo_group1 = bo.group(alix_group1.id()).unwrap(); + let bo_group5 = bo.group(alix_group5.id()).unwrap(); + + alix_group1.send("alix1".as_bytes().to_vec()).await.unwrap(); + alix_group5.send("alix1".as_bytes().to_vec()).await.unwrap(); + + let bo_messages1 = bo_group1 + .find_messages(FfiListMessagesOptions::default()) + .unwrap(); + let bo_messages5 = bo_group5 + .find_messages(FfiListMessagesOptions::default()) + .unwrap(); + assert_eq!(bo_messages1.len(), 0); + assert_eq!(bo_messages5.len(), 0); + + bo.conversations().sync_all_groups().await.unwrap(); + + let bo_messages1 = bo_group1 + .find_messages(FfiListMessagesOptions::default()) + .unwrap(); + let bo_messages5 = bo_group5 + .find_messages(FfiListMessagesOptions::default()) + .unwrap(); + assert_eq!(bo_messages1.len(), 1); + assert_eq!(bo_messages5.len(), 1); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn test_can_send_message_when_out_of_sync() { let alix = new_test_client().await; From 749649d41d69fc1e0fa24309f08be12b8efba0c4 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Wed, 21 Aug 2024 12:54:35 -0600 Subject: [PATCH 9/9] cargo fmt --- bindings_ffi/src/mls.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 8be21708e..4deb0fc8d 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -2250,8 +2250,7 @@ mod tests { let bo = new_test_client().await; for _i in 0..30 { - alix - .conversations() + alix.conversations() .create_group( vec![bo.account_address.clone()], FfiCreateGroupOptions::default(), @@ -2271,7 +2270,7 @@ mod tests { let alix_group5 = alix_groups[5].clone(); let bo_group1 = bo.group(alix_group1.id()).unwrap(); let bo_group5 = bo.group(alix_group5.id()).unwrap(); - + alix_group1.send("alix1".as_bytes().to_vec()).await.unwrap(); alix_group5.send("alix1".as_bytes().to_vec()).await.unwrap(); @@ -2285,7 +2284,7 @@ mod tests { assert_eq!(bo_messages5.len(), 0); bo.conversations().sync_all_groups().await.unwrap(); - + let bo_messages1 = bo_group1 .find_messages(FfiListMessagesOptions::default()) .unwrap();