diff --git a/.cargo/nextest.toml b/.cargo/nextest.toml index 07216b1ff..b7afc9364 100644 --- a/.cargo/nextest.toml +++ b/.cargo/nextest.toml @@ -1,2 +1,3 @@ [profile.default] retries = 3 +default-filter = "not test(test_stream_all_messages_does_not_lose_messages)" diff --git a/.github/workflows/test-http-api.yml b/.github/workflows/test-http-api.yml index e906f38aa..92e6c60cb 100644 --- a/.github/workflows/test-http-api.yml +++ b/.github/workflows/test-http-api.yml @@ -41,6 +41,6 @@ jobs: - name: Install nextest uses: taiki-e/install-action@nextest - name: build tests - run: cargo nextest run --no-run --tests --workspace --exclude xmtp_api_grpc --exclude xmtpv3 --exclude bindings_node --exclude bindings_wasm --features http-api + run: cargo nextest run --config-file ".cargo/nextest.toml" --no-run --tests --workspace --exclude xmtp_api_grpc --exclude xmtpv3 --exclude bindings_node --exclude bindings_wasm --features http-api - name: cargo test - run: cargo nextest run --workspace --exclude xmtp_api_grpc --exclude xmtpv3 --exclude bindings_node --exclude bindings_wasm --features http-api --test-threads 2 + run: cargo nextest run --config-file ".cargo/nextest.toml" --workspace --exclude xmtp_api_grpc --exclude xmtpv3 --exclude bindings_node --exclude bindings_wasm --features http-api --test-threads 2 diff --git a/Cargo.lock b/Cargo.lock index 7cf24af0f..ff1f48d1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3229,7 +3229,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -6899,7 +6899,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] @@ -7299,11 +7299,13 @@ dependencies = [ "getrandom", "gloo-timers 0.3.0", "js-sys", + "once_cell", "parking_lot 0.12.3", "rand", "thiserror 2.0.6", "tokio", "tracing", + "tracing-flame", "tracing-subscriber", "tracing-wasm", "wasm-bindgen-futures", @@ -7416,7 +7418,6 @@ dependencies = [ "libsqlite3-sys", "mockall", "mockito", - "once_cell", "openmls", "openmls_basic_credential", "openmls_rust_crypto", @@ -7437,7 +7438,6 @@ dependencies = [ "tokio-stream", "toml 0.8.19", "tracing", - "tracing-flame", "tracing-subscriber", "tracing-wasm", "trait-variant", @@ -7505,7 +7505,9 @@ dependencies = [ name = "xmtpv3" version = "0.1.0" dependencies = [ + "criterion", "ethers", + "fdlimit", "futures", "paranoid-android", "parking_lot 0.12.3", diff --git a/Cargo.toml b/Cargo.toml index 0dde776cf..94aa624e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,6 +97,11 @@ wasm-bindgen-futures = "0.4" wasm-bindgen-test = "0.3.49" web-sys = "0.3" zeroize = "1.8" +criterion = { version = "0.5", features = [ + "html_reports", + "async_tokio", +]} + once_cell = "1.2" # Internal Crate Dependencies xmtp_api_grpc = { path = "xmtp_api_grpc" } @@ -112,6 +117,9 @@ xmtp_common = { path = "common" } # and we don't rely on it for debugging that much. debug = 0 +[profile.bench] +debug = true + # Setting opt-level to 3 for proc macros/build scripts # speeds up buildtime [profile.dev.build-override] diff --git a/bindings_ffi/Cargo.toml b/bindings_ffi/Cargo.toml index 7e44b3498..5831c3d7d 100644 --- a/bindings_ffi/Cargo.toml +++ b/bindings_ffi/Cargo.toml @@ -24,6 +24,11 @@ xmtp_user_preferences = { path = "../xmtp_user_preferences" } xmtp_v2 = { path = "../xmtp_v2" } xmtp_common.workspace = true +# Bench +criterion = { workspace = true, optional = true } +fdlimit = { version = "0.3", optional = true} + + [target.'cfg(target_os = "android")'.dependencies] paranoid-android = "0.2" @@ -47,3 +52,12 @@ uuid = { workspace = true, features = ["v4", "fast-rng"] } xmtp_api_grpc = { path = "../xmtp_api_grpc", features = ["test-utils"] } xmtp_mls = { path = "../xmtp_mls", features = ["test-utils"] } xmtp_proto = { path = "../xmtp_proto", features = ["test-utils"] } + +[features] +bench = ["xmtp_mls/bench", "xmtp_common/bench", "dep:criterion", "dep:fdlimit"] + +[[bench]] +harness = false +name = "create_client" +required-features = ["bench"] + diff --git a/bindings_ffi/benches/create_client.rs b/bindings_ffi/benches/create_client.rs new file mode 100644 index 000000000..2e6ca88da --- /dev/null +++ b/bindings_ffi/benches/create_client.rs @@ -0,0 +1,148 @@ +//! NOTE: +// `MAX_DB_POOL_SIZE` in `configuration.rs` must be set to `10` +// in order for these benchmarks to succesfully run & generate a report. +// (file descriptor issue) + +use crate::tracing::Instrument; +use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; +use tokio::runtime::{Builder, Runtime}; +use xmtp_common::{bench::BENCH_ROOT_SPAN, tmp_path}; +use xmtp_id::InboxOwner; +use xmtp_mls::utils::test::HISTORY_SYNC_URL; +use xmtpv3::generate_inbox_id; + +#[macro_use] +extern crate tracing; + +fn setup() -> Runtime { + Builder::new_multi_thread() + .enable_time() + .enable_io() + .thread_name("xmtp-bencher") + .build() + .unwrap() +} + +fn network_url() -> (String, bool) { + let dev = std::env::var("DEV_GRPC"); + let is_dev_network = matches!(dev, Ok(d) if d == "true" || d == "1"); + + if is_dev_network { + (xmtp_api_grpc::DEV_ADDRESS.to_string(), true) + } else { + (xmtp_api_grpc::LOCALHOST_ADDRESS.to_string(), false) + } +} + +fn create_ffi_client(c: &mut Criterion) { + xmtp_common::bench::logger(); + + let runtime = setup(); + + let _ = fdlimit::raise_fd_limit(); + let mut benchmark_group = c.benchmark_group("create_client"); + + // benchmark_group.sample_size(10); + benchmark_group.sampling_mode(criterion::SamplingMode::Flat); + benchmark_group.bench_function("create_ffi_client", |b| { + let span = trace_span!(BENCH_ROOT_SPAN); + b.to_async(&runtime).iter_batched( + || { + let wallet = xmtp_cryptography::utils::generate_local_wallet(); + let nonce = 1; + let inbox_id = generate_inbox_id(wallet.get_address(), nonce).unwrap(); + let path = tmp_path(); + let (network, is_secure) = network_url(); + ( + inbox_id, + wallet.get_address(), + nonce, + path, + network, + is_secure, + span.clone(), + ) + }, + |(inbox_id, address, nonce, path, network, is_secure, span)| async move { + xmtpv3::mls::create_client( + network, + is_secure, + Some(path), + Some(vec![0u8; 32]), + &inbox_id, + address, + nonce, + None, + Some(HISTORY_SYNC_URL.to_string()), + ) + .instrument(span) + .await + .unwrap(); + }, + BatchSize::SmallInput, + ) + }); + + benchmark_group.finish(); +} + +fn cached_create_ffi_client(c: &mut Criterion) { + xmtp_common::bench::logger(); + + let runtime = setup(); + + let _ = fdlimit::raise_fd_limit(); + let mut benchmark_group = c.benchmark_group("create_client_from_cached"); + let wallet = xmtp_cryptography::utils::generate_local_wallet(); + let nonce = 1; + let inbox_id = generate_inbox_id(wallet.get_address(), nonce).unwrap(); + let address = wallet.get_address(); + let path = tmp_path(); + + // benchmark_group.sample_size(10); + benchmark_group.sampling_mode(criterion::SamplingMode::Flat); + benchmark_group.bench_function("cached_create_ffi_client", |b| { + let span = trace_span!(BENCH_ROOT_SPAN); + b.to_async(&runtime).iter_batched( + || { + let (network, is_secure) = network_url(); + ( + inbox_id.clone(), + address.clone(), + nonce, + path.clone(), + HISTORY_SYNC_URL.to_string(), + network, + is_secure, + span.clone(), + ) + }, + |(inbox_id, address, nonce, path, history_sync, network, is_secure, span)| async move { + xmtpv3::mls::create_client( + network, + is_secure, + Some(path), + Some(vec![0u8; 32]), + &inbox_id, + address, + nonce, + None, + Some(history_sync), + ) + .instrument(span) + .await + .unwrap(); + }, + BatchSize::SmallInput, + ) + }); + + benchmark_group.finish(); +} + +criterion_group!( + name = create_client; + config = Criterion::default().sample_size(10); + targets = create_ffi_client, cached_create_ffi_client +); +criterion_main!(create_client); diff --git a/bindings_ffi/src/logger.rs b/bindings_ffi/src/logger.rs index c4b00e349..a2b2d7a27 100644 --- a/bindings_ffi/src/logger.rs +++ b/bindings_ffi/src/logger.rs @@ -91,6 +91,6 @@ static LOGGER_INIT: Once = Once::new(); pub fn init_logger() { LOGGER_INIT.call_once(|| { let native_layer = native_layer(); - tracing_subscriber::registry().with(native_layer).init() + let _ = tracing_subscriber::registry().with(native_layer).try_init(); }); } diff --git a/common/Cargo.toml b/common/Cargo.toml index 76b836916..41dcc4711 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -14,6 +14,8 @@ xmtp_cryptography.workspace = true parking_lot = { workspace = true, optional = true } tracing-subscriber = { workspace = true, features = ["fmt", "env-filter", "ansi", "json"], optional = true } +once_cell = { workspace = true, optional = true } +tracing-flame = { version = "0.2", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { workspace = true, features = ["js"] } @@ -36,3 +38,4 @@ tokio = { workspace = true, features = ["time", "macros", "rt-multi-thread", "sy [features] test-utils = ["dep:parking_lot", "dep:tracing-subscriber", "dep:tracing-wasm", "dep:console_error_panic_hook"] +bench = ["test-utils", "dep:tracing-subscriber", "dep:once_cell", "dep:tracing-flame"] diff --git a/common/src/bench.rs b/common/src/bench.rs new file mode 100644 index 000000000..09ec4e94e --- /dev/null +++ b/common/src/bench.rs @@ -0,0 +1,74 @@ +use once_cell::sync::OnceCell; +use std::sync::Once; +use tracing::{Metadata, Subscriber}; +use tracing_flame::{FlameLayer, FlushGuard}; +use tracing_subscriber::{ + layer::{Context, Filter, Layer, SubscriberExt}, + registry::LookupSpan, + util::SubscriberInitExt, + EnvFilter, +}; +static INIT: Once = Once::new(); + +static LOGGER: OnceCell>> = OnceCell::new(); + +pub const BENCH_ROOT_SPAN: &str = "xmtp-trace-bench"; + +/// initializes logging for benchmarks +/// - FMT logging is enabled by passing the normal `RUST_LOG` environment variable options. +/// - Generate a flamegraph from tracing data by passing `XMTP_FLAMEGRAPH=trace` +pub fn logger() { + INIT.call_once(|| { + let (flame_layer, guard) = FlameLayer::with_file("./tracing.folded").unwrap(); + let flame_layer = flame_layer + .with_threads_collapsed(true) + .with_module_path(true); + // .with_empty_samples(false); + + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env())) + .with( + flame_layer + // .with_filter(BenchFilter) + .with_filter(EnvFilter::from_env("XMTP_FLAMEGRAPH")), + ) + .init(); + + LOGGER.set(guard).unwrap(); + }) +} + +/// criterion `batch_iter` surrounds the closure in a `Runtime.block_on` despite being a sync +/// function, even in the async 'to_async` setup. Therefore we do this (only _slightly_) hacky +/// workaround to allow us to async setup some groups. +pub fn bench_async_setup(fun: F) -> T +where + F: Fn() -> Fut, + Fut: futures::future::Future, +{ + use tokio::runtime::Handle; + tokio::task::block_in_place(move || Handle::current().block_on(async move { fun().await })) +} + +/// Filters for only spans where the root span name is "bench" +pub struct BenchFilter; + +impl Filter for BenchFilter +where + S: Subscriber + for<'lookup> LookupSpan<'lookup> + std::fmt::Debug, + for<'lookup> >::Data: std::fmt::Debug, +{ + fn enabled(&self, meta: &Metadata<'_>, cx: &Context<'_, S>) -> bool { + if meta.name() == BENCH_ROOT_SPAN { + return true; + } + if let Some(id) = cx.current_span().id() { + if let Some(s) = cx.span_scope(id) { + if let Some(s) = s.from_root().take(1).collect::>().first() { + return s.name() == BENCH_ROOT_SPAN; + } + } + } + false + } +} diff --git a/common/src/lib.rs b/common/src/lib.rs index a198962ad..8b2ad8d38 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -7,6 +7,9 @@ mod test; #[cfg(feature = "test-utils")] pub use test::*; +#[cfg(feature = "bench")] +pub mod bench; + pub mod retry; pub use retry::*; diff --git a/xmtp_api_grpc/src/grpc_api_helper.rs b/xmtp_api_grpc/src/grpc_api_helper.rs index fa7683de7..0fdadaecf 100644 --- a/xmtp_api_grpc/src/grpc_api_helper.rs +++ b/xmtp_api_grpc/src/grpc_api_helper.rs @@ -8,6 +8,7 @@ use futures::{SinkExt, Stream, StreamExt, TryStreamExt}; use tokio::sync::oneshot; use tonic::transport::ClientTlsConfig; use tonic::{metadata::MetadataValue, transport::Channel, Request, Streaming}; +use tracing::Instrument; use xmtp_proto::api_client::{ClientWithMetadata, XmtpMlsStreams}; use xmtp_proto::xmtp::mls::api::v1::{GroupMessage, WelcomeMessage}; @@ -28,7 +29,9 @@ use xmtp_proto::{ Error, ErrorKind, }; +#[tracing::instrument(level = "trace", skip_all)] pub async fn create_tls_channel(address: String) -> Result { + let span = tracing::trace_span!("grpc_connect", address); let channel = Channel::from_shared(address) .map_err(|e| Error::new(ErrorKind::SetupCreateChannelError).with(e))? // Purpose: This setting controls the size of the initial connection-level flow control window for HTTP/2, which is the underlying protocol for gRPC. @@ -58,6 +61,7 @@ pub async fn create_tls_channel(address: String) -> Result { .tls_config(ClientTlsConfig::new().with_enabled_roots()) .map_err(|e| Error::new(ErrorKind::SetupTLSConfigError).with(e))? .connect() + .instrument(span) .await .map_err(|e| Error::new(ErrorKind::SetupConnectionError).with(e))?; @@ -74,6 +78,7 @@ pub struct Client { } impl Client { + #[tracing::instrument(level = "trace", skip_all)] pub async fn create(host: impl ToString, is_secure: bool) -> Result { let host = host.to_string(); let app_version = MetadataValue::try_from(&String::from("0.0.0")) diff --git a/xmtp_mls/Cargo.toml b/xmtp_mls/Cargo.toml index ef9d49670..04cfabc05 100644 --- a/xmtp_mls/Cargo.toml +++ b/xmtp_mls/Cargo.toml @@ -19,13 +19,12 @@ bench = [ "indicatif", "tracing-subscriber", "anyhow", - "tracing-flame", - "once_cell", "dep:xmtp_api_grpc", "criterion", "dep:fdlimit", "dep:ethers", "dep:const_format", + "xmtp_common/bench" ] default = ["grpc-api"] grpc-api = ["dep:xmtp_api_grpc"] @@ -89,15 +88,10 @@ xmtp_api_http = { path = "../xmtp_api_http", optional = true } # Test/Bench Utils anyhow = { workspace = true, optional = true } -criterion = { version = "0.5", features = [ - "html_reports", - "async_tokio", -], optional = true } +criterion = { workspace = true, optional = true } hmac = "0.12.1" indicatif = { version = "0.17", optional = true } mockall = { version = "0.13.1", optional = true } -once_cell = { version = "1.19", optional = true } -tracing-flame = { version = "0.2", optional = true } tracing-subscriber = { workspace = true, features = [ "env-filter", "fmt", diff --git a/xmtp_mls/benches/group_limit.rs b/xmtp_mls/benches/group_limit.rs index ba3027eb3..ac2be6a5c 100755 --- a/xmtp_mls/benches/group_limit.rs +++ b/xmtp_mls/benches/group_limit.rs @@ -6,13 +6,11 @@ use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criteri use std::{collections::HashMap, sync::Arc}; use tokio::runtime::{Builder, Runtime}; use tracing::{trace_span, Instrument}; +use xmtp_common::bench::{self, bench_async_setup, BENCH_ROOT_SPAN}; use xmtp_mls::{ builder::ClientBuilder, groups::GroupMetadataOptions, - utils::bench::{ - bench_async_setup, create_identities_if_dont_exist, init_logging, BenchClient, Identity, - BENCH_ROOT_SPAN, - }, + utils::bench::{create_identities_if_dont_exist, BenchClient, Identity}, }; pub const IDENTITY_SAMPLES: [usize; 9] = [10, 20, 40, 80, 100, 200, 300, 400, 450]; @@ -50,7 +48,7 @@ fn setup() -> (Arc, Vec, Runtime) { } fn add_to_empty_group(c: &mut Criterion) { - init_logging(); + bench::logger(); let mut benchmark_group = c.benchmark_group("add_to_empty_group"); benchmark_group.sample_size(SAMPLE_SIZE); @@ -88,7 +86,7 @@ fn add_to_empty_group(c: &mut Criterion) { } fn add_to_empty_group_by_inbox_id(c: &mut Criterion) { - init_logging(); + bench::logger(); let mut benchmark_group = c.benchmark_group("add_to_empty_group_by_inbox_id"); benchmark_group.sample_size(SAMPLE_SIZE); @@ -130,7 +128,7 @@ fn add_to_empty_group_by_inbox_id(c: &mut Criterion) { } fn add_to_100_member_group_by_inbox_id(c: &mut Criterion) { - init_logging(); + bench::logger(); let mut benchmark_group = c.benchmark_group("add_to_100_member_group_by_inbox_id"); benchmark_group.sample_size(SAMPLE_SIZE); @@ -188,7 +186,7 @@ fn add_to_100_member_group_by_inbox_id(c: &mut Criterion) { } fn remove_all_members_from_group(c: &mut Criterion) { - init_logging(); + bench::logger(); let mut benchmark_group = c.benchmark_group("remove_all_members_from_group"); benchmark_group.sample_size(SAMPLE_SIZE); @@ -233,7 +231,7 @@ fn remove_all_members_from_group(c: &mut Criterion) { } fn remove_half_members_from_group(c: &mut Criterion) { - init_logging(); + bench::logger(); let mut benchmark_group = c.benchmark_group("remove_half_members_from_group"); benchmark_group.sample_size(SAMPLE_SIZE); @@ -281,7 +279,7 @@ fn remove_half_members_from_group(c: &mut Criterion) { } fn add_1_member_to_group(c: &mut Criterion) { - init_logging(); + bench::logger(); let mut benchmark_group = c.benchmark_group("add_1_member_to_group"); benchmark_group.sample_size(SAMPLE_SIZE); diff --git a/xmtp_mls/benches/identity.rs b/xmtp_mls/benches/identity.rs index a7b10709f..80f267552 100644 --- a/xmtp_mls/benches/identity.rs +++ b/xmtp_mls/benches/identity.rs @@ -1,6 +1,7 @@ use crate::tracing::Instrument; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use tokio::runtime::{Builder, Runtime}; +use xmtp_common::bench::{self, bench_async_setup, BENCH_ROOT_SPAN}; use xmtp_id::{ associations::{ builder::SignatureRequest, @@ -8,8 +9,7 @@ use xmtp_id::{ }, InboxOwner, }; -use xmtp_mls::utils::bench::{bench_async_setup, BenchClient, BENCH_ROOT_SPAN}; -use xmtp_mls::utils::bench::{clients, init_logging}; +use xmtp_mls::utils::bench::{clients, BenchClient}; #[macro_use] extern crate tracing; @@ -38,7 +38,7 @@ async fn ecdsa_signature(client: &BenchClient, owner: impl InboxOwner) -> Signat } fn register_identity_eoa(c: &mut Criterion) { - init_logging(); + bench::logger(); let runtime = setup(); diff --git a/xmtp_mls/src/builder.rs b/xmtp_mls/src/builder.rs index f8411744e..d767f7573 100644 --- a/xmtp_mls/src/builder.rs +++ b/xmtp_mls/src/builder.rs @@ -65,6 +65,7 @@ impl Client { } impl ClientBuilder { + #[tracing::instrument(level = "trace", skip_all)] pub fn new(strategy: IdentityStrategy) -> Self { Self { api_client: None, @@ -101,7 +102,7 @@ impl ClientBuilder { self.app_version = Some(version); self } - + #[tracing::instrument(level = "trace", skip_all)] pub fn scw_signature_verifier(mut self, verifier: V) -> Self { self.scw_verifier = Some(verifier); self @@ -125,6 +126,7 @@ where ApiClient: XmtpApi + 'static + Send + Sync, { /// Build with the default [`RemoteSignatureVerifier`] + #[tracing::instrument(level = "trace", skip_all)] pub async fn build(self) -> Result, ClientBuilderError> { let (mut builder, api_client) = inner_build_api_client(self)?; builder = builder.scw_signature_verifier(RemoteSignatureVerifier::new(api_client.clone())); @@ -158,6 +160,7 @@ where Ok((builder, Arc::new(api_client))) } +#[tracing::instrument(level = "trace", skip_all)] async fn inner_build( client: ClientBuilder, api_client: Arc, @@ -197,7 +200,6 @@ where installation_id = hex::encode(identity.installation_keys.public_bytes()), "Initialized identity" ); - // get sequence_id from identity updates and loaded into the DB load_identity_updates( &api_client_wrapper, diff --git a/xmtp_mls/src/configuration.rs b/xmtp_mls/src/configuration.rs index d8fa3ea65..118231a57 100644 --- a/xmtp_mls/src/configuration.rs +++ b/xmtp_mls/src/configuration.rs @@ -28,6 +28,8 @@ pub const MAX_GROUP_SIZE: usize = 400; pub const MAX_PAST_EPOCHS: usize = 3; +pub const MAX_DB_POOL_SIZE: u32 = 25; + /// the max amount of data that can be sent in one gRPC call /// we leave 5 * 1024 * 1024 as extra buffer room pub const GRPC_DATA_LIMIT: usize = 45 * 1024 * 1024; diff --git a/xmtp_mls/src/groups/device_sync.rs b/xmtp_mls/src/groups/device_sync.rs index 65f233256..070caabe6 100644 --- a/xmtp_mls/src/groups/device_sync.rs +++ b/xmtp_mls/src/groups/device_sync.rs @@ -1,18 +1,19 @@ use super::{GroupError, MlsGroup}; -use crate::configuration::NS_IN_HOUR; -use crate::storage::group::{ConversationType, GroupQueryArgs}; -use crate::storage::group_message::MsgQueryArgs; -use crate::storage::DbConnection; -use crate::subscriptions::{LocalEvents, StreamMessages, SubscribeError, SyncMessage}; -use crate::xmtp_openmls_provider::XmtpOpenMlsProvider; +#[cfg(any(test, feature = "test-utils"))] +pub use crate::utils::WorkerHandle; use crate::{ client::ClientError, + configuration::NS_IN_HOUR, storage::{ consent_record::StoredConsentRecord, group::StoredGroup, + group::{ConversationType, GroupQueryArgs}, + group_message::MsgQueryArgs, group_message::{GroupMessageKind, StoredGroupMessage}, - StorageError, + DbConnection, StorageError, }, + subscriptions::{LocalEvents, StreamMessages, SubscribeError, SyncMessage}, + xmtp_openmls_provider::XmtpOpenMlsProvider, Client, Store, }; use aes_gcm::aead::generic_array::GenericArray; @@ -24,14 +25,9 @@ use futures::{Stream, StreamExt}; use preference_sync::UserPreferenceUpdate; use rand::{Rng, RngCore}; use serde::{Deserialize, Serialize}; -use std::future::Future; use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; use thiserror::Error; -use tokio::sync::{Notify, OnceCell}; -use tokio::time::error::Elapsed; -use tokio::time::timeout; +use tokio::sync::OnceCell; use tracing::{instrument, warn}; use xmtp_common::time::{now_ns, Duration}; use xmtp_common::{retry_async, Retry, RetryableError}; @@ -119,17 +115,6 @@ impl RetryableError for DeviceSyncError { } } -#[cfg(any(test, feature = "test-utils"))] -impl Client { - pub fn sync_worker_handle(&self) -> Option> { - self.sync_worker_handle.lock().clone() - } - - pub(crate) fn set_sync_worker_handle(&self, handle: Arc) { - *self.sync_worker_handle.lock() = Some(handle); - } -} - impl Client where ApiClient: XmtpApi + Send + Sync + 'static, @@ -163,54 +148,7 @@ pub struct SyncWorker { // Number of events processed #[cfg(any(test, feature = "test-utils"))] - handle: Arc, -} - -#[cfg(any(test, feature = "test-utils"))] -pub struct WorkerHandle { - processed: AtomicUsize, - notify: Notify, -} - -#[cfg(any(test, feature = "test-utils"))] -impl WorkerHandle { - pub async fn wait_for_new_events(&self, mut count: usize) -> Result<(), Elapsed> { - timeout(Duration::from_secs(3), async { - while count > 0 { - self.notify.notified().await; - count -= 1; - } - }) - .await?; - - Ok(()) - } - - pub async fn wait_for_processed_count(&self, expected: usize) -> Result<(), Elapsed> { - timeout(Duration::from_secs(3), async { - while self.processed.load(Ordering::SeqCst) < expected { - self.notify.notified().await; - } - }) - .await?; - - Ok(()) - } - - pub async fn block_for_num_events(&self, num_events: usize, op: Fut) -> Result<(), Elapsed> - where - Fut: Future, - { - let processed_count = self.processed_count(); - op.await; - self.wait_for_processed_count(processed_count + num_events) - .await?; - Ok(()) - } - - pub fn processed_count(&self) -> usize { - self.processed.load(Ordering::SeqCst) - } + handle: std::sync::Arc, } impl SyncWorker @@ -251,8 +189,7 @@ where #[cfg(any(test, feature = "test-utils"))] { - self.handle.processed.fetch_add(1, Ordering::SeqCst); - self.handle.notify.notify_waiters(); + self.handle.increment(); } } Ok(()) @@ -380,10 +317,7 @@ where retry, #[cfg(any(test, feature = "test-utils"))] - handle: Arc::new(WorkerHandle { - processed: AtomicUsize::new(0), - notify: Notify::new(), - }), + handle: std::sync::Arc::new(Default::default()), } } diff --git a/xmtp_mls/src/identity.rs b/xmtp_mls/src/identity.rs index 10a05344a..e8b904c56 100644 --- a/xmtp_mls/src/identity.rs +++ b/xmtp_mls/src/identity.rs @@ -82,6 +82,7 @@ impl IdentityStrategy { /// Create a new Identity Strategy, with [`IdentityStrategy::CreateIfNotFound`]. /// If an Identity is not found in the local store, creates a new one. + #[tracing::instrument(level = "trace", skip_all)] pub fn new( inbox_id: InboxId, address: String, @@ -106,6 +107,7 @@ impl IdentityStrategy { * the inbox_id configured on the strategy. * **/ + #[tracing::instrument(level = "trace", skip_all)] pub(crate) async fn initialize_identity( self, api_client: &ApiClientWrapper, @@ -131,6 +133,12 @@ impl IdentityStrategy { legacy_signed_private_key, } => { if let Some(stored_identity) = stored_identity { + tracing::debug!( + installation_id = + hex::encode(stored_identity.installation_keys.public_bytes()), + inbox_id = stored_identity.inbox_id, + "Found existing identity in store" + ); if inbox_id != stored_identity.inbox_id { return Err(IdentityError::InboxIdMismatch { id: inbox_id.clone(), @@ -258,6 +266,7 @@ impl Identity { /// If a legacy key is provided, it will be used to sign the identity update and no wallet signature is needed. /// /// If no legacy key is provided, a wallet signature is always required. + #[tracing::instrument(level = "trace", skip_all)] pub(crate) async fn new( inbox_id: InboxId, address: String, diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs index c7172dd5b..d874c32e0 100644 --- a/xmtp_mls/src/storage/encrypted_store/mod.rs +++ b/xmtp_mls/src/storage/encrypted_store/mod.rs @@ -108,6 +108,7 @@ pub type EncryptedMessageStore = self::private::EncryptedMessageStore Result { Self::new_database(opts, Some(enc_key)) } @@ -118,6 +119,7 @@ impl EncryptedMessageStore { } /// This function is private so that an unencrypted database cannot be created by accident + #[tracing::instrument(level = "trace", skip_all)] fn new_database( opts: StorageOption, enc_key: Option, @@ -172,6 +174,7 @@ pub mod private { where Db: XmtpDb, { + #[tracing::instrument(level = "trace", skip_all)] pub(super) fn init_db(&mut self) -> Result<(), StorageError> { self.db.validate(&self.opts)?; self.db.conn()?.raw_query(|conn| { diff --git a/xmtp_mls/src/storage/encrypted_store/native.rs b/xmtp_mls/src/storage/encrypted_store/native.rs index b254f820e..635b1b4c7 100644 --- a/xmtp_mls/src/storage/encrypted_store/native.rs +++ b/xmtp_mls/src/storage/encrypted_store/native.rs @@ -120,9 +120,9 @@ impl NativeDb { StorageOption::Ephemeral => builder .max_size(1) .build(ConnectionManager::new(":memory:"))?, - StorageOption::Persistent(ref path) => { - builder.max_size(25).build(ConnectionManager::new(path))? - } + StorageOption::Persistent(ref path) => builder + .max_size(crate::configuration::MAX_DB_POOL_SIZE) + .build(ConnectionManager::new(path))?, }; Ok(Self { @@ -186,9 +186,9 @@ impl XmtpDb for NativeDb { StorageOption::Ephemeral => builder .max_size(1) .build(ConnectionManager::new(":memory:"))?, - StorageOption::Persistent(ref path) => { - builder.max_size(25).build(ConnectionManager::new(path))? - } + StorageOption::Persistent(ref path) => builder + .max_size(crate::configuration::MAX_DB_POOL_SIZE) + .build(ConnectionManager::new(path))?, }; let mut pool_write = self.pool.write(); diff --git a/xmtp_mls/src/utils/bench/mod.rs b/xmtp_mls/src/utils/bench/mod.rs index 5466c6dd9..bb1ef3d2c 100644 --- a/xmtp_mls/src/utils/bench/mod.rs +++ b/xmtp_mls/src/utils/bench/mod.rs @@ -8,20 +8,7 @@ pub use identity_gen::*; pub mod clients; pub use clients::*; -use once_cell::sync::OnceCell; -use std::sync::Once; use thiserror::Error; -use tracing::{Metadata, Subscriber}; -use tracing_flame::{FlameLayer, FlushGuard}; -use tracing_subscriber::{ - layer::{Context, Filter, Layer, SubscriberExt}, - registry::LookupSpan, - util::SubscriberInitExt, - EnvFilter, -}; - -pub const BENCH_ROOT_SPAN: &str = "xmtp-trace-bench"; - /// Re-export of functions in private modules for benchmarks pub mod re_export { pub use crate::hpke::encrypt_welcome; @@ -34,66 +21,3 @@ pub enum BenchError { #[error(transparent)] Io(#[from] std::io::Error), } - -static INIT: Once = Once::new(); - -static LOGGER: OnceCell>> = OnceCell::new(); - -/// initializes logging for benchmarks -/// - FMT logging is enabled by passing the normal `RUST_LOG` environment variable options. -/// - Generate a flamegraph from tracing data by passing `XMTP_FLAMEGRAPH=trace` -pub fn init_logging() { - INIT.call_once(|| { - let (flame_layer, guard) = FlameLayer::with_file("./tracing.folded").unwrap(); - let flame_layer = flame_layer - .with_threads_collapsed(true) - .with_module_path(true); - // .with_empty_samples(false); - - tracing_subscriber::registry() - .with(tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env())) - .with( - flame_layer - .with_filter(BenchFilter) - .with_filter(EnvFilter::from_env("XMTP_FLAMEGRAPH")), - ) - .init(); - - LOGGER.set(guard).unwrap(); - }) -} - -/// criterion `batch_iter` surrounds the closure in a `Runtime.block_on` despite being a sync -/// function, even in the async 'to_async` setup. Therefore we do this (only _slightly_) hacky -/// workaround to allow us to async setup some groups. -pub fn bench_async_setup(fun: F) -> T -where - F: Fn() -> Fut, - Fut: futures::future::Future, -{ - use tokio::runtime::Handle; - tokio::task::block_in_place(move || Handle::current().block_on(async move { fun().await })) -} - -/// Filters for only spans where the root span name is "bench" -pub struct BenchFilter; - -impl Filter for BenchFilter -where - S: Subscriber + for<'lookup> LookupSpan<'lookup> + std::fmt::Debug, - for<'lookup> >::Data: std::fmt::Debug, -{ - fn enabled(&self, meta: &Metadata<'_>, cx: &Context<'_, S>) -> bool { - if meta.name() == BENCH_ROOT_SPAN { - return true; - } - if let Some(id) = cx.current_span().id() { - if let Some(s) = cx.span_scope(id) { - if let Some(s) = s.from_root().take(1).collect::>().first() { - return s.name() == BENCH_ROOT_SPAN; - } - } - } - false - } -} diff --git a/xmtp_mls/src/utils/mod.rs b/xmtp_mls/src/utils/mod.rs index 357410b65..3ea0f821f 100644 --- a/xmtp_mls/src/utils/mod.rs +++ b/xmtp_mls/src/utils/mod.rs @@ -3,6 +3,9 @@ pub mod bench; #[cfg(any(test, feature = "test-utils"))] pub mod test; +#[cfg(any(test, feature = "test-utils"))] +pub use self::test::*; + pub mod hash { pub use xmtp_cryptography::hash::sha256_bytes as sha256; } diff --git a/xmtp_mls/src/utils/test/mod.rs b/xmtp_mls/src/utils/test/mod.rs index ff44452b2..636467fc0 100755 --- a/xmtp_mls/src/utils/test/mod.rs +++ b/xmtp_mls/src/utils/test/mod.rs @@ -1,7 +1,14 @@ #![allow(clippy::unwrap_used)] -use std::sync::Arc; +use std::{ + future::Future, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; use tokio::sync::Notify; +use xmtp_common::time::{timeout, Expired}; use xmtp_id::{ associations::{ generate_inbox_id, @@ -238,6 +245,68 @@ where } } +#[derive(Default)] +pub struct WorkerHandle { + processed: AtomicUsize, + notify: Notify, +} + +impl WorkerHandle { + pub async fn wait_for_new_events(&self, mut count: usize) -> Result<(), Expired> { + timeout(xmtp_common::time::Duration::from_secs(3), async { + while count > 0 { + self.notify.notified().await; + count -= 1; + } + }) + .await?; + + Ok(()) + } + + pub async fn wait_for_processed_count(&self, expected: usize) -> Result<(), Expired> { + timeout(xmtp_common::time::Duration::from_secs(3), async { + while self.processed.load(Ordering::SeqCst) < expected { + self.notify.notified().await; + } + }) + .await?; + + Ok(()) + } + + pub async fn block_for_num_events(&self, num_events: usize, op: Fut) -> Result<(), Expired> + where + Fut: Future, + { + let processed_count = self.processed_count(); + op.await; + self.wait_for_processed_count(processed_count + num_events) + .await?; + Ok(()) + } + + pub fn processed_count(&self) -> usize { + self.processed.load(Ordering::SeqCst) + } + + pub fn increment(&self) { + self.processed + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + self.notify.notify_waiters(); + } +} + +impl Client { + pub fn sync_worker_handle(&self) -> Option> { + self.sync_worker_handle.lock().clone() + } + + pub(crate) fn set_sync_worker_handle(&self, handle: Arc) { + *self.sync_worker_handle.lock() = Some(handle); + } +} + pub async fn register_client( client: &Client, owner: impl InboxOwner,