Skip to content

Commit

Permalink
Move staged commit to intents
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Aug 23, 2024
1 parent c4d3a6d commit 0e1c68f
Show file tree
Hide file tree
Showing 11 changed files with 196 additions and 72 deletions.
10 changes: 9 additions & 1 deletion bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2465,13 +2465,14 @@ mod tests {
let alix_group = alix.group(group.id()).unwrap();
let bo_group = bo.group(group.id()).unwrap();
let caro_group = caro.group(group.id()).unwrap();

log::info!("Alix sending first message");
// Alix sends a message in the group
alix_group
.send("First message".as_bytes().to_vec())
.await
.unwrap();

log::info!("Caro sending second message");
// Caro sends a message in the group
caro_group
.send("Second message".as_bytes().to_vec())
Expand All @@ -2489,6 +2490,7 @@ mod tests {
.await;
bo_stream_messages.wait_for_ready().await;

log::info!("Alix sending third message after Bo's second installation added");
// Alix sends a message to the group
alix_group
.send("Third message".as_bytes().to_vec())
Expand All @@ -2499,21 +2501,27 @@ mod tests {
bo2.conversations().sync().await.unwrap();
let bo2_group = bo2.group(group.id()).unwrap();

log::info!("Bo sending fourth message");
// Bo sends a message to the group
bo2_group
.send("Fourth message".as_bytes().to_vec())
.await
.unwrap();

log::info!("Caro sending fifth message");
// Caro sends a message in the group
caro_group
.send("Fifth message".as_bytes().to_vec())
.await
.unwrap();

log::info!("Syncing alix");
alix_group.sync().await.unwrap();
log::info!("Syncing bo 1");
bo_group.sync().await.unwrap();
log::info!("Syncing bo 2");
bo2_group.sync().await.unwrap();
log::info!("Syncing caro");
caro_group.sync().await.unwrap();

// Get the message count for all the clients
Expand Down
2 changes: 1 addition & 1 deletion dev/docker/compose
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash
set -eou pipefail

docker-compose -f dev/docker/docker-compose.yml -p "libxmtp" "$@"
docker compose -f dev/docker/docker-compose.yml -p "libxmtp" "$@"
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file should undo anything in `up.sql`
ALTER TABLE group_intents
DROP COLUMN staged_commit;

ALTER TABLE group_intents
DROP COLUMN published_in_epoch;

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Your SQL goes here
ALTER TABLE group_intents
ADD COLUMN staged_commit BLOB;

ALTER TABLE group_intents
ADD COLUMN published_in_epoch BIGINT;

7 changes: 2 additions & 5 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,6 @@ pub enum MessageProcessingError {
Identity(#[from] IdentityError),
#[error("openmls process message error: {0}")]
OpenMlsProcessMessage(#[from] openmls::prelude::ProcessMessageError),
#[error("merge pending commit: {0}")]
MergePendingCommit(
#[from] openmls::group::MergePendingCommitError<sql_key_store::SqlKeyStoreError>,
),
#[error("merge staged commit: {0}")]
MergeStagedCommit(#[from] openmls::group::MergeCommitError<sql_key_store::SqlKeyStoreError>),
#[error(
Expand Down Expand Up @@ -178,6 +174,8 @@ pub enum MessageProcessingError {
Group(#[from] Box<GroupError>),
#[error("generic:{0}")]
Generic(String),
#[error("intent is missing staged_commit field")]
IntentMissingStagedCommit,
}

impl crate::retry::RetryableError for MessageProcessingError {
Expand All @@ -186,7 +184,6 @@ impl crate::retry::RetryableError for MessageProcessingError {
Self::Group(group_error) => retryable!(group_error),
Self::Identity(identity_error) => retryable!(identity_error),
Self::OpenMlsProcessMessage(err) => retryable!(err),
Self::MergePendingCommit(err) => retryable!(err),
Self::MergeStagedCommit(err) => retryable!(err),
Self::Diesel(diesel_error) => retryable!(diesel_error),
Self::Storage(s) => retryable!(s),
Expand Down
39 changes: 35 additions & 4 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ pub enum GroupError {
MissingMetadataField { name: String },
#[error("Message was processed but is missing")]
MissingMessage,
#[error("sql key store error: {0}")]
SqlKeyStore(#[from] sql_key_store::SqlKeyStoreError),
#[error("No pending commit found")]
MissingPendingCommit,
}

impl RetryableError for GroupError {
Expand Down Expand Up @@ -927,10 +931,14 @@ impl MlsGroup {
ApiClient: XmtpApi,
{
let conn = self.context.store.conn()?;
let intent = NewGroupIntent::new(IntentKind::KeyUpdate, self.group_id.clone(), vec![]);
intent.store(&conn)?;
let intent = conn.insert_group_intent(NewGroupIntent::new(
IntentKind::KeyUpdate,
self.group_id.clone(),
vec![],
))?;

self.sync_with_conn(&conn.into(), client).await
self.sync_until_intent_resolved(&conn.into(), intent.id, client)
.await
}

pub fn is_active(&self, provider: impl OpenMlsProvider) -> Result<bool, GroupError> {
Expand Down Expand Up @@ -1529,7 +1537,11 @@ mod tests {
.conn_ref()
.find_group_intents(
amal_group.group_id.clone(),
Some(vec![IntentState::ToPublish, IntentState::Published]),
Some(vec![
IntentState::ToPublish,
IntentState::Published,
IntentState::Error,
]),
None,
)
.unwrap();
Expand All @@ -1545,6 +1557,25 @@ mod tests {
.unwrap();
// Bola's attempted add should be deleted, since it will have been a no-op on the second try
assert_eq!(bola_failed_intents.len(), 0);

// Make sure sending and receiving both worked
amal_group
.send_message("hello from amal".as_bytes(), &amal)
.await
.unwrap();
bola_group
.send_message("hello from bola".as_bytes(), &bola)
.await
.unwrap();

let bola_messages = bola_group
.find_messages(None, None, None, None, None)
.unwrap();
let matching_message = bola_messages
.iter()
.find(|m| m.decrypted_message_bytes == "hello from amal".as_bytes());
log::info!("found message: {:?}", bola_messages);
assert!(matching_message.is_some());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down
3 changes: 2 additions & 1 deletion xmtp_mls/src/groups/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ impl MlsGroup {
);

if let Some(GroupError::ReceiveError(_)) = process_result.as_ref().err() {
self.sync(&client).await?;
self.sync_with_conn(&client.mls_provider()?, &client)
.await?;
} else if process_result.is_err() {
log::error!("Process stream entry {:?}", process_result.err());
}
Expand Down
Loading

0 comments on commit 0e1c68f

Please sign in to comment.