From ffa056442a3c0a634a21fc2a47e5d5fa3bee22bc Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Thu, 25 Jul 2024 16:11:07 -0600 Subject: [PATCH] Ensure message send succeeds even when out of sync (#917) We have configured the `max_past_epochs` value to 3, which means that we keep around message encryption keys for 3 epochs before deleting them. This means that if we are 3 commits behind when we send a message, nobody else will be able to decrypt it, because they process everything sequentially, and they'll have already deleted their encryption keys by the time they see it. The fix is as follows: - When pulling down messages from a group, if we see a message we previously published, we check that the message is no more than 3 epochs behind. If the check passes, the message send intent is updated to COMMITTED, otherwise it's reset to TO_PUBLISH so that the message can be sent again. - After sending a message, we should go ahead and pull down the messages afterwards, to make sure the message send succeeded (and retry via intents otherwise). This has the following implications: 1. It's not required to sync the group before sending a message 2. Confirming that a message sent successfully (i.e. waiting for `send_message()` to complete) is slower - there is an extra round trip to pull down the messages afterwards (+more if the message needs to be retried) My justification for the slower message send is that we've already set up optimistic message sends, with separate prepare and publish steps. In the event that multiple optimistic message sends happen back-to-back, you can call a single publish at the end. Perhaps we can recommend using optimistic message sends, with debounced publishes, in the docs somewhere. \- Rich --- bindings_ffi/src/mls.rs | 132 ++++++++++++++++++++++++++++++++++++ xmtp_mls/src/groups/mod.rs | 7 +- xmtp_mls/src/groups/sync.rs | 76 +++++++++++++++++++-- 3 files changed, 208 insertions(+), 7 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index bde83c03e..eb1df7f4c 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -2156,6 +2156,138 @@ mod tests { assert!(stream_messages.is_closed()); } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_can_send_message_when_out_of_sync() { + let alix = new_test_client().await; + let bo = new_test_client().await; + let caro = new_test_client().await; + let davon = new_test_client().await; + let eri = new_test_client().await; + let frankie = new_test_client().await; + + let alix_group = alix + .conversations() + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + bo.conversations().sync().await.unwrap(); + let bo_group = bo.group(alix_group.id()).unwrap(); + + bo_group.send("bo1".as_bytes().to_vec()).await.unwrap(); + alix_group.send("alix1".as_bytes().to_vec()).await.unwrap(); + + // Move the group forward by 3 epochs (as Alix's max_past_epochs is + // configured to 3) without Bo syncing + alix_group + .add_members(vec![ + caro.account_address.clone(), + davon.account_address.clone(), + ]) + .await + .unwrap(); + alix_group + .remove_members(vec![ + caro.account_address.clone(), + davon.account_address.clone(), + ]) + .await + .unwrap(); + alix_group + .add_members(vec![ + eri.account_address.clone(), + frankie.account_address.clone(), + ]) + .await + .unwrap(); + + // Bo sends messages to Alix while 3 epochs behind + bo_group.send("bo3".as_bytes().to_vec()).await.unwrap(); + 
alix_group.send("alix3".as_bytes().to_vec()).await.unwrap(); + bo_group.send("bo4".as_bytes().to_vec()).await.unwrap(); + bo_group.send("bo5".as_bytes().to_vec()).await.unwrap(); + + alix_group.sync().await.unwrap(); + let alix_messages = alix_group + .find_messages(FfiListMessagesOptions::default()) + .unwrap(); + + bo_group.sync().await.unwrap(); + let bo_messages = bo_group + .find_messages(FfiListMessagesOptions::default()) + .unwrap(); + assert_eq!(bo_messages.len(), 9); + assert_eq!(alix_messages.len(), 10); + + assert_eq!( + bo_messages[bo_messages.len() - 1].id, + alix_messages[alix_messages.len() - 1].id + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_can_add_members_when_out_of_sync() { + let alix = new_test_client().await; + let bo = new_test_client().await; + let caro = new_test_client().await; + let davon = new_test_client().await; + let eri = new_test_client().await; + let frankie = new_test_client().await; + + let alix_group = alix + .conversations() + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + bo.conversations().sync().await.unwrap(); + let bo_group = bo.group(alix_group.id()).unwrap(); + + bo_group.send("bo1".as_bytes().to_vec()).await.unwrap(); + alix_group.send("alix1".as_bytes().to_vec()).await.unwrap(); + + // Move the group forward by 3 epochs (as Alix's max_past_epochs is + // configured to 3) without Bo syncing + alix_group + .add_members(vec![ + caro.account_address.clone(), + davon.account_address.clone(), + ]) + .await + .unwrap(); + alix_group + .remove_members(vec![ + caro.account_address.clone(), + davon.account_address.clone(), + ]) + .await + .unwrap(); + alix_group + .add_members(vec![eri.account_address.clone()]) + .await + .unwrap(); + + // Bo adds a member while 3 epochs behind + bo_group + .add_members(vec![frankie.account_address.clone()]) + .await + .unwrap(); + + bo_group.sync().await.unwrap(); + let bo_members = bo_group.list_members().unwrap(); + assert_eq!(bo_members.len(), 4); + + alix_group.sync().await.unwrap(); + let alix_members = alix_group.list_members().unwrap(); + assert_eq!(alix_members.len(), 4); + } + // test is also showing intermittent failures with database locked msg #[ignore] #[tokio::test(flavor = "multi_thread", worker_threads = 5)] diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index fab5e49df..c8e318204 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -440,10 +440,12 @@ impl MlsGroup { let message_id = self.prepare_message(message, &conn); // Skipping a full sync here and instead just firing and forgetting - if let Err(err) = self.publish_intents(conn, client).await { + if let Err(err) = self.publish_intents(conn.clone(), client).await { log::error!("Send: error publishing intents: {:?}", err); } + self.sync_until_last_intent_resolved(conn, client).await?; + message_id } @@ -459,7 +461,8 @@ impl MlsGroup { let update_interval = Some(5_000_000); self.maybe_update_installations(conn.clone(), update_interval, client) .await?; - self.publish_intents(conn, client).await?; + self.publish_intents(conn.clone(), client).await?; + self.sync_until_last_intent_resolved(conn, client).await?; Ok(()) } diff --git a/xmtp_mls/src/groups/sync.rs b/xmtp_mls/src/groups/sync.rs index ef059cd66..6801d4b25 100644 --- a/xmtp_mls/src/groups/sync.rs +++ b/xmtp_mls/src/groups/sync.rs @@ -46,6 +46,7 @@ use openmls::{ credentials::BasicCredential, extensions::Extensions, 
framing::{MlsMessageOut, ProtocolMessage}, + group::GroupEpoch, prelude::{ tls_codec::{Deserialize, Serialize}, LeafNodeIndex, MlsGroup as OpenMlsGroup, MlsMessageBodyIn, MlsMessageIn, PrivateMessageIn, @@ -133,6 +134,28 @@ impl MlsGroup { Ok(()) } + pub(super) async fn sync_until_last_intent_resolved( + &self, + conn: DbConnection, + client: &Client, + ) -> Result<(), GroupError> + where + ApiClient: XmtpApi, + { + let intents = conn.find_group_intents( + self.group_id.clone(), + Some(vec![IntentState::ToPublish, IntentState::Published]), + None, + )?; + + if intents.is_empty() { + return Ok(()); + } + + self.sync_until_intent_resolved(conn, intents[intents.len() - 1].id, client) + .await + } + /** * Sync the group and wait for the intent to be deleted * Group syncing may involve picking up messages unrelated to the intent, so simply checking for errors @@ -188,6 +211,37 @@ impl MlsGroup { Err(last_err.unwrap_or(GroupError::Generic("failed to wait for intent".to_string()))) } + fn is_valid_epoch( + inbox_id: InboxId, + intent_id: i32, + group_epoch: GroupEpoch, + message_epoch: GroupEpoch, + max_past_epochs: usize, + ) -> bool { + if message_epoch.as_u64() + max_past_epochs as u64 <= group_epoch.as_u64() { + log::warn!( + "[{}] own message epoch {} is {} or more less than group epoch {} for intent {}. Retrying message", + inbox_id, + message_epoch, + max_past_epochs, + group_epoch, + intent_id + ); + return false; + } else if message_epoch.as_u64() > group_epoch.as_u64() { + // Should not happen, logging proactively + log::error!( + "[{}] own message epoch {} is greater than group epoch {} for intent {}. Retrying message", + inbox_id, + message_epoch, + group_epoch, + intent_id + ); + return false; + } + true + } + #[allow(clippy::too_many_arguments)] #[tracing::instrument(level = "trace", skip_all)] async fn process_own_message( @@ -203,11 +257,15 @@ impl MlsGroup { if intent.state == IntentState::Committed { return Ok(()); } + let message_epoch = message.epoch(); + let group_epoch = openmls_group.epoch(); debug!( - "[{}] processing own message for intent {} / {:?}", + "[{}] processing own message for intent {} / {:?}, group epoch: {}, message_epoch: {}", self.context.inbox_id(), intent.id, - intent.kind + intent.kind, + group_epoch, + message_epoch ); let conn = provider.conn(); @@ -223,8 +281,6 @@ impl MlsGroup { let maybe_pending_commit = openmls_group.pending_commit(); // We don't get errors with merge_pending_commit when there are no commits to merge if maybe_pending_commit.is_none() { - let message_epoch = message.epoch(); - let group_epoch = openmls_group.epoch(); debug!( "no pending commit to merge. Group epoch: {}. Message epoch: {}", group_epoch, message_epoch @@ -281,6 +337,16 @@ impl MlsGroup { } } IntentKind::SendMessage => { + if !Self::is_valid_epoch( + self.context.inbox_id(), + intent.id, + group_epoch, + message_epoch, + 3, // max_past_epochs, TODO: expose from OpenMLS MlsGroup + ) { + conn.set_group_intent_to_publish(intent.id)?; + return Ok(()); + } if let Some(id) = intent.message_id()? { conn.set_delivery_status_to_published(&id, envelope_timestamp_ns)?; } @@ -532,7 +598,7 @@ impl MlsGroup { // Intent with the payload hash matches Ok(Some(intent)) => { log::info!( - "client [{}] is about to process own envelope [{}]", + "client [{}] is about to process own envelope [{}]", client.inbox_id(), envelope.id );
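
A minimal, self-contained sketch of the epoch-window check that `is_valid_epoch` performs above, for anyone who wants to reason about the arithmetic in isolation. Plain `u64` values stand in for OpenMLS's `GroupEpoch`, `max_past_epochs` is hard-coded to 3 as in the patch, and the function/test names are illustrative only, not part of this change:

    /// Returns true when a message we previously published can still be
    /// decrypted by peers: its epoch must be strictly less than
    /// `max_past_epochs` behind the current group epoch, and must not be
    /// ahead of it.
    fn epoch_within_window(group_epoch: u64, message_epoch: u64, max_past_epochs: u64) -> bool {
        // Too far behind: receivers process messages sequentially and will
        // have already deleted the decryption keys for this epoch, so the
        // send intent has to be reset to ToPublish and retried.
        if message_epoch + max_past_epochs <= group_epoch {
            return false;
        }
        // Ahead of the group epoch: should not happen; also treated as a retry.
        if message_epoch > group_epoch {
            return false;
        }
        true
    }

    fn main() {
        // Exactly 3 epochs behind (message epoch 4, group epoch 7): with
        // max_past_epochs = 3 the keys are already gone, so the message
        // must be retried.
        assert!(!epoch_within_window(7, 4, 3));
        // Two epochs behind is still inside the retention window.
        assert!(epoch_within_window(7, 5, 3));
        // Same epoch is trivially fine.
        assert!(epoch_within_window(7, 7, 3));
        println!("epoch-window checks passed");
    }

When the check fails, the patch resets the intent to `ToPublish` (rather than marking it `Committed`), so the message is re-encrypted under the current epoch the next time intents are published.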