diff --git a/xmtp_mls/src/groups/subscriptions.rs b/xmtp_mls/src/groups/subscriptions.rs index a7f0cb8a8..817d98ed3 100644 --- a/xmtp_mls/src/groups/subscriptions.rs +++ b/xmtp_mls/src/groups/subscriptions.rs @@ -6,9 +6,10 @@ use futures::Stream; use super::{extract_message_v1, GroupError, MlsGroup}; use crate::storage::group_message::StoredGroupMessage; +use crate::storage::refresh_state::EntityKind; use crate::subscriptions::{MessagesStreamInfo, StreamHandle}; -use crate::XmtpApi; use crate::{retry::Retry, retry_async, Client}; +use crate::{retry_sync, XmtpApi}; use prost::Message; use xmtp_proto::xmtp::mls::api::v1::GroupMessage; @@ -31,53 +32,55 @@ impl MlsGroup { ); let created_ns = msgv1.created_ns; - let client_pointer = client.clone(); - let process_result = retry_async!( - Retry::default(), - (async { - let client_pointer = client_pointer.clone(); - let client_id = client_id.clone(); - let msgv1 = msgv1.clone(); - self.context - .store - .transaction_async(|provider| async move { - let mut openmls_group = self.load_mls_group(&provider)?; - - // Attempt processing immediately, but fail if the message is not an Application Message - // Returning an error should roll back the DB tx - log::info!( - "current epoch for [{}] in process_stream_entry() is Epoch: [{}]", - client_id, - openmls_group.epoch() - ); - - self.process_message( - client_pointer.as_ref(), - &mut openmls_group, - &provider, - &msgv1, - false, - ) + if !self.has_already_synced(msg_id)? { + let client_pointer = client.clone(); + let process_result = retry_async!( + Retry::default(), + (async { + let client_pointer = client_pointer.clone(); + let client_id = client_id.clone(); + let msgv1 = msgv1.clone(); + self.context + .store + .transaction_async(|provider| async move { + let mut openmls_group = self.load_mls_group(&provider)?; + + // Attempt processing immediately, but fail if the message is not an Application Message + // Returning an error should roll back the DB tx + log::info!( + "current epoch for [{}] in process_stream_entry() is Epoch: [{}]", + client_id, + openmls_group.epoch() + ); + + self.process_message( + client_pointer.as_ref(), + &mut openmls_group, + &provider, + &msgv1, + false, + ) + .await + .map_err(GroupError::ReceiveError) + }) .await - .map_err(GroupError::ReceiveError) - }) - .await - }) - ); - - if let Some(GroupError::ReceiveError(_)) = process_result.as_ref().err() { - // Swallow errors here, since another process may have successfully saved the message - // to the DB - match self.sync_with_conn(&client.mls_provider()?, &client).await { - Ok(_) => { - log::debug!("Sync triggered by streamed message successful") - } - Err(err) => { - log::warn!("Sync triggered by streamed message failed: {}", err); - } - }; - } else if process_result.is_err() { - log::error!("Process stream entry {:?}", process_result.err()); + }) + ); + + if let Some(GroupError::ReceiveError(_)) = process_result.as_ref().err() { + // Swallow errors here, since another process may have successfully saved the message + // to the DB + match self.sync_with_conn(&client.mls_provider()?, &client).await { + Ok(_) => { + log::debug!("Sync triggered by streamed message successful") + } + Err(err) => { + log::warn!("Sync triggered by streamed message failed: {}", err); + } + }; + } else if process_result.is_err() { + log::error!("Process stream entry {:?}", process_result.err()); + } } // Load the message from the DB to handle cases where it may have been already processed in @@ -91,6 +94,19 @@ impl MlsGroup { Ok(new_message) } + // Checks if a message has already been processed through a sync + fn has_already_synced(&self, id: u64) -> Result { + let last_id = retry_sync!( + Retry::default(), + (|| { + let conn = self.context.store.conn()?; + conn.get_last_cursor_for_id(&self.group_id, EntityKind::Group) + }) + )?; + + return Ok(last_id >= id as i64); + } + pub async fn process_streamed_group_message( &self, envelope_bytes: Vec,