From f2f48127dbf6590a5d8dcda75ce03dbde0011a3d Mon Sep 17 00:00:00 2001 From: yamilmedina Date: Wed, 4 Oct 2023 11:29:17 +0200 Subject: [PATCH 01/15] feat: mapper and repo ops deletion --- .../conversation/TypingIndicatorRepository.kt | 12 ++++- .../conversation/SendTypingEventUseCase.kt | 44 +++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) create mode 100644 logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/SendTypingEventUseCase.kt diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt index 5c1fdbf1db7..1e8bdfa1bdb 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt @@ -31,6 +31,8 @@ import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onStart import kotlinx.datetime.Clock import kotlinx.datetime.Instant +import kotlin.time.DurationUnit +import kotlin.time.toDuration internal interface TypingIndicatorRepository { suspend fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) @@ -82,12 +84,20 @@ internal class TypingIndicatorRepositoryImpl( return conversationRepository.sendTypingIndicatorStatus(conversationId, typingStatus) } + private fun cleanExpiredReceivedEvents(conversationId: ConversationId) { + userTypingCache.block { entry -> + entry[conversationId]?.apply { + this.removeAll { it.date < Clock.System.now().minus(TYPING_INDICATOR_TIMEOUT_IN_SECONDS) } + } + } + } + companion object { const val BUFFER_SIZE = 32 // drop after this threshold + val TYPING_INDICATOR_TIMEOUT_IN_SECONDS = 5.toDuration(DurationUnit.SECONDS) } } -// todo expire by worker data class ExpiringUserTyping( val userId: UserId, val date: Instant diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/SendTypingEventUseCase.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/SendTypingEventUseCase.kt new file mode 100644 index 00000000000..9026e3b9a23 --- /dev/null +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/SendTypingEventUseCase.kt @@ -0,0 +1,44 @@ +/* + * Wire + * Copyright (C) 2023 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.kalium.logic.feature.conversation + +import com.wire.kalium.logic.data.conversation.Conversation +import com.wire.kalium.logic.data.conversation.TypingIndicatorRepository +import com.wire.kalium.logic.data.id.ConversationId + +/** + * UseCase for sending a typing event to a specific [ConversationId] + * + * Underlying implementation will take care of enqueuing the event for a [Conversation.TypingIndicatorMode.STOPPED] + * after a certain amount of time. + * + */ +interface SendTypingEventUseCase { + suspend operator fun invoke( + conversationId: ConversationId, + typingStatus: Conversation.TypingIndicatorMode + ) +} + +internal class SendTypingEventUseCaseImpl( + private val typingIndicatorRepository: TypingIndicatorRepository +) : SendTypingEventUseCase { + override suspend fun invoke(conversationId: ConversationId, typingStatus: Conversation.TypingIndicatorMode) { + typingIndicatorRepository.sendTypingIndicatorStatus(conversationId, typingStatus) + } +} From d781cad5bcd7bd927d95bda80d783a724a56eaa0 Mon Sep 17 00:00:00 2001 From: yamilmedina Date: Wed, 4 Oct 2023 18:01:49 +0200 Subject: [PATCH 02/15] chore: ip --- .../conversation/TypingIndicatorRepository.kt | 39 +++++++++++-------- .../feature/conversation/ConversationScope.kt | 3 +- .../TypingIndicatorRepositoryTest.kt | 2 +- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt index 1e8bdfa1bdb..c293dd7ad97 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt @@ -23,6 +23,8 @@ import com.wire.kalium.logic.data.id.ConversationId import com.wire.kalium.logic.data.properties.UserPropertyRepository import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.functional.Either +import com.wire.kalium.logic.functional.fold +import com.wire.kalium.logic.kaliumLogger import com.wire.kalium.logic.util.safeComputeAndMutateSetValue import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow @@ -46,50 +48,55 @@ internal interface TypingIndicatorRepository { } internal class TypingIndicatorRepositoryImpl( - private val userTypingCache: ConcurrentMutableMap>, + private val incomingTypingEventsCache: ConcurrentMutableMap>, + private val outgoingStoppedQueueTypingEventsCache: ConcurrentMutableMap, private val conversationRepository: ConversationRepository, private val userPropertyRepository: UserPropertyRepository ) : TypingIndicatorRepository { - private val userTypingDataSourceFlow: MutableSharedFlow = + private val incomingTypingUserDataSourceFlow: MutableSharedFlow = MutableSharedFlow(extraBufferCapacity = BUFFER_SIZE, onBufferOverflow = BufferOverflow.DROP_OLDEST) override suspend fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) { if (userPropertyRepository.getTypingIndicatorStatus()) { - userTypingCache.safeComputeAndMutateSetValue(conversationId) { ExpiringUserTyping(userId, Clock.System.now()) } + incomingTypingEventsCache.safeComputeAndMutateSetValue(conversationId) { ExpiringUserTyping(userId, Clock.System.now()) } .also { - userTypingDataSourceFlow.tryEmit(Unit) + incomingTypingUserDataSourceFlow.tryEmit(Unit) } } } override suspend fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId) { - userTypingCache.block { entry -> + incomingTypingEventsCache.block { entry -> entry[conversationId]?.apply { this.removeAll { it.userId == userId } } }.also { - userTypingDataSourceFlow.tryEmit(Unit) + incomingTypingUserDataSourceFlow.tryEmit(Unit) } } override suspend fun observeUsersTyping(conversationId: ConversationId): Flow> { - return userTypingDataSourceFlow - .map { userTypingCache[conversationId] ?: emptySet() } - .onStart { emit(userTypingCache[conversationId] ?: emptySet()) } + return incomingTypingUserDataSourceFlow + .map { incomingTypingEventsCache[conversationId] ?: emptySet() } + .onStart { emit(incomingTypingEventsCache[conversationId] ?: emptySet()) } } override suspend fun sendTypingIndicatorStatus( conversationId: ConversationId, typingStatus: Conversation.TypingIndicatorMode ): Either { - return conversationRepository.sendTypingIndicatorStatus(conversationId, typingStatus) - } + if (userPropertyRepository.getTypingIndicatorStatus()) { + conversationRepository.sendTypingIndicatorStatus(conversationId, typingStatus) + .fold({ kaliumLogger.w("Skipping failed to send typing indicator status: $it") }) { + when (typingStatus) { + Conversation.TypingIndicatorMode.STARTED -> outgoingStoppedQueueTypingEventsCache[conversationId] = + Clock.System.now().plus(TYPING_INDICATOR_TIMEOUT_IN_SECONDS) - private fun cleanExpiredReceivedEvents(conversationId: ConversationId) { - userTypingCache.block { entry -> - entry[conversationId]?.apply { - this.removeAll { it.date < Clock.System.now().minus(TYPING_INDICATOR_TIMEOUT_IN_SECONDS) } - } + Conversation.TypingIndicatorMode.STOPPED -> outgoingStoppedQueueTypingEventsCache.remove(conversationId) + } + Either.Right(Unit) + } } + return Either.Right(Unit) } companion object { diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt index 7e07482cadb..a72a6988765 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt @@ -266,7 +266,8 @@ class ConversationScope internal constructor( val observeArchivedUnreadConversationsCount: ObserveArchivedUnreadConversationsCountUseCase get() = ObserveArchivedUnreadConversationsCountUseCaseImpl(conversationRepository) - internal val typingIndicatorRepository = TypingIndicatorRepositoryImpl(ConcurrentMutableMap(), userPropertyRepository) + internal val typingIndicatorRepository = + TypingIndicatorRepositoryImpl(ConcurrentMutableMap(), ConcurrentMutableMap(), conversationRepository, userPropertyRepository) val observeUsersTyping: ObserveUsersTypingUseCase get() = ObserveUsersTypingUseCaseImpl(typingIndicatorRepository, userRepository) diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepositoryTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepositoryTest.kt index 7e2171ef149..117f1c14ba0 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepositoryTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepositoryTest.kt @@ -102,7 +102,7 @@ class TypingIndicatorRepositoryTest { } fun arrange() = this to TypingIndicatorRepositoryImpl( - userTypingCache = ConcurrentMutableMap(), + incomingTypingEventsCache = ConcurrentMutableMap(), userPropertyRepository = userPropertyRepository ) } From 25d332daf4c5ad6b9983a753e6e15bb0c79be273 Mon Sep 17 00:00:00 2001 From: yamilmedina Date: Thu, 5 Oct 2023 17:29:13 +0200 Subject: [PATCH 03/15] chore: some fun refactoring, just ideas --- .../conversation/TypingIndicatorRepository.kt | 94 +++++++++++++++++-- .../feature/conversation/ConversationScope.kt | 2 +- 2 files changed, 89 insertions(+), 7 deletions(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt index c293dd7ad97..5d990634716 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt @@ -19,6 +19,7 @@ package com.wire.kalium.logic.data.conversation import co.touchlab.stately.collections.ConcurrentMutableMap import com.wire.kalium.logic.CoreFailure +import com.wire.kalium.logic.data.conversation.TypingIndicatorRepositoryImpl.Companion.TYPING_INDICATOR_TIMEOUT_IN_SECONDS import com.wire.kalium.logic.data.id.ConversationId import com.wire.kalium.logic.data.properties.UserPropertyRepository import com.wire.kalium.logic.data.user.UserId @@ -26,13 +27,21 @@ import com.wire.kalium.logic.functional.Either import com.wire.kalium.logic.functional.fold import com.wire.kalium.logic.kaliumLogger import com.wire.kalium.logic.util.safeComputeAndMutateSetValue +import com.wire.kalium.util.KaliumDispatcher +import com.wire.kalium.util.KaliumDispatcherImpl +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.datetime.Clock import kotlinx.datetime.Instant +import kotlin.coroutines.CoroutineContext import kotlin.time.DurationUnit import kotlin.time.toDuration @@ -47,13 +56,68 @@ internal interface TypingIndicatorRepository { ): Either } +/** + * Outgoing user typing sent events, cleanup manager + */ +internal class OutgoingTypingIndicatorManager( + userSessionCoroutineScope: CoroutineScope, + private val kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl +) : CoroutineScope by userSessionCoroutineScope { + override val coroutineContext: CoroutineContext + get() = kaliumDispatcher.default + + private val outgoingStoppedQueueTypingEventsMutex = Mutex() + private val outgoingStoppedQueueTypingEvents = mutableMapOf() + + // todo. what if we pass the convo id and the callback so we can reuse it for sending and receiving? + // as an interface function and implement specific for incoming and outgoing. + // where incoming clears by [ExpiringUserTyping] and outgoing by [Instant] + suspend fun enqueueStoppedTypingTimeout(conversationId: ConversationId, callback: () -> Unit) { + + delay(1L) + callback.invoke() + } + + fun enqueueStoppingEventAndWait(conversationId: ConversationId) { + launch { + outgoingStoppedQueueTypingEventsMutex.withLock { + if (outgoingStoppedQueueTypingEvents.containsKey(conversationId)) { + return@launch + } + + outgoingStoppedQueueTypingEvents[conversationId] = Clock.System.now().plus(TYPING_INDICATOR_TIMEOUT_IN_SECONDS) + delay(TYPING_INDICATOR_TIMEOUT_IN_SECONDS) + outgoingStoppedQueueTypingEvents.remove(conversationId) + } + } + } +} + +/** + * Incoming user typing received events, cleanup manager + */ +internal class IncomingTypingIndicatorManager( + userSessionCoroutineScope: CoroutineScope, + private val kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl +) : CoroutineScope by userSessionCoroutineScope { + override val coroutineContext: CoroutineContext + get() = kaliumDispatcher.default + + private val incomingStoppedQueueTypingEventsMutex = Mutex() +} + +@Suppress("LongParameterList") internal class TypingIndicatorRepositoryImpl( private val incomingTypingEventsCache: ConcurrentMutableMap>, private val outgoingStoppedQueueTypingEventsCache: ConcurrentMutableMap, private val conversationRepository: ConversationRepository, - private val userPropertyRepository: UserPropertyRepository + private val userPropertyRepository: UserPropertyRepository, + userSessionCoroutineScope: CoroutineScope, + private val outgoingTypingIndicatorManager: OutgoingTypingIndicatorManager = OutgoingTypingIndicatorManager(userSessionCoroutineScope), + private val incomingTypingIndicatorManager: IncomingTypingIndicatorManager = IncomingTypingIndicatorManager(userSessionCoroutineScope) ) : TypingIndicatorRepository { + private val incomingTypingUserDataSourceFlow: MutableSharedFlow = MutableSharedFlow(extraBufferCapacity = BUFFER_SIZE, onBufferOverflow = BufferOverflow.DROP_OLDEST) @@ -85,16 +149,34 @@ internal class TypingIndicatorRepositoryImpl( typingStatus: Conversation.TypingIndicatorMode ): Either { if (userPropertyRepository.getTypingIndicatorStatus()) { - conversationRepository.sendTypingIndicatorStatus(conversationId, typingStatus) - .fold({ kaliumLogger.w("Skipping failed to send typing indicator status: $it") }) { + when (typingStatus) { + Conversation.TypingIndicatorMode.STARTED -> { + conversationRepository.sendTypingIndicatorStatus(conversationId, Conversation.TypingIndicatorMode.STARTED) + .fold({ kaliumLogger.w("Skipping failed to send typing indicator status: $it") }) { + outgoingTypingIndicatorManager.enqueueStoppingEventAndWait(conversationId) + conversationRepository.sendTypingIndicatorStatus(conversationId, Conversation.TypingIndicatorMode.STOPPED) + .fold({ kaliumLogger.w("Skipping failed to send typing indicator status: $it") }) { + kaliumLogger.i("Successfully sent typing stopped indicator status") + } + } + } + + Conversation.TypingIndicatorMode.STOPPED -> { + outgoingTypingIndicatorManager.enqueueStoppingEventAndWait(conversationId) + } + } + + /*conversationRepository.sendTypingIndicatorStatus(conversationId, typingStatus) + .fold() { when (typingStatus) { - Conversation.TypingIndicatorMode.STARTED -> outgoingStoppedQueueTypingEventsCache[conversationId] = - Clock.System.now().plus(TYPING_INDICATOR_TIMEOUT_IN_SECONDS) + Conversation.TypingIndicatorMode.STARTED -> outgoingTypingIndicatorManager.enqueueStoppingEventAndWait( + conversationId + ) Conversation.TypingIndicatorMode.STOPPED -> outgoingStoppedQueueTypingEventsCache.remove(conversationId) } Either.Right(Unit) - } + }*/ } return Either.Right(Unit) } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt index a72a6988765..e43e87c592b 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt @@ -267,7 +267,7 @@ class ConversationScope internal constructor( get() = ObserveArchivedUnreadConversationsCountUseCaseImpl(conversationRepository) internal val typingIndicatorRepository = - TypingIndicatorRepositoryImpl(ConcurrentMutableMap(), ConcurrentMutableMap(), conversationRepository, userPropertyRepository) + TypingIndicatorRepositoryImpl(ConcurrentMutableMap(), ConcurrentMutableMap(), conversationRepository, userPropertyRepository, scope) val observeUsersTyping: ObserveUsersTypingUseCase get() = ObserveUsersTypingUseCaseImpl(typingIndicatorRepository, userRepository) From 79b09e7a8d02a8fb9248350d1db3904d1a8d3169 Mon Sep 17 00:00:00 2001 From: yamilmedina Date: Thu, 5 Oct 2023 18:44:48 +0200 Subject: [PATCH 04/15] chore: some fun refactoring, just ideas --- .../conversation/TypingIndicatorRepository.kt | 90 +++++++++---------- .../feature/conversation/ConversationScope.kt | 12 ++- 2 files changed, 55 insertions(+), 47 deletions(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt index 5d990634716..e88e36cde17 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt @@ -19,7 +19,6 @@ package com.wire.kalium.logic.data.conversation import co.touchlab.stately.collections.ConcurrentMutableMap import com.wire.kalium.logic.CoreFailure -import com.wire.kalium.logic.data.conversation.TypingIndicatorRepositoryImpl.Companion.TYPING_INDICATOR_TIMEOUT_IN_SECONDS import com.wire.kalium.logic.data.id.ConversationId import com.wire.kalium.logic.data.properties.UserPropertyRepository import com.wire.kalium.logic.data.user.UserId @@ -57,49 +56,74 @@ internal interface TypingIndicatorRepository { } /** - * Outgoing user typing sent events, cleanup manager + * Outgoing user typing sent events manager, will send started and stopped events and enqueue stopped events + * todo, move to separate file after testing */ internal class OutgoingTypingIndicatorManager( userSessionCoroutineScope: CoroutineScope, + private val conversationRepository: ConversationRepository, private val kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl ) : CoroutineScope by userSessionCoroutineScope { override val coroutineContext: CoroutineContext get() = kaliumDispatcher.default private val outgoingStoppedQueueTypingEventsMutex = Mutex() - private val outgoingStoppedQueueTypingEvents = mutableMapOf() + private val outgoingStoppedQueueTypingEvents = mutableMapOf() + private val TYPING_INDICATOR_TIMEOUT_IN_SECONDS = 10.toDuration(DurationUnit.SECONDS) - // todo. what if we pass the convo id and the callback so we can reuse it for sending and receiving? - // as an interface function and implement specific for incoming and outgoing. - // where incoming clears by [ExpiringUserTyping] and outgoing by [Instant] - suspend fun enqueueStoppedTypingTimeout(conversationId: ConversationId, callback: () -> Unit) { - delay(1L) - callback.invoke() + fun sendStoppingEvent(conversationId: ConversationId) { + launch { + outgoingStoppedQueueTypingEventsMutex.withLock { + if (!outgoingStoppedQueueTypingEvents.containsKey(conversationId)) { + return@launch + } + outgoingStoppedQueueTypingEvents.remove(conversationId) + sendTypingIndicatorStatus(conversationId, Conversation.TypingIndicatorMode.STOPPED) + } + } } - fun enqueueStoppingEventAndWait(conversationId: ConversationId) { + fun sendStartedAndEnqueueStoppingEvent(conversationId: ConversationId) { launch { outgoingStoppedQueueTypingEventsMutex.withLock { if (outgoingStoppedQueueTypingEvents.containsKey(conversationId)) { return@launch } + val isSent = sendTypingIndicatorStatus(conversationId, Conversation.TypingIndicatorMode.STARTED) + when (isSent) { + true -> { + outgoingStoppedQueueTypingEvents[conversationId] = Unit + delay(TYPING_INDICATOR_TIMEOUT_IN_SECONDS) + sendStoppingEvent(conversationId) + } - outgoingStoppedQueueTypingEvents[conversationId] = Clock.System.now().plus(TYPING_INDICATOR_TIMEOUT_IN_SECONDS) - delay(TYPING_INDICATOR_TIMEOUT_IN_SECONDS) - outgoingStoppedQueueTypingEvents.remove(conversationId) + false -> Unit // do nothing + } } } } + + private suspend fun sendTypingIndicatorStatus( + conversationId: ConversationId, + typingStatus: Conversation.TypingIndicatorMode + ): Boolean = conversationRepository.sendTypingIndicatorStatus(conversationId, typingStatus).fold({ + kaliumLogger.w("Skipping failed to send typing indicator status: $typingStatus") + false + }) { + kaliumLogger.i("Successfully sent typing started indicator status: $typingStatus") + true + } } /** * Incoming user typing received events, cleanup manager + * todo, replicate same as outgoing */ internal class IncomingTypingIndicatorManager( - userSessionCoroutineScope: CoroutineScope, +// userSessionCoroutineScope: CoroutineScope, private val kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl -) : CoroutineScope by userSessionCoroutineScope { +) : CoroutineScope { override val coroutineContext: CoroutineContext get() = kaliumDispatcher.default @@ -109,12 +133,9 @@ internal class IncomingTypingIndicatorManager( @Suppress("LongParameterList") internal class TypingIndicatorRepositoryImpl( private val incomingTypingEventsCache: ConcurrentMutableMap>, - private val outgoingStoppedQueueTypingEventsCache: ConcurrentMutableMap, - private val conversationRepository: ConversationRepository, private val userPropertyRepository: UserPropertyRepository, - userSessionCoroutineScope: CoroutineScope, - private val outgoingTypingIndicatorManager: OutgoingTypingIndicatorManager = OutgoingTypingIndicatorManager(userSessionCoroutineScope), - private val incomingTypingIndicatorManager: IncomingTypingIndicatorManager = IncomingTypingIndicatorManager(userSessionCoroutineScope) + private val outgoingTypingIndicatorManager: OutgoingTypingIndicatorManager, + private val incomingTypingIndicatorManager: IncomingTypingIndicatorManager ) : TypingIndicatorRepository { @@ -150,40 +171,17 @@ internal class TypingIndicatorRepositoryImpl( ): Either { if (userPropertyRepository.getTypingIndicatorStatus()) { when (typingStatus) { - Conversation.TypingIndicatorMode.STARTED -> { - conversationRepository.sendTypingIndicatorStatus(conversationId, Conversation.TypingIndicatorMode.STARTED) - .fold({ kaliumLogger.w("Skipping failed to send typing indicator status: $it") }) { - outgoingTypingIndicatorManager.enqueueStoppingEventAndWait(conversationId) - conversationRepository.sendTypingIndicatorStatus(conversationId, Conversation.TypingIndicatorMode.STOPPED) - .fold({ kaliumLogger.w("Skipping failed to send typing indicator status: $it") }) { - kaliumLogger.i("Successfully sent typing stopped indicator status") - } - } - } + Conversation.TypingIndicatorMode.STARTED -> + outgoingTypingIndicatorManager.sendStartedAndEnqueueStoppingEvent(conversationId) - Conversation.TypingIndicatorMode.STOPPED -> { - outgoingTypingIndicatorManager.enqueueStoppingEventAndWait(conversationId) - } + Conversation.TypingIndicatorMode.STOPPED -> outgoingTypingIndicatorManager.sendStoppingEvent(conversationId) } - - /*conversationRepository.sendTypingIndicatorStatus(conversationId, typingStatus) - .fold() { - when (typingStatus) { - Conversation.TypingIndicatorMode.STARTED -> outgoingTypingIndicatorManager.enqueueStoppingEventAndWait( - conversationId - ) - - Conversation.TypingIndicatorMode.STOPPED -> outgoingStoppedQueueTypingEventsCache.remove(conversationId) - } - Either.Right(Unit) - }*/ } return Either.Right(Unit) } companion object { const val BUFFER_SIZE = 32 // drop after this threshold - val TYPING_INDICATOR_TIMEOUT_IN_SECONDS = 5.toDuration(DurationUnit.SECONDS) } } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt index e43e87c592b..4564bfe63dd 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt @@ -24,9 +24,11 @@ import com.wire.kalium.logic.configuration.server.ServerConfigRepository import com.wire.kalium.logic.data.connection.ConnectionRepository import com.wire.kalium.logic.data.conversation.ConversationGroupRepository import com.wire.kalium.logic.data.conversation.ConversationRepository +import com.wire.kalium.logic.data.conversation.IncomingTypingIndicatorManager import com.wire.kalium.logic.data.conversation.MLSConversationRepository import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreator import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreatorImpl +import com.wire.kalium.logic.data.conversation.OutgoingTypingIndicatorManager import com.wire.kalium.logic.data.conversation.TypingIndicatorRepositoryImpl import com.wire.kalium.logic.data.conversation.UpdateKeyingMaterialThresholdProvider import com.wire.kalium.logic.data.id.QualifiedIdMapper @@ -266,8 +268,16 @@ class ConversationScope internal constructor( val observeArchivedUnreadConversationsCount: ObserveArchivedUnreadConversationsCountUseCase get() = ObserveArchivedUnreadConversationsCountUseCaseImpl(conversationRepository) + private val outgoingTypingIndicatorManager: OutgoingTypingIndicatorManager = + OutgoingTypingIndicatorManager(scope, conversationRepository) + internal val typingIndicatorRepository = - TypingIndicatorRepositoryImpl(ConcurrentMutableMap(), ConcurrentMutableMap(), conversationRepository, userPropertyRepository, scope) + TypingIndicatorRepositoryImpl( + ConcurrentMutableMap(), + userPropertyRepository, + outgoingTypingIndicatorManager, + IncomingTypingIndicatorManager() + ) val observeUsersTyping: ObserveUsersTypingUseCase get() = ObserveUsersTypingUseCaseImpl(typingIndicatorRepository, userRepository) From e5278f1b5be1c06dd346397fd0cdea26419552be Mon Sep 17 00:00:00 2001 From: yamilmedina Date: Fri, 6 Oct 2023 12:57:57 +0200 Subject: [PATCH 05/15] chore: some fun refactoring, just ideas --- .../TypingIndicatorIncomingEventManager.kt | 86 ++++++++++++ .../TypingIndicatorOutgoingEventManager.kt | 101 +++++++++++++++ .../conversation/TypingIndicatorRepository.kt | 122 +----------------- .../feature/conversation/ConversationScope.kt | 21 +-- 4 files changed, 207 insertions(+), 123 deletions(-) create mode 100644 logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingEventManager.kt create mode 100644 logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorOutgoingEventManager.kt diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingEventManager.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingEventManager.kt new file mode 100644 index 00000000000..68313f5fb8f --- /dev/null +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingEventManager.kt @@ -0,0 +1,86 @@ +/* + * Wire + * Copyright (C) 2023 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.kalium.logic.data.conversation + +import com.wire.kalium.logic.data.id.ConversationId +import com.wire.kalium.logic.data.user.UserId +import com.wire.kalium.util.KaliumDispatcher +import com.wire.kalium.util.KaliumDispatcherImpl +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.datetime.Clock +import kotlin.coroutines.CoroutineContext +import kotlin.time.DurationUnit +import kotlin.time.toDuration + +/** + * Incoming user typing received events, cleanup manager + * todo, replicate same as outgoing + */ +internal class TypingIndicatorIncomingEventManager( + userSessionCoroutineScope: CoroutineScope, + private val kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl +) : CoroutineScope by userSessionCoroutineScope { + override val coroutineContext: CoroutineContext + get() = kaliumDispatcher.default + + private val incomingStoppedQueueTypingEventsMutex = Mutex() + private val incomingStoppedQueueTypingEvents = mutableMapOf>() + private val typingIndicatorTimeoutInSeconds = 30.toDuration(DurationUnit.SECONDS) + + private val incomingTypingUserDataSourceFlow: MutableSharedFlow = + MutableSharedFlow(extraBufferCapacity = TypingIndicatorRepositoryImpl.BUFFER_SIZE, onBufferOverflow = BufferOverflow.DROP_OLDEST) + + suspend fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) { + incomingStoppedQueueTypingEventsMutex.withLock { + val values = + if (incomingStoppedQueueTypingEvents.containsKey(conversationId)) incomingStoppedQueueTypingEvents[conversationId]!! else mutableSetOf() + + values.add(ExpiringUserTyping(userId, Clock.System.now())) + incomingStoppedQueueTypingEvents[conversationId] = values.also { + incomingTypingUserDataSourceFlow.tryEmit(Unit) + } + + delay(typingIndicatorTimeoutInSeconds) + removeTypingUserInConversation(conversationId, userId) + } + } + + suspend fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId) { + incomingStoppedQueueTypingEventsMutex.withLock { + incomingStoppedQueueTypingEvents.also { entry -> + entry[conversationId]?.apply { this.removeAll { it.userId == userId } } + }.also { + incomingTypingUserDataSourceFlow.tryEmit(Unit) + } + } + } + + suspend fun observeUsersTyping(conversationId: ConversationId): Flow> { + return incomingTypingUserDataSourceFlow + .map { incomingStoppedQueueTypingEvents[conversationId] ?: emptySet() } + .onStart { emit(incomingStoppedQueueTypingEvents[conversationId] ?: emptySet()) } + } +} diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorOutgoingEventManager.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorOutgoingEventManager.kt new file mode 100644 index 00000000000..dfff9e963a8 --- /dev/null +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorOutgoingEventManager.kt @@ -0,0 +1,101 @@ +/* + * Wire + * Copyright (C) 2023 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.kalium.logic.data.conversation + +import com.wire.kalium.logic.data.id.ConversationId +import com.wire.kalium.logic.functional.fold +import com.wire.kalium.logic.kaliumLogger +import com.wire.kalium.util.KaliumDispatcher +import com.wire.kalium.util.KaliumDispatcherImpl +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlin.coroutines.CoroutineContext +import kotlin.time.DurationUnit +import kotlin.time.toDuration + +/** + * Outgoing user typing sent events manager. + * + * - It will send started and stopped events. + * - For each started sent event, will 'enqueue' a stopped event after a timeout. + * + */ +internal class TypingIndicatorOutgoingEventManager( + private val conversationRepository: ConversationRepository, + private val kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl, + userSessionCoroutineScope: CoroutineScope +) : CoroutineScope by userSessionCoroutineScope { + override val coroutineContext: CoroutineContext + get() = kaliumDispatcher.default + + private val outgoingStoppedQueueTypingEventsMutex = Mutex() + private val outgoingStoppedQueueTypingEvents = mutableMapOf() + private val typingIndicatorTimeoutInSeconds = 10.toDuration(DurationUnit.SECONDS) + + /** + * Sends a stopping event and removes it from the 'queue'. + */ + fun sendStoppingEvent(conversationId: ConversationId) { + launch { + outgoingStoppedQueueTypingEventsMutex.withLock { + if (!outgoingStoppedQueueTypingEvents.containsKey(conversationId)) { + return@launch + } + outgoingStoppedQueueTypingEvents.remove(conversationId) + sendTypingIndicatorStatus(conversationId, Conversation.TypingIndicatorMode.STOPPED) + } + } + } + + /** + * Sends a started event and enqueues a stopping event if sent successfully. + */ + fun sendStartedAndEnqueueStoppingEvent(conversationId: ConversationId) { + launch { + outgoingStoppedQueueTypingEventsMutex.withLock { + if (outgoingStoppedQueueTypingEvents.containsKey(conversationId)) { + return@launch + } + val isSent = sendTypingIndicatorStatus(conversationId, Conversation.TypingIndicatorMode.STARTED) + when (isSent) { + true -> { + outgoingStoppedQueueTypingEvents[conversationId] = Unit + delay(typingIndicatorTimeoutInSeconds) + sendStoppingEvent(conversationId) + } + + false -> Unit // do nothing + } + } + } + } + + private suspend fun sendTypingIndicatorStatus( + conversationId: ConversationId, + typingStatus: Conversation.TypingIndicatorMode + ): Boolean = conversationRepository.sendTypingIndicatorStatus(conversationId, typingStatus).fold({ + kaliumLogger.w("Skipping failed to send typing indicator status: $typingStatus") + false + }) { + kaliumLogger.i("Successfully sent typing started indicator status: $typingStatus") + true + } +} diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt index e88e36cde17..cb0b3ff083b 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt @@ -17,32 +17,13 @@ */ package com.wire.kalium.logic.data.conversation -import co.touchlab.stately.collections.ConcurrentMutableMap import com.wire.kalium.logic.CoreFailure import com.wire.kalium.logic.data.id.ConversationId import com.wire.kalium.logic.data.properties.UserPropertyRepository import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.functional.Either -import com.wire.kalium.logic.functional.fold -import com.wire.kalium.logic.kaliumLogger -import com.wire.kalium.logic.util.safeComputeAndMutateSetValue -import com.wire.kalium.util.KaliumDispatcher -import com.wire.kalium.util.KaliumDispatcherImpl -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.BufferOverflow -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.onStart -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import kotlinx.datetime.Clock import kotlinx.datetime.Instant -import kotlin.coroutines.CoroutineContext -import kotlin.time.DurationUnit -import kotlin.time.toDuration internal interface TypingIndicatorRepository { suspend fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) @@ -55,114 +36,25 @@ internal interface TypingIndicatorRepository { ): Either } -/** - * Outgoing user typing sent events manager, will send started and stopped events and enqueue stopped events - * todo, move to separate file after testing - */ -internal class OutgoingTypingIndicatorManager( - userSessionCoroutineScope: CoroutineScope, - private val conversationRepository: ConversationRepository, - private val kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl -) : CoroutineScope by userSessionCoroutineScope { - override val coroutineContext: CoroutineContext - get() = kaliumDispatcher.default - - private val outgoingStoppedQueueTypingEventsMutex = Mutex() - private val outgoingStoppedQueueTypingEvents = mutableMapOf() - private val TYPING_INDICATOR_TIMEOUT_IN_SECONDS = 10.toDuration(DurationUnit.SECONDS) - - - fun sendStoppingEvent(conversationId: ConversationId) { - launch { - outgoingStoppedQueueTypingEventsMutex.withLock { - if (!outgoingStoppedQueueTypingEvents.containsKey(conversationId)) { - return@launch - } - outgoingStoppedQueueTypingEvents.remove(conversationId) - sendTypingIndicatorStatus(conversationId, Conversation.TypingIndicatorMode.STOPPED) - } - } - } - - fun sendStartedAndEnqueueStoppingEvent(conversationId: ConversationId) { - launch { - outgoingStoppedQueueTypingEventsMutex.withLock { - if (outgoingStoppedQueueTypingEvents.containsKey(conversationId)) { - return@launch - } - val isSent = sendTypingIndicatorStatus(conversationId, Conversation.TypingIndicatorMode.STARTED) - when (isSent) { - true -> { - outgoingStoppedQueueTypingEvents[conversationId] = Unit - delay(TYPING_INDICATOR_TIMEOUT_IN_SECONDS) - sendStoppingEvent(conversationId) - } - - false -> Unit // do nothing - } - } - } - } - - private suspend fun sendTypingIndicatorStatus( - conversationId: ConversationId, - typingStatus: Conversation.TypingIndicatorMode - ): Boolean = conversationRepository.sendTypingIndicatorStatus(conversationId, typingStatus).fold({ - kaliumLogger.w("Skipping failed to send typing indicator status: $typingStatus") - false - }) { - kaliumLogger.i("Successfully sent typing started indicator status: $typingStatus") - true - } -} - -/** - * Incoming user typing received events, cleanup manager - * todo, replicate same as outgoing - */ -internal class IncomingTypingIndicatorManager( -// userSessionCoroutineScope: CoroutineScope, - private val kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl -) : CoroutineScope { - override val coroutineContext: CoroutineContext - get() = kaliumDispatcher.default - - private val incomingStoppedQueueTypingEventsMutex = Mutex() -} - @Suppress("LongParameterList") internal class TypingIndicatorRepositoryImpl( - private val incomingTypingEventsCache: ConcurrentMutableMap>, private val userPropertyRepository: UserPropertyRepository, - private val outgoingTypingIndicatorManager: OutgoingTypingIndicatorManager, - private val incomingTypingIndicatorManager: IncomingTypingIndicatorManager + private val typingIndicatorOutgoingEventManager: TypingIndicatorOutgoingEventManager, + private val typingIndicatorIncomingEventManager: TypingIndicatorIncomingEventManager ) : TypingIndicatorRepository { - - private val incomingTypingUserDataSourceFlow: MutableSharedFlow = - MutableSharedFlow(extraBufferCapacity = BUFFER_SIZE, onBufferOverflow = BufferOverflow.DROP_OLDEST) - override suspend fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) { if (userPropertyRepository.getTypingIndicatorStatus()) { - incomingTypingEventsCache.safeComputeAndMutateSetValue(conversationId) { ExpiringUserTyping(userId, Clock.System.now()) } - .also { - incomingTypingUserDataSourceFlow.tryEmit(Unit) - } + typingIndicatorIncomingEventManager.addTypingUserInConversation(conversationId, userId) } } override suspend fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId) { - incomingTypingEventsCache.block { entry -> - entry[conversationId]?.apply { this.removeAll { it.userId == userId } } - }.also { - incomingTypingUserDataSourceFlow.tryEmit(Unit) - } + typingIndicatorIncomingEventManager.removeTypingUserInConversation(conversationId, userId) } override suspend fun observeUsersTyping(conversationId: ConversationId): Flow> { - return incomingTypingUserDataSourceFlow - .map { incomingTypingEventsCache[conversationId] ?: emptySet() } - .onStart { emit(incomingTypingEventsCache[conversationId] ?: emptySet()) } + return typingIndicatorIncomingEventManager.observeUsersTyping(conversationId) } override suspend fun sendTypingIndicatorStatus( @@ -172,9 +64,9 @@ internal class TypingIndicatorRepositoryImpl( if (userPropertyRepository.getTypingIndicatorStatus()) { when (typingStatus) { Conversation.TypingIndicatorMode.STARTED -> - outgoingTypingIndicatorManager.sendStartedAndEnqueueStoppingEvent(conversationId) + typingIndicatorOutgoingEventManager.sendStartedAndEnqueueStoppingEvent(conversationId) - Conversation.TypingIndicatorMode.STOPPED -> outgoingTypingIndicatorManager.sendStoppingEvent(conversationId) + Conversation.TypingIndicatorMode.STOPPED -> typingIndicatorOutgoingEventManager.sendStoppingEvent(conversationId) } } return Either.Right(Unit) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt index 4564bfe63dd..1acf409715c 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt @@ -18,17 +18,16 @@ package com.wire.kalium.logic.feature.conversation -import co.touchlab.stately.collections.ConcurrentMutableMap import com.wire.kalium.logic.cache.SelfConversationIdProvider import com.wire.kalium.logic.configuration.server.ServerConfigRepository import com.wire.kalium.logic.data.connection.ConnectionRepository import com.wire.kalium.logic.data.conversation.ConversationGroupRepository import com.wire.kalium.logic.data.conversation.ConversationRepository -import com.wire.kalium.logic.data.conversation.IncomingTypingIndicatorManager import com.wire.kalium.logic.data.conversation.MLSConversationRepository import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreator import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreatorImpl -import com.wire.kalium.logic.data.conversation.OutgoingTypingIndicatorManager +import com.wire.kalium.logic.data.conversation.TypingIndicatorIncomingEventManager +import com.wire.kalium.logic.data.conversation.TypingIndicatorOutgoingEventManager import com.wire.kalium.logic.data.conversation.TypingIndicatorRepositoryImpl import com.wire.kalium.logic.data.conversation.UpdateKeyingMaterialThresholdProvider import com.wire.kalium.logic.data.id.QualifiedIdMapper @@ -268,17 +267,23 @@ class ConversationScope internal constructor( val observeArchivedUnreadConversationsCount: ObserveArchivedUnreadConversationsCountUseCase get() = ObserveArchivedUnreadConversationsCountUseCaseImpl(conversationRepository) - private val outgoingTypingIndicatorManager: OutgoingTypingIndicatorManager = - OutgoingTypingIndicatorManager(scope, conversationRepository) + private val typingIndicatorOutgoingEventManager: TypingIndicatorOutgoingEventManager = + TypingIndicatorOutgoingEventManager(conversationRepository = conversationRepository, userSessionCoroutineScope = scope) + + + private val typingIndicatorIncomingEventManager: TypingIndicatorIncomingEventManager = + TypingIndicatorIncomingEventManager(userSessionCoroutineScope = scope) internal val typingIndicatorRepository = TypingIndicatorRepositoryImpl( - ConcurrentMutableMap(), userPropertyRepository, - outgoingTypingIndicatorManager, - IncomingTypingIndicatorManager() + typingIndicatorOutgoingEventManager, + typingIndicatorIncomingEventManager ) + val sendTypingEventUseCase: SendTypingEventUseCase + get() = SendTypingEventUseCaseImpl(typingIndicatorRepository) + val observeUsersTyping: ObserveUsersTypingUseCase get() = ObserveUsersTypingUseCaseImpl(typingIndicatorRepository, userRepository) From 08bd6b1c93a5d4c84350e20bcf64cf65102e1852 Mon Sep 17 00:00:00 2001 From: yamilmedina Date: Fri, 6 Oct 2023 14:23:59 +0200 Subject: [PATCH 06/15] chore: working base --- .../conversation/TypingIndicatorRepository.kt | 30 +++++++++++++++---- .../feature/conversation/ConversationScope.kt | 5 ++-- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt index cb0b3ff083b..a52b612674b 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt @@ -17,12 +17,19 @@ */ package com.wire.kalium.logic.data.conversation +import co.touchlab.stately.collections.ConcurrentMutableMap import com.wire.kalium.logic.CoreFailure import com.wire.kalium.logic.data.id.ConversationId import com.wire.kalium.logic.data.properties.UserPropertyRepository import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.functional.Either +import com.wire.kalium.logic.util.safeComputeAndMutateSetValue +import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onStart +import kotlinx.datetime.Clock import kotlinx.datetime.Instant internal interface TypingIndicatorRepository { @@ -38,23 +45,36 @@ internal interface TypingIndicatorRepository { @Suppress("LongParameterList") internal class TypingIndicatorRepositoryImpl( + private val incomingTypingEventsCache: ConcurrentMutableMap>, private val userPropertyRepository: UserPropertyRepository, - private val typingIndicatorOutgoingEventManager: TypingIndicatorOutgoingEventManager, - private val typingIndicatorIncomingEventManager: TypingIndicatorIncomingEventManager + private val typingIndicatorOutgoingEventManager: TypingIndicatorOutgoingEventManager ) : TypingIndicatorRepository { + + private val incomingTypingUserDataSourceFlow: MutableSharedFlow = + MutableSharedFlow(extraBufferCapacity = BUFFER_SIZE, onBufferOverflow = BufferOverflow.DROP_OLDEST) + override suspend fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) { if (userPropertyRepository.getTypingIndicatorStatus()) { - typingIndicatorIncomingEventManager.addTypingUserInConversation(conversationId, userId) + incomingTypingEventsCache.safeComputeAndMutateSetValue(conversationId) { ExpiringUserTyping(userId, Clock.System.now()) } + .also { + incomingTypingUserDataSourceFlow.tryEmit(Unit) + } } } override suspend fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId) { - typingIndicatorIncomingEventManager.removeTypingUserInConversation(conversationId, userId) + incomingTypingEventsCache.block { entry -> + entry[conversationId]?.apply { this.removeAll { it.userId == userId } } + }.also { + incomingTypingUserDataSourceFlow.tryEmit(Unit) + } } override suspend fun observeUsersTyping(conversationId: ConversationId): Flow> { - return typingIndicatorIncomingEventManager.observeUsersTyping(conversationId) + return incomingTypingUserDataSourceFlow + .map { incomingTypingEventsCache[conversationId] ?: emptySet() } + .onStart { emit(incomingTypingEventsCache[conversationId] ?: emptySet()) } } override suspend fun sendTypingIndicatorStatus( diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt index 1acf409715c..fc8d71ed2fc 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt @@ -18,6 +18,7 @@ package com.wire.kalium.logic.feature.conversation +import co.touchlab.stately.collections.ConcurrentMutableMap import com.wire.kalium.logic.cache.SelfConversationIdProvider import com.wire.kalium.logic.configuration.server.ServerConfigRepository import com.wire.kalium.logic.data.connection.ConnectionRepository @@ -276,9 +277,9 @@ class ConversationScope internal constructor( internal val typingIndicatorRepository = TypingIndicatorRepositoryImpl( + ConcurrentMutableMap(), userPropertyRepository, - typingIndicatorOutgoingEventManager, - typingIndicatorIncomingEventManager + typingIndicatorOutgoingEventManager ) val sendTypingEventUseCase: SendTypingEventUseCase From 3f8bdda94cc14e0eda58c00d259b02e4a7bb6057 Mon Sep 17 00:00:00 2001 From: yamilmedina Date: Fri, 6 Oct 2023 17:59:39 +0200 Subject: [PATCH 07/15] chore: clear indicators --- .../data/conversation/TypingIndicatorRepository.kt | 10 ++++++++++ .../com/wire/kalium/logic/feature/UserSessionScope.kt | 4 ++++ 2 files changed, 14 insertions(+) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt index a52b612674b..2702240c563 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt @@ -41,6 +41,8 @@ internal interface TypingIndicatorRepository { conversationId: ConversationId, typingStatus: Conversation.TypingIndicatorMode ): Either + + suspend fun clearExpiredTypingIndicators(): Unit } @Suppress("LongParameterList") @@ -92,6 +94,14 @@ internal class TypingIndicatorRepositoryImpl( return Either.Right(Unit) } + override suspend fun clearExpiredTypingIndicators() { + incomingTypingEventsCache.block { entry -> + entry.clear() + }.also { + incomingTypingUserDataSourceFlow.tryEmit(Unit) + } + } + companion object { const val BUFFER_SIZE = 32 // drop after this threshold } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt index bf72978ad2a..33f5580b536 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt @@ -1571,6 +1571,10 @@ class UserSessionScope internal constructor( launch { mlsConversationsVerificationStatusesHandler.invoke() } + + launch { + conversations.typingIndicatorRepository.clearExpiredTypingIndicators() + } } fun onDestroy() { From 821950978ee8ba6cc46da497083d3e1a1a16fc8e Mon Sep 17 00:00:00 2001 From: yamilmedina Date: Fri, 6 Oct 2023 18:21:24 +0200 Subject: [PATCH 08/15] chore: clear indicators --- .../kalium/logic/feature/UserSessionScope.kt | 14 +++++--- .../TypingIndicatorSyncManager.kt | 36 +++++++++++++++++++ 2 files changed, 45 insertions(+), 5 deletions(-) create mode 100644 logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/TypingIndicatorSyncManager.kt diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt index 33f5580b536..d84a5e773ee 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt @@ -156,8 +156,6 @@ import com.wire.kalium.logic.feature.connection.SyncConnectionsUseCaseImpl import com.wire.kalium.logic.feature.conversation.ConversationScope import com.wire.kalium.logic.feature.conversation.ConversationsRecoveryManager import com.wire.kalium.logic.feature.conversation.ConversationsRecoveryManagerImpl -import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCase -import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCaseImpl import com.wire.kalium.logic.feature.conversation.JoinExistingMLSConversationUseCase import com.wire.kalium.logic.feature.conversation.JoinExistingMLSConversationUseCaseImpl import com.wire.kalium.logic.feature.conversation.JoinExistingMLSConversationsUseCase @@ -170,17 +168,22 @@ import com.wire.kalium.logic.feature.conversation.MLSConversationsRecoveryManage import com.wire.kalium.logic.feature.conversation.MLSConversationsRecoveryManagerImpl import com.wire.kalium.logic.feature.conversation.MLSConversationsVerificationStatusesHandler import com.wire.kalium.logic.feature.conversation.MLSConversationsVerificationStatusesHandlerImpl +import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCase +import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCaseImpl import com.wire.kalium.logic.feature.conversation.ObserveSecurityClassificationLabelUseCase import com.wire.kalium.logic.feature.conversation.ObserveSecurityClassificationLabelUseCaseImpl import com.wire.kalium.logic.feature.conversation.RecoverMLSConversationsUseCase import com.wire.kalium.logic.feature.conversation.RecoverMLSConversationsUseCaseImpl import com.wire.kalium.logic.feature.conversation.SyncConversationsUseCase import com.wire.kalium.logic.feature.conversation.SyncConversationsUseCaseImpl +import com.wire.kalium.logic.feature.conversation.TypingIndicatorSyncManager import com.wire.kalium.logic.feature.conversation.keyingmaterials.KeyingMaterialsManager import com.wire.kalium.logic.feature.conversation.keyingmaterials.KeyingMaterialsManagerImpl import com.wire.kalium.logic.feature.debug.DebugScope import com.wire.kalium.logic.feature.e2ei.EnrollE2EIUseCase import com.wire.kalium.logic.feature.e2ei.EnrollE2EIUseCaseImpl +import com.wire.kalium.logic.feature.featureConfig.SyncFeatureConfigsUseCase +import com.wire.kalium.logic.feature.featureConfig.SyncFeatureConfigsUseCaseImpl import com.wire.kalium.logic.feature.featureConfig.handler.AppLockConfigHandler import com.wire.kalium.logic.feature.featureConfig.handler.ClassifiedDomainsConfigHandler import com.wire.kalium.logic.feature.featureConfig.handler.ConferenceCallingConfigHandler @@ -190,8 +193,6 @@ import com.wire.kalium.logic.feature.featureConfig.handler.GuestRoomConfigHandle import com.wire.kalium.logic.feature.featureConfig.handler.MLSConfigHandler import com.wire.kalium.logic.feature.featureConfig.handler.SecondFactorPasswordChallengeConfigHandler import com.wire.kalium.logic.feature.featureConfig.handler.SelfDeletingMessagesConfigHandler -import com.wire.kalium.logic.feature.featureConfig.SyncFeatureConfigsUseCase -import com.wire.kalium.logic.feature.featureConfig.SyncFeatureConfigsUseCaseImpl import com.wire.kalium.logic.feature.keypackage.KeyPackageManager import com.wire.kalium.logic.feature.keypackage.KeyPackageManagerImpl import com.wire.kalium.logic.feature.message.AddSystemMessageToAllConversationsUseCase @@ -1532,6 +1533,9 @@ class UserSessionScope internal constructor( MLSConversationsVerificationStatusesHandlerImpl(conversationRepository, persistMessage, mlsConversationRepository, userId) } + private val typingIndicatorSyncManager: TypingIndicatorSyncManager = + TypingIndicatorSyncManager(lazy { conversations.typingIndicatorRepository }, observeSyncState) + init { launch { apiMigrationManager.performMigrations() @@ -1573,7 +1577,7 @@ class UserSessionScope internal constructor( } launch { - conversations.typingIndicatorRepository.clearExpiredTypingIndicators() + typingIndicatorSyncManager.execute() } } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/TypingIndicatorSyncManager.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/TypingIndicatorSyncManager.kt new file mode 100644 index 00000000000..7ed32baf4e6 --- /dev/null +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/TypingIndicatorSyncManager.kt @@ -0,0 +1,36 @@ +/* + * Wire + * Copyright (C) 2023 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.kalium.logic.feature.conversation + +import com.wire.kalium.logic.data.conversation.TypingIndicatorRepository +import com.wire.kalium.logic.kaliumLogger +import com.wire.kalium.logic.sync.ObserveSyncStateUseCase +import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.flow.distinctUntilChanged + +internal class TypingIndicatorSyncManager( + private val typingIndicatorRepository: Lazy, + private val observeSyncStateUseCase: ObserveSyncStateUseCase +) { + suspend fun execute() { + observeSyncStateUseCase().distinctUntilChanged().collectLatest { + kaliumLogger.d("Periodically clearing expired typing indicators") + typingIndicatorRepository.value.clearExpiredTypingIndicators() + } + } +} From f515f8f76c2ef0e14935501bf1df6de335c195ae Mon Sep 17 00:00:00 2001 From: yamilmedina Date: Fri, 6 Oct 2023 18:44:42 +0200 Subject: [PATCH 09/15] chore: clear indicators --- .../TypingIndicatorIncomingEventManager.kt | 86 ------------------- .../conversation/TypingIndicatorRepository.kt | 38 ++------ ...ger.kt => TypingIndicatorSenderHandler.kt} | 2 +- .../feature/conversation/ConversationScope.kt | 15 ++-- .../conversation/ObserveUsersTypingUseCase.kt | 2 +- .../TypingIndicatorSyncManager.kt | 5 +- 6 files changed, 20 insertions(+), 128 deletions(-) delete mode 100644 logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingEventManager.kt rename logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/{TypingIndicatorOutgoingEventManager.kt => TypingIndicatorSenderHandler.kt} (98%) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingEventManager.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingEventManager.kt deleted file mode 100644 index 68313f5fb8f..00000000000 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingEventManager.kt +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Wire - * Copyright (C) 2023 Wire Swiss GmbH - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see http://www.gnu.org/licenses/. - */ -package com.wire.kalium.logic.data.conversation - -import com.wire.kalium.logic.data.id.ConversationId -import com.wire.kalium.logic.data.user.UserId -import com.wire.kalium.util.KaliumDispatcher -import com.wire.kalium.util.KaliumDispatcherImpl -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.BufferOverflow -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.onStart -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import kotlinx.datetime.Clock -import kotlin.coroutines.CoroutineContext -import kotlin.time.DurationUnit -import kotlin.time.toDuration - -/** - * Incoming user typing received events, cleanup manager - * todo, replicate same as outgoing - */ -internal class TypingIndicatorIncomingEventManager( - userSessionCoroutineScope: CoroutineScope, - private val kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl -) : CoroutineScope by userSessionCoroutineScope { - override val coroutineContext: CoroutineContext - get() = kaliumDispatcher.default - - private val incomingStoppedQueueTypingEventsMutex = Mutex() - private val incomingStoppedQueueTypingEvents = mutableMapOf>() - private val typingIndicatorTimeoutInSeconds = 30.toDuration(DurationUnit.SECONDS) - - private val incomingTypingUserDataSourceFlow: MutableSharedFlow = - MutableSharedFlow(extraBufferCapacity = TypingIndicatorRepositoryImpl.BUFFER_SIZE, onBufferOverflow = BufferOverflow.DROP_OLDEST) - - suspend fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) { - incomingStoppedQueueTypingEventsMutex.withLock { - val values = - if (incomingStoppedQueueTypingEvents.containsKey(conversationId)) incomingStoppedQueueTypingEvents[conversationId]!! else mutableSetOf() - - values.add(ExpiringUserTyping(userId, Clock.System.now())) - incomingStoppedQueueTypingEvents[conversationId] = values.also { - incomingTypingUserDataSourceFlow.tryEmit(Unit) - } - - delay(typingIndicatorTimeoutInSeconds) - removeTypingUserInConversation(conversationId, userId) - } - } - - suspend fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId) { - incomingStoppedQueueTypingEventsMutex.withLock { - incomingStoppedQueueTypingEvents.also { entry -> - entry[conversationId]?.apply { this.removeAll { it.userId == userId } } - }.also { - incomingTypingUserDataSourceFlow.tryEmit(Unit) - } - } - } - - suspend fun observeUsersTyping(conversationId: ConversationId): Flow> { - return incomingTypingUserDataSourceFlow - .map { incomingStoppedQueueTypingEvents[conversationId] ?: emptySet() } - .onStart { emit(incomingStoppedQueueTypingEvents[conversationId] ?: emptySet()) } - } -} diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt index 2702240c563..eb7813f9d5b 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt @@ -29,36 +29,32 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onStart -import kotlinx.datetime.Clock -import kotlinx.datetime.Instant internal interface TypingIndicatorRepository { suspend fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) suspend fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId) - suspend fun observeUsersTyping(conversationId: ConversationId): Flow> + suspend fun observeUsersTyping(conversationId: ConversationId): Flow> suspend fun sendTypingIndicatorStatus( conversationId: ConversationId, typingStatus: Conversation.TypingIndicatorMode ): Either - suspend fun clearExpiredTypingIndicators(): Unit + suspend fun clearExpiredTypingIndicators() } -@Suppress("LongParameterList") internal class TypingIndicatorRepositoryImpl( - private val incomingTypingEventsCache: ConcurrentMutableMap>, + private val incomingTypingEventsCache: ConcurrentMutableMap>, private val userPropertyRepository: UserPropertyRepository, - private val typingIndicatorOutgoingEventManager: TypingIndicatorOutgoingEventManager + private val typingIndicatorSenderHandler: TypingIndicatorSenderHandler ) : TypingIndicatorRepository { - private val incomingTypingUserDataSourceFlow: MutableSharedFlow = MutableSharedFlow(extraBufferCapacity = BUFFER_SIZE, onBufferOverflow = BufferOverflow.DROP_OLDEST) override suspend fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) { if (userPropertyRepository.getTypingIndicatorStatus()) { - incomingTypingEventsCache.safeComputeAndMutateSetValue(conversationId) { ExpiringUserTyping(userId, Clock.System.now()) } + incomingTypingEventsCache.safeComputeAndMutateSetValue(conversationId) { userId } .also { incomingTypingUserDataSourceFlow.tryEmit(Unit) } @@ -67,13 +63,13 @@ internal class TypingIndicatorRepositoryImpl( override suspend fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId) { incomingTypingEventsCache.block { entry -> - entry[conversationId]?.apply { this.removeAll { it.userId == userId } } + entry[conversationId]?.apply { this.removeAll { it == userId } } }.also { incomingTypingUserDataSourceFlow.tryEmit(Unit) } } - override suspend fun observeUsersTyping(conversationId: ConversationId): Flow> { + override suspend fun observeUsersTyping(conversationId: ConversationId): Flow> { return incomingTypingUserDataSourceFlow .map { incomingTypingEventsCache[conversationId] ?: emptySet() } .onStart { emit(incomingTypingEventsCache[conversationId] ?: emptySet()) } @@ -86,9 +82,9 @@ internal class TypingIndicatorRepositoryImpl( if (userPropertyRepository.getTypingIndicatorStatus()) { when (typingStatus) { Conversation.TypingIndicatorMode.STARTED -> - typingIndicatorOutgoingEventManager.sendStartedAndEnqueueStoppingEvent(conversationId) + typingIndicatorSenderHandler.sendStartedAndEnqueueStoppingEvent(conversationId) - Conversation.TypingIndicatorMode.STOPPED -> typingIndicatorOutgoingEventManager.sendStoppingEvent(conversationId) + Conversation.TypingIndicatorMode.STOPPED -> typingIndicatorSenderHandler.sendStoppingEvent(conversationId) } } return Either.Right(Unit) @@ -106,19 +102,3 @@ internal class TypingIndicatorRepositoryImpl( const val BUFFER_SIZE = 32 // drop after this threshold } } - -data class ExpiringUserTyping( - val userId: UserId, - val date: Instant -) { - override fun equals(other: Any?): Boolean { - return other != null && when (other) { - is ExpiringUserTyping -> other.userId == this.userId - else -> false - } - } - - override fun hashCode(): Int { - return this.userId.hashCode() - } -} diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorOutgoingEventManager.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorSenderHandler.kt similarity index 98% rename from logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorOutgoingEventManager.kt rename to logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorSenderHandler.kt index dfff9e963a8..f00880748a7 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorOutgoingEventManager.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorSenderHandler.kt @@ -38,7 +38,7 @@ import kotlin.time.toDuration * - For each started sent event, will 'enqueue' a stopped event after a timeout. * */ -internal class TypingIndicatorOutgoingEventManager( +internal class TypingIndicatorSenderHandler( private val conversationRepository: ConversationRepository, private val kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl, userSessionCoroutineScope: CoroutineScope diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt index fc8d71ed2fc..a9b94feb5d4 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt @@ -27,8 +27,7 @@ import com.wire.kalium.logic.data.conversation.ConversationRepository import com.wire.kalium.logic.data.conversation.MLSConversationRepository import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreator import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreatorImpl -import com.wire.kalium.logic.data.conversation.TypingIndicatorIncomingEventManager -import com.wire.kalium.logic.data.conversation.TypingIndicatorOutgoingEventManager +import com.wire.kalium.logic.data.conversation.TypingIndicatorSenderHandler import com.wire.kalium.logic.data.conversation.TypingIndicatorRepositoryImpl import com.wire.kalium.logic.data.conversation.UpdateKeyingMaterialThresholdProvider import com.wire.kalium.logic.data.id.QualifiedIdMapper @@ -268,21 +267,17 @@ class ConversationScope internal constructor( val observeArchivedUnreadConversationsCount: ObserveArchivedUnreadConversationsCountUseCase get() = ObserveArchivedUnreadConversationsCountUseCaseImpl(conversationRepository) - private val typingIndicatorOutgoingEventManager: TypingIndicatorOutgoingEventManager = - TypingIndicatorOutgoingEventManager(conversationRepository = conversationRepository, userSessionCoroutineScope = scope) - - - private val typingIndicatorIncomingEventManager: TypingIndicatorIncomingEventManager = - TypingIndicatorIncomingEventManager(userSessionCoroutineScope = scope) + private val typingIndicatorSenderHandler: TypingIndicatorSenderHandler = + TypingIndicatorSenderHandler(conversationRepository = conversationRepository, userSessionCoroutineScope = scope) internal val typingIndicatorRepository = TypingIndicatorRepositoryImpl( ConcurrentMutableMap(), userPropertyRepository, - typingIndicatorOutgoingEventManager + typingIndicatorSenderHandler ) - val sendTypingEventUseCase: SendTypingEventUseCase + val sendTypingEvent: SendTypingEventUseCase get() = SendTypingEventUseCaseImpl(typingIndicatorRepository) val observeUsersTyping: ObserveUsersTypingUseCase diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ObserveUsersTypingUseCase.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ObserveUsersTypingUseCase.kt index e7a1e9ee880..4d6643ed372 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ObserveUsersTypingUseCase.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ObserveUsersTypingUseCase.kt @@ -44,7 +44,7 @@ internal class ObserveUsersTypingUseCaseImpl( ) : ObserveUsersTypingUseCase { override suspend operator fun invoke(conversationId: ConversationId): Flow> = withContext(dispatcher.io) { typingIndicatorRepository.observeUsersTyping(conversationId).map { usersEntries -> - userRepository.getUsersSummaryByIds(usersEntries.map { it.userId }).fold({ + userRepository.getUsersSummaryByIds(usersEntries.map { it }).fold({ kaliumLogger.w("Users not found locally, skipping... $it") emptySet() }, { it.toSet() }) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/TypingIndicatorSyncManager.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/TypingIndicatorSyncManager.kt index 7ed32baf4e6..295584ca66a 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/TypingIndicatorSyncManager.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/TypingIndicatorSyncManager.kt @@ -27,9 +27,12 @@ internal class TypingIndicatorSyncManager( private val typingIndicatorRepository: Lazy, private val observeSyncStateUseCase: ObserveSyncStateUseCase ) { + /** + * Periodically clears and drop orphaned typing indicators, so we don't keep them forever. + */ suspend fun execute() { observeSyncStateUseCase().distinctUntilChanged().collectLatest { - kaliumLogger.d("Periodically clearing expired typing indicators") + kaliumLogger.d("Starting clear of orphaned typing indicators...") typingIndicatorRepository.value.clearExpiredTypingIndicators() } } From 36598465e57bcd33073765489587da052aa92297 Mon Sep 17 00:00:00 2001 From: yamilmedina Date: Mon, 9 Oct 2023 10:18:50 +0200 Subject: [PATCH 10/15] chore: notes --- .../logic/data/conversation/TypingIndicatorRepository.kt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt index eb7813f9d5b..d6465de4749 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt @@ -102,3 +102,8 @@ internal class TypingIndicatorRepositoryImpl( const val BUFFER_SIZE = 32 // drop after this threshold } } + +// todo to cleanup pr pt2. +// redo rename +// separate incoming and outgoing into 2 repos +// adjust and test From f2ce0e844948cb8caaad76cccf28ea9cbac4f55b Mon Sep 17 00:00:00 2001 From: yamilmedina Date: Mon, 9 Oct 2023 11:12:10 +0200 Subject: [PATCH 11/15] chore: refactor into new classes splitted --- ...t => TypingIndicatorIncomingRepository.kt} | 53 ++++++------------- .../TypingIndicatorOutgoingRepository.kt | 53 +++++++++++++++++++ ...pper.kt => TypingIndicatorStatusMapper.kt} | 0 .../kalium/logic/feature/UserSessionScope.kt | 4 +- .../feature/conversation/ConversationScope.kt | 20 ++++--- .../conversation/ObserveUsersTypingUseCase.kt | 6 +-- .../conversation/SendTypingEventUseCase.kt | 4 +- .../TypingIndicatorSyncManager.kt | 6 +-- .../handler/TypingIndicatorHandler.kt | 8 +-- .../TypingIndicatorRepositoryTest.kt | 8 +-- .../handler/TypingIndicatorHandlerTest.kt | 30 +++++------ 11 files changed, 111 insertions(+), 81 deletions(-) rename logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/{TypingIndicatorRepository.kt => TypingIndicatorIncomingRepository.kt} (56%) create mode 100644 logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorOutgoingRepository.kt rename logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/{TypingIndicatorModeMapper.kt => TypingIndicatorStatusMapper.kt} (100%) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingRepository.kt similarity index 56% rename from logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt rename to logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingRepository.kt index d6465de4749..2bf759f1a60 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingRepository.kt @@ -18,11 +18,9 @@ package com.wire.kalium.logic.data.conversation import co.touchlab.stately.collections.ConcurrentMutableMap -import com.wire.kalium.logic.CoreFailure import com.wire.kalium.logic.data.id.ConversationId import com.wire.kalium.logic.data.properties.UserPropertyRepository import com.wire.kalium.logic.data.user.UserId -import com.wire.kalium.logic.functional.Either import com.wire.kalium.logic.util.safeComputeAndMutateSetValue import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow @@ -30,71 +28,50 @@ import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onStart -internal interface TypingIndicatorRepository { +internal interface TypingIndicatorIncomingRepository { suspend fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) suspend fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId) suspend fun observeUsersTyping(conversationId: ConversationId): Flow> - - suspend fun sendTypingIndicatorStatus( - conversationId: ConversationId, - typingStatus: Conversation.TypingIndicatorMode - ): Either - suspend fun clearExpiredTypingIndicators() } -internal class TypingIndicatorRepositoryImpl( - private val incomingTypingEventsCache: ConcurrentMutableMap>, - private val userPropertyRepository: UserPropertyRepository, - private val typingIndicatorSenderHandler: TypingIndicatorSenderHandler -) : TypingIndicatorRepository { +internal class TypingIndicatorIncomingRepositoryImpl( + private val userTypingCache: ConcurrentMutableMap>, + private val userPropertyRepository: UserPropertyRepository +) : TypingIndicatorIncomingRepository { - private val incomingTypingUserDataSourceFlow: MutableSharedFlow = + private val userTypingDataSourceFlow: MutableSharedFlow = MutableSharedFlow(extraBufferCapacity = BUFFER_SIZE, onBufferOverflow = BufferOverflow.DROP_OLDEST) override suspend fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) { if (userPropertyRepository.getTypingIndicatorStatus()) { - incomingTypingEventsCache.safeComputeAndMutateSetValue(conversationId) { userId } + userTypingCache.safeComputeAndMutateSetValue(conversationId) { userId } .also { - incomingTypingUserDataSourceFlow.tryEmit(Unit) + userTypingDataSourceFlow.tryEmit(Unit) } } } override suspend fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId) { - incomingTypingEventsCache.block { entry -> + userTypingCache.block { entry -> entry[conversationId]?.apply { this.removeAll { it == userId } } }.also { - incomingTypingUserDataSourceFlow.tryEmit(Unit) + userTypingDataSourceFlow.tryEmit(Unit) } } override suspend fun observeUsersTyping(conversationId: ConversationId): Flow> { - return incomingTypingUserDataSourceFlow - .map { incomingTypingEventsCache[conversationId] ?: emptySet() } - .onStart { emit(incomingTypingEventsCache[conversationId] ?: emptySet()) } + return userTypingDataSourceFlow + .map { userTypingCache[conversationId] ?: emptySet() } + .onStart { emit(userTypingCache[conversationId] ?: emptySet()) } } - override suspend fun sendTypingIndicatorStatus( - conversationId: ConversationId, - typingStatus: Conversation.TypingIndicatorMode - ): Either { - if (userPropertyRepository.getTypingIndicatorStatus()) { - when (typingStatus) { - Conversation.TypingIndicatorMode.STARTED -> - typingIndicatorSenderHandler.sendStartedAndEnqueueStoppingEvent(conversationId) - - Conversation.TypingIndicatorMode.STOPPED -> typingIndicatorSenderHandler.sendStoppingEvent(conversationId) - } - } - return Either.Right(Unit) - } override suspend fun clearExpiredTypingIndicators() { - incomingTypingEventsCache.block { entry -> + userTypingCache.block { entry -> entry.clear() }.also { - incomingTypingUserDataSourceFlow.tryEmit(Unit) + userTypingDataSourceFlow.tryEmit(Unit) } } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorOutgoingRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorOutgoingRepository.kt new file mode 100644 index 00000000000..91b08863df4 --- /dev/null +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorOutgoingRepository.kt @@ -0,0 +1,53 @@ +/* + * Wire + * Copyright (C) 2023 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.kalium.logic.data.conversation + +import com.wire.kalium.logic.CoreFailure +import com.wire.kalium.logic.data.id.ConversationId +import com.wire.kalium.logic.data.properties.UserPropertyRepository +import com.wire.kalium.logic.functional.Either + +internal interface TypingIndicatorOutgoingRepository { + suspend fun sendTypingIndicatorStatus( + conversationId: ConversationId, + typingStatus: Conversation.TypingIndicatorMode + ): Either + +} + +internal class TypingIndicatorOutgoingRepositoryImpl( + private val typingIndicatorSenderHandler: TypingIndicatorSenderHandler, + private val userPropertyRepository: UserPropertyRepository +) : TypingIndicatorOutgoingRepository { + + override suspend fun sendTypingIndicatorStatus( + conversationId: ConversationId, + typingStatus: Conversation.TypingIndicatorMode + ): Either { + if (userPropertyRepository.getTypingIndicatorStatus()) { + when (typingStatus) { + Conversation.TypingIndicatorMode.STARTED -> + typingIndicatorSenderHandler.sendStartedAndEnqueueStoppingEvent(conversationId) + + Conversation.TypingIndicatorMode.STOPPED -> typingIndicatorSenderHandler.sendStoppingEvent(conversationId) + } + } + return Either.Right(Unit) + } + +} diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorModeMapper.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorStatusMapper.kt similarity index 100% rename from logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorModeMapper.kt rename to logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorStatusMapper.kt diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt index d84a5e773ee..58202a0b8f0 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt @@ -1163,7 +1163,7 @@ class UserSessionScope internal constructor( ) private val typingIndicatorHandler: TypingIndicatorHandler - get() = TypingIndicatorHandlerImpl(userId, conversations.typingIndicatorRepository) + get() = TypingIndicatorHandlerImpl(userId, conversations.typingIndicatorIncomingRepository) private val conversationEventReceiver: ConversationEventReceiver by lazy { ConversationEventReceiverImpl( @@ -1534,7 +1534,7 @@ class UserSessionScope internal constructor( } private val typingIndicatorSyncManager: TypingIndicatorSyncManager = - TypingIndicatorSyncManager(lazy { conversations.typingIndicatorRepository }, observeSyncState) + TypingIndicatorSyncManager(lazy { conversations.typingIndicatorIncomingRepository }, observeSyncState) init { launch { diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt index a9b94feb5d4..c877116a605 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt @@ -27,8 +27,9 @@ import com.wire.kalium.logic.data.conversation.ConversationRepository import com.wire.kalium.logic.data.conversation.MLSConversationRepository import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreator import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreatorImpl +import com.wire.kalium.logic.data.conversation.TypingIndicatorOutgoingRepositoryImpl +import com.wire.kalium.logic.data.conversation.TypingIndicatorIncomingRepositoryImpl import com.wire.kalium.logic.data.conversation.TypingIndicatorSenderHandler -import com.wire.kalium.logic.data.conversation.TypingIndicatorRepositoryImpl import com.wire.kalium.logic.data.conversation.UpdateKeyingMaterialThresholdProvider import com.wire.kalium.logic.data.id.QualifiedIdMapper import com.wire.kalium.logic.data.message.PersistMessageUseCase @@ -270,17 +271,22 @@ class ConversationScope internal constructor( private val typingIndicatorSenderHandler: TypingIndicatorSenderHandler = TypingIndicatorSenderHandler(conversationRepository = conversationRepository, userSessionCoroutineScope = scope) - internal val typingIndicatorRepository = - TypingIndicatorRepositoryImpl( + internal val typingIndicatorIncomingRepository = + TypingIndicatorIncomingRepositoryImpl( ConcurrentMutableMap(), - userPropertyRepository, - typingIndicatorSenderHandler + userPropertyRepository + ) + + internal val typingIndicatorOutgoingRepository = + TypingIndicatorOutgoingRepositoryImpl( + typingIndicatorSenderHandler, + userPropertyRepository ) val sendTypingEvent: SendTypingEventUseCase - get() = SendTypingEventUseCaseImpl(typingIndicatorRepository) + get() = SendTypingEventUseCaseImpl(typingIndicatorOutgoingRepository) val observeUsersTyping: ObserveUsersTypingUseCase - get() = ObserveUsersTypingUseCaseImpl(typingIndicatorRepository, userRepository) + get() = ObserveUsersTypingUseCaseImpl(typingIndicatorIncomingRepository, userRepository) } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ObserveUsersTypingUseCase.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ObserveUsersTypingUseCase.kt index 4d6643ed372..abc7dabe6ee 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ObserveUsersTypingUseCase.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ObserveUsersTypingUseCase.kt @@ -17,7 +17,7 @@ */ package com.wire.kalium.logic.feature.conversation -import com.wire.kalium.logic.data.conversation.TypingIndicatorRepository +import com.wire.kalium.logic.data.conversation.TypingIndicatorIncomingRepository import com.wire.kalium.logic.data.id.ConversationId import com.wire.kalium.logic.data.message.UserSummary import com.wire.kalium.logic.data.user.UserRepository @@ -38,12 +38,12 @@ interface ObserveUsersTypingUseCase { } internal class ObserveUsersTypingUseCaseImpl( - private val typingIndicatorRepository: TypingIndicatorRepository, + private val typingIndicatorIncomingRepository: TypingIndicatorIncomingRepository, private val userRepository: UserRepository, private val dispatcher: KaliumDispatcher = KaliumDispatcherImpl ) : ObserveUsersTypingUseCase { override suspend operator fun invoke(conversationId: ConversationId): Flow> = withContext(dispatcher.io) { - typingIndicatorRepository.observeUsersTyping(conversationId).map { usersEntries -> + typingIndicatorIncomingRepository.observeUsersTyping(conversationId).map { usersEntries -> userRepository.getUsersSummaryByIds(usersEntries.map { it }).fold({ kaliumLogger.w("Users not found locally, skipping... $it") emptySet() diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/SendTypingEventUseCase.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/SendTypingEventUseCase.kt index 9026e3b9a23..d1c49e02790 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/SendTypingEventUseCase.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/SendTypingEventUseCase.kt @@ -18,7 +18,7 @@ package com.wire.kalium.logic.feature.conversation import com.wire.kalium.logic.data.conversation.Conversation -import com.wire.kalium.logic.data.conversation.TypingIndicatorRepository +import com.wire.kalium.logic.data.conversation.TypingIndicatorOutgoingRepository import com.wire.kalium.logic.data.id.ConversationId /** @@ -36,7 +36,7 @@ interface SendTypingEventUseCase { } internal class SendTypingEventUseCaseImpl( - private val typingIndicatorRepository: TypingIndicatorRepository + private val typingIndicatorRepository: TypingIndicatorOutgoingRepository ) : SendTypingEventUseCase { override suspend fun invoke(conversationId: ConversationId, typingStatus: Conversation.TypingIndicatorMode) { typingIndicatorRepository.sendTypingIndicatorStatus(conversationId, typingStatus) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/TypingIndicatorSyncManager.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/TypingIndicatorSyncManager.kt index 295584ca66a..aed94d8eee6 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/TypingIndicatorSyncManager.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/TypingIndicatorSyncManager.kt @@ -17,14 +17,14 @@ */ package com.wire.kalium.logic.feature.conversation -import com.wire.kalium.logic.data.conversation.TypingIndicatorRepository +import com.wire.kalium.logic.data.conversation.TypingIndicatorIncomingRepository import com.wire.kalium.logic.kaliumLogger import com.wire.kalium.logic.sync.ObserveSyncStateUseCase import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.distinctUntilChanged internal class TypingIndicatorSyncManager( - private val typingIndicatorRepository: Lazy, + private val typingIndicatorIncomingRepository: Lazy, private val observeSyncStateUseCase: ObserveSyncStateUseCase ) { /** @@ -33,7 +33,7 @@ internal class TypingIndicatorSyncManager( suspend fun execute() { observeSyncStateUseCase().distinctUntilChanged().collectLatest { kaliumLogger.d("Starting clear of orphaned typing indicators...") - typingIndicatorRepository.value.clearExpiredTypingIndicators() + typingIndicatorIncomingRepository.value.clearExpiredTypingIndicators() } } } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/TypingIndicatorHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/TypingIndicatorHandler.kt index 6242cde9899..158c2b8dd27 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/TypingIndicatorHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/TypingIndicatorHandler.kt @@ -20,7 +20,7 @@ package com.wire.kalium.logic.sync.receiver.handler import com.wire.kalium.logger.KaliumLogger.Companion.ApplicationFlow.EVENT_RECEIVER import com.wire.kalium.logic.StorageFailure import com.wire.kalium.logic.data.conversation.Conversation -import com.wire.kalium.logic.data.conversation.TypingIndicatorRepository +import com.wire.kalium.logic.data.conversation.TypingIndicatorIncomingRepository import com.wire.kalium.logic.data.event.Event import com.wire.kalium.logic.data.event.EventLoggingStatus import com.wire.kalium.logic.data.event.logEventProcessing @@ -34,7 +34,7 @@ internal interface TypingIndicatorHandler { internal class TypingIndicatorHandlerImpl( private val selfUserId: UserId, - private val typingIndicatorRepository: TypingIndicatorRepository + private val typingIndicatorIncomingRepository: TypingIndicatorIncomingRepository ) : TypingIndicatorHandler { override suspend fun handle(event: Event.Conversation.TypingIndicator): Either { if (event.senderUserId == selfUserId) { @@ -43,12 +43,12 @@ internal class TypingIndicatorHandlerImpl( } when (event.typingIndicatorMode) { - Conversation.TypingIndicatorMode.STARTED -> typingIndicatorRepository.addTypingUserInConversation( + Conversation.TypingIndicatorMode.STARTED -> typingIndicatorIncomingRepository.addTypingUserInConversation( event.conversationId, event.senderUserId ) - Conversation.TypingIndicatorMode.STOPPED -> typingIndicatorRepository.removeTypingUserInConversation( + Conversation.TypingIndicatorMode.STOPPED -> typingIndicatorIncomingRepository.removeTypingUserInConversation( event.conversationId, event.senderUserId ) diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepositoryTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepositoryTest.kt index f9270eb7073..d73a7ef7900 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepositoryTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepositoryTest.kt @@ -94,9 +94,6 @@ class TypingIndicatorRepositoryTest { @Mock val userPropertyRepository: UserPropertyRepository = mock(UserPropertyRepository::class) - @Mock - val typingIndicatorSenderHandler: TypingIndicatorSenderHandler = mock(TypingIndicatorSenderHandler::class) - fun withTypingIndicatorStatus(enabled: Boolean = true) = apply { given(userPropertyRepository) .suspendFunction(userPropertyRepository::getTypingIndicatorStatus) @@ -104,9 +101,8 @@ class TypingIndicatorRepositoryTest { .thenReturn(enabled) } - fun arrange() = this to TypingIndicatorRepositoryImpl( - incomingTypingEventsCache = ConcurrentMutableMap(), - typingIndicatorSenderHandler = typingIndicatorSenderHandler, + fun arrange() = this to TypingIndicatorIncomingRepositoryImpl( + userTypingCache = ConcurrentMutableMap(), userPropertyRepository = userPropertyRepository ) } diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/TypingIndicatorHandlerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/TypingIndicatorHandlerTest.kt index 4cb6a1f281e..4cbe5429581 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/TypingIndicatorHandlerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/TypingIndicatorHandlerTest.kt @@ -18,8 +18,7 @@ package com.wire.kalium.logic.sync.receiver.handler import com.wire.kalium.logic.data.conversation.Conversation -import com.wire.kalium.logic.data.conversation.ExpiringUserTyping -import com.wire.kalium.logic.data.conversation.TypingIndicatorRepository +import com.wire.kalium.logic.data.conversation.TypingIndicatorIncomingRepository import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.framework.TestConversation import com.wire.kalium.logic.framework.TestEvent @@ -33,7 +32,6 @@ import io.mockative.once import io.mockative.verify import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.test.runTest -import kotlinx.datetime.Clock import kotlin.test.Test class TypingIndicatorHandlerTest { @@ -47,8 +45,8 @@ class TypingIndicatorHandlerTest { val result = handler.handle(TestEvent.typingIndicator(Conversation.TypingIndicatorMode.STARTED)) result.shouldSucceed() - verify(arrangement.typingIndicatorRepository) - .function(arrangement.typingIndicatorRepository::addTypingUserInConversation) + verify(arrangement.typingIndicatorIncomingRepository) + .function(arrangement.typingIndicatorIncomingRepository::addTypingUserInConversation) .with(eq(TestConversation.ID), eq(TestUser.SELF.id)) .wasNotInvoked() } @@ -62,8 +60,8 @@ class TypingIndicatorHandlerTest { val result = handler.handle(TestEvent.typingIndicator(Conversation.TypingIndicatorMode.STARTED)) result.shouldSucceed() - verify(arrangement.typingIndicatorRepository) - .function(arrangement.typingIndicatorRepository::addTypingUserInConversation) + verify(arrangement.typingIndicatorIncomingRepository) + .function(arrangement.typingIndicatorIncomingRepository::addTypingUserInConversation) .with(eq(TestConversation.ID), eq(TestUser.OTHER_USER_ID)) .wasInvoked(once) } @@ -77,8 +75,8 @@ class TypingIndicatorHandlerTest { val result = handler.handle(TestEvent.typingIndicator(Conversation.TypingIndicatorMode.STOPPED)) result.shouldSucceed() - verify(arrangement.typingIndicatorRepository) - .function(arrangement.typingIndicatorRepository::removeTypingUserInConversation) + verify(arrangement.typingIndicatorIncomingRepository) + .function(arrangement.typingIndicatorIncomingRepository::removeTypingUserInConversation) .with(eq(TestConversation.ID), eq(TestUser.OTHER_USER_ID)) .wasInvoked(once) } @@ -92,24 +90,24 @@ class TypingIndicatorHandlerTest { val result = handler.handle(TestEvent.typingIndicator(Conversation.TypingIndicatorMode.STOPPED)) result.shouldSucceed() - verify(arrangement.typingIndicatorRepository) - .function(arrangement.typingIndicatorRepository::removeTypingUserInConversation) + verify(arrangement.typingIndicatorIncomingRepository) + .function(arrangement.typingIndicatorIncomingRepository::removeTypingUserInConversation) .with(eq(TestConversation.ID), eq(TestUser.SELF.id)) .wasNotInvoked() } private class Arrangement { @Mock - val typingIndicatorRepository: TypingIndicatorRepository = mock(TypingIndicatorRepository::class) + val typingIndicatorIncomingRepository: TypingIndicatorIncomingRepository = mock(TypingIndicatorIncomingRepository::class) fun withTypingIndicatorObserve(usersId: Set) = apply { - given(typingIndicatorRepository) - .suspendFunction(typingIndicatorRepository::observeUsersTyping) + given(typingIndicatorIncomingRepository) + .suspendFunction(typingIndicatorIncomingRepository::observeUsersTyping) .whenInvokedWith(eq(TestConversation.ID)) - .thenReturn(flowOf(usersId.map { ExpiringUserTyping(it, Clock.System.now()) }.toSet())) + .thenReturn(flowOf(usersId)) } - fun arrange() = this to TypingIndicatorHandlerImpl(TestUser.SELF.id, typingIndicatorRepository) + fun arrange() = this to TypingIndicatorHandlerImpl(TestUser.SELF.id, typingIndicatorIncomingRepository) } } From 4ca6b5c77b4441942d3f5b13de78b0c16eab7385 Mon Sep 17 00:00:00 2001 From: yamilmedina Date: Mon, 9 Oct 2023 15:13:24 +0200 Subject: [PATCH 12/15] chore: comments cleanup --- .../data/conversation/TypingIndicatorIncomingRepository.kt | 5 ----- 1 file changed, 5 deletions(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingRepository.kt index 2bf759f1a60..ea44c764ba0 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingRepository.kt @@ -79,8 +79,3 @@ internal class TypingIndicatorIncomingRepositoryImpl( const val BUFFER_SIZE = 32 // drop after this threshold } } - -// todo to cleanup pr pt2. -// redo rename -// separate incoming and outgoing into 2 repos -// adjust and test From cf978255f80380a8d6e02d4be058f893591af536 Mon Sep 17 00:00:00 2001 From: yamilmedina Date: Mon, 9 Oct 2023 16:27:45 +0200 Subject: [PATCH 13/15] chore: detekt fixes --- .../logic/data/conversation/TypingIndicatorIncomingRepository.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingRepository.kt index ea44c764ba0..6177e2920ca 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingRepository.kt @@ -66,7 +66,6 @@ internal class TypingIndicatorIncomingRepositoryImpl( .onStart { emit(userTypingCache[conversationId] ?: emptySet()) } } - override suspend fun clearExpiredTypingIndicators() { userTypingCache.block { entry -> entry.clear() From 171b47320fe5641bcc464ccda5f0e5cf2e1a01fd Mon Sep 17 00:00:00 2001 From: yamilmedina Date: Mon, 9 Oct 2023 16:32:12 +0200 Subject: [PATCH 14/15] Empty-Commit From a9089a957242b025ea0ef05d1a91fcd7121fd3be Mon Sep 17 00:00:00 2001 From: Yamil Medina Date: Tue, 10 Oct 2023 08:14:44 -0300 Subject: [PATCH 15/15] feat(typing): send typing indicator event tests pt3. (WPB-4590) (#2122) * chore: test cov * chore: test cov * chore: test cov * chore: test cov * chore: test cov --- .../TypingIndicatorSenderHandler.kt | 13 +- .../feature/conversation/ConversationScope.kt | 5 +- ... TypingIndicatorIncomingRepositoryTest.kt} | 20 ++- .../TypingIndicatorOutgoingRepositoryTest.kt | 130 ++++++++++++++++++ .../SendTypingEventUseCaseTest.kt | 87 ++++++++++++ 5 files changed, 248 insertions(+), 7 deletions(-) rename logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/{TypingIndicatorRepositoryTest.kt => TypingIndicatorIncomingRepositoryTest.kt} (85%) create mode 100644 logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorOutgoingRepositoryTest.kt create mode 100644 logic/src/commonTest/kotlin/com/wire/kalium/logic/feature/conversation/SendTypingEventUseCaseTest.kt diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorSenderHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorSenderHandler.kt index f00880748a7..7b789f61585 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorSenderHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorSenderHandler.kt @@ -38,11 +38,16 @@ import kotlin.time.toDuration * - For each started sent event, will 'enqueue' a stopped event after a timeout. * */ -internal class TypingIndicatorSenderHandler( +internal interface TypingIndicatorSenderHandler { + fun sendStoppingEvent(conversationId: ConversationId) + fun sendStartedAndEnqueueStoppingEvent(conversationId: ConversationId) +} + +internal class TypingIndicatorSenderHandlerImpl( private val conversationRepository: ConversationRepository, private val kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl, userSessionCoroutineScope: CoroutineScope -) : CoroutineScope by userSessionCoroutineScope { +) : TypingIndicatorSenderHandler, CoroutineScope by userSessionCoroutineScope { override val coroutineContext: CoroutineContext get() = kaliumDispatcher.default @@ -53,7 +58,7 @@ internal class TypingIndicatorSenderHandler( /** * Sends a stopping event and removes it from the 'queue'. */ - fun sendStoppingEvent(conversationId: ConversationId) { + override fun sendStoppingEvent(conversationId: ConversationId) { launch { outgoingStoppedQueueTypingEventsMutex.withLock { if (!outgoingStoppedQueueTypingEvents.containsKey(conversationId)) { @@ -68,7 +73,7 @@ internal class TypingIndicatorSenderHandler( /** * Sends a started event and enqueues a stopping event if sent successfully. */ - fun sendStartedAndEnqueueStoppingEvent(conversationId: ConversationId) { + override fun sendStartedAndEnqueueStoppingEvent(conversationId: ConversationId) { launch { outgoingStoppedQueueTypingEventsMutex.withLock { if (outgoingStoppedQueueTypingEvents.containsKey(conversationId)) { diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt index c877116a605..8219ea99e1c 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/ConversationScope.kt @@ -27,9 +27,10 @@ import com.wire.kalium.logic.data.conversation.ConversationRepository import com.wire.kalium.logic.data.conversation.MLSConversationRepository import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreator import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreatorImpl -import com.wire.kalium.logic.data.conversation.TypingIndicatorOutgoingRepositoryImpl import com.wire.kalium.logic.data.conversation.TypingIndicatorIncomingRepositoryImpl +import com.wire.kalium.logic.data.conversation.TypingIndicatorOutgoingRepositoryImpl import com.wire.kalium.logic.data.conversation.TypingIndicatorSenderHandler +import com.wire.kalium.logic.data.conversation.TypingIndicatorSenderHandlerImpl import com.wire.kalium.logic.data.conversation.UpdateKeyingMaterialThresholdProvider import com.wire.kalium.logic.data.id.QualifiedIdMapper import com.wire.kalium.logic.data.message.PersistMessageUseCase @@ -269,7 +270,7 @@ class ConversationScope internal constructor( get() = ObserveArchivedUnreadConversationsCountUseCaseImpl(conversationRepository) private val typingIndicatorSenderHandler: TypingIndicatorSenderHandler = - TypingIndicatorSenderHandler(conversationRepository = conversationRepository, userSessionCoroutineScope = scope) + TypingIndicatorSenderHandlerImpl(conversationRepository = conversationRepository, userSessionCoroutineScope = scope) internal val typingIndicatorIncomingRepository = TypingIndicatorIncomingRepositoryImpl( diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepositoryTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingRepositoryTest.kt similarity index 85% rename from logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepositoryTest.kt rename to logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingRepositoryTest.kt index d73a7ef7900..2ca01b4fd80 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorRepositoryTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorIncomingRepositoryTest.kt @@ -19,6 +19,7 @@ package com.wire.kalium.logic.data.conversation import co.touchlab.stately.collections.ConcurrentMutableMap import com.wire.kalium.logic.data.properties.UserPropertyRepository +import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.framework.TestConversation import com.wire.kalium.logic.test_util.TestKaliumDispatcher import io.mockative.Mock @@ -30,7 +31,7 @@ import kotlinx.coroutines.test.runTest import kotlin.test.Test import kotlin.test.assertEquals -class TypingIndicatorRepositoryTest { +class TypingIndicatorIncomingRepositoryTest { @Test fun givenUsersInOneConversation_whenTheyAreTyping_thenAddItToTheListOfUsersTypingInConversation() = @@ -90,6 +91,23 @@ class TypingIndicatorRepositoryTest { .wasInvoked() } + @Test + fun givenUsersTypingInAConversation_whenClearExpiredItsCalled_thenShouldNotBePresentAnyInCached() = + runTest(TestKaliumDispatcher.default) { + val expectedUserTyping = setOf() + val (_, typingIndicatorRepository) = Arrangement().withTypingIndicatorStatus().arrange() + + typingIndicatorRepository.addTypingUserInConversation(conversationOne, TestConversation.USER_1) + typingIndicatorRepository.addTypingUserInConversation(conversationOne, TestConversation.USER_2) + + typingIndicatorRepository.clearExpiredTypingIndicators() + + assertEquals( + expectedUserTyping, + typingIndicatorRepository.observeUsersTyping(conversationOne).firstOrNull()?.map { it }?.toSet() + ) + } + private class Arrangement { @Mock val userPropertyRepository: UserPropertyRepository = mock(UserPropertyRepository::class) diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorOutgoingRepositoryTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorOutgoingRepositoryTest.kt new file mode 100644 index 00000000000..f4ec25590a4 --- /dev/null +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/TypingIndicatorOutgoingRepositoryTest.kt @@ -0,0 +1,130 @@ +/* + * Wire + * Copyright (C) 2023 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.kalium.logic.data.conversation + +import com.wire.kalium.logic.data.properties.UserPropertyRepository +import com.wire.kalium.logic.framework.TestConversation +import com.wire.kalium.logic.test_util.TestKaliumDispatcher +import io.mockative.Mock +import io.mockative.any +import io.mockative.given +import io.mockative.mock +import io.mockative.verify +import kotlinx.coroutines.test.runTest +import kotlin.test.Test + +class TypingIndicatorOutgoingRepositoryTest { + + @Test + fun givenAStartedTypingEvent_whenUserConfigNotEnabled_thenShouldNotSendAnyIndication() = + runTest(TestKaliumDispatcher.default) { + val (arrangement, typingIndicatorRepository) = Arrangement() + .withTypingIndicatorStatus(false) + .withSenderHandlerCall() + .arrange() + + typingIndicatorRepository.sendTypingIndicatorStatus(conversationOne, Conversation.TypingIndicatorMode.STARTED) + + verify(arrangement.userPropertyRepository) + .suspendFunction(arrangement.userPropertyRepository::getTypingIndicatorStatus) + .wasInvoked() + + verify(arrangement.typingIndicatorSenderHandler) + .function(arrangement.typingIndicatorSenderHandler::sendStartedAndEnqueueStoppingEvent) + .with(any()) + .wasNotInvoked() + } + + @Test + fun givenAStartedTypingEvent_whenUserConfigIsEnabled_thenShouldSendAnyIndication() = + runTest(TestKaliumDispatcher.default) { + val (arrangement, typingIndicatorRepository) = Arrangement() + .withTypingIndicatorStatus(true) + .withSenderHandlerCall() + .arrange() + + typingIndicatorRepository.sendTypingIndicatorStatus(conversationOne, Conversation.TypingIndicatorMode.STARTED) + + verify(arrangement.userPropertyRepository) + .suspendFunction(arrangement.userPropertyRepository::getTypingIndicatorStatus) + .wasInvoked() + + verify(arrangement.typingIndicatorSenderHandler) + .function(arrangement.typingIndicatorSenderHandler::sendStartedAndEnqueueStoppingEvent) + .with(any()) + .wasInvoked() + } + + @Test + fun givenStoppedTypingEvent_whenCalled_thenShouldDelegateCallToHandler() = + runTest(TestKaliumDispatcher.default) { + val (arrangement, typingIndicatorRepository) = Arrangement() + .withTypingIndicatorStatus(true) + .withSenderHandlerStoppedCall() + .arrange() + + typingIndicatorRepository.sendTypingIndicatorStatus(conversationOne, Conversation.TypingIndicatorMode.STOPPED) + + verify(arrangement.userPropertyRepository) + .suspendFunction(arrangement.userPropertyRepository::getTypingIndicatorStatus) + .wasInvoked() + + verify(arrangement.typingIndicatorSenderHandler) + .function(arrangement.typingIndicatorSenderHandler::sendStoppingEvent) + .with(any()) + .wasInvoked() + } + + private class Arrangement { + @Mock + val userPropertyRepository: UserPropertyRepository = mock(UserPropertyRepository::class) + + @Mock + val typingIndicatorSenderHandler: TypingIndicatorSenderHandler = mock(TypingIndicatorSenderHandler::class) + + fun withTypingIndicatorStatus(enabled: Boolean = true) = apply { + given(userPropertyRepository) + .suspendFunction(userPropertyRepository::getTypingIndicatorStatus) + .whenInvoked() + .thenReturn(enabled) + } + + fun withSenderHandlerStoppedCall() = apply { + given(typingIndicatorSenderHandler) + .function(typingIndicatorSenderHandler::sendStoppingEvent) + .whenInvokedWith(any()) + .thenReturn(Unit) + } + + fun withSenderHandlerCall() = apply { + given(typingIndicatorSenderHandler) + .function(typingIndicatorSenderHandler::sendStartedAndEnqueueStoppingEvent) + .whenInvokedWith(any()) + .thenReturn(Unit) + } + + fun arrange() = this to TypingIndicatorOutgoingRepositoryImpl( + userPropertyRepository = userPropertyRepository, + typingIndicatorSenderHandler = typingIndicatorSenderHandler + ) + } + + private companion object { + val conversationOne = TestConversation.ID + } +} diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/feature/conversation/SendTypingEventUseCaseTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/feature/conversation/SendTypingEventUseCaseTest.kt new file mode 100644 index 00000000000..9ad7c63342b --- /dev/null +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/feature/conversation/SendTypingEventUseCaseTest.kt @@ -0,0 +1,87 @@ +/* + * Wire + * Copyright (C) 2023 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.kalium.logic.feature.conversation + +import com.wire.kalium.logic.CoreFailure +import com.wire.kalium.logic.data.conversation.Conversation +import com.wire.kalium.logic.data.conversation.TypingIndicatorOutgoingRepository +import com.wire.kalium.logic.framework.TestConversation +import com.wire.kalium.logic.functional.Either +import io.mockative.Mock +import io.mockative.any +import io.mockative.eq +import io.mockative.given +import io.mockative.mock +import io.mockative.verify +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertEquals + +class SendTypingEventUseCaseTest { + + @Test + fun givenATypingEvent_whenCallingSendSucceed_thenReturnSuccess() = runTest { + val (arrangement, useCase) = Arrangement() + .withTypingIndicatorStatusAndResult(Conversation.TypingIndicatorMode.STOPPED) + .arrange() + + useCase(TestConversation.ID, Conversation.TypingIndicatorMode.STOPPED) + + verify(arrangement.typingIndicatorRepository) + .suspendFunction(arrangement.typingIndicatorRepository::sendTypingIndicatorStatus) + .with(eq(TestConversation.ID), eq(Conversation.TypingIndicatorMode.STOPPED)) + .wasInvoked() + } + + @Test + fun givenATypingEvent_whenCallingSendFails_thenReturnIgnoringFailure() = runTest { + val (arrangement, useCase) = Arrangement() + .withTypingIndicatorStatusAndResult( + Conversation.TypingIndicatorMode.STARTED, + Either.Left(CoreFailure.Unknown(RuntimeException("Some error"))) + ) + .arrange() + + val result = useCase(TestConversation.ID, Conversation.TypingIndicatorMode.STARTED) + + verify(arrangement.typingIndicatorRepository) + .suspendFunction(arrangement.typingIndicatorRepository::sendTypingIndicatorStatus) + .with(eq(TestConversation.ID), eq(Conversation.TypingIndicatorMode.STARTED)) + .wasInvoked() + assertEquals(Unit, result) + } + + private class Arrangement { + @Mock + val typingIndicatorRepository: TypingIndicatorOutgoingRepository = mock(TypingIndicatorOutgoingRepository::class) + + fun withTypingIndicatorStatusAndResult( + typingMode: Conversation.TypingIndicatorMode, + result: Either = Either.Right(Unit) + ) = apply { + given(typingIndicatorRepository) + .suspendFunction(typingIndicatorRepository::sendTypingIndicatorStatus) + .whenInvokedWith(any(), eq(typingMode)) + .thenReturn(result) + } + + fun arrange() = this to SendTypingEventUseCaseImpl( + typingIndicatorRepository + ) + } +}