From a56a5cd08c3b7b3869b4d5720af04aae2b66371a Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Wed, 26 Jun 2024 17:43:54 -0400 Subject: [PATCH] fix, not needed to wait on new groups --- bindings_ffi/src/mls.rs | 12 ++---------- xmtp_mls/src/subscriptions.rs | 28 +++++++--------------------- 2 files changed, 9 insertions(+), 31 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 1d4936e49..143e6ee62 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -1665,7 +1665,6 @@ mod tests { alix_group.send("first".as_bytes().to_vec()).await.unwrap(); - println!("Waiting on first"); stream_callback.notify.notified().await; let bo_group = bo @@ -1676,22 +1675,15 @@ mod tests { ) .await .unwrap(); - - println!("Waiting on second"); - stream_callback.notify.notified().await; - + let _ = caro.inner_client.sync_welcomes().await.unwrap(); + bo_group.send("second".as_bytes().to_vec()).await.unwrap(); - - println!("Waiting on third"); stream_callback.notify.notified().await; alix_group.send("third".as_bytes().to_vec()).await.unwrap(); - - println!("Waiting on fourth"); stream_callback.notify.notified().await; bo_group.send("fourth".as_bytes().to_vec()).await.unwrap(); - println!("Waiting on fifth"); stream_callback.notify.notified().await; assert_eq!(stream_callback.message_count(), 4); diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index aa93f9a3f..032c77291 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -112,7 +112,7 @@ where match res.await { Ok(group) => Some(group), Err(err) => { - log::error!("Error processing stream entry: {:?}", err); + log::error!("Error processing stream entry for conversation: {:?}", err); None } } @@ -214,7 +214,6 @@ where let (tx, rx) = mpsc::unbounded_channel(); client.sync_welcomes().await?; - log::debug!("Synced Welcomes!!!"); let current_groups = client.store().conn()?.find_groups(None, None, None, None)?; @@ -231,16 +230,13 @@ where ) }) .collect(); - log::info!("Groups len: {:?}", group_id_to_info.len()); tokio::spawn(async move { + let client = client.clone(); let mut handle = Self::relay_messages(client.clone(), tx.clone(), group_id_to_info.clone()); - - let client_pointer = client.clone(); - let mut convo_stream = Self::stream_conversations(&client_pointer).await?; + let mut convo_stream = Self::stream_conversations(&client).await?; loop { - log::debug!("Selecting ...."); // TODO:insipx We should more closely investigate whether // the stream mapping in `stream_conversations` is cancellation safe // otherwise it could lead to hard-to-find bugs @@ -249,7 +245,7 @@ where if group_id_to_info.contains_key(&new_group.group_id) { continue; } - + //TODO: Should we await the handle to ensure it finishes? handle.abort(); for info in group_id_to_info.values_mut() { info.cursor = 0; @@ -258,19 +254,17 @@ where new_group.group_id, MessagesStreamInfo { convo_created_at_ns: new_group.created_at_ns, - cursor: 1, + cursor: 1, // For the new group, stream all messages since the group was created }, ); handle = Self::relay_messages(client.clone(), tx.clone(), group_id_to_info.clone()); }, maybe_finished = &mut handle => { match maybe_finished { - // if all is well it means the stream closed (our receiver is dropped - // or ended), our work is done + // if all is well it means the stream closed (receiver is dropped or ended) Ok(_) => break, Err(e) => { - // if we have an error, it probably means we need to try and - // restart the stream. + // if we have an error, try to restart the stream. log::error!("{}", e.to_string()); handle = Self::relay_messages(client.clone(), tx.clone(), group_id_to_info.clone()); } @@ -293,7 +287,6 @@ where let (tx, rx) = tokio::sync::oneshot::channel(); let handle = tokio::spawn(async move { - log::debug!("Spawning one"); let mut stream = Self::stream_all_messages(client).await?; let _ = tx.send(()); while let Some(message) = stream.next().await { @@ -316,15 +309,9 @@ where let mut stream = client.stream_messages(group_id_to_info).await?; while let Some(message) = stream.next().await { // an error can only mean the receiver has been dropped or closed - log::debug!( - "SENDING MESSAGE {}", - String::from_utf8_lossy(&message.decrypted_message_bytes) - ); if tx.send(message).is_err() { - log::debug!("CLOSING STREAM"); break; } - log::debug!("Sent Message!"); } Ok::<_, ClientError>(()) }) @@ -388,7 +375,6 @@ mod tests { let messages_clone = messages.clone(); Client::::stream_all_messages_with_callback(Arc::new(caro), move |message| { - log::debug!("YOOO MESSAGES"); (*messages_clone.lock().unwrap()).push(message); });