From c855664fc278a6a2997dcf5071033f4f9584f8b5 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar <65710+neekolas@users.noreply.github.com> Date: Thu, 22 Aug 2024 14:22:25 -0700 Subject: [PATCH] Refactor process_own_message to not mutate intent state directly --- xmtp_mls/src/groups/sync.rs | 64 +++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 28 deletions(-) diff --git a/xmtp_mls/src/groups/sync.rs b/xmtp_mls/src/groups/sync.rs index d27f8b81d..d44435c20 100644 --- a/xmtp_mls/src/groups/sync.rs +++ b/xmtp_mls/src/groups/sync.rs @@ -267,9 +267,9 @@ impl MlsGroup { message: ProtocolMessage, envelope_timestamp_ns: u64, allow_epoch_increment: bool, - ) -> Result<(), MessageProcessingError> { + ) -> Result<IntentState, MessageProcessingError> { if intent.state == IntentState::Committed { - return Ok(()); + return Ok(IntentState::Committed); } let message_epoch = message.epoch(); let group_epoch = openmls_group.epoch(); @@ -298,13 +298,12 @@ impl MlsGroup { let group_epoch_u64 = group_epoch.as_u64(); if published_in_epoch_u64 != group_epoch_u64 { - conn.set_group_intent_to_publish(intent.id)?; log::warn!( "Intent was published in epoch {} but group is currently in epoch {}", published_in_epoch_u64, group_epoch_u64 ); - return Ok(()); + return Ok(IntentState::ToPublish); } } @@ -330,10 +329,9 @@ impl MlsGroup { .await; if maybe_validated_commit.is_err() { - conn.set_group_intent_error(intent.id)?; // Return before merging commit since it does not pass validation // Return OK so that the group intent update is still written to the DB - return Ok(()); + return Ok(IntentState::Error); } let validated_commit = maybe_validated_commit.expect("Checked for error"); @@ -345,7 +343,7 @@ impl MlsGroup { ); if let Err(err) = openmls_group.merge_staged_commit(&provider, pending_commit) { log::error!("error merging commit: {}", err); - conn.set_group_intent_to_publish(intent.id)?; + return Ok(IntentState::ToPublish); } else { // If no error committing the change, write a transcript message self.save_transcript_message(conn, validated_commit, envelope_timestamp_ns)?; @@ -359,8 +357,7 @@ impl MlsGroup { message_epoch, MAX_PAST_EPOCHS, ) { - conn.set_group_intent_to_publish(intent.id)?; - return Ok(()); + return Ok(IntentState::ToPublish); } if let Some(id) = intent.message_id()? { conn.set_delivery_status_to_published(&id, envelope_timestamp_ns)?; @@ -368,9 +365,7 @@ impl MlsGroup { } }; - conn.set_group_intent_committed(intent.id)?; - - Ok(()) + Ok(IntentState::Committed) } #[tracing::instrument(level = "trace", skip_all)] @@ -612,26 +607,39 @@ impl MlsGroup { match intent { // Intent with the payload hash matches Ok(Some(intent)) => { + let intent_id = intent.id; log::info!( - "client [{}] is about to process own envelope [{}]", + "client [{}] is about to process own envelope [{}] for intent [{}]", client.inbox_id(), - envelope.id - ); - log::info!( - "envelope [{}] is equal to intent [{}]", envelope.id, - intent.id + intent_id ); - self.process_own_message( - client, - intent, - openmls_group, - provider, - message.into(), - envelope.created_ns, - allow_epoch_increment, - ) - .await + match self + .process_own_message( + client, + intent, + openmls_group, + provider, + message.into(), + envelope.created_ns, + allow_epoch_increment, + ) + .await? + { + IntentState::ToPublish => { + Ok(provider.conn_ref().set_group_intent_to_publish(intent_id)?) + } + IntentState::Committed => { + Ok(provider.conn_ref().set_group_intent_committed(intent_id)?) + } + IntentState::Published => { + log::warn!("Unexpected behaviour: returned intent state published from process_own_message"); + Ok(()) + } + IntentState::Error => { + Ok(provider.conn_ref().set_group_intent_error(intent_id)?) + } + } } // No matching intent found Ok(None) => {