Skip to content

Commit

Permalink
try a different approach
Browse files Browse the repository at this point in the history
  • Loading branch information
nplasterer committed Aug 16, 2024
1 parent afb7202 commit b78568d
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 52 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bindings_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 26 additions & 35 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::GenericError;
use std::collections::HashMap;
use std::convert::TryInto;
use std::sync::Arc;
use tokio::{sync::Mutex, task::spawn_blocking, task::AbortHandle};
use tokio::{sync::Mutex, task::AbortHandle};
use xmtp_api_grpc::grpc_api_helper::Client as TonicApiClient;
use xmtp_id::{
associations::{
Expand Down Expand Up @@ -885,38 +885,29 @@ impl FfiGroup {
Ok(ffi_message)
}

pub async fn list_members(&self) -> Result<Vec<FfiGroupMember>, GenericError> {
pub fn list_members(&self) -> Result<Vec<FfiGroupMember>, GenericError> {
let group = MlsGroup::new(
self.inner_client.context().clone(),
self.group_id.clone(),
self.created_at_ns,
);

let members = spawn_blocking(move || {
// Perform the blocking operation inside the closure
group
.members()
.map(|members| {
members
.into_iter()
.map(|member| FfiGroupMember {
inbox_id: member.inbox_id,
account_addresses: member.account_addresses,
installation_ids: member.installation_ids,
permission_level: match member.permission_level {
PermissionLevel::Member => FfiPermissionLevel::Member,
PermissionLevel::Admin => FfiPermissionLevel::Admin,
PermissionLevel::SuperAdmin => FfiPermissionLevel::SuperAdmin,
},
})
.collect()
})
.map_err(|e| GenericError::from(e)) // Convert the error to the appropriate type explicitly
})
.await
.map_err(|e| GenericError::from(e))?; // Handle the error from the JoinHandle if necessary
let members: Vec<FfiGroupMember> = group
.members()?
.into_iter()
.map(|member| FfiGroupMember {
inbox_id: member.inbox_id,
account_addresses: member.account_addresses,
installation_ids: member.installation_ids,
permission_level: match member.permission_level {
PermissionLevel::Member => FfiPermissionLevel::Member,
PermissionLevel::Admin => FfiPermissionLevel::Admin,
PermissionLevel::SuperAdmin => FfiPermissionLevel::SuperAdmin,
},
})
.collect();

members
Ok(members)
}

pub async fn add_members(&self, account_addresses: Vec<String>) -> Result<(), GenericError> {
Expand Down Expand Up @@ -2088,7 +2079,7 @@ mod tests {
.await
.unwrap();

let members = group.list_members().await.unwrap();
let members = group.list_members().unwrap();
assert_eq!(members.len(), 2);
}

Expand All @@ -2112,7 +2103,7 @@ mod tests {

let tasks: Vec<_> = groups
.into_iter()
.map(|group| task::spawn(async move { group.list_members().await.unwrap() }))
.map(|group| task::spawn(async move { group.list_members().unwrap() }))
.collect();

// Await all tasks in parallel and handle any errors
Expand Down Expand Up @@ -2143,7 +2134,7 @@ mod tests {
.await
.unwrap();

let members = group.list_members().await.unwrap();
let members = group.list_members().unwrap();
assert_eq!(members.len(), 2);
assert_eq!(group.group_name().unwrap(), "Group Name");
assert_eq!(group.group_image_url_square().unwrap(), "url");
Expand Down Expand Up @@ -2315,10 +2306,10 @@ mod tests {
client2_group.sync().await.unwrap();

// Assert both clients see 2 members
let client1_members = client1_group.list_members().await.unwrap();
let client1_members = client1_group.list_members().unwrap();
assert_eq!(client1_members.len(), 2);

let client2_members = client2_group.list_members().await.unwrap();
let client2_members = client2_group.list_members().unwrap();
assert_eq!(client2_members.len(), 2);

// Drop and delete local database for client2
Expand All @@ -2334,12 +2325,12 @@ mod tests {
.unwrap();

// Assert client1 still sees 2 members
let client1_members = client1_group.list_members().await.unwrap();
let client1_members = client1_group.list_members().unwrap();
assert_eq!(client1_members.len(), 2);

client2.conversations().sync().await.unwrap();
let client2_group = client2.group(group.id()).unwrap();
let client2_members = client2_group.list_members().await.unwrap();
let client2_members = client2_group.list_members().unwrap();
assert_eq!(client2_members.len(), 2);
}

Expand Down Expand Up @@ -2586,11 +2577,11 @@ mod tests {
.unwrap();

bo_group.sync().await.unwrap();
let bo_members = bo_group.list_members().await.unwrap();
let bo_members = bo_group.list_members().unwrap();
assert_eq!(bo_members.len(), 4);

alix_group.sync().await.unwrap();
let alix_members = alix_group.list_members().await.unwrap();
let alix_members = alix_group.list_members().unwrap();
assert_eq!(alix_members.len(), 4);
}

Expand Down
1 change: 1 addition & 0 deletions bindings_node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ xmtp_cryptography = { workspace = true }
xmtp_id = { path = "../xmtp_id" }
xmtp_proto = { workspace = true, features = ["proto_full", "convert"] }
xmtp_v2 = { path = "../xmtp_v2" }
rayon = "1.5"

# Test/Bench Utils
xmtp_api_grpc = { path = "../xmtp_api_grpc", optional = true }
Expand Down
39 changes: 22 additions & 17 deletions xmtp_mls/src/groups/members.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use xmtp_id::InboxId;
use rayon::prelude::*;

use super::{validated_commit::extract_group_membership, GroupError, MlsGroup};

Expand All @@ -24,7 +25,7 @@ pub enum PermissionLevel {
impl MlsGroup {
// Load the member list for the group from the DB, merging together multiple installations into a single entry
pub fn members(&self) -> Result<Vec<GroupMember>, GroupError> {
let conn = self.context.store.conn()?;
let conn = self.context.store.conn()?; // Reuse the connection
let provider = self.context.mls_provider(conn);
self.members_with_provider(&provider)
}
Expand All @@ -34,32 +35,36 @@ impl MlsGroup {
provider: &XmtpOpenMlsProvider,
) -> Result<Vec<GroupMember>, GroupError> {
let openmls_group = self.load_mls_group(provider)?;
// TODO: Replace with try_into from extensions
let group_membership = extract_group_membership(openmls_group.extensions())?;
let requests = group_membership
let requests: Vec<_> = group_membership
.members
.into_iter()
.map(|(inbox_id, sequence_id)| (inbox_id, sequence_id as i64))
.filter(|(_, sequence_id)| *sequence_id != 0) // Skip the initial state
.collect::<Vec<_>>();
.collect();

let conn = provider.conn_ref();
let association_states =
StoredAssociationState::batch_read_from_cache(conn, requests.clone())?;

let mutable_metadata = self.mutable_metadata()?;
if association_states.len() != requests.len() {
// Cache miss - not expected to happen because:
// 1. We don't allow updates to the group metadata unless we have already validated the association state
// 2. When validating the association state, we must have written it to the cache
log::error!(
"Failed to load all members for group - metadata: {:?}, computed members: {:?}",
requests,
association_states
);
if log::log_enabled!(log::Level::Error) {
log::error!(
"Failed to load all members for group - metadata: {:?}, computed members: {:?}",
requests,
association_states
);
}
return Err(GroupError::InvalidGroupMembership);
}
let members = association_states
.into_iter()

// Estimate vector capacity based on the number of association states
let mut members: Vec<GroupMember> = Vec::with_capacity(association_states.len());

// Process association states in parallel
members = association_states
.into_par_iter()
.map(|association_state| {
let inbox_id_str = association_state.inbox_id().to_string();
let is_admin = mutable_metadata.is_admin(&inbox_id_str);
Expand All @@ -72,14 +77,14 @@ impl MlsGroup {
PermissionLevel::Member
};

Ok(GroupMember {
GroupMember {
inbox_id: inbox_id_str,
account_addresses: association_state.account_addresses(),
installation_ids: association_state.installation_ids(),
permission_level,
})
}
})
.collect::<Result<Vec<GroupMember>, GroupError>>()?;
.collect();

Ok(members)
}
Expand Down

0 comments on commit b78568d

Please sign in to comment.