Skip to content

Commit

Permalink
re-use prepare_message for message history (#975)
Browse files Browse the repository at this point in the history
- re-uses the prepare_message function for message history
- introduces a closure for prepare_message to return a context-specific PlaintextEnvelope
  • Loading branch information
insipx authored Aug 20, 2024
1 parent 8eac6ab commit 2864619
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 48 deletions.
83 changes: 43 additions & 40 deletions xmtp_mls/src/groups/message_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use aes_gcm::{
aead::{Aead, KeyInit},
Aes256Gcm,
};
use prost::Message;
use rand::{
distributions::{Alphanumeric, DistString},
Rng, RngCore,
Expand All @@ -29,16 +28,13 @@ use xmtp_proto::{

use super::GroupError;

use crate::client::MessageProcessingError;
use crate::XmtpApi;
use crate::{
client::ClientError,
configuration::DELIMITER,
groups::{intents::SendMessageIntentData, GroupMessageKind, StoredGroupMessage},
storage::{
group::StoredGroup,
group_intent::{IntentKind, NewGroupIntent},
StorageError,
},
groups::{GroupMessageKind, StoredGroupMessage},
storage::{group::StoredGroup, StorageError},
Client, Store,
};

Expand Down Expand Up @@ -112,22 +108,21 @@ where
// build the request
let history_request = HistoryRequest::new();
let pin_code = history_request.pin_code.clone();
let idempotency_key = new_request_id();
let envelope = PlaintextEnvelope {
content: Some(Content::V2(V2 {
message_type: Some(Request(history_request.into())),
idempotency_key,
})),
};

// build the intent
let mut encoded_envelope = vec![];
envelope
.encode(&mut encoded_envelope)
.map_err(GroupError::EncodeError)?;
let intent_data: Vec<u8> = SendMessageIntentData::new(encoded_envelope).into();
let intent = NewGroupIntent::new(IntentKind::SendMessage, sync_group_id, intent_data);
intent.store(&conn)?;
let content_bytes = format!(
"{}{DELIMITER}{}",
history_request.request_id, history_request.pin_code
)
.into_bytes();
let _message_id =
sync_group.prepare_message(content_bytes.as_slice(), &conn, move |_time_ns| {
PlaintextEnvelope {
content: Some(Content::V2(V2 {
message_type: Some(Request(history_request.into())),
idempotency_key: new_request_id(),
})),
}
})?;

// publish the intent
if let Err(err) = sync_group.publish_intents(conn, self).await {
Expand All @@ -148,27 +143,35 @@ where
.pop()
.ok_or(GroupError::GroupNotFound)?
.id;
let sync_group = self.group(sync_group_id)?;

// build the reply
let envelope = PlaintextEnvelope {
content: Some(Content::V2(V2 {
idempotency_key: new_request_id(),
message_type: Some(Reply(contents)),
})),
};

// build the intent
let mut encoded_envelope = vec![];
envelope
.encode(&mut encoded_envelope)
.map_err(GroupError::EncodeError)?;
let intent_data: Vec<u8> = SendMessageIntentData::new(encoded_envelope).into();
let intent =
NewGroupIntent::new(IntentKind::SendMessage, sync_group_id.clone(), intent_data);
intent.store(&conn)?;
// the reply message
let content_bytes = format!(
"{}{DELIMITER}{:?}{DELIMITER}{:?}{DELIMITER}{:?}",
contents.url,
contents
.encryption_key
.as_ref()
.ok_or(MessageProcessingError::InvalidPayload)?,
contents
.signing_key
.as_ref()
.ok_or(MessageProcessingError::InvalidPayload)?,
contents.bundle_hash.as_slice()
)
.into_bytes();

let _message_id =
sync_group.prepare_message(content_bytes.as_slice(), &conn, move |_time_ns| {
PlaintextEnvelope {
content: Some(Content::V2(V2 {
idempotency_key: new_request_id(),
message_type: Some(Reply(contents)),
})),
}
})?;

// publish the intent
let sync_group = self.group(sync_group_id)?;
if let Err(err) = sync_group.publish_intents(conn, self).await {
log::error!("error publishing sync group intents: {:?}", err);
}
Expand Down
31 changes: 23 additions & 8 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,8 @@ impl MlsGroup {
self.maybe_update_installations(conn.clone(), update_interval, client)
.await?;

let message_id = self.prepare_message(message, &conn);
let message_id =
self.prepare_message(message, &conn, |now| Self::into_envelope(message, now));

// Skipping a full sync here and instead just firing and forgetting
if let Err(err) = self.publish_intents(conn.clone(), client).await {
Expand Down Expand Up @@ -476,15 +477,29 @@ impl MlsGroup {
/// Send a message, optimistically returning the ID of the message before the result of a message publish.
pub fn send_message_optimistic(&self, message: &[u8]) -> Result<Vec<u8>, GroupError> {
let conn = self.context.store.conn()?;
let message_id = self.prepare_message(message, &conn)?;

let message_id =
self.prepare_message(message, &conn, |now| Self::into_envelope(message, now))?;
Ok(message_id)
}

/// Prepare a message (intent & id) on this users XMTP [`Client`].
fn prepare_message(&self, message: &[u8], conn: &DbConnection) -> Result<Vec<u8>, GroupError> {
/// Prepare a [`IntentKind::SendMessage`] intent, and [`StoredGroupMessage`] on this users XMTP [`Client`].
///
/// # Arguments
/// * message: UTF-8 or encoded message bytes
/// * conn: Connection to SQLite database
/// * envelope: closure that returns context-specific [`PlaintextEnvelope`]. Closure accepts
/// timestamp attached to intent & stored message.
fn prepare_message<F>(
&self,
message: &[u8],
conn: &DbConnection,
envelope: F,
) -> Result<Vec<u8>, GroupError>
where
F: FnOnce(i64) -> PlaintextEnvelope,
{
let now = now_ns();
let plain_envelope = Self::into_envelope(message, &now.to_string());
let plain_envelope = envelope(now);
let mut encoded_envelope = vec![];
plain_envelope
.encode(&mut encoded_envelope)
Expand Down Expand Up @@ -512,11 +527,11 @@ impl MlsGroup {
Ok(message_id)
}

fn into_envelope(encoded_msg: &[u8], idempotency_key: &str) -> PlaintextEnvelope {
fn into_envelope(encoded_msg: &[u8], idempotency_key: i64) -> PlaintextEnvelope {
PlaintextEnvelope {
content: Some(Content::V1(V1 {
content: encoded_msg.to_vec(),
idempotency_key: idempotency_key.into(),
idempotency_key: idempotency_key.to_string(),
})),
}
}
Expand Down

0 comments on commit 2864619

Please sign in to comment.