diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index d13cc477d..f1dccdbb9 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -2303,6 +2303,115 @@ mod tests { assert_eq!(client2_members.len(), 2); } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_create_new_installations_does_not_fork_group() { + let bo_wallet_key = &mut rng(); + let bo_wallet = xmtp_cryptography::utils::LocalWallet::new(bo_wallet_key); + + // Create clients + let alix = new_test_client().await; + let bo = new_test_client_with_wallet(bo_wallet.clone()).await; + let caro = new_test_client().await; + + // Alix begins a stream for all messages + let message_callbacks = RustStreamCallback::default(); + let stream_messages = alix + .conversations() + .stream_all_messages(Box::new(message_callbacks.clone())) + .await; + stream_messages.wait_for_ready().await; + + // Alix creates a group with Bo and Caro + let group = alix + .conversations() + .create_group( + vec![bo.account_address.clone(), caro.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + // Alix and Caro Sync groups + alix.conversations().sync().await.unwrap(); + bo.conversations().sync().await.unwrap(); + caro.conversations().sync().await.unwrap(); + + // Alix and Caro find the group + let alix_group = alix.group(group.id()).unwrap(); + let bo_group = bo.group(group.id()).unwrap(); + let caro_group = caro.group(group.id()).unwrap(); + + // Alix sends a message in the group + alix_group + .send("First message".as_bytes().to_vec()) + .await + .unwrap(); + + // Caro sends a message in the group + caro_group + .send("Second message".as_bytes().to_vec()) + .await + .unwrap(); + + // Bo logs back in with a new installation + let bo2 = new_test_client_with_wallet(bo_wallet).await; + + // Bo begins a stream for all messages + let bo_message_callbacks = RustStreamCallback::default(); + let bo_stream_messages = bo2 + .conversations() + .stream_all_messages(Box::new(bo_message_callbacks.clone())) + .await; + bo_stream_messages.wait_for_ready().await; + + // Alix sends a message to the group + alix_group + .send("Third message".as_bytes().to_vec()) + .await + .unwrap(); + + // New installation of bo finds the group + bo2.conversations().sync().await.unwrap(); + let bo2_group = bo2.group(group.id()).unwrap(); + + // Bo sends a message to the group + bo2_group + .send("Fourth message".as_bytes().to_vec()) + .await + .unwrap(); + + // Caro sends a message in the group + caro_group + .send("Fifth message".as_bytes().to_vec()) + .await + .unwrap(); + + alix_group.sync().await.unwrap(); + bo_group.sync().await.unwrap(); + bo2_group.sync().await.unwrap(); + caro_group.sync().await.unwrap(); + + // Get the message count for all the clients + let caro_messages = caro_group + .find_messages(FfiListMessagesOptions::default()) + .unwrap(); + let alix_messages = alix_group + .find_messages(FfiListMessagesOptions::default()) + .unwrap(); + let bo_messages = bo_group + .find_messages(FfiListMessagesOptions::default()) + .unwrap(); + let bo2_messages = bo2_group + .find_messages(FfiListMessagesOptions::default()) + .unwrap(); + + assert_eq!(caro_messages.len(), 5); + assert_eq!(alix_messages.len(), 6); + assert_eq!(bo_messages.len(), 5); + // Bo 2 only sees three messages since it joined after the first 2 were sent + assert_eq!(bo2_messages.len(), 3); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn test_can_send_messages_when_epochs_behind() { let alix = new_test_client().await; diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 388f2c045..34d370418 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -1455,7 +1455,7 @@ mod tests { bola_group .add_members_by_inbox_id(&bola, vec![charlie.inbox_id()]) .await - .expect_err("expected error"); + .expect("bola's add should succeed in a no-op"); amal_group .receive(&amal.store().conn().unwrap(), &amal) @@ -1494,8 +1494,8 @@ mod tests { None, ) .unwrap(); - // Bola should have one uncommitted intent in `Error::Failed` state for the failed attempt at adding Charlie, who is already in the group - assert_eq!(bola_failed_intents.len(), 1); + // Bola's attempted add should be deleted, since it will have been a no-op on the second try + assert_eq!(bola_failed_intents.len(), 0); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] diff --git a/xmtp_mls/src/groups/sync.rs b/xmtp_mls/src/groups/sync.rs index 14d376b27..a9e5c5839 100644 --- a/xmtp_mls/src/groups/sync.rs +++ b/xmtp_mls/src/groups/sync.rs @@ -191,7 +191,10 @@ impl MlsGroup { state: IntentState::Error, .. })) => { - log::warn!("not retrying intent ID {id}. since it is in state Error",); + log::warn!( + "not retrying intent ID {id}. since it is in state Error. {:?}", + last_err + ); return Err(last_err.unwrap_or(GroupError::Generic( "Group intent could not be committed".to_string(), ))); @@ -803,50 +806,52 @@ impl MlsGroup { }) ); - if let Err(err) = result { - log::error!("error getting publish intent data {:?}", err); - if (intent.publish_attempts + 1) as usize >= MAX_INTENT_PUBLISH_ATTEMPTS { - log::error!("intent {} has reached max publish attempts", intent.id); - // TODO: Eventually clean up errored attempts - provider - .conn() - .set_group_intent_error_and_fail_msg(&intent)?; - } else { - provider - .conn() - .increment_intent_publish_attempt_count(intent.id)?; - } - - return Err(err); - } + match result { + Err(err) => { + log::error!("error getting publish intent data {:?}", err); + if (intent.publish_attempts + 1) as usize >= MAX_INTENT_PUBLISH_ATTEMPTS { + log::error!("intent {} has reached max publish attempts", intent.id); + // TODO: Eventually clean up errored attempts + provider + .conn() + .set_group_intent_error_and_fail_msg(&intent)?; + } else { + provider + .conn() + .increment_intent_publish_attempt_count(intent.id)?; + } - if let Some((payload, post_commit_data)) = result.expect("checked") { - let payload_slice = payload.as_slice(); + return Err(err); + } + Ok(Some((payload, post_commit_data))) => { + let payload_slice = payload.as_slice(); - client - .api_client - .send_group_messages(vec![payload_slice]) - .await?; - log::info!( - "[{}] published intent [{}] of type [{}]", - client.inbox_id(), - intent.id, - intent.kind - ); - provider.conn().set_group_intent_published( - intent.id, - sha256(payload_slice), - post_commit_data, - )?; - log::debug!( - "client [{}] set stored intent [{}] to state `published`", - client.inbox_id(), - intent.id - ); - } else { - provider - .conn() - .set_group_intent_error_and_fail_msg(&intent)?; + client + .api_client + .send_group_messages(vec![payload_slice]) + .await?; + log::info!( + "[{}] published intent [{}] of type [{}]", + client.inbox_id(), + intent.id, + intent.kind + ); + provider.conn().set_group_intent_published( + intent.id, + sha256(payload_slice), + post_commit_data, + )?; + log::debug!( + "client [{}] set stored intent [{}] to state `published`", + client.inbox_id(), + intent.id + ); + } + Ok(None) => { + log::info!("Skipping intent because no publish data returned"); + let deleter: &dyn Delete = &provider.conn(); + deleter.delete(intent.id)?; + } } } diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index 02b65f54c..5e0b1c4d3 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -237,7 +237,7 @@ where let (tx, rx) = oneshot::channel(); let handle = tokio::spawn(async move { - let mut stream = client.stream_conversations().await.unwrap(); + let mut stream = client.stream_conversations().await?; let _ = tx.send(()); while let Some(convo) = stream.next().await { convo_callback(convo)