From 481fad74ff60bc1f9e3f5d45266ff66849943474 Mon Sep 17 00:00:00 2001 From: Dakota Brink <779390+codabrink@users.noreply.github.com> Date: Tue, 10 Dec 2024 12:38:26 -0500 Subject: [PATCH] Dry up the streaming code, add a couple of sanity checks to tests, un-ignore test (#1398) * update the stream tests a bit * add a dm and a message * dry the code * cleanup --- bindings_ffi/src/mls.rs | 58 +++++++++++++++++++++-------------- xmtp_mls/src/subscriptions.rs | 23 ++++++-------- 2 files changed, 45 insertions(+), 36 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 6a8955bec..22f56f725 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -1036,41 +1036,33 @@ impl FfiConversations { &self, message_callback: Arc, ) -> FfiStreamCloser { - let handle = RustXmtpClient::stream_all_messages_with_callback( - self.inner_client.clone(), - Some(ConversationType::Group), - move |msg| match msg { - Ok(m) => message_callback.on_message(m.into()), - Err(e) => message_callback.on_error(e.into()), - }, - ); - - FfiStreamCloser::new(handle) + self.stream_messages(message_callback, Some(FfiConversationType::Group)) + .await } pub async fn stream_all_dm_messages( &self, message_callback: Arc, ) -> FfiStreamCloser { - let handle = RustXmtpClient::stream_all_messages_with_callback( - self.inner_client.clone(), - Some(ConversationType::Dm), - move |msg| match msg { - Ok(m) => message_callback.on_message(m.into()), - Err(e) => message_callback.on_error(e.into()), - }, - ); - - FfiStreamCloser::new(handle) + self.stream_messages(message_callback, Some(FfiConversationType::Dm)) + .await } pub async fn stream_all_messages( &self, message_callback: Arc, + ) -> FfiStreamCloser { + self.stream_messages(message_callback, None).await + } + + async fn stream_messages( + &self, + message_callback: Arc, + conversation_type: Option, ) -> FfiStreamCloser { let handle = RustXmtpClient::stream_all_messages_with_callback( self.inner_client.clone(), - None, + conversation_type.map(Into::into), move |msg| match msg { Ok(m) => message_callback.on_message(m.into()), Err(e) => message_callback.on_error(e.into()), @@ -1093,6 +1085,16 @@ impl FfiConversations { } } +impl From for ConversationType { + fn from(value: FfiConversationType) -> Self { + match value { + FfiConversationType::Dm => ConversationType::Dm, + FfiConversationType::Group => ConversationType::Group, + FfiConversationType::Sync => ConversationType::Sync, + } + } +} + #[derive(uniffi::Object)] pub struct FfiConversation { inner: MlsGroup, @@ -2512,7 +2514,6 @@ mod tests { // Looks like this test might be a separate issue #[tokio::test(flavor = "multi_thread", worker_threads = 5)] - #[ignore] async fn test_can_stream_group_messages_for_updates() { let alix = new_test_client().await; let bo = new_test_client().await; @@ -2554,6 +2555,17 @@ mod tests { .unwrap(); message_callbacks.wait_for_delivery(None).await.unwrap(); + alix_group.send(b"Hello there".to_vec()).await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); + + let dm = bo + .conversations() + .create_dm(alix.account_address.clone()) + .await + .unwrap(); + dm.send(b"Hello again".to_vec()).await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); + // Uncomment the following lines to add more group name updates bo_group .update_group_name("Old Name3".to_string()) @@ -2561,7 +2573,7 @@ mod tests { .unwrap(); message_callbacks.wait_for_delivery(None).await.unwrap(); - assert_eq!(message_callbacks.message_count(), 3); + assert_eq!(message_callbacks.message_count(), 6); stream_messages.end_and_wait().await.unwrap(); diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index d90bab05f..94b93c537 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -546,6 +546,7 @@ pub(crate) mod tests { Arc, }; use xmtp_cryptography::utils::generate_local_wallet; + use xmtp_id::InboxOwner; #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] #[cfg_attr(not(target_arch = "wasm32"), tokio::test(flavor = "current_thread"))] @@ -702,7 +703,8 @@ pub(crate) mod tests { async fn test_stream_all_messages_changing_group_list() { let alix = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await); let bo = ClientBuilder::new_test_client(&generate_local_wallet()).await; - let caro = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await); + let caro_wallet = generate_local_wallet(); + let caro = Arc::new(ClientBuilder::new_test_client(&caro_wallet).await); let alix_group = alix .create_group(None, GroupMetadataOptions::default()) @@ -726,27 +728,21 @@ pub(crate) mod tests { ); handle.wait_for_ready().await; - alix_group.send_message("first".as_bytes()).await.unwrap(); + alix_group.send_message(b"first").await.unwrap(); delivery .wait_for_delivery() .await .expect("timed out waiting for `first`"); - let bo_group = bo - .create_group(None, GroupMetadataOptions::default()) - .unwrap(); - bo_group - .add_members_by_inbox_id(&[caro.inbox_id()]) - .await - .unwrap(); + let bo_group = bo.create_dm(caro_wallet.get_address()).await.unwrap(); - bo_group.send_message("second".as_bytes()).await.unwrap(); + bo_group.send_message(b"second").await.unwrap(); delivery .wait_for_delivery() .await .expect("timed out waiting for `second`"); - alix_group.send_message("third".as_bytes()).await.unwrap(); + alix_group.send_message(b"third").await.unwrap(); delivery .wait_for_delivery() .await @@ -760,13 +756,13 @@ pub(crate) mod tests { .await .unwrap(); - alix_group.send_message("fourth".as_bytes()).await.unwrap(); + alix_group.send_message(b"fourth").await.unwrap(); delivery .wait_for_delivery() .await .expect("timed out waiting for `fourth`"); - alix_group_2.send_message("fifth".as_bytes()).await.unwrap(); + alix_group_2.send_message(b"fifth").await.unwrap(); delivery .wait_for_delivery() .await @@ -789,6 +785,7 @@ pub(crate) mod tests { crate::sleep(core::time::Duration::from_millis(100)).await; let messages = messages.lock(); + assert_eq!(messages.len(), 5); }