diff --git a/.github/workflows/test-ffi-bindings.yml b/.github/workflows/test-ffi-bindings.yml index e6ebac803..2c9b3cca6 100644 --- a/.github/workflows/test-ffi-bindings.yml +++ b/.github/workflows/test-ffi-bindings.yml @@ -1,10 +1,8 @@ name: Test FFI Bindings - on: push: branches: - main - pull_request: # only run tests when related changes are made paths: @@ -21,7 +19,6 @@ on: - "Cargo.toml" - "Cargo.lock" - "rust-toolchain" - jobs: test: name: Test @@ -29,19 +26,15 @@ jobs: steps: - name: Checkout uses: actions/checkout@v4 - - name: Update rust toolchains run: rustup update - - uses: Swatinem/rust-cache@v2 with: workspaces: | . bindings_ffi - - name: Start Docker containers run: dev/up - - name: Setup Kotlin run: | sudo apt update -q @@ -50,8 +43,7 @@ jobs: cd bindings_ffi make install-jar echo "$(make echo-jar | tail -n 1 | sed -e 's/\.*export //')" >> "$GITHUB_ENV" - - name: Run cargo test on FFI bindings run: | export CLASSPATH="${{ env.CLASSPATH }}" - cargo test --manifest-path bindings_ffi/Cargo.toml -- --test-threads=2 + RUST_LOG=warn,xmtpv3::mls=info cargo test --manifest-path bindings_ffi/Cargo.toml -- --test-threads=1 diff --git a/bindings_ffi/Cargo.lock b/bindings_ffi/Cargo.lock index 289d39db1..391d1f797 100644 --- a/bindings_ffi/Cargo.lock +++ b/bindings_ffi/Cargo.lock @@ -2582,6 +2582,15 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.3" @@ -3723,10 +3732,19 @@ checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", - "regex-automata", + "regex-automata 0.4.4", "regex-syntax 0.8.2", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + [[package]] name = "regex-automata" version = "0.4.4" @@ -3738,6 +3756,12 @@ dependencies = [ "regex-syntax 0.8.2", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.7.5" @@ -5116,10 +5140,14 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] diff --git a/bindings_ffi/Cargo.toml b/bindings_ffi/Cargo.toml index 99488e085..562874b0c 100644 --- a/bindings_ffi/Cargo.toml +++ b/bindings_ffi/Cargo.toml @@ -23,6 +23,7 @@ xmtp_proto = { path = "../xmtp_proto", features = ["proto_full", "grpc"] } xmtp_user_preferences = { path = "../xmtp_user_preferences" } xmtp_v2 = { path = "../xmtp_v2" } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } # NOTE: A regression in openssl-sys exists where libatomic is dynamically linked # for i686-linux-android targets. https://github.com/sfackler/rust-openssl/issues/2163 # diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 9a8a5387e..e25077b8b 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -1,5 +1,5 @@ pub use crate::inbox_owner::SigningError; -use crate::logger::init_logger; +// use crate::logger::init_logger; use crate::logger::FfiLogger; use crate::GenericError; use std::collections::HashMap; @@ -81,7 +81,12 @@ pub async fn create_client( legacy_signed_private_key_proto: Option>, history_sync_url: Option, ) -> Result, GenericError> { - init_logger(logger); + use tracing_subscriber::prelude::*; + //init_logger(logger); + let _ = tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .with(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); log::info!( "Creating API client for host: {}, isSecure: {}", host, @@ -561,7 +566,7 @@ impl FfiConversations { Ok(convo_list) } - pub async fn stream(&self, callback: Box) -> FfiStreamCloser { + pub fn stream(&self, callback: Box) -> FfiStreamCloser { let client = self.inner_client.clone(); let handle = RustXmtpClient::stream_conversations_with_callback(client.clone(), move |convo| { @@ -1009,13 +1014,19 @@ impl FfiGroup { Ok(()) } - pub async fn stream(&self, message_callback: Box) -> FfiStreamCloser { + pub fn stream(&self, message_callback: Box) -> FfiStreamCloser { let inner_client = Arc::clone(&self.inner_client); let handle = MlsGroup::stream_with_callback( inner_client, self.group_id.clone(), self.created_at_ns, - move |message| message_callback.on_message(message.into()), + move |message| { + log::info!( + "Got IN STREAM {}", + String::from_utf8_lossy(&message.decrypted_message_bytes) + ); + message_callback.on_message(message.into()) + }, ); FfiStreamCloser::new(handle) @@ -1176,19 +1187,20 @@ impl FfiStreamCloser { let stream_handle = stream_handle.take(); if let Some(h) = stream_handle { h.handle.abort(); - let join_result = h.handle.await; - if matches!(join_result, Err(ref e) if !e.is_cancelled()) { - return Err(GenericError::Generic { - err: format!( - "subscription event loop join error {}", - join_result.unwrap_err() - ), - }); + match h.handle.await { + Err(e) if !e.is_cancelled() => Err(GenericError::Generic { + err: format!("subscription event loop join error {}", e), + }), + Err(e) if e.is_cancelled() => Ok(()), + Ok(t) => t.map_err(|e| GenericError::Generic { err: e.to_string() }), + Err(e) => Err(GenericError::Generic { + err: format!("error joining task {}", e), + }), } } else { log::warn!("subscription already closed"); + Ok(()) } - Ok(()) } pub fn is_closed(&self) -> bool { @@ -1286,6 +1298,7 @@ mod tests { self, distributions::{Alphanumeric, DistString}, }; + use tokio::{sync::Notify, time::error::Elapsed}; use xmtp_cryptography::{signature::RecoverableSignature, utils::rng}; use xmtp_id::associations::generate_inbox_id; use xmtp_mls::{storage::EncryptionKey, InboxOwner}; @@ -1330,7 +1343,7 @@ mod tests { num_messages: Arc, messages: Arc>>, conversations: Arc>>>, - notify: Arc, + notify: Arc, } impl RustStreamCallback { @@ -1338,8 +1351,12 @@ mod tests { self.num_messages.load(Ordering::SeqCst) } - pub async fn wait_for_delivery(&self) { - self.notify.notified().await + pub async fn wait_for_delivery(&self) -> Result<(), Elapsed> { + log::info!("WAITING FOR DELIVERY"); + tokio::time::timeout(std::time::Duration::from_secs(60), async { + self.notify.notified().await + }) + .await } } @@ -1347,7 +1364,10 @@ mod tests { fn on_message(&self, message: FfiMessage) { log::debug!("On message called"); let mut messages = self.messages.lock().unwrap(); - log::info!("Received: {}", String::from_utf8_lossy(&message.content)); + log::info!( + "ON MESSAGE Received\n-------- \n{}\n----------", + String::from_utf8_lossy(&message.content) + ); messages.push(message); let _ = self.num_messages.fetch_add(1, Ordering::SeqCst); self.notify.notify_one(); @@ -1726,7 +1746,7 @@ mod tests { .update_group_name("Old Name".to_string()) .await .unwrap(); - message_callbacks.wait_for_delivery().await; + message_callbacks.wait_for_delivery().await.unwrap(); let bo_groups = bo .conversations() @@ -1739,14 +1759,14 @@ mod tests { .update_group_name("Old Name2".to_string()) .await .unwrap(); - message_callbacks.wait_for_delivery().await; + message_callbacks.wait_for_delivery().await.unwrap(); // Uncomment the following lines to add more group name updates bo_group .update_group_name("Old Name3".to_string()) .await .unwrap(); - message_callbacks.wait_for_delivery().await; + message_callbacks.wait_for_delivery().await.unwrap(); assert_eq!(message_callbacks.message_count(), 3); @@ -1785,11 +1805,11 @@ mod tests { .update_group_name("hello".to_string()) .await .unwrap(); - message_callbacks.wait_for_delivery().await; + message_callbacks.wait_for_delivery().await.unwrap(); alix_group.send("hello1".as_bytes().to_vec()).await.unwrap(); - message_callbacks.wait_for_delivery().await; + message_callbacks.wait_for_delivery().await.unwrap(); - bo.conversations().sync().await.unwrap(); + // bo.conversations().sync().await.unwrap(); let bo_groups = bo .conversations() @@ -1806,9 +1826,9 @@ mod tests { assert_eq!(bo_messages1.len(), first_msg_check); bo_group.send("hello2".as_bytes().to_vec()).await.unwrap(); - message_callbacks.wait_for_delivery().await; + message_callbacks.wait_for_delivery().await.unwrap(); bo_group.send("hello3".as_bytes().to_vec()).await.unwrap(); - message_callbacks.wait_for_delivery().await; + message_callbacks.wait_for_delivery().await.unwrap(); alix_group.sync().await.unwrap(); @@ -1818,7 +1838,7 @@ mod tests { assert_eq!(alix_messages.len(), second_msg_check); alix_group.send("hello4".as_bytes().to_vec()).await.unwrap(); - message_callbacks.wait_for_delivery().await; + message_callbacks.wait_for_delivery().await.unwrap(); bo_group.sync().await.unwrap(); let bo_messages2 = bo_group @@ -1840,8 +1860,7 @@ mod tests { let stream = bola .conversations() - .stream(Box::new(stream_callback.clone())) - .await; + .stream(Box::new(stream_callback.clone())); amal.conversations() .create_group( @@ -1851,7 +1870,7 @@ mod tests { .await .unwrap(); - stream_callback.wait_for_delivery().await; + stream_callback.wait_for_delivery().await.unwrap(); assert_eq!(stream_callback.message_count(), 1); // Create another group and add bola @@ -1862,7 +1881,7 @@ mod tests { ) .await .unwrap(); - stream_callback.wait_for_delivery().await; + stream_callback.wait_for_delivery().await.unwrap(); assert_eq!(stream_callback.message_count(), 2); @@ -1893,7 +1912,7 @@ mod tests { stream.wait_for_ready().await; alix_group.send("first".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await; + stream_callback.wait_for_delivery().await.unwrap(); let bo_group = bo .conversations() @@ -1906,11 +1925,11 @@ mod tests { let _ = caro.inner_client.sync_welcomes().await.unwrap(); bo_group.send("second".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await; + stream_callback.wait_for_delivery().await.unwrap(); alix_group.send("third".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await; + stream_callback.wait_for_delivery().await.unwrap(); bo_group.send("fourth".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await; + stream_callback.wait_for_delivery().await.unwrap(); assert_eq!(stream_callback.message_count(), 4); stream.end_and_wait().await.unwrap(); @@ -1922,7 +1941,7 @@ mod tests { let amal = new_test_client().await; let bola = new_test_client().await; - let group = amal + let group: Arc = amal .conversations() .create_group( vec![bola.account_address.clone()], @@ -1930,18 +1949,23 @@ mod tests { ) .await .unwrap(); + bola.inner_client.sync_welcomes().await.unwrap(); let stream_callback = RustStreamCallback::default(); - let stream_closer = group.stream(Box::new(stream_callback.clone())).await; + let stream_closer = group.stream(Box::new(stream_callback.clone())); + log::info!("WAITING FOR READY"); stream_closer.wait_for_ready().await; group.send("hello".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await; + log::info!("SEND HELLO"); + stream_callback.wait_for_delivery().await.unwrap(); + group.send("goodbye".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await; + log::info!("SEND GOODBYE"); + stream_callback.wait_for_delivery().await.unwrap(); assert_eq!(stream_callback.message_count(), 2); - + log::info!("STREAM CLOSER"); stream_closer.end_and_wait().await.unwrap(); } @@ -1971,9 +1995,9 @@ mod tests { stream_closer.wait_for_ready().await; amal_group.send(b"hello1".to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await; + stream_callback.wait_for_delivery().await.unwrap(); amal_group.send(b"hello2".to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await; + stream_callback.wait_for_delivery().await.unwrap(); assert_eq!(stream_callback.message_count(), 2); assert!(!stream_closer.is_closed()); @@ -1982,7 +2006,7 @@ mod tests { .remove_members_by_inbox_id(vec![bola.inbox_id().clone()]) .await .unwrap(); - stream_callback.wait_for_delivery().await; + stream_callback.wait_for_delivery().await.unwrap(); assert_eq!(stream_callback.message_count(), 3); // Member removal transcript message // amal_group.send(b"hello3".to_vec()).await.unwrap(); @@ -2001,7 +2025,7 @@ mod tests { assert_eq!(stream_callback.message_count(), 3); // Don't receive transcript messages while removed amal_group.send("hello4".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await; + stream_callback.wait_for_delivery().await.unwrap(); assert_eq!(stream_callback.message_count(), 4); // Receiving messages again assert!(!stream_closer.is_closed()); @@ -2062,10 +2086,7 @@ mod tests { // Stream all group messages let message_callback = RustStreamCallback::default(); let group_callback = RustStreamCallback::default(); - let stream_groups = bo - .conversations() - .stream(Box::new(group_callback.clone())) - .await; + let stream_groups = bo.conversations().stream(Box::new(group_callback.clone())); let stream_messages = bo .conversations() @@ -2081,10 +2102,10 @@ mod tests { ) .await .unwrap(); - group_callback.wait_for_delivery().await; + group_callback.wait_for_delivery().await.unwrap(); alix_group.send("hello1".as_bytes().to_vec()).await.unwrap(); - message_callback.wait_for_delivery().await; + message_callback.wait_for_delivery().await.unwrap(); assert_eq!(group_callback.message_count(), 1); assert_eq!(message_callback.message_count(), 1); diff --git a/bindings_node/src/streams.rs b/bindings_node/src/streams.rs index 5517b686e..c12301239 100644 --- a/bindings_node/src/streams.rs +++ b/bindings_node/src/streams.rs @@ -40,26 +40,29 @@ impl NapiStreamCloser { /// End the stream and `await` for it to shutdown /// Returns the `Result` of the task. #[napi] + /// End the stream and asyncronously wait for it to shutdown pub async fn end_and_wait(&self) -> Result<(), Error> { if self.abort_handle.is_finished() { return Ok(()); } - let mut handle = self.handle.lock().await; - let handle = handle.take(); - if let Some(h) = handle { + let mut stream_handle = self.handle.lock().await; + let stream_handle = stream_handle.take(); + if let Some(h) = stream_handle { h.handle.abort(); - let join_result = h.handle.await; - if matches!(join_result, Err(ref e) if !e.is_cancelled()) { - return Err(Error::from_reason(format!( + match h.handle.await { + Err(e) if !e.is_cancelled() => Err(Error::from_reason(format!( "subscription event loop join error {}", - join_result.unwrap_err() - ))); + e + ))), + Err(e) if e.is_cancelled() => Ok(()), + Ok(t) => t.map_err(|e| Error::from_reason(e.to_string())), + Err(e) => Err(Error::from_reason(format!("error joining task {}", e))), } } else { log::warn!("subscription already closed"); + Ok(()) } - Ok(()) } /// Checks if this stream is closed diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index 80304ab34..a0e5acd03 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -283,7 +283,7 @@ where // group. biased; - messages = futures::future::ready(&mut extra_messages), if extra_messages.len() > 0 => { + messages = futures::future::ready(&mut extra_messages), if !extra_messages.is_empty() => { for message in messages.drain(0..) { if tx.send(message).is_err() { break; @@ -558,7 +558,7 @@ mod tests { let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); let messages_clone = messages.clone(); - let blocked = Arc::new(AtomicU64::new(105)); + let blocked = Arc::new(AtomicU64::new(55)); let blocked_pointer = blocked.clone(); let mut handle = @@ -571,12 +571,12 @@ mod tests { let alix_group_pointer = alix_group.clone(); let alix_pointer = alix.clone(); tokio::spawn(async move { - for _ in 0..100 { + for _ in 0..50 { alix_group_pointer .send_message(b"spam", &alix_pointer) .await .unwrap(); - tokio::time::sleep(std::time::Duration::from_micros(100)).await; + tokio::time::sleep(std::time::Duration::from_micros(200)).await; } }); @@ -594,7 +594,7 @@ mod tests { .unwrap(); } - let _ = tokio::time::timeout(std::time::Duration::from_secs(10), async { + let _ = tokio::time::timeout(std::time::Duration::from_secs(120), async { while blocked.load(Ordering::SeqCst) > 0 { tokio::task::yield_now().await; }