Skip to content

Commit

Permalink
only queue intent inside of a transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Dec 5, 2024
1 parent df1583f commit 1c5c181
Show file tree
Hide file tree
Showing 25 changed files with 473 additions and 275 deletions.
2 changes: 2 additions & 0 deletions bindings_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ pub enum GenericError {
pub enum FfiSubscribeError {
#[error("Subscribe Error {0}")]
Subscribe(#[from] xmtp_mls::subscriptions::SubscribeError),
#[error("Storage error: {0}")]
Storage(#[from] xmtp_mls::storage::StorageError),
}

impl From<String> for GenericError {
Expand Down
60 changes: 44 additions & 16 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -881,8 +881,8 @@ impl FfiConversations {

pub async fn sync(&self) -> Result<(), GenericError> {
let inner = self.inner_client.as_ref();
let conn = inner.store().conn()?;
inner.sync_welcomes(&conn).await?;
let provider = inner.mls_provider()?;
inner.sync_welcomes(&provider).await?;
Ok(())
}

Expand All @@ -895,9 +895,9 @@ impl FfiConversations {

pub async fn sync_all_conversations(&self) -> Result<u32, GenericError> {
let inner = self.inner_client.as_ref();
let conn = inner.store().conn()?;
let provider = inner.mls_provider()?;

let num_groups_synced: usize = inner.sync_all_welcomes_and_groups(&conn).await?;
let num_groups_synced: usize = inner.sync_all_welcomes_and_groups(&provider).await?;

// Convert usize to u32 for compatibility with Uniffi
let num_groups_synced: u32 = num_groups_synced
Expand Down Expand Up @@ -1262,9 +1262,10 @@ impl FfiConversation {
&self,
envelope_bytes: Vec<u8>,
) -> Result<FfiMessage, FfiSubscribeError> {
let provider = self.inner.mls_provider()?;
let message = self
.inner
.process_streamed_group_message(envelope_bytes)
.process_streamed_group_message(&provider, envelope_bytes)
.await?;
let ffi_message = message.into();

Expand Down Expand Up @@ -1848,6 +1849,8 @@ mod tests {
conversations: Mutex<Vec<Arc<FfiConversation>>>,
consent_updates: Mutex<Vec<FfiConsent>>,
notify: Notify,
inbox_id: Option<String>,
installation_id: Option<String>,
}

impl RustStreamCallback {
Expand All @@ -1867,12 +1870,22 @@ mod tests {
.await?;
Ok(())
}

pub fn from_client(client: &FfiXmtpClient) -> Self {
RustStreamCallback {
inbox_id: Some(client.inner_client.inbox_id().to_string()),
installation_id: Some(hex::encode(client.inner_client.installation_public_key())),
..Default::default()
}
}
}

impl FfiMessageCallback for RustStreamCallback {
fn on_message(&self, message: FfiMessage) {
let mut messages = self.messages.lock().unwrap();
log::info!(
inbox_id = self.inbox_id,
installation_id = self.installation_id,
"ON MESSAGE Received\n-------- \n{}\n----------",
String::from_utf8_lossy(&message.content)
);
Expand All @@ -1888,7 +1901,11 @@ mod tests {

impl FfiConversationCallback for RustStreamCallback {
fn on_conversation(&self, group: Arc<super::FfiConversation>) {
log::debug!("received conversation");
log::debug!(
inbox_id = self.inbox_id,
installation_id = self.installation_id,
"received conversation"
);
let _ = self.num_messages.fetch_add(1, Ordering::SeqCst);
let mut convos = self.conversations.lock().unwrap();
convos.push(group);
Expand All @@ -1902,7 +1919,11 @@ mod tests {

impl FfiConsentCallback for RustStreamCallback {
fn on_consent_update(&self, mut consent: Vec<FfiConsent>) {
log::debug!("received consent update");
log::debug!(
inbox_id = self.inbox_id,
installation_id = self.installation_id,
"received consent update"
);
let mut consent_updates = self.consent_updates.lock().unwrap();
consent_updates.append(&mut consent);
self.notify.notify_one();
Expand Down Expand Up @@ -2751,7 +2772,7 @@ mod tests {
let caro = new_test_client().await;

// Alix begins a stream for all messages
let message_callbacks = Arc::new(RustStreamCallback::default());
let message_callbacks = Arc::new(RustStreamCallback::from_client(&alix));
let stream_messages = alix
.conversations()
.stream_all_messages(message_callbacks.clone())
Expand Down Expand Up @@ -2798,12 +2819,12 @@ mod tests {
let bo2 = new_test_client_with_wallet(bo_wallet).await;

// Bo begins a stream for all messages
let bo_message_callbacks = Arc::new(RustStreamCallback::default());
let bo_stream_messages = bo2
let bo2_message_callbacks = Arc::new(RustStreamCallback::from_client(&bo2));
let bo2_stream_messages = bo2
.conversations()
.stream_all_messages(bo_message_callbacks.clone())
.stream_all_messages(bo2_message_callbacks.clone())
.await;
bo_stream_messages.wait_for_ready().await;
bo2_stream_messages.wait_for_ready().await;

alix_group.update_installations().await.unwrap();

Expand Down Expand Up @@ -3167,7 +3188,7 @@ mod tests {
let bo = new_test_client().await;
let caro = new_test_client().await;

let caro_conn = caro.inner_client.store().conn().unwrap();
let caro_provider = caro.inner_client.mls_provider().unwrap();

let alix_group = alix
.conversations()
Expand Down Expand Up @@ -3197,7 +3218,11 @@ mod tests {
)
.await
.unwrap();
let _ = caro.inner_client.sync_welcomes(&caro_conn).await.unwrap();
let _ = caro
.inner_client
.sync_welcomes(&caro_provider)
.await
.unwrap();

bo_group.send("second".as_bytes().to_vec()).await.unwrap();
stream_callback.wait_for_delivery(None).await.unwrap();
Expand All @@ -3216,7 +3241,7 @@ mod tests {
let amal = new_test_client().await;
let bola = new_test_client().await;

let bola_conn = bola.inner_client.store().conn().unwrap();
let bola_provider = bola.inner_client.mls_provider().unwrap();

let amal_group: Arc<FfiConversation> = amal
.conversations()
Expand All @@ -3227,7 +3252,10 @@ mod tests {
.await
.unwrap();

bola.inner_client.sync_welcomes(&bola_conn).await.unwrap();
bola.inner_client
.sync_welcomes(&bola_provider)
.await
.unwrap();
let bola_group = bola.conversation(amal_group.id()).unwrap();

let stream_callback = Arc::new(RustStreamCallback::default());
Expand Down
3 changes: 2 additions & 1 deletion bindings_node/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ impl Conversation {
self.created_at_ns,
);
let envelope_bytes: Vec<u8> = envelope_bytes.deref().to_vec();
let provider = group.mls_provider().map_err(ErrorWrapper::from)?;
let message = group
.process_streamed_group_message(envelope_bytes)
.process_streamed_group_message(&provider, envelope_bytes)
.await
.map_err(ErrorWrapper::from)?;

Expand Down
14 changes: 6 additions & 8 deletions bindings_node/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,30 +235,28 @@ impl Conversations {

#[napi]
pub async fn sync(&self) -> Result<()> {
let conn = self
let provider = self
.inner_client
.store()
.conn()
.mls_provider()
.map_err(ErrorWrapper::from)?;
self
.inner_client
.sync_welcomes(&conn)
.sync_welcomes(&provider)
.await
.map_err(ErrorWrapper::from)?;
Ok(())
}

#[napi]
pub async fn sync_all_conversations(&self) -> Result<usize> {
let conn = self
let provider = self
.inner_client
.store()
.conn()
.mls_provider()
.map_err(ErrorWrapper::from)?;

let num_groups_synced = self
.inner_client
.sync_all_welcomes_and_groups(&conn)
.sync_all_welcomes_and_groups(&provider)
.await
.map_err(ErrorWrapper::from)?;

Expand Down
14 changes: 6 additions & 8 deletions bindings_wasm/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,13 @@ impl Conversations {

#[wasm_bindgen]
pub async fn sync(&self) -> Result<(), JsError> {
let conn = self
let provider = self
.inner_client
.store()
.conn()
.mls_provider()
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;
self
.inner_client
.sync_welcomes(&conn)
.sync_welcomes(&provider)
.await
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;

Expand All @@ -285,15 +284,14 @@ impl Conversations {

#[wasm_bindgen(js_name = syncAllConversations)]
pub async fn sync_all_conversations(&self) -> Result<usize, JsError> {
let conn = self
let provider = self
.inner_client
.store()
.conn()
.mls_provider()
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;

let num_groups_synced = self
.inner_client
.sync_all_welcomes_and_groups(&conn)
.sync_all_welcomes_and_groups(&provider)
.await
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;

Expand Down
17 changes: 8 additions & 9 deletions examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,9 @@ async fn main() -> color_eyre::eyre::Result<()> {
}
Commands::ListGroups {} => {
info!("List Groups");
let conn = client.store().conn()?;
let provider = client.mls_provider()?;
client
.sync_welcomes(&conn)
.sync_welcomes(&provider)
.await
.expect("failed to sync welcomes");

Expand Down Expand Up @@ -440,9 +440,8 @@ async fn main() -> color_eyre::eyre::Result<()> {
);
}
Commands::RequestHistorySync {} => {
let conn = client.store().conn().unwrap();
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&conn).await.unwrap();
client.sync_welcomes(&provider).await.unwrap();
client.start_sync_worker();
client
.send_sync_request(&provider, DeviceSyncKind::MessageHistory)
Expand All @@ -451,9 +450,9 @@ async fn main() -> color_eyre::eyre::Result<()> {
info!("Sent history sync request in sync group.")
}
Commands::ListHistorySyncMessages {} => {
let conn = client.store().conn()?;
client.sync_welcomes(&conn).await?;
let group = client.get_sync_group(&conn)?;
let provider = client.mls_provider()?;
client.sync_welcomes(&provider).await?;
let group = client.get_sync_group(provider.conn_ref())?;
let group_id_str = hex::encode(group.group_id.clone());
group.sync().await?;
let messages = group
Expand Down Expand Up @@ -574,8 +573,8 @@ where
}

async fn get_group(client: &Client, group_id: Vec<u8>) -> Result<MlsGroup, CliError> {
let conn = client.store().conn().unwrap();
client.sync_welcomes(&conn).await?;
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&provider).await?;
let group = client.group(group_id)?;
group
.sync()
Expand Down
6 changes: 2 additions & 4 deletions xmtp_debug/src/app/generate/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::{
use color_eyre::eyre::{self, eyre, Result};
use rand::{rngs::SmallRng, seq::SliceRandom, Rng, SeedableRng};
use std::sync::Arc;
use xmtp_mls::XmtpOpenMlsProvider;

mod content_type;

Expand Down Expand Up @@ -118,10 +117,9 @@ impl GenerateMessages {
hex::encode(inbox_id)
))?;
let client = app::client_from_identity(&identity, &network).await?;
let conn = client.store().conn()?;
client.sync_welcomes(&conn).await?;
let provider = client.mls_provider()?;
client.sync_welcomes(&provider).await?;
let group = client.group(group.id.into())?;
let provider: XmtpOpenMlsProvider = conn.into();
group.maybe_update_installations(&provider, None).await?;
group.sync_with_conn(&provider).await?;
let words = rng.gen_range(0..*max_message_size);
Expand Down
4 changes: 2 additions & 2 deletions xmtp_debug/src/app/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ impl Send {
.ok_or(eyre!("No Identity with inbox_id [{}]", hex::encode(member)))?;

let client = crate::app::client_from_identity(&identity, network).await?;
let conn = client.store().conn()?;
client.sync_welcomes(&conn).await?;
let provider = client.mls_provider()?;
client.sync_welcomes(&provider).await?;
let xmtp_group = client.group(group.id.to_vec())?;
xmtp_group.send_message(data.as_bytes()).await?;
Ok(())
Expand Down
10 changes: 5 additions & 5 deletions xmtp_mls/src/api/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ where
}

#[tracing::instrument(level = "trace", skip_all)]
pub async fn query_welcome_messages(
pub async fn query_welcome_messages<Id: AsRef<[u8]> + Copy>(
&self,
installation_id: &[u8],
installation_id: Id,
id_cursor: Option<u64>,
) -> Result<Vec<WelcomeMessage>, ApiError> {
tracing::debug!(
Expand All @@ -135,7 +135,7 @@ where
(async {
self.api_client
.query_welcome_messages(QueryWelcomeMessagesRequest {
installation_key: installation_id.to_vec(),
installation_key: installation_id.as_ref().to_vec(),
paging_info: Some(PagingInfo {
id_cursor: id_cursor.unwrap_or(0),
limit: page_size,
Expand Down Expand Up @@ -297,7 +297,7 @@ where

pub async fn subscribe_welcome_messages(
&self,
installation_key: Vec<u8>,
installation_key: &[u8],
id_cursor: Option<u64>,
) -> Result<impl futures::Stream<Item = Result<WelcomeMessage, ApiError>> + '_, ApiError>
where
Expand All @@ -307,7 +307,7 @@ where
self.api_client
.subscribe_welcome_messages(SubscribeWelcomeMessagesRequest {
filters: vec![WelcomeFilterProto {
installation_key,
installation_key: installation_key.to_vec(),
id_cursor: id_cursor.unwrap_or(0),
}],
})
Expand Down
11 changes: 6 additions & 5 deletions xmtp_mls/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,16 @@ where
.take()
.ok_or(ClientBuilderError::MissingParameter { parameter: "store" })?;

debug!(
inbox_id = identity_strategy.inbox_id(),
"Initializing identity"
);

let identity = identity_strategy
.initialize_identity(&api_client_wrapper, &store, &scw_verifier)
.await?;

debug!(
inbox_id = identity.inbox_id(),
installation_id = hex::encode(identity.installation_keys.public_bytes()),
"Initialized identity"
);

// get sequence_id from identity updates and loaded into the DB
load_identity_updates(
&api_client_wrapper,
Expand Down
Loading

0 comments on commit 1c5c181

Please sign in to comment.