Skip to content

Commit

Permalink
share provider with transactions & everywhere else
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Dec 6, 2024
1 parent e1178cc commit 6797962
Show file tree
Hide file tree
Showing 29 changed files with 668 additions and 506 deletions.
2 changes: 2 additions & 0 deletions bindings_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ pub enum GenericError {
pub enum FfiSubscribeError {
#[error("Subscribe Error {0}")]
Subscribe(#[from] xmtp_mls::subscriptions::SubscribeError),
#[error("Storage error: {0}")]
Storage(#[from] xmtp_mls::storage::StorageError),
}

impl From<String> for GenericError {
Expand Down
69 changes: 48 additions & 21 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ impl FfiXmtpClient {

pub async fn find_inbox_id(&self, address: String) -> Result<Option<String>, GenericError> {
let inner = self.inner_client.as_ref();

let result = inner.find_inbox_id_from_address(address).await?;
let conn = self.inner_client.store().conn()?;
let result = inner.find_inbox_id_from_address(&conn, address).await?;
Ok(result)
}

Expand Down Expand Up @@ -881,8 +881,8 @@ impl FfiConversations {

pub async fn sync(&self) -> Result<(), GenericError> {
let inner = self.inner_client.as_ref();
let conn = inner.store().conn()?;
inner.sync_welcomes(&conn).await?;
let provider = inner.mls_provider()?;
inner.sync_welcomes(&provider).await?;
Ok(())
}

Expand All @@ -898,12 +898,11 @@ impl FfiConversations {
consent_state: Option<FfiConsentState>,
) -> Result<u32, GenericError> {
let inner = self.inner_client.as_ref();
let conn = inner.store().conn()?;

let provider = inner.mls_provider()?;
let consent: Option<ConsentState> = consent_state.map(|state| state.into());

let num_groups_synced: usize = inner.sync_all_welcomes_and_groups(&conn, consent).await?;

let num_groups_synced: usize = inner
.sync_all_welcomes_and_groups(&provider, consent)
.await?;
// Convert usize to u32 for compatibility with Uniffi
let num_groups_synced: u32 = num_groups_synced
.try_into()
Expand Down Expand Up @@ -1267,9 +1266,10 @@ impl FfiConversation {
&self,
envelope_bytes: Vec<u8>,
) -> Result<FfiMessage, FfiSubscribeError> {
let provider = self.inner.mls_provider()?;
let message = self
.inner
.process_streamed_group_message(envelope_bytes)
.process_streamed_group_message(&provider, envelope_bytes)
.await?;
let ffi_message = message.into();

Expand Down Expand Up @@ -1853,6 +1853,8 @@ mod tests {
conversations: Mutex<Vec<Arc<FfiConversation>>>,
consent_updates: Mutex<Vec<FfiConsent>>,
notify: Notify,
inbox_id: Option<String>,
installation_id: Option<String>,
}

impl RustStreamCallback {
Expand All @@ -1872,12 +1874,22 @@ mod tests {
.await?;
Ok(())
}

pub fn from_client(client: &FfiXmtpClient) -> Self {
RustStreamCallback {
inbox_id: Some(client.inner_client.inbox_id().to_string()),
installation_id: Some(hex::encode(client.inner_client.installation_public_key())),
..Default::default()
}
}
}

impl FfiMessageCallback for RustStreamCallback {
fn on_message(&self, message: FfiMessage) {
let mut messages = self.messages.lock().unwrap();
log::info!(
inbox_id = self.inbox_id,
installation_id = self.installation_id,
"ON MESSAGE Received\n-------- \n{}\n----------",
String::from_utf8_lossy(&message.content)
);
Expand All @@ -1893,7 +1905,11 @@ mod tests {

impl FfiConversationCallback for RustStreamCallback {
fn on_conversation(&self, group: Arc<super::FfiConversation>) {
log::debug!("received conversation");
log::debug!(
inbox_id = self.inbox_id,
installation_id = self.installation_id,
"received conversation"
);
let _ = self.num_messages.fetch_add(1, Ordering::SeqCst);
let mut convos = self.conversations.lock().unwrap();
convos.push(group);
Expand All @@ -1907,7 +1923,11 @@ mod tests {

impl FfiConsentCallback for RustStreamCallback {
fn on_consent_update(&self, mut consent: Vec<FfiConsent>) {
log::debug!("received consent update");
log::debug!(
inbox_id = self.inbox_id,
installation_id = self.installation_id,
"received consent update"
);
let mut consent_updates = self.consent_updates.lock().unwrap();
consent_updates.append(&mut consent);
self.notify.notify_one();
Expand Down Expand Up @@ -2774,7 +2794,7 @@ mod tests {
let caro = new_test_client().await;

// Alix begins a stream for all messages
let message_callbacks = Arc::new(RustStreamCallback::default());
let message_callbacks = Arc::new(RustStreamCallback::from_client(&alix));
let stream_messages = alix
.conversations()
.stream_all_messages(message_callbacks.clone())
Expand Down Expand Up @@ -2821,12 +2841,12 @@ mod tests {
let bo2 = new_test_client_with_wallet(bo_wallet).await;

// Bo begins a stream for all messages
let bo_message_callbacks = Arc::new(RustStreamCallback::default());
let bo_stream_messages = bo2
let bo2_message_callbacks = Arc::new(RustStreamCallback::from_client(&bo2));
let bo2_stream_messages = bo2
.conversations()
.stream_all_messages(bo_message_callbacks.clone())
.stream_all_messages(bo2_message_callbacks.clone())
.await;
bo_stream_messages.wait_for_ready().await;
bo2_stream_messages.wait_for_ready().await;

alix_group.update_installations().await.unwrap();

Expand Down Expand Up @@ -3190,7 +3210,7 @@ mod tests {
let bo = new_test_client().await;
let caro = new_test_client().await;

let caro_conn = caro.inner_client.store().conn().unwrap();
let caro_provider = caro.inner_client.mls_provider().unwrap();

let alix_group = alix
.conversations()
Expand Down Expand Up @@ -3220,7 +3240,11 @@ mod tests {
)
.await
.unwrap();
let _ = caro.inner_client.sync_welcomes(&caro_conn).await.unwrap();
let _ = caro
.inner_client
.sync_welcomes(&caro_provider)
.await
.unwrap();

bo_group.send("second".as_bytes().to_vec()).await.unwrap();
stream_callback.wait_for_delivery(None).await.unwrap();
Expand All @@ -3239,7 +3263,7 @@ mod tests {
let amal = new_test_client().await;
let bola = new_test_client().await;

let bola_conn = bola.inner_client.store().conn().unwrap();
let bola_provider = bola.inner_client.mls_provider().unwrap();

let amal_group: Arc<FfiConversation> = amal
.conversations()
Expand All @@ -3250,7 +3274,10 @@ mod tests {
.await
.unwrap();

bola.inner_client.sync_welcomes(&bola_conn).await.unwrap();
bola.inner_client
.sync_welcomes(&bola_provider)
.await
.unwrap();
let bola_group = bola.conversation(amal_group.id()).unwrap();

let stream_callback = Arc::new(RustStreamCallback::default());
Expand Down
7 changes: 6 additions & 1 deletion bindings_node/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,14 @@ impl Client {

#[napi]
pub async fn find_inbox_id_by_address(&self, address: String) -> Result<Option<String>> {
let conn = self
.inner_client()
.store()
.conn()
.map_err(ErrorWrapper::from)?;
let inbox_id = self
.inner_client
.find_inbox_id_from_address(address)
.find_inbox_id_from_address(&conn, address)
.await
.map_err(ErrorWrapper::from)?;

Expand Down
3 changes: 2 additions & 1 deletion bindings_node/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ impl Conversation {
self.created_at_ns,
);
let envelope_bytes: Vec<u8> = envelope_bytes.deref().to_vec();
let provider = group.mls_provider().map_err(ErrorWrapper::from)?;
let message = group
.process_streamed_group_message(envelope_bytes)
.process_streamed_group_message(&provider, envelope_bytes)
.await
.map_err(ErrorWrapper::from)?;

Expand Down
14 changes: 6 additions & 8 deletions bindings_node/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,30 +235,28 @@ impl Conversations {

#[napi]
pub async fn sync(&self) -> Result<()> {
let conn = self
let provider = self
.inner_client
.store()
.conn()
.mls_provider()
.map_err(ErrorWrapper::from)?;
self
.inner_client
.sync_welcomes(&conn)
.sync_welcomes(&provider)
.await
.map_err(ErrorWrapper::from)?;
Ok(())
}

#[napi]
pub async fn sync_all_conversations(&self) -> Result<usize> {
let conn = self
let provider = self
.inner_client
.store()
.conn()
.mls_provider()
.map_err(ErrorWrapper::from)?;

let num_groups_synced = self
.inner_client
.sync_all_welcomes_and_groups(&conn, None)
.sync_all_welcomes_and_groups(&provider, None)
.await
.map_err(ErrorWrapper::from)?;

Expand Down
8 changes: 6 additions & 2 deletions bindings_wasm/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use xmtp_api_http::XmtpHttpApiClient;
use xmtp_cryptography::signature::ed25519_public_key_to_address;
use xmtp_id::associations::builder::SignatureRequest;
use xmtp_mls::builder::ClientBuilder;
use xmtp_mls::groups::scoped_client::ScopedGroupClient;
use xmtp_mls::identity::IdentityStrategy;
use xmtp_mls::storage::{EncryptedMessageStore, EncryptionKey, StorageOption};
use xmtp_mls::Client as MlsClient;
Expand Down Expand Up @@ -273,9 +272,14 @@ impl Client {

#[wasm_bindgen(js_name = findInboxIdByAddress)]
pub async fn find_inbox_id_by_address(&self, address: String) -> Result<Option<String>, JsError> {
let conn = self
.inner_client
.store()
.conn()
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;
let inbox_id = self
.inner_client
.find_inbox_id_from_address(address)
.find_inbox_id_from_address(&conn, address)
.await
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;

Expand Down
14 changes: 6 additions & 8 deletions bindings_wasm/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,13 @@ impl Conversations {

#[wasm_bindgen]
pub async fn sync(&self) -> Result<(), JsError> {
let conn = self
let provider = self
.inner_client
.store()
.conn()
.mls_provider()
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;
self
.inner_client
.sync_welcomes(&conn)
.sync_welcomes(&provider)
.await
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;

Expand All @@ -285,15 +284,14 @@ impl Conversations {

#[wasm_bindgen(js_name = syncAllConversations)]
pub async fn sync_all_conversations(&self) -> Result<usize, JsError> {
let conn = self
let provider = self
.inner_client
.store()
.conn()
.mls_provider()
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;

let num_groups_synced = self
.inner_client
.sync_all_welcomes_and_groups(&conn, None)
.sync_all_welcomes_and_groups(&provider, None)
.await
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;

Expand Down
17 changes: 8 additions & 9 deletions examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,9 @@ async fn main() -> color_eyre::eyre::Result<()> {
}
Commands::ListGroups {} => {
info!("List Groups");
let conn = client.store().conn()?;
let provider = client.mls_provider()?;
client
.sync_welcomes(&conn)
.sync_welcomes(&provider)
.await
.expect("failed to sync welcomes");

Expand Down Expand Up @@ -440,9 +440,8 @@ async fn main() -> color_eyre::eyre::Result<()> {
);
}
Commands::RequestHistorySync {} => {
let conn = client.store().conn().unwrap();
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&conn).await.unwrap();
client.sync_welcomes(&provider).await.unwrap();
client.start_sync_worker();
client
.send_sync_request(&provider, DeviceSyncKind::MessageHistory)
Expand All @@ -451,9 +450,9 @@ async fn main() -> color_eyre::eyre::Result<()> {
info!("Sent history sync request in sync group.")
}
Commands::ListHistorySyncMessages {} => {
let conn = client.store().conn()?;
client.sync_welcomes(&conn).await?;
let group = client.get_sync_group(&conn)?;
let provider = client.mls_provider()?;
client.sync_welcomes(&provider).await?;
let group = client.get_sync_group(provider.conn_ref())?;
let group_id_str = hex::encode(group.group_id.clone());
group.sync().await?;
let messages = group
Expand Down Expand Up @@ -574,8 +573,8 @@ where
}

async fn get_group(client: &Client, group_id: Vec<u8>) -> Result<MlsGroup, CliError> {
let conn = client.store().conn().unwrap();
client.sync_welcomes(&conn).await?;
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&provider).await?;
let group = client.group(group_id)?;
group
.sync()
Expand Down
6 changes: 2 additions & 4 deletions xmtp_debug/src/app/generate/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::{
use color_eyre::eyre::{self, eyre, Result};
use rand::{rngs::SmallRng, seq::SliceRandom, Rng, SeedableRng};
use std::sync::Arc;
use xmtp_mls::XmtpOpenMlsProvider;

mod content_type;

Expand Down Expand Up @@ -118,10 +117,9 @@ impl GenerateMessages {
hex::encode(inbox_id)
))?;
let client = app::client_from_identity(&identity, &network).await?;
let conn = client.store().conn()?;
client.sync_welcomes(&conn).await?;
let provider = client.mls_provider()?;
client.sync_welcomes(&provider).await?;
let group = client.group(group.id.into())?;
let provider: XmtpOpenMlsProvider = conn.into();
group.maybe_update_installations(&provider, None).await?;
group.sync_with_conn(&provider).await?;
let words = rng.gen_range(0..*max_message_size);
Expand Down
4 changes: 2 additions & 2 deletions xmtp_debug/src/app/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ impl Send {
.ok_or(eyre!("No Identity with inbox_id [{}]", hex::encode(member)))?;

let client = crate::app::client_from_identity(&identity, network).await?;
let conn = client.store().conn()?;
client.sync_welcomes(&conn).await?;
let provider = client.mls_provider()?;
client.sync_welcomes(&provider).await?;
let xmtp_group = client.group(group.id.to_vec())?;
xmtp_group.send_message(data.as_bytes()).await?;
Ok(())
Expand Down
Loading

0 comments on commit 6797962

Please sign in to comment.