diff --git a/xmtp_mls/migrations/2023-10-29-205333_state_machine_init/up.sql b/xmtp_mls/migrations/2023-10-29-205333_state_machine_init/up.sql index 0f275483b..96cda90fc 100644 --- a/xmtp_mls/migrations/2023-10-29-205333_state_machine_init/up.sql +++ b/xmtp_mls/migrations/2023-10-29-205333_state_machine_init/up.sql @@ -35,8 +35,11 @@ CREATE INDEX group_messages_group_id_sort_idx ON group_messages(group_id, sent_a -- Used to keep track of the last seen message timestamp in a topic CREATE TABLE refresh_state ( + -- E.g. the Id of the group "entity_id" BLOB NOT NULL, + -- Welcomes or other types "entity_kind" INTEGER NOT NULL, -- Need to allow for groups and welcomes to be separated, since a malicious client could manipulate their group ID to match someone's installation_id and make a mess + -- Where you are in the topic "cursor" BIGINT NOT NULL, PRIMARY KEY (entity_id, entity_kind) diff --git a/xmtp_mls/src/groups/message_history.rs b/xmtp_mls/src/groups/message_history.rs new file mode 100644 index 000000000..4afdee61c --- /dev/null +++ b/xmtp_mls/src/groups/message_history.rs @@ -0,0 +1,82 @@ +use xmtp_proto::{ + api_client::XmtpMlsClient, + xmtp::mls::message_contents::plaintext_envelope::v2::MessageType::{ + MessageHistoryRequest as HistoryRequest, MessageHistoryResponse as HistoryResponse, + }, + xmtp::mls::message_contents::plaintext_envelope::{Content, V2}, + xmtp::mls::message_contents::PlaintextEnvelope, + xmtp::mls::message_contents::{MessageHistoryRequest, MessageHistoryResponse}, +}; + +use super::{GroupError, MlsGroup}; + +impl<'c, ApiClient> MlsGroup<'c, ApiClient> +where + ApiClient: XmtpMlsClient, +{ + #[allow(dead_code)] + pub(crate) async fn send_message_history_request(&self) -> Result<(), GroupError> { + let pin_code = "1234".to_string(); + let request_id = "abc123".to_string(); + let _request = PlaintextEnvelope { + content: Some(Content::V2(V2 { + idempotency_key: String::from("unique"), + message_type: Some(HistoryRequest(MessageHistoryRequest { + pin_code, + request_id, + })), + })), + }; + // TODO: Implement sending request to network + Ok(()) + } + + #[allow(dead_code)] + pub(crate) async fn send_message_history_response(&self) -> Result<(), GroupError> { + let backup_url = "https://example.com/uploads/long-id-123".to_string(); + let request_id = "abc123".to_string(); + let backup_file_hash = b"ABC123DEF456"; + let expiration_time_ns = 123; + let _request = PlaintextEnvelope { + content: Some(Content::V2(V2 { + idempotency_key: String::from("unique"), + message_type: Some(HistoryResponse(MessageHistoryResponse { + backup_url, + request_id, + backup_file_hash: backup_file_hash.into(), + expiration_time_ns, + })), + })), + }; + // TODO: Implement sending (responding) request to network + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use crate::assert_ok; + use crate::builder::ClientBuilder; + use xmtp_cryptography::utils::generate_local_wallet; + + #[tokio::test] + async fn test_send_mesage_history_request() { + let wallet = generate_local_wallet(); + let client = ClientBuilder::new_test_client(&wallet).await; + let group = client.create_group(None).expect("create group"); + + let result = group.send_message_history_request().await; + assert_ok!(result); + } + + #[tokio::test] + async fn test_send_mesage_history_response() { + let wallet = generate_local_wallet(); + let client = ClientBuilder::new_test_client(&wallet).await; + let group = client.create_group(None).expect("create group"); + + let result = group.send_message_history_response().await; + assert_ok!(result); + } +} diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index f10814fe7..adcdb5e90 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -2,6 +2,7 @@ pub mod group_metadata; mod group_permissions; mod intents; mod members; +mod message_history; mod subscriptions; mod sync; pub mod validated_commit; @@ -175,7 +176,6 @@ where } // Create a new group and save it to the DB - #[allow(clippy::unwrap_or_default)] pub fn create_and_insert( client: &'c Client, membership_state: GroupMembershipState, @@ -185,9 +185,7 @@ where let provider = XmtpOpenMlsProvider::new(&conn); let protected_metadata = build_protected_metadata_extension( &client.identity, - permissions - .unwrap_or(PreconfiguredPolicies::default()) - .to_policy_set(), + permissions.unwrap_or_default().to_policy_set(), )?; let group_config = build_group_config(protected_metadata)?; @@ -246,7 +244,7 @@ where Self::create_from_welcome(client, provider, welcome) } - fn add_idempotency_key(encoded_msg: &[u8], idempotency_key: &str) -> PlaintextEnvelope { + fn into_envelope(encoded_msg: &[u8], idempotency_key: &str) -> PlaintextEnvelope { PlaintextEnvelope { content: Some(Content::V1(V1 { content: encoded_msg.to_vec(), @@ -263,7 +261,7 @@ where .await?; let now = now_ns(); - let plain_envelope = Self::add_idempotency_key(message, &now.to_string()); + let plain_envelope = Self::into_envelope(message, &now.to_string()); let mut encoded_envelope = vec![]; plain_envelope .encode(&mut encoded_envelope) @@ -378,6 +376,7 @@ where self.sync_until_intent_resolved(conn, intent.id).await } + // Used in tests #[allow(dead_code)] pub(crate) async fn remove_members_by_installation_id( &self, diff --git a/xmtp_mls/src/groups/sync.rs b/xmtp_mls/src/groups/sync.rs index ce7a491b1..78b606b59 100644 --- a/xmtp_mls/src/groups/sync.rs +++ b/xmtp_mls/src/groups/sync.rs @@ -25,7 +25,7 @@ use xmtp_proto::{ }, GroupMessage, WelcomeMessageInput, }, - xmtp::mls::message_contents::plaintext_envelope::{Content, V1}, + xmtp::mls::message_contents::plaintext_envelope::{Content, V1, V2}, xmtp::mls::message_contents::GroupMembershipChanges, xmtp::mls::message_contents::PlaintextEnvelope, }; @@ -236,10 +236,17 @@ where conn.set_delivery_status_to_published(&message_id, envelope_timestamp_ns)?; } - Some(Content::V2(_)) => { + Some(Content::V2(V2 { + idempotency_key: _, + message_type, + })) => { + debug!( + "Send Message History Request with message_type {:#?}", + message_type + ); return Err(MessageProcessingError::Generic( "not yet implemented".into(), - )) + )); } None => return Err(MessageProcessingError::InvalidPayload), }; @@ -299,10 +306,17 @@ where } .store(provider.conn())? } - Some(Content::V2(_)) => { + Some(Content::V2(V2 { + idempotency_key: _, + message_type, + })) => { + debug!( + "Received Message History Request with message_type {:#?}", + message_type + ); return Err(MessageProcessingError::Generic( "not yet implemented".into(), - )) + )); } None => return Err(MessageProcessingError::InvalidPayload), } @@ -749,12 +763,10 @@ where } IdentityUpdate::RevokeInstallation(_) => { log::warn!("Revocation found. Not handled"); - None } IdentityUpdate::Invalid => { log::warn!("Invalid identity update found"); - None } }) diff --git a/xmtp_mls/src/storage/encrypted_store/group.rs b/xmtp_mls/src/storage/encrypted_store/group.rs index dbc19477e..fae5f7a03 100644 --- a/xmtp_mls/src/storage/encrypted_store/group.rs +++ b/xmtp_mls/src/storage/encrypted_store/group.rs @@ -51,7 +51,7 @@ impl StoredGroup { } } - /// Create a new [`Purpose::Sync`] group + /// Create a new [`Purpose::Sync`] group. This is less common and is used to sync message history. pub fn new_sync_group( id: ID, created_at_ns: i64, @@ -68,7 +68,7 @@ impl StoredGroup { } impl DbConnection<'_> { - /// Return regular [`Purpose::Conversation`] groups with additional filters + /// Return regular [`Purpose::Conversation`] groups with additional optional filters pub fn find_groups( &self, allowed_states: Option>, @@ -335,6 +335,8 @@ pub(crate) mod tests { // Sync groups SHOULD NOT be returned let synced_groups = conn.find_sync_groups().unwrap(); assert_eq!(synced_groups.len(), 0); + + // test that ONLY normal groups show up. }) }