diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 2a5c57ca8..cca612843 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -4,8 +4,8 @@ use crate::logger::FfiLogger; use crate::GenericError; use std::collections::HashMap; use std::convert::TryInto; -use std::sync::Arc; -use tokio::task::JoinHandle; +use std::sync::{Arc, Mutex}; +use tokio::task::{JoinHandle, AbortHandle}; use xmtp_api_grpc::grpc_api_helper::Client as TonicApiClient; use xmtp_id::{ associations::{ @@ -164,7 +164,6 @@ pub fn generate_inbox_id(account_address: String, nonce: u64) -> String { #[derive(uniffi::Object)] pub struct FfiSignatureRequest { - // Using `tokio::sync::Mutex`bc rust MutexGuard cannot be sent between threads. inner: Arc>, } @@ -435,7 +434,7 @@ impl FfiConversations { pub async fn stream( &self, callback: Box, - ) -> Arc { + ) -> FfiStreamCloser { let client = self.inner_client.clone(); let handle = RustXmtpClient::stream_conversations_with_callback( client.clone(), @@ -448,21 +447,19 @@ impl FfiConversations { }, ); - Arc::new(FfiStreamCloser { - handle, - }) + FfiStreamCloser::new(handle) } pub fn stream_all_messages( &self, message_callback: Box, - ) -> Result, GenericError> { + ) -> FfiStreamCloser { let handle = RustXmtpClient::stream_all_messages_with_callback( self.inner_client.clone(), move |message| message_callback.on_message(message.into()), ); - Ok(Arc::new(FfiStreamCloser { handle })) + FfiStreamCloser::new(handle) } } @@ -521,6 +518,7 @@ impl FfiGroup { self.created_at_ns, ); + log::debug!("Sending message"); let message_id = group .send_message(content_bytes.as_slice(), &self.inner_client) .await?; @@ -835,18 +833,16 @@ impl FfiGroup { pub async fn stream( &self, message_callback: Box, - ) -> Arc { + ) -> FfiStreamCloser { let inner_client = Arc::clone(&self.inner_client); - let stream_closer = MlsGroup::stream_with_callback( + let handle = MlsGroup::stream_with_callback( inner_client, self.group_id.clone(), self.created_at_ns, move |message| message_callback.on_message(message.into()), ); - Arc::new(FfiStreamCloser { - handle: stream_closer - }) + FfiStreamCloser::new(handle) } pub fn created_at_ns(&self) -> i64 { @@ -961,19 +957,30 @@ impl From for FfiMessage { } } -#[derive(uniffi::Object)] +#[derive(uniffi::Object, Clone, Debug)] pub struct FfiStreamCloser { - handle: JoinHandle>, + handle: Arc>>>>, + // for convenience, does not require locking mutex. + abort_handle: Arc, +} + +impl FfiStreamCloser { + pub fn new(handle: JoinHandle>) -> Self { + Self { + abort_handle: Arc::new(handle.abort_handle()), + handle: Arc::new(Mutex::new(Some(handle))), + } + } } #[uniffi::export] impl FfiStreamCloser { pub fn end(&self) { - self.handle.abort() + self.abort_handle.abort() } pub fn is_closed(&self) -> bool { - self.handle.is_finished() + self.abort_handle.is_finished() } } @@ -1095,6 +1102,12 @@ mod tests { pub fn message_count(&self) -> u32 { self.num_messages.load(Ordering::SeqCst) } + + pub async fn wait_for_delivery(&self, messages: usize) { + for _ in 0..messages { + self.notify.notified().await + } + } } impl FfiMessageCallback for RustStreamCallback { @@ -1104,7 +1117,10 @@ mod tests { log::info!("Received: {}", String::from_utf8_lossy(&message.content)); messages.push(message); let _ = self.num_messages.fetch_add(1, Ordering::SeqCst); + log::debug!("NOTIFYING"); self.notify.notify_one(); + log::debug!("NOTIFIED"); + } } @@ -1453,8 +1469,9 @@ mod tests { // Looks like this test might be a separate issue #[tokio::test(flavor = "multi_thread", worker_threads = 5)] - #[ignore] async fn test_can_stream_group_messages_for_updates() { + let _ = tracing_subscriber::fmt::try_init(); + let alix = new_test_client().await; let bo = new_test_client().await; @@ -1462,9 +1479,7 @@ mod tests { let message_callbacks = RustStreamCallback::default(); let stream_messages = bo .conversations() - .stream_all_messages(Box::new(message_callbacks.clone())) - .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + .stream_all_messages(Box::new(message_callbacks.clone())); // Create group and send first message let alix_group = alix @@ -1476,12 +1491,11 @@ mod tests { .await .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; - alix_group .update_group_name("Old Name".to_string()) .await .unwrap(); + message_callbacks.wait_for_delivery(1).await; let bo_groups = bo .conversations() @@ -1494,18 +1508,15 @@ mod tests { .update_group_name("Old Name2".to_string()) .await .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + message_callbacks.wait_for_delivery(1).await; // Uncomment the following lines to add more group name updates - // alix_group.update_group_name("Again Name".to_string()).await.unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; - bo_group - .update_group_name("Old Name2".to_string()) + .update_group_name("Old Name3".to_string()) .await .unwrap(); + message_callbacks.wait_for_delivery(1).await; - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; assert_eq!(message_callbacks.message_count(), 3); stream_messages.end(); @@ -1515,7 +1526,6 @@ mod tests { // test is also showing intermittent failures with database locked msg #[tokio::test(flavor = "multi_thread", worker_threads = 5)] - #[ignore] async fn test_can_stream_and_update_name_without_forking_group() { let alix = new_test_client().await; let bo = new_test_client().await; @@ -1524,10 +1534,7 @@ mod tests { let message_callbacks = RustStreamCallback::default(); let stream_messages = bo .conversations() - .stream_all_messages(Box::new(message_callbacks.clone())) - .unwrap(); - - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + .stream_all_messages(Box::new(message_callbacks.clone())); let first_msg_check = 2; let second_msg_check = 5; @@ -1542,16 +1549,14 @@ mod tests { .await .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; - alix_group .update_group_name("hello".to_string()) .await .unwrap(); alix_group.send("hello1".as_bytes().to_vec()).await.unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + message_callbacks.wait_for_delivery(2).await; + bo.conversations().sync().await.unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; let bo_groups = bo .conversations() @@ -1569,26 +1574,24 @@ mod tests { bo_group.send("hello2".as_bytes().to_vec()).await.unwrap(); bo_group.send("hello3".as_bytes().to_vec()).await.unwrap(); + message_callbacks.wait_for_delivery(2).await; + alix_group.sync().await.unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; let alix_messages = alix_group .find_messages(FfiListMessagesOptions::default()) .unwrap(); assert_eq!(alix_messages.len(), second_msg_check); alix_group.send("hello4".as_bytes().to_vec()).await.unwrap(); + message_callbacks.wait_for_delivery(1).await; bo_group.sync().await.unwrap(); let bo_messages2 = bo_group .find_messages(FfiListMessagesOptions::default()) .unwrap(); assert_eq!(bo_messages2.len(), second_msg_check); - - // TODO: message_callbacks should eventually come through here, why does this - // not work? - // tokio::time::sleep(tokio::time::Duration::from_millis(10000)).await; - // assert_eq!(message_callbacks.message_count(), second_msg_check as u32); + assert_eq!(message_callbacks.message_count(), second_msg_check as u32); stream_messages.end(); tokio::time::sleep(tokio::time::Duration::from_millis(5)).await; @@ -1596,14 +1599,11 @@ mod tests { } #[tokio::test(flavor = "multi_thread", worker_threads = 5)] - // This one is flaky for me. Passes reliably locally and fails on CI - #[ignore] async fn test_conversation_streaming() { let amal = new_test_client().await; let bola = new_test_client().await; let stream_callback = RustStreamCallback::default(); - let local_data = stream_callback.clone(); let stream = bola .conversations() @@ -1617,8 +1617,8 @@ mod tests { ) .await .unwrap(); - - local_data.notify.notified().await; + + stream_callback.wait_for_delivery(1).await; assert_eq!(stream_callback.message_count(), 1); // Create another group and add bola @@ -1629,8 +1629,7 @@ mod tests { ) .await .unwrap(); - - local_data.notify.notified().await; + stream_callback.wait_for_delivery(1).await; assert_eq!(stream_callback.message_count(), 2); @@ -1641,8 +1640,6 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn test_stream_all_messages() { - let _ = tracing_subscriber::fmt::try_init(); - let alix = new_test_client().await; let bo = new_test_client().await; let caro = new_test_client().await; @@ -1660,12 +1657,11 @@ mod tests { let stream = caro .conversations() - .stream_all_messages(Box::new(stream_callback.clone())) - .unwrap(); + .stream_all_messages(Box::new(stream_callback.clone())); alix_group.send("first".as_bytes().to_vec()).await.unwrap(); - - stream_callback.notify.notified().await; + + stream_callback.wait_for_delivery(1).await; let bo_group = bo .conversations() @@ -1678,13 +1674,9 @@ mod tests { let _ = caro.inner_client.sync_welcomes().await.unwrap(); bo_group.send("second".as_bytes().to_vec()).await.unwrap(); - stream_callback.notify.notified().await; - alix_group.send("third".as_bytes().to_vec()).await.unwrap(); - stream_callback.notify.notified().await; - bo_group.send("fourth".as_bytes().to_vec()).await.unwrap(); - stream_callback.notify.notified().await; + stream_callback.wait_for_delivery(3).await; assert_eq!(stream_callback.message_count(), 4); stream.end(); @@ -1712,9 +1704,9 @@ mod tests { .await; group.send("hello".as_bytes().to_vec()).await.unwrap(); - stream_callback.notify.notified().await; group.send("goodbye".as_bytes().to_vec()).await.unwrap(); - stream_callback.notify.notified().await; + + stream_callback.wait_for_delivery(2).await; assert_eq!(stream_callback.message_count(), 2); stream_closer.end(); @@ -1742,13 +1734,11 @@ mod tests { let stream_callback = RustStreamCallback::default(); let stream_closer = bola .conversations() - .stream_all_messages(Box::new(stream_callback.clone())) - .unwrap(); + .stream_all_messages(Box::new(stream_callback.clone())); - amal_group.send("hello1".as_bytes().to_vec()).await.unwrap(); - stream_callback.notify.notified().await; - amal_group.send("hello2".as_bytes().to_vec()).await.unwrap(); - stream_callback.notify.notified().await; + amal_group.send(b"hello1".to_vec()).await.unwrap(); + amal_group.send(b"hello2".to_vec()).await.unwrap(); + stream_callback.wait_for_delivery(2).await; assert_eq!(stream_callback.message_count(), 2); assert!(!stream_closer.is_closed()); @@ -1758,24 +1748,25 @@ mod tests { .await .unwrap(); - stream_callback.notify.notified().await; + stream_callback.wait_for_delivery(1).await; assert_eq!(stream_callback.message_count(), 3); // Member removal transcript message - - amal_group.send("hello3".as_bytes().to_vec()).await.unwrap(); - stream_callback.notify.notified().await; + amal_group.send(b"hello3".to_vec()).await.unwrap(); + //TODO: could verify with a log message + tokio::time::sleep(std::time::Duration::from_millis(200)).await; assert_eq!(stream_callback.message_count(), 3); // Don't receive messages while removed assert!(!stream_closer.is_closed()); - tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; amal_group .add_members(vec![bola.account_address.clone()]) .await .unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + // TODO: could check for LOG message with a Eviction error on receive + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; assert_eq!(stream_callback.message_count(), 3); // Don't receive transcript messages while removed amal_group.send("hello4".as_bytes().to_vec()).await.unwrap(); - stream_callback.notify.notified().await; + stream_callback.wait_for_delivery(1).await; assert_eq!(stream_callback.message_count(), 4); // Receiving messages again assert!(!stream_closer.is_closed()); @@ -1835,21 +1826,16 @@ mod tests { let bo = new_test_client().await; // Stream all group messages - let message_callbacks = RustStreamCallback::default(); - let group_callbacks = RustStreamCallback::default(); + let message_callback = RustStreamCallback::default(); + let group_callback = RustStreamCallback::default(); let stream_groups = bo .conversations() - .stream(Box::new(group_callbacks.clone())) + .stream(Box::new(group_callback.clone())) .await; - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; - let stream_messages = bo .conversations() - .stream_all_messages(Box::new(message_callbacks.clone())) - .unwrap(); - - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + .stream_all_messages(Box::new(message_callback.clone())); // Create group and send first message let alix_group = alix @@ -1860,21 +1846,20 @@ mod tests { ) .await .unwrap(); - - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; - + group_callback.wait_for_delivery(1).await; alix_group.send("hello1".as_bytes().to_vec()).await.unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + message_callback.wait_for_delivery(1).await; - assert_eq!(group_callbacks.message_count(), 1); - assert_eq!(message_callbacks.message_count(), 1); + assert_eq!(group_callback.message_count(), 1); + assert_eq!(message_callback.message_count(), 1); stream_messages.end(); tokio::time::sleep(tokio::time::Duration::from_millis(5)).await; assert!(stream_messages.is_closed()); stream_groups.end(); - tokio::time::sleep(tokio::time::Duration::from_millis(5)).await; + tokio::time::sleep( + tokio::time::Duration::from_millis(5)).await; assert!(stream_groups.is_closed()); } } diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index 032c77291..ec9cbafe4 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -257,6 +257,8 @@ where cursor: 1, // For the new group, stream all messages since the group was created }, ); + // TODO:insipx Can remove the indiretion in `relay_messages` and just use + // `stream_messages` directly? handle = Self::relay_messages(client.clone(), tx.clone(), group_id_to_info.clone()); }, maybe_finished = &mut handle => {