From 0cfeb187578e569cf0f6fc41eb465929bd742a6c Mon Sep 17 00:00:00 2001 From: Yamil Medina Date: Thu, 28 Sep 2023 12:09:39 -0300 Subject: [PATCH] feat: typing indicator receiver improvements (WPB-4706) (#2098) * feat: adjustment for typing indicator * feat: adjustment for typing indicator * feat: adjustment for typing indicator buffer * feat: typing indicator user mapping * feat: mapping users summary --- .../conversation/TypingIndicatorRepository.kt | 42 ++++++++++++------- .../logic/data/publicuser/PublicUserMapper.kt | 16 +++++++ .../kalium/logic/data/user/UserRepository.kt | 14 +++++++ .../kalium/logic/feature/UserSessionScope.kt | 5 ++- .../feature/conversation/ConversationScope.kt | 2 +- .../conversation/ObserveUsersTypingUseCase.kt | 26 +++++++++--- .../TypingIndicatorRepositoryTest.kt | 8 ++-- .../logic/data/user/UserRepositoryTest.kt | 25 +++++++++++ .../handler/TypingIndicatorHandlerTest.kt | 4 +- 9 files changed, 114 insertions(+), 28 deletions(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt index 525e1c9417d..bd3f79ff53f 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt @@ -24,7 +24,6 @@ import com.wire.kalium.logic.util.safeComputeAndMutateSetValue import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onStart import kotlinx.datetime.Clock @@ -33,7 +32,7 @@ import kotlinx.datetime.Instant internal interface TypingIndicatorRepository { fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId) - suspend fun observeUsersTyping(conversationId: ConversationId): Flow> + suspend fun observeUsersTyping(conversationId: ConversationId): Flow> } internal class TypingIndicatorRepositoryImpl( @@ -41,34 +40,47 @@ internal class TypingIndicatorRepositoryImpl( ) : TypingIndicatorRepository { private val userTypingDataSourceFlow: MutableSharedFlow = - MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + MutableSharedFlow(extraBufferCapacity = BUFFER_SIZE, onBufferOverflow = BufferOverflow.DROP_OLDEST) override fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) { userTypingCache.safeComputeAndMutateSetValue(conversationId) { ExpiringUserTyping(userId, Clock.System.now()) } - userTypingDataSourceFlow.tryEmit(Unit) + .also { + userTypingDataSourceFlow.tryEmit(Unit) + } } override fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId) { userTypingCache.block { entry -> entry[conversationId]?.apply { this.removeAll { it.userId == userId } } + }.also { + userTypingDataSourceFlow.tryEmit(Unit) } - userTypingDataSourceFlow.tryEmit(Unit) } - override suspend fun observeUsersTyping(conversationId: ConversationId): Flow> { + override suspend fun observeUsersTyping(conversationId: ConversationId): Flow> { return userTypingDataSourceFlow - .map { userTypingCache.filterUsersIdTypingInConversation(conversationId) } - .onStart { emit(userTypingCache.filterUsersIdTypingInConversation(conversationId)) } - .distinctUntilChanged() + .map { userTypingCache[conversationId] ?: emptySet() } + .onStart { emit(userTypingCache[conversationId] ?: emptySet()) } } -} -private fun ConcurrentMutableMap>.filterUsersIdTypingInConversation( - conversationId: ConversationId -) = this[conversationId]?.map { it.userId }?.toSet().orEmpty() + companion object { + const val BUFFER_SIZE = 32 // drop after this threshold + } +} // todo expire by worker -class ExpiringUserTyping( +data class ExpiringUserTyping( val userId: UserId, val date: Instant -) +) { + override fun equals(other: Any?): Boolean { + return other != null && when (other) { + is ExpiringUserTyping -> other.userId == this.userId + else -> false + } + } + + override fun hashCode(): Int { + return this.userId.hashCode() + } +} diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/publicuser/PublicUserMapper.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/publicuser/PublicUserMapper.kt index bb93181a9ae..302d72c24e5 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/publicuser/PublicUserMapper.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/publicuser/PublicUserMapper.kt @@ -22,6 +22,7 @@ import com.wire.kalium.logic.data.id.QualifiedID import com.wire.kalium.logic.data.id.TeamId import com.wire.kalium.logic.data.id.toDao import com.wire.kalium.logic.data.id.toModel +import com.wire.kalium.logic.data.message.UserSummary import com.wire.kalium.logic.data.user.AvailabilityStatusMapper import com.wire.kalium.logic.data.user.BotService import com.wire.kalium.logic.data.user.ConnectionState @@ -51,6 +52,8 @@ interface PublicUserMapper { // UserProfileDTO has no info about userType, we need to pass it explicitly userType: UserType ): OtherUser + + fun fromEntityToUserSummary(userEntity: UserEntity): UserSummary } class PublicUserMapperImpl( @@ -130,4 +133,17 @@ class PublicUserMapperImpl( expiresAt = userDetailResponse.expiresAt?.toInstant(), defederated = false ) + + override fun fromEntityToUserSummary(userEntity: UserEntity) = with(userEntity) { + UserSummary( + userId = UserId(id.value, id.domain), + userHandle = handle, + userName = name, + userPreviewAssetId = previewAssetId?.toModel(), + userType = domainUserTypeMapper.fromUserTypeEntity(userType), + isUserDeleted = deleted, + availabilityStatus = availabilityStatusMapper.fromDaoAvailabilityStatusToModel(availabilityStatus), + connectionStatus = connectionStateMapper.fromDaoConnectionStateToUser(connectionStatus) + ) + } } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/user/UserRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/user/UserRepository.kt index 709328fa4a3..2ddcfacb088 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/user/UserRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/user/UserRepository.kt @@ -32,6 +32,7 @@ import com.wire.kalium.logic.data.id.QualifiedIdMapper import com.wire.kalium.logic.data.id.toApi import com.wire.kalium.logic.data.id.toDao import com.wire.kalium.logic.data.id.toModel +import com.wire.kalium.logic.data.message.UserSummary import com.wire.kalium.logic.data.publicuser.PublicUserMapper import com.wire.kalium.logic.data.session.SessionRepository import com.wire.kalium.logic.data.team.Team @@ -113,6 +114,7 @@ internal interface UserRepository { * when backends stops federating. */ suspend fun defederateUser(userId: UserId): Either + // TODO: move to migration repo suspend fun insertUsersIfUnknown(users: List): Either suspend fun fetchUserInfo(userId: UserId): Either @@ -126,6 +128,11 @@ internal interface UserRepository { * Removes broken user asset to avoid fetching it until next sync. */ suspend fun removeUserBrokenAsset(qualifiedID: QualifiedID): Either + + /** + * Gets users summary by their ids. + */ + suspend fun getUsersSummaryByIds(userIds: List): Either> } @Suppress("LongParameterList", "TooManyFunctions") @@ -463,6 +470,13 @@ internal class UserDataSource internal constructor( userDAO.removeUserAsset(qualifiedID.toDao()) } + override suspend fun getUsersSummaryByIds(userIds: List): Either> = + wrapStorageRequest { + userDAO.getUsersByQualifiedIDList(userIds.map { it.toDao() }).map { + publicUserMapper.fromEntityToUserSummary(it) + } + } + companion object { internal const val SELF_USER_ID_KEY = "selfUserID" internal val FEDERATED_USER_TTL = 5.minutes diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt index 9cf7f01c66a..f6b9761ccca 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt @@ -1278,8 +1278,8 @@ class UserSessionScope internal constructor( slowSyncRepository, cachedClientIdClearer ) - val conversations: ConversationScope - get() = ConversationScope( + val conversations: ConversationScope by lazy { + ConversationScope( conversationRepository, conversationGroupRepository, connectionRepository, @@ -1302,6 +1302,7 @@ class UserSessionScope internal constructor( userStorage, this ) + } val migration get() = MigrationScope(userStorage.database) val debug: DebugScope diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt index 9d74a4ff520..163299c7c30 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt @@ -264,6 +264,6 @@ class ConversationScope internal constructor( internal val typingIndicatorRepository = TypingIndicatorRepositoryImpl(ConcurrentMutableMap()) val observeUsersTyping: ObserveUsersTypingUseCase - get() = ObserveUsersTypingUseCaseImpl(typingIndicatorRepository) + get() = ObserveUsersTypingUseCaseImpl(typingIndicatorRepository, userRepository) } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ObserveUsersTypingUseCase.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ObserveUsersTypingUseCase.kt index 29b8d8cd32e..e7a1e9ee880 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ObserveUsersTypingUseCase.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ObserveUsersTypingUseCase.kt @@ -19,20 +19,36 @@ package com.wire.kalium.logic.feature.conversation import com.wire.kalium.logic.data.conversation.TypingIndicatorRepository import com.wire.kalium.logic.data.id.ConversationId -import com.wire.kalium.logic.data.user.UserId +import com.wire.kalium.logic.data.message.UserSummary +import com.wire.kalium.logic.data.user.UserRepository +import com.wire.kalium.logic.functional.fold +import com.wire.kalium.logic.kaliumLogger +import com.wire.kalium.util.KaliumDispatcher +import com.wire.kalium.util.KaliumDispatcherImpl import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.withContext /** * Use case for observing current users typing in a given conversation. + * This will get their info details from the local database. */ interface ObserveUsersTypingUseCase { - suspend fun invoke(conversationId: ConversationId): Flow> + suspend operator fun invoke(conversationId: ConversationId): Flow> } internal class ObserveUsersTypingUseCaseImpl( - private val typingIndicatorRepository: TypingIndicatorRepository + private val typingIndicatorRepository: TypingIndicatorRepository, + private val userRepository: UserRepository, + private val dispatcher: KaliumDispatcher = KaliumDispatcherImpl ) : ObserveUsersTypingUseCase { - override suspend fun invoke(conversationId: ConversationId): Flow> = - typingIndicatorRepository.observeUsersTyping(conversationId) + override suspend operator fun invoke(conversationId: ConversationId): Flow> = withContext(dispatcher.io) { + typingIndicatorRepository.observeUsersTyping(conversationId).map { usersEntries -> + userRepository.getUsersSummaryByIds(usersEntries.map { it.userId }).fold({ + kaliumLogger.w("Users not found locally, skipping... $it") + emptySet() + }, { it.toSet() }) + } + } } diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepositoryTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepositoryTest.kt index da56bda0526..5f7394df362 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepositoryTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepositoryTest.kt @@ -43,7 +43,7 @@ class TypingIndicatorRepositoryTest { assertEquals( setOf(expectedUserTypingOne, expectedUserTypingTwo), - typingIndicatorRepository.observeUsersTyping(conversationOne).firstOrNull() + typingIndicatorRepository.observeUsersTyping(conversationOne).firstOrNull()?.map { it.userId }?.toSet() ) } @@ -58,7 +58,7 @@ class TypingIndicatorRepositoryTest { assertEquals( expectedUserTyping, - typingIndicatorRepository.observeUsersTyping(conversationOne).firstOrNull() + typingIndicatorRepository.observeUsersTyping(conversationOne).firstOrNull()?.map { it.userId }?.toSet() ) } @@ -70,11 +70,11 @@ class TypingIndicatorRepositoryTest { assertEquals( setOf(expectedUserTypingOne), - typingIndicatorRepository.observeUsersTyping(conversationOne).firstOrNull() + typingIndicatorRepository.observeUsersTyping(conversationOne).firstOrNull()?.map { it.userId }?.toSet() ) assertEquals( setOf(expectedUserTypingTwo), - typingIndicatorRepository.observeUsersTyping(conversationTwo).firstOrNull() + typingIndicatorRepository.observeUsersTyping(conversationTwo).firstOrNull()?.map { it.userId }?.toSet() ) } diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/user/UserRepositoryTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/user/UserRepositoryTest.kt index ef8a5ace6c5..1f08fd8d994 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/user/UserRepositoryTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/user/UserRepositoryTest.kt @@ -513,6 +513,31 @@ class UserRepositoryTest { .wasInvoked(once) } + @Test + fun givenUserIds_WhenRequestingSummaries_thenShouldSucceed() = runTest { + // Given + val requestedUserIds = listOf( + UserId(value = "id1", domain = "domain1"), + UserId(value = "id2", domain = "domain2") + ) + val knownUserEntities = listOf( + TestUser.ENTITY.copy(id = UserIDEntity(value = "id1", domain = "domain1")), + TestUser.ENTITY.copy(id = UserIDEntity(value = "id2", domain = "domain2")) + ) + val (arrangement, userRepository) = Arrangement() + .withSuccessfulGetUsersByQualifiedIdList(knownUserEntities) + .arrange() + + // When + userRepository.getUsersSummaryByIds(requestedUserIds).shouldSucceed() + + // Then + verify(arrangement.userDAO) + .suspendFunction(arrangement.userDAO::getUsersByQualifiedIDList) + .with(any()) + .wasInvoked(once) + } + private class Arrangement { @Mock val userDAO = configure(mock(classOf())) { stubsUnitByDefault = true } diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/TypingIndicatorHandlerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/TypingIndicatorHandlerTest.kt index 1a5368cce14..454ccc3f71e 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/TypingIndicatorHandlerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/TypingIndicatorHandlerTest.kt @@ -18,6 +18,7 @@ package com.wire.kalium.logic.sync.receiver.handler import com.wire.kalium.logic.data.conversation.Conversation +import com.wire.kalium.logic.data.conversation.ExpiringUserTyping import com.wire.kalium.logic.data.conversation.TypingIndicatorRepository import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.framework.TestConversation @@ -32,6 +33,7 @@ import io.mockative.once import io.mockative.verify import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.test.runTest +import kotlinx.datetime.Clock import kotlin.test.Test class TypingIndicatorHandlerTest { @@ -74,7 +76,7 @@ class TypingIndicatorHandlerTest { given(typingIndicatorRepository) .suspendFunction(typingIndicatorRepository::observeUsersTyping) .whenInvokedWith(eq(TestConversation.ID)) - .thenReturn(flowOf(usersId)) + .thenReturn(flowOf(usersId.map { ExpiringUserTyping(it, Clock.System.now()) }.toSet())) } fun arrange() = this to TypingIndicatorHandlerImpl(typingIndicatorRepository)