Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

benchmarks for create_client #1392

Merged
merged 2 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ wasm-bindgen-futures = "0.4"
wasm-bindgen-test = "0.3.49"
web-sys = "0.3"
zeroize = "1.8"
criterion = { version = "0.5", features = [
"html_reports",
"async_tokio",
]}
once_cell = "1.2"

# Internal Crate Dependencies
xmtp_api_grpc = { path = "xmtp_api_grpc" }
Expand All @@ -112,6 +117,9 @@ xmtp_common = { path = "common" }
# and we don't rely on it for debugging that much.
debug = 0

[profile.bench]
debug = true

# Setting opt-level to 3 for proc macros/build scripts
# speeds up buildtime
[profile.dev.build-override]
Expand Down
14 changes: 14 additions & 0 deletions bindings_ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ xmtp_user_preferences = { path = "../xmtp_user_preferences" }
xmtp_v2 = { path = "../xmtp_v2" }
xmtp_common.workspace = true

# Bench
criterion = { workspace = true, optional = true }
fdlimit = { version = "0.3", optional = true}


[target.'cfg(target_os = "android")'.dependencies]
paranoid-android = "0.2"

Expand All @@ -47,3 +52,12 @@ uuid = { workspace = true, features = ["v4", "fast-rng"] }
xmtp_api_grpc = { path = "../xmtp_api_grpc", features = ["test-utils"] }
xmtp_mls = { path = "../xmtp_mls", features = ["test-utils"] }
xmtp_proto = { path = "../xmtp_proto", features = ["test-utils"] }

[features]
bench = ["xmtp_mls/bench", "xmtp_common/bench", "dep:criterion", "dep:fdlimit"]

[[bench]]
harness = false
name = "create_client"
required-features = ["bench"]

148 changes: 148 additions & 0 deletions bindings_ffi/benches/create_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
//! NOTE:
// `MAX_DB_POOL_SIZE` in `configuration.rs` must be set to `10`
// in order for these benchmarks to succesfully run & generate a report.
// (file descriptor issue)

use crate::tracing::Instrument;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use tokio::runtime::{Builder, Runtime};
use xmtp_common::{bench::BENCH_ROOT_SPAN, tmp_path};
use xmtp_id::InboxOwner;
use xmtp_mls::utils::test::HISTORY_SYNC_URL;
use xmtpv3::generate_inbox_id;

#[macro_use]
extern crate tracing;

fn setup() -> Runtime {
Builder::new_multi_thread()
.enable_time()
.enable_io()
.thread_name("xmtp-bencher")
.build()
.unwrap()
}

fn network_url() -> (String, bool) {
let dev = std::env::var("DEV_GRPC");
let is_dev_network = matches!(dev, Ok(d) if d == "true" || d == "1");

if is_dev_network {
(xmtp_api_grpc::DEV_ADDRESS.to_string(), true)
} else {
(xmtp_api_grpc::LOCALHOST_ADDRESS.to_string(), false)
}
}

fn create_ffi_client(c: &mut Criterion) {
xmtp_common::bench::logger();

let runtime = setup();

let _ = fdlimit::raise_fd_limit();
let mut benchmark_group = c.benchmark_group("create_client");

// benchmark_group.sample_size(10);
benchmark_group.sampling_mode(criterion::SamplingMode::Flat);
benchmark_group.bench_function("create_ffi_client", |b| {
let span = trace_span!(BENCH_ROOT_SPAN);
b.to_async(&runtime).iter_batched(
|| {
let wallet = xmtp_cryptography::utils::generate_local_wallet();
let nonce = 1;
let inbox_id = generate_inbox_id(wallet.get_address(), nonce).unwrap();
let path = tmp_path();
let (network, is_secure) = network_url();
(
inbox_id,
wallet.get_address(),
nonce,
path,
network,
is_secure,
span.clone(),
)
},
|(inbox_id, address, nonce, path, network, is_secure, span)| async move {
xmtpv3::mls::create_client(
network,
is_secure,
Some(path),
Some(vec![0u8; 32]),
&inbox_id,
address,
nonce,
None,
Some(HISTORY_SYNC_URL.to_string()),
)
.instrument(span)
.await
.unwrap();
},
BatchSize::SmallInput,
)
});

benchmark_group.finish();
}

