Skip to content

Commit

Permalink
speed up sync init
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Dec 3, 2024
1 parent 85dd6d3 commit aa492be
Show file tree
Hide file tree
Showing 22 changed files with 302 additions and 115 deletions.
19 changes: 8 additions & 11 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl FfiXmtpClient {
}

pub fn installation_id(&self) -> Vec<u8> {
self.inner_client.installation_public_key()
self.inner_client.installation_public_key().to_vec()
}

pub fn release_db_connection(&self) -> Result<(), GenericError> {
Expand Down Expand Up @@ -380,7 +380,7 @@ impl FfiXmtpClient {
signature_bytes: Vec<u8>,
) -> Result<(), GenericError> {
let inner = self.inner_client.as_ref();
let public_key = inner.installation_public_key();
let public_key = inner.installation_public_key().to_vec();

self.verify_signed_with_public_key(signature_text, signature_bytes, public_key)
}
Expand Down Expand Up @@ -454,12 +454,8 @@ impl FfiXmtpClient {
return Ok(());
}

let provider = self
.inner_client
.mls_provider()
.map_err(GenericError::from_error)?;
self.inner_client
.start_sync_worker(&provider)
.start_sync_worker()
.await
.map_err(GenericError::from_error)?;

Expand Down Expand Up @@ -537,7 +533,7 @@ impl FfiXmtpClient {
let other_installation_ids = inbox_state
.installation_ids()
.into_iter()
.filter(|id| id != &installation_id)
.filter(|id| id != installation_id)
.collect();

let signature_request = self
Expand Down Expand Up @@ -911,7 +907,8 @@ impl FfiConversations {

pub fn get_sync_group(&self) -> Result<FfiConversation, GenericError> {
let inner = self.inner_client.as_ref();
let sync_group = inner.get_sync_group()?;
let conn = inner.store().conn()?;
let sync_group = inner.get_sync_group(&conn)?;
Ok(sync_group.into())
}

Expand Down Expand Up @@ -2096,7 +2093,7 @@ mod tests {
.unwrap();
register_client(&ffi_inbox_owner, &client_a).await;

let installation_pub_key = client_a.inner_client.installation_public_key();
let installation_pub_key = client_a.inner_client.installation_public_key().to_vec();
drop(client_a);

let client_b = create_client(
Expand All @@ -2114,7 +2111,7 @@ mod tests {
.await
.unwrap();

let other_installation_pub_key = client_b.inner_client.installation_public_key();
let other_installation_pub_key = client_b.inner_client.installation_public_key().to_vec();
drop(client_b);

assert!(
Expand Down
2 changes: 1 addition & 1 deletion bindings_node/src/signatures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl Client {
let other_installation_ids = inbox_state
.installation_ids()
.into_iter()
.filter(|id| id != &installation_id)
.filter(|id| id != installation_id)
.collect();
let signature_request = self
.inner_client()
Expand Down
2 changes: 1 addition & 1 deletion bindings_wasm/src/signatures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl Client {
let other_installation_ids = inbox_state
.installation_ids()
.into_iter()
.filter(|id| id != &installation_id)
.filter(|id| id != installation_id)
.collect();
let signature_request = self
.inner_client()
Expand Down
4 changes: 2 additions & 2 deletions examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ async fn main() -> color_eyre::eyre::Result<()> {
let conn = client.store().conn().unwrap();
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&conn).await.unwrap();
client.start_sync_worker(&provider).await.unwrap();
client.start_sync_worker().await.unwrap();
client
.send_sync_request(&provider, DeviceSyncKind::MessageHistory)
.await
Expand All @@ -453,7 +453,7 @@ async fn main() -> color_eyre::eyre::Result<()> {
Commands::ListHistorySyncMessages {} => {
let conn = client.store().conn()?;
client.sync_welcomes(&conn).await?;
let group = client.get_sync_group()?;
let group = client.get_sync_group(&conn)?;
let group_id_str = hex::encode(group.group_id.clone());
group.sync().await?;
let messages = group
Expand Down
2 changes: 1 addition & 1 deletion examples/cli/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub async fn debug_welcome_messages(
) -> Result<(), String> {
let api_client = client.api();
let envelopes = api_client
.query_welcome_messages(installation_id, None)
.query_welcome_messages(&installation_id, None)
.await
.unwrap();
for envelope in envelopes {
Expand Down
5 changes: 4 additions & 1 deletion xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ tracing-subscriber = { workspace = true, features = [
"env-filter",
"fmt",
"ansi",
"json",
"registry"
], optional = true }


Expand Down Expand Up @@ -151,6 +153,7 @@ tracing-subscriber = { workspace = true, features = [
"env-filter",
"fmt",
"ansi",
"json",
] }
xmtp_api_grpc = { path = "../xmtp_api_grpc", features = ["test-utils"] }
xmtp_api_http = { path = "../xmtp_api_http", features = ["test-utils"] }
Expand All @@ -163,7 +166,7 @@ diesel-wasm-sqlite = { workspace = true, features = [
] }
ethers = { workspace = true, features = ["rustls"] }
openmls = { workspace = true, features = ["js"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tracing-subscriber = { workspace = true, features = ["env-filter", "json"] }
tracing-wasm = { version = "0.2" }
wasm-bindgen-test.workspace = true
xmtp_api_http = { path = "../xmtp_api_http", features = ["test-utils"] }
Expand Down
7 changes: 3 additions & 4 deletions xmtp_mls/benches/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ fn start_sync_worker(c: &mut Criterion) {
|| {
bench_async_setup(|| async {
let client = clients::new_client(true).await;
let provider = client.mls_provider().unwrap();
// set history sync URL
(client, provider, span.clone())
(client, span.clone())
})
},
|(client, provider, span)| async move {
|(client, span)| async move {
client
.start_sync_worker(&provider)
.start_sync_worker()
.instrument(span)
.await
.unwrap()
Expand Down
6 changes: 3 additions & 3 deletions xmtp_mls/src/api/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ where
#[tracing::instrument(level = "trace", skip_all)]
pub async fn query_welcome_messages(
&self,
installation_id: Vec<u8>,
installation_id: &[u8],
id_cursor: Option<u64>,
) -> Result<Vec<WelcomeMessage>, ApiError> {
tracing::debug!(
installation_id = hex::encode(&installation_id),
installation_id = hex::encode(installation_id),
cursor = id_cursor,
inbox_id = self.inbox_id,
"query welcomes"
Expand All @@ -135,7 +135,7 @@ where
(async {
self.api_client
.query_welcome_messages(QueryWelcomeMessagesRequest {
installation_key: installation_id.clone(),
installation_key: installation_id.to_vec(),
paging_info: Some(PagingInfo {
id_cursor: id_cursor.unwrap_or(0),
limit: page_size,
Expand Down
6 changes: 3 additions & 3 deletions xmtp_mls/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ pub(crate) mod tests {
register_client(&client_a, wallet).await;
assert!(client_a.identity().is_ready());

let keybytes_a = client_a.installation_public_key();
let keybytes_a = client_a.installation_public_key().to_vec();
drop(client_a);

// Reload the existing store and wallet
Expand All @@ -729,7 +729,7 @@ pub(crate) mod tests {
.build_with_verifier()
.await
.unwrap();
let keybytes_b = client_b.installation_public_key();
let keybytes_b = client_b.installation_public_key().to_vec();
drop(client_b);

// Ensure the persistence was used to store the generated keys
Expand Down Expand Up @@ -762,7 +762,7 @@ pub(crate) mod tests {
.build_with_verifier()
.await
.unwrap();
assert_eq!(client_d.installation_public_key(), keybytes_a);
assert_eq!(client_d.installation_public_key().to_vec(), keybytes_a);
}

/// anvil cannot be used in WebAssembly
Expand Down
56 changes: 35 additions & 21 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ pub struct XmtpMlsLocalContext {

impl XmtpMlsLocalContext {
/// The installation public key is the primary identifier for an installation
pub fn installation_public_key(&self) -> Vec<u8> {
self.identity.installation_keys.public_slice().to_vec()
pub fn installation_public_key(&self) -> &[u8; 32] {
self.identity.installation_keys.public_bytes()
}

/// Get the account address of the blockchain account associated with this client
Expand Down Expand Up @@ -259,7 +259,7 @@ where
V: SmartContractSignatureVerifier,
{
/// Retrieves the client's installation public key, sometimes also called `installation_id`
pub fn installation_public_key(&self) -> Vec<u8> {
pub fn installation_public_key(&self) -> &[u8; 32] {
self.context.installation_public_key()
}
/// Retrieves the client's inbox ID
Expand Down Expand Up @@ -560,20 +560,25 @@ where
Ok(group)
}

pub(crate) fn create_sync_group(&self) -> Result<MlsGroup<Self>, ClientError> {
pub(crate) fn create_sync_group(
&self,
provider: &XmtpOpenMlsProvider,
) -> Result<MlsGroup<Self>, ClientError> {
tracing::info!("creating sync group");
let sync_group = MlsGroup::create_and_insert_sync_group(Arc::new(self.clone()))?;
let sync_group = MlsGroup::create_and_insert_sync_group(Arc::new(self.clone()), provider)?;

Ok(sync_group)
}

/**
* Look up a group by its ID
*
* Returns a [`MlsGroup`] if the group exists, or an error if it does not
*/
pub fn group(&self, group_id: Vec<u8>) -> Result<MlsGroup<Self>, ClientError> {
let conn = &mut self.store().conn()?;
/// Look up a group by its ID
///
/// Returns a [`MlsGroup`] if the group exists, or an error if it does not
///
pub fn group_with_conn(
&self,
conn: &DbConnection,
group_id: Vec<u8>,
) -> Result<MlsGroup<Self>, ClientError> {
let stored_group: Option<StoredGroup> = conn.fetch(&group_id)?;
match stored_group {
Some(group) => Ok(MlsGroup::new(self.clone(), group.id, group.created_at_ns)),
Expand All @@ -584,6 +589,15 @@ where
}
}

/// Look up a group by its ID
///
/// Returns a [`MlsGroup`] if the group exists, or an error if it does not
///
pub fn group(&self, group_id: Vec<u8>) -> Result<MlsGroup<Self>, ClientError> {
let conn = &mut self.store().conn()?;
self.group_with_conn(conn, group_id)
}

/**
* Look up a DM group by the target's inbox_id.
*
Expand Down Expand Up @@ -697,7 +711,7 @@ where
conn: &DbConnection,
) -> Result<Vec<WelcomeMessage>, ClientError> {
let installation_id = self.installation_public_key();
let id_cursor = conn.get_last_cursor_for_id(&installation_id, EntityKind::Welcome)?;
let id_cursor = conn.get_last_cursor_for_id(installation_id, EntityKind::Welcome)?;

let welcomes = self
.api_client
Expand Down Expand Up @@ -747,7 +761,7 @@ where
(async {
let welcome_v1 = &welcome_v1;
self.intents.process_for_id(
&id,
id,
EntityKind::Welcome,
welcome_v1.id,
|provider| async move {
Expand Down Expand Up @@ -1021,7 +1035,7 @@ pub(crate) mod tests {

// Get original KeyPackage.
let kp1 = client
.get_key_packages_for_installation_ids(vec![client.installation_public_key()])
.get_key_packages_for_installation_ids(vec![client.installation_public_key().to_vec()])
.await
.unwrap();
assert_eq!(kp1.len(), 1);
Expand All @@ -1031,7 +1045,7 @@ pub(crate) mod tests {
client.rotate_key_package().await.unwrap();

let kp2 = client
.get_key_packages_for_installation_ids(vec![client.installation_public_key()])
.get_key_packages_for_installation_ids(vec![client.installation_public_key().to_vec()])
.await
.unwrap();
assert_eq!(kp2.len(), 1);
Expand Down Expand Up @@ -1299,9 +1313,9 @@ pub(crate) mod tests {
let bo_store = bo.store();

let alix_original_init_key =
get_key_package_init_key(&alix, &alix.installation_public_key()).await;
get_key_package_init_key(&alix, alix.installation_public_key()).await;
let bo_original_init_key =
get_key_package_init_key(&bo, &bo.installation_public_key()).await;
get_key_package_init_key(&bo, bo.installation_public_key()).await;

// Bo's original key should be deleted
let bo_original_from_db = bo_store
Expand All @@ -1320,19 +1334,19 @@ pub(crate) mod tests {

bo.sync_welcomes(&bo.store().conn().unwrap()).await.unwrap();

let bo_new_key = get_key_package_init_key(&bo, &bo.installation_public_key()).await;
let bo_new_key = get_key_package_init_key(&bo, bo.installation_public_key()).await;
// Bo's key should have changed
assert_ne!(bo_original_init_key, bo_new_key);

bo.sync_welcomes(&bo.store().conn().unwrap()).await.unwrap();
let bo_new_key_2 = get_key_package_init_key(&bo, &bo.installation_public_key()).await;
let bo_new_key_2 = get_key_package_init_key(&bo, bo.installation_public_key()).await;
// Bo's key should not have changed syncing the second time.
assert_eq!(bo_new_key, bo_new_key_2);

alix.sync_welcomes(&alix.store().conn().unwrap())
.await
.unwrap();
let alix_key_2 = get_key_package_init_key(&alix, &alix.installation_public_key()).await;
let alix_key_2 = get_key_package_init_key(&alix, alix.installation_public_key()).await;
// Alix's key should not have changed at all
assert_eq!(alix_original_init_key, alix_key_2);

Expand Down
Loading

0 comments on commit aa492be

Please sign in to comment.