Skip to content

Commit

Permalink
gate the worker handle behind test utils
Browse files Browse the repository at this point in the history
  • Loading branch information
codabrink committed Dec 12, 2024
1 parent aa9cbe9 commit ee48a79
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 7 deletions.
2 changes: 1 addition & 1 deletion xmtp_mls/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ where
)
.await?;

let mut client = Client::new(
let client = Client::new(
api_client_wrapper,
identity,
store,
Expand Down
17 changes: 13 additions & 4 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ use xmtp_proto::xmtp::mls::api::v1::{
GroupMessage, WelcomeMessage,
};

#[cfg(feature = "test-utils")]
use crate::groups::device_sync::WorkerHandle;

use crate::{
api::ApiClientWrapper,
groups::{
device_sync::{preference_sync::UserPreferenceUpdate, WorkerHandle},
group_permissions::PolicySet,
device_sync::preference_sync::UserPreferenceUpdate, group_permissions::PolicySet,
GroupError, GroupMetadataOptions, MlsGroup,
},
identity::{parse_credential, Identity, IdentityError},
Expand Down Expand Up @@ -144,9 +146,11 @@ pub struct Client<ApiClient, V = RemoteSignatureVerifier<ApiClient>> {
pub(crate) context: Arc<XmtpMlsLocalContext>,
pub(crate) history_sync_url: Option<String>,
pub(crate) local_events: broadcast::Sender<LocalEvents<Self>>,
sync_worker_handle: Arc<Mutex<Option<Arc<WorkerHandle>>>>,
/// The method of verifying smart contract wallet signatures for this Client
pub(crate) scw_verifier: Arc<V>,

#[cfg(feature = "test-utils")]
sync_worker_handle: Arc<Mutex<Option<Arc<WorkerHandle>>>>,
}

// most of these things are `Arc`'s
Expand All @@ -157,8 +161,10 @@ impl<ApiClient, V> Clone for Client<ApiClient, V> {
context: self.context.clone(),
history_sync_url: self.history_sync_url.clone(),
local_events: self.local_events.clone(),
sync_worker_handle: self.sync_worker_handle.clone(),
scw_verifier: self.scw_verifier.clone(),

#[cfg(feature = "test-utils")]
sync_worker_handle: self.sync_worker_handle.clone(),
}
}
}
Expand Down Expand Up @@ -244,6 +250,7 @@ where
context,
history_sync_url,
local_events: tx,
#[cfg(feature = "test-utils")]
sync_worker_handle: Arc::new(Mutex::default()),
scw_verifier: scw_verifier.into(),
}
Expand All @@ -253,10 +260,12 @@ where
&self.scw_verifier
}

#[cfg(feature = "test-utils")]
pub fn sync_worker_handle(&self) -> Option<Arc<WorkerHandle>> {
self.sync_worker_handle.lock().clone()
}

#[cfg(feature = "test-utils")]
pub(crate) fn set_sync_worker_handle(&self, handle: Arc<WorkerHandle>) {
*self.sync_worker_handle.lock() = Some(handle);
}
Expand Down
13 changes: 11 additions & 2 deletions xmtp_mls/src/groups/device_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ where
);

let worker = SyncWorker::new(client);
#[cfg(feature = "test-utils")]
self.set_sync_worker_handle(worker.handle.clone());
worker.spawn_worker();
}
Expand All @@ -150,13 +151,17 @@ pub struct SyncWorker<ApiClient, V> {
retry: Retry,

// Number of events processed
#[cfg(feature = "test-utils")]
handle: Arc<WorkerHandle>,
}

#[cfg(feature = "test-utils")]
pub struct WorkerHandle {
processed: AtomicUsize,
notify: Notify,
}

#[cfg(feature = "test-utils")]
impl WorkerHandle {
pub async fn wait_for_new_events(&self, mut count: usize) -> Result<(), Elapsed> {
timeout(Duration::from_secs(3), async {
Expand Down Expand Up @@ -233,8 +238,11 @@ where
_ => {}
}

self.handle.processed.fetch_add(1, Ordering::SeqCst);
self.handle.notify.notify_waiters();
#[cfg(feature = "test-utils")]
{
self.handle.processed.fetch_add(1, Ordering::SeqCst);
self.handle.notify.notify_waiters();
}
}
Ok(())
}
Expand Down Expand Up @@ -360,6 +368,7 @@ where
init: OnceCell::new(),
retry,

#[cfg(feature = "test-utils")]
handle: Arc::new(WorkerHandle {
processed: AtomicUsize::new(0),
notify: Notify::new(),
Expand Down

0 comments on commit ee48a79

Please sign in to comment.