From 821c9123d5e67b4a817e1e9b83da6c67d185813f Mon Sep 17 00:00:00 2001
From: Mojtaba Chenani <chenani@outlook.com>
Date: Mon, 6 Jan 2025 17:35:49 +0100
Subject: [PATCH] feat(conversations): add filter args to conversation list
 with last message (#1470)

---
 bindings_ffi/src/mls.rs                       | 116 ++++++++++------
 xmtp_api_grpc/src/grpc_api_helper.rs          |   2 +-
 xmtp_mls/src/client.rs                        |   7 +-
 .../encrypted_store/conversation_list.rs      | 124 ++++++++++++++++--
 .../src/storage/encrypted_store/schema_gen.rs |   3 +
 5 files changed, 202 insertions(+), 50 deletions(-)

diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs
index a5fdd59f1..22af16770 100644
--- a/bindings_ffi/src/mls.rs
+++ b/bindings_ffi/src/mls.rs
@@ -958,23 +958,10 @@ impl FfiConversations {
     pub async fn list(
         &self,
         opts: FfiListConversationsOptions,
-    ) -> Result<Vec<Arc<FfiConversation>>, GenericError> {
-        let inner = self.inner_client.as_ref();
-        let convo_list: Vec<Arc<FfiConversation>> = inner
-            .find_groups(opts.into())?
-            .into_iter()
-            .map(|group| Arc::new(group.into()))
-            .collect();
-
-        Ok(convo_list)
-    }
-
-    pub async fn list_conversations(
-        &self,
     ) -> Result<Vec<Arc<FfiConversationListItem>>, GenericError> {
         let inner = self.inner_client.as_ref();
         let convo_list: Vec<Arc<FfiConversationListItem>> = inner
-            .list_conversations()?
+            .list_conversations(opts.into())?
             .into_iter()
             .map(|conversation_item| {
                 Arc::new(FfiConversationListItem {
@@ -992,12 +979,21 @@ impl FfiConversations {
     pub async fn list_groups(
         &self,
         opts: FfiListConversationsOptions,
-    ) -> Result<Vec<Arc<FfiConversation>>, GenericError> {
+    ) -> Result<Vec<Arc<FfiConversationListItem>>, GenericError> {
         let inner = self.inner_client.as_ref();
-        let convo_list: Vec<Arc<FfiConversation>> = inner
-            .find_groups(GroupQueryArgs::from(opts).conversation_type(ConversationType::Group))?
+        let convo_list: Vec<Arc<FfiConversationListItem>> = inner
+            .list_conversations(
+                GroupQueryArgs::from(opts).conversation_type(ConversationType::Group),
+            )?
             .into_iter()
-            .map(|group| Arc::new(group.into()))
+            .map(|conversation_item| {
+                Arc::new(FfiConversationListItem {
+                    conversation: conversation_item.group.into(),
+                    last_message: conversation_item
+                        .last_message
+                        .map(|stored_message| stored_message.into()),
+                })
+            })
             .collect();
 
         Ok(convo_list)
@@ -1006,12 +1002,19 @@ impl FfiConversations {
     pub async fn list_dms(
         &self,
         opts: FfiListConversationsOptions,
-    ) -> Result<Vec<Arc<FfiConversation>>, GenericError> {
+    ) -> Result<Vec<Arc<FfiConversationListItem>>, GenericError> {
         let inner = self.inner_client.as_ref();
-        let convo_list: Vec<Arc<FfiConversation>> = inner
-            .find_groups(GroupQueryArgs::from(opts).conversation_type(ConversationType::Dm))?
+        let convo_list: Vec<Arc<FfiConversationListItem>> = inner
+            .list_conversations(GroupQueryArgs::from(opts).conversation_type(ConversationType::Dm))?
             .into_iter()
-            .map(|group| Arc::new(group.into()))
+            .map(|conversation_item| {
+                Arc::new(FfiConversationListItem {
+                    conversation: conversation_item.group.into(),
+                    last_message: conversation_item
+                        .last_message
+                        .map(|stored_message| stored_message.into()),
+                })
+            })
             .collect();
 
         Ok(convo_list)
@@ -2718,13 +2721,14 @@ mod tests {
             .await
             .unwrap();
         let bo_group = &bo_groups[0];
-        bo_group.sync().await.unwrap();
+        bo_group.conversation.sync().await.unwrap();
 
         // alix published + processed group creation and name update
         assert_eq!(alix_provider.conn_ref().intents_published(), 2);
         assert_eq!(alix_provider.conn_ref().intents_deleted(), 2);
 
         bo_group
+            .conversation
             .update_group_name("Old Name2".to_string())
             .await
             .unwrap();
@@ -2746,6 +2750,7 @@ mod tests {
 
         // Uncomment the following lines to add more group name updates
         bo_group
+            .conversation
             .update_group_name("Old Name3".to_string())
             .await
             .unwrap();
@@ -2795,7 +2800,10 @@ mod tests {
             .unwrap();
 
         // Step 4: List conversations and verify
-        let conversations = alix_conversations.list_conversations().await.unwrap();
+        let conversations = alix_conversations
+            .list(FfiListConversationsOptions::default())
+            .await
+            .unwrap();
 
         // Ensure the group is included
         assert_eq!(conversations.len(), 1, "Alix should have exactly 1 group");
@@ -2832,7 +2840,10 @@ mod tests {
             .unwrap();
 
         // Step 4: List conversations and verify
-        let conversations = alix_conversations.list_conversations().await.unwrap();
+        let conversations = alix_conversations
+            .list(FfiListConversationsOptions::default())
+            .await
+            .unwrap();
 
         // Ensure the group is included
         assert_eq!(conversations.len(), 1, "Alix should have exactly 1 group");
@@ -2881,7 +2892,10 @@ mod tests {
             .unwrap();
 
         // Step 7: Fetch the conversation list
-        let conversations = conversations_api.list_conversations().await.unwrap();
+        let conversations = conversations_api
+            .list(FfiListConversationsOptions::default())
+            .await
+            .unwrap();
 
         // Step 8: Assert the correct order of conversations
         assert_eq!(
@@ -2951,11 +2965,19 @@ mod tests {
 
         let alix_group1 = alix_groups[0].clone();
         let alix_group5 = alix_groups[5].clone();
-        let bo_group1 = bo.conversation(alix_group1.id()).unwrap();
-        let bo_group5 = bo.conversation(alix_group5.id()).unwrap();
+        let bo_group1 = bo.conversation(alix_group1.conversation.id()).unwrap();
+        let bo_group5 = bo.conversation(alix_group5.conversation.id()).unwrap();
 
-        alix_group1.send("alix1".as_bytes().to_vec()).await.unwrap();
-        alix_group5.send("alix1".as_bytes().to_vec()).await.unwrap();
+        alix_group1
+            .conversation
+            .send("alix1".as_bytes().to_vec())
+            .await
+            .unwrap();
+        alix_group5
+            .conversation
+            .send("alix1".as_bytes().to_vec())
+            .await
+            .unwrap();
 
         let bo_messages1 = bo_group1
             .find_messages(FfiListMessagesOptions::default())
@@ -3016,6 +3038,7 @@ mod tests {
             .unwrap()
         {
             group
+                .conversation
                 .remove_members(vec![bo.account_address.clone()])
                 .await
                 .unwrap();
@@ -3542,17 +3565,26 @@ mod tests {
             .unwrap();
         assert_eq!(bo_groups.len(), 1);
         let bo_group = bo_groups[0].clone();
-        bo_group.sync().await.unwrap();
+        bo_group.conversation.sync().await.unwrap();
 
         let bo_messages1 = bo_group
+            .conversation
             .find_messages(FfiListMessagesOptions::default())
             .await
             .unwrap();
         assert_eq!(bo_messages1.len(), first_msg_check);
 
-        bo_group.send("hello2".as_bytes().to_vec()).await.unwrap();
+        bo_group
+            .conversation
+            .send("hello2".as_bytes().to_vec())
+            .await
+            .unwrap();
         message_callbacks.wait_for_delivery(None).await.unwrap();
-        bo_group.send("hello3".as_bytes().to_vec()).await.unwrap();
+        bo_group
+            .conversation
+            .send("hello3".as_bytes().to_vec())
+            .await
+            .unwrap();
         message_callbacks.wait_for_delivery(None).await.unwrap();
 
         alix_group.sync().await.unwrap();
@@ -3565,9 +3597,10 @@ mod tests {
 
         alix_group.send("hello4".as_bytes().to_vec()).await.unwrap();
         message_callbacks.wait_for_delivery(None).await.unwrap();
-        bo_group.sync().await.unwrap();
+        bo_group.conversation.sync().await.unwrap();
 
         let bo_messages2 = bo_group
+            .conversation
             .find_messages(FfiListMessagesOptions::default())
             .await
             .unwrap();
@@ -3803,7 +3836,7 @@ mod tests {
         let bola_group = bola_groups.first().unwrap();
 
         // Check Bola's group for the added_by_inbox_id of the inviter
-        let added_by_inbox_id = bola_group.added_by_inbox_id().unwrap();
+        let added_by_inbox_id = bola_group.conversation.added_by_inbox_id().unwrap();
 
         // // Verify the welcome host_credential is equal to Amal's
         assert_eq!(
@@ -3986,24 +4019,26 @@ mod tests {
 
         let bola_group = bola_groups.first().unwrap();
         bola_group
+            .conversation
             .update_group_name("new_name".to_string())
             .await
             .unwrap_err();
 
         // Verify that bo CAN update the image url
         bola_group
+            .conversation
             .update_group_image_url_square("https://example.com/image.png".to_string())
             .await
             .unwrap();
 
         // Verify we can read the correct values from the group
-        bola_group.sync().await.unwrap();
+        bola_group.conversation.sync().await.unwrap();
         alix_group.sync().await.unwrap();
         assert_eq!(
-            bola_group.group_image_url_square().unwrap(),
+            bola_group.conversation.group_image_url_square().unwrap(),
             "https://example.com/image.png"
         );
-        assert_eq!(bola_group.group_name().unwrap(), "");
+        assert_eq!(bola_group.conversation.group_name().unwrap(), "");
         assert_eq!(
             alix_group.group_image_url_square().unwrap(),
             "https://example.com/image.png"
@@ -4091,10 +4126,12 @@ mod tests {
 
         let bola_group = bola_groups.first().unwrap();
         bola_group
+            .conversation
             .update_group_name("new_name".to_string())
             .await
             .unwrap_err();
         let result = bola_group
+            .conversation
             .update_group_name("New Group Name".to_string())
             .await;
         assert!(result.is_err());
@@ -4107,6 +4144,7 @@ mod tests {
 
         // Verify that Bola can update the group description
         let result = bola_group
+            .conversation
             .update_group_description("New Description".to_string())
             .await;
         assert!(result.is_ok());
@@ -4990,6 +5028,7 @@ mod tests {
             .list(FfiListConversationsOptions::default())
             .await
             .unwrap()[0]
+            .conversation
             .find_messages(FfiListMessagesOptions::default())
             .await
             .unwrap();
@@ -4998,6 +5037,7 @@ mod tests {
             .list(FfiListConversationsOptions::default())
             .await
             .unwrap()[0]
+            .conversation
             .find_messages(FfiListMessagesOptions::default())
             .await
             .unwrap();
diff --git a/xmtp_api_grpc/src/grpc_api_helper.rs b/xmtp_api_grpc/src/grpc_api_helper.rs
index 0fdadaecf..202768a7b 100644
--- a/xmtp_api_grpc/src/grpc_api_helper.rs
+++ b/xmtp_api_grpc/src/grpc_api_helper.rs
@@ -31,7 +31,7 @@ use xmtp_proto::{
 
 #[tracing::instrument(level = "trace", skip_all)]
 pub async fn create_tls_channel(address: String) -> Result<Channel, Error> {
-    let span = tracing::trace_span!("grpc_connect", address);
+    let span = tracing::debug_span!("grpc_connect", address);
     let channel = Channel::from_shared(address)
         .map_err(|e| Error::new(ErrorKind::SetupCreateChannelError).with(e))?
         // Purpose: This setting controls the size of the initial connection-level flow control window for HTTP/2, which is the underlying protocol for gRPC.
diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs
index 24806dbf7..69be71c2e 100644
--- a/xmtp_mls/src/client.rs
+++ b/xmtp_mls/src/client.rs
@@ -674,11 +674,14 @@ where
             .collect())
     }
 
-    pub fn list_conversations(&self) -> Result<Vec<ConversationListItem<Self>>, ClientError> {
+    pub fn list_conversations(
+        &self,
+        args: GroupQueryArgs,
+    ) -> Result<Vec<ConversationListItem<Self>>, ClientError> {
         Ok(self
             .store()
             .conn()?
-            .fetch_conversation_list()?
+            .fetch_conversation_list(args)?
             .into_iter()
             .map(|conversation_item| {
                 let message = conversation_item.message_id.and_then(|message_id| {
diff --git a/xmtp_mls/src/storage/encrypted_store/conversation_list.rs b/xmtp_mls/src/storage/encrypted_store/conversation_list.rs
index 58f387a7c..2ee0d72f2 100644
--- a/xmtp_mls/src/storage/encrypted_store/conversation_list.rs
+++ b/xmtp_mls/src/storage/encrypted_store/conversation_list.rs
@@ -1,8 +1,12 @@
 use super::schema::conversation_list::dsl::conversation_list;
-use crate::storage::group::{ConversationType, GroupMembershipState};
+use crate::storage::consent_record::ConsentState;
+use crate::storage::group::{ConversationType, GroupMembershipState, GroupQueryArgs};
 use crate::storage::group_message::{ContentType, DeliveryStatus, GroupMessageKind};
 use crate::storage::{DbConnection, StorageError};
-use diesel::{QueryDsl, Queryable, RunQueryDsl, Table};
+use diesel::dsl::sql;
+use diesel::{
+    BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, Queryable, RunQueryDsl, Table,
+};
 use serde::{Deserialize, Serialize};
 
 #[derive(Queryable, Debug, Clone, Deserialize, Serialize)]
@@ -53,17 +57,111 @@ pub struct ConversationListItem {
 }
 
 impl DbConnection {
-    pub fn fetch_conversation_list(&self) -> Result<Vec<ConversationListItem>, StorageError> {
-        let query = conversation_list
+    pub fn fetch_conversation_list<A: AsRef<GroupQueryArgs>>(
+        &self,
+        args: A,
+    ) -> Result<Vec<ConversationListItem>, StorageError> {
+        use crate::storage::schema::consent_records::dsl as consent_dsl;
+        use crate::storage::schema::conversation_list::dsl as conversation_list_dsl;
+
+        let GroupQueryArgs {
+            allowed_states,
+            created_after_ns,
+            created_before_ns,
+            limit,
+            conversation_type,
+            consent_state,
+            include_sync_groups,
+            include_duplicate_dms,
+        } = args.as_ref();
+        let mut query = conversation_list
             .select(conversation_list::all_columns())
+            .filter(conversation_list_dsl::conversation_type.ne(ConversationType::Sync))
             .into_boxed();
-        Ok(self.raw_query(|conn| query.load::<ConversationListItem>(conn))?)
+
+        if !include_duplicate_dms {
+            // Group by dm_id and grab the latest group (conversation stitching)
+            query = query.filter(sql::<diesel::sql_types::Bool>(
+                "id IN (
+                    SELECT id
+                    FROM groups
+                    GROUP BY CASE WHEN dm_id IS NULL THEN id ELSE dm_id END
+                    ORDER BY last_message_ns DESC
+                )",
+            ));
+        }
+
+        if let Some(limit) = limit {
+            query = query.limit(*limit);
+        }
+
+        if let Some(allowed_states) = allowed_states {
+            query = query.filter(conversation_list_dsl::membership_state.eq_any(allowed_states));
+        }
+
+        if let Some(created_after_ns) = created_after_ns {
+            query = query.filter(conversation_list_dsl::created_at_ns.gt(created_after_ns));
+        }
+
+        if let Some(created_before_ns) = created_before_ns {
+            query = query.filter(conversation_list_dsl::created_at_ns.lt(created_before_ns));
+        }
+
+        if let Some(conversation_type) = conversation_type {
+            query = query.filter(conversation_list_dsl::conversation_type.eq(conversation_type));
+        }
+
+        let mut conversations = if let Some(consent_state) = consent_state {
+            if *consent_state == ConsentState::Unknown {
+                let query = query
+                    .left_join(
+                        consent_dsl::consent_records
+                            .on(sql::<diesel::sql_types::Text>("lower(hex(groups.id))")
+                                .eq(consent_dsl::entity)),
+                    )
+                    .filter(
+                        consent_dsl::state
+                            .is_null()
+                            .or(consent_dsl::state.eq(ConsentState::Unknown)),
+                    )
+                    .select(conversation_list::all_columns())
+                    .order(conversation_list_dsl::created_at_ns.asc());
+
+                self.raw_query(|conn| query.load::<ConversationListItem>(conn))?
+            } else {
+                let query = query
+                    .inner_join(
+                        consent_dsl::consent_records
+                            .on(sql::<diesel::sql_types::Text>("lower(hex(groups.id))")
+                                .eq(consent_dsl::entity)),
+                    )
+                    .filter(consent_dsl::state.eq(*consent_state))
+                    .select(conversation_list::all_columns())
+                    .order(conversation_list_dsl::created_at_ns.asc());
+
+                self.raw_query(|conn| query.load::<ConversationListItem>(conn))?
+            }
+        } else {
+            self.raw_query(|conn| query.load::<ConversationListItem>(conn))?
+        };
+
+        // Were sync groups explicitly asked for? Was the include_sync_groups flag set to true?
+        // Then query for those separately
+        if matches!(conversation_type, Some(ConversationType::Sync)) || *include_sync_groups {
+            let query = conversation_list_dsl::conversation_list
+                .filter(conversation_list_dsl::conversation_type.eq(ConversationType::Sync));
+            let mut sync_groups = self.raw_query(|conn| query.load(conn))?;
+            conversations.append(&mut sync_groups);
+        }
+
+        Ok(conversations)
     }
 }
 
 #[cfg(test)]
 pub(crate) mod tests {
     use crate::storage::group::tests::{generate_group, generate_group_with_created_at};
+    use crate::storage::group::GroupQueryArgs;
     use crate::storage::tests::with_connection;
     use crate::Store;
     use wasm_bindgen_test::wasm_bindgen_test;
@@ -88,7 +186,9 @@ pub(crate) mod tests {
             }
 
             // Fetch the conversation list
-            let conversation_list = conn.fetch_conversation_list().unwrap();
+            let conversation_list = conn
+                .fetch_conversation_list(GroupQueryArgs::default())
+                .unwrap();
             assert_eq!(conversation_list.len(), 1, "Should return one group");
             assert_eq!(
                 conversation_list[0].id, group.id,
@@ -123,7 +223,9 @@ pub(crate) mod tests {
             message.store(conn).unwrap();
 
             // Fetch the conversation list
-            let conversation_list = conn.fetch_conversation_list().unwrap();
+            let conversation_list = conn
+                .fetch_conversation_list(GroupQueryArgs::default())
+                .unwrap();
 
             assert_eq!(conversation_list.len(), 3, "Should return all three groups");
             assert_eq!(
@@ -159,7 +261,9 @@ pub(crate) mod tests {
             first_message.store(conn).unwrap();
 
             // Fetch the conversation list and check last message
-            let mut conversation_list = conn.fetch_conversation_list().unwrap();
+            let mut conversation_list = conn
+                .fetch_conversation_list(GroupQueryArgs::default())
+                .unwrap();
             assert_eq!(conversation_list.len(), 1, "Should return one group");
             assert_eq!(
                 conversation_list[0].sent_at_ns.unwrap(),
@@ -178,7 +282,9 @@ pub(crate) mod tests {
             second_message.store(conn).unwrap();
 
             // Fetch the conversation list again and validate the last message is updated
-            conversation_list = conn.fetch_conversation_list().unwrap();
+            conversation_list = conn
+                .fetch_conversation_list(GroupQueryArgs::default())
+                .unwrap();
             assert_eq!(
                 conversation_list[0].sent_at_ns.unwrap(),
                 2000,
diff --git a/xmtp_mls/src/storage/encrypted_store/schema_gen.rs b/xmtp_mls/src/storage/encrypted_store/schema_gen.rs
index 82a76b76a..e3f160d14 100644
--- a/xmtp_mls/src/storage/encrypted_store/schema_gen.rs
+++ b/xmtp_mls/src/storage/encrypted_store/schema_gen.rs
@@ -1,5 +1,7 @@
 // @generated automatically by Diesel CLI.
 
+use crate::storage::schema::conversation_list;
+
 diesel::table! {
     association_state (inbox_id, sequence_id) {
         inbox_id -> Text,
@@ -143,4 +145,5 @@ diesel::allow_tables_to_appear_in_same_query!(
     refresh_state,
     user_preferences,
     wallet_addresses,
+    conversation_list
 );