From 340b0b68c9cd3b720e4b443d9f31b1fbc1e3cbf8 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar <65710+neekolas@users.noreply.github.com> Date: Wed, 7 Aug 2024 10:24:24 -0700 Subject: [PATCH] Abort if retry fails (#945) * Abort if retry fails * Add error message * Add test --- xmtp_mls/src/groups/mod.rs | 53 ++++++++++++++++++++- xmtp_mls/src/groups/sync.rs | 13 ++++- xmtp_mls/src/storage/encrypted_store/mod.rs | 2 +- 3 files changed, 65 insertions(+), 3 deletions(-) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 1fbcd10c7..388f2c045 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -1199,6 +1199,7 @@ fn build_group_join_config() -> MlsGroupJoinConfig { #[cfg(test)] mod tests { + use diesel::connection::SimpleConnection; use openmls::prelude::{tls_codec::Serialize, Member, MlsGroup as OpenMlsGroup}; use prost::Message; use std::sync::Arc; @@ -1229,7 +1230,7 @@ mod tests { use super::{ intents::{Installation, SendWelcomesAction}, - MlsGroup, + GroupError, MlsGroup, }; async fn receive_group_invite(client: &Client) -> MlsGroup @@ -2754,4 +2755,54 @@ mod tests { ] ); } + + #[tokio::test(flavor = "multi_thread")] + async fn process_messages_abort_on_retryable_error() { + let alix = ClientBuilder::new_test_client(&generate_local_wallet()).await; + let bo = ClientBuilder::new_test_client(&generate_local_wallet()).await; + + let alix_group = alix + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + + alix_group + .add_members_by_inbox_id(&alix, vec![bo.inbox_id()]) + .await + .unwrap(); + + // Create two commits + alix_group + .update_group_name(&alix, "foo".to_string()) + .await + .unwrap(); + alix_group + .update_group_name(&alix, "bar".to_string()) + .await + .unwrap(); + + let bo_group = receive_group_invite(&bo).await; + // Get the group messages before we lock the DB, simulating an error that happens + // in the middle of a sync instead of the beginning + let bo_messages = bo + .query_group_messages(&bo_group.group_id, &bo.store().conn().unwrap()) + .await + .unwrap(); + + let conn_1 = bo.store().conn().unwrap(); + let mut conn_2 = bo.store().raw_conn().unwrap(); + + // Begin an exclusive transaction on a second connection to lock the database + conn_2.batch_execute("BEGIN EXCLUSIVE").unwrap(); + let process_result = bo_group.process_messages(bo_messages, conn_1, &bo).await; + if let Some(GroupError::ReceiveErrors(errors)) = process_result.err() { + assert_eq!(errors.len(), 1); + assert!(errors + .first() + .unwrap() + .to_string() + .contains("database is locked")); + } else { + panic!("Expected error") + } + } } diff --git a/xmtp_mls/src/groups/sync.rs b/xmtp_mls/src/groups/sync.rs index 42fa69604..3e4686fac 100644 --- a/xmtp_mls/src/groups/sync.rs +++ b/xmtp_mls/src/groups/sync.rs @@ -28,7 +28,7 @@ use crate::{ hpke::{encrypt_welcome, HpkeError}, identity::parse_credential, identity_updates::load_identity_updates, - retry::Retry, + retry::{Retry, RetryableError}, retry_async, storage::{ db_connection::DbConnection, @@ -692,7 +692,18 @@ impl MlsGroup { }) ); if let Err(e) = result { + let is_retryable = e.is_retryable(); + let error_message = e.to_string(); receive_errors.push(e); + // If the error is retryable we cannot move on to the next message + // otherwise you can get into a forked group state. + if is_retryable { + log::error!( + "Aborting message processing for retryable error: {}", + error_message + ); + break; + } } } diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs index 3242c5b32..6c5251a5f 100644 --- a/xmtp_mls/src/storage/encrypted_store/mod.rs +++ b/xmtp_mls/src/storage/encrypted_store/mod.rs @@ -175,7 +175,7 @@ impl EncryptedMessageStore { Ok(()) } - fn raw_conn( + pub(crate) fn raw_conn( &self, ) -> Result>, StorageError> { let pool_guard = self.pool.read();