Skip to content

Commit

Permalink
Dry up the streaming code, add a couple of sanity checks to tests, un…
Browse files Browse the repository at this point in the history
…-ignore test (#1398)

* update the stream tests a bit

* add a dm and a message

* dry the code

* cleanup
  • Loading branch information
codabrink authored Dec 10, 2024
1 parent 0d7674e commit 481fad7
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 36 deletions.
58 changes: 35 additions & 23 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1036,41 +1036,33 @@ impl FfiConversations {
&self,
message_callback: Arc<dyn FfiMessageCallback>,
) -> FfiStreamCloser {
let handle = RustXmtpClient::stream_all_messages_with_callback(
self.inner_client.clone(),
Some(ConversationType::Group),
move |msg| match msg {
Ok(m) => message_callback.on_message(m.into()),
Err(e) => message_callback.on_error(e.into()),
},
);

FfiStreamCloser::new(handle)
self.stream_messages(message_callback, Some(FfiConversationType::Group))
.await
}

pub async fn stream_all_dm_messages(
&self,
message_callback: Arc<dyn FfiMessageCallback>,
) -> FfiStreamCloser {
let handle = RustXmtpClient::stream_all_messages_with_callback(
self.inner_client.clone(),
Some(ConversationType::Dm),
move |msg| match msg {
Ok(m) => message_callback.on_message(m.into()),
Err(e) => message_callback.on_error(e.into()),
},
);

FfiStreamCloser::new(handle)
self.stream_messages(message_callback, Some(FfiConversationType::Dm))
.await
}

pub async fn stream_all_messages(
&self,
message_callback: Arc<dyn FfiMessageCallback>,
) -> FfiStreamCloser {
self.stream_messages(message_callback, None).await
}

async fn stream_messages(
&self,
message_callback: Arc<dyn FfiMessageCallback>,
conversation_type: Option<FfiConversationType>,
) -> FfiStreamCloser {
let handle = RustXmtpClient::stream_all_messages_with_callback(
self.inner_client.clone(),
None,
conversation_type.map(Into::into),
move |msg| match msg {
Ok(m) => message_callback.on_message(m.into()),
Err(e) => message_callback.on_error(e.into()),
Expand All @@ -1093,6 +1085,16 @@ impl FfiConversations {
}
}

impl From<FfiConversationType> for ConversationType {
fn from(value: FfiConversationType) -> Self {
match value {
FfiConversationType::Dm => ConversationType::Dm,
FfiConversationType::Group => ConversationType::Group,
FfiConversationType::Sync => ConversationType::Sync,
}
}
}

#[derive(uniffi::Object)]
pub struct FfiConversation {
inner: MlsGroup<RustXmtpClient>,
Expand Down Expand Up @@ -2512,7 +2514,6 @@ mod tests {

// Looks like this test might be a separate issue
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
#[ignore]
async fn test_can_stream_group_messages_for_updates() {
let alix = new_test_client().await;
let bo = new_test_client().await;
Expand Down Expand Up @@ -2554,14 +2555,25 @@ mod tests {
.unwrap();
message_callbacks.wait_for_delivery(None).await.unwrap();

alix_group.send(b"Hello there".to_vec()).await.unwrap();
message_callbacks.wait_for_delivery(None).await.unwrap();

let dm = bo
.conversations()
.create_dm(alix.account_address.clone())
.await
.unwrap();
dm.send(b"Hello again".to_vec()).await.unwrap();
message_callbacks.wait_for_delivery(None).await.unwrap();

// Uncomment the following lines to add more group name updates
bo_group
.update_group_name("Old Name3".to_string())
.await
.unwrap();
message_callbacks.wait_for_delivery(None).await.unwrap();

assert_eq!(message_callbacks.message_count(), 3);
assert_eq!(message_callbacks.message_count(), 6);

stream_messages.end_and_wait().await.unwrap();

Expand Down
23 changes: 10 additions & 13 deletions xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ pub(crate) mod tests {
Arc,
};
use xmtp_cryptography::utils::generate_local_wallet;
use xmtp_id::InboxOwner;

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test(flavor = "current_thread"))]
Expand Down Expand Up @@ -702,7 +703,8 @@ pub(crate) mod tests {
async fn test_stream_all_messages_changing_group_list() {
let alix = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await);
let bo = ClientBuilder::new_test_client(&generate_local_wallet()).await;
let caro = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await);
let caro_wallet = generate_local_wallet();
let caro = Arc::new(ClientBuilder::new_test_client(&caro_wallet).await);

let alix_group = alix
.create_group(None, GroupMetadataOptions::default())
Expand All @@ -726,27 +728,21 @@ pub(crate) mod tests {
);
handle.wait_for_ready().await;

alix_group.send_message("first".as_bytes()).await.unwrap();
alix_group.send_message(b"first").await.unwrap();
delivery
.wait_for_delivery()
.await
.expect("timed out waiting for `first`");

let bo_group = bo
.create_group(None, GroupMetadataOptions::default())
.unwrap();
bo_group
.add_members_by_inbox_id(&[caro.inbox_id()])
.await
.unwrap();
let bo_group = bo.create_dm(caro_wallet.get_address()).await.unwrap();

bo_group.send_message("second".as_bytes()).await.unwrap();
bo_group.send_message(b"second").await.unwrap();
delivery
.wait_for_delivery()
.await
.expect("timed out waiting for `second`");

alix_group.send_message("third".as_bytes()).await.unwrap();
alix_group.send_message(b"third").await.unwrap();
delivery
.wait_for_delivery()
.await
Expand All @@ -760,13 +756,13 @@ pub(crate) mod tests {
.await
.unwrap();

alix_group.send_message("fourth".as_bytes()).await.unwrap();
alix_group.send_message(b"fourth").await.unwrap();
delivery
.wait_for_delivery()
.await
.expect("timed out waiting for `fourth`");

alix_group_2.send_message("fifth".as_bytes()).await.unwrap();
alix_group_2.send_message(b"fifth").await.unwrap();
delivery
.wait_for_delivery()
.await
Expand All @@ -789,6 +785,7 @@ pub(crate) mod tests {
crate::sleep(core::time::Duration::from_millis(100)).await;

let messages = messages.lock();

assert_eq!(messages.len(), 5);
}

Expand Down

0 comments on commit 481fad7

Please sign in to comment.