From 7d414613efbc35685917cb5ebb14440aa0801633 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Tue, 6 Aug 2024 18:18:12 -0600 Subject: [PATCH] When a new installation is added the members list becomes incorrect (#937) * lock stuff * reproduce the member list issue in test * Fix missing members due to cache inconsistency (#944) Depends on https://github.com/xmtp/libxmtp/pull/937 We currently write the association state cache whenever we compute an association state from scratch. Also, whenever we read an association state, we check the cache, and if it's not there, we go ahead and re-compute it. However, there are two codepaths that behave a little differently, causing us to underreport the members in a group: 1. One codepath computes incremental updates to an existing state, and does not update the cache. 2. One codepath batch reads multiple association states from the cache, and silently omits anything it doesn't find rather than re-computing it. I've done a few things in this PR: 1. When computing incremental updates, make sure we also write to the cache 2. When batch reading from the cache, bake in a hard assumption that the cache will be up-to-date, and throw errors if we don't find what we expect 3. Some small improvements to our logging --------- Co-authored-by: Richard Hua --- bindings_ffi/src/logger.rs | 2 +- bindings_ffi/src/mls.rs | 81 ++++++++++++++++++- .../up.sql | 2 + .../2024-05-15-145138_new_schema/up.sql | 2 + xmtp_mls/src/api/identity.rs | 1 + xmtp_mls/src/groups/group_membership.rs | 1 + xmtp_mls/src/groups/members.rs | 19 ++++- xmtp_mls/src/identity_updates.rs | 38 ++++++++- .../encrypted_store/association_state.rs | 31 +++++-- xmtp_mls/src/storage/encrypted_store/mod.rs | 2 +- 10 files changed, 160 insertions(+), 19 deletions(-) diff --git a/bindings_ffi/src/logger.rs b/bindings_ffi/src/logger.rs index 7f5c45f9d..c3dd68922 100644 --- a/bindings_ffi/src/logger.rs +++ b/bindings_ffi/src/logger.rs @@ -20,7 +20,7 @@ impl log::Log for RustLogger { self.logger.lock().expect("Logger mutex is poisoned!").log( record.level() as u32, record.level().to_string(), - format!("[libxmtp] {}", record.args()), + format!("[libxmtp][t:{}] {}", thread_id::get(), record.args()), ); } } diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 39fea18f6..d13cc477d 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -1507,6 +1507,10 @@ mod tests { } impl LocalWalletInboxOwner { + pub fn with_wallet(wallet: xmtp_cryptography::utils::LocalWallet) -> Self { + Self { wallet } + } + pub fn new() -> Self { Self { wallet: xmtp_cryptography::utils::LocalWallet::new(&mut rng()), @@ -1532,7 +1536,7 @@ mod tests { impl FfiLogger for MockLogger { fn log(&self, _level: u32, level_label: String, message: String) { - println!("[{}][t:{}]: {}", level_label, thread_id::get(), message) + println!("[{}]{}", level_label, message) } } @@ -1607,8 +1611,11 @@ mod tests { client.register_identity(signature_request).await.unwrap(); } - async fn new_test_client() -> Arc { - let ffi_inbox_owner = LocalWalletInboxOwner::new(); + /// Create a new test client with a given wallet. + async fn new_test_client_with_wallet( + wallet: xmtp_cryptography::utils::LocalWallet, + ) -> Arc { + let ffi_inbox_owner = LocalWalletInboxOwner::with_wallet(wallet); let nonce = 1; let inbox_id = generate_inbox_id(&ffi_inbox_owner.get_address(), &nonce); @@ -1626,10 +1633,16 @@ mod tests { ) .await .unwrap(); + register_client(&ffi_inbox_owner, &client).await; client } + async fn new_test_client() -> Arc { + let wallet = xmtp_cryptography::utils::LocalWallet::new(&mut rng()); + new_test_client_with_wallet(wallet).await + } + #[tokio::test] async fn get_inbox_id() { let client = new_test_client().await; @@ -2228,6 +2241,68 @@ mod tests { ); } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_create_new_installation_without_breaking_group() { + let wallet1_key = &mut rng(); + let wallet1 = xmtp_cryptography::utils::LocalWallet::new(wallet1_key); + let wallet2_key = &mut rng(); + let wallet2 = xmtp_cryptography::utils::LocalWallet::new(wallet2_key); + + // Create clients + let client1 = new_test_client_with_wallet(wallet1).await; + let client2 = new_test_client_with_wallet(wallet2.clone()).await; + // Create a new group with client1 including wallet2 + + let group = client1 + .conversations() + .create_group( + vec![client2.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + // Sync groups + client1.conversations().sync().await.unwrap(); + client2.conversations().sync().await.unwrap(); + + // Find groups for both clients + let client1_group = client1.group(group.id()).unwrap(); + let client2_group = client2.group(group.id()).unwrap(); + + // Sync both groups + client1_group.sync().await.unwrap(); + client2_group.sync().await.unwrap(); + + // Assert both clients see 2 members + let client1_members = client1_group.list_members().unwrap(); + assert_eq!(client1_members.len(), 2); + + let client2_members = client2_group.list_members().unwrap(); + assert_eq!(client2_members.len(), 2); + + // Drop and delete local database for client2 + client2.release_db_connection().unwrap(); + + // Recreate client2 (new installation) + let client2 = new_test_client_with_wallet(wallet2).await; + + // Send a message that will break the group + client1_group + .send("This message will break the group".as_bytes().to_vec()) + .await + .unwrap(); + + // Assert client1 still sees 2 members + let client1_members = client1_group.list_members().unwrap(); + assert_eq!(client1_members.len(), 2); + + client2.conversations().sync().await.unwrap(); + let client2_group = client2.group(group.id()).unwrap(); + let client2_members = client2_group.list_members().unwrap(); + assert_eq!(client2_members.len(), 2); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn test_can_send_messages_when_epochs_behind() { let alix = new_test_client().await; diff --git a/xmtp_mls/migrations/2024-05-11-004236_cache_association_state/up.sql b/xmtp_mls/migrations/2024-05-11-004236_cache_association_state/up.sql index ab63aba77..825bd9a1e 100644 --- a/xmtp_mls/migrations/2024-05-11-004236_cache_association_state/up.sql +++ b/xmtp_mls/migrations/2024-05-11-004236_cache_association_state/up.sql @@ -1,3 +1,5 @@ +-- Caches the computed association state at a given sequence ID in an inbox log, +-- so that we don't need to replay the whole log. CREATE TABLE association_state ( "inbox_id" TEXT NOT NULL, "sequence_id" BIGINT NOT NULL, diff --git a/xmtp_mls/migrations/2024-05-15-145138_new_schema/up.sql b/xmtp_mls/migrations/2024-05-15-145138_new_schema/up.sql index 4c0cb3a57..8b4030ab3 100644 --- a/xmtp_mls/migrations/2024-05-15-145138_new_schema/up.sql +++ b/xmtp_mls/migrations/2024-05-15-145138_new_schema/up.sql @@ -87,6 +87,8 @@ CREATE TABLE group_intents( CREATE INDEX group_intents_group_id_state ON group_intents(group_id, state); +-- Caches the identity update payload at a given sequence ID, so that API calls +-- don't need to be repeated. CREATE TABLE identity_updates( -- The inbox_id the update refers to "inbox_id" text NOT NULL, diff --git a/xmtp_mls/src/api/identity.rs b/xmtp_mls/src/api/identity.rs index cfb685392..dbf3a1e62 100644 --- a/xmtp_mls/src/api/identity.rs +++ b/xmtp_mls/src/api/identity.rs @@ -17,6 +17,7 @@ use xmtp_proto::xmtp::identity::api::v1::{ const GET_IDENTITY_UPDATES_CHUNK_SIZE: usize = 50; +#[derive(Debug)] /// A filter for querying identity updates. `sequence_id` is the starting sequence, and only later updates will be returned. pub struct GetIdentityUpdatesV2Filter { pub inbox_id: InboxId, diff --git a/xmtp_mls/src/groups/group_membership.rs b/xmtp_mls/src/groups/group_membership.rs index e14a5ea23..a614a7ea2 100644 --- a/xmtp_mls/src/groups/group_membership.rs +++ b/xmtp_mls/src/groups/group_membership.rs @@ -105,6 +105,7 @@ impl From<&GroupMembership> for Vec { } } +#[derive(Debug)] pub struct MembershipDiff<'inbox_id> { pub added_inboxes: Vec<&'inbox_id String>, pub removed_inboxes: Vec<&'inbox_id String>, diff --git a/xmtp_mls/src/groups/members.rs b/xmtp_mls/src/groups/members.rs index 4e9b073db..c25172bbb 100644 --- a/xmtp_mls/src/groups/members.rs +++ b/xmtp_mls/src/groups/members.rs @@ -40,14 +40,25 @@ impl MlsGroup { .members .into_iter() .map(|(inbox_id, sequence_id)| (inbox_id, sequence_id as i64)) + .filter(|(_, sequence_id)| *sequence_id != 0) // Skip the initial state .collect::>(); let conn = provider.conn_ref(); - let association_state_map = StoredAssociationState::batch_read_from_cache(conn, requests)?; + let association_states = + StoredAssociationState::batch_read_from_cache(conn, requests.clone())?; let mutable_metadata = self.mutable_metadata()?; - // TODO: Figure out what to do with missing members from the local DB. Do we go to the network? Load from identity updates? - // Right now I am just omitting them - let members = association_state_map + if association_states.len() != requests.len() { + // Cache miss - not expected to happen because: + // 1. We don't allow updates to the group metadata unless we have already validated the association state + // 2. When validating the association state, we must have written it to the cache + log::error!( + "Failed to load all members for group - metadata: {:?}, computed members: {:?}", + requests, + association_states + ); + return Err(GroupError::InvalidGroupMembership); + } + let members = association_states .into_iter() .map(|association_state| { let inbox_id_str = association_state.inbox_id().to_string(); diff --git a/xmtp_mls/src/identity_updates.rs b/xmtp_mls/src/identity_updates.rs index dc8432fc9..14a4cd44a 100644 --- a/xmtp_mls/src/identity_updates.rs +++ b/xmtp_mls/src/identity_updates.rs @@ -109,7 +109,6 @@ where if let Some(association_state) = StoredAssociationState::read_from_cache(conn, inbox_id.to_string(), last_sequence_id)? { - log::debug!("Loaded association state from cache"); return Ok(association_state); } @@ -125,7 +124,6 @@ where last_sequence_id, association_state.clone(), )?; - log::debug!("Wrote association state to cache"); Ok(association_state) } @@ -137,6 +135,12 @@ where starting_sequence_id: Option, ending_sequence_id: Option, ) -> Result { + log::debug!( + "Computing diff for {:?} from {:?} to {:?}", + inbox_id.as_ref(), + starting_sequence_id, + ending_sequence_id + ); if starting_sequence_id.is_none() { return Ok(self .get_association_state(conn, inbox_id.as_ref(), ending_sequence_id) @@ -148,8 +152,23 @@ where .get_association_state(conn, inbox_id.as_ref(), starting_sequence_id) .await?; - let incremental_updates = conn - .get_identity_updates(inbox_id, starting_sequence_id, ending_sequence_id)? + let incremental_updates = + conn.get_identity_updates(inbox_id.as_ref(), starting_sequence_id, ending_sequence_id)?; + + let last_sequence_id = incremental_updates.last().map(|update| update.sequence_id); + if ending_sequence_id.is_some() + && last_sequence_id.is_some() + && last_sequence_id != ending_sequence_id + { + log::error!( + "Did not find the expected last sequence id. Expected: {:?}, Found: {:?}", + ending_sequence_id, + last_sequence_id + ); + return Err(AssociationError::MissingIdentityUpdate.into()); + } + + let incremental_updates = incremental_updates .into_iter() .map(|update| update.try_into()) .collect::, AssociationError>>()?; @@ -159,6 +178,16 @@ where final_state = apply_update(final_state, update).await?; } + log::debug!("Final state at {:?}: {:?}", last_sequence_id, final_state); + if last_sequence_id.is_some() { + StoredAssociationState::write_to_cache( + conn, + inbox_id.as_ref().to_string(), + last_sequence_id.unwrap(), + final_state.clone(), + )?; + } + Ok(initial_state.diff(&final_state)) } @@ -335,6 +364,7 @@ pub async fn load_identity_updates( if inbox_ids.is_empty() { return Ok(HashMap::new()); } + log::debug!("Fetching identity updates for: {:?}", inbox_ids); let existing_sequence_ids = conn.get_latest_sequence_id(&inbox_ids)?; let filters: Vec = inbox_ids diff --git a/xmtp_mls/src/storage/encrypted_store/association_state.rs b/xmtp_mls/src/storage/encrypted_store/association_state.rs index 9ff51cca1..210009853 100644 --- a/xmtp_mls/src/storage/encrypted_store/association_state.rs +++ b/xmtp_mls/src/storage/encrypted_store/association_state.rs @@ -40,12 +40,22 @@ impl StoredAssociationState { state: AssociationState, ) -> Result<(), StorageError> { let state_proto: AssociationStateProto = state.into(); - StoredAssociationState { - inbox_id, + let result = StoredAssociationState { + inbox_id: inbox_id.clone(), sequence_id, state: state_proto.encode_to_vec(), } - .store_or_ignore(conn) + .store_or_ignore(conn); + + if result.is_ok() { + log::debug!( + "Wrote association state to cache: {} {}", + inbox_id, + sequence_id + ); + } + + result } pub fn read_from_cache( @@ -56,7 +66,7 @@ impl StoredAssociationState { let stored_state: Option = conn.fetch(&(inbox_id.to_string(), sequence_id))?; - stored_state + let result = stored_state .map(|stored_state| { stored_state .try_into() @@ -66,14 +76,23 @@ impl StoredAssociationState { )) }) }) - .transpose() + .transpose(); + + if result.is_ok() && result.as_ref().unwrap().is_some() { + log::debug!( + "Loaded association state from cache: {} {}", + inbox_id, + sequence_id + ); + } + + result } pub fn batch_read_from_cache( conn: &DbConnection, identifiers: Vec<(InboxId, i64)>, ) -> Result, StorageError> { - // If no identifier provided, return empty hash map if identifiers.is_empty() { return Ok(vec![]); } diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs index 2b3eb7138..3242c5b32 100644 --- a/xmtp_mls/src/storage/encrypted_store/mod.rs +++ b/xmtp_mls/src/storage/encrypted_store/mod.rs @@ -184,7 +184,7 @@ impl EncryptedMessageStore { .as_ref() .ok_or(StorageError::PoolNeedsConnection)?; - log::info!( + log::debug!( "Pulling connection from pool, idle_connections={}, total_connections={}", pool.state().idle_connections, pool.state().connections