Skip to content

Commit

Permalink
feat: typing indicator receiver improvements (WPB-4706) (#2098)
Browse files Browse the repository at this point in the history
* feat: adjustment for typing indicator

* feat: adjustment for typing indicator

* feat: adjustment for typing indicator buffer

* feat: typing indicator user mapping

* feat: mapping users summary
  • Loading branch information
yamilmedina authored Sep 28, 2023
1 parent 2497ff6 commit 0cfeb18
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import com.wire.kalium.logic.util.safeComputeAndMutateSetValue
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.datetime.Clock
Expand All @@ -33,42 +32,55 @@ import kotlinx.datetime.Instant
internal interface TypingIndicatorRepository {
fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId)
fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId)
suspend fun observeUsersTyping(conversationId: ConversationId): Flow<Set<UserId>>
suspend fun observeUsersTyping(conversationId: ConversationId): Flow<Set<ExpiringUserTyping>>
}

internal class TypingIndicatorRepositoryImpl(
private val userTypingCache: ConcurrentMutableMap<ConversationId, MutableSet<ExpiringUserTyping>>
) : TypingIndicatorRepository {

private val userTypingDataSourceFlow: MutableSharedFlow<Unit> =
MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
MutableSharedFlow(extraBufferCapacity = BUFFER_SIZE, onBufferOverflow = BufferOverflow.DROP_OLDEST)

override fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) {
userTypingCache.safeComputeAndMutateSetValue(conversationId) { ExpiringUserTyping(userId, Clock.System.now()) }
userTypingDataSourceFlow.tryEmit(Unit)
.also {
userTypingDataSourceFlow.tryEmit(Unit)
}
}

override fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId) {
userTypingCache.block { entry ->
entry[conversationId]?.apply { this.removeAll { it.userId == userId } }
}.also {
userTypingDataSourceFlow.tryEmit(Unit)
}
userTypingDataSourceFlow.tryEmit(Unit)
}

override suspend fun observeUsersTyping(conversationId: ConversationId): Flow<Set<UserId>> {
override suspend fun observeUsersTyping(conversationId: ConversationId): Flow<Set<ExpiringUserTyping>> {
return userTypingDataSourceFlow
.map { userTypingCache.filterUsersIdTypingInConversation(conversationId) }
.onStart { emit(userTypingCache.filterUsersIdTypingInConversation(conversationId)) }
.distinctUntilChanged()
.map { userTypingCache[conversationId] ?: emptySet() }
.onStart { emit(userTypingCache[conversationId] ?: emptySet()) }
}
}

private fun ConcurrentMutableMap<ConversationId, MutableSet<ExpiringUserTyping>>.filterUsersIdTypingInConversation(
conversationId: ConversationId
) = this[conversationId]?.map { it.userId }?.toSet().orEmpty()
companion object {
const val BUFFER_SIZE = 32 // drop after this threshold
}
}