fn cached_create_ffi_client(c: &mut Criterion) {
xmtp_common::bench::logger();

let runtime = setup();

let _ = fdlimit::raise_fd_limit();
let mut benchmark_group = c.benchmark_group("create_client_from_cached");
let wallet = xmtp_cryptography::utils::generate_local_wallet();
let nonce = 1;
let inbox_id = generate_inbox_id(wallet.get_address(), nonce).unwrap();
let address = wallet.get_address();
let path = tmp_path();

// benchmark_group.sample_size(10);
benchmark_group.sampling_mode(criterion::SamplingMode::Flat);
benchmark_group.bench_function("cached_create_ffi_client", |b| {
let span = trace_span!(BENCH_ROOT_SPAN);
b.to_async(&runtime).iter_batched(
|| {
let (network, is_secure) = network_url();
(
inbox_id.clone(),
address.clone(),
nonce,
path.clone(),
HISTORY_SYNC_URL.to_string(),
network,
is_secure,
span.clone(),
)
},
|(inbox_id, address, nonce, path, history_sync, network, is_secure, span)| async move {
xmtpv3::mls::create_client(
network,
is_secure,
Some(path),
Some(vec![0u8; 32]),
&inbox_id,
address,
nonce,
None,
Some(history_sync),
)
.instrument(span)
.await
.unwrap();
},
BatchSize::SmallInput,
)
});

benchmark_group.finish();
}

criterion_group!(
name = create_client;
config = Criterion::default().sample_size(10);
targets = create_ffi_client, cached_create_ffi_client
);
criterion_main!(create_client);
2 changes: 1 addition & 1 deletion bindings_ffi/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,6 @@ static LOGGER_INIT: Once = Once::new();
pub fn init_logger() {
LOGGER_INIT.call_once(|| {
let native_layer = native_layer();
tracing_subscriber::registry().with(native_layer).init()
let _ = tracing_subscriber::registry().with(native_layer).try_init();
});
}
3 changes: 3 additions & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ xmtp_cryptography.workspace = true

