From 493ee99e9f1609863efd2d128b90eb0567460984 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Wed, 4 Dec 2024 18:52:28 -0500 Subject: [PATCH] fix sync worker err loop --- bindings_ffi/src/mls.rs | 15 +- examples/cli/cli-client.rs | 2 +- xmtp_mls/Cargo.toml | 8 +- xmtp_mls/benches/sync.rs | 102 +++--- xmtp_mls/src/client.rs | 26 +- xmtp_mls/src/groups/device_sync.rs | 323 ++++++++++-------- .../src/groups/device_sync/message_sync.rs | 90 +++++ xmtp_mls/src/utils/test/mod.rs | 2 +- 8 files changed, 352 insertions(+), 216 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index e899374bf..248e2621d 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -440,23 +440,16 @@ impl FfiXmtpClient { .register_identity(signature_request.clone()) .await?; - self.maybe_start_sync_worker().await?; + self.maybe_start_sync_worker(); Ok(()) } /// Starts the sync worker if the history sync url is present. - async fn maybe_start_sync_worker(&self) -> Result<(), GenericError> { - if self.inner_client.history_sync_url().is_none() { - return Ok(()); + fn maybe_start_sync_worker(&self) { + if self.inner_client.history_sync_url().is_some() { + self.inner_client.start_sync_worker(); } - - self.inner_client - .start_sync_worker() - .await - .map_err(GenericError::from_error)?; - - Ok(()) } pub async fn send_sync_request(&self, kind: FfiDeviceSyncKind) -> Result<(), GenericError> { diff --git a/examples/cli/cli-client.rs b/examples/cli/cli-client.rs index df7fa4c42..b371edef1 100755 --- a/examples/cli/cli-client.rs +++ b/examples/cli/cli-client.rs @@ -443,7 +443,7 @@ async fn main() -> color_eyre::eyre::Result<()> { let conn = client.store().conn().unwrap(); let provider = client.mls_provider().unwrap(); client.sync_welcomes(&conn).await.unwrap(); - client.start_sync_worker().await.unwrap(); + client.start_sync_worker(); client .send_sync_request(&provider, DeviceSyncKind::MessageHistory) .await diff --git a/xmtp_mls/Cargo.toml b/xmtp_mls/Cargo.toml index e900fcf95..2c9350b25 100644 --- a/xmtp_mls/Cargo.toml +++ b/xmtp_mls/Cargo.toml @@ -192,8 +192,8 @@ harness = false name = "identity" required-features = ["bench"] -[[bench]] -harness = false -name = "sync" -required-features = ["bench"] +#[[bench]] +#harness = false +#name = "sync" +#required-features = ["bench"] diff --git a/xmtp_mls/benches/sync.rs b/xmtp_mls/benches/sync.rs index 970cfaecd..821a1de20 100644 --- a/xmtp_mls/benches/sync.rs +++ b/xmtp_mls/benches/sync.rs @@ -1,55 +1,49 @@ -//! Benchmarking for syncing functions -use crate::tracing::Instrument; -use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; -use tokio::runtime::{Builder, Runtime}; -use xmtp_mls::utils::bench::{bench_async_setup, BENCH_ROOT_SPAN}; -use xmtp_mls::utils::bench::{clients, init_logging}; +// //! Benchmarking for syncing functions +// use crate::tracing::Instrument; +// use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; +// use tokio::runtime::{Builder, Runtime}; +// use xmtp_mls::utils::bench::{bench_async_setup, BENCH_ROOT_SPAN}; +// use xmtp_mls::utils::bench::{clients, init_logging}; +// +// #[macro_use] +// extern crate tracing; +// +// fn setup() -> Runtime { +// Builder::new_multi_thread() +// .enable_time() +// .enable_io() +// .thread_name("xmtp-bencher") +// .build() +// .unwrap() +// } +// +// fn start_sync_worker(c: &mut Criterion) { +// init_logging(); +// +// let runtime = setup(); +// let mut benchmark_group = c.benchmark_group("start_sync_worker"); +// benchmark_group.sample_size(10); +// benchmark_group.bench_function("start_sync_worker", |b| { +// let span = trace_span!(BENCH_ROOT_SPAN); +// b.to_async(&runtime).iter_batched( +// || { +// bench_async_setup(|| async { +// let client = clients::new_client(true).await; +// // set history sync URL +// (client, span.clone()) +// }) +// }, +// |(client, span)| async move { client.start_sync_worker().instrument(span) }, +// BatchSize::SmallInput, +// ) +// }); +// +// benchmark_group.finish(); +// } -#[macro_use] -extern crate tracing; - -fn setup() -> Runtime { - Builder::new_multi_thread() - .enable_time() - .enable_io() - .thread_name("xmtp-bencher") - .build() - .unwrap() -} - -fn start_sync_worker(c: &mut Criterion) { - init_logging(); - - let runtime = setup(); - let mut benchmark_group = c.benchmark_group("start_sync_worker"); - benchmark_group.sample_size(10); - benchmark_group.bench_function("start_sync_worker", |b| { - let span = trace_span!(BENCH_ROOT_SPAN); - b.to_async(&runtime).iter_batched( - || { - bench_async_setup(|| async { - let client = clients::new_client(true).await; - // set history sync URL - (client, span.clone()) - }) - }, - |(client, span)| async move { - client - .start_sync_worker() - .instrument(span) - .await - .unwrap() - }, - BatchSize::SmallInput, - ) - }); - - benchmark_group.finish(); -} - -criterion_group!( - name = sync; - config = Criterion::default().sample_size(10); - targets = start_sync_worker -); -criterion_main!(sync); +// criterion_group!( +// name = sync; +// config = Criterion::default().sample_size(10); +// targets = start_sync_worker +// ); +// criterion_main!(sync); diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 8efd0da2d..eb6a86ab1 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -40,13 +40,13 @@ use crate::{ mutex_registry::MutexRegistry, retry::Retry, retry_async, retryable, - storage::wallet_addresses::WalletEntry, storage::{ consent_record::{ConsentState, ConsentType, StoredConsentRecord}, db_connection::DbConnection, group::{GroupMembershipState, GroupQueryArgs, StoredGroup}, group_message::StoredGroupMessage, refresh_state::EntityKind, + wallet_addresses::WalletEntry, EncryptedMessageStore, StorageError, }, subscriptions::{LocalEventError, LocalEvents}, @@ -253,6 +253,25 @@ where } } +impl Client +where + ApiClient: XmtpApi + Send + Sync + 'static, + V: SmartContractSignatureVerifier + Send + Sync + 'static, +{ + /// Reconnect to the client's database if it has previously been released + pub fn reconnect_db(&self) -> Result<(), ClientError> { + self.context.store.reconnect()?; + // restart all the workers + // TODO: The only worker we have right now are the + // sync workers. if we have other workers we + // should create a better way to track them. + if self.history_sync_url.is_some() { + self.start_sync_worker(); + } + Ok(()) + } +} + impl Client where ApiClient: XmtpApi, @@ -467,11 +486,6 @@ where Ok(()) } - /// Reconnect to the client's database if it has previously been released - pub fn reconnect_db(&self) -> Result<(), ClientError> { - self.context.store.reconnect()?; - Ok(()) - } /// Get a reference to the client's identity struct pub fn identity(&self) -> &Identity { &self.context.identity diff --git a/xmtp_mls/src/groups/device_sync.rs b/xmtp_mls/src/groups/device_sync.rs index 8f887f81c..e2f6ae4d9 100644 --- a/xmtp_mls/src/groups/device_sync.rs +++ b/xmtp_mls/src/groups/device_sync.rs @@ -1,6 +1,6 @@ use super::{GroupError, MlsGroup}; use crate::configuration::NS_IN_HOUR; -use crate::retry::{RetryBuilder, RetryableError}; +use crate::retry::{Retry, RetryableError}; use crate::storage::group::{ConversationType, GroupQueryArgs}; use crate::storage::group_message::MsgQueryArgs; use crate::storage::DbConnection; @@ -23,14 +23,16 @@ use aes_gcm::{ aead::{Aead, KeyInit}, Aes256Gcm, }; -use futures::{pin_mut, Stream, StreamExt}; +use futures::{Stream, StreamExt}; use rand::{ distributions::{Alphanumeric, DistString}, Rng, RngCore, }; use serde::{Deserialize, Serialize}; +use std::pin::Pin; use std::time::Duration; use thiserror::Error; +use tokio::sync::OnceCell; use tracing::{instrument, warn}; use xmtp_cryptography::utils as crypto_utils; use xmtp_id::scw_verifier::SmartContractSignatureVerifier; @@ -120,169 +122,219 @@ where { // TODO: Should we ensure that only one sync worker is running at a time? #[instrument(level = "trace", skip_all)] - pub async fn start_sync_worker(&self) -> Result<(), DeviceSyncError> { - crate::spawn(None, { - let client = self.clone(); - tracing::debug!( - inbox_id = client.inbox_id(), - installation_id = hex::encode(client.installation_public_key()), - "starting sync worker" - ); - let receiver = client.local_events.subscribe(); - let sync_stream = receiver.stream_sync_messages(); - - async move { - pin_mut!(sync_stream); - let inbox_id = client.inbox_id(); - let installation_id = hex::encode(client.installation_public_key()); - // scope ensures the provider is dropped once init is finished, and not - // held for entirety of sync. - let res = async { - let provider = client.mls_provider()?; - client.sync_init(&provider).await?; - Ok::<_, DeviceSyncError>(()) - }; - - if let Err(e) = res.await { - tracing::error!( - inbox_id, - installation_id, - "sync worker failed to init error = {e}" - ); - } - - while let Err(err) = client.sync_worker(&mut sync_stream).await { - tracing::error!(inbox_id, installation_id, "Sync worker error: {err}"); - } - Ok::<_, DeviceSyncError>(()) - } - }); + pub fn start_sync_worker(&self) { + let client = self.clone(); + tracing::debug!( + inbox_id = client.inbox_id(), + installation_id = hex::encode(client.installation_public_key()), + "starting sync worker" + ); - Ok(()) + SyncWorker::new(client).spawn_worker(); } } -impl Client +pub struct SyncWorker { + client: Client, + /// The sync events stream + #[allow(clippy::type_complexity)] + stream: Pin< + Box>, SubscribeError>> + Send>, + >, + init: OnceCell<()>, + retry: Retry, +} + +impl SyncWorker where - ApiClient: XmtpApi, - V: SmartContractSignatureVerifier, + ApiClient: XmtpApi + 'static, + V: SmartContractSignatureVerifier + 'static, { - pub(crate) async fn sync_worker( - &self, - sync_stream: &mut (impl Stream, SubscribeError>> + Unpin), - ) -> Result<(), DeviceSyncError> { - let provider = self.mls_provider()?; + async fn run(&mut self) -> Result<(), DeviceSyncError> { + self.sync_init().await?; - let query_retry = RetryBuilder::default() - .retries(5) - .duration(Duration::from_millis(20)) - .build(); - - while let Some(event) = sync_stream.next().await { + while let Some(event) = self.stream.next().await { let event = event?; match event { LocalEvents::SyncMessage(msg) => match msg { SyncMessage::Reply { message_id } => { - let conn = provider.conn_ref(); - let msg = retry_async!( - &query_retry, - (async { - conn.get_group_message(&message_id)?.ok_or( - DeviceSyncError::Storage(StorageError::NotFound(format!( - "Message id {message_id:?} not found." - ))), - ) - }) - )?; - - let msg_content: DeviceSyncContent = - serde_json::from_slice(&msg.decrypted_message_bytes)?; - let DeviceSyncContent::Reply(reply) = msg_content else { - unreachable!(); - }; - - if let Err(err) = self.process_sync_reply(&provider, reply).await { - tracing::warn!( - inbox_id = self.inbox_id(), - installation_id = hex::encode(self.installation_public_key()), - "Sync worker error: {err}" - ); - } + let provider = self.client.mls_provider()?; + self.on_reply(message_id, &provider).await? } SyncMessage::Request { message_id } => { - let conn = provider.conn_ref(); - let msg = retry_async!( - &query_retry, - (async { - conn.get_group_message(&message_id)?.ok_or( - DeviceSyncError::Storage(StorageError::NotFound(format!( - "Message id {message_id:?} not found." - ))), - ) - }) - )?; - - let msg_content: DeviceSyncContent = - serde_json::from_slice(&msg.decrypted_message_bytes)?; - let DeviceSyncContent::Request(request) = msg_content else { - unreachable!(); - }; - - if let Err(err) = self.reply_to_sync_request(&provider, request).await { - tracing::warn!( - inbox_id = self.inbox_id(), - installation_id = hex::encode(self.installation_public_key()), - "Sync worker error: {err}" - ); - } + let provider = self.client.mls_provider()?; + self.on_request(message_id, &provider).await? } }, LocalEvents::OutgoingConsentUpdates(consent_records) => { + let provider = self.client.mls_provider()?; for consent_record in consent_records { - self.send_consent_update(&provider, &consent_record).await?; + self.client + .send_consent_update(&provider, &consent_record) + .await?; } } LocalEvents::IncomingConsentUpdates(consent_records) => { - let conn = provider.conn_ref(); - - conn.insert_or_replace_consent_records(&consent_records)?; + let provider = self.client.mls_provider()?; + provider + .conn_ref() + .insert_or_replace_consent_records(&consent_records)?; } _ => {} } } + Ok(()) + } + + async fn on_reply( + &mut self, + message_id: Vec, + provider: &XmtpOpenMlsProvider, + ) -> Result<(), DeviceSyncError> { + let conn = provider.conn_ref(); + let Self { + ref client, + ref retry, + .. + } = self; + + let msg = retry_async!( + retry, + (async { + conn.get_group_message(&message_id)? + .ok_or(DeviceSyncError::Storage(StorageError::NotFound(format!( + "Message id {message_id:?} not found." + )))) + }) + )?; + + let msg_content: DeviceSyncContent = serde_json::from_slice(&msg.decrypted_message_bytes)?; + let DeviceSyncContent::Reply(reply) = msg_content else { + unreachable!(); + }; + + client.process_sync_reply(provider, reply).await?; + Ok(()) + } + async fn on_request( + &mut self, + message_id: Vec, + provider: &XmtpOpenMlsProvider, + ) -> Result<(), DeviceSyncError> { + let conn = provider.conn_ref(); + let Self { + ref client, retry, .. + } = self; + + let msg = retry_async!( + retry, + (async { + conn.get_group_message(&message_id)? + .ok_or(DeviceSyncError::Storage(StorageError::NotFound(format!( + "Message id {message_id:?} not found." + )))) + }) + )?; + + let msg_content: DeviceSyncContent = serde_json::from_slice(&msg.decrypted_message_bytes)?; + let DeviceSyncContent::Request(request) = msg_content else { + unreachable!(); + }; + + client.reply_to_sync_request(provider, request).await?; Ok(()) } - /** - * Ideally called when the client is registered. - * Will auto-send a sync request if sync group is created. - */ + //// Ideally called when the client is registered. + //// Will auto-send a sync request if sync group is created. #[instrument(level = "trace", skip_all)] - pub async fn sync_init(&self, provider: &XmtpOpenMlsProvider) -> Result<(), DeviceSyncError> { - tracing::info!( - inbox_id = self.inbox_id(), - installation_id = hex::encode(self.installation_public_key()), - "Initializing device sync... url: {:?}", - self.history_sync_url - ); - if self.get_sync_group(provider.conn_ref()).is_err() { - self.ensure_sync_group(provider).await?; + pub async fn sync_init(&mut self) -> Result<(), DeviceSyncError> { + let Self { + ref init, + ref client, + .. + } = self; + + init.get_or_try_init(|| async { + let provider = self.client.mls_provider()?; + tracing::info!( + inbox_id = client.inbox_id(), + installation_id = hex::encode(client.installation_public_key()), + "Initializing device sync... url: {:?}", + client.history_sync_url + ); + if client.get_sync_group(provider.conn_ref()).is_err() { + client.ensure_sync_group(&provider).await?; + + client + .send_sync_request(&provider, DeviceSyncKind::Consent) + .await?; + client + .send_sync_request(&provider, DeviceSyncKind::MessageHistory) + .await?; + } + tracing::info!( + inbox_id = client.inbox_id(), + installation_id = hex::encode(client.installation_public_key()), + "Device sync initialized." + ); + + Ok(()) + }) + .await + .copied() + } +} - self.send_sync_request(provider, DeviceSyncKind::Consent) - .await?; - self.send_sync_request(provider, DeviceSyncKind::MessageHistory) - .await?; +impl SyncWorker +where + ApiClient: XmtpApi + Send + Sync + 'static, + V: SmartContractSignatureVerifier + Send + Sync + 'static, +{ + fn new(client: Client) -> Self { + let retry = Retry::builder() + .retries(5) + .duration(Duration::from_millis(20)) + .build(); + + let receiver = client.local_events.subscribe(); + let stream = Box::pin(receiver.stream_sync_messages()); + + Self { + client, + stream, + init: OnceCell::new(), + retry, } - tracing::info!( - inbox_id = self.inbox_id(), - installation_id = hex::encode(self.installation_public_key()), - "Device sync initialized." - ); + } - Ok(()) + fn spawn_worker(mut self) { + crate::spawn(None, async move { + let inbox_id = self.client.inbox_id().to_string(); + let installation_id = hex::encode(self.client.installation_public_key()); + while let Err(err) = self.run().await { + match err { + DeviceSyncError::Client(ClientError::Storage( + StorageError::PoolNeedsConnection, + )) => { + tracing::warn!("Pool disconnected. task will restart on reconnect"); + break; + } + _ => { + tracing::error!(inbox_id, installation_id, "sync worker error {err}"); + } + } + } + }); } +} +impl Client +where + ApiClient: XmtpApi, + V: SmartContractSignatureVerifier, +{ #[instrument(level = "trace", skip_all)] async fn ensure_sync_group( &self, @@ -341,14 +393,7 @@ where })?; // publish the intent - if let Err(err) = sync_group.publish_intents(provider).await { - tracing::error!( - inbox_id = self.inbox_id(), - installation_id = hex::encode(self.installation_public_key()), - "error publishing sync group intents: {:?}", - err - ); - } + sync_group.publish_intents(provider).await?; Ok(request) } diff --git a/xmtp_mls/src/groups/device_sync/message_sync.rs b/xmtp_mls/src/groups/device_sync/message_sync.rs index c2e4976a7..9d2c05362 100644 --- a/xmtp_mls/src/groups/device_sync/message_sync.rs +++ b/xmtp_mls/src/groups/device_sync/message_sync.rs @@ -148,6 +148,96 @@ pub(crate) mod tests { } } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn test_sync_continues_during_db_disconnect() { + let wallet = generate_local_wallet(); + let amal_a = ClientBuilder::new_test_client_with_history(&wallet, HISTORY_SYNC_URL).await; + + let amal_a_provider = amal_a.mls_provider().unwrap(); + let amal_a_conn = amal_a_provider.conn_ref(); + + // make sure amal's worker has time to sync + // 3 Intents: + // 1.) UpdateGroupMembership Intent for new sync group + // 2.) Device Sync Request + // 3.) MessageHistory Sync Request + wait_for_min_intents(amal_a_conn, 3).await; + tracing::info!("Waiting for intents published"); + let old_group_id = amal_a.get_sync_group(amal_a_conn).unwrap().group_id; + + // let old_group_id = amal_a.get_sync_group(amal_a_conn).unwrap().group_id; + tracing::info!("Disconnecting"); + amal_a.release_db_connection().unwrap(); + + // Create a second installation for amal. + let amal_b = ClientBuilder::new_test_client_with_history(&wallet, HISTORY_SYNC_URL).await; + let amal_b_provider = amal_b.mls_provider().unwrap(); + let amal_b_conn = amal_b_provider.conn_ref(); + + let groups_b = amal_b.syncable_groups(amal_b_conn).unwrap(); + assert_eq!(groups_b.len(), 0); + + // make sure amal's worker has time to sync + // 3 Intents: + // 1.) UpdateGroupMembership Intent for new sync group + // 2.) Device Sync Request + // 3.) MessageHistory Sync Request + wait_for_min_intents(amal_b_conn, 3).await; + tracing::info!("Waiting for intents published"); + + // Have the second installation request for a consent sync. + amal_b + .send_sync_request(&amal_b_provider, DeviceSyncKind::MessageHistory) + .await + .unwrap(); + + amal_a.reconnect_db().unwrap(); + + // make sure amal's worker has time to sync + // 2 Intents: + // 1.) Device Sync Request + // 2.) MessageHistory Sync Request + wait_for_min_intents(amal_a_conn, 2).await; + tracing::info!("Waiting for intents published"); + + // Check for new welcomes to new groups in the first installation (should be welcomed to a new sync group from amal_b). + amal_a + .sync_welcomes(amal_a_conn) + .await + .expect("sync_welcomes"); + let new_group_id = amal_a.get_sync_group(amal_a_conn).unwrap().group_id; + // group id should have changed to the new sync group created by the second installation + assert_ne!(old_group_id, new_group_id); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn disconnect_does_not_effect_init() { + let wallet = generate_local_wallet(); + let amal_a = ClientBuilder::new_test_client_with_history(&wallet, HISTORY_SYNC_URL).await; + + let amal_a_provider = amal_a.mls_provider().unwrap(); + let amal_a_conn = amal_a_provider.conn_ref(); + + //release db conn right after creating client, not giving the worker time to do initial + //sync + amal_a.release_db_connection().unwrap(); + + let sync_group = amal_a.get_sync_group(amal_a_conn); + crate::assert_err!(sync_group, GroupError::GroupNotFound); + + amal_a.reconnect_db().unwrap(); + + // make sure amal's worker has time to sync + // 3 Intents: + // 1.) Sync Group Creation + // 2.) Device Sync Request + // 3.) MessageHistory Sync Request + wait_for_min_intents(amal_a_conn, 3).await; + tracing::info!("Waiting for intents published"); + let sync_group = amal_a.get_sync_group(amal_a_conn); + assert!(sync_group.is_ok()); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_prepare_groups_to_sync() { let wallet = generate_local_wallet(); diff --git a/xmtp_mls/src/utils/test/mod.rs b/xmtp_mls/src/utils/test/mod.rs index aded954b2..5bfc36341 100755 --- a/xmtp_mls/src/utils/test/mod.rs +++ b/xmtp_mls/src/utils/test/mod.rs @@ -236,7 +236,7 @@ where register_client(&client, owner).await; if client.history_sync_url.is_some() { - client.start_sync_worker().await.unwrap(); + client.start_sync_worker(); } client