// todo expire by worker
class ExpiringUserTyping(
data class ExpiringUserTyping(
val userId: UserId,
val date: Instant
)
) {
override fun equals(other: Any?): Boolean {
return other != null && when (other) {
is ExpiringUserTyping -> other.userId == this.userId
else -> false
}
}

override fun hashCode(): Int {
return this.userId.hashCode()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.wire.kalium.logic.data.id.QualifiedID
import com.wire.kalium.logic.data.id.TeamId
import com.wire.kalium.logic.data.id.toDao
import com.wire.kalium.logic.data.id.toModel
import com.wire.kalium.logic.data.message.UserSummary
import com.wire.kalium.logic.data.user.AvailabilityStatusMapper
import com.wire.kalium.logic.data.user.BotService
import com.wire.kalium.logic.data.user.ConnectionState
Expand Down Expand Up @@ -51,6 +52,8 @@ interface PublicUserMapper {
// UserProfileDTO has no info about userType, we need to pass it explicitly
userType: UserType
): OtherUser

fun fromEntityToUserSummary(userEntity: UserEntity): UserSummary
}

class PublicUserMapperImpl(
Expand Down Expand Up @@ -130,4 +133,17 @@ class PublicUserMapperImpl(
expiresAt = userDetailResponse.expiresAt?.toInstant(),
defederated = false
)

override fun fromEntityToUserSummary(userEntity: UserEntity) = with(userEntity) {
UserSummary(
userId = UserId(id.value, id.domain),
userHandle = handle,
userName = name,
userPreviewAssetId = previewAssetId?.toModel(),
userType = domainUserTypeMapper.fromUserTypeEntity(userType),
isUserDeleted = deleted,
availabilityStatus = availabilityStatusMapper.fromDaoAvailabilityStatusToModel(availabilityStatus),
connectionStatus = connectionStateMapper.fromDaoConnectionStateToUser(connectionStatus)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.wire.kalium.logic.data.id.QualifiedIdMapper
import com.wire.kalium.logic.data.id.toApi
import com.wire.kalium.logic.data.id.toDao
import com.wire.kalium.logic.data.id.toModel
import com.wire.kalium.logic.data.message.UserSummary
import com.wire.kalium.logic.data.publicuser.PublicUserMapper
import com.wire.kalium.logic.data.session.SessionRepository
import com.wire.kalium.logic.data.team.Team
Expand Down Expand Up @@ -113,6 +114,7 @@ internal interface UserRepository {
* when backends stops federating.
*/
suspend fun defederateUser(userId: UserId): Either<CoreFailure, Unit>

// TODO: move to migration repo
suspend fun insertUsersIfUnknown(users: List<User>): Either<StorageFailure, Unit>
suspend fun fetchUserInfo(userId: UserId): Either<CoreFailure, Unit>
Expand All @@ -126,6 +128,11 @@ internal interface UserRepository {
* Removes broken user asset to avoid fetching it until next sync.
*/
suspend fun removeUserBrokenAsset(qualifiedID: QualifiedID): Either<CoreFailure, Unit>

/**
* Gets users summary by their ids.
*/
suspend fun getUsersSummaryByIds(userIds: List<QualifiedID>): Either<StorageFailure, List<UserSummary>>
}

@Suppress("LongParameterList", "TooManyFunctions")
Expand Down Expand Up @@ -463,6 +470,13 @@ internal class UserDataSource internal constructor(
userDAO.removeUserAsset(qualifiedID.toDao())
}

override suspend fun getUsersSummaryByIds(userIds: List<QualifiedID>): Either<StorageFailure, List<UserSummary>> =
wrapStorageRequest {
userDAO.getUsersByQualifiedIDList(userIds.map { it.toDao() }).map {
publicUserMapper.fromEntityToUserSummary(it)
}
}

companion object {
internal const val SELF_USER_ID_KEY = "selfUserID"
internal val FEDERATED_USER_TTL = 5.minutes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1278,8 +1278,8 @@ class UserSessionScope internal constructor(
slowSyncRepository,
cachedClientIdClearer
)
val conversations: ConversationScope
get() = ConversationScope(
val conversations: ConversationScope by lazy {
ConversationScope(
conversationRepository,
conversationGroupRepository,
connectionRepository,
Expand All @@ -1302,6 +1302,7 @@ class UserSessionScope internal constructor(
userStorage,
this
)
}

val migration get() = MigrationScope(userStorage.database)
val debug: DebugScope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,6 @@ class ConversationScope internal constructor(
internal val typingIndicatorRepository = TypingIndicatorRepositoryImpl(ConcurrentMutableMap())

val observeUsersTyping: ObserveUsersTypingUseCase
get() = ObserveUsersTypingUseCaseImpl(typingIndicatorRepository)
get() = ObserveUsersTypingUseCaseImpl(typingIndicatorRepository, userRepository)

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,36 @@ package com.wire.kalium.logic.feature.conversation

import com.wire.kalium.logic.data.conversation.TypingIndicatorRepository
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.message.UserSummary
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.util.KaliumDispatcher
import com.wire.kalium.util.KaliumDispatcherImpl
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.withContext

/**
* Use case for observing current users typing in a given conversation.
* This will get their info details from the local database.
*/
interface ObserveUsersTypingUseCase {
suspend fun invoke(conversationId: ConversationId): Flow<Set<UserId>>
suspend operator fun invoke(conversationId: ConversationId): Flow<Set<UserSummary>>
}

internal class ObserveUsersTypingUseCaseImpl(
private val typingIndicatorRepository: TypingIndicatorRepository
private val typingIndicatorRepository: TypingIndicatorRepository,
private val userRepository: UserRepository,
private val dispatcher: KaliumDispatcher = KaliumDispatcherImpl
) : ObserveUsersTypingUseCase {
override suspend fun invoke(conversationId: ConversationId): Flow<Set<UserId>> =
typingIndicatorRepository.observeUsersTyping(conversationId)
override suspend operator fun invoke(conversationId: ConversationId): Flow<Set<UserSummary>> = withContext(dispatcher.io) {
typingIndicatorRepository.observeUsersTyping(conversationId).map { usersEntries ->
userRepository.getUsersSummaryByIds(usersEntries.map { it.userId }).fold({
kaliumLogger.w("Users not found locally, skipping... $it")
emptySet()
}, { it.toSet() })
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TypingIndicatorRepositoryTest {

assertEquals(
setOf(expectedUserTypingOne, expectedUserTypingTwo),
typingIndicatorRepository.observeUsersTyping(conversationOne).firstOrNull()
typingIndicatorRepository.observeUsersTyping(conversationOne).firstOrNull()?.map { it.userId }?.toSet()
)
}

Expand All @@ -58,7 +58,7 @@ class TypingIndicatorRepositoryTest {

assertEquals(
expectedUserTyping,
typingIndicatorRepository.observeUsersTyping(conversationOne).firstOrNull()
typingIndicatorRepository.observeUsersTyping(conversationOne).firstOrNull()?.map { it.userId }?.toSet()
)
}

Expand All @@ -70,11 +70,11 @@ class TypingIndicatorRepositoryTest {

assertEquals(
setOf(expectedUserTypingOne),
typingIndicatorRepository.observeUsersTyping(conversationOne).firstOrNull()
typingIndicatorRepository.observeUsersTyping(conversationOne).firstOrNull()?.map { it.userId }?.toSet()
)
assertEquals(
setOf(expectedUserTypingTwo),
typingIndicatorRepository.observeUsersTyping(conversationTwo).firstOrNull()
typingIndicatorRepository.observeUsersTyping(conversationTwo).firstOrNull()?.map { it.userId }?.toSet()
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,31 @@ class UserRepositoryTest {
.wasInvoked(once)
}

@Test
fun givenUserIds_WhenRequestingSummaries_thenShouldSucceed() = runTest {
// Given
val requestedUserIds = listOf(
UserId(value = "id1", domain = "domain1"),
UserId(value = "id2", domain = "domain2")
)
val knownUserEntities = listOf(
TestUser.ENTITY.copy(id = UserIDEntity(value = "id1", domain = "domain1")),
TestUser.ENTITY.copy(id = UserIDEntity(value = "id2", domain = "domain2"))
)
val (arrangement, userRepository) = Arrangement()
.withSuccessfulGetUsersByQualifiedIdList(knownUserEntities)
.arrange()

// When
userRepository.getUsersSummaryByIds(requestedUserIds).shouldSucceed()

// Then
verify(arrangement.userDAO)
.suspendFunction(arrangement.userDAO::getUsersByQualifiedIDList)
.with(any())
.wasInvoked(once)
}

private class Arrangement {
@Mock
val userDAO = configure(mock(classOf<UserDAO>())) { stubsUnitByDefault = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.wire.kalium.logic.sync.receiver.handler

import com.wire.kalium.logic.data.conversation.Conversation
import com.wire.kalium.logic.data.conversation.ExpiringUserTyping
import com.wire.kalium.logic.data.conversation.TypingIndicatorRepository
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.framework.TestConversation
Expand All @@ -32,6 +33,7 @@ import io.mockative.once
import io.mockative.verify
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.runTest
import kotlinx.datetime.Clock
import kotlin.test.Test

class TypingIndicatorHandlerTest {
Expand Down Expand Up @@ -74,7 +76,7 @@ class TypingIndicatorHandlerTest {
given(typingIndicatorRepository)
.suspendFunction(typingIndicatorRepository::observeUsersTyping)
.whenInvokedWith(eq(TestConversation.ID))
.thenReturn(flowOf(usersId))
.thenReturn(flowOf(usersId.map { ExpiringUserTyping(it, Clock.System.now()) }.toSet()))
}

fun arrange() = this to TypingIndicatorHandlerImpl(typingIndicatorRepository)
Expand Down

0 comments on commit 0cfeb18

Please sign in to comment.