Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(typing): send typing indicator events use case and cleanup code pt2. (WPB-4590) #2121

Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,34 @@
package com.wire.kalium.logic.data.conversation

import co.touchlab.stately.collections.ConcurrentMutableMap
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.properties.UserPropertyRepository
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.util.safeComputeAndMutateSetValue
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant

internal interface TypingIndicatorRepository {
internal interface TypingIndicatorIncomingRepository {
suspend fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId)
suspend fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId)
suspend fun observeUsersTyping(conversationId: ConversationId): Flow<Set<ExpiringUserTyping>>

suspend fun sendTypingIndicatorStatus(
conversationId: ConversationId,
typingStatus: Conversation.TypingIndicatorMode
): Either<CoreFailure, Unit>
suspend fun observeUsersTyping(conversationId: ConversationId): Flow<Set<UserId>>
suspend fun clearExpiredTypingIndicators()
}

internal class TypingIndicatorRepositoryImpl(
private val userTypingCache: ConcurrentMutableMap<ConversationId, MutableSet<ExpiringUserTyping>>,
private val conversationRepository: ConversationRepository,
internal class TypingIndicatorIncomingRepositoryImpl(
private val userTypingCache: ConcurrentMutableMap<ConversationId, MutableSet<UserId>>,
private val userPropertyRepository: UserPropertyRepository
) : TypingIndicatorRepository {
) : TypingIndicatorIncomingRepository {

private val userTypingDataSourceFlow: MutableSharedFlow<Unit> =
MutableSharedFlow(extraBufferCapacity = BUFFER_SIZE, onBufferOverflow = BufferOverflow.DROP_OLDEST)

override suspend fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) {
if (userPropertyRepository.getTypingIndicatorStatus()) {
userTypingCache.safeComputeAndMutateSetValue(conversationId) { ExpiringUserTyping(userId, Clock.System.now()) }
userTypingCache.safeComputeAndMutateSetValue(conversationId) { userId }
.also {
userTypingDataSourceFlow.tryEmit(Unit)
}
Expand All @@ -63,43 +54,27 @@ internal class TypingIndicatorRepositoryImpl(

override suspend fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId) {
userTypingCache.block { entry ->
entry[conversationId]?.apply { this.removeAll { it.userId == userId } }
entry[conversationId]?.apply { this.removeAll { it == userId } }
}.also {
userTypingDataSourceFlow.tryEmit(Unit)
}
}

override suspend fun observeUsersTyping(conversationId: ConversationId): Flow<Set<ExpiringUserTyping>> {
override suspend fun observeUsersTyping(conversationId: ConversationId): Flow<Set<UserId>> {
return userTypingDataSourceFlow
.map { userTypingCache[conversationId] ?: emptySet() }
.onStart { emit(userTypingCache[conversationId] ?: emptySet()) }
}

override suspend fun sendTypingIndicatorStatus(
conversationId: ConversationId,
typingStatus: Conversation.TypingIndicatorMode
): Either<CoreFailure, Unit> {
return conversationRepository.sendTypingIndicatorStatus(conversationId, typingStatus)
override suspend fun clearExpiredTypingIndicators() {
userTypingCache.block { entry ->
entry.clear()
}.also {
userTypingDataSourceFlow.tryEmit(Unit)
}
}

companion object {
const val BUFFER_SIZE = 32 // drop after this threshold
}
}

// todo expire by worker
data class ExpiringUserTyping(
val userId: UserId,
val date: Instant
) {
override fun equals(other: Any?): Boolean {
return other != null && when (other) {
is ExpiringUserTyping -> other.userId == this.userId
else -> false
}
}

override fun hashCode(): Int {
return this.userId.hashCode()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Wire
* Copyright (C) 2023 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.data.conversation

import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.properties.UserPropertyRepository
import com.wire.kalium.logic.functional.Either

internal interface TypingIndicatorOutgoingRepository {
suspend fun sendTypingIndicatorStatus(
conversationId: ConversationId,
typingStatus: Conversation.TypingIndicatorMode
): Either<CoreFailure, Unit>

}

internal class TypingIndicatorOutgoingRepositoryImpl(
private val typingIndicatorSenderHandler: TypingIndicatorSenderHandler,
private val userPropertyRepository: UserPropertyRepository
) : TypingIndicatorOutgoingRepository {

override suspend fun sendTypingIndicatorStatus(
conversationId: ConversationId,
typingStatus: Conversation.TypingIndicatorMode
): Either<CoreFailure, Unit> {
if (userPropertyRepository.getTypingIndicatorStatus()) {
when (typingStatus) {
Conversation.TypingIndicatorMode.STARTED ->
typingIndicatorSenderHandler.sendStartedAndEnqueueStoppingEvent(conversationId)

Conversation.TypingIndicatorMode.STOPPED -> typingIndicatorSenderHandler.sendStoppingEvent(conversationId)
}
}
return Either.Right(Unit)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Wire
* Copyright (C) 2023 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.data.conversation

import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.util.KaliumDispatcher
import com.wire.kalium.util.KaliumDispatcherImpl
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.coroutines.CoroutineContext
import kotlin.time.DurationUnit
import kotlin.time.toDuration

/**
* Outgoing user typing sent events manager.
*
* - It will send started and stopped events.
* - For each started sent event, will 'enqueue' a stopped event after a timeout.
*
*/
internal class TypingIndicatorSenderHandler(
private val conversationRepository: ConversationRepository,
private val kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl,
userSessionCoroutineScope: CoroutineScope
) : CoroutineScope by userSessionCoroutineScope {
override val coroutineContext: CoroutineContext
get() = kaliumDispatcher.default

private val outgoingStoppedQueueTypingEventsMutex = Mutex()
private val outgoingStoppedQueueTypingEvents = mutableMapOf<ConversationId, Unit>()
private val typingIndicatorTimeoutInSeconds = 10.toDuration(DurationUnit.SECONDS)

/**
* Sends a stopping event and removes it from the 'queue'.
*/
fun sendStoppingEvent(conversationId: ConversationId) {
launch {
outgoingStoppedQueueTypingEventsMutex.withLock {
if (!outgoingStoppedQueueTypingEvents.containsKey(conversationId)) {
return@launch
}
outgoingStoppedQueueTypingEvents.remove(conversationId)
sendTypingIndicatorStatus(conversationId, Conversation.TypingIndicatorMode.STOPPED)
}
}
}

/**
* Sends a started event and enqueues a stopping event if sent successfully.
*/
fun sendStartedAndEnqueueStoppingEvent(conversationId: ConversationId) {
launch {
outgoingStoppedQueueTypingEventsMutex.withLock {
if (outgoingStoppedQueueTypingEvents.containsKey(conversationId)) {
return@launch
}
val isSent = sendTypingIndicatorStatus(conversationId, Conversation.TypingIndicatorMode.STARTED)
when (isSent) {
true -> {
outgoingStoppedQueueTypingEvents[conversationId] = Unit
delay(typingIndicatorTimeoutInSeconds)
sendStoppingEvent(conversationId)
}

false -> Unit // do nothing
}
}
}
}

private suspend fun sendTypingIndicatorStatus(
conversationId: ConversationId,
typingStatus: Conversation.TypingIndicatorMode
): Boolean = conversationRepository.sendTypingIndicatorStatus(conversationId, typingStatus).fold({
kaliumLogger.w("Skipping failed to send typing indicator status: $typingStatus")
false
}) {
kaliumLogger.i("Successfully sent typing started indicator status: $typingStatus")
true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,6 @@ import com.wire.kalium.logic.feature.connection.SyncConnectionsUseCaseImpl
import com.wire.kalium.logic.feature.conversation.ConversationScope
import com.wire.kalium.logic.feature.conversation.ConversationsRecoveryManager
import com.wire.kalium.logic.feature.conversation.ConversationsRecoveryManagerImpl
import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCase
import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCaseImpl
import com.wire.kalium.logic.feature.conversation.JoinExistingMLSConversationUseCase
import com.wire.kalium.logic.feature.conversation.JoinExistingMLSConversationUseCaseImpl
import com.wire.kalium.logic.feature.conversation.JoinExistingMLSConversationsUseCase
Expand All @@ -170,17 +168,22 @@ import com.wire.kalium.logic.feature.conversation.MLSConversationsRecoveryManage
import com.wire.kalium.logic.feature.conversation.MLSConversationsRecoveryManagerImpl
import com.wire.kalium.logic.feature.conversation.MLSConversationsVerificationStatusesHandler
import com.wire.kalium.logic.feature.conversation.MLSConversationsVerificationStatusesHandlerImpl
import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCase
import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCaseImpl
import com.wire.kalium.logic.feature.conversation.ObserveSecurityClassificationLabelUseCase
import com.wire.kalium.logic.feature.conversation.ObserveSecurityClassificationLabelUseCaseImpl
import com.wire.kalium.logic.feature.conversation.RecoverMLSConversationsUseCase
import com.wire.kalium.logic.feature.conversation.RecoverMLSConversationsUseCaseImpl
import com.wire.kalium.logic.feature.conversation.SyncConversationsUseCase
import com.wire.kalium.logic.feature.conversation.SyncConversationsUseCaseImpl
import com.wire.kalium.logic.feature.conversation.TypingIndicatorSyncManager
import com.wire.kalium.logic.feature.conversation.keyingmaterials.KeyingMaterialsManager
import com.wire.kalium.logic.feature.conversation.keyingmaterials.KeyingMaterialsManagerImpl
import com.wire.kalium.logic.feature.debug.DebugScope
import com.wire.kalium.logic.feature.e2ei.EnrollE2EIUseCase
import com.wire.kalium.logic.feature.e2ei.EnrollE2EIUseCaseImpl
import com.wire.kalium.logic.feature.featureConfig.SyncFeatureConfigsUseCase
import com.wire.kalium.logic.feature.featureConfig.SyncFeatureConfigsUseCaseImpl
import com.wire.kalium.logic.feature.featureConfig.handler.AppLockConfigHandler
import com.wire.kalium.logic.feature.featureConfig.handler.ClassifiedDomainsConfigHandler
import com.wire.kalium.logic.feature.featureConfig.handler.ConferenceCallingConfigHandler
Expand All @@ -190,8 +193,6 @@ import com.wire.kalium.logic.feature.featureConfig.handler.GuestRoomConfigHandle
import com.wire.kalium.logic.feature.featureConfig.handler.MLSConfigHandler
import com.wire.kalium.logic.feature.featureConfig.handler.SecondFactorPasswordChallengeConfigHandler
import com.wire.kalium.logic.feature.featureConfig.handler.SelfDeletingMessagesConfigHandler
import com.wire.kalium.logic.feature.featureConfig.SyncFeatureConfigsUseCase
import com.wire.kalium.logic.feature.featureConfig.SyncFeatureConfigsUseCaseImpl
import com.wire.kalium.logic.feature.keypackage.KeyPackageManager
import com.wire.kalium.logic.feature.keypackage.KeyPackageManagerImpl
import com.wire.kalium.logic.feature.message.AddSystemMessageToAllConversationsUseCase
Expand Down Expand Up @@ -1162,7 +1163,7 @@ class UserSessionScope internal constructor(
)

private val typingIndicatorHandler: TypingIndicatorHandler
get() = TypingIndicatorHandlerImpl(userId, conversations.typingIndicatorRepository)
get() = TypingIndicatorHandlerImpl(userId, conversations.typingIndicatorIncomingRepository)

private val conversationEventReceiver: ConversationEventReceiver by lazy {
ConversationEventReceiverImpl(
Expand Down Expand Up @@ -1532,6 +1533,9 @@ class UserSessionScope internal constructor(
MLSConversationsVerificationStatusesHandlerImpl(conversationRepository, persistMessage, mlsConversationRepository, userId)
}

private val typingIndicatorSyncManager: TypingIndicatorSyncManager =
TypingIndicatorSyncManager(lazy { conversations.typingIndicatorIncomingRepository }, observeSyncState)

init {
launch {
apiMigrationManager.performMigrations()
Expand Down Expand Up @@ -1571,6 +1575,10 @@ class UserSessionScope internal constructor(
launch {
mlsConversationsVerificationStatusesHandler.invoke()
}

launch {
typingIndicatorSyncManager.execute()
}
}

fun onDestroy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.conversation.MLSConversationRepository
import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreator
import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreatorImpl
import com.wire.kalium.logic.data.conversation.TypingIndicatorRepositoryImpl
import com.wire.kalium.logic.data.conversation.TypingIndicatorOutgoingRepositoryImpl
import com.wire.kalium.logic.data.conversation.TypingIndicatorIncomingRepositoryImpl
import com.wire.kalium.logic.data.conversation.TypingIndicatorSenderHandler
import com.wire.kalium.logic.data.conversation.UpdateKeyingMaterialThresholdProvider
import com.wire.kalium.logic.data.id.QualifiedIdMapper
import com.wire.kalium.logic.data.message.PersistMessageUseCase
Expand Down Expand Up @@ -266,10 +268,25 @@ class ConversationScope internal constructor(
val observeArchivedUnreadConversationsCount: ObserveArchivedUnreadConversationsCountUseCase
get() = ObserveArchivedUnreadConversationsCountUseCaseImpl(conversationRepository)

internal val typingIndicatorRepository =
TypingIndicatorRepositoryImpl(ConcurrentMutableMap(), conversationRepository, userPropertyRepository)
private val typingIndicatorSenderHandler: TypingIndicatorSenderHandler =
TypingIndicatorSenderHandler(conversationRepository = conversationRepository, userSessionCoroutineScope = scope)

internal val typingIndicatorIncomingRepository =
TypingIndicatorIncomingRepositoryImpl(
ConcurrentMutableMap(),
userPropertyRepository
)

internal val typingIndicatorOutgoingRepository =
TypingIndicatorOutgoingRepositoryImpl(
typingIndicatorSenderHandler,
userPropertyRepository
)

val sendTypingEvent: SendTypingEventUseCase
get() = SendTypingEventUseCaseImpl(typingIndicatorOutgoingRepository)

val observeUsersTyping: ObserveUsersTypingUseCase
get() = ObserveUsersTypingUseCaseImpl(typingIndicatorRepository, userRepository)
get() = ObserveUsersTypingUseCaseImpl(typingIndicatorIncomingRepository, userRepository)

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package com.wire.kalium.logic.feature.conversation

import com.wire.kalium.logic.data.conversation.TypingIndicatorRepository
import com.wire.kalium.logic.data.conversation.TypingIndicatorIncomingRepository
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.message.UserSummary
import com.wire.kalium.logic.data.user.UserRepository
Expand All @@ -38,13 +38,13 @@ interface ObserveUsersTypingUseCase {
}

internal class ObserveUsersTypingUseCaseImpl(
private val typingIndicatorRepository: TypingIndicatorRepository,
private val typingIndicatorIncomingRepository: TypingIndicatorIncomingRepository,
private val userRepository: UserRepository,
private val dispatcher: KaliumDispatcher = KaliumDispatcherImpl
) : ObserveUsersTypingUseCase {
override suspend operator fun invoke(conversationId: ConversationId): Flow<Set<UserSummary>> = withContext(dispatcher.io) {
typingIndicatorRepository.observeUsersTyping(conversationId).map { usersEntries ->
userRepository.getUsersSummaryByIds(usersEntries.map { it.userId }).fold({
typingIndicatorIncomingRepository.observeUsersTyping(conversationId).map { usersEntries ->
userRepository.getUsersSummaryByIds(usersEntries.map { it }).fold({
kaliumLogger.w("Users not found locally, skipping... $it")
emptySet()
}, { it.toSet() })
Expand Down
Loading
Loading