diff --git a/.cargo/nextest.toml b/.cargo/nextest.toml index 07216b1ff..b7afc9364 100644 --- a/.cargo/nextest.toml +++ b/.cargo/nextest.toml @@ -1,2 +1,3 @@ [profile.default] retries = 3 +default-filter = "not test(test_stream_all_messages_does_not_lose_messages)" diff --git a/.github/workflows/lint-workspace.yaml b/.github/workflows/lint-workspace.yaml index 190987222..f6653b2bf 100644 --- a/.github/workflows/lint-workspace.yaml +++ b/.github/workflows/lint-workspace.yaml @@ -14,6 +14,7 @@ on: - "xmtp_mls/**" - "xmtp_proto/**" - "xmtp_v2/**" + - "xmtp_debug/**" - "Cargo.toml" - "Cargo.lock" - "rust-toolchain" diff --git a/.github/workflows/push-xbg.yml b/.github/workflows/push-xbg.yml new file mode 100644 index 000000000..55e69d314 --- /dev/null +++ b/.github/workflows/push-xbg.yml @@ -0,0 +1,49 @@ +name: Push XDBG Image + +on: + push: + branches: + - main + + workflow_dispatch: + +jobs: + push_to_registry: + name: Push Docker Image to GitHub Packages + runs-on: warp-ubuntu-latest-x64-16x + permissions: + contents: read + packages: write + outputs: + digest: ${{ steps.push.outputs.digest }} + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Log in to the container registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@v5 + with: + images: ghcr.io/${{ github.repository_owner }}/xdbg + tags: | + type=schedule + type=ref,event=branch + type=ref,event=tag + type=ref,event=pr + type=sha + - name: Build and push Docker image + uses: docker/build-push-action@v6 + id: push + with: + context: . + file: ./dev/xdbg/Dockerfile + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} \ No newline at end of file diff --git a/.github/workflows/test-http-api.yml b/.github/workflows/test-http-api.yml index e906f38aa..92e6c60cb 100644 --- a/.github/workflows/test-http-api.yml +++ b/.github/workflows/test-http-api.yml @@ -41,6 +41,6 @@ jobs: - name: Install nextest uses: taiki-e/install-action@nextest - name: build tests - run: cargo nextest run --no-run --tests --workspace --exclude xmtp_api_grpc --exclude xmtpv3 --exclude bindings_node --exclude bindings_wasm --features http-api + run: cargo nextest run --config-file ".cargo/nextest.toml" --no-run --tests --workspace --exclude xmtp_api_grpc --exclude xmtpv3 --exclude bindings_node --exclude bindings_wasm --features http-api - name: cargo test - run: cargo nextest run --workspace --exclude xmtp_api_grpc --exclude xmtpv3 --exclude bindings_node --exclude bindings_wasm --features http-api --test-threads 2 + run: cargo nextest run --config-file ".cargo/nextest.toml" --workspace --exclude xmtp_api_grpc --exclude xmtpv3 --exclude bindings_node --exclude bindings_wasm --features http-api --test-threads 2 diff --git a/Cargo.lock b/Cargo.lock index 7cf24af0f..cf5f4a245 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1311,24 +1311,6 @@ dependencies = [ "time", ] -[[package]] -name = "diesel-wasm-sqlite" -version = "0.0.1" -source = "git+https://github.com/xmtp/sqlite-web-rs?branch=main#7f0f938aa4d49ed7bfe5624d086e7f6583805419" -dependencies = [ - "diesel", - "diesel_derives", - "js-sys", - "serde", - "serde-wasm-bindgen", - "talc", - "thiserror 2.0.6", - "tokio", - "tracing", - "wasm-bindgen", - "wasm-bindgen-futures", -] - [[package]] name = "diesel_derives" version = "2.2.0" @@ -3229,7 +3211,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -4655,9 +4637,9 @@ dependencies = [ [[package]] name = "redb" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84b1de48a7cf7ba193e81e078d17ee2b786236eed1d3f7c60f8a09545efc4925" +checksum = "a7c2a94325f9c5826b17c42af11067230f503747f870117a28180e85696e21ba" dependencies = [ "libc", ] @@ -5512,6 +5494,24 @@ dependencies = [ "der 0.7.9", ] +[[package]] +name = "sqlite-web" +version = "0.0.1" +source = "git+https://github.com/xmtp/sqlite-web-rs?branch=main#0aa53058cbf3a1044e79cb6dd68a91c857308059" +dependencies = [ + "diesel", + "diesel_derives", + "js-sys", + "serde", + "serde-wasm-bindgen", + "talc", + "thiserror 2.0.6", + "tokio", + "tracing", + "wasm-bindgen", + "wasm-bindgen-futures", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -6899,7 +6899,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] @@ -7299,11 +7299,13 @@ dependencies = [ "getrandom", "gloo-timers 0.3.0", "js-sys", + "once_cell", "parking_lot 0.12.3", "rand", "thiserror 2.0.6", "tokio", "tracing", + "tracing-flame", "tracing-subscriber", "tracing-wasm", "wasm-bindgen-futures", @@ -7401,7 +7403,6 @@ dependencies = [ "criterion", "ctor", "diesel", - "diesel-wasm-sqlite", "diesel_migrations", "dyn-clone", "ethers", @@ -7416,7 +7417,6 @@ dependencies = [ "libsqlite3-sys", "mockall", "mockito", - "once_cell", "openmls", "openmls_basic_credential", "openmls_rust_crypto", @@ -7430,6 +7430,7 @@ dependencies = [ "serde", "serde_json", "sha2 0.10.8", + "sqlite-web", "tempfile", "thiserror 2.0.6", "tls_codec", @@ -7437,7 +7438,6 @@ dependencies = [ "tokio-stream", "toml 0.8.19", "tracing", - "tracing-flame", "tracing-subscriber", "tracing-wasm", "trait-variant", @@ -7505,7 +7505,9 @@ dependencies = [ name = "xmtpv3" version = "0.1.0" dependencies = [ + "criterion", "ethers", + "fdlimit", "futures", "paranoid-android", "parking_lot 0.12.3", diff --git a/Cargo.toml b/Cargo.toml index 0dde776cf..b46b42352 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ members = [ "xtask", "xmtp_debug", "xmtp_content_types", - "common" + "common", ] # Make the feature resolver explicit. @@ -75,7 +75,6 @@ bincode = "1.3" console_error_panic_hook = "0.1" const_format = "0.2" diesel = { version = "2.2", default-features = false } -diesel-wasm-sqlite = "0.0.1" diesel_migrations = { version = "2.2", default-features = false } dyn-clone = "1" fdlimit = "0.3" @@ -87,6 +86,7 @@ libsqlite3-sys = { version = "0.29", features = [ openssl = { version = "0.10", features = ["vendored"] } openssl-sys = { version = "0.9", features = ["vendored"] } parking_lot = "0.12.3" +sqlite-web = "0.0.1" tonic = { version = "0.12", default-features = false } tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3", default-features = false } @@ -97,21 +97,29 @@ wasm-bindgen-futures = "0.4" wasm-bindgen-test = "0.3.49" web-sys = "0.3" zeroize = "1.8" +criterion = { version = "0.5", features = [ + "html_reports", + "async_tokio", +]} + once_cell = "1.2" # Internal Crate Dependencies xmtp_api_grpc = { path = "xmtp_api_grpc" } +xmtp_common = { path = "common" } +xmtp_content_types = { path = "xmtp_content_types" } xmtp_cryptography = { path = "xmtp_cryptography" } xmtp_id = { path = "xmtp_id" } xmtp_mls = { path = "xmtp_mls" } xmtp_proto = { path = "xmtp_proto" } -xmtp_content_types = { path = "xmtp_content_types" } -xmtp_common = { path = "common" } [profile.dev] # Disabling debug info speeds up builds a bunch, # and we don't rely on it for debugging that much. debug = 0 +[profile.bench] +debug = true + # Setting opt-level to 3 for proc macros/build scripts # speeds up buildtime [profile.dev.build-override] @@ -147,6 +155,6 @@ opt-level = "s" # (cfg-specific patche support does not exist) [patch.crates-io] diesel = { git = "https://github.com/diesel-rs/diesel", branch = "master" } -diesel-wasm-sqlite = { git = "https://github.com/xmtp/sqlite-web-rs", branch = "main" } diesel_derives = { git = "https://github.com/diesel-rs/diesel", branch = "master" } diesel_migrations = { git = "https://github.com/diesel-rs/diesel", branch = "master" } +sqlite-web = { git = "https://github.com/xmtp/sqlite-web-rs", branch = "main" } diff --git a/bindings_ffi/Cargo.toml b/bindings_ffi/Cargo.toml index 7e44b3498..5831c3d7d 100644 --- a/bindings_ffi/Cargo.toml +++ b/bindings_ffi/Cargo.toml @@ -24,6 +24,11 @@ xmtp_user_preferences = { path = "../xmtp_user_preferences" } xmtp_v2 = { path = "../xmtp_v2" } xmtp_common.workspace = true +# Bench +criterion = { workspace = true, optional = true } +fdlimit = { version = "0.3", optional = true} + + [target.'cfg(target_os = "android")'.dependencies] paranoid-android = "0.2" @@ -47,3 +52,12 @@ uuid = { workspace = true, features = ["v4", "fast-rng"] } xmtp_api_grpc = { path = "../xmtp_api_grpc", features = ["test-utils"] } xmtp_mls = { path = "../xmtp_mls", features = ["test-utils"] } xmtp_proto = { path = "../xmtp_proto", features = ["test-utils"] } + +[features] +bench = ["xmtp_mls/bench", "xmtp_common/bench", "dep:criterion", "dep:fdlimit"] + +[[bench]] +harness = false +name = "create_client" +required-features = ["bench"] + diff --git a/bindings_ffi/benches/create_client.rs b/bindings_ffi/benches/create_client.rs new file mode 100644 index 000000000..2e6ca88da --- /dev/null +++ b/bindings_ffi/benches/create_client.rs @@ -0,0 +1,148 @@ +//! NOTE: +// `MAX_DB_POOL_SIZE` in `configuration.rs` must be set to `10` +// in order for these benchmarks to succesfully run & generate a report. +// (file descriptor issue) + +use crate::tracing::Instrument; +use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; +use tokio::runtime::{Builder, Runtime}; +use xmtp_common::{bench::BENCH_ROOT_SPAN, tmp_path}; +use xmtp_id::InboxOwner; +use xmtp_mls::utils::test::HISTORY_SYNC_URL; +use xmtpv3::generate_inbox_id; + +#[macro_use] +extern crate tracing; + +fn setup() -> Runtime { + Builder::new_multi_thread() + .enable_time() + .enable_io() + .thread_name("xmtp-bencher") + .build() + .unwrap() +} + +fn network_url() -> (String, bool) { + let dev = std::env::var("DEV_GRPC"); + let is_dev_network = matches!(dev, Ok(d) if d == "true" || d == "1"); + + if is_dev_network { + (xmtp_api_grpc::DEV_ADDRESS.to_string(), true) + } else { + (xmtp_api_grpc::LOCALHOST_ADDRESS.to_string(), false) + } +} + +fn create_ffi_client(c: &mut Criterion) { + xmtp_common::bench::logger(); + + let runtime = setup(); + + let _ = fdlimit::raise_fd_limit(); + let mut benchmark_group = c.benchmark_group("create_client"); + + // benchmark_group.sample_size(10); + benchmark_group.sampling_mode(criterion::SamplingMode::Flat); + benchmark_group.bench_function("create_ffi_client", |b| { + let span = trace_span!(BENCH_ROOT_SPAN); + b.to_async(&runtime).iter_batched( + || { + let wallet = xmtp_cryptography::utils::generate_local_wallet(); + let nonce = 1; + let inbox_id = generate_inbox_id(wallet.get_address(), nonce).unwrap(); + let path = tmp_path(); + let (network, is_secure) = network_url(); + ( + inbox_id, + wallet.get_address(), + nonce, + path, + network, + is_secure, + span.clone(), + ) + }, + |(inbox_id, address, nonce, path, network, is_secure, span)| async move { + xmtpv3::mls::create_client( + network, + is_secure, + Some(path), + Some(vec![0u8; 32]), + &inbox_id, + address, + nonce, + None, + Some(HISTORY_SYNC_URL.to_string()), + ) + .instrument(span) + .await + .unwrap(); + }, + BatchSize::SmallInput, + ) + }); + + benchmark_group.finish(); +} + +fn cached_create_ffi_client(c: &mut Criterion) { + xmtp_common::bench::logger(); + + let runtime = setup(); + + let _ = fdlimit::raise_fd_limit(); + let mut benchmark_group = c.benchmark_group("create_client_from_cached"); + let wallet = xmtp_cryptography::utils::generate_local_wallet(); + let nonce = 1; + let inbox_id = generate_inbox_id(wallet.get_address(), nonce).unwrap(); + let address = wallet.get_address(); + let path = tmp_path(); + + // benchmark_group.sample_size(10); + benchmark_group.sampling_mode(criterion::SamplingMode::Flat); + benchmark_group.bench_function("cached_create_ffi_client", |b| { + let span = trace_span!(BENCH_ROOT_SPAN); + b.to_async(&runtime).iter_batched( + || { + let (network, is_secure) = network_url(); + ( + inbox_id.clone(), + address.clone(), + nonce, + path.clone(), + HISTORY_SYNC_URL.to_string(), + network, + is_secure, + span.clone(), + ) + }, + |(inbox_id, address, nonce, path, history_sync, network, is_secure, span)| async move { + xmtpv3::mls::create_client( + network, + is_secure, + Some(path), + Some(vec![0u8; 32]), + &inbox_id, + address, + nonce, + None, + Some(history_sync), + ) + .instrument(span) + .await + .unwrap(); + }, + BatchSize::SmallInput, + ) + }); + + benchmark_group.finish(); +} + +criterion_group!( + name = create_client; + config = Criterion::default().sample_size(10); + targets = create_ffi_client, cached_create_ffi_client +); +criterion_main!(create_client); diff --git a/bindings_ffi/src/logger.rs b/bindings_ffi/src/logger.rs index c4b00e349..a2b2d7a27 100644 --- a/bindings_ffi/src/logger.rs +++ b/bindings_ffi/src/logger.rs @@ -91,6 +91,6 @@ static LOGGER_INIT: Once = Once::new(); pub fn init_logger() { LOGGER_INIT.call_once(|| { let native_layer = native_layer(); - tracing_subscriber::registry().with(native_layer).init() + let _ = tracing_subscriber::registry().with(native_layer).try_init(); }); } diff --git a/bindings_node/CHANGELOG.md b/bindings_node/CHANGELOG.md index 3a46a8658..98fc50dc7 100644 --- a/bindings_node/CHANGELOG.md +++ b/bindings_node/CHANGELOG.md @@ -1,5 +1,9 @@ # @xmtp/node-bindings +## 0.0.29 + +- Added support for custom permission policy sets + ## 0.0.28 - Removed `is_installation_authorized` and `is_address_authorized` from `Client` diff --git a/bindings_node/package.json b/bindings_node/package.json index a8a0907b0..4bc65d3e6 100644 --- a/bindings_node/package.json +++ b/bindings_node/package.json @@ -1,6 +1,6 @@ { "name": "@xmtp/node-bindings", - "version": "0.0.28", + "version": "0.0.29", "repository": { "type": "git", "url": "git+https://git@github.com/xmtp/libxmtp.git", diff --git a/bindings_node/src/conversation.rs b/bindings_node/src/conversation.rs index 20ffb2386..e8a4066d0 100644 --- a/bindings_node/src/conversation.rs +++ b/bindings_node/src/conversation.rs @@ -9,6 +9,8 @@ use xmtp_cryptography::signature::ed25519_public_key_to_address; use xmtp_mls::{ groups::{ group_metadata::GroupMetadata as XmtpGroupMetadata, + group_mutable_metadata::MetadataField as XmtpMetadataField, + intents::PermissionUpdateType as XmtpPermissionUpdateType, members::PermissionLevel as XmtpPermissionLevel, MlsGroup, UpdateAdminListType, }, storage::{ @@ -23,7 +25,7 @@ use crate::{ consent_state::ConsentState, encoded_content::EncodedContent, message::{ListMessagesOptions, Message}, - permissions::GroupPermissions, + permissions::{GroupPermissions, MetadataField, PermissionPolicy, PermissionUpdateType}, streams::StreamCloser, ErrorWrapper, }; @@ -654,4 +656,31 @@ impl Conversation { Ok(group.dm_inbox_id().map_err(ErrorWrapper::from)?) } + + #[napi] + pub async fn update_permission_policy( + &self, + permission_update_type: PermissionUpdateType, + permission_policy_option: PermissionPolicy, + metadata_field: Option, + ) -> Result<()> { + let group = MlsGroup::new( + self.inner_client.clone(), + self.group_id.clone(), + self.created_at_ns, + ); + + group + .update_permission_policy( + XmtpPermissionUpdateType::from(&permission_update_type), + permission_policy_option + .try_into() + .map_err(ErrorWrapper::from)?, + metadata_field.map(|field| XmtpMetadataField::from(&field)), + ) + .await + .map_err(ErrorWrapper::from)?; + + Ok(()) + } } diff --git a/bindings_node/src/conversations.rs b/bindings_node/src/conversations.rs index f957a9807..2b0247bb2 100644 --- a/bindings_node/src/conversations.rs +++ b/bindings_node/src/conversations.rs @@ -12,7 +12,7 @@ use xmtp_mls::storage::group::GroupMembershipState as XmtpGroupMembershipState; use xmtp_mls::storage::group::GroupQueryArgs; use crate::message::Message; -use crate::permissions::GroupPermissionsOptions; +use crate::permissions::{GroupPermissionsOptions, PermissionPolicySet}; use crate::ErrorWrapper; use crate::{client::RustXmtpClient, conversation::Conversation, streams::StreamCloser}; @@ -105,6 +105,7 @@ pub struct CreateGroupOptions { pub group_image_url_square: Option, pub group_description: Option, pub group_pinned_frame_url: Option, + pub custom_permission_policy_set: Option, } impl CreateGroupOptions { @@ -143,9 +144,22 @@ impl Conversations { group_image_url_square: None, group_description: None, group_pinned_frame_url: None, + custom_permission_policy_set: None, }, }; + if let Some(GroupPermissionsOptions::CustomPolicy) = options.permissions { + if options.custom_permission_policy_set.is_none() { + return Err(Error::from_reason("CustomPolicy must include policy set")); + } + } else if options.custom_permission_policy_set.is_some() { + return Err(Error::from_reason( + "Only CustomPolicy may specify a policy set", + )); + } + + let metadata_options = options.clone().into_group_metadata_options(); + let group_permissions = match options.permissions { Some(GroupPermissionsOptions::AllMembers) => { Some(PreconfiguredPolicies::AllMembers.to_policy_set()) @@ -153,11 +167,20 @@ impl Conversations { Some(GroupPermissionsOptions::AdminOnly) => { Some(PreconfiguredPolicies::AdminsOnly.to_policy_set()) } + Some(GroupPermissionsOptions::CustomPolicy) => { + if let Some(policy_set) = options.custom_permission_policy_set { + Some( + policy_set + .try_into() + .map_err(|e| Error::from_reason(format!("{}", e).as_str()))?, + ) + } else { + None + } + } _ => None, }; - let metadata_options = options.clone().into_group_metadata_options(); - let convo = if account_addresses.is_empty() { self .inner_client diff --git a/bindings_node/src/permissions.rs b/bindings_node/src/permissions.rs index 98e672951..3008c4b4e 100644 --- a/bindings_node/src/permissions.rs +++ b/bindings_node/src/permissions.rs @@ -1,10 +1,12 @@ -use napi::bindgen_prelude::{Error, Result}; +use napi::bindgen_prelude::Result; use napi_derive::napi; +use std::collections::HashMap; use xmtp_mls::groups::{ - group_mutable_metadata::MetadataField, + group_mutable_metadata::MetadataField as XmtpMetadataField, group_permissions::{ - BasePolicies, GroupMutablePermissions, MembershipPolicies, MetadataBasePolicies, - MetadataPolicies, PermissionsBasePolicies, PermissionsPolicies, + BasePolicies, GroupMutablePermissions, GroupMutablePermissionsError, MembershipPolicies, + MetadataBasePolicies, MetadataPolicies, PermissionsBasePolicies, PermissionsPolicies, + PolicySet, }, intents::{PermissionPolicyOption, PermissionUpdateType as XmtpPermissionUpdateType}, PreconfiguredPolicies, @@ -49,15 +51,15 @@ pub enum PermissionPolicy { } impl TryInto for PermissionPolicy { - type Error = Error; + type Error = GroupMutablePermissionsError; - fn try_into(self) -> Result { + fn try_into(self) -> std::result::Result { match self { PermissionPolicy::Allow => Ok(PermissionPolicyOption::Allow), PermissionPolicy::Deny => Ok(PermissionPolicyOption::Deny), PermissionPolicy::Admin => Ok(PermissionPolicyOption::AdminOnly), PermissionPolicy::SuperAdmin => Ok(PermissionPolicyOption::SuperAdminOnly), - _ => Err(Error::from_reason("InvalidPermissionPolicyOption")), + _ => Err(GroupMutablePermissionsError::InvalidPermissionPolicyOption), } } } @@ -107,7 +109,49 @@ impl From<&PermissionsPolicies> for PermissionPolicy { } } +impl TryInto for PermissionPolicy { + type Error = GroupMutablePermissionsError; + + fn try_into(self) -> std::result::Result { + match self { + PermissionPolicy::Allow => Ok(MetadataPolicies::allow()), + PermissionPolicy::Deny => Ok(MetadataPolicies::deny()), + PermissionPolicy::Admin => Ok(MetadataPolicies::allow_if_actor_admin()), + PermissionPolicy::SuperAdmin => Ok(MetadataPolicies::allow_if_actor_super_admin()), + _ => Err(GroupMutablePermissionsError::InvalidPermissionPolicyOption), + } + } +} + +impl TryInto for PermissionPolicy { + type Error = GroupMutablePermissionsError; + + fn try_into(self) -> std::result::Result { + match self { + PermissionPolicy::Deny => Ok(PermissionsPolicies::deny()), + PermissionPolicy::Admin => Ok(PermissionsPolicies::allow_if_actor_admin()), + PermissionPolicy::SuperAdmin => Ok(PermissionsPolicies::allow_if_actor_super_admin()), + _ => Err(GroupMutablePermissionsError::InvalidPermissionPolicyOption), + } + } +} + +impl TryInto for PermissionPolicy { + type Error = GroupMutablePermissionsError; + + fn try_into(self) -> std::result::Result { + match self { + PermissionPolicy::Allow => Ok(MembershipPolicies::allow()), + PermissionPolicy::Deny => Ok(MembershipPolicies::deny()), + PermissionPolicy::Admin => Ok(MembershipPolicies::allow_if_actor_admin()), + PermissionPolicy::SuperAdmin => Ok(MembershipPolicies::allow_if_actor_super_admin()), + _ => Err(GroupMutablePermissionsError::InvalidPermissionPolicyOption), + } + } +} + #[napi(object)] +#[derive(Clone)] pub struct PermissionPolicySet { pub add_member_policy: PermissionPolicy, pub remove_member_policy: PermissionPolicy, @@ -163,10 +207,67 @@ impl GroupPermissions { remove_member_policy: PermissionPolicy::from(&policy_set.remove_member_policy), add_admin_policy: PermissionPolicy::from(&policy_set.add_admin_policy), remove_admin_policy: PermissionPolicy::from(&policy_set.remove_admin_policy), - update_group_name_policy: get_policy(MetadataField::GroupName.as_str()), - update_group_description_policy: get_policy(MetadataField::Description.as_str()), - update_group_image_url_square_policy: get_policy(MetadataField::GroupImageUrlSquare.as_str()), - update_group_pinned_frame_url_policy: get_policy(MetadataField::GroupPinnedFrameUrl.as_str()), + update_group_name_policy: get_policy(XmtpMetadataField::GroupName.as_str()), + update_group_description_policy: get_policy(XmtpMetadataField::Description.as_str()), + update_group_image_url_square_policy: get_policy( + XmtpMetadataField::GroupImageUrlSquare.as_str(), + ), + update_group_pinned_frame_url_policy: get_policy( + XmtpMetadataField::GroupPinnedFrameUrl.as_str(), + ), + }) + } +} + +impl TryFrom for PolicySet { + type Error = GroupMutablePermissionsError; + fn try_from( + policy_set: PermissionPolicySet, + ) -> std::result::Result { + let mut metadata_permissions_map: HashMap = HashMap::new(); + metadata_permissions_map.insert( + XmtpMetadataField::GroupName.to_string(), + policy_set.update_group_name_policy.try_into()?, + ); + metadata_permissions_map.insert( + XmtpMetadataField::Description.to_string(), + policy_set.update_group_description_policy.try_into()?, + ); + metadata_permissions_map.insert( + XmtpMetadataField::GroupImageUrlSquare.to_string(), + policy_set.update_group_image_url_square_policy.try_into()?, + ); + metadata_permissions_map.insert( + XmtpMetadataField::GroupPinnedFrameUrl.to_string(), + policy_set.update_group_pinned_frame_url_policy.try_into()?, + ); + + Ok(PolicySet { + add_member_policy: policy_set.add_member_policy.try_into()?, + remove_member_policy: policy_set.remove_member_policy.try_into()?, + add_admin_policy: policy_set.add_admin_policy.try_into()?, + remove_admin_policy: policy_set.remove_admin_policy.try_into()?, + update_metadata_policy: metadata_permissions_map, + update_permissions_policy: PermissionsPolicies::allow_if_actor_super_admin(), }) } } + +#[napi] +pub enum MetadataField { + GroupName, + Description, + ImageUrlSquare, + PinnedFrameUrl, +} + +impl From<&MetadataField> for XmtpMetadataField { + fn from(field: &MetadataField) -> Self { + match field { + MetadataField::GroupName => XmtpMetadataField::GroupName, + MetadataField::Description => XmtpMetadataField::Description, + MetadataField::ImageUrlSquare => XmtpMetadataField::GroupImageUrlSquare, + MetadataField::PinnedFrameUrl => XmtpMetadataField::GroupPinnedFrameUrl, + } + } +} diff --git a/bindings_node/test/Conversations.test.ts b/bindings_node/test/Conversations.test.ts index 951aa61df..ee40b431d 100644 --- a/bindings_node/test/Conversations.test.ts +++ b/bindings_node/test/Conversations.test.ts @@ -9,6 +9,9 @@ import { Conversation, GroupPermissionsOptions, Message, + MetadataField, + PermissionPolicy, + PermissionUpdateType, } from '../dist' const SLEEP_MS = 100 @@ -80,6 +83,96 @@ describe('Conversations', () => { expect((await client2.conversations().listGroups()).length).toBe(1) }) + it('should create a group with custom permissions', async () => { + const user1 = createUser() + const user2 = createUser() + const client1 = await createRegisteredClient(user1) + const client2 = await createRegisteredClient(user2) + const group = await client1 + .conversations() + .createGroup([user2.account.address], { + permissions: GroupPermissionsOptions.CustomPolicy, + customPermissionPolicySet: { + addAdminPolicy: 2, + addMemberPolicy: 3, + removeAdminPolicy: 1, + removeMemberPolicy: 0, + updateGroupNamePolicy: 2, + updateGroupDescriptionPolicy: 1, + updateGroupImageUrlSquarePolicy: 0, + updateGroupPinnedFrameUrlPolicy: 3, + }, + }) + expect(group).toBeDefined() + expect(group.groupPermissions().policyType()).toBe( + GroupPermissionsOptions.CustomPolicy + ) + expect(group.groupPermissions().policySet()).toEqual({ + addAdminPolicy: 2, + addMemberPolicy: 3, + removeAdminPolicy: 1, + removeMemberPolicy: 0, + updateGroupNamePolicy: 2, + updateGroupDescriptionPolicy: 1, + updateGroupImageUrlSquarePolicy: 0, + updateGroupPinnedFrameUrlPolicy: 3, + }) + }) + + it('should update group permission policy', async () => { + const user1 = createUser() + const user2 = createUser() + const client1 = await createRegisteredClient(user1) + const client2 = await createRegisteredClient(user2) + const group = await client1 + .conversations() + .createGroup([user2.account.address]) + + expect(group.groupPermissions().policySet()).toEqual({ + addMemberPolicy: 0, + removeMemberPolicy: 2, + addAdminPolicy: 3, + removeAdminPolicy: 3, + updateGroupNamePolicy: 0, + updateGroupDescriptionPolicy: 0, + updateGroupImageUrlSquarePolicy: 0, + updateGroupPinnedFrameUrlPolicy: 0, + }) + + await group.updatePermissionPolicy( + PermissionUpdateType.AddAdmin, + PermissionPolicy.Deny + ) + + expect(group.groupPermissions().policySet()).toEqual({ + addMemberPolicy: 0, + removeMemberPolicy: 2, + addAdminPolicy: 1, + removeAdminPolicy: 3, + updateGroupNamePolicy: 0, + updateGroupDescriptionPolicy: 0, + updateGroupImageUrlSquarePolicy: 0, + updateGroupPinnedFrameUrlPolicy: 0, + }) + + await group.updatePermissionPolicy( + PermissionUpdateType.UpdateMetadata, + PermissionPolicy.Deny, + MetadataField.GroupName + ) + + expect(group.groupPermissions().policySet()).toEqual({ + addMemberPolicy: 0, + removeMemberPolicy: 2, + addAdminPolicy: 1, + removeAdminPolicy: 3, + updateGroupNamePolicy: 1, + updateGroupDescriptionPolicy: 0, + updateGroupImageUrlSquarePolicy: 0, + updateGroupPinnedFrameUrlPolicy: 0, + }) + }) + it('should create a dm group', async () => { const user1 = createUser() const user2 = createUser() diff --git a/bindings_wasm/CHANGELOG.md b/bindings_wasm/CHANGELOG.md index 7755ac0c9..b55cfc295 100644 --- a/bindings_wasm/CHANGELOG.md +++ b/bindings_wasm/CHANGELOG.md @@ -1,5 +1,9 @@ # @xmtp/wasm-bindings +## 0.0.8 + +- Added support for custom permission policy sets + ## 0.0.7 - Moved `verify_signed_with_public_key` out of `Client` diff --git a/bindings_wasm/package.json b/bindings_wasm/package.json index 014358de6..82002185a 100644 --- a/bindings_wasm/package.json +++ b/bindings_wasm/package.json @@ -1,6 +1,6 @@ { "name": "@xmtp/wasm-bindings", - "version": "0.0.7", + "version": "0.0.8", "type": "module", "license": "MIT", "description": "WASM bindings for the libXMTP rust library", diff --git a/bindings_wasm/src/conversation.rs b/bindings_wasm/src/conversation.rs index e7ded1a19..d211d5cae 100644 --- a/bindings_wasm/src/conversation.rs +++ b/bindings_wasm/src/conversation.rs @@ -6,10 +6,13 @@ use xmtp_mls::storage::group::ConversationType; use crate::client::RustXmtpClient; use crate::encoded_content::EncodedContent; use crate::messages::{ListMessagesOptions, Message}; +use crate::permissions::{MetadataField, PermissionPolicy, PermissionUpdateType}; use crate::{consent_state::ConsentState, permissions::GroupPermissions}; use xmtp_cryptography::signature::ed25519_public_key_to_address; use xmtp_mls::groups::{ group_metadata::GroupMetadata as XmtpGroupMetadata, + group_mutable_metadata::MetadataField as XmtpMetadataField, + intents::PermissionUpdateType as XmtpPermissionUpdateType, members::PermissionLevel as XmtpPermissionLevel, MlsGroup, UpdateAdminListType, }; use xmtp_mls::storage::group_message::{GroupMessageKind as XmtpGroupMessageKind, MsgQueryArgs}; @@ -546,4 +549,22 @@ impl Conversation { .dm_inbox_id() .map_err(|e| JsError::new(&format!("{e}"))) } + + #[wasm_bindgen(js_name = updatePermissionPolicy)] + pub async fn update_permission_policy( + &self, + permission_update_type: PermissionUpdateType, + permission_policy_option: PermissionPolicy, + metadata_field: Option, + ) -> Result<(), JsError> { + self + .to_mls_group() + .update_permission_policy( + XmtpPermissionUpdateType::from(&permission_update_type), + permission_policy_option.try_into()?, + metadata_field.map(|field| XmtpMetadataField::from(&field)), + ) + .await + .map_err(Into::into) + } } diff --git a/bindings_wasm/src/conversations.rs b/bindings_wasm/src/conversations.rs index 50f0790a2..599b95427 100644 --- a/bindings_wasm/src/conversations.rs +++ b/bindings_wasm/src/conversations.rs @@ -7,7 +7,7 @@ use xmtp_mls::storage::group::GroupMembershipState as XmtpGroupMembershipState; use xmtp_mls::storage::group::GroupQueryArgs; use crate::messages::Message; -use crate::permissions::GroupPermissionsOptions; +use crate::permissions::{GroupPermissionsOptions, PermissionPolicySet}; use crate::{client::RustXmtpClient, conversation::Conversation}; #[wasm_bindgen] @@ -127,6 +127,8 @@ pub struct CreateGroupOptions { pub group_description: Option, #[wasm_bindgen(js_name = groupPinnedFrameUrl)] pub group_pinned_frame_url: Option, + #[wasm_bindgen(js_name = customPermissionPolicySet)] + pub custom_permission_policy_set: Option, } #[wasm_bindgen] @@ -138,6 +140,7 @@ impl CreateGroupOptions { group_image_url_square: Option, group_description: Option, group_pinned_frame_url: Option, + custom_permission_policy_set: Option, ) -> Self { Self { permissions, @@ -145,6 +148,7 @@ impl CreateGroupOptions { group_image_url_square, group_description, group_pinned_frame_url, + custom_permission_policy_set, } } } @@ -187,9 +191,20 @@ impl Conversations { group_image_url_square: None, group_description: None, group_pinned_frame_url: None, + custom_permission_policy_set: None, }, }; + if let Some(GroupPermissionsOptions::CustomPolicy) = options.permissions { + if options.custom_permission_policy_set.is_none() { + return Err(JsError::new("CustomPolicy must include policy set")); + } + } else if options.custom_permission_policy_set.is_some() { + return Err(JsError::new("Only CustomPolicy may specify a policy set")); + } + + let metadata_options = options.clone().into_group_metadata_options(); + let group_permissions = match options.permissions { Some(GroupPermissionsOptions::AllMembers) => { Some(PreconfiguredPolicies::AllMembers.to_policy_set()) @@ -197,11 +212,20 @@ impl Conversations { Some(GroupPermissionsOptions::AdminOnly) => { Some(PreconfiguredPolicies::AdminsOnly.to_policy_set()) } + Some(GroupPermissionsOptions::CustomPolicy) => { + if let Some(policy_set) = options.custom_permission_policy_set { + Some( + policy_set + .try_into() + .map_err(|e| JsError::new(format!("{}", e).as_str()))?, + ) + } else { + None + } + } _ => None, }; - let metadata_options = options.clone().into_group_metadata_options(); - let convo = if account_addresses.is_empty() { self .inner_client diff --git a/bindings_wasm/src/permissions.rs b/bindings_wasm/src/permissions.rs index 97ebc9f6d..78ce50221 100644 --- a/bindings_wasm/src/permissions.rs +++ b/bindings_wasm/src/permissions.rs @@ -1,9 +1,11 @@ +use std::collections::HashMap; use wasm_bindgen::{prelude::wasm_bindgen, JsError}; use xmtp_mls::groups::{ - group_mutable_metadata::MetadataField, + group_mutable_metadata::MetadataField as XmtpMetadataField, group_permissions::{ - BasePolicies, GroupMutablePermissions, MembershipPolicies, MetadataBasePolicies, - MetadataPolicies, PermissionsBasePolicies, PermissionsPolicies, + BasePolicies, GroupMutablePermissions, GroupMutablePermissionsError, MembershipPolicies, + MetadataBasePolicies, MetadataPolicies, PermissionsBasePolicies, PermissionsPolicies, + PolicySet, }, intents::{PermissionPolicyOption, PermissionUpdateType as XmtpPermissionUpdateType}, PreconfiguredPolicies, @@ -108,7 +110,49 @@ impl From<&PermissionsPolicies> for PermissionPolicy { } } +impl TryInto for PermissionPolicy { + type Error = GroupMutablePermissionsError; + + fn try_into(self) -> Result { + match self { + PermissionPolicy::Allow => Ok(MetadataPolicies::allow()), + PermissionPolicy::Deny => Ok(MetadataPolicies::deny()), + PermissionPolicy::Admin => Ok(MetadataPolicies::allow_if_actor_admin()), + PermissionPolicy::SuperAdmin => Ok(MetadataPolicies::allow_if_actor_super_admin()), + _ => Err(GroupMutablePermissionsError::InvalidPermissionPolicyOption), + } + } +} + +impl TryInto for PermissionPolicy { + type Error = GroupMutablePermissionsError; + + fn try_into(self) -> Result { + match self { + PermissionPolicy::Deny => Ok(PermissionsPolicies::deny()), + PermissionPolicy::Admin => Ok(PermissionsPolicies::allow_if_actor_admin()), + PermissionPolicy::SuperAdmin => Ok(PermissionsPolicies::allow_if_actor_super_admin()), + _ => Err(GroupMutablePermissionsError::InvalidPermissionPolicyOption), + } + } +} + +impl TryInto for PermissionPolicy { + type Error = GroupMutablePermissionsError; + + fn try_into(self) -> Result { + match self { + PermissionPolicy::Allow => Ok(MembershipPolicies::allow()), + PermissionPolicy::Deny => Ok(MembershipPolicies::deny()), + PermissionPolicy::Admin => Ok(MembershipPolicies::allow_if_actor_admin()), + PermissionPolicy::SuperAdmin => Ok(MembershipPolicies::allow_if_actor_super_admin()), + _ => Err(GroupMutablePermissionsError::InvalidPermissionPolicyOption), + } + } +} + #[wasm_bindgen(getter_with_clone)] +#[derive(Clone)] pub struct PermissionPolicySet { #[wasm_bindgen(js_name = addMemberPolicy)] pub add_member_policy: PermissionPolicy, @@ -201,10 +245,65 @@ impl GroupPermissions { remove_member_policy: PermissionPolicy::from(&policy_set.remove_member_policy), add_admin_policy: PermissionPolicy::from(&policy_set.add_admin_policy), remove_admin_policy: PermissionPolicy::from(&policy_set.remove_admin_policy), - update_group_name_policy: get_policy(MetadataField::GroupName.as_str()), - update_group_description_policy: get_policy(MetadataField::Description.as_str()), - update_group_image_url_square_policy: get_policy(MetadataField::GroupImageUrlSquare.as_str()), - update_group_pinned_frame_url_policy: get_policy(MetadataField::GroupPinnedFrameUrl.as_str()), + update_group_name_policy: get_policy(XmtpMetadataField::GroupName.as_str()), + update_group_description_policy: get_policy(XmtpMetadataField::Description.as_str()), + update_group_image_url_square_policy: get_policy( + XmtpMetadataField::GroupImageUrlSquare.as_str(), + ), + update_group_pinned_frame_url_policy: get_policy( + XmtpMetadataField::GroupPinnedFrameUrl.as_str(), + ), + }) + } +} + +impl TryFrom for PolicySet { + type Error = GroupMutablePermissionsError; + fn try_from(policy_set: PermissionPolicySet) -> Result { + let mut metadata_permissions_map: HashMap = HashMap::new(); + metadata_permissions_map.insert( + XmtpMetadataField::GroupName.to_string(), + policy_set.update_group_name_policy.try_into()?, + ); + metadata_permissions_map.insert( + XmtpMetadataField::Description.to_string(), + policy_set.update_group_description_policy.try_into()?, + ); + metadata_permissions_map.insert( + XmtpMetadataField::GroupImageUrlSquare.to_string(), + policy_set.update_group_image_url_square_policy.try_into()?, + ); + metadata_permissions_map.insert( + XmtpMetadataField::GroupPinnedFrameUrl.to_string(), + policy_set.update_group_pinned_frame_url_policy.try_into()?, + ); + + Ok(PolicySet { + add_member_policy: policy_set.add_member_policy.try_into()?, + remove_member_policy: policy_set.remove_member_policy.try_into()?, + add_admin_policy: policy_set.add_admin_policy.try_into()?, + remove_admin_policy: policy_set.remove_admin_policy.try_into()?, + update_metadata_policy: metadata_permissions_map, + update_permissions_policy: PermissionsPolicies::allow_if_actor_super_admin(), }) } } + +#[wasm_bindgen] +pub enum MetadataField { + GroupName, + Description, + ImageUrlSquare, + PinnedFrameUrl, +} + +impl From<&MetadataField> for XmtpMetadataField { + fn from(field: &MetadataField) -> Self { + match field { + MetadataField::GroupName => XmtpMetadataField::GroupName, + MetadataField::Description => XmtpMetadataField::Description, + MetadataField::ImageUrlSquare => XmtpMetadataField::GroupImageUrlSquare, + MetadataField::PinnedFrameUrl => XmtpMetadataField::GroupPinnedFrameUrl, + } + } +} diff --git a/common/Cargo.toml b/common/Cargo.toml index 76b836916..41dcc4711 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -14,6 +14,8 @@ xmtp_cryptography.workspace = true parking_lot = { workspace = true, optional = true } tracing-subscriber = { workspace = true, features = ["fmt", "env-filter", "ansi", "json"], optional = true } +once_cell = { workspace = true, optional = true } +tracing-flame = { version = "0.2", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { workspace = true, features = ["js"] } @@ -36,3 +38,4 @@ tokio = { workspace = true, features = ["time", "macros", "rt-multi-thread", "sy [features] test-utils = ["dep:parking_lot", "dep:tracing-subscriber", "dep:tracing-wasm", "dep:console_error_panic_hook"] +bench = ["test-utils", "dep:tracing-subscriber", "dep:once_cell", "dep:tracing-flame"] diff --git a/common/src/bench.rs b/common/src/bench.rs new file mode 100644 index 000000000..09ec4e94e --- /dev/null +++ b/common/src/bench.rs @@ -0,0 +1,74 @@ +use once_cell::sync::OnceCell; +use std::sync::Once; +use tracing::{Metadata, Subscriber}; +use tracing_flame::{FlameLayer, FlushGuard}; +use tracing_subscriber::{ + layer::{Context, Filter, Layer, SubscriberExt}, + registry::LookupSpan, + util::SubscriberInitExt, + EnvFilter, +}; +static INIT: Once = Once::new(); + +static LOGGER: OnceCell>> = OnceCell::new(); + +pub const BENCH_ROOT_SPAN: &str = "xmtp-trace-bench"; + +/// initializes logging for benchmarks +/// - FMT logging is enabled by passing the normal `RUST_LOG` environment variable options. +/// - Generate a flamegraph from tracing data by passing `XMTP_FLAMEGRAPH=trace` +pub fn logger() { + INIT.call_once(|| { + let (flame_layer, guard) = FlameLayer::with_file("./tracing.folded").unwrap(); + let flame_layer = flame_layer + .with_threads_collapsed(true) + .with_module_path(true); + // .with_empty_samples(false); + + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env())) + .with( + flame_layer + // .with_filter(BenchFilter) + .with_filter(EnvFilter::from_env("XMTP_FLAMEGRAPH")), + ) + .init(); + + LOGGER.set(guard).unwrap(); + }) +} + +/// criterion `batch_iter` surrounds the closure in a `Runtime.block_on` despite being a sync +/// function, even in the async 'to_async` setup. Therefore we do this (only _slightly_) hacky +/// workaround to allow us to async setup some groups. +pub fn bench_async_setup(fun: F) -> T +where + F: Fn() -> Fut, + Fut: futures::future::Future, +{ + use tokio::runtime::Handle; + tokio::task::block_in_place(move || Handle::current().block_on(async move { fun().await })) +} + +/// Filters for only spans where the root span name is "bench" +pub struct BenchFilter; + +impl Filter for BenchFilter +where + S: Subscriber + for<'lookup> LookupSpan<'lookup> + std::fmt::Debug, + for<'lookup> >::Data: std::fmt::Debug, +{ + fn enabled(&self, meta: &Metadata<'_>, cx: &Context<'_, S>) -> bool { + if meta.name() == BENCH_ROOT_SPAN { + return true; + } + if let Some(id) = cx.current_span().id() { + if let Some(s) = cx.span_scope(id) { + if let Some(s) = s.from_root().take(1).collect::>().first() { + return s.name() == BENCH_ROOT_SPAN; + } + } + } + false + } +} diff --git a/common/src/lib.rs b/common/src/lib.rs index a198962ad..8b2ad8d38 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -7,6 +7,9 @@ mod test; #[cfg(feature = "test-utils")] pub use test::*; +#[cfg(feature = "bench")] +pub mod bench; + pub mod retry; pub use retry::*; diff --git a/dev/docker/docker-compose.yml b/dev/docker/docker-compose.yml index 299fe0251..9574889bb 100644 --- a/dev/docker/docker-compose.yml +++ b/dev/docker/docker-compose.yml @@ -69,13 +69,13 @@ services: # note: the SHA here is tied to the XTMPD_CONTRACTS_*_ADDRESSes # if you bump the version of anvil-xmtpd you will have to change the contracts # you can find them inside the anvil-xmtpd image via `docker exec libxmtp-chain-1 cat contracts.env` - image: ghcr.io/xmtp/anvil-xmtpd:sha-0b3421b + image: ghcr.io/xmtp/anvil-xmtpd:sha-10808fb command: ["--host", "0.0.0.0"] repnode: platform: linux/amd64 # note: avoid using :latest while xmtpd is under development to avoid breaking changes - image: ghcr.io/xmtp/xmtpd:sha-0b3421b + image: ghcr.io/xmtp/xmtpd:sha-10808fb environment: XMTPD_DB_WRITER_CONNECTION_STRING: "postgres://postgres:xmtp@replicationdb:5432/postgres?sslmode=disable" XMTPD_CONTRACTS_RPC_URL: "http://chain:8545" @@ -84,9 +84,11 @@ services: XMTPD_CONTRACTS_IDENTITY_UPDATES_ADDRESS: 0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0 XMTPD_SIGNER_PRIVATE_KEY: 0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d XMTPD_PAYER_PRIVATE_KEY: 0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d - XMTPD_MLS_VALIDATION_GRPC_ADDRESS: "validation:50051" + XMTPD_MLS_VALIDATION_GRPC_ADDRESS: "http://validation:50051" XMTPD_PAYER_ENABLE: true XMTPD_REPLICATION_ENABLE: true + XMTPD_INDEXER_ENABLE: true + XMTPD_SYNC_ENABLE: true depends_on: chain: condition: service_started diff --git a/dev/xdbg/Dockerfile b/dev/xdbg/Dockerfile new file mode 100644 index 000000000..fb5769ce7 --- /dev/null +++ b/dev/xdbg/Dockerfile @@ -0,0 +1,10 @@ +FROM rust:1-bullseye as builder +WORKDIR /code +COPY . . +COPY .git /.git +RUN cargo build --release --package xdbg + +FROM debian:bullseye-slim +COPY --from=builder /code/target/release/xdbg /usr/local/bin/xdbg +ENV RUST_LOG=info +ENTRYPOINT ["xdbg"] diff --git a/xmtp_api_grpc/src/grpc_api_helper.rs b/xmtp_api_grpc/src/grpc_api_helper.rs index fa7683de7..0fdadaecf 100644 --- a/xmtp_api_grpc/src/grpc_api_helper.rs +++ b/xmtp_api_grpc/src/grpc_api_helper.rs @@ -8,6 +8,7 @@ use futures::{SinkExt, Stream, StreamExt, TryStreamExt}; use tokio::sync::oneshot; use tonic::transport::ClientTlsConfig; use tonic::{metadata::MetadataValue, transport::Channel, Request, Streaming}; +use tracing::Instrument; use xmtp_proto::api_client::{ClientWithMetadata, XmtpMlsStreams}; use xmtp_proto::xmtp::mls::api::v1::{GroupMessage, WelcomeMessage}; @@ -28,7 +29,9 @@ use xmtp_proto::{ Error, ErrorKind, }; +#[tracing::instrument(level = "trace", skip_all)] pub async fn create_tls_channel(address: String) -> Result { + let span = tracing::trace_span!("grpc_connect", address); let channel = Channel::from_shared(address) .map_err(|e| Error::new(ErrorKind::SetupCreateChannelError).with(e))? // Purpose: This setting controls the size of the initial connection-level flow control window for HTTP/2, which is the underlying protocol for gRPC. @@ -58,6 +61,7 @@ pub async fn create_tls_channel(address: String) -> Result { .tls_config(ClientTlsConfig::new().with_enabled_roots()) .map_err(|e| Error::new(ErrorKind::SetupTLSConfigError).with(e))? .connect() + .instrument(span) .await .map_err(|e| Error::new(ErrorKind::SetupConnectionError).with(e))?; @@ -74,6 +78,7 @@ pub struct Client { } impl Client { + #[tracing::instrument(level = "trace", skip_all)] pub async fn create(host: impl ToString, is_secure: bool) -> Result { let host = host.to_string(); let app_version = MetadataValue::try_from(&String::from("0.0.0")) diff --git a/xmtp_debug/Cargo.toml b/xmtp_debug/Cargo.toml index f1ec81d46..aa6ad00e6 100644 --- a/xmtp_debug/Cargo.toml +++ b/xmtp_debug/Cargo.toml @@ -21,7 +21,7 @@ color-eyre = "0.6" tracing-logfmt = "0.3" owo-colors = "4.1" url.workspace = true -redb = "2.2" +redb = "2.3" directories = "5.0" const_format.workspace = true speedy = "0.8" diff --git a/xmtp_debug/src/app.rs b/xmtp_debug/src/app.rs index 79f6c6473..5dcd4e810 100644 --- a/xmtp_debug/src/app.rs +++ b/xmtp_debug/src/app.rs @@ -21,6 +21,7 @@ mod store; /// Types shared between App Functions mod types; +use clap::CommandFactory; use color_eyre::eyre::{self, Result}; use directories::ProjectDirs; use std::{fs, path::PathBuf, sync::Arc}; @@ -95,6 +96,11 @@ impl App { } = opts; debug!(fdlimit = get_fdlimit()); + if cmd.is_none() && !clear { + AppOpts::command().print_help()?; + eyre::bail!("No subcommand was specified"); + } + if let Some(cmd) = cmd { match cmd { Generate(g) => generate::Generate::new(g, backend, db).run().await, diff --git a/xmtp_debug/src/app/clients.rs b/xmtp_debug/src/app/clients.rs index 6824ca39e..66e92356e 100644 --- a/xmtp_debug/src/app/clients.rs +++ b/xmtp_debug/src/app/clients.rs @@ -57,10 +57,7 @@ async fn new_client_inner( wallet: &LocalWallet, db_path: Option, ) -> Result { - let url = url::Url::from(network.clone()); - let is_secure = url.scheme() == "https"; - trace!(url = %url, is_secure, "create grpc"); - let api = crate::GrpcClient::create(url.as_str().to_string(), is_secure).await?; + let api = network.connect().await?; let nonce = 1; let inbox_id = generate_inbox_id(&wallet.get_address(), &nonce).unwrap(); @@ -125,9 +122,7 @@ async fn existing_client_inner( network: &args::BackendOpts, db_path: PathBuf, ) -> Result { - let url = url::Url::from(network.clone()); - let is_secure = url.scheme() == "https"; - let api = crate::GrpcClient::create(url.as_str().to_string(), is_secure).await?; + let api = network.connect().await?; let store = EncryptedMessageStore::new( StorageOption::Persistent(db_path.clone().into_os_string().into_string().unwrap()), diff --git a/xmtp_debug/src/args.rs b/xmtp_debug/src/args.rs index d4ca414ce..3c2cf67c5 100644 --- a/xmtp_debug/src/args.rs +++ b/xmtp_debug/src/args.rs @@ -3,6 +3,7 @@ use std::path::PathBuf; use clap::{Args, Parser, Subcommand, ValueEnum}; use clap_verbosity_flag::{InfoLevel, Verbosity}; +use color_eyre::eyre; use xxhash_rust::xxh3; mod types; pub use types::*; @@ -56,7 +57,7 @@ pub struct Generate { #[arg(long, short)] pub amount: usize, /// Specify amount of random identities to invite to group - #[arg(long, short)] + #[arg(long)] pub invite: Option, #[command(flatten)] pub message_opts: MessageGenerateOpts, @@ -193,6 +194,76 @@ pub struct BackendOpts { conflicts_with = "constant-backend" )] pub url: Option, + #[arg( + short, + long, + group = "custom-backend", + conflicts_with = "constant-backend" + )] + pub payer_url: Option, + /// Enable the decentralization backend + #[arg(short, long)] + pub d14n: bool, +} + +impl BackendOpts { + pub fn payer_url(&self) -> eyre::Result { + use BackendKind::*; + + if let Some(p) = &self.payer_url { + return Ok(p.clone()); + } + + match (self.backend, self.d14n) { + (Dev, false) => eyre::bail!("No payer for V3"), + (Production, false) => eyre::bail!("No payer for V3"), + (Local, false) => eyre::bail!("No payer for V3"), + (Dev, true) => Ok((*crate::constants::XMTP_DEV_PAYER).clone()), + (Production, true) => Ok((*crate::constants::XMTP_PRODUCTION_PAYER).clone()), + (Local, true) => Ok((*crate::constants::XMTP_LOCAL_PAYER).clone()), + } + } + + pub fn network_url(&self) -> url::Url { + use BackendKind::*; + + if let Some(n) = &self.url { + return n.clone(); + } + + match (self.backend, self.d14n) { + (Dev, false) => (*crate::constants::XMTP_DEV).clone(), + (Production, false) => (*crate::constants::XMTP_PRODUCTION).clone(), + (Local, false) => (*crate::constants::XMTP_LOCAL).clone(), + (Dev, true) => (*crate::constants::XMTP_DEV_D14N).clone(), + (Production, true) => (*crate::constants::XMTP_PRODUCTION_D14N).clone(), + (Local, true) => (*crate::constants::XMTP_LOCAL_D14N).clone(), + } + } + + pub async fn connect(&self) -> eyre::Result> { + let network = self.network_url(); + let is_secure = network.scheme() == "https"; + + if self.d14n { + let payer = self.payer_url()?; + trace!(url = %network, payer = %payer, is_secure, "create grpc"); + + Ok(Box::new( + xmtp_api_grpc::replication_client::ClientV4::create( + network.as_str().to_string(), + payer.as_str().to_string(), + is_secure, + ) + .await?, + )) + } else { + trace!(url = %network, is_secure, "create grpc"); + Ok(Box::new( + crate::GrpcClient::create(network.as_str().to_string(), is_secure).await?, + )) + } + } } impl<'a> From<&'a BackendOpts> for u64 { @@ -202,10 +273,13 @@ impl<'a> From<&'a BackendOpts> for u64 { if let Some(ref url) = value.url { xxh3::xxh3_64(url.as_str().as_bytes()) } else { - match value.backend { - Production => 2, - Dev => 1, - Local => 0, + match (value.backend, value.d14n) { + (Production, false) => 2, + (Dev, false) => 1, + (Local, false) => 0, + (Production, true) => 5, + (Dev, true) => 4, + (Local, true) => 3, } } } @@ -219,8 +293,10 @@ impl From for u64 { impl From for url::Url { fn from(value: BackendOpts) -> Self { - let BackendOpts { backend, url } = value; - url.unwrap_or(backend.into()) + let BackendOpts { + backend, url, d14n, .. + } = value; + url.unwrap_or(backend.to_network_url(d14n)) } } @@ -232,6 +308,20 @@ pub enum BackendKind { Local, } +impl BackendKind { + fn to_network_url(self, d14n: bool) -> url::Url { + use BackendKind::*; + match (self, d14n) { + (Dev, false) => (*crate::constants::XMTP_DEV).clone(), + (Production, false) => (*crate::constants::XMTP_PRODUCTION).clone(), + (Local, false) => (*crate::constants::XMTP_LOCAL).clone(), + (Dev, true) => (*crate::constants::XMTP_DEV_D14N).clone(), + (Production, true) => (*crate::constants::XMTP_PRODUCTION_D14N).clone(), + (Local, true) => (*crate::constants::XMTP_LOCAL_D14N).clone(), + } + } +} + impl From for url::Url { fn from(value: BackendKind) -> Self { use BackendKind::*; diff --git a/xmtp_debug/src/constants.rs b/xmtp_debug/src/constants.rs index fb970d518..d8bd69bb3 100644 --- a/xmtp_debug/src/constants.rs +++ b/xmtp_debug/src/constants.rs @@ -9,5 +9,18 @@ pub static XMTP_DEV: LazyLock = LazyLock::new(|| Url::parse("https://grpc.dev.xmtp.network:443").unwrap()); pub static XMTP_LOCAL: LazyLock = LazyLock::new(|| Url::parse("http://localhost:5556").unwrap()); + +pub static XMTP_PRODUCTION_D14N: LazyLock = LazyLock::new(|| Url::parse("").unwrap()); +pub static XMTP_DEV_D14N: LazyLock = + LazyLock::new(|| Url::parse("https://grpc.testnet.xmtp.network:443").unwrap()); +pub static XMTP_LOCAL_D14N: LazyLock = + LazyLock::new(|| Url::parse("http://localhost:5050").unwrap()); + +pub static XMTP_PRODUCTION_PAYER: LazyLock = LazyLock::new(|| Url::parse("").unwrap()); +pub static XMTP_DEV_PAYER: LazyLock = + LazyLock::new(|| Url::parse("https://payer.testnet.xmtp.network:443").unwrap()); +pub static XMTP_LOCAL_PAYER: LazyLock = + LazyLock::new(|| Url::parse("http://localhost:5050").unwrap()); + pub static TMPDIR: LazyLock = LazyLock::::new(|| TempDir::new().unwrap()); pub const STORAGE_PREFIX: &str = "xdbg"; diff --git a/xmtp_debug/src/main.rs b/xmtp_debug/src/main.rs index 10dbeb294..fbaf4deb7 100644 --- a/xmtp_debug/src/main.rs +++ b/xmtp_debug/src/main.rs @@ -7,8 +7,10 @@ use clap::Parser; use color_eyre::eyre::Result; use xmtp_api_grpc::grpc_api_helper::Client as GrpcClient; +use xmtp_mls::XmtpApi; -pub type DbgClient = xmtp_mls::client::Client; +// pub type DbgClient = xmtp_mls::client::Client; +type DbgClient = xmtp_mls::client::Client>; #[macro_use] extern crate tracing; diff --git a/xmtp_mls/Cargo.toml b/xmtp_mls/Cargo.toml index ef9d49670..0e160e485 100644 --- a/xmtp_mls/Cargo.toml +++ b/xmtp_mls/Cargo.toml @@ -19,13 +19,12 @@ bench = [ "indicatif", "tracing-subscriber", "anyhow", - "tracing-flame", - "once_cell", "dep:xmtp_api_grpc", "criterion", "dep:fdlimit", "dep:ethers", "dep:const_format", + "xmtp_common/bench" ] default = ["grpc-api"] grpc-api = ["dep:xmtp_api_grpc"] @@ -40,7 +39,7 @@ test-utils = [ "xmtp_api_grpc/test-utils", "dep:const_format", "mockall", - "xmtp_common/test-utils" + "xmtp_common/test-utils", ] update-schema = ["toml"] @@ -69,8 +68,8 @@ tokio-stream = { version = "0.1", default-features = false, features = [ ] } tracing.workspace = true trait-variant.workspace = true -zeroize.workspace = true xmtp_common.workspace = true +zeroize.workspace = true # XMTP/Local xmtp_content_types = { path = "../xmtp_content_types" } @@ -89,15 +88,10 @@ xmtp_api_http = { path = "../xmtp_api_http", optional = true } # Test/Bench Utils anyhow = { workspace = true, optional = true } -criterion = { version = "0.5", features = [ - "html_reports", - "async_tokio", -], optional = true } +criterion = { workspace = true, optional = true } hmac = "0.12.1" indicatif = { version = "0.17", optional = true } mockall = { version = "0.13.1", optional = true } -once_cell = { version = "1.19", optional = true } -tracing-flame = { version = "0.2", optional = true } tracing-subscriber = { workspace = true, features = [ "env-filter", "fmt", @@ -133,10 +127,10 @@ diesel = { workspace = true, features = [ "r2d2", "returning_clauses_for_sqlite_3_35", ] } -diesel-wasm-sqlite = { workspace = true } getrandom = { workspace = true, features = ["js"] } gloo-timers = { workspace = true, features = ["futures"] } openmls = { workspace = true, features = ["js"] } +sqlite-web = { workspace = true } tokio = { workspace = true, features = ["macros", "rt", "time"] } wasm-bindgen-futures.workspace = true web-sys.workspace = true @@ -147,10 +141,10 @@ anyhow.workspace = true const_format.workspace = true mockall = "0.13.1" openmls_basic_credential.workspace = true +wasm-bindgen-test.workspace = true +xmtp_common = { workspace = true, features = ["test-utils"] } xmtp_id = { path = "../xmtp_id", features = ["test-utils"] } xmtp_proto = { workspace = true, features = ["test-utils"] } -xmtp_common = { workspace = true, features = ["test-utils"]} -wasm-bindgen-test.workspace = true [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] ctor.workspace = true @@ -169,12 +163,9 @@ xmtp_api_http = { path = "../xmtp_api_http", features = ["test-utils"] } [target.'cfg(target_arch = "wasm32")'.dev-dependencies] console_error_panic_hook = { version = "0.1" } -diesel-wasm-sqlite = { workspace = true, features = [ - "unsafe-debug-query", - "r2d2", -] } ethers = { workspace = true, features = ["rustls"] } openmls = { workspace = true, features = ["js"] } +sqlite-web = { workspace = true, features = ["unsafe-debug-query", "r2d2"] } tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } tracing-wasm = { version = "0.2" } wasm-bindgen-test.workspace = true diff --git a/xmtp_mls/benches/group_limit.rs b/xmtp_mls/benches/group_limit.rs index ba3027eb3..ac2be6a5c 100755 --- a/xmtp_mls/benches/group_limit.rs +++ b/xmtp_mls/benches/group_limit.rs @@ -6,13 +6,11 @@ use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criteri use std::{collections::HashMap, sync::Arc}; use tokio::runtime::{Builder, Runtime}; use tracing::{trace_span, Instrument}; +use xmtp_common::bench::{self, bench_async_setup, BENCH_ROOT_SPAN}; use xmtp_mls::{ builder::ClientBuilder, groups::GroupMetadataOptions, - utils::bench::{ - bench_async_setup, create_identities_if_dont_exist, init_logging, BenchClient, Identity, - BENCH_ROOT_SPAN, - }, + utils::bench::{create_identities_if_dont_exist, BenchClient, Identity}, }; pub const IDENTITY_SAMPLES: [usize; 9] = [10, 20, 40, 80, 100, 200, 300, 400, 450]; @@ -50,7 +48,7 @@ fn setup() -> (Arc, Vec, Runtime) { } fn add_to_empty_group(c: &mut Criterion) { - init_logging(); + bench::logger(); let mut benchmark_group = c.benchmark_group("add_to_empty_group"); benchmark_group.sample_size(SAMPLE_SIZE); @@ -88,7 +86,7 @@ fn add_to_empty_group(c: &mut Criterion) { } fn add_to_empty_group_by_inbox_id(c: &mut Criterion) { - init_logging(); + bench::logger(); let mut benchmark_group = c.benchmark_group("add_to_empty_group_by_inbox_id"); benchmark_group.sample_size(SAMPLE_SIZE); @@ -130,7 +128,7 @@ fn add_to_empty_group_by_inbox_id(c: &mut Criterion) { } fn add_to_100_member_group_by_inbox_id(c: &mut Criterion) { - init_logging(); + bench::logger(); let mut benchmark_group = c.benchmark_group("add_to_100_member_group_by_inbox_id"); benchmark_group.sample_size(SAMPLE_SIZE); @@ -188,7 +186,7 @@ fn add_to_100_member_group_by_inbox_id(c: &mut Criterion) { } fn remove_all_members_from_group(c: &mut Criterion) { - init_logging(); + bench::logger(); let mut benchmark_group = c.benchmark_group("remove_all_members_from_group"); benchmark_group.sample_size(SAMPLE_SIZE); @@ -233,7 +231,7 @@ fn remove_all_members_from_group(c: &mut Criterion) { } fn remove_half_members_from_group(c: &mut Criterion) { - init_logging(); + bench::logger(); let mut benchmark_group = c.benchmark_group("remove_half_members_from_group"); benchmark_group.sample_size(SAMPLE_SIZE); @@ -281,7 +279,7 @@ fn remove_half_members_from_group(c: &mut Criterion) { } fn add_1_member_to_group(c: &mut Criterion) { - init_logging(); + bench::logger(); let mut benchmark_group = c.benchmark_group("add_1_member_to_group"); benchmark_group.sample_size(SAMPLE_SIZE); diff --git a/xmtp_mls/benches/identity.rs b/xmtp_mls/benches/identity.rs index a7b10709f..80f267552 100644 --- a/xmtp_mls/benches/identity.rs +++ b/xmtp_mls/benches/identity.rs @@ -1,6 +1,7 @@ use crate::tracing::Instrument; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use tokio::runtime::{Builder, Runtime}; +use xmtp_common::bench::{self, bench_async_setup, BENCH_ROOT_SPAN}; use xmtp_id::{ associations::{ builder::SignatureRequest, @@ -8,8 +9,7 @@ use xmtp_id::{ }, InboxOwner, }; -use xmtp_mls::utils::bench::{bench_async_setup, BenchClient, BENCH_ROOT_SPAN}; -use xmtp_mls::utils::bench::{clients, init_logging}; +use xmtp_mls::utils::bench::{clients, BenchClient}; #[macro_use] extern crate tracing; @@ -38,7 +38,7 @@ async fn ecdsa_signature(client: &BenchClient, owner: impl InboxOwner) -> Signat } fn register_identity_eoa(c: &mut Criterion) { - init_logging(); + bench::logger(); let runtime = setup(); diff --git a/xmtp_mls/src/builder.rs b/xmtp_mls/src/builder.rs index f8411744e..d767f7573 100644 --- a/xmtp_mls/src/builder.rs +++ b/xmtp_mls/src/builder.rs @@ -65,6 +65,7 @@ impl Client { } impl ClientBuilder { + #[tracing::instrument(level = "trace", skip_all)] pub fn new(strategy: IdentityStrategy) -> Self { Self { api_client: None, @@ -101,7 +102,7 @@ impl ClientBuilder { self.app_version = Some(version); self } - + #[tracing::instrument(level = "trace", skip_all)] pub fn scw_signature_verifier(mut self, verifier: V) -> Self { self.scw_verifier = Some(verifier); self @@ -125,6 +126,7 @@ where ApiClient: XmtpApi + 'static + Send + Sync, { /// Build with the default [`RemoteSignatureVerifier`] + #[tracing::instrument(level = "trace", skip_all)] pub async fn build(self) -> Result, ClientBuilderError> { let (mut builder, api_client) = inner_build_api_client(self)?; builder = builder.scw_signature_verifier(RemoteSignatureVerifier::new(api_client.clone())); @@ -158,6 +160,7 @@ where Ok((builder, Arc::new(api_client))) } +#[tracing::instrument(level = "trace", skip_all)] async fn inner_build( client: ClientBuilder, api_client: Arc, @@ -197,7 +200,6 @@ where installation_id = hex::encode(identity.installation_keys.public_bytes()), "Initialized identity" ); - // get sequence_id from identity updates and loaded into the DB load_identity_updates( &api_client_wrapper, diff --git a/xmtp_mls/src/configuration.rs b/xmtp_mls/src/configuration.rs index d8fa3ea65..118231a57 100644 --- a/xmtp_mls/src/configuration.rs +++ b/xmtp_mls/src/configuration.rs @@ -28,6 +28,8 @@ pub const MAX_GROUP_SIZE: usize = 400; pub const MAX_PAST_EPOCHS: usize = 3; +pub const MAX_DB_POOL_SIZE: u32 = 25; + /// the max amount of data that can be sent in one gRPC call /// we leave 5 * 1024 * 1024 as extra buffer room pub const GRPC_DATA_LIMIT: usize = 45 * 1024 * 1024; diff --git a/xmtp_mls/src/groups/device_sync.rs b/xmtp_mls/src/groups/device_sync.rs index 9649f3b53..070caabe6 100644 --- a/xmtp_mls/src/groups/device_sync.rs +++ b/xmtp_mls/src/groups/device_sync.rs @@ -1,18 +1,19 @@ use super::{GroupError, MlsGroup}; -use crate::configuration::NS_IN_HOUR; -use crate::storage::group::{ConversationType, GroupQueryArgs}; -use crate::storage::group_message::MsgQueryArgs; -use crate::storage::DbConnection; -use crate::subscriptions::{LocalEvents, StreamMessages, SubscribeError, SyncMessage}; -use crate::xmtp_openmls_provider::XmtpOpenMlsProvider; +#[cfg(any(test, feature = "test-utils"))] +pub use crate::utils::WorkerHandle; use crate::{ client::ClientError, + configuration::NS_IN_HOUR, storage::{ consent_record::StoredConsentRecord, group::StoredGroup, + group::{ConversationType, GroupQueryArgs}, + group_message::MsgQueryArgs, group_message::{GroupMessageKind, StoredGroupMessage}, - StorageError, + DbConnection, StorageError, }, + subscriptions::{LocalEvents, StreamMessages, SubscribeError, SyncMessage}, + xmtp_openmls_provider::XmtpOpenMlsProvider, Client, Store, }; use aes_gcm::aead::generic_array::GenericArray; @@ -26,7 +27,7 @@ use rand::{Rng, RngCore}; use serde::{Deserialize, Serialize}; use std::pin::Pin; use thiserror::Error; -use tokio::sync::{OnceCell}; +use tokio::sync::OnceCell; use tracing::{instrument, warn}; use xmtp_common::time::{now_ns, Duration}; use xmtp_common::{retry_async, Retry, RetryableError}; @@ -114,17 +115,6 @@ impl RetryableError for DeviceSyncError { } } -#[cfg(any(test, feature = "test-utils"))] -impl Client { - pub fn sync_worker_handle(&self) -> Option> { - self.sync_worker_handle.lock().clone() - } - - pub(crate) fn set_sync_worker_handle(&self, handle: Arc) { - *self.sync_worker_handle.lock() = Some(handle); - } -} - impl Client where ApiClient: XmtpApi + Send + Sync + 'static, @@ -158,54 +148,7 @@ pub struct SyncWorker { // Number of events processed #[cfg(any(test, feature = "test-utils"))] - handle: Arc, -} - -#[cfg(any(test, feature = "test-utils"))] -pub struct WorkerHandle { - processed: AtomicUsize, - notify: Notify, -} - -#[cfg(any(test, feature = "test-utils"))] -impl WorkerHandle { - pub async fn wait_for_new_events(&self, mut count: usize) -> Result<(), Elapsed> { - timeout(Duration::from_secs(3), async { - while count > 0 { - self.notify.notified().await; - count -= 1; - } - }) - .await?; - - Ok(()) - } - - pub async fn wait_for_processed_count(&self, expected: usize) -> Result<(), Elapsed> { - timeout(Duration::from_secs(3), async { - while self.processed.load(Ordering::SeqCst) < expected { - self.notify.notified().await; - } - }) - .await?; - - Ok(()) - } - - pub async fn block_for_num_events(&self, num_events: usize, op: Fut) -> Result<(), Elapsed> - where - Fut: Future, - { - let processed_count = self.processed_count(); - op.await; - self.wait_for_processed_count(processed_count + num_events) - .await?; - Ok(()) - } - - pub fn processed_count(&self) -> usize { - self.processed.load(Ordering::SeqCst) - } + handle: std::sync::Arc, } impl SyncWorker @@ -246,8 +189,7 @@ where #[cfg(any(test, feature = "test-utils"))] { - self.handle.processed.fetch_add(1, Ordering::SeqCst); - self.handle.notify.notify_waiters(); + self.handle.increment(); } } Ok(()) @@ -375,10 +317,7 @@ where retry, #[cfg(any(test, feature = "test-utils"))] - handle: Arc::new(WorkerHandle { - processed: AtomicUsize::new(0), - notify: Notify::new(), - }), + handle: std::sync::Arc::new(Default::default()), } } diff --git a/xmtp_mls/src/identity.rs b/xmtp_mls/src/identity.rs index 10a05344a..e8b904c56 100644 --- a/xmtp_mls/src/identity.rs +++ b/xmtp_mls/src/identity.rs @@ -82,6 +82,7 @@ impl IdentityStrategy { /// Create a new Identity Strategy, with [`IdentityStrategy::CreateIfNotFound`]. /// If an Identity is not found in the local store, creates a new one. + #[tracing::instrument(level = "trace", skip_all)] pub fn new( inbox_id: InboxId, address: String, @@ -106,6 +107,7 @@ impl IdentityStrategy { * the inbox_id configured on the strategy. * **/ + #[tracing::instrument(level = "trace", skip_all)] pub(crate) async fn initialize_identity( self, api_client: &ApiClientWrapper, @@ -131,6 +133,12 @@ impl IdentityStrategy { legacy_signed_private_key, } => { if let Some(stored_identity) = stored_identity { + tracing::debug!( + installation_id = + hex::encode(stored_identity.installation_keys.public_bytes()), + inbox_id = stored_identity.inbox_id, + "Found existing identity in store" + ); if inbox_id != stored_identity.inbox_id { return Err(IdentityError::InboxIdMismatch { id: inbox_id.clone(), @@ -258,6 +266,7 @@ impl Identity { /// If a legacy key is provided, it will be used to sign the identity update and no wallet signature is needed. /// /// If no legacy key is provided, a wallet signature is always required. + #[tracing::instrument(level = "trace", skip_all)] pub(crate) async fn new( inbox_id: InboxId, address: String, diff --git a/xmtp_mls/src/storage/encrypted_store/db_connection.rs b/xmtp_mls/src/storage/encrypted_store/db_connection.rs index d0ef3942a..078892e0a 100644 --- a/xmtp_mls/src/storage/encrypted_store/db_connection.rs +++ b/xmtp_mls/src/storage/encrypted_store/db_connection.rs @@ -7,7 +7,7 @@ use std::sync::Arc; pub type DbConnection = DbConnectionPrivate; #[cfg(target_arch = "wasm32")] -pub type DbConnection = DbConnectionPrivate; +pub type DbConnection = DbConnectionPrivate; /// A wrapper for RawDbConnection that houses all XMTP DB operations. /// Uses a [`Mutex]` internally for interior mutability, so that the connection diff --git a/xmtp_mls/src/storage/encrypted_store/identity_update.rs b/xmtp_mls/src/storage/encrypted_store/identity_update.rs index 1c5c6b564..0a0c87b0a 100644 --- a/xmtp_mls/src/storage/encrypted_store/identity_update.rs +++ b/xmtp_mls/src/storage/encrypted_store/identity_update.rs @@ -9,7 +9,7 @@ use super::{ use diesel::{dsl::max, prelude::*}; #[cfg(target_arch = "wasm32")] -use diesel_wasm_sqlite::dsl::RunQueryDsl; +use sqlite_web::dsl::RunQueryDsl; use xmtp_id::associations::{unverified::UnverifiedIdentityUpdate, AssociationError}; diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs index c7172dd5b..e89e306bd 100644 --- a/xmtp_mls/src/storage/encrypted_store/mod.rs +++ b/xmtp_mls/src/storage/encrypted_store/mod.rs @@ -42,9 +42,7 @@ pub use sqlcipher_connection::EncryptedConnection; #[cfg(target_arch = "wasm32")] pub use self::wasm::SqliteConnection; #[cfg(target_arch = "wasm32")] -pub use diesel_wasm_sqlite::{ - connection::WasmSqliteConnection as RawDbConnection, WasmSqlite as Sqlite, -}; +pub use sqlite_web::{connection::WasmSqliteConnection as RawDbConnection, WasmSqlite as Sqlite}; use super::StorageError; use crate::{xmtp_openmls_provider::XmtpOpenMlsProviderPrivate, Store}; @@ -108,6 +106,7 @@ pub type EncryptedMessageStore = self::private::EncryptedMessageStore Result { Self::new_database(opts, Some(enc_key)) } @@ -118,6 +117,7 @@ impl EncryptedMessageStore { } /// This function is private so that an unencrypted database cannot be created by accident + #[tracing::instrument(level = "trace", skip_all)] fn new_database( opts: StorageOption, enc_key: Option, @@ -172,6 +172,7 @@ pub mod private { where Db: XmtpDb, { + #[tracing::instrument(level = "trace", skip_all)] pub(super) fn init_db(&mut self) -> Result<(), StorageError> { self.db.validate(&self.opts)?; self.db.conn()?.raw_query(|conn| { diff --git a/xmtp_mls/src/storage/encrypted_store/native.rs b/xmtp_mls/src/storage/encrypted_store/native.rs index b254f820e..635b1b4c7 100644 --- a/xmtp_mls/src/storage/encrypted_store/native.rs +++ b/xmtp_mls/src/storage/encrypted_store/native.rs @@ -120,9 +120,9 @@ impl NativeDb { StorageOption::Ephemeral => builder .max_size(1) .build(ConnectionManager::new(":memory:"))?, - StorageOption::Persistent(ref path) => { - builder.max_size(25).build(ConnectionManager::new(path))? - } + StorageOption::Persistent(ref path) => builder + .max_size(crate::configuration::MAX_DB_POOL_SIZE) + .build(ConnectionManager::new(path))?, }; Ok(Self { @@ -186,9 +186,9 @@ impl XmtpDb for NativeDb { StorageOption::Ephemeral => builder .max_size(1) .build(ConnectionManager::new(":memory:"))?, - StorageOption::Persistent(ref path) => { - builder.max_size(25).build(ConnectionManager::new(path))? - } + StorageOption::Persistent(ref path) => builder + .max_size(crate::configuration::MAX_DB_POOL_SIZE) + .build(ConnectionManager::new(path))?, }; let mut pool_write = self.pool.write(); diff --git a/xmtp_mls/src/storage/encrypted_store/wallet_addresses.rs b/xmtp_mls/src/storage/encrypted_store/wallet_addresses.rs index 42e17f02f..4f13cd184 100644 --- a/xmtp_mls/src/storage/encrypted_store/wallet_addresses.rs +++ b/xmtp_mls/src/storage/encrypted_store/wallet_addresses.rs @@ -3,9 +3,9 @@ use crate::storage::{DbConnection, StorageError}; use crate::{impl_fetch, impl_fetch_list_with_key, impl_store, FetchListWithKey}; use diesel::prelude::*; use diesel::{Insertable, Queryable}; -#[cfg(target_arch = "wasm32")] -use diesel_wasm_sqlite::dsl::RunQueryDsl; use serde::{Deserialize, Serialize}; +#[cfg(target_arch = "wasm32")] +use sqlite_web::dsl::RunQueryDsl; use xmtp_id::{InboxId, WalletAddress}; #[derive(Insertable, Queryable, Debug, Clone, Deserialize, Serialize)] diff --git a/xmtp_mls/src/storage/encrypted_store/wasm.rs b/xmtp_mls/src/storage/encrypted_store/wasm.rs index 7cbc05fc0..e2f223161 100644 --- a/xmtp_mls/src/storage/encrypted_store/wasm.rs +++ b/xmtp_mls/src/storage/encrypted_store/wasm.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use diesel::{connection::AnsiTransactionManager, prelude::*}; -pub use diesel_wasm_sqlite::connection::WasmSqliteConnection as SqliteConnection; use parking_lot::Mutex; +pub use sqlite_web::connection::WasmSqliteConnection as SqliteConnection; use super::{db_connection::DbConnectionPrivate, StorageError, StorageOption, XmtpDb}; @@ -26,7 +26,7 @@ impl std::fmt::Debug for WasmDb { impl WasmDb { pub async fn new(opts: &StorageOption) -> Result { use super::StorageOption::*; - diesel_wasm_sqlite::init_sqlite().await; + sqlite_web::init_sqlite().await; let conn = match opts { Ephemeral => SqliteConnection::establish(":memory:"), Persistent(ref db_path) => SqliteConnection::establish(db_path), diff --git a/xmtp_mls/src/storage/mod.rs b/xmtp_mls/src/storage/mod.rs index d3c525897..f8ce9357c 100644 --- a/xmtp_mls/src/storage/mod.rs +++ b/xmtp_mls/src/storage/mod.rs @@ -9,7 +9,7 @@ pub use errors::*; /// Initialize the SQLite WebAssembly Library #[cfg(target_arch = "wasm32")] pub async fn init_sqlite() { - diesel_wasm_sqlite::init_sqlite().await; + sqlite_web::init_sqlite().await; } #[cfg(not(target_arch = "wasm32"))] pub async fn init_sqlite() {} diff --git a/xmtp_mls/src/utils/bench/mod.rs b/xmtp_mls/src/utils/bench/mod.rs index 5466c6dd9..bb1ef3d2c 100644 --- a/xmtp_mls/src/utils/bench/mod.rs +++ b/xmtp_mls/src/utils/bench/mod.rs @@ -8,20 +8,7 @@ pub use identity_gen::*; pub mod clients; pub use clients::*; -use once_cell::sync::OnceCell; -use std::sync::Once; use thiserror::Error; -use tracing::{Metadata, Subscriber}; -use tracing_flame::{FlameLayer, FlushGuard}; -use tracing_subscriber::{ - layer::{Context, Filter, Layer, SubscriberExt}, - registry::LookupSpan, - util::SubscriberInitExt, - EnvFilter, -}; - -pub const BENCH_ROOT_SPAN: &str = "xmtp-trace-bench"; - /// Re-export of functions in private modules for benchmarks pub mod re_export { pub use crate::hpke::encrypt_welcome; @@ -34,66 +21,3 @@ pub enum BenchError { #[error(transparent)] Io(#[from] std::io::Error), } - -static INIT: Once = Once::new(); - -static LOGGER: OnceCell>> = OnceCell::new(); - -/// initializes logging for benchmarks -/// - FMT logging is enabled by passing the normal `RUST_LOG` environment variable options. -/// - Generate a flamegraph from tracing data by passing `XMTP_FLAMEGRAPH=trace` -pub fn init_logging() { - INIT.call_once(|| { - let (flame_layer, guard) = FlameLayer::with_file("./tracing.folded").unwrap(); - let flame_layer = flame_layer - .with_threads_collapsed(true) - .with_module_path(true); - // .with_empty_samples(false); - - tracing_subscriber::registry() - .with(tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env())) - .with( - flame_layer - .with_filter(BenchFilter) - .with_filter(EnvFilter::from_env("XMTP_FLAMEGRAPH")), - ) - .init(); - - LOGGER.set(guard).unwrap(); - }) -} - -/// criterion `batch_iter` surrounds the closure in a `Runtime.block_on` despite being a sync -/// function, even in the async 'to_async` setup. Therefore we do this (only _slightly_) hacky -/// workaround to allow us to async setup some groups. -pub fn bench_async_setup(fun: F) -> T -where - F: Fn() -> Fut, - Fut: futures::future::Future, -{ - use tokio::runtime::Handle; - tokio::task::block_in_place(move || Handle::current().block_on(async move { fun().await })) -} - -/// Filters for only spans where the root span name is "bench" -pub struct BenchFilter; - -impl Filter for BenchFilter -where - S: Subscriber + for<'lookup> LookupSpan<'lookup> + std::fmt::Debug, - for<'lookup> >::Data: std::fmt::Debug, -{ - fn enabled(&self, meta: &Metadata<'_>, cx: &Context<'_, S>) -> bool { - if meta.name() == BENCH_ROOT_SPAN { - return true; - } - if let Some(id) = cx.current_span().id() { - if let Some(s) = cx.span_scope(id) { - if let Some(s) = s.from_root().take(1).collect::>().first() { - return s.name() == BENCH_ROOT_SPAN; - } - } - } - false - } -} diff --git a/xmtp_mls/src/utils/mod.rs b/xmtp_mls/src/utils/mod.rs index 357410b65..3ea0f821f 100644 --- a/xmtp_mls/src/utils/mod.rs +++ b/xmtp_mls/src/utils/mod.rs @@ -3,6 +3,9 @@ pub mod bench; #[cfg(any(test, feature = "test-utils"))] pub mod test; +#[cfg(any(test, feature = "test-utils"))] +pub use self::test::*; + pub mod hash { pub use xmtp_cryptography::hash::sha256_bytes as sha256; } diff --git a/xmtp_mls/src/utils/test/mod.rs b/xmtp_mls/src/utils/test/mod.rs index ff44452b2..636467fc0 100755 --- a/xmtp_mls/src/utils/test/mod.rs +++ b/xmtp_mls/src/utils/test/mod.rs @@ -1,7 +1,14 @@ #![allow(clippy::unwrap_used)] -use std::sync::Arc; +use std::{ + future::Future, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; use tokio::sync::Notify; +use xmtp_common::time::{timeout, Expired}; use xmtp_id::{ associations::{ generate_inbox_id, @@ -238,6 +245,68 @@ where } } +#[derive(Default)] +pub struct WorkerHandle { + processed: AtomicUsize, + notify: Notify, +} + +impl WorkerHandle { + pub async fn wait_for_new_events(&self, mut count: usize) -> Result<(), Expired> { + timeout(xmtp_common::time::Duration::from_secs(3), async { + while count > 0 { + self.notify.notified().await; + count -= 1; + } + }) + .await?; + + Ok(()) + } + + pub async fn wait_for_processed_count(&self, expected: usize) -> Result<(), Expired> { + timeout(xmtp_common::time::Duration::from_secs(3), async { + while self.processed.load(Ordering::SeqCst) < expected { + self.notify.notified().await; + } + }) + .await?; + + Ok(()) + } + + pub async fn block_for_num_events(&self, num_events: usize, op: Fut) -> Result<(), Expired> + where + Fut: Future, + { + let processed_count = self.processed_count(); + op.await; + self.wait_for_processed_count(processed_count + num_events) + .await?; + Ok(()) + } + + pub fn processed_count(&self) -> usize { + self.processed.load(Ordering::SeqCst) + } + + pub fn increment(&self) { + self.processed + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + self.notify.notify_waiters(); + } +} + +impl Client { + pub fn sync_worker_handle(&self) -> Option> { + self.sync_worker_handle.lock().clone() + } + + pub(crate) fn set_sync_worker_handle(&self, handle: Arc) { + *self.sync_worker_handle.lock() = Some(handle); + } +} + pub async fn register_client( client: &Client, owner: impl InboxOwner,