Skip to content

Commit

Permalink
use proto in http client (#1671)
Browse files Browse the repository at this point in the history
* use proto in http client

* handle grpc errors properly

* fix wasm build
  • Loading branch information
insipx authored Feb 26, 2025
1 parent a25da22 commit 9f0dd2c
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 55 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion xmtp_api_http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ async-trait.workspace = true
bytes = "1.9"
futures = { workspace = true, default-features = false }
pin-project-lite = "0.2.15"
reqwest.workspace = true
prost.workspace = true
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions xmtp_api_http/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ pub enum HttpClientError {
HeaderName(#[from] reqwest::header::InvalidHeaderName),
#[error("error deserializing json response {0}")]
Json(#[from] serde_json::Error),
#[error(transparent)]
Decode(#[from] prost::DecodeError),
}

impl xmtp_common::RetryableError for HttpClientError {
Expand Down
117 changes: 63 additions & 54 deletions xmtp_api_http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ pub mod util;

use futures::stream;
use http_stream::create_grpc_stream;
use prost::Message;
use reqwest::header;
use util::handle_error;
use reqwest::header::HeaderMap;
use util::handle_error_proto;
use xmtp_proto::api_client::{ApiBuilder, XmtpIdentityClient};
use xmtp_proto::xmtp::identity::api::v1::{
GetIdentityUpdatesRequest as GetIdentityUpdatesV2Request,
Expand Down Expand Up @@ -65,7 +67,7 @@ impl XmtpHttpApiClient {
})
}

fn builder() -> XmtpHttpApiClientBuilder {
pub fn builder() -> XmtpHttpApiClientBuilder {
Default::default()
}

Expand Down Expand Up @@ -154,6 +156,14 @@ impl ApiBuilder for XmtpHttpApiClientBuilder {
}
}

fn protobuf_headers() -> Result<HeaderMap, HttpClientError> {
let mut headers = HeaderMap::new();

headers.insert("Content-Type", "application/x-protobuf".parse()?);
headers.insert("Accept", "application/x-protobuf".parse()?);
Ok(headers)
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl XmtpMlsClient for XmtpHttpApiClient {
Expand All @@ -163,16 +173,15 @@ impl XmtpMlsClient for XmtpHttpApiClient {
let res = self
.http_client
.post(self.endpoint(ApiEndpoints::UPLOAD_KEY_PACKAGE))
.json(&request)
.headers(protobuf_headers()?)
.body(request.encode_to_vec())
.send()
.await
.map_err(Error::upload_kp)?
.bytes()
.await
.map_err(Error::upload_kp)?;

tracing::debug!("upload_key_package");
handle_error(&*res).map_err(|e| e.with(ApiEndpoint::UploadKeyPackage))
handle_error_proto(res)
.await
.map_err(|e| e.with(ApiEndpoint::UploadKeyPackage))
}

async fn fetch_key_packages(
Expand All @@ -182,32 +191,31 @@ impl XmtpMlsClient for XmtpHttpApiClient {
let res = self
.http_client
.post(self.endpoint(ApiEndpoints::FETCH_KEY_PACKAGES))
.json(&request)
.headers(protobuf_headers()?)
.body(request.encode_to_vec())
.send()
.await
.map_err(Error::fetch_kps)?
.bytes()
.await
.map_err(Error::fetch_kps)?;

tracing::debug!("fetch_key_packages");
handle_error(&*res).map_err(|e| e.with(ApiEndpoint::FetchKeyPackages))
handle_error_proto(res)
.await
.map_err(|e| e.with(ApiEndpoint::FetchKeyPackages))
}

async fn send_group_messages(&self, request: SendGroupMessagesRequest) -> Result<(), Error> {
let res = self
.http_client
.post(self.endpoint(ApiEndpoints::SEND_GROUP_MESSAGES))
.json(&request)
.headers(protobuf_headers()?)
.body(request.encode_to_vec())
.send()
.await
.map_err(Error::send_group_messages)?
.bytes()
.await
.map_err(Error::send_group_messages)?;

tracing::debug!("send_group_messages");
handle_error(&*res).map_err(|e| e.with(ApiEndpoint::SendGroupMessages))
handle_error_proto(res)
.await
.map_err(|e| e.with(ApiEndpoint::SendGroupMessages))
}

async fn send_welcome_messages(
Expand All @@ -217,16 +225,16 @@ impl XmtpMlsClient for XmtpHttpApiClient {
let res = self
.http_client
.post(self.endpoint(ApiEndpoints::SEND_WELCOME_MESSAGES))
.json(&request)
.headers(protobuf_headers()?)
.body(request.encode_to_vec())
.send()
.await
.map_err(Error::send_welcome_messages)?
.bytes()
.await
.map_err(Error::send_welcome_messages)?;

tracing::debug!("send_welcome_messages");
handle_error(&*res).map_err(|e| e.with(ApiEndpoint::SendWelcomeMessages))
handle_error_proto(res)
.await
.map_err(|e| e.with(ApiEndpoint::SendWelcomeMessages))
}

async fn query_group_messages(
Expand All @@ -236,16 +244,16 @@ impl XmtpMlsClient for XmtpHttpApiClient {
let res = self
.http_client
.post(self.endpoint(ApiEndpoints::QUERY_GROUP_MESSAGES))
.json(&request)
.headers(protobuf_headers()?)
.body(request.encode_to_vec())
.send()
.await
.map_err(Error::query_group_messages)?
.bytes()
.await
.map_err(Error::query_group_messages)?;

tracing::debug!("query_group_messages");
handle_error(&*res).map_err(|e| e.with(ApiEndpoint::QueryGroupMessages))
handle_error_proto(res)
.await
.map_err(|e| e.with(ApiEndpoint::QueryGroupMessages))
}

async fn query_welcome_messages(
Expand All @@ -255,16 +263,16 @@ impl XmtpMlsClient for XmtpHttpApiClient {
let res = self
.http_client
.post(self.endpoint(ApiEndpoints::QUERY_WELCOME_MESSAGES))
.json(&request)
.headers(protobuf_headers()?)
.body(request.encode_to_vec())
.send()
.await
.map_err(Error::query_welcome_messages)?
.bytes()
.await
.map_err(Error::query_welcome_messages)?;

tracing::debug!("query_welcome_messages");
handle_error(&*res).map_err(|e| e.with(ApiEndpoint::QueryWelcomeMessages))
handle_error_proto(res)
.await
.map_err(|e| e.with(ApiEndpoint::QueryWelcomeMessages))
}
}

Expand Down Expand Up @@ -327,16 +335,16 @@ impl XmtpIdentityClient for XmtpHttpApiClient {
let res = self
.http_client
.post(self.endpoint(ApiEndpoints::PUBLISH_IDENTITY_UPDATE))
.json(&request)
.headers(protobuf_headers()?)
.body(request.encode_to_vec())
.send()
.await
.map_err(Error::publish_identity_update)?
.bytes()
.await
.map_err(Error::publish_identity_update)?;

tracing::debug!("publish_identity_update");
handle_error(&*res).map_err(|e| e.with(ApiEndpoint::PublishIdentityUpdate))
handle_error_proto(res)
.await
.map_err(|e| e.with(ApiEndpoint::PublishIdentityUpdate))
}

async fn get_identity_updates_v2(
Expand All @@ -346,16 +354,16 @@ impl XmtpIdentityClient for XmtpHttpApiClient {
let res = self
.http_client
.post(self.endpoint(ApiEndpoints::GET_IDENTITY_UPDATES))
.json(&request)
.headers(protobuf_headers()?)
.body(request.encode_to_vec())
.send()
.await
.map_err(Error::get_identity_updates_v2)?
.bytes()
.await
.map_err(Error::get_identity_updates_v2)?;

tracing::debug!("get_identity_updates_v2");
handle_error(&*res).map_err(|e| e.with(ApiEndpoint::GetIdentityUpdatesV2))
handle_error_proto(res)
.await
.map_err(|e| e.with(ApiEndpoint::GetIdentityUpdatesV2))
}

async fn get_inbox_ids(
Expand All @@ -365,16 +373,16 @@ impl XmtpIdentityClient for XmtpHttpApiClient {
let res = self
.http_client
.post(self.endpoint(ApiEndpoints::GET_INBOX_IDS))
.json(&request)
.headers(protobuf_headers()?)
.body(request.encode_to_vec())
.send()
.await
.map_err(Error::get_inbox_ids)?
.bytes()
.await
.map_err(Error::get_inbox_ids)?;

tracing::debug!("get_inbox_ids");
handle_error(&*res).map_err(|e| e.with(ApiEndpoint::GetInboxIds))
handle_error_proto(res)
.await
.map_err(|e| e.with(ApiEndpoint::GetInboxIds))
}

async fn verify_smart_contract_wallet_signatures(
Expand All @@ -384,16 +392,16 @@ impl XmtpIdentityClient for XmtpHttpApiClient {
let res = self
.http_client
.post(self.endpoint(ApiEndpoints::VERIFY_SMART_CONTRACT_WALLET_SIGNATURES))
.json(&request)
.headers(protobuf_headers()?)
.body(request.encode_to_vec())
.send()
.await
.map_err(Error::verify_scw_signature)?
.bytes()
.await
.map_err(Error::verify_scw_signature)?;

tracing::debug!("verify_smart_contract_wallet_signatures");
handle_error(&*res).map_err(|e| e.with(ApiEndpoint::VerifyScwSignature))
handle_error_proto(res)
.await
.map_err(|e| e.with(ApiEndpoint::VerifyScwSignature))
}
}

Expand Down Expand Up @@ -428,6 +436,7 @@ pub mod tests {
.await;

assert!(result.is_err());
println!("{:?}", result);
assert!(result
.as_ref()
.err()
Expand Down
19 changes: 19 additions & 0 deletions xmtp_api_http/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::http_stream::SubscriptionItem;
use crate::Error;
use crate::ErrorResponse;
use crate::HttpClientError;
use prost::Message;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::io::Read;

Expand All @@ -14,6 +15,24 @@ pub(crate) enum GrpcResponse<T> {
Empty {},
}

/// handle JSON response from gRPC, returning either
/// the expected deserialized response object or a gRPC [`Error`]
pub async fn handle_error_proto<T>(response: reqwest::Response) -> Result<T, Error>
where
T: prost::Message + Default,
{
if response.status().is_success() {
let res = response.bytes().await.map_err(HttpClientError::from)?;
return Ok(Message::decode(res).map_err(HttpClientError::from)?);
}

Err(HttpClientError::Grpc(ErrorResponse {
code: response.status().as_u16() as usize,
message: response.text().await.map_err(HttpClientError::from)?,
details: vec![],
})
.into())
}
/// handle JSON response from gRPC, returning either
/// the expected deserialized response object or a gRPC [`Error`]
pub fn handle_error<R: Read, T>(reader: R) -> Result<T, Error>
Expand Down
8 changes: 8 additions & 0 deletions xmtp_mls/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ pub mod test_util {
}
}

/// Disable sqlcipher memory security
pub fn disable_memory_security(&self) {
let query = r#"PRAGMA cipher_memory_security = OFF"#;
let query = diesel::sql_query(query);
let _ = self.raw_query_read(|c| query.clone().execute(c)).unwrap();
let _ = self.raw_query_write(|c| query.execute(c)).unwrap();
}

pub fn intents_published(&self) -> i32 {
self.raw_query_read(|conn| {
let mut row = conn
Expand Down
2 changes: 2 additions & 0 deletions xmtp_mls/src/utils/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ where
.unwrap();
let conn = client.store().conn().unwrap();
conn.register_triggers();
conn.disable_memory_security();
register_client(&client, owner).await;

client
Expand Down Expand Up @@ -215,6 +216,7 @@ where
let client = builder.build().await.unwrap();
let conn = client.store().conn().unwrap();
conn.register_triggers();
conn.disable_memory_security();
register_client(&client, owner).await;

client
Expand Down

0 comments on commit 9f0dd2c

Please sign in to comment.