Skip to content

Commit

Permalink
Fix optimistic sending (#887)
Browse files Browse the repository at this point in the history
* make test more explicit

* add `send_optimistic` to the bindings
  • Loading branch information
insipx authored Jul 8, 2024
1 parent 5c78fba commit d4d8134
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 16 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ dist
# Stores VSCode versions used for testing VSCode extensions
.vscode-test

# JetBrains IDE Info
.idea/

# yarn v2
.yarn/cache
.yarn/unplugged
Expand Down
40 changes: 40 additions & 0 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use xmtp_mls::groups::group_permissions::PermissionsPolicies;
use xmtp_mls::groups::intents::PermissionPolicyOption;
use xmtp_mls::groups::intents::PermissionUpdateType;
use xmtp_mls::groups::GroupMetadataOptions;
use xmtp_mls::groups::UnpublishedMessage;
use xmtp_mls::{
api::ApiClientWrapper,
builder::ClientBuilder,
Expand Down Expand Up @@ -645,6 +646,28 @@ pub struct FfiCreateGroupOptions {
pub group_pinned_frame_url: Option<String>,
}

#[derive(uniffi::Object)]
pub struct FfiUnpublishedMessage {
message: UnpublishedMessage<TonicApiClient>,
}

#[uniffi::export(async_runtime = "tokio")]
impl FfiUnpublishedMessage {
pub fn id(&self) -> Vec<u8> {
self.message.id().to_vec()
}

pub async fn publish(&self) -> Result<(), GenericError> {
self.message.publish().await.map_err(Into::into)
}
}

impl From<UnpublishedMessage<TonicApiClient>> for FfiUnpublishedMessage {
fn from(message: UnpublishedMessage<TonicApiClient>) -> FfiUnpublishedMessage {
Self { message }
}
}

impl FfiCreateGroupOptions {
pub fn into_group_metadata_options(self) -> GroupMetadataOptions {
GroupMetadataOptions {
Expand All @@ -671,6 +694,23 @@ impl FfiGroup {
Ok(message_id)
}

/// send a message without immediately publishing to the delivery service.
pub fn send_optimistic(
&self,
content_bytes: Vec<u8>,
) -> Result<FfiUnpublishedMessage, GenericError> {
let group = MlsGroup::new(
self.inner_client.context().clone(),
self.group_id.clone(),
self.created_at_ns,
);

let message =
group.send_message_optimistic(content_bytes.as_slice(), &self.inner_client)?;

Ok(message.into())
}

pub async fn sync(&self) -> Result<(), GenericError> {
let group = MlsGroup::new(
self.inner_client.context().clone(),
Expand Down
52 changes: 50 additions & 2 deletions bindings_node/src/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use xmtp_mls::groups::{
group_metadata::{ConversationType, GroupMetadata},
group_permissions::GroupMutablePermissions,
members::PermissionLevel,
MlsGroup, PreconfiguredPolicies, UpdateAdminListType,
MlsGroup, PreconfiguredPolicies, UnpublishedMessage, UpdateAdminListType,
};
use xmtp_proto::xmtp::mls::message_contents::EncodedContent;

use crate::{
encoded_content::NapiEncodedContent,
messages::{NapiListMessagesOptions, NapiMessage},
mls_client::RustXmtpClient,
mls_client::{RustXmtpClient, TonicApiClient},
streams::NapiStreamCloser,
};

Expand Down Expand Up @@ -105,6 +105,32 @@ impl NapiGroupPermissions {
}
}

#[napi]
pub struct NapiUnpublishedMessage {
message: UnpublishedMessage<TonicApiClient>,
}

#[napi]
impl NapiUnpublishedMessage {
pub fn id(&self) -> Vec<u8> {
self.message.id().to_vec()
}

pub async fn publish(&self) -> Result<()> {
self
.message
.publish()
.await
.map_err(|e| Error::from_reason(format!("{}", e)))
}
}

impl From<UnpublishedMessage<TonicApiClient>> for NapiUnpublishedMessage {
fn from(message: UnpublishedMessage<TonicApiClient>) -> NapiUnpublishedMessage {
Self { message }
}
}

#[derive(Debug)]
#[napi]
pub struct NapiGroup {
Expand Down Expand Up @@ -147,6 +173,28 @@ impl NapiGroup {
Ok(hex::encode(message_id.clone()))
}

#[napi]
pub fn send_optimistic(
&self,
encoded_content: NapiEncodedContent,
) -> Result<NapiUnpublishedMessage> {
let encoded_content: EncodedContent = encoded_content.into();
let group = MlsGroup::new(
self.inner_client.context().clone(),
self.group_id.clone(),
self.created_at_ns,
);

let message = group
.send_message_optimistic(
encoded_content.encode_to_vec().as_slice(),
&self.inner_client,
)
.map_err(|e| Error::from_reason(format!("{}", e)))?;

Ok(message.into())
}

#[napi]
pub async fn sync(&self) -> Result<()> {
let group = MlsGroup::new(
Expand Down
2 changes: 1 addition & 1 deletion bindings_node/src/mls_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use napi_derive::napi;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use xmtp_api_grpc::grpc_api_helper::Client as TonicApiClient;
pub use xmtp_api_grpc::grpc_api_helper::Client as TonicApiClient;
use xmtp_cryptography::signature::ed25519_public_key_to_address;
use xmtp_id::associations::generate_inbox_id as xmtp_id_generate_inbox_id;
use xmtp_id::associations::{
Expand Down
140 changes: 130 additions & 10 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use self::{
message_history::MessageHistoryError,
validated_commit::CommitValidationError,
};
use std::{collections::HashSet, sync::Arc};
use std::{collections::HashSet, future::Future, pin::Pin, sync::Arc};
use xmtp_cryptography::signature::{sanitize_evm_addresses, AddressValidationError};
use xmtp_id::InboxId;
use xmtp_proto::xmtp::mls::{
Expand Down Expand Up @@ -174,6 +174,10 @@ pub enum GroupError {
NoPSKSupport,
#[error("Metadata update must specify a metadata field")]
InvalidPermissionUpdate,
#[error("The intent publishing task was cancelled")]
PublishCancelled,
#[error("the publish failed to complete due to panic")]
PublishPanicked,
}

impl RetryableError for GroupError {
Expand Down Expand Up @@ -230,6 +234,46 @@ pub enum UpdateAdminListType {
RemoveSuper,
}

pub type MessagePublishFuture = Pin<Box<dyn Future<Output = Result<(), GroupError>> + Send>>;

/// An Unpublished message with an ID that can be `awaited` to publish all messages.
/// This message can be safely dropped, and [`MlsGroup::sync`] called manually instead.
pub struct UnpublishedMessage<ApiClient> {
message_id: Vec<u8>,
client: Arc<Client<ApiClient>>,
group: MlsGroup,
}

impl<ApiClient> UnpublishedMessage<ApiClient>
where
ApiClient: XmtpApi,
{
fn new(message_id: Vec<u8>, client: Arc<Client<ApiClient>>, group: MlsGroup) -> Self {
Self {
message_id,
client,
group,
}
}

pub fn id(&self) -> &[u8] {
&self.message_id
}

/// Publish messages to the delivery service
pub async fn publish(&self) -> Result<(), GroupError> {
let conn = self.group.context.store.conn()?;
let update_interval = Some(5_000_000);
self.group
.maybe_update_installations(conn.clone(), update_interval, self.client.as_ref())
.await?;
self.group
.publish_intents(conn, self.client.as_ref())
.await?;
Ok(())
}
}

impl MlsGroup {
// Creates a new group instance. Does not validate that the group exists in the DB
pub fn new(context: Arc<XmtpMlsLocalContext>, group_id: Vec<u8>, created_at_ns: i64) -> Self {
Expand Down Expand Up @@ -420,6 +464,7 @@ impl MlsGroup {
))
}

/// Send a message on this users XMTP [`Client`].
pub async fn send_message<ApiClient>(
&self,
message: &[u8],
Expand All @@ -428,12 +473,42 @@ impl MlsGroup {
where
ApiClient: XmtpApi,
{
let conn = self.context.store.conn()?;

let update_interval = Some(5_000_000); // 5 seconds in nanoseconds
let conn = self.context.store.conn()?;
self.maybe_update_installations(conn.clone(), update_interval, client)
.await?;

let message_id = self.prepare_message(message, &conn);

// Skipping a full sync here and instead just firing and forgetting
if let Err(err) = self.publish_intents(conn, client).await {
log::error!("Send: error publishing intents: {:?}", err);
}

message_id
}

/// Send a message, optimistically retrieving ID before the result of a message send.
pub fn send_message_optimistic<ApiClient>(
&self,
message: &[u8],
client: &Arc<Client<ApiClient>>,
) -> Result<UnpublishedMessage<ApiClient>, GroupError>
where
ApiClient: XmtpApi,
{
let conn = self.context.store.conn()?;
let message_id = self.prepare_message(message, &conn)?;

Ok(UnpublishedMessage::new(
message_id,
client.clone(),
self.clone(),
))
}

/// Prepare a message (intent & id) on this users XMTP [`Client`].
fn prepare_message(&self, message: &[u8], conn: &DbConnection) -> Result<Vec<u8>, GroupError> {
let now = now_ns();
let plain_envelope = Self::into_envelope(message, &now.to_string());
let mut encoded_envelope = vec![];
Expand All @@ -444,7 +519,7 @@ impl MlsGroup {
let intent_data: Vec<u8> = SendMessageIntentData::new(encoded_envelope).into();
let intent =
NewGroupIntent::new(IntentKind::SendMessage, self.group_id.clone(), intent_data);
intent.store(&conn)?;
intent.store(conn)?;

// store this unpublished message locally before sending
let message_id = calculate_message_id(&self.group_id, message, &now.to_string());
Expand All @@ -458,12 +533,8 @@ impl MlsGroup {
sender_inbox_id: self.context.inbox_id(),
delivery_status: DeliveryStatus::Unpublished,
};
group_message.store(&conn)?;
group_message.store(conn)?;

// Skipping a full sync here and instead just firing and forgetting
if let Err(err) = self.publish_intents(conn, client).await {
log::error!("Send: error publishing intents: {:?}", err);
}
Ok(message_id)
}

Expand Down Expand Up @@ -1162,6 +1233,7 @@ fn build_group_join_config() -> MlsGroupJoinConfig {
mod tests {
use openmls::prelude::{tls_codec::Serialize, Member, MlsGroup as OpenMlsGroup};
use prost::Message;
use std::sync::Arc;
use tracing_test::traced_test;
use xmtp_cryptography::utils::generate_local_wallet;
use xmtp_proto::xmtp::mls::message_contents::EncodedContent;
Expand Down Expand Up @@ -1211,7 +1283,6 @@ mod tests {
{
group.sync(client).await.unwrap();
let mut messages = group.find_messages(None, None, None, None, None).unwrap();

messages.pop().unwrap()
}

Expand Down Expand Up @@ -2565,4 +2636,53 @@ mod tests {
let members = bola_group.members().unwrap();
assert_eq!(members.len(), 3);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_optimistic_send() {
let amal = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await);
let bola_wallet = generate_local_wallet();
let bola = Arc::new(ClientBuilder::new_test_client(&bola_wallet).await);
let amal_group = amal
.create_group(None, GroupMetadataOptions::default())
.unwrap();
amal_group.sync(&amal).await.unwrap();
// Add bola to the group
amal_group
.add_members(&amal, vec![bola_wallet.get_address()])
.await
.unwrap();
let bola_group = receive_group_invite(&bola).await;

amal_group
.send_message_optimistic(b"test one", &amal)
.unwrap();
amal_group
.send_message_optimistic(b"test two", &amal)
.unwrap();
amal_group
.send_message_optimistic(b"test three", &amal)
.unwrap();
let four = amal_group
.send_message_optimistic(b"test four", &amal)
.unwrap();

four.publish().await.unwrap();

bola_group.sync(&bola).await.unwrap();
let messages = bola_group
.find_messages(None, None, None, None, None)
.unwrap();
assert_eq!(
messages
.into_iter()
.map(|m| m.decrypted_message_bytes)
.collect::<Vec<Vec<u8>>>(),
vec![
b"test one".to_vec(),
b"test two".to_vec(),
b"test three".to_vec(),
b"test four".to_vec(),
]
);
}
}
6 changes: 3 additions & 3 deletions xmtp_mls/src/groups/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,13 +734,13 @@ impl MlsGroup {
}

#[tracing::instrument(level = "trace", skip(conn, self, client))]
pub(super) async fn publish_intents<ClientApi>(
pub(super) async fn publish_intents<ApiClient>(
&self,
conn: DbConnection,
client: &Client<ClientApi>,
client: &Client<ApiClient>,
) -> Result<(), GroupError>
where
ClientApi: XmtpApi,
ApiClient: XmtpApi,
{
let provider = self.context.mls_provider(conn);
let mut openmls_group = self.load_mls_group(&provider)?;
Expand Down

0 comments on commit d4d8134

Please sign in to comment.