Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replication client #1174

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ pub async fn get_inbox_id_for_address(
account_address: String,
) -> Result<Option<String>, GenericError> {
let api_client = ApiClientWrapper::new(
TonicApiClient::create(host.clone(), is_secure).await?,
TonicApiClient::create(host.clone(), is_secure)
.await?
.into(),
Retry::default(),
);

Expand Down
3 changes: 2 additions & 1 deletion bindings_node/src/inbox_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ pub async fn get_inbox_id_for_address(
let api_client = ApiClientWrapper::new(
TonicApiClient::create(host.clone(), is_secure)
.await
.map_err(ErrorWrapper::from)?,
.map_err(ErrorWrapper::from)?
.into(),
Retry::default(),
);

Expand Down
2 changes: 1 addition & 1 deletion bindings_wasm/src/inbox_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub async fn get_inbox_id_for_address(
) -> Result<Option<String>, JsError> {
let account_address = account_address.to_lowercase();
let api_client = ApiClientWrapper::new(
XmtpHttpApiClient::new(host.clone()).unwrap(),
XmtpHttpApiClient::new(host.clone()).unwrap().into(),
insipx marked this conversation as resolved.
Show resolved Hide resolved
Retry::default(),
);

Expand Down
115 changes: 67 additions & 48 deletions examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,21 @@ use ethers::signers::{coins_bip39::English, LocalWallet, MnemonicBuilder};
use futures::future::join_all;
use kv_log_macro::{error, info};
use prost::Message;
use xmtp_api_grpc::replication_client::ClientV4;
use xmtp_id::associations::unverified::{UnverifiedRecoverableEcdsaSignature, UnverifiedSignature};
use xmtp_mls::client::FindGroupParams;

use xmtp_mls::groups::device_sync::DeviceSyncContent;
use xmtp_mls::storage::group_message::{GroupMessageKind, MsgQueryArgs};
use xmtp_mls::XmtpApi;

use crate::{
json_logger::make_value,
serializable::{SerializableGroup, SerializableMessage},
};
use serializable::maybe_get_text;
use thiserror::Error;
use xmtp_api_grpc::grpc_api_helper::Client as ApiClient;
use xmtp_api_grpc::grpc_api_helper::Client as ClientV3;
use xmtp_cryptography::{
signature::{RecoverableSignature, SignatureError},
utils::rng,
Expand All @@ -49,8 +51,8 @@ use xmtp_mls::{
utils::time::now_ns,
InboxOwner,
};
type Client = xmtp_mls::client::Client<ApiClient>;
type ClientBuilder = xmtp_mls::builder::ClientBuilder<ApiClient>;

type Client = xmtp_mls::client::Client<Box<dyn XmtpApi>>;
type MlsGroup = xmtp_mls::groups::MlsGroup<Client>;

/// A fictional versioning CLI
Expand All @@ -67,6 +69,8 @@ struct Cli {
local: bool,
#[clap(long, default_value_t = false)]
json: bool,
#[clap(long, default_value_t = false)]
testnet: bool,
}

#[derive(ValueEnum, Debug, Copy, Clone)]
Expand Down Expand Up @@ -179,34 +183,54 @@ async fn main() {
}
info!("Starting CLI Client....");

let grpc = match (cli.testnet, cli.local) {
(true, true) => Box::new(
ClientV4::create("http://localhost:5050".into(), false)
.await
.unwrap(),
) as Box<dyn XmtpApi>,
(true, false) => Box::new(
ClientV4::create("https://grpc.testnet.xmtp.network:443".into(), true)
.await
.unwrap(),
) as Box<dyn XmtpApi>,
(false, true) => Box::new(
ClientV3::create("http://localhost:5556".into(), false)
.await
.unwrap(),
) as Box<dyn XmtpApi>,
(false, false) => Box::new(
ClientV3::create("https://grpc.dev.xmtp.network:443".into(), true)
.await
.unwrap(),
) as Box<dyn XmtpApi>,
};
insipx marked this conversation as resolved.
Show resolved Hide resolved

if let Commands::Register { seed_phrase } = &cli.command {
info!("Register");
if let Err(e) = register(&cli, seed_phrase.clone()).await {
if let Err(e) = register(&cli, seed_phrase.clone(), grpc).await {
error!("Registration failed: {:?}", e)
}
return;
}

let client = create_client(&cli, IdentityStrategy::CachedOnly, grpc)
.await
.unwrap();

Comment on lines +217 to +220
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid unwrapping results when creating the client

Using .unwrap() can lead to panics if an error occurs. It's better to handle the error or propagate it to maintain application stability.

Modify the code to handle errors:

-let client = create_client(&cli, IdentityStrategy::CachedOnly, grpc)
-    .await
-    .unwrap();
+let client = match create_client(&cli, IdentityStrategy::CachedOnly, grpc).await {
+    Ok(client) => client,
+    Err(e) => {
+        error!("Failed to create client: {:?}", e);
+        return;
+    }
+};

Ensure that any errors during client creation are appropriately logged and handled.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let client = create_client(&cli, IdentityStrategy::CachedOnly, grpc)
.await
.unwrap();
let client = match create_client(&cli, IdentityStrategy::CachedOnly, grpc).await {
Ok(client) => client,
Err(e) => {
error!("Failed to create client: {:?}", e);
return;
}
};

match &cli.command {
#[allow(unused_variables)]
Commands::Register { seed_phrase } => {
unreachable!()
}
Commands::Info {} => {
info!("Info");
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();
let installation_id = hex::encode(client.installation_public_key());
info!("identity info", { command_output: true, account_address: client.inbox_id(), installation_id: installation_id });
}
Commands::ListGroups {} => {
info!("List Groups");
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();
let conn = client.store().conn().unwrap();

client
.sync_welcomes(&conn)
.await
Expand Down Expand Up @@ -236,9 +260,6 @@ async fn main() {
}
Commands::Send { group_id, msg } => {
info!("Sending message to group", { group_id: group_id, message: msg });
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();
info!("Inbox ID is: {}", client.inbox_id());
let group = get_group(&client, hex::decode(group_id).expect("group id decode"))
.await
Expand All @@ -248,9 +269,6 @@ async fn main() {
}
Commands::ListGroupMessages { group_id } => {
info!("Recv");
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();

let group = get_group(&client, hex::decode(group_id).expect("group id decode"))
.await
Expand All @@ -277,10 +295,6 @@ async fn main() {
group_id,
account_addresses,
} => {
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();

let group = get_group(&client, hex::decode(group_id).expect("group id decode"))
.await
.expect("failed to get group");
Expand All @@ -299,10 +313,6 @@ async fn main() {
group_id,
account_addresses,
} => {
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();

let group = get_group(&client, hex::decode(group_id).expect("group id decode"))
.await
.expect("failed to get group");
Expand All @@ -324,10 +334,6 @@ async fn main() {
xmtp_mls::groups::PreconfiguredPolicies::AdminsOnly
}
};
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();

let group = client
.create_group(
Some(group_permissions.to_policy_set()),
Expand All @@ -338,9 +344,6 @@ async fn main() {
info!("Created group {}", group_id, { command_output: true, group_id: group_id})
}
Commands::GroupInfo { group_id } => {
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();
let group = &client
.group(hex::decode(group_id).expect("bad group id"))
.expect("group not found");
Expand All @@ -349,9 +352,6 @@ async fn main() {
info!("Group {}", group_id, { command_output: true, group_id: group_id, group_info: make_value(&serializable) })
}
Commands::RequestHistorySync {} => {
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();
let conn = client.store().conn().unwrap();
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&conn).await.unwrap();
Expand All @@ -361,9 +361,6 @@ async fn main() {
info!("Sent history sync request in sync group {group_id_str}", { group_id: group_id_str})
}
Commands::ReplyToHistorySyncRequest {} => {
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();
let provider = client.mls_provider().unwrap();
let group = client.get_sync_group().unwrap();
let group_id_str = hex::encode(group.group_id);
Expand All @@ -376,9 +373,6 @@ async fn main() {
info!("Reply: {:?}", reply);
}
Commands::ProcessHistorySyncReply {} => {
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();
let conn = client.store().conn().unwrap();
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&conn).await.unwrap();
Expand All @@ -388,9 +382,6 @@ async fn main() {
info!("History bundle downloaded and inserted into user DB", {})
}
Commands::ProcessConsentSyncReply {} => {
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();
let conn = client.store().conn().unwrap();
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&conn).await.unwrap();
Expand All @@ -400,9 +391,6 @@ async fn main() {
info!("Consent bundle downloaded and inserted into user DB", {})
}
Commands::ListHistorySyncMessages {} => {
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();
let conn = client.store().conn().unwrap();
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&conn).await.unwrap();
Expand Down Expand Up @@ -437,6 +425,28 @@ async fn main() {
}
}

async fn create_client<C: XmtpApi>(
cli: &Cli,
account: IdentityStrategy,
grpc: C,
) -> Result<xmtp_mls::client::Client<C>, CliError> {
let msg_store = get_encrypted_store(&cli.db).await.unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle errors when initializing the message store

Unwrapping the result of get_encrypted_store can cause a panic if initialization fails. It's advisable to propagate the error instead.

Update the code to use the ? operator:

-let msg_store = get_encrypted_store(&cli.db).await.unwrap();
+let msg_store = get_encrypted_store(&cli.db).await?;

This change ensures that any error during message store initialization is properly handled by the calling function.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let msg_store = get_encrypted_store(&cli.db).await.unwrap();
let msg_store = get_encrypted_store(&cli.db).await?;

let mut builder = xmtp_mls::builder::ClientBuilder::<C>::new(account).store(msg_store);

builder = builder.api_client(grpc);

if cli.local {
builder = builder.history_sync_url(MessageHistoryUrls::LOCAL_ADDRESS);
} else {
builder = builder.history_sync_url(MessageHistoryUrls::DEV_ADDRESS);
}

let client = builder.build().await.map_err(CliError::ClientBuilder)?;

Ok(client)
}

/*
async fn create_client(cli: &Cli, account: IdentityStrategy) -> Result<Client, CliError> {
let msg_store = get_encrypted_store(&cli.db).await.unwrap();
let mut builder = ClientBuilder::new(account).store(msg_store);
Expand Down Expand Up @@ -465,8 +475,16 @@ async fn create_client(cli: &Cli, account: IdentityStrategy) -> Result<Client, C

Ok(client)
}
*/

async fn register(cli: &Cli, maybe_seed_phrase: Option<String>) -> Result<(), CliError> {
async fn register<C>(
cli: &Cli,
maybe_seed_phrase: Option<String>,
client: C,
) -> Result<(), CliError>
where
C: XmtpApi,
{
let w: Wallet = if let Some(seed_phrase) = maybe_seed_phrase {
Wallet::LocalWallet(
MnemonicBuilder::<English>::default()
Expand All @@ -483,6 +501,7 @@ async fn register(cli: &Cli, maybe_seed_phrase: Option<String>) -> Result<(), Cl
let client = create_client(
cli,
IdentityStrategy::CreateIfNotFound(inbox_id, w.get_address(), nonce, None),
client,
)
.await?;
let mut signature_request = client.identity().signature_request().unwrap();
Expand Down
4 changes: 1 addition & 3 deletions examples/cli/serializable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ pub struct SerializableGroup {
}

impl SerializableGroup {
pub async fn from<ApiClient: XmtpApi + Clone>(
group: &MlsGroup<xmtp_mls::Client<ApiClient>>,
) -> Self {
pub async fn from<ApiClient: XmtpApi>(group: &MlsGroup<xmtp_mls::Client<ApiClient>>) -> Self {
let group_id = hex::encode(group.group_id.clone());
let members = group
.members()
Expand Down
1 change: 1 addition & 0 deletions xmtp_api_grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ tonic = { workspace = true, features = ["tls", "tls-native-roots", "tls-webpki-r
tracing.workspace = true
xmtp_proto = { path = "../xmtp_proto", features = ["proto_full"] }
xmtp_v2 = { path = "../xmtp_v2" }
async-trait = "0.1"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider managing async-trait version at workspace level.

For consistency with other dependencies in this file, consider moving the async-trait version specification to the workspace level and using .workspace = true here.

-async-trait = "0.1"
+async-trait.workspace = true
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async-trait = "0.1"
async-trait.workspace = true


[dev-dependencies]
uuid = { workspace = true, features = ["v4"] }
Expand Down
6 changes: 6 additions & 0 deletions xmtp_api_grpc/src/grpc_api_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl ClientWithMetadata for Client {
}
}

#[async_trait::async_trait]
impl XmtpApiClient for Client {
type Subscription = Subscription;
type MutableSubscription = GrpcMutableSubscription;
Expand Down Expand Up @@ -267,6 +268,7 @@ impl Subscription {
}
}

#[async_trait::async_trait]
impl XmtpApiSubscription for Subscription {
fn is_closed(&self) -> bool {
self.closed.load(Ordering::SeqCst)
Expand Down Expand Up @@ -321,6 +323,7 @@ impl Stream for GrpcMutableSubscription {
}
}

#[async_trait::async_trait]
impl MutableApiSubscription for GrpcMutableSubscription {
async fn update(&mut self, req: SubscribeRequest) -> Result<(), Error> {
self.update_channel
Expand All @@ -336,6 +339,8 @@ impl MutableApiSubscription for GrpcMutableSubscription {
self.update_channel.close_channel();
}
}

#[async_trait::async_trait]
impl XmtpMlsClient for Client {
#[tracing::instrument(level = "trace", skip_all)]
async fn upload_key_package(&self, req: UploadKeyPackageRequest) -> Result<(), Error> {
Expand Down Expand Up @@ -453,6 +458,7 @@ impl Stream for WelcomeMessageStream {
}
}

#[async_trait::async_trait]
impl XmtpMlsStreams for Client {
type GroupMessageStream<'a> = GroupMessageStream;
type WelcomeMessageStream<'a> = WelcomeMessageStream;
Expand Down
Loading