Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Share provider everywhere #1390

Merged
merged 1 commit into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bindings_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ pub enum GenericError {
pub enum FfiSubscribeError {
#[error("Subscribe Error {0}")]
Subscribe(#[from] xmtp_mls::subscriptions::SubscribeError),
#[error("Storage error: {0}")]
Storage(#[from] xmtp_mls::storage::StorageError),
}

impl From<String> for GenericError {
Expand Down
69 changes: 48 additions & 21 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ impl FfiXmtpClient {

pub async fn find_inbox_id(&self, address: String) -> Result<Option<String>, GenericError> {
let inner = self.inner_client.as_ref();

let result = inner.find_inbox_id_from_address(address).await?;
let conn = self.inner_client.store().conn()?;
let result = inner.find_inbox_id_from_address(&conn, address).await?;
Ok(result)
}

Expand Down Expand Up @@ -881,8 +881,8 @@ impl FfiConversations {

pub async fn sync(&self) -> Result<(), GenericError> {
let inner = self.inner_client.as_ref();
let conn = inner.store().conn()?;
inner.sync_welcomes(&conn).await?;
let provider = inner.mls_provider()?;
inner.sync_welcomes(&provider).await?;
Ok(())
}

Expand All @@ -898,12 +898,11 @@ impl FfiConversations {
consent_state: Option<FfiConsentState>,
) -> Result<u32, GenericError> {
let inner = self.inner_client.as_ref();
let conn = inner.store().conn()?;

let provider = inner.mls_provider()?;
let consent: Option<ConsentState> = consent_state.map(|state| state.into());

let num_groups_synced: usize = inner.sync_all_welcomes_and_groups(&conn, consent).await?;

let num_groups_synced: usize = inner
.sync_all_welcomes_and_groups(&provider, consent)
.await?;
// Convert usize to u32 for compatibility with Uniffi
let num_groups_synced: u32 = num_groups_synced
.try_into()
Expand Down Expand Up @@ -1267,9 +1266,10 @@ impl FfiConversation {
&self,
envelope_bytes: Vec<u8>,
) -> Result<FfiMessage, FfiSubscribeError> {
let provider = self.inner.mls_provider()?;
let message = self
.inner
.process_streamed_group_message(envelope_bytes)
.process_streamed_group_message(&provider, envelope_bytes)
.await?;
let ffi_message = message.into();

Expand Down Expand Up @@ -1853,6 +1853,8 @@ mod tests {
conversations: Mutex<Vec<Arc<FfiConversation>>>,
consent_updates: Mutex<Vec<FfiConsent>>,
notify: Notify,
inbox_id: Option<String>,
installation_id: Option<String>,
}

impl RustStreamCallback {
Expand All @@ -1872,12 +1874,22 @@ mod tests {
.await?;
Ok(())
}

pub fn from_client(client: &FfiXmtpClient) -> Self {
RustStreamCallback {
inbox_id: Some(client.inner_client.inbox_id().to_string()),
installation_id: Some(hex::encode(client.inner_client.installation_public_key())),
..Default::default()
}
}
}

impl FfiMessageCallback for RustStreamCallback {
fn on_message(&self, message: FfiMessage) {
let mut messages = self.messages.lock().unwrap();
log::info!(
inbox_id = self.inbox_id,
installation_id = self.installation_id,
"ON MESSAGE Received\n-------- \n{}\n----------",
String::from_utf8_lossy(&message.content)
);
Expand All @@ -1893,7 +1905,11 @@ mod tests {

impl FfiConversationCallback for RustStreamCallback {
fn on_conversation(&self, group: Arc<super::FfiConversation>) {
log::debug!("received conversation");
log::debug!(
inbox_id = self.inbox_id,
installation_id = self.installation_id,
"received conversation"
);
let _ = self.num_messages.fetch_add(1, Ordering::SeqCst);
let mut convos = self.conversations.lock().unwrap();
convos.push(group);
Expand All @@ -1907,7 +1923,11 @@ mod tests {

impl FfiConsentCallback for RustStreamCallback {
fn on_consent_update(&self, mut consent: Vec<FfiConsent>) {
log::debug!("received consent update");
log::debug!(
inbox_id = self.inbox_id,
installation_id = self.installation_id,
"received consent update"
);
let mut consent_updates = self.consent_updates.lock().unwrap();
consent_updates.append(&mut consent);
self.notify.notify_one();
Expand Down Expand Up @@ -2774,7 +2794,7 @@ mod tests {
let caro = new_test_client().await;

// Alix begins a stream for all messages
let message_callbacks = Arc::new(RustStreamCallback::default());
let message_callbacks = Arc::new(RustStreamCallback::from_client(&alix));
let stream_messages = alix
.conversations()
.stream_all_messages(message_callbacks.clone())
Expand Down Expand Up @@ -2821,12 +2841,12 @@ mod tests {
let bo2 = new_test_client_with_wallet(bo_wallet).await;

// Bo begins a stream for all messages
let bo_message_callbacks = Arc::new(RustStreamCallback::default());
let bo_stream_messages = bo2
let bo2_message_callbacks = Arc::new(RustStreamCallback::from_client(&bo2));
let bo2_stream_messages = bo2
.conversations()
.stream_all_messages(bo_message_callbacks.clone())
.stream_all_messages(bo2_message_callbacks.clone())
.await;
bo_stream_messages.wait_for_ready().await;
bo2_stream_messages.wait_for_ready().await;

alix_group.update_installations().await.unwrap();

Expand Down Expand Up @@ -3190,7 +3210,7 @@ mod tests {
let bo = new_test_client().await;
let caro = new_test_client().await;

let caro_conn = caro.inner_client.store().conn().unwrap();
let caro_provider = caro.inner_client.mls_provider().unwrap();

let alix_group = alix
.conversations()
Expand Down Expand Up @@ -3220,7 +3240,11 @@ mod tests {
)
.await
.unwrap();
let _ = caro.inner_client.sync_welcomes(&caro_conn).await.unwrap();
let _ = caro
.inner_client
.sync_welcomes(&caro_provider)
.await
.unwrap();

bo_group.send("second".as_bytes().to_vec()).await.unwrap();
stream_callback.wait_for_delivery(None).await.unwrap();
Expand All @@ -3239,7 +3263,7 @@ mod tests {
let amal = new_test_client().await;
let bola = new_test_client().await;

let bola_conn = bola.inner_client.store().conn().unwrap();
let bola_provider = bola.inner_client.mls_provider().unwrap();

let amal_group: Arc<FfiConversation> = amal
.conversations()
Expand All @@ -3250,7 +3274,10 @@ mod tests {
.await
.unwrap();

bola.inner_client.sync_welcomes(&bola_conn).await.unwrap();
bola.inner_client
.sync_welcomes(&bola_provider)
.await
.unwrap();
let bola_group = bola.conversation(amal_group.id()).unwrap();

let stream_callback = Arc::new(RustStreamCallback::default());
Expand Down
7 changes: 6 additions & 1 deletion bindings_node/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,14 @@ impl Client {

#[napi]
pub async fn find_inbox_id_by_address(&self, address: String) -> Result<Option<String>> {
let conn = self
.inner_client()
.store()
.conn()
.map_err(ErrorWrapper::from)?;
let inbox_id = self
.inner_client
.find_inbox_id_from_address(address)
.find_inbox_id_from_address(&conn, address)
.await
.map_err(ErrorWrapper::from)?;

Expand Down
3 changes: 2 additions & 1 deletion bindings_node/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ impl Conversation {
self.created_at_ns,
);
let envelope_bytes: Vec<u8> = envelope_bytes.deref().to_vec();
let provider = group.mls_provider().map_err(ErrorWrapper::from)?;
let message = group
.process_streamed_group_message(envelope_bytes)
.process_streamed_group_message(&provider, envelope_bytes)
.await
.map_err(ErrorWrapper::from)?;

Expand Down
14 changes: 6 additions & 8 deletions bindings_node/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,30 +235,28 @@ impl Conversations {

#[napi]
pub async fn sync(&self) -> Result<()> {
let conn = self
let provider = self
.inner_client
.store()
.conn()
.mls_provider()
.map_err(ErrorWrapper::from)?;
self
.inner_client
.sync_welcomes(&conn)
.sync_welcomes(&provider)
.await
.map_err(ErrorWrapper::from)?;
Ok(())
}

#[napi]
pub async fn sync_all_conversations(&self) -> Result<usize> {
let conn = self
let provider = self
.inner_client
.store()
.conn()
.mls_provider()
.map_err(ErrorWrapper::from)?;

let num_groups_synced = self
.inner_client
.sync_all_welcomes_and_groups(&conn, None)
.sync_all_welcomes_and_groups(&provider, None)
.await
.map_err(ErrorWrapper::from)?;

Expand Down
8 changes: 6 additions & 2 deletions bindings_wasm/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use xmtp_api_http::XmtpHttpApiClient;
use xmtp_cryptography::signature::ed25519_public_key_to_address;
use xmtp_id::associations::builder::SignatureRequest;
use xmtp_mls::builder::ClientBuilder;
use xmtp_mls::groups::scoped_client::ScopedGroupClient;
use xmtp_mls::identity::IdentityStrategy;
use xmtp_mls::storage::{EncryptedMessageStore, EncryptionKey, StorageOption};
use xmtp_mls::Client as MlsClient;
Expand Down Expand Up @@ -273,9 +272,14 @@ impl Client {

#[wasm_bindgen(js_name = findInboxIdByAddress)]
pub async fn find_inbox_id_by_address(&self, address: String) -> Result<Option<String>, JsError> {
let conn = self
.inner_client
.store()
.conn()
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;
let inbox_id = self
.inner_client
.find_inbox_id_from_address(address)
.find_inbox_id_from_address(&conn, address)
.await
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;

Expand Down
14 changes: 6 additions & 8 deletions bindings_wasm/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,13 @@ impl Conversations {

#[wasm_bindgen]
pub async fn sync(&self) -> Result<(), JsError> {
let conn = self
let provider = self
.inner_client
.store()
.conn()
.mls_provider()
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;
self
.inner_client
.sync_welcomes(&conn)
.sync_welcomes(&provider)
.await
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;

Expand All @@ -285,15 +284,14 @@ impl Conversations {

#[wasm_bindgen(js_name = syncAllConversations)]
pub async fn sync_all_conversations(&self) -> Result<usize, JsError> {
let conn = self
let provider = self
.inner_client
.store()
.conn()
.mls_provider()
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;

let num_groups_synced = self
.inner_client
.sync_all_welcomes_and_groups(&conn, None)
.sync_all_welcomes_and_groups(&provider, None)
.await
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;

Expand Down
17 changes: 8 additions & 9 deletions examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,9 @@ async fn main() -> color_eyre::eyre::Result<()> {
}
Commands::ListGroups {} => {
info!("List Groups");
let conn = client.store().conn()?;
let provider = client.mls_provider()?;
client
.sync_welcomes(&conn)
.sync_welcomes(&provider)
.await
.expect("failed to sync welcomes");

Expand Down Expand Up @@ -440,9 +440,8 @@ async fn main() -> color_eyre::eyre::Result<()> {
);
}
Commands::RequestHistorySync {} => {
let conn = client.store().conn().unwrap();
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&conn).await.unwrap();
client.sync_welcomes(&provider).await.unwrap();
client.start_sync_worker();
client
.send_sync_request(&provider, DeviceSyncKind::MessageHistory)
Expand All @@ -451,9 +450,9 @@ async fn main() -> color_eyre::eyre::Result<()> {
info!("Sent history sync request in sync group.")
}
Commands::ListHistorySyncMessages {} => {
let conn = client.store().conn()?;
client.sync_welcomes(&conn).await?;
let group = client.get_sync_group(&conn)?;
let provider = client.mls_provider()?;
client.sync_welcomes(&provider).await?;
let group = client.get_sync_group(provider.conn_ref())?;
let group_id_str = hex::encode(group.group_id.clone());
group.sync().await?;
let messages = group
Expand Down Expand Up @@ -574,8 +573,8 @@ where
}

async fn get_group(client: &Client, group_id: Vec<u8>) -> Result<MlsGroup, CliError> {
let conn = client.store().conn().unwrap();
client.sync_welcomes(&conn).await?;
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&provider).await?;
let group = client.group(group_id)?;
group
.sync()
Expand Down
6 changes: 2 additions & 4 deletions xmtp_debug/src/app/generate/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::{
use color_eyre::eyre::{self, eyre, Result};
use rand::{rngs::SmallRng, seq::SliceRandom, Rng, SeedableRng};
use std::sync::Arc;
use xmtp_mls::XmtpOpenMlsProvider;

mod content_type;

Expand Down Expand Up @@ -118,10 +117,9 @@ impl GenerateMessages {
hex::encode(inbox_id)
))?;
let client = app::client_from_identity(&identity, &network).await?;
let conn = client.store().conn()?;
client.sync_welcomes(&conn).await?;
let provider = client.mls_provider()?;
client.sync_welcomes(&provider).await?;
let group = client.group(group.id.into())?;
let provider: XmtpOpenMlsProvider = conn.into();
group.maybe_update_installations(&provider, None).await?;
group.sync_with_conn(&provider).await?;
let words = rng.gen_range(0..*max_message_size);
Expand Down
4 changes: 2 additions & 2 deletions xmtp_debug/src/app/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ impl Send {
.ok_or(eyre!("No Identity with inbox_id [{}]", hex::encode(member)))?;

let client = crate::app::client_from_identity(&identity, network).await?;
let conn = client.store().conn()?;
client.sync_welcomes(&conn).await?;
let provider = client.mls_provider()?;
client.sync_welcomes(&provider).await?;
let xmtp_group = client.group(group.id.to_vec())?;
xmtp_group.send_message(data.as_bytes()).await?;
Ok(())
Expand Down
Loading
Loading