Skip to content

Commit

Permalink
Skip duplicate message processing
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Aug 30, 2024
1 parent c4bc84f commit 12293f1
Showing 1 changed file with 63 additions and 47 deletions.
110 changes: 63 additions & 47 deletions xmtp_mls/src/groups/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ use futures::Stream;

use super::{extract_message_v1, GroupError, MlsGroup};
use crate::storage::group_message::StoredGroupMessage;
use crate::storage::refresh_state::EntityKind;
use crate::storage::StorageError;
use crate::subscriptions::{MessagesStreamInfo, StreamHandle};
use crate::XmtpApi;
use crate::{retry::Retry, retry_async, Client};
use crate::{retry_sync, XmtpApi};
use prost::Message;
use xmtp_proto::xmtp::mls::api::v1::GroupMessage;

Expand All @@ -31,53 +33,55 @@ impl MlsGroup {
);
let created_ns = msgv1.created_ns;

let client_pointer = client.clone();
let process_result = retry_async!(
Retry::default(),
(async {
let client_pointer = client_pointer.clone();
let client_id = client_id.clone();
let msgv1 = msgv1.clone();
self.context
.store
.transaction_async(|provider| async move {
let mut openmls_group = self.load_mls_group(&provider)?;

// Attempt processing immediately, but fail if the message is not an Application Message
// Returning an error should roll back the DB tx
log::info!(
"current epoch for [{}] in process_stream_entry() is Epoch: [{}]",
client_id,
openmls_group.epoch()
);

self.process_message(
client_pointer.as_ref(),
&mut openmls_group,
&provider,
&msgv1,
false,
)
if !self.has_already_synced(msg_id)? {
let client_pointer = client.clone();
let process_result = retry_async!(
Retry::default(),
(async {
let client_pointer = client_pointer.clone();
let client_id = client_id.clone();
let msgv1 = msgv1.clone();
self.context
.store
.transaction_async(|provider| async move {
let mut openmls_group = self.load_mls_group(&provider)?;

// Attempt processing immediately, but fail if the message is not an Application Message
// Returning an error should roll back the DB tx
log::info!(
"current epoch for [{}] in process_stream_entry() is Epoch: [{}]",
client_id,
openmls_group.epoch()
);

self.process_message(
client_pointer.as_ref(),
&mut openmls_group,
&provider,
&msgv1,
false,
)
.await
.map_err(GroupError::ReceiveError)
})
.await
.map_err(GroupError::ReceiveError)
})
.await
})
);

if let Some(GroupError::ReceiveError(_)) = process_result.as_ref().err() {
// Swallow errors here, since another process may have successfully saved the message
// to the DB
match self.sync_with_conn(&client.mls_provider()?, &client).await {
Ok(_) => {
log::debug!("Sync triggered by streamed message successful")
}
Err(err) => {
log::warn!("Sync triggered by streamed message failed: {}", err);
}
};
} else if process_result.is_err() {
log::error!("Process stream entry {:?}", process_result.err());
})
);

if let Some(GroupError::ReceiveError(_)) = process_result.as_ref().err() {
// Swallow errors here, since another process may have successfully saved the message
// to the DB
match self.sync_with_conn(&client.mls_provider()?, &client).await {
Ok(_) => {
log::debug!("Sync triggered by streamed message successful")
}
Err(err) => {
log::warn!("Sync triggered by streamed message failed: {}", err);
}
};
} else if process_result.is_err() {
log::error!("Process stream entry {:?}", process_result.err());
}
}

// Load the message from the DB to handle cases where it may have been already processed in
Expand All @@ -91,6 +95,18 @@ impl MlsGroup {
Ok(new_message)
}

// Checks if a message has already been processed through a sync
fn has_already_synced(&self, id: u64) -> Result<bool, GroupError> {
let check_for_last_cursor = || -> Result<i64, StorageError> {
let conn = self.context.store.conn()?;
conn.get_last_cursor_for_id(&self.group_id, EntityKind::Group)
};

let last_id = retry_sync!(Retry::default(), check_for_last_cursor)?;

Ok(last_id >= id as i64)
}

pub async fn process_streamed_group_message<ApiClient>(
&self,
envelope_bytes: Vec<u8>,
Expand Down

0 comments on commit 12293f1

Please sign in to comment.