parking_lot = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, features = ["fmt", "env-filter", "ansi", "json"], optional = true }
once_cell = { workspace = true, optional = true }
tracing-flame = { version = "0.2", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { workspace = true, features = ["js"] }
Expand All @@ -36,3 +38,4 @@ tokio = { workspace = true, features = ["time", "macros", "rt-multi-thread", "sy

[features]
test-utils = ["dep:parking_lot", "dep:tracing-subscriber", "dep:tracing-wasm", "dep:console_error_panic_hook"]
bench = ["test-utils", "dep:tracing-subscriber", "dep:once_cell", "dep:tracing-flame"]
74 changes: 74 additions & 0 deletions common/src/bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use once_cell::sync::OnceCell;
use std::sync::Once;
use tracing::{Metadata, Subscriber};
use tracing_flame::{FlameLayer, FlushGuard};
use tracing_subscriber::{
layer::{Context, Filter, Layer, SubscriberExt},
registry::LookupSpan,
util::SubscriberInitExt,
EnvFilter,
};
static INIT: Once = Once::new();

static LOGGER: OnceCell<FlushGuard<std::io::BufWriter<std::fs::File>>> = OnceCell::new();

pub const BENCH_ROOT_SPAN: &str = "xmtp-trace-bench";

/// initializes logging for benchmarks
/// - FMT logging is enabled by passing the normal `RUST_LOG` environment variable options.
/// - Generate a flamegraph from tracing data by passing `XMTP_FLAMEGRAPH=trace`
pub fn logger() {
INIT.call_once(|| {
let (flame_layer, guard) = FlameLayer::with_file("./tracing.folded").unwrap();
let flame_layer = flame_layer
.with_threads_collapsed(true)
.with_module_path(true);
// .with_empty_samples(false);

tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env()))
.with(
flame_layer
// .with_filter(BenchFilter)
.with_filter(EnvFilter::from_env("XMTP_FLAMEGRAPH")),
)
.init();

LOGGER.set(guard).unwrap();
})
}

/// criterion `batch_iter` surrounds the closure in a `Runtime.block_on` despite being a sync
/// function, even in the async 'to_async` setup. Therefore we do this (only _slightly_) hacky
/// workaround to allow us to async setup some groups.
pub fn bench_async_setup<F, T, Fut>(fun: F) -> T
where
F: Fn() -> Fut,
Fut: futures::future::Future<Output = T>,
{
use tokio::runtime::Handle;
tokio::task::block_in_place(move || Handle::current().block_on(async move { fun().await }))
}

/// Filters for only spans where the root span name is "bench"
pub struct BenchFilter;

impl<S> Filter<S> for BenchFilter
where
S: Subscriber + for<'lookup> LookupSpan<'lookup> + std::fmt::Debug,
for<'lookup> <S as LookupSpan<'lookup>>::Data: std::fmt::Debug,
{
fn enabled(&self, meta: &Metadata<'_>, cx: &Context<'_, S>) -> bool {
if meta.name() == BENCH_ROOT_SPAN {
return true;
}
if let Some(id) = cx.current_span().id() {
if let Some(s) = cx.span_scope(id) {
if let Some(s) = s.from_root().take(1).collect::<Vec<_>>().first() {
return s.name() == BENCH_ROOT_SPAN;
}
}
}
false
}
}
3 changes: 3 additions & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ mod test;
#[cfg(feature = "test-utils")]
pub use test::*;

#[cfg(feature = "bench")]
pub mod bench;

pub mod retry;
pub use retry::*;

Expand Down
5 changes: 5 additions & 0 deletions xmtp_api_grpc/src/grpc_api_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
use tokio::sync::oneshot;
use tonic::transport::ClientTlsConfig;
use tonic::{metadata::MetadataValue, transport::Channel, Request, Streaming};
use tracing::Instrument;

use xmtp_proto::api_client::{ClientWithMetadata, XmtpMlsStreams};
use xmtp_proto::xmtp::mls::api::v1::{GroupMessage, WelcomeMessage};
Expand All @@ -28,7 +29,9 @@ use xmtp_proto::{
Error, ErrorKind,
};

#[tracing::instrument(level = "trace", skip_all)]
pub async fn create_tls_channel(address: String) -> Result<Channel, Error> {
let span = tracing::trace_span!("grpc_connect", address);
let channel = Channel::from_shared(address)
.map_err(|e| Error::new(ErrorKind::SetupCreateChannelError).with(e))?
// Purpose: This setting controls the size of the initial connection-level flow control window for HTTP/2, which is the underlying protocol for gRPC.
Expand Down Expand Up @@ -58,6 +61,7 @@ pub async fn create_tls_channel(address: String) -> Result<Channel, Error> {
.tls_config(ClientTlsConfig::new().with_enabled_roots())
.map_err(|e| Error::new(ErrorKind::SetupTLSConfigError).with(e))?
.connect()
.instrument(span)
.await
.map_err(|e| Error::new(ErrorKind::SetupConnectionError).with(e))?;

Expand All @@ -74,6 +78,7 @@ pub struct Client {
}

impl Client {
#[tracing::instrument(level = "trace", skip_all)]
pub async fn create(host: impl ToString, is_secure: bool) -> Result<Self, Error> {
let host = host.to_string();
let app_version = MetadataValue::try_from(&String::from("0.0.0"))
Expand Down
Loading
Loading