From ae7a267d4ff59eb1ab5c5d0d48c2c47d9f562438 Mon Sep 17 00:00:00 2001 From: Mojtaba Chenani Date: Thu, 7 Nov 2024 21:55:17 +0100 Subject: [PATCH] feat(inbox): cache wallet address lookup (#1226) Co-authored-by: Dakota Brink <779390+codabrink@users.noreply.github.com> --- xmtp_id/src/lib.rs | 2 + .../down.sql | 2 + .../up.sql | 6 + xmtp_mls/src/client.rs | 48 ++++- xmtp_mls/src/lib.rs | 16 ++ xmtp_mls/src/storage/encrypted_store/mod.rs | 34 +++ .../src/storage/encrypted_store/schema.rs | 8 + .../encrypted_store/wallet_addresses.rs | 204 ++++++++++++++++++ 8 files changed, 313 insertions(+), 7 deletions(-) create mode 100644 xmtp_mls/migrations/2024-11-05-191238_cache_wallet_addresses/down.sql create mode 100644 xmtp_mls/migrations/2024-11-05-191238_cache_wallet_addresses/up.sql create mode 100644 xmtp_mls/src/storage/encrypted_store/wallet_addresses.rs diff --git a/xmtp_id/src/lib.rs b/xmtp_id/src/lib.rs index a02e7c537..5134a4dea 100755 --- a/xmtp_id/src/lib.rs +++ b/xmtp_id/src/lib.rs @@ -34,6 +34,8 @@ pub enum IdentityError { /// The global InboxID Type. pub type InboxId = String; +pub type WalletAddress = String; + // Check if the given address is a smart contract by checking if there is code at the given address. pub async fn is_smart_contract( address: Address, diff --git a/xmtp_mls/migrations/2024-11-05-191238_cache_wallet_addresses/down.sql b/xmtp_mls/migrations/2024-11-05-191238_cache_wallet_addresses/down.sql new file mode 100644 index 000000000..3c4c53cec --- /dev/null +++ b/xmtp_mls/migrations/2024-11-05-191238_cache_wallet_addresses/down.sql @@ -0,0 +1,2 @@ +DROP TABLE wallet_addresses; +DROP INDEX idx_wallet_inbox_id; diff --git a/xmtp_mls/migrations/2024-11-05-191238_cache_wallet_addresses/up.sql b/xmtp_mls/migrations/2024-11-05-191238_cache_wallet_addresses/up.sql new file mode 100644 index 000000000..2a3fb248e --- /dev/null +++ b/xmtp_mls/migrations/2024-11-05-191238_cache_wallet_addresses/up.sql @@ -0,0 +1,6 @@ +CREATE TABLE wallet_addresses( + inbox_id TEXT NOT NULL, + wallet_address TEXT PRIMARY KEY NOT NULL +); + +CREATE INDEX idx_wallet_inbox_id ON wallet_addresses(inbox_id); \ No newline at end of file diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 2eadd83b2..2cf09f093 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -35,6 +35,7 @@ use xmtp_proto::xmtp::mls::api::v1::{ GroupMessage, WelcomeMessage, }; +use crate::storage::wallet_addresses::WalletEntry; use crate::{ api::ApiClientWrapper, groups::{ @@ -58,7 +59,7 @@ use crate::{ subscriptions::LocalEvents, verified_key_package_v2::{KeyPackageVerificationError, VerifiedKeyPackageV2}, xmtp_openmls_provider::XmtpOpenMlsProvider, - Fetch, XmtpApi, + Fetch, Store, XmtpApi, }; /// Enum representing the network the Client is connected to @@ -374,13 +375,46 @@ where addresses: &[String], ) -> Result>, ClientError> { let sanitized_addresses = sanitize_evm_addresses(addresses)?; - let mut results = self - .api_client - .get_inbox_ids(sanitized_addresses.clone()) - .await?; - let inbox_ids: Vec> = sanitized_addresses + let conn = self.store().conn()?; + + let local_results: Vec = + conn.fetch_wallets_list_with_key(&sanitized_addresses)?; + + let mut results: HashMap = local_results .into_iter() - .map(|address| results.remove(&address)) + .map(|entry| (entry.wallet_address, entry.inbox_id)) + .collect(); + + let missing_addresses: Vec = sanitized_addresses + .iter() + .filter(|address| !results.contains_key(*address)) + .cloned() + .collect(); + + if missing_addresses.is_empty() { + let inbox_ids: Vec> = sanitized_addresses + .iter() + .map(|address| results.remove(address)) + .collect(); + return Ok(inbox_ids); + } + + let web_results = self.api_client.get_inbox_ids(missing_addresses).await?; + + for (address, inbox_id) in web_results { + results + .insert(address.clone(), inbox_id.clone()) + .unwrap_or_default(); + let new_entry = WalletEntry { + inbox_id: InboxId::from(inbox_id), + wallet_address: address, + }; + new_entry.store(&conn).ok(); + } + + let inbox_ids: Vec> = sanitized_addresses + .iter() + .map(|address| results.remove(address)) .collect(); Ok(inbox_ids) diff --git a/xmtp_mls/src/lib.rs b/xmtp_mls/src/lib.rs index c8c278a45..16f10f586 100644 --- a/xmtp_mls/src/lib.rs +++ b/xmtp_mls/src/lib.rs @@ -49,6 +49,22 @@ pub trait Fetch { fn fetch(&self, key: &Self::Key) -> Result, StorageError>; } +/// Fetches all instances of `Model` from the data store. +/// Returns an empty list if no items are found or an error if the fetch fails. +pub trait FetchList { + fn fetch_list(&self) -> Result, StorageError>; +} + +/// Fetches a filtered list of `Model` instances matching the specified key. +/// Logs an error and returns an empty list if no items are found or if an error occurs. +/// +/// # Parameters +/// - `key`: The key used to filter the items in the data store. +pub trait FetchListWithKey { + type Key; + fn fetch_list_with_key(&self, keys: &[Self::Key]) -> Result, StorageError>; +} + /// Deletes a model from the underlying data store pub trait Delete { type Key; diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs index b23808a9c..bc72fa7cc 100644 --- a/xmtp_mls/src/storage/encrypted_store/mod.rs +++ b/xmtp_mls/src/storage/encrypted_store/mod.rs @@ -26,6 +26,7 @@ pub mod refresh_state; pub mod schema; #[cfg(not(target_arch = "wasm32"))] mod sqlcipher_connection; +pub mod wallet_addresses; #[cfg(target_arch = "wasm32")] mod wasm; @@ -363,6 +364,39 @@ macro_rules! impl_fetch { }; } +#[macro_export] +macro_rules! impl_fetch_list { + ($model:ty, $table:ident) => { + impl $crate::FetchList<$model> + for $crate::storage::encrypted_store::db_connection::DbConnection + { + fn fetch_list(&self) -> Result, $crate::StorageError> { + use $crate::storage::encrypted_store::schema::$table::dsl::*; + Ok(self.raw_query(|conn| $table.load::<$model>(conn))?) + } + } + }; +} + +#[macro_export] +macro_rules! impl_fetch_list_with_key { + ($model:ty, $table:ident, $key:ty, $column:ident) => { + impl $crate::FetchListWithKey<$model> + for $crate::storage::encrypted_store::db_connection::DbConnection + { + type Key = $key; + fn fetch_list_with_key( + &self, + keys: &[Self::Key], + ) -> Result, $crate::StorageError> { + use $crate::storage::encrypted_store::schema::$table::dsl::{$column, *}; + Ok(self + .raw_query(|conn| $table.filter($column.eq_any(keys)).load::<$model>(conn))?) + } + } + }; +} + // Inserts the model into the database by primary key, erroring if the model already exists #[macro_export] macro_rules! impl_store { diff --git a/xmtp_mls/src/storage/encrypted_store/schema.rs b/xmtp_mls/src/storage/encrypted_store/schema.rs index 997d820ab..93254c687 100644 --- a/xmtp_mls/src/storage/encrypted_store/schema.rs +++ b/xmtp_mls/src/storage/encrypted_store/schema.rs @@ -107,6 +107,13 @@ diesel::table! { } } +diesel::table! { + wallet_addresses (wallet_address) { + inbox_id -> Text, + wallet_address -> Text, + } +} + diesel::joinable!(group_intents -> groups (group_id)); diesel::joinable!(group_messages -> groups (group_id)); @@ -122,4 +129,5 @@ diesel::allow_tables_to_appear_in_same_query!( openmls_key_store, openmls_key_value, refresh_state, + wallet_addresses, ); diff --git a/xmtp_mls/src/storage/encrypted_store/wallet_addresses.rs b/xmtp_mls/src/storage/encrypted_store/wallet_addresses.rs new file mode 100644 index 000000000..42e17f02f --- /dev/null +++ b/xmtp_mls/src/storage/encrypted_store/wallet_addresses.rs @@ -0,0 +1,204 @@ +use super::schema::wallet_addresses; +use crate::storage::{DbConnection, StorageError}; +use crate::{impl_fetch, impl_fetch_list_with_key, impl_store, FetchListWithKey}; +use diesel::prelude::*; +use diesel::{Insertable, Queryable}; +#[cfg(target_arch = "wasm32")] +use diesel_wasm_sqlite::dsl::RunQueryDsl; +use serde::{Deserialize, Serialize}; +use xmtp_id::{InboxId, WalletAddress}; + +#[derive(Insertable, Queryable, Debug, Clone, Deserialize, Serialize)] +#[diesel(table_name = wallet_addresses)] +#[diesel()] +pub struct WalletEntry { + pub inbox_id: InboxId, + pub wallet_address: WalletAddress, +} + +impl WalletEntry { + pub fn new(in_id: InboxId, wallet_address: WalletAddress) -> Self { + Self { + inbox_id: in_id, + wallet_address, + } + } +} + +impl_store!(WalletEntry, wallet_addresses); +impl_fetch!(WalletEntry, wallet_addresses); +impl_fetch_list_with_key!(WalletEntry, wallet_addresses, InboxId, inbox_id); + +impl DbConnection { + pub fn fetch_wallets_list_with_key( + &self, + keys: &[InboxId], + ) -> Result, StorageError> { + self.fetch_list_with_key(keys) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use crate::storage::wallet_addresses::WalletEntry; + use crate::{storage::encrypted_store::tests::with_connection, FetchListWithKey, Store}; + + // Test storing a single wallet + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn test_store_wallet() { + with_connection(|conn| { + let new_entry = WalletEntry { + inbox_id: "inbox_id_1".to_string(), + wallet_address: "wallet_address_1".to_string(), + }; + assert!(new_entry.store(conn).is_ok(), "Failed to store wallet"); + }) + .await; + } + + // Test storing duplicated wallets (same inbox_id and wallet_address) + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn test_store_duplicated_wallets() { + with_connection(|conn| { + let entry1 = WalletEntry { + inbox_id: "test_dup".to_string(), + wallet_address: "wallet_dup".to_string(), + }; + let entry2 = WalletEntry { + inbox_id: "test_dup".to_string(), + wallet_address: "wallet_dup".to_string(), + }; + entry1.store(conn).expect("Failed to store wallet"); + let result = entry2.store(conn); + assert!( + result.is_err(), + "Duplicated wallet stored without error, expected failure" + ); + }) + .await; + } + + // Test fetching wallets by a list of inbox_ids + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn test_fetch_wallets() { + with_connection(|conn| { + // Insert multiple entries with different inbox_ids + let new_entry1 = WalletEntry { + inbox_id: "fetch_test1".to_string(), + wallet_address: "wallet1".to_string(), + }; + let new_entry2 = WalletEntry { + inbox_id: "fetch_test2".to_string(), + wallet_address: "wallet2".to_string(), + }; + let new_entry3 = WalletEntry { + inbox_id: "fetch_test3".to_string(), + wallet_address: "wallet3".to_string(), + }; + new_entry1.store(conn).unwrap(); + new_entry2.store(conn).unwrap(); + new_entry3.store(conn).unwrap(); + + // Fetch wallets with inbox_ids "fetch_test1" and "fetch_test2" + let inbox_ids = vec!["fetch_test1".to_string(), "fetch_test2".to_string()]; + let fetched_wallets: Vec = + conn.fetch_list_with_key(&inbox_ids).unwrap_or_default(); + + // Verify that 3 entries are fetched (2 from "fetch_test1" and 1 from "fetch_test2") + assert_eq!( + fetched_wallets.len(), + 2, + "Expected 2 wallets, found {}", + fetched_wallets.len() + ); + + // Verify contents of fetched entries + let fetched_addresses: Vec = fetched_wallets + .iter() + .map(|w| w.wallet_address.clone()) + .collect(); + assert!( + fetched_addresses.contains(&"wallet1".to_string()), + "wallet1 not found in fetched results" + ); + assert!( + fetched_addresses.contains(&"wallet2".to_string()), + "wallet2 not found in fetched results" + ); + + // Fetch wallets with a non-existent list of inbox_ids + let non_existent_wallets: Vec = conn + .fetch_list_with_key(&["nonexistent".to_string()]) + .unwrap_or_default(); + assert!( + non_existent_wallets.is_empty(), + "Expected no wallets, found some" + ); + }) + .await; + } + + // Test storing and fetching multiple wallet addresses with multiple keys + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn test_store_wallet_addresses() { + with_connection(|conn| { + let new_entry1 = WalletEntry { + inbox_id: "test1".to_string(), + wallet_address: "wallet1".to_string(), + }; + let new_entry2 = WalletEntry { + inbox_id: "test1".to_string(), + wallet_address: "wallet2".to_string(), + }; + let new_entry3 = WalletEntry { + inbox_id: "test3".to_string(), + wallet_address: "wallet3".to_string(), + }; + let new_entry4 = WalletEntry { + inbox_id: "test4".to_string(), + wallet_address: "wallet4".to_string(), + }; + + // Store each wallet + new_entry1.store(conn).unwrap(); + new_entry2.store(conn).unwrap(); + new_entry3.store(conn).unwrap(); + new_entry4.store(conn).unwrap(); + + // Fetch wallets with inbox_ids "test1" and "test3" + let inbox_ids = vec!["test1".to_string(), "test3".to_string()]; + let stored_wallets: Vec = + conn.fetch_list_with_key(&inbox_ids).unwrap_or_default(); + + // Verify that 3 entries are fetched (2 from "test1" and 1 from "test3") + assert_eq!( + stored_wallets.len(), + 3, + "Expected 3 wallets with inbox_ids 'test1' and 'test3', found {}", + stored_wallets.len() + ); + + let fetched_addresses: Vec = stored_wallets + .iter() + .map(|w| w.wallet_address.clone()) + .collect(); + assert!( + fetched_addresses.contains(&"wallet1".to_string()), + "wallet1 not found in fetched results" + ); + assert!( + fetched_addresses.contains(&"wallet2".to_string()), + "wallet2 not found in fetched results" + ); + assert!( + fetched_addresses.contains(&"wallet3".to_string()), + "wallet3 not found in fetched results" + ); + }) + .await; + } +}