Skip to content

Commit

Permalink
Message History 3/N (#652)
Browse files Browse the repository at this point in the history
  • Loading branch information
tuddman authored Apr 13, 2024
1 parent 1331100 commit 8cf15ef
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ CREATE INDEX group_messages_group_id_sort_idx ON group_messages(group_id, sent_a

-- Used to keep track of the last seen message timestamp in a topic
CREATE TABLE refresh_state (
-- E.g. the Id of the group
"entity_id" BLOB NOT NULL,
-- Welcomes or other types
"entity_kind" INTEGER NOT NULL, -- Need to allow for groups and welcomes to be separated, since a malicious client could manipulate their group ID to match someone's installation_id and make a mess
-- Where you are in the topic
"cursor" BIGINT NOT NULL,

PRIMARY KEY (entity_id, entity_kind)
Expand Down
82 changes: 82 additions & 0 deletions xmtp_mls/src/groups/message_history.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use xmtp_proto::{
api_client::XmtpMlsClient,
xmtp::mls::message_contents::plaintext_envelope::v2::MessageType::{
MessageHistoryRequest as HistoryRequest, MessageHistoryResponse as HistoryResponse,
},
xmtp::mls::message_contents::plaintext_envelope::{Content, V2},
xmtp::mls::message_contents::PlaintextEnvelope,
xmtp::mls::message_contents::{MessageHistoryRequest, MessageHistoryResponse},
};

use super::{GroupError, MlsGroup};

impl<'c, ApiClient> MlsGroup<'c, ApiClient>
where
ApiClient: XmtpMlsClient,
{
#[allow(dead_code)]
pub(crate) async fn send_message_history_request(&self) -> Result<(), GroupError> {
let pin_code = "1234".to_string();
let request_id = "abc123".to_string();
let _request = PlaintextEnvelope {
content: Some(Content::V2(V2 {
idempotency_key: String::from("unique"),
message_type: Some(HistoryRequest(MessageHistoryRequest {
pin_code,
request_id,
})),
})),
};
// TODO: Implement sending request to network
Ok(())
}

#[allow(dead_code)]
pub(crate) async fn send_message_history_response(&self) -> Result<(), GroupError> {
let backup_url = "https://example.com/uploads/long-id-123".to_string();
let request_id = "abc123".to_string();
let backup_file_hash = b"ABC123DEF456";
let expiration_time_ns = 123;
let _request = PlaintextEnvelope {
content: Some(Content::V2(V2 {
idempotency_key: String::from("unique"),
message_type: Some(HistoryResponse(MessageHistoryResponse {
backup_url,
request_id,
backup_file_hash: backup_file_hash.into(),
expiration_time_ns,
})),
})),
};
// TODO: Implement sending (responding) request to network
Ok(())
}
}

#[cfg(test)]
mod tests {

use crate::assert_ok;
use crate::builder::ClientBuilder;
use xmtp_cryptography::utils::generate_local_wallet;

#[tokio::test]
async fn test_send_mesage_history_request() {
let wallet = generate_local_wallet();
let client = ClientBuilder::new_test_client(&wallet).await;
let group = client.create_group(None).expect("create group");

let result = group.send_message_history_request().await;
assert_ok!(result);
}

#[tokio::test]
async fn test_send_mesage_history_response() {
let wallet = generate_local_wallet();
let client = ClientBuilder::new_test_client(&wallet).await;
let group = client.create_group(None).expect("create group");

let result = group.send_message_history_response().await;
assert_ok!(result);
}
}
11 changes: 5 additions & 6 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod group_metadata;
mod group_permissions;
mod intents;
mod members;
mod message_history;
mod subscriptions;
mod sync;
pub mod validated_commit;
Expand Down Expand Up @@ -175,7 +176,6 @@ where
}

// Create a new group and save it to the DB
#[allow(clippy::unwrap_or_default)]
pub fn create_and_insert(
client: &'c Client<ApiClient>,
membership_state: GroupMembershipState,
Expand All @@ -185,9 +185,7 @@ where
let provider = XmtpOpenMlsProvider::new(&conn);
let protected_metadata = build_protected_metadata_extension(
&client.identity,
permissions
.unwrap_or(PreconfiguredPolicies::default())
.to_policy_set(),
permissions.unwrap_or_default().to_policy_set(),
)?;
let group_config = build_group_config(protected_metadata)?;

Expand Down Expand Up @@ -246,7 +244,7 @@ where
Self::create_from_welcome(client, provider, welcome)
}

fn add_idempotency_key(encoded_msg: &[u8], idempotency_key: &str) -> PlaintextEnvelope {
fn into_envelope(encoded_msg: &[u8], idempotency_key: &str) -> PlaintextEnvelope {
PlaintextEnvelope {
content: Some(Content::V1(V1 {
content: encoded_msg.to_vec(),
Expand All @@ -263,7 +261,7 @@ where
.await?;

let now = now_ns();
let plain_envelope = Self::add_idempotency_key(message, &now.to_string());
let plain_envelope = Self::into_envelope(message, &now.to_string());
let mut encoded_envelope = vec![];
plain_envelope
.encode(&mut encoded_envelope)
Expand Down Expand Up @@ -378,6 +376,7 @@ where
self.sync_until_intent_resolved(conn, intent.id).await
}

// Used in tests
#[allow(dead_code)]
pub(crate) async fn remove_members_by_installation_id(
&self,
Expand Down
26 changes: 19 additions & 7 deletions xmtp_mls/src/groups/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use xmtp_proto::{
},
GroupMessage, WelcomeMessageInput,
},
xmtp::mls::message_contents::plaintext_envelope::{Content, V1},
xmtp::mls::message_contents::plaintext_envelope::{Content, V1, V2},
xmtp::mls::message_contents::GroupMembershipChanges,
xmtp::mls::message_contents::PlaintextEnvelope,
};
Expand Down Expand Up @@ -236,10 +236,17 @@ where

conn.set_delivery_status_to_published(&message_id, envelope_timestamp_ns)?;
}
Some(Content::V2(_)) => {
Some(Content::V2(V2 {
idempotency_key: _,
message_type,
})) => {
debug!(
"Send Message History Request with message_type {:#?}",
message_type
);
return Err(MessageProcessingError::Generic(
"not yet implemented".into(),
))
));
}
None => return Err(MessageProcessingError::InvalidPayload),
};
Expand Down Expand Up @@ -299,10 +306,17 @@ where
}
.store(provider.conn())?
}
Some(Content::V2(_)) => {
Some(Content::V2(V2 {
idempotency_key: _,
message_type,
})) => {
debug!(
"Received Message History Request with message_type {:#?}",
message_type
);
return Err(MessageProcessingError::Generic(
"not yet implemented".into(),
))
));
}
None => return Err(MessageProcessingError::InvalidPayload),
}
Expand Down Expand Up @@ -749,12 +763,10 @@ where
}
IdentityUpdate::RevokeInstallation(_) => {
log::warn!("Revocation found. Not handled");

None
}
IdentityUpdate::Invalid => {
log::warn!("Invalid identity update found");

None
}
})
Expand Down
6 changes: 4 additions & 2 deletions xmtp_mls/src/storage/encrypted_store/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl StoredGroup {
}
}

/// Create a new [`Purpose::Sync`] group
/// Create a new [`Purpose::Sync`] group. This is less common and is used to sync message history.
pub fn new_sync_group(
id: ID,
created_at_ns: i64,
Expand All @@ -68,7 +68,7 @@ impl StoredGroup {
}

impl DbConnection<'_> {
/// Return regular [`Purpose::Conversation`] groups with additional filters
/// Return regular [`Purpose::Conversation`] groups with additional optional filters
pub fn find_groups(
&self,
allowed_states: Option<Vec<GroupMembershipState>>,
Expand Down Expand Up @@ -335,6 +335,8 @@ pub(crate) mod tests {
// Sync groups SHOULD NOT be returned
let synced_groups = conn.find_sync_groups().unwrap();
assert_eq!(synced_groups.len(), 0);

// test that ONLY normal groups show up.
})
}

Expand Down

0 comments on commit 8cf15ef

Please sign in to comment.