diff --git a/xmtp_mls/src/groups/message_history.rs b/xmtp_mls/src/groups/message_history.rs index 383e1a3f1..3a5450db8 100644 --- a/xmtp_mls/src/groups/message_history.rs +++ b/xmtp_mls/src/groups/message_history.rs @@ -7,7 +7,6 @@ use aes_gcm::{ aead::{Aead, KeyInit}, Aes256Gcm, }; -use prost::Message; use rand::{ distributions::{Alphanumeric, DistString}, Rng, RngCore, @@ -29,16 +28,13 @@ use xmtp_proto::{ use super::GroupError; +use crate::client::MessageProcessingError; use crate::XmtpApi; use crate::{ client::ClientError, configuration::DELIMITER, - groups::{intents::SendMessageIntentData, GroupMessageKind, StoredGroupMessage}, - storage::{ - group::StoredGroup, - group_intent::{IntentKind, NewGroupIntent}, - StorageError, - }, + groups::{GroupMessageKind, StoredGroupMessage}, + storage::{group::StoredGroup, StorageError}, Client, Store, }; @@ -112,22 +108,21 @@ where // build the request let history_request = HistoryRequest::new(); let pin_code = history_request.pin_code.clone(); - let idempotency_key = new_request_id(); - let envelope = PlaintextEnvelope { - content: Some(Content::V2(V2 { - message_type: Some(Request(history_request.into())), - idempotency_key, - })), - }; - // build the intent - let mut encoded_envelope = vec![]; - envelope - .encode(&mut encoded_envelope) - .map_err(GroupError::EncodeError)?; - let intent_data: Vec = SendMessageIntentData::new(encoded_envelope).into(); - let intent = NewGroupIntent::new(IntentKind::SendMessage, sync_group_id, intent_data); - intent.store(&conn)?; + let content_bytes = format!( + "{}{DELIMITER}{}", + history_request.request_id, history_request.pin_code + ) + .into_bytes(); + let _message_id = + sync_group.prepare_message(content_bytes.as_slice(), &conn, move |_time_ns| { + PlaintextEnvelope { + content: Some(Content::V2(V2 { + message_type: Some(Request(history_request.into())), + idempotency_key: new_request_id(), + })), + } + })?; // publish the intent if let Err(err) = sync_group.publish_intents(conn, self).await { @@ -148,27 +143,35 @@ where .pop() .ok_or(GroupError::GroupNotFound)? .id; + let sync_group = self.group(sync_group_id)?; - // build the reply - let envelope = PlaintextEnvelope { - content: Some(Content::V2(V2 { - idempotency_key: new_request_id(), - message_type: Some(Reply(contents)), - })), - }; - - // build the intent - let mut encoded_envelope = vec![]; - envelope - .encode(&mut encoded_envelope) - .map_err(GroupError::EncodeError)?; - let intent_data: Vec = SendMessageIntentData::new(encoded_envelope).into(); - let intent = - NewGroupIntent::new(IntentKind::SendMessage, sync_group_id.clone(), intent_data); - intent.store(&conn)?; + // the reply message + let content_bytes = format!( + "{}{DELIMITER}{:?}{DELIMITER}{:?}{DELIMITER}{:?}", + contents.url, + contents + .encryption_key + .as_ref() + .ok_or(MessageProcessingError::InvalidPayload)?, + contents + .signing_key + .as_ref() + .ok_or(MessageProcessingError::InvalidPayload)?, + contents.bundle_hash.as_slice() + ) + .into_bytes(); + + let _message_id = + sync_group.prepare_message(content_bytes.as_slice(), &conn, move |_time_ns| { + PlaintextEnvelope { + content: Some(Content::V2(V2 { + idempotency_key: new_request_id(), + message_type: Some(Reply(contents)), + })), + } + })?; // publish the intent - let sync_group = self.group(sync_group_id)?; if let Err(err) = sync_group.publish_intents(conn, self).await { log::error!("error publishing sync group intents: {:?}", err); } diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 16f24b636..3160f98b5 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -444,7 +444,8 @@ impl MlsGroup { self.maybe_update_installations(conn.clone(), update_interval, client) .await?; - let message_id = self.prepare_message(message, &conn); + let message_id = + self.prepare_message(message, &conn, |now| Self::into_envelope(message, now)); // Skipping a full sync here and instead just firing and forgetting if let Err(err) = self.publish_intents(conn.clone(), client).await { @@ -476,15 +477,29 @@ impl MlsGroup { /// Send a message, optimistically returning the ID of the message before the result of a message publish. pub fn send_message_optimistic(&self, message: &[u8]) -> Result, GroupError> { let conn = self.context.store.conn()?; - let message_id = self.prepare_message(message, &conn)?; - + let message_id = + self.prepare_message(message, &conn, |now| Self::into_envelope(message, now))?; Ok(message_id) } - /// Prepare a message (intent & id) on this users XMTP [`Client`]. - fn prepare_message(&self, message: &[u8], conn: &DbConnection) -> Result, GroupError> { + /// Prepare a [`IntentKind::SendMessage`] intent, and [`StoredGroupMessage`] on this users XMTP [`Client`]. + /// + /// # Arguments + /// * message: UTF-8 or encoded message bytes + /// * conn: Connection to SQLite database + /// * envelope: closure that returns context-specific [`PlaintextEnvelope`]. Closure accepts + /// timestamp attached to intent & stored message. + fn prepare_message( + &self, + message: &[u8], + conn: &DbConnection, + envelope: F, + ) -> Result, GroupError> + where + F: FnOnce(i64) -> PlaintextEnvelope, + { let now = now_ns(); - let plain_envelope = Self::into_envelope(message, &now.to_string()); + let plain_envelope = envelope(now); let mut encoded_envelope = vec![]; plain_envelope .encode(&mut encoded_envelope) @@ -512,11 +527,11 @@ impl MlsGroup { Ok(message_id) } - fn into_envelope(encoded_msg: &[u8], idempotency_key: &str) -> PlaintextEnvelope { + fn into_envelope(encoded_msg: &[u8], idempotency_key: i64) -> PlaintextEnvelope { PlaintextEnvelope { content: Some(Content::V1(V1 { content: encoded_msg.to_vec(), - idempotency_key: idempotency_key.into(), + idempotency_key: idempotency_key.to_string(), })), } }