Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message History 3/N #652

Merged
merged 13 commits into from
Apr 13, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ CREATE INDEX group_messages_group_id_sort_idx ON group_messages(group_id, sent_a

-- Used to keep track of the last seen message timestamp in a topic
CREATE TABLE refresh_state (
-- E.g. the Id of the group
"entity_id" BLOB NOT NULL,
-- Welcomes or other types
"entity_kind" INTEGER NOT NULL, -- Need to allow for groups and welcomes to be separated, since a malicious client could manipulate their group ID to match someone's installation_id and make a mess
-- Where you are in the topic
"cursor" BIGINT NOT NULL,

PRIMARY KEY (entity_id, entity_kind)
Expand Down
82 changes: 82 additions & 0 deletions xmtp_mls/src/groups/message_history.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use xmtp_proto::{
api_client::XmtpMlsClient,
xmtp::mls::message_contents::plaintext_envelope::v2::MessageType::{
MessageHistoryRequest as HistoryRequest, MessageHistoryResponse as HistoryResponse,
},
xmtp::mls::message_contents::plaintext_envelope::{Content, V2},
xmtp::mls::message_contents::PlaintextEnvelope,
xmtp::mls::message_contents::{MessageHistoryRequest, MessageHistoryResponse},
};

use super::{GroupError, MlsGroup};

impl<'c, ApiClient> MlsGroup<'c, ApiClient>
where
ApiClient: XmtpMlsClient,
{
#[allow(dead_code)]
pub(crate) async fn send_message_history_request(&self) -> Result<(), GroupError> {
let pin_code = "1234".to_string();
let request_id = "abc123".to_string();
let _request = PlaintextEnvelope {
content: Some(Content::V2(V2 {
idempotency_key: String::from("unique"),
message_type: Some(HistoryRequest(MessageHistoryRequest {
pin_code,
request_id,
})),
})),
};
// TODO: Implement sending request to network
Ok(())
}

#[allow(dead_code)]
pub(crate) async fn send_message_history_response(&self) -> Result<(), GroupError> {
let backup_url = "https://example.com/uploads/long-id-123".to_string();
let request_id = "abc123".to_string();
let backup_file_hash = b"ABC123DEF456";
let expiration_time_ns = 123;
let _request = PlaintextEnvelope {
content: Some(Content::V2(V2 {
idempotency_key: String::from("unique"),
message_type: Some(HistoryResponse(MessageHistoryResponse {
backup_url,
request_id,
backup_file_hash: backup_file_hash.into(),
expiration_time_ns,
})),
})),
};
// TODO: Implement sending (responding) request to network
Ok(())
}
}

#[cfg(test)]
mod tests {

use crate::assert_ok;
use crate::builder::ClientBuilder;
use xmtp_cryptography::utils::generate_local_wallet;

#[tokio::test]
async fn test_send_mesage_history_request() {
let wallet = generate_local_wallet();
let client = ClientBuilder::new_test_client(&wallet).await;
let group = client.create_group(None).expect("create group");

let result = group.send_message_history_request().await;
assert_ok!(result);
}

#[tokio::test]
async fn test_send_mesage_history_response() {
let wallet = generate_local_wallet();
let client = ClientBuilder::new_test_client(&wallet).await;
let group = client.create_group(None).expect("create group");

let result = group.send_message_history_response().await;
assert_ok!(result);
}
}
11 changes: 5 additions & 6 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod group_metadata;
mod group_permissions;
mod intents;
mod members;
mod message_history;
mod subscriptions;
mod sync;
pub mod validated_commit;
Expand Down Expand Up @@ -175,7 +176,6 @@ where
}

// Create a new group and save it to the DB
#[allow(clippy::unwrap_or_default)]
pub fn create_and_insert(
client: &'c Client<ApiClient>,
membership_state: GroupMembershipState,
Expand All @@ -185,9 +185,7 @@ where
let provider = XmtpOpenMlsProvider::new(&conn);
let protected_metadata = build_protected_metadata_extension(
&client.identity,
permissions
.unwrap_or(PreconfiguredPolicies::default())
.to_policy_set(),
permissions.unwrap_or_default().to_policy_set(),
)?;
let group_config = build_group_config(protected_metadata)?;

Expand Down Expand Up @@ -246,7 +244,7 @@ where
Self::create_from_welcome(client, provider, welcome)
}

fn add_idempotency_key(encoded_msg: &[u8], idempotency_key: &str) -> PlaintextEnvelope {
fn into_envelope(encoded_msg: &[u8], idempotency_key: &str) -> PlaintextEnvelope {
PlaintextEnvelope {
content: Some(Content::V1(V1 {
content: encoded_msg.to_vec(),
Expand All @@ -263,7 +261,7 @@ where
.await?;

let now = now_ns();
let plain_envelope = Self::add_idempotency_key(message, &now.to_string());
let plain_envelope = Self::into_envelope(message, &now.to_string());
let mut encoded_envelope = vec![];
plain_envelope
.encode(&mut encoded_envelope)
Expand Down Expand Up @@ -378,6 +376,7 @@ where
self.sync_until_intent_resolved(conn, intent.id).await
}

// Used in tests
#[allow(dead_code)]
pub(crate) async fn remove_members_by_installation_id(
&self,
Expand Down
26 changes: 19 additions & 7 deletions xmtp_mls/src/groups/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use xmtp_proto::{
},
GroupMessage, WelcomeMessageInput,
},
xmtp::mls::message_contents::plaintext_envelope::{Content, V1},
xmtp::mls::message_contents::plaintext_envelope::{Content, V1, V2},
xmtp::mls::message_contents::GroupMembershipChanges,
xmtp::mls::message_contents::PlaintextEnvelope,
};
Expand Down Expand Up @@ -236,10 +236,17 @@ where

conn.set_delivery_status_to_published(&message_id, envelope_timestamp_ns)?;
}
Some(Content::V2(_)) => {
Some(Content::V2(V2 {
idempotency_key: _,
message_type,
})) => {
debug!(
"Send Message History Request with message_type {:#?}",
message_type
);
return Err(MessageProcessingError::Generic(
"not yet implemented".into(),
))
));
}
None => return Err(MessageProcessingError::InvalidPayload),
};
Expand Down Expand Up @@ -299,10 +306,17 @@ where
}
.store(provider.conn())?
}
Some(Content::V2(_)) => {
Some(Content::V2(V2 {
idempotency_key: _,
message_type,
})) => {
debug!(
"Received Message History Request with message_type {:#?}",
message_type
);
return Err(MessageProcessingError::Generic(
"not yet implemented".into(),
))
));
}
None => return Err(MessageProcessingError::InvalidPayload),
}
Expand Down Expand Up @@ -749,12 +763,10 @@ where
}
IdentityUpdate::RevokeInstallation(_) => {
log::warn!("Revocation found. Not handled");

None
}
IdentityUpdate::Invalid => {
log::warn!("Invalid identity update found");

None
}
})
Expand Down
6 changes: 4 additions & 2 deletions xmtp_mls/src/storage/encrypted_store/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl StoredGroup {
}
}

/// Create a new [`Purpose::Sync`] group
/// Create a new [`Purpose::Sync`] group. This is less common and is used to sync message history.
pub fn new_sync_group(
id: ID,
created_at_ns: i64,
Expand All @@ -68,7 +68,7 @@ impl StoredGroup {
}

impl DbConnection<'_> {
/// Return regular [`Purpose::Conversation`] groups with additional filters
/// Return regular [`Purpose::Conversation`] groups with additional optional filters
pub fn find_groups(
&self,
allowed_states: Option<Vec<GroupMembershipState>>,
Expand Down Expand Up @@ -335,6 +335,8 @@ pub(crate) mod tests {
// Sync groups SHOULD NOT be returned
let synced_groups = conn.find_sync_groups().unwrap();
assert_eq!(synced_groups.len(), 0);

// test that ONLY normal groups show up.
})
}

Expand Down
Loading