Skip to content

Commit

Permalink
most tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Aug 9, 2024
1 parent b124af3 commit 95ff5e5
Show file tree
Hide file tree
Showing 14 changed files with 293 additions and 98 deletions.
250 changes: 213 additions & 37 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ ethers-core = "2.0.4"
futures = "0.3.30"
futures-core = "0.3.30"
hex = "0.4.3"
log = { version = "0.4", features = ["release_max_level_debug"] }
log = { version = "0.4" }
openmls = { git = "https://github.com/xmtp/openmls", rev = "9cb3207", default-features = false }
openmls_basic_credential = { git = "https://github.com/xmtp/openmls", rev = "9cb3207" }
openmls_rust_crypto = { git = "https://github.com/xmtp/openmls", rev = "9cb3207" }
Expand All @@ -51,7 +51,7 @@ thiserror = "1.0"
tls_codec = "0.4.0"
tokio = { version = "1.35.1", default-features = false }
async-stream = "0.3"
tracing = { version = "0.1", features = ["release_max_level_debug"] }
tracing = { version = "0.1" }
tracing-subscriber = "0.3"
url = "2.5.0"
tonic = "^0.11"
Expand Down
4 changes: 2 additions & 2 deletions bindings_wasm/src/mls_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub async fn create_client(
encryption_key: Option<Uint8Array>,
history_sync_url: Option<String>,
) -> Result<WasmClient, JsError> {
let api_client = XmtpHttpApiClient::create(host.clone());
let api_client = XmtpHttpApiClient::new(host.clone());

let storage_option = StorageOption::Ephemeral;
let store = match encryption_key {
Expand Down Expand Up @@ -86,7 +86,7 @@ pub async fn get_inbox_id_for_address(
account_address: String,
) -> Result<Option<String>, JsError> {
let account_address = account_address.to_lowercase();
let api_client = ApiClientWrapper::new(XmtpHttpApiClient::create(host.clone()), Retry::default());
let api_client = ApiClientWrapper::new(XmtpHttpApiClient::new(host.clone()), Retry::default());

let results = api_client
.get_inbox_ids(vec![account_address.clone()])
Expand Down
24 changes: 12 additions & 12 deletions xmtp_api_http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,14 @@ pub struct XmtpHttpApiClient {
}

impl XmtpHttpApiClient {
pub fn create(host_url: String) -> Self {
pub fn new(host_url: String) -> Self {
let client = reqwest::Client::builder()
.connection_verbose(true)
.build()
.unwrap();

XmtpHttpApiClient {
http_client: reqwest::Client::new(),
http_client: client,
host_url,
}
}
Expand Down Expand Up @@ -184,30 +189,25 @@ impl XmtpMlsClient for XmtpHttpApiClient {
request: SubscribeGroupMessagesRequest,
) -> Result<GroupMessageStream, Error> {
log::debug!("subscribe_group_messages");

let stream = create_grpc_stream::<_, GroupMessage>(
create_grpc_stream::<_, GroupMessage>(
request,
self.endpoint(ApiEndpoints::SUBSCRIBE_GROUP_MESSAGES),
self.http_client.clone(),
)
.await;

Ok(stream)
.await
}

async fn subscribe_welcome_messages(
&self,
request: SubscribeWelcomeMessagesRequest,
) -> Result<WelcomeMessageStream, Error> {
log::debug!("subscribe_welcome_messages");
let stream = create_grpc_stream::<_, WelcomeMessage>(
create_grpc_stream::<_, WelcomeMessage>(
request,
self.endpoint(ApiEndpoints::SUBSCRIBE_WELCOME_MESSAGES),
self.http_client.clone(),
)
.await;

Ok(stream)
.await
}
}

Expand Down Expand Up @@ -283,7 +283,7 @@ mod tests {

#[tokio::test]
async fn test_register_installation() {
let client = XmtpHttpApiClient::create(ApiUrls::LOCAL_ADDRESS.to_string());
let client = XmtpHttpApiClient::new(ApiUrls::LOCAL_ADDRESS.to_string());
let result = client
.register_installation(RegisterInstallationRequest {
is_inbox_id_credential: false,
Expand Down
23 changes: 10 additions & 13 deletions xmtp_api_http/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,25 @@ pub async fn create_grpc_stream<
request: T,
endpoint: String,
http_client: reqwest::Client,
) -> BoxStream<'static, Result<R, Error>> {
) -> Result<BoxStream<'static, Result<R, Error>>, Error> {
let stream = async_stream::stream! {
let bytes_stream = http_client
.post(endpoint)
.json(&request)
.send()
.await
.map_err(|e| Error::new(ErrorKind::MlsError).with(e))?
.bytes_stream();

log::debug!("Spawning grpc http stream");
let request = http_client
.post(endpoint)
.json(&request)
.send()
.await
.map_err(|e| Error::new(ErrorKind::MlsError).with(e))?;

let mut remaining = vec![];
for await bytes in bytes_stream {
for await bytes in request.bytes_stream() {
let bytes = bytes
.map_err(|e| Error::new(ErrorKind::SubscriptionUpdateError).with(e.to_string()))?;

let bytes = &[remaining.as_ref(), bytes.as_ref()].concat();
let de = Deserializer::from_slice(bytes);
let mut stream = de.into_iter::<GrpcResponse<R>>();
'messages: loop {
let response = stream.next();

let res = match response {
Some(Ok(GrpcResponse::Ok(response))) => Ok(response),
Some(Ok(GrpcResponse::SubscriptionItem(item))) => Ok(item.result),
Expand All @@ -91,7 +88,7 @@ pub async fn create_grpc_stream<
}
};

Box::pin(stream)
Ok(Box::pin(stream))
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion xmtp_id/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ chrono.workspace = true
ed25519-dalek = { workspace = true, features = ["digest"] }
ed25519.workspace = true
ethers.workspace = true
ethers-core.workspace = true
futures.workspace = true
hex.workspace = true
log.workspace = true
Expand All @@ -32,7 +33,6 @@ xmtp_proto = { workspace = true, features = ["proto_full"] }
[dev-dependencies]
ctor = "0.2.5"
ed25519-dalek = { workspace = true, features = ["digest"] }
ethers.workspace = true
futures = "0.3"
regex = "1.10"
tokio = { workspace = true, features = ["macros", "time"] }
Expand Down
4 changes: 2 additions & 2 deletions xmtp_id/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use ethers::{
signers::{LocalWallet, Signer},
types::Address,
};
use futures::executor;
use openmls_traits::types::CryptoError;
use thiserror::Error;
use xmtp_cryptography::signature::{h160addr_to_string, RecoverableSignature, SignatureError};
Expand Down Expand Up @@ -54,7 +53,8 @@ impl InboxOwner for LocalWallet {
}

fn sign(&self, text: &str) -> Result<RecoverableSignature, SignatureError> {
Ok(executor::block_on(self.sign_message(text))?.to_vec().into())
let message_hash = ethers_core::utils::hash_message(text);
Ok(self.sign_hash(message_hash)?.to_vec().into())
}
}

Expand Down
14 changes: 8 additions & 6 deletions xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ path = "src/bin/update-schema.rs"
default = ["native"]
grpc = ["xmtp_proto/grpc"]
native = ["libsqlite3-sys/bundled-sqlcipher-vendored-openssl"]
test-utils = ["xmtp_api_grpc"]
bench = ["test-utils", "indicatif", "tracing-subscriber", "anyhow", "tracing-flame", "once_cell"]
test-utils = []
bench = ["test-utils", "indicatif", "tracing-subscriber", "anyhow", "tracing-flame", "once_cell", "xmtp_api_grpc"]
http-api = ["xmtp_api_http"]
grpc-api = ["xmtp_api_grpc"]

[dependencies]
aes-gcm = { version = "0.10.3", features = ["std"] }
Expand Down Expand Up @@ -50,8 +51,8 @@ sha2.workspace = true
smart-default = "0.7.1"
thiserror = { workspace = true }
tls_codec = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "tracing"] }
tokio-stream = { version = "0.1", features = ["sync"] }
async-stream.workspace = true
toml = "0.8.4"
xmtp_cryptography = { workspace = true }
Expand All @@ -61,17 +62,17 @@ xmtp_v2 = { path = "../xmtp_v2" }

# Test/Bench Utils
xmtp_api_grpc = { path = "../xmtp_api_grpc", optional = true }
xmtp_api_http = { path = "../xmtp_api_http", optional = true }
tracing-subscriber = { workspace = true, optional = true }
indicatif = { version = "0.17", optional = true }
anyhow = { workspace = true, optional = true }
tracing-flame = { version = "0.2", optional = true }
once_cell = { version = "1.19", optional = true }
xmtp_api_http = { path = "../xmtp_api_http", optional = true }

[dev-dependencies]
ctor.workspace = true
flume = "0.11"
mockall = "0.11.4"
mockall = "0.13.0"
mockito = "1.4.0"
tempfile = "3.5.0"
tracing.workspace = true
Expand All @@ -83,6 +84,7 @@ xmtp_id = { path = "../xmtp_id", features = ["test-utils"] }
async-barrier = "1.1"
anyhow.workspace = true
criterion = { version = "0.5", features = ["html_reports", "async_tokio"] }
console-subscriber = "0.4"

[[bench]]
name = "group_limit"
Expand Down
18 changes: 14 additions & 4 deletions xmtp_mls/src/groups/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,21 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
async fn test_subscribe_multiple() {
let amal = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await);
let group = amal
.create_group(None, GroupMetadataOptions::default())
.unwrap();
let group = Arc::new(
amal.create_group(None, GroupMetadataOptions::default())
.unwrap(),
);

let stream = group.stream(amal.clone()).await.unwrap();
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
let amal_ptr = amal.clone();
let group_ptr = group.clone();
tokio::spawn(async move {
let mut stream = group_ptr.stream(amal_ptr).await.unwrap();
while let Some(item) = stream.next().await {
let _ = tx.send(item);
}
});

for i in 0..10 {
group
Expand Down
1 change: 0 additions & 1 deletion xmtp_mls/src/groups/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ impl MlsGroup {
if !errors.is_empty() {
return Err(GroupError::Sync(errors));
}

Ok(())
}

Expand Down
9 changes: 2 additions & 7 deletions xmtp_mls/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,11 @@ mod tests {

// Execute once before any tests are run
// Capture traces in a variable that can be checked in tests, as well as outputting them to stdout on test failure
// #[traced_test]
#[traced_test]
#[ctor::ctor]
fn setup() {
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
// Capture logs (e.g. log::info!()) as traces too
// let _ = tracing_log::LogTracer::init_with_filter(LevelFilter::Debug);
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.init();
let _ = tracing_log::LogTracer::init_with_filter(LevelFilter::Debug);
}

/// Note: tests that use this must have the #[traced_test] attribute
Expand Down
4 changes: 2 additions & 2 deletions xmtp_mls/src/owner/evm_owner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub use ethers::signers::{LocalWallet, Signer};
use futures::executor;

use xmtp_cryptography::signature::{h160addr_to_string, RecoverableSignature, SignatureError};

Expand All @@ -11,6 +10,7 @@ impl InboxOwner for LocalWallet {
}

fn sign(&self, text: &str) -> Result<RecoverableSignature, SignatureError> {
Ok(executor::block_on(self.sign_message(text))?.to_vec().into())
let message_hash = ethers_core::utils::hash_message(text);
Ok(self.sign_hash(message_hash)?.to_vec().into())
}
}
30 changes: 23 additions & 7 deletions xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,24 +390,40 @@ mod tests {
};
use xmtp_cryptography::utils::generate_local_wallet;

#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "current_thread")]
async fn test_stream_welcomes() {
let alice = ClientBuilder::new_test_client(&generate_local_wallet()).await;
let bob = ClientBuilder::new_test_client(&generate_local_wallet()).await;

let alice = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await);
let bob = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await);
let alice_bob_group = alice
.create_group(None, GroupMetadataOptions::default())
.unwrap();

let mut bob_stream = bob.stream_conversations().await.unwrap();
// FIXME:insipx we run into an issue where the reqwest::post().send() request
// blocks the executor and we cannot progress the runtime if we dont `tokio::spawn` this.
// A solution might be to use `hyper` instead, and implement a custom connection pool with
// `deadpool`. This is a bit more work but shouldn't be too complicated since
// we're only using `post` requests. It would be nice for all streams to work
// w/o spawning a separate task.
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let mut stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
let bob_ptr = bob.clone();
tokio::spawn(async move {
let mut bob_stream = bob_ptr.stream_conversations().await.unwrap();
while let Some(item) = bob_stream.next().await {
let _ = tx.send(item);
}
});

let group_id = alice_bob_group.group_id.clone();
alice_bob_group
.add_members_by_inbox_id(&alice, vec![bob.inbox_id()])
.await
.unwrap();

let bob_received_groups = bob_stream.next().await.unwrap();
assert_eq!(bob_received_groups.group_id, alice_bob_group.group_id);
let bob_received_groups = stream.next().await.unwrap();
assert_eq!(bob_received_groups.group_id, group_id);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
async fn test_stream_messages() {
let alice = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await);
Expand Down
4 changes: 2 additions & 2 deletions xmtp_mls/src/utils/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ pub fn rand_time() -> i64 {
#[cfg(feature = "http-api")]
impl XmtpTestClient for XmtpHttpApiClient {
async fn create_local() -> Self {
XmtpHttpApiClient::create("http://localhost:5555".into())
XmtpHttpApiClient::new("http://localhost:5555".into())
}

async fn create_dev() -> Self {
XmtpHttpApiClient::create("https://grpc.dev.xmtp.network:443".into())
XmtpHttpApiClient::new("https://grpc.dev.xmtp.network:443".into())
}
}

Expand Down

0 comments on commit 95ff5e5

Please sign in to comment.