Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Aug 21, 2024
2 parents cdbe6ef + 2ca8be1 commit cab4e30
Show file tree
Hide file tree
Showing 11 changed files with 729 additions and 296 deletions.
11 changes: 11 additions & 0 deletions bindings_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

243 changes: 231 additions & 12 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::convert::TryInto;
use std::sync::Arc;
use tokio::{sync::Mutex, task::AbortHandle};
use xmtp_api_grpc::grpc_api_helper::Client as TonicApiClient;
use xmtp_id::associations::AssociationState;
use xmtp_id::{
associations::{
builder::SignatureRequest, generate_inbox_id as xmtp_id_generate_inbox_id,
Expand Down Expand Up @@ -295,6 +296,31 @@ impl FfiXmtpClient {
let result = inner.find_inbox_id_from_address(address).await?;
Ok(result)
}

/**
* Get the client's inbox state.
*
* If `refresh_from_network` is true, the client will go to the network first to refresh the state.
* Otherwise, the state will be read from the local database.
*/
pub async fn inbox_state(
&self,
refresh_from_network: bool,
) -> Result<FfiInboxState, GenericError> {
let state = self.inner_client.inbox_state(refresh_from_network).await?;
Ok(state.into())
}

pub async fn get_latest_inbox_state(
&self,
inbox_id: String,
) -> Result<FfiInboxState, GenericError> {
let state = self
.inner_client
.get_latest_association_state(&self.inner_client.store().conn()?, &inbox_id)
.await?;
Ok(state.into())
}
}

#[uniffi::export(async_runtime = "tokio")]
Expand Down Expand Up @@ -333,12 +359,9 @@ impl FfiXmtpClient {
existing_wallet_address: &str,
new_wallet_address: &str,
) -> Result<Arc<FfiSignatureRequest>, GenericError> {
let inbox_id = self.inner_client.inbox_id();
let signature_request = self.inner_client.associate_wallet(
inbox_id,
existing_wallet_address.into(),
new_wallet_address.into(),
)?;
let signature_request = self
.inner_client
.associate_wallet(existing_wallet_address.into(), new_wallet_address.into())?;

let request = Arc::new(FfiSignatureRequest {
inner: Arc::new(tokio::sync::Mutex::new(signature_request)),
Expand All @@ -364,10 +387,9 @@ impl FfiXmtpClient {
&self,
wallet_address: &str,
) -> Result<Arc<FfiSignatureRequest>, GenericError> {
let inbox_id = self.inner_client.inbox_id();
let signature_request = self
.inner_client
.revoke_wallet(inbox_id, wallet_address.into())
.revoke_wallets(vec![wallet_address.into()])
.await?;

let request = Arc::new(FfiSignatureRequest {
Expand All @@ -376,6 +398,49 @@ impl FfiXmtpClient {

Ok(request)
}

/**
* Revokes all installations except the one the client is currently using
*/
pub async fn revoke_all_other_installations(
&self,
) -> Result<Arc<FfiSignatureRequest>, GenericError> {
let installation_id = self.inner_client.installation_public_key();
let inbox_state = self.inner_client.inbox_state(true).await?;
let other_installation_ids = inbox_state
.installation_ids()
.into_iter()
.filter(|id| id != &installation_id)
.collect();

let signature_request = self
.inner_client
.revoke_installations(other_installation_ids)
.await?;

Ok(Arc::new(FfiSignatureRequest {
inner: Arc::new(tokio::sync::Mutex::new(signature_request)),
}))
}
}

#[derive(uniffi::Record)]
pub struct FfiInboxState {
pub inbox_id: String,
pub recovery_address: String,
pub installation_ids: Vec<Vec<u8>>,
pub account_addresses: Vec<String>,
}

impl From<AssociationState> for FfiInboxState {
fn from(state: AssociationState) -> Self {
Self {
inbox_id: state.inbox_id().to_string(),
recovery_address: state.recovery_address().to_string(),
installation_ids: state.installation_ids(),
account_addresses: state.account_addresses(),
}
}
}

#[derive(uniffi::Record, Default)]
Expand Down Expand Up @@ -687,6 +752,14 @@ impl FfiConversations {
Ok(())
}

pub async fn sync_all_groups(&self) -> Result<(), GenericError> {
let inner = self.inner_client.as_ref();
let groups = inner.find_groups(None, None, None, None)?;

inner.sync_all_groups(groups).await?;
Ok(())
}

pub async fn list(
&self,
opts: FfiListConversationsOptions,
Expand Down Expand Up @@ -1273,7 +1346,7 @@ impl FfiGroup {
}
}

#[derive(uniffi::Enum)]
#[derive(uniffi::Enum, PartialEq)]
pub enum FfiGroupMessageKind {
Application,
MembershipChange,
Expand Down Expand Up @@ -1479,9 +1552,10 @@ impl FfiGroupPermissions {
mod tests {
use crate::{
get_inbox_id_for_address, inbox_owner::SigningError, logger::FfiLogger,
FfiConversationCallback, FfiCreateGroupOptions, FfiGroup, FfiGroupPermissionsOptions,
FfiInboxOwner, FfiListConversationsOptions, FfiListMessagesOptions, FfiMetadataField,
FfiPermissionPolicy, FfiPermissionPolicySet, FfiPermissionUpdateType,
FfiConversationCallback, FfiCreateGroupOptions, FfiGroup, FfiGroupMessageKind,
FfiGroupPermissionsOptions, FfiInboxOwner, FfiListConversationsOptions,
FfiListMessagesOptions, FfiMetadataField, FfiPermissionPolicy, FfiPermissionPolicySet,
FfiPermissionUpdateType,
};
use std::{
env,
Expand Down Expand Up @@ -2170,6 +2244,57 @@ mod tests {
assert!(stream_messages.is_closed());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_can_sync_all_groups() {
let alix = new_test_client().await;
let bo = new_test_client().await;

for _i in 0..30 {
alix.conversations()
.create_group(
vec![bo.account_address.clone()],
FfiCreateGroupOptions::default(),
)
.await
.unwrap();
}

bo.conversations().sync().await.unwrap();
let alix_groups = alix
.conversations()
.list(FfiListConversationsOptions::default())
.await
.unwrap();

let alix_group1 = alix_groups[0].clone();
let alix_group5 = alix_groups[5].clone();
let bo_group1 = bo.group(alix_group1.id()).unwrap();
let bo_group5 = bo.group(alix_group5.id()).unwrap();

alix_group1.send("alix1".as_bytes().to_vec()).await.unwrap();
alix_group5.send("alix1".as_bytes().to_vec()).await.unwrap();

let bo_messages1 = bo_group1
.find_messages(FfiListMessagesOptions::default())
.unwrap();
let bo_messages5 = bo_group5
.find_messages(FfiListMessagesOptions::default())
.unwrap();
assert_eq!(bo_messages1.len(), 0);
assert_eq!(bo_messages5.len(), 0);

bo.conversations().sync_all_groups().await.unwrap();

let bo_messages1 = bo_group1
.find_messages(FfiListMessagesOptions::default())
.unwrap();
let bo_messages5 = bo_group5
.find_messages(FfiListMessagesOptions::default())
.unwrap();
assert_eq!(bo_messages1.len(), 1);
assert_eq!(bo_messages5.len(), 1);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_can_send_message_when_out_of_sync() {
let alix = new_test_client().await;
Expand Down Expand Up @@ -2541,6 +2666,60 @@ mod tests {
assert_eq!(alix_members.len(), 4);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_removed_members_no_longer_update() {
let alix = new_test_client().await;
let bo = new_test_client().await;

let alix_group = alix
.conversations()
.create_group(
vec![bo.account_address.clone()],
FfiCreateGroupOptions::default(),
)
.await
.unwrap();

bo.conversations().sync().await.unwrap();
let bo_group = bo.group(alix_group.id()).unwrap();

alix_group.sync().await.unwrap();
let alix_members = alix_group.list_members().unwrap();
assert_eq!(alix_members.len(), 2);

bo_group.sync().await.unwrap();
let bo_members = bo_group.list_members().unwrap();
assert_eq!(bo_members.len(), 2);

let bo_messages = bo_group
.find_messages(FfiListMessagesOptions::default())
.unwrap();
assert_eq!(bo_messages.len(), 0);

alix_group
.remove_members(vec![bo.account_address.clone()])
.await
.unwrap();

alix_group.send("hello".as_bytes().to_vec()).await.unwrap();

bo_group.sync().await.unwrap();
assert!(!bo_group.is_active().unwrap());

let bo_messages = bo_group
.find_messages(FfiListMessagesOptions::default())
.unwrap();
assert!(bo_messages.first().unwrap().kind == FfiGroupMessageKind::MembershipChange);
assert_eq!(bo_messages.len(), 1);

let bo_members = bo_group.list_members().unwrap();
assert_eq!(bo_members.len(), 1);

alix_group.sync().await.unwrap();
let alix_members = alix_group.list_members().unwrap();
assert_eq!(alix_members.len(), 1);
}

// test is also showing intermittent failures with database locked msg
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
Expand Down Expand Up @@ -3268,4 +3447,44 @@ mod tests {

assert!(results_4.is_ok());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_revoke_all_installations() {
let wallet = xmtp_cryptography::utils::LocalWallet::new(&mut rng());
let client_1 = new_test_client_with_wallet(wallet.clone()).await;
let client_2 = new_test_client_with_wallet(wallet.clone()).await;

let client_1_state = client_1.inbox_state(true).await.unwrap();
let client_2_state = client_2.inbox_state(true).await.unwrap();
assert_eq!(client_1_state.installation_ids.len(), 2);
assert_eq!(client_2_state.installation_ids.len(), 2);

let signature_request = client_1.revoke_all_other_installations().await.unwrap();
sign_with_wallet(&wallet, &signature_request).await;
client_1
.apply_signature_request(signature_request)
.await
.unwrap();

let client_1_state_after_revoke = client_1.inbox_state(true).await.unwrap();
let client_2_state_after_revoke = client_2.inbox_state(true).await.unwrap();
assert_eq!(client_1_state_after_revoke.installation_ids.len(), 1);
assert_eq!(client_2_state_after_revoke.installation_ids.len(), 1);
assert_eq!(
client_1_state_after_revoke
.installation_ids
.first()
.unwrap()
.clone(),
client_1.installation_id()
);
assert_eq!(
client_2_state_after_revoke
.installation_ids
.first()
.unwrap()
.clone(),
client_1.installation_id()
);
}
}
11 changes: 11 additions & 0 deletions bindings_node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ xmtp_cryptography = { workspace = true }
xmtp_id = { path = "../xmtp_id" }
xmtp_proto = { workspace = true, features = ["proto_full", "convert"] }
xmtp_v2 = { path = "../xmtp_v2" }
scoped-futures = "0.1"

# Test/Bench Utils
xmtp_api_grpc = { path = "../xmtp_api_grpc", optional = true }
Expand All @@ -83,7 +84,6 @@ xmtp_id = { path = "../xmtp_id", features = ["test-utils"] }
async-barrier = "1.1"
anyhow.workspace = true
criterion = { version = "0.5", features = ["html_reports", "async_tokio"] }
scoped-futures = "0.1"

[[bench]]
name = "group_limit"
Expand Down
Loading

0 comments on commit cab4e30

Please sign in to comment.