Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mchenani committed Dec 9, 2024
1 parent 59a0b6e commit 653f6c9
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 208 deletions.
7 changes: 4 additions & 3 deletions xmtp_mls/src/groups/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ where
&self,
provider: &XmtpOpenMlsProvider,
) -> Result<Vec<GroupMember>, GroupError> {
let openmls_group = self.load_mls_group(provider)?;
// TODO: Replace with try_into from extensions
let group_membership = extract_group_membership(openmls_group.extensions())?;
let group_membership = self.load_mls_group_with_lock(provider, |mls_group| {
// Extract group membership from extensions
Ok(extract_group_membership(mls_group.extensions())?)
})?;
let requests = group_membership
.members
.into_iter()
Expand Down
269 changes: 134 additions & 135 deletions xmtp_mls/src/groups/mls_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,6 @@ where
};

let last_cursor = conn.get_last_cursor_for_id(&self.group_id, message_entity_kind)?;
tracing::info!("### last cursor --> [{:?}]", last_cursor);
let should_skip_message = last_cursor > msgv1.id as i64;
if should_skip_message {
tracing::info!(
Expand Down Expand Up @@ -882,104 +881,104 @@ where
&self,
provider: &XmtpOpenMlsProvider,
) -> Result<(), GroupError> {
let mut openmls_group = self.load_mls_group(provider)?;
self.load_mls_group_with_lock_async(provider, |mut mls_group| async move {
let intents = provider.conn_ref().find_group_intents(
self.group_id.clone(),
Some(vec![IntentState::ToPublish]),
None,
)?;

for intent in intents {
let result = retry_async!(
Retry::default(),
(async {
self.get_publish_intent_data(provider, &mut mls_group, &intent)
.await
})
);

let intents = provider.conn_ref().find_group_intents(
self.group_id.clone(),
Some(vec![IntentState::ToPublish]),
None,
)?;
match result {
Err(err) => {
tracing::error!(error = %err, "error getting publish intent data {:?}", err);
if (intent.publish_attempts + 1) as usize >= MAX_INTENT_PUBLISH_ATTEMPTS {
tracing::error!(
intent.id,
intent.kind = %intent.kind,
inbox_id = self.client.inbox_id(),
group_id = hex::encode(&self.group_id),
"intent {} has reached max publish attempts", intent.id);
// TODO: Eventually clean up errored attempts
provider
.conn_ref()
.set_group_intent_error_and_fail_msg(&intent)?;
} else {
provider
.conn_ref()
.increment_intent_publish_attempt_count(intent.id)?;
}

for intent in intents {
let result = retry_async!(
Retry::default(),
(async {
self.get_publish_intent_data(provider, &mut openmls_group, &intent)
.await
})
);
return Err(err);
}
Ok(Some(PublishIntentData {
payload_to_publish,
post_commit_action,
staged_commit,
})) => {
let payload_slice = payload_to_publish.as_slice();
let has_staged_commit = staged_commit.is_some();
provider.conn_ref().set_group_intent_published(
intent.id,
sha256(payload_slice),
post_commit_action,
staged_commit,
mls_group.epoch().as_u64() as i64,
)?;
tracing::debug!(
inbox_id = self.client.inbox_id(),
installation_id = hex::encode(self.client.installation_id()),
intent.id,
intent.kind = %intent.kind,
group_id = hex::encode(&self.group_id),
"client [{}] set stored intent [{}] to state `published`",
self.client.inbox_id(),
intent.id
);

match result {
Err(err) => {
tracing::error!(error = %err, "error getting publish intent data {:?}", err);
if (intent.publish_attempts + 1) as usize >= MAX_INTENT_PUBLISH_ATTEMPTS {
tracing::error!(
self.client
.api()
.send_group_messages(vec![payload_slice])
.await?;

tracing::info!(
intent.id,
intent.kind = %intent.kind,
inbox_id = self.client.inbox_id(),
installation_id = hex::encode(self.client.installation_id()),
group_id = hex::encode(&self.group_id),
"intent {} has reached max publish attempts", intent.id);
// TODO: Eventually clean up errored attempts
provider
.conn_ref()
.set_group_intent_error_and_fail_msg(&intent)?;
} else {
provider
.conn_ref()
.increment_intent_publish_attempt_count(intent.id)?;
"[{}] published intent [{}] of type [{}]",
self.client.inbox_id(),
intent.id,
intent.kind
);
if has_staged_commit {
tracing::info!("Commit sent. Stopping further publishes for this round");
return Ok(());
}
}

return Err(err);
}
Ok(Some(PublishIntentData {
payload_to_publish,
post_commit_action,
staged_commit,
})) => {
let payload_slice = payload_to_publish.as_slice();
let has_staged_commit = staged_commit.is_some();
provider.conn_ref().set_group_intent_published(
intent.id,
sha256(payload_slice),
post_commit_action,
staged_commit,
openmls_group.epoch().as_u64() as i64,
)?;
tracing::debug!(
inbox_id = self.client.inbox_id(),
installation_id = hex::encode(self.client.installation_id()),
intent.id,
intent.kind = %intent.kind,
group_id = hex::encode(&self.group_id),
"client [{}] set stored intent [{}] to state `published`",
self.client.inbox_id(),
intent.id
);

self.client
.api()
.send_group_messages(vec![payload_slice])
.await?;

tracing::info!(
intent.id,
intent.kind = %intent.kind,
inbox_id = self.client.inbox_id(),
installation_id = hex::encode(self.client.installation_id()),
group_id = hex::encode(&self.group_id),
"[{}] published intent [{}] of type [{}]",
self.client.inbox_id(),
intent.id,
intent.kind
);
if has_staged_commit {
tracing::info!("Commit sent. Stopping further publishes for this round");
return Ok(());
Ok(None) => {
tracing::info!(
inbox_id = self.client.inbox_id(),
installation_id = hex::encode(self.client.installation_id()),
"Skipping intent because no publish data returned"
);
let deleter: &dyn Delete<StoredGroupIntent, Key = i32> = provider.conn_ref();
deleter.delete(intent.id)?;
}
}
Ok(None) => {
tracing::info!(
inbox_id = self.client.inbox_id(),
installation_id = hex::encode(self.client.installation_id()),
"Skipping intent because no publish data returned"
);
let deleter: &dyn Delete<StoredGroupIntent, Key = i32> = provider.conn_ref();
deleter.delete(intent.id)?;
}
}
}

Ok(())
Ok(())
}).await
}

// Takes a StoredGroupIntent and returns the payload and post commit data as a tuple
Expand Down Expand Up @@ -1210,58 +1209,58 @@ where
inbox_ids_to_add: &[InboxIdRef<'_>],
inbox_ids_to_remove: &[InboxIdRef<'_>],
) -> Result<UpdateGroupMembershipIntentData, GroupError> {
let mls_group = self.load_mls_group(provider)?;
let existing_group_membership = extract_group_membership(mls_group.extensions())?;

// TODO:nm prevent querying for updates on members who are being removed
let mut inbox_ids = existing_group_membership.inbox_ids();
inbox_ids.extend_from_slice(inbox_ids_to_add);
let conn = provider.conn_ref();
// Load any missing updates from the network
load_identity_updates(self.client.api(), conn, &inbox_ids).await?;

let latest_sequence_id_map = conn.get_latest_sequence_id(&inbox_ids as &[&str])?;

// Get a list of all inbox IDs that have increased sequence_id for the group
let changed_inbox_ids =
inbox_ids
.iter()
.try_fold(HashMap::new(), |mut updates, inbox_id| {
match (
latest_sequence_id_map.get(inbox_id as &str),
existing_group_membership.get(inbox_id),
) {
// This is an update. We have a new sequence ID and an existing one
(Some(latest_sequence_id), Some(current_sequence_id)) => {
let latest_sequence_id_u64 = *latest_sequence_id as u64;
if latest_sequence_id_u64.gt(current_sequence_id) {
updates.insert(inbox_id.to_string(), latest_sequence_id_u64);
self.load_mls_group_with_lock_async(provider, |mut mls_group| async move {
let existing_group_membership = extract_group_membership(mls_group.extensions())?;
// TODO:nm prevent querying for updates on members who are being removed
let mut inbox_ids = existing_group_membership.inbox_ids();
inbox_ids.extend_from_slice(inbox_ids_to_add);
let conn = provider.conn_ref();
// Load any missing updates from the network
load_identity_updates(self.client.api(), conn, &inbox_ids).await?;

let latest_sequence_id_map = conn.get_latest_sequence_id(&inbox_ids as &[&str])?;

// Get a list of all inbox IDs that have increased sequence_id for the group
let changed_inbox_ids =
inbox_ids
.iter()
.try_fold(HashMap::new(), |mut updates, inbox_id| {
match (
latest_sequence_id_map.get(inbox_id as &str),
existing_group_membership.get(inbox_id),
) {
// This is an update. We have a new sequence ID and an existing one
(Some(latest_sequence_id), Some(current_sequence_id)) => {
let latest_sequence_id_u64 = *latest_sequence_id as u64;
if latest_sequence_id_u64.gt(current_sequence_id) {
updates.insert(inbox_id.to_string(), latest_sequence_id_u64);
}
}
// This is for new additions to the group
(Some(latest_sequence_id), _) => {
// This is the case for net new members to the group
updates.insert(inbox_id.to_string(), *latest_sequence_id as u64);
}
(_, _) => {
tracing::warn!(
"Could not find existing sequence ID for inbox {}",
inbox_id
);
return Err(GroupError::MissingSequenceId);
}
}
// This is for new additions to the group
(Some(latest_sequence_id), _) => {
// This is the case for net new members to the group
updates.insert(inbox_id.to_string(), *latest_sequence_id as u64);
}
(_, _) => {
tracing::warn!(
"Could not find existing sequence ID for inbox {}",
inbox_id
);
return Err(GroupError::MissingSequenceId);
}
}

Ok(updates)
})?;
Ok(updates)
})?;

Ok(UpdateGroupMembershipIntentData::new(
changed_inbox_ids,
inbox_ids_to_remove
.iter()
.map(|s| s.to_string())
.collect::<Vec<String>>(),
))
Ok(UpdateGroupMembershipIntentData::new(
changed_inbox_ids,
inbox_ids_to_remove
.iter()
.map(|s| s.to_string())
.collect::<Vec<String>>(),
))
}).await
}

/**
Expand Down
Loading

0 comments on commit 653f6c9

Please sign in to comment.