From 94b61f0ab1456d9e0c5a8a02f17ab3980ed22785 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Saleniuk?= Date: Wed, 20 Dec 2023 16:24:54 +0100 Subject: [PATCH 01/10] feat: discover legal hold when receiving message [WPB-5837] --- .../kalium/logic/feature/UserSessionScope.kt | 7 +- .../message/NewMessageEventHandler.kt | 18 +- .../handler/legalhold/LegalHoldHandler.kt | 99 +++++++- .../LegalHoldSystemMessagesHandler.kt | 71 +++--- .../wire/kalium/logic/util/DebounceBuffer.kt | 64 +++++ .../message/NewMessageEventHandlerTest.kt | 53 ++-- .../handler/legalhold/LegalHoldHandlerTest.kt | 233 +++++++++++++++++- .../LegalHoldSystemMessageHandlerTest.kt | 188 ++------------ .../kalium/logic/util/DebounceBufferTest.kt | 108 ++++++++ 9 files changed, 576 insertions(+), 265 deletions(-) create mode 100644 logic/src/commonMain/kotlin/com/wire/kalium/logic/util/DebounceBuffer.kt create mode 100644 logic/src/commonTest/kotlin/com/wire/kalium/logic/util/DebounceBufferTest.kt diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt index 1a474891ecb..6bb9355a630 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt @@ -1249,8 +1249,8 @@ class UserSessionScope internal constructor( get() = NewMessageEventHandlerImpl( proteusUnpacker, mlsUnpacker, - conversationRepository, applicationMessageHandler, + legalHoldHandler, { conversationId, messageId -> messages.ephemeralMessageDeletionHandler.startSelfDeletion(conversationId, messageId) }, @@ -1387,10 +1387,9 @@ class UserSessionScope internal constructor( private val legalHoldSystemMessagesHandler = LegalHoldSystemMessagesHandlerImpl( selfUserId = userId, - membersHavingLegalHoldClient = membersHavingLegalHoldClient, persistMessage = persistMessage, conversationRepository = conversationRepository, - messageRepository = messageRepository, + messageRepository = messageRepository ) private val legalHoldHandler = LegalHoldHandlerImpl( @@ -1398,7 +1397,9 @@ class UserSessionScope internal constructor( persistOtherUserClients = persistOtherUserClients, fetchSelfClientsFromRemote = fetchSelfClientsFromRemote, observeLegalHoldStateForUser = observeLegalHoldStateForUser, + membersHavingLegalHoldClient = membersHavingLegalHoldClient, userConfigRepository = userConfigRepository, + conversationRepository = conversationRepository, legalHoldSystemMessagesHandler = legalHoldSystemMessagesHandler, ) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandler.kt index b2f47fa4984..35397daa65d 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandler.kt @@ -23,7 +23,6 @@ import com.wire.kalium.logger.KaliumLogger import com.wire.kalium.logic.ProteusFailure import com.wire.kalium.logic.data.conversation.ClientId import com.wire.kalium.logic.data.conversation.Conversation -import com.wire.kalium.logic.data.conversation.ConversationRepository import com.wire.kalium.logic.data.event.Event import com.wire.kalium.logic.data.event.EventLoggingStatus import com.wire.kalium.logic.data.event.logEventProcessing @@ -34,6 +33,7 @@ import com.wire.kalium.logic.feature.message.StaleEpochVerifier import com.wire.kalium.logic.functional.onFailure import com.wire.kalium.logic.functional.onSuccess import com.wire.kalium.logic.kaliumLogger +import com.wire.kalium.logic.sync.receiver.handler.legalhold.LegalHoldHandler import com.wire.kalium.util.serialization.toJsonElement import kotlinx.datetime.toInstant @@ -46,8 +46,8 @@ internal interface NewMessageEventHandler { internal class NewMessageEventHandlerImpl( private val proteusMessageUnpacker: ProteusMessageUnpacker, private val mlsMessageUnpacker: MLSMessageUnpacker, - private val conversationRepository: ConversationRepository, private val applicationMessageHandler: ApplicationMessageHandler, + private val legalHoldHandler: LegalHoldHandler, private val enqueueSelfDeletion: (conversationId: ConversationId, messageId: String) -> Unit, private val selfUserId: UserId, private val staleEpochVerifier: StaleEpochVerifier @@ -86,13 +86,10 @@ internal class NewMessageEventHandlerImpl( ) }.onSuccess { if (it is MessageUnpackResult.ApplicationMessage) { - handleSuccessfulResult(it) if (it.content.legalHoldStatus != Conversation.LegalHoldStatus.UNKNOWN) { - conversationRepository.updateLegalHoldStatus( - conversationId = it.conversationId, - legalHoldStatus = it.content.legalHoldStatus - ) + legalHoldHandler.handleNewMessage(it) } + handleSuccessfulResult(it) onMessageInserted(it) } kaliumLogger @@ -143,13 +140,10 @@ internal class NewMessageEventHandlerImpl( }.onSuccess { it.forEach { if (it is MessageUnpackResult.ApplicationMessage) { - handleSuccessfulResult(it) if (it.content.legalHoldStatus != Conversation.LegalHoldStatus.UNKNOWN) { - conversationRepository.updateLegalHoldStatus( - conversationId = it.conversationId, - legalHoldStatus = it.content.legalHoldStatus - ) + legalHoldHandler.handleNewMessage(it) } + handleSuccessfulResult(it) onMessageInserted(it) } } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt index 28777e9235e..72e7ae8b12d 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt @@ -19,19 +19,33 @@ package com.wire.kalium.logic.sync.receiver.handler.legalhold import com.wire.kalium.logic.CoreFailure import com.wire.kalium.logic.configuration.UserConfigRepository +import com.wire.kalium.logic.data.conversation.Conversation +import com.wire.kalium.logic.data.conversation.ConversationRepository import com.wire.kalium.logic.data.event.Event +import com.wire.kalium.logic.data.id.ConversationId import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.feature.client.FetchSelfClientsFromRemoteUseCase import com.wire.kalium.logic.feature.client.PersistOtherUserClientsUseCase import com.wire.kalium.logic.feature.legalhold.LegalHoldState +import com.wire.kalium.logic.feature.legalhold.MembersHavingLegalHoldClientUseCase import com.wire.kalium.logic.feature.legalhold.ObserveLegalHoldStateForUserUseCase import com.wire.kalium.logic.functional.Either +import com.wire.kalium.logic.functional.getOrNull +import com.wire.kalium.logic.functional.map import com.wire.kalium.logic.kaliumLogger +import com.wire.kalium.logic.sync.receiver.conversation.message.MessageUnpackResult +import com.wire.kalium.logic.util.DebounceBuffer +import com.wire.kalium.util.KaliumDispatcher +import com.wire.kalium.util.KaliumDispatcherImpl +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.firstOrNull +import kotlinx.coroutines.launch +import kotlin.time.Duration.Companion.seconds internal interface LegalHoldHandler { suspend fun handleEnable(legalHoldEnabled: Event.User.LegalHoldEnabled): Either suspend fun handleDisable(legalHoldDisabled: Event.User.LegalHoldDisabled): Either + suspend fun handleNewMessage(message: MessageUnpackResult.ApplicationMessage): Either } @Suppress("LongParameterList") @@ -40,9 +54,23 @@ internal class LegalHoldHandlerImpl internal constructor( private val persistOtherUserClients: PersistOtherUserClientsUseCase, private val fetchSelfClientsFromRemote: FetchSelfClientsFromRemoteUseCase, private val observeLegalHoldStateForUser: ObserveLegalHoldStateForUserUseCase, + private val membersHavingLegalHoldClient: MembersHavingLegalHoldClientUseCase, private val userConfigRepository: UserConfigRepository, + private val conversationRepository: ConversationRepository, private val legalHoldSystemMessagesHandler: LegalHoldSystemMessagesHandler, + private val kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl, ) : LegalHoldHandler { + private val scope = CoroutineScope(kaliumDispatcher.default) + private val conversationsWithUpdatedLegalHoldStatus = + DebounceBuffer(DEBOUNCE_BUFFER_MAX_SIZE, DEBOUNCE_BUFFER_TIMEOUT, scope) + + init { + scope.launch { + conversationsWithUpdatedLegalHoldStatus.observe() + .collect {handleUpdatedBufferedConversations(it) } + } + } + override suspend fun handleEnable(legalHoldEnabled: Event.User.LegalHoldEnabled): Either { kaliumLogger.i("legal hold enabled for user ${legalHoldEnabled.userId.toLogString()}") // check if the user has already been under legal hold prior to this event @@ -54,7 +82,8 @@ internal class LegalHoldHandlerImpl internal constructor( if (selfUserId == legalHoldEnabled.userId) { // notify only for self user userConfigRepository.setLegalHoldChangeNotified(false) } - legalHoldSystemMessagesHandler.handleEnable(legalHoldEnabled.userId) + handleConversationsForUser(legalHoldEnabled.userId) + legalHoldSystemMessagesHandler.handleEnabledForUser(legalHoldEnabled.userId) } return Either.Right(Unit) @@ -71,12 +100,25 @@ internal class LegalHoldHandlerImpl internal constructor( if (selfUserId == legalHoldDisabled.userId) { // notify only for self user userConfigRepository.setLegalHoldChangeNotified(false) } - legalHoldSystemMessagesHandler.handleDisable(legalHoldDisabled.userId) + handleConversationsForUser(legalHoldDisabled.userId) + legalHoldSystemMessagesHandler.handleDisabledForUser(legalHoldDisabled.userId) } return Either.Right(Unit) } + override suspend fun handleNewMessage(message: MessageUnpackResult.ApplicationMessage): Either { + val isStatusChangedForConversation = when (message.content.legalHoldStatus) { + Conversation.LegalHoldStatus.ENABLED -> handleForConversation(message.conversationId, Conversation.LegalHoldStatus.ENABLED) + Conversation.LegalHoldStatus.DISABLED -> handleForConversation(message.conversationId, Conversation.LegalHoldStatus.DISABLED) + else -> false + } + if (isStatusChangedForConversation) { + conversationsWithUpdatedLegalHoldStatus.add(message.conversationId) + } + return Either.Right(Unit) + } + private suspend fun processEvent(selfUserId: UserId, userId: UserId) { if (selfUserId == userId) { userConfigRepository.deleteLegalHoldRequest() @@ -88,4 +130,57 @@ internal class LegalHoldHandlerImpl internal constructor( private suspend fun isUserUnderLegalHold(userId: UserId): Boolean = observeLegalHoldStateForUser(userId).firstOrNull() == LegalHoldState.Enabled + + private suspend fun handleForConversation(conversationId: ConversationId, newStatus: Conversation.LegalHoldStatus): Boolean { + val currentStatus = conversationRepository.observeLegalHoldForConversation(conversationId).firstOrNull()?.getOrNull() + val isChanged = currentStatus != newStatus + if (isChanged && newStatus != Conversation.LegalHoldStatus.UNKNOWN) { + // if conversation legal hold status has changed, update it and create system message for it + conversationRepository.updateLegalHoldStatus(conversationId, newStatus) + when (newStatus) { + Conversation.LegalHoldStatus.DISABLED -> + legalHoldSystemMessagesHandler.handleDisabledForConversation(conversationId) + Conversation.LegalHoldStatus.ENABLED -> + legalHoldSystemMessagesHandler.handleEnabledForConversation(conversationId) + else -> { /* do nothing */ } + } + } + return isChanged + } + + private suspend fun handleConversationsForUser(userId: UserId) { + conversationRepository.getConversationsByUserId(userId).map { conversations -> + conversations.forEach { conversation -> + // create system message for conversation if needed + membersHavingLegalHoldClient(conversation.id) + .map { if (it.isEmpty()) Conversation.LegalHoldStatus.DISABLED else Conversation.LegalHoldStatus.ENABLED } + .map { newLegalHoldStatus -> handleForConversation(conversation.id, newLegalHoldStatus) } + } + } + } + + private suspend fun handleUpdatedBufferedConversations(conversationIds: List) { + conversationIds.forEach { + conversationRepository.getConversationMembers(it).map { memberIds -> + memberIds.forEach { userId -> + val userHasBeenUnderLegalHold = isUserUnderLegalHold(userId) + // TODO: to be optimized - send empty message and handle legal hold discovery after sending a message + processEvent(selfUserId, userId) + val userIsNowUnderLegalHold = isUserUnderLegalHold(userId) + if (userHasBeenUnderLegalHold != userIsNowUnderLegalHold) { + if (selfUserId == userId) { // notify only for self user + userConfigRepository.setLegalHoldChangeNotified(false) + } + if (userIsNowUnderLegalHold) legalHoldSystemMessagesHandler.handleEnabledForUser(userId) + else legalHoldSystemMessagesHandler.handleDisabledForUser(userId) + } + } + } + } + } + + companion object { + private const val DEBOUNCE_BUFFER_MAX_SIZE = 100 + private val DEBOUNCE_BUFFER_TIMEOUT = 3.seconds + } } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessagesHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessagesHandler.kt index 61785f6cc8e..3f1c6df140c 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessagesHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessagesHandler.kt @@ -26,37 +26,52 @@ import com.wire.kalium.logic.data.message.MessageContent import com.wire.kalium.logic.data.message.MessageRepository import com.wire.kalium.logic.data.message.PersistMessageUseCase import com.wire.kalium.logic.data.user.UserId -import com.wire.kalium.logic.feature.legalhold.MembersHavingLegalHoldClientUseCase import com.wire.kalium.logic.functional.Either import com.wire.kalium.logic.functional.map import com.wire.kalium.util.DateTimeUtil internal interface LegalHoldSystemMessagesHandler { - suspend fun handleEnable(userId: UserId) - suspend fun handleDisable(userId: UserId) + suspend fun handleEnabledForUser(userId: UserId) + suspend fun handleDisabledForUser(userId: UserId) + suspend fun handleEnabledForConversation(conversationId: ConversationId) + suspend fun handleDisabledForConversation(conversationId: ConversationId) } internal class LegalHoldSystemMessagesHandlerImpl( private val selfUserId: UserId, - private val membersHavingLegalHoldClient: MembersHavingLegalHoldClientUseCase, private val persistMessage: PersistMessageUseCase, private val conversationRepository: ConversationRepository, private val messageRepository: MessageRepository, ) : LegalHoldSystemMessagesHandler { - override suspend fun handleEnable(userId: UserId) = handleSystemMessages( - userId = userId, - update = { members -> (members + userId).distinct() }, - createNew = { MessageContent.LegalHold.ForMembers.Enabled(members = listOf(userId)) }, - firstHandleForConversation = true - ) - override suspend fun handleDisable(userId: UserId) = handleSystemMessages( + override suspend fun handleEnabledForUser(userId: UserId) = handleSystemMessagesForUser( + userId = userId, + update = { members -> (members + userId).distinct() }, + createNew = { MessageContent.LegalHold.ForMembers.Enabled(members = listOf(userId)) } + ) + + override suspend fun handleDisabledForUser(userId: UserId) = handleSystemMessagesForUser( userId = userId, update = { members -> (members + userId).distinct() }, - createNew = { MessageContent.LegalHold.ForMembers.Disabled(members = listOf(userId)) }, - firstHandleForConversation = false + createNew = { MessageContent.LegalHold.ForMembers.Disabled(members = listOf(userId)) } ) + override suspend fun handleEnabledForConversation(conversationId: ConversationId) = + handleSystemMessageForConversation(conversationId, Conversation.LegalHoldStatus.ENABLED) + + override suspend fun handleDisabledForConversation(conversationId: ConversationId) = + handleSystemMessageForConversation(conversationId, Conversation.LegalHoldStatus.DISABLED) + + private suspend fun handleSystemMessageForConversation(conversationId: ConversationId, newStatus: Conversation.LegalHoldStatus) { + when (newStatus) { + Conversation.LegalHoldStatus.DISABLED -> + persistMessage(createSystemMessage(MessageContent.LegalHold.ForConversation.Disabled, conversationId)) + Conversation.LegalHoldStatus.ENABLED -> + persistMessage(createSystemMessage(MessageContent.LegalHold.ForConversation.Enabled, conversationId)) + else -> { /* do nothing */ } + } + } + private suspend inline fun getLastLegalHoldMessagesForConversations( userId: UserId, conversations: List, @@ -65,45 +80,21 @@ internal class LegalHoldSystemMessagesHandlerImpl( else messageRepository.getLastMessagesForConversationIds(conversations.map { it.id }) .map { it.filterValues { it.content is T }.mapValues { it.value.id to (it.value.content as T) } } - private suspend inline fun handleSystemMessages( + private suspend inline fun handleSystemMessagesForUser( userId: UserId, crossinline update: (List) -> List, crossinline createNew: () -> T, - firstHandleForConversation: Boolean, ) { // get all conversations where the given user is a member conversationRepository.getConversationsByUserId(userId).map { conversations -> // get last legal hold messages for the given conversations getLastLegalHoldMessagesForConversations(userId, conversations).map { lastMessagesMap -> - - val createOrUpdateSystemMessageForMembers: suspend (conversation: Conversation) -> Unit = { conversation -> + conversations.forEach { conversation -> + // create or update system messages for members lastMessagesMap[conversation.id]?.let { (lastMessageId, lastMessageContent) -> messageRepository.updateLegalHoldMessageMembers(lastMessageId, conversation.id, update(lastMessageContent.members)) } ?: persistMessage(createSystemMessage(createNew(), conversation.id)) } - - val createSystemMessageForConversationIfNeeded: suspend (conversation: Conversation) -> Unit = { conversation -> - membersHavingLegalHoldClient(conversation.id) - .map { if (it.isEmpty()) Conversation.LegalHoldStatus.DISABLED else Conversation.LegalHoldStatus.ENABLED } - .map { newLegalHoldStatus -> - if (newLegalHoldStatus != conversation.legalHoldStatus) { - // if conversation legal hold status has changed, update it - conversationRepository.updateLegalHoldStatus(conversation.id, newLegalHoldStatus) - // if conversation legal hold status changed, create system message for it - if (newLegalHoldStatus == Conversation.LegalHoldStatus.DISABLED) persistMessage( - createSystemMessage(MessageContent.LegalHold.ForConversation.Disabled, conversation.id) - ) - else if (newLegalHoldStatus == Conversation.LegalHoldStatus.ENABLED) persistMessage( - createSystemMessage(MessageContent.LegalHold.ForConversation.Enabled, conversation.id) - ) - } - } - } - - val actionsForConversation = listOf(createOrUpdateSystemMessageForMembers, createSystemMessageForConversationIfNeeded) - .let { if (firstHandleForConversation) it.reversed() else it } - - conversations.forEach { conversation -> actionsForConversation.forEach { it(conversation) } } } } } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/util/DebounceBuffer.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/util/DebounceBuffer.kt new file mode 100644 index 00000000000..bf1ec4a923b --- /dev/null +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/util/DebounceBuffer.kt @@ -0,0 +1,64 @@ +/* + * Wire + * Copyright (C) 2023 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.kalium.logic.util + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.shareIn +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +/** + * A buffer that will collect items and emit list of all items buffered since last emitted list + * only when it has reached size of [capacity] or when [timeout] has passed since last item added. + */ +@OptIn(FlowPreview::class) +internal class DebounceBuffer(private val capacity: Int, private val timeout: Duration, scope: CoroutineScope) { + + private val buffer = mutableListOf() + private val mutex = Mutex() + private val trigger = MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + private val sharedFlow = trigger + .debounce { it } + .map { getAllAndClear() } + .shareIn(scope, SharingStarted.Eagerly, 1) + suspend fun add(value: T) = mutex.withLock { + if (!buffer.contains(value)) { + buffer.add(value) + trigger.emit( + if (buffer.size >= capacity) 0.seconds.inWholeMilliseconds + else timeout.inWholeMilliseconds + ) + } + } + + private suspend fun getAllAndClear(): List = mutex.withLock { + buffer.toList().also { buffer.clear() } + } + + fun observe(): Flow> = sharedFlow.filter { it.isNotEmpty() } +} diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandlerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandlerTest.kt index 72c05b3cd88..24ef5f275d3 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandlerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandlerTest.kt @@ -33,6 +33,7 @@ import com.wire.kalium.logic.feature.message.StaleEpochVerifier import com.wire.kalium.logic.feature.message.ephemeral.EphemeralMessageDeletionHandler import com.wire.kalium.logic.framework.TestEvent import com.wire.kalium.logic.functional.Either +import com.wire.kalium.logic.sync.receiver.handler.legalhold.LegalHoldHandler import com.wire.kalium.util.DateTimeUtil import com.wire.kalium.util.DateTimeUtil.toIsoDateTimeString import io.mockative.Mock @@ -146,7 +147,7 @@ class NewMessageEventHandlerTest { @Test fun givenAnMLSMessageWithLegalHoldUnknown_whenHandlingIt_thenDoNotUpdateLegalHoldStatus() = runTest { val (arrangement, newMessageEventHandler) = Arrangement() - .withUpdateLegalHoldStatusSuccess() + .withHandleLegalHoldSuccess() .withMLSUnpackerReturning( Either.Right( listOf( @@ -164,9 +165,9 @@ class NewMessageEventHandlerTest { newMessageEventHandler.handleNewMLSMessage(newMessageEvent) - verify(arrangement.conversationRepository) - .suspendFunction(arrangement.conversationRepository::updateLegalHoldStatus) - .with(any(), eq(Conversation.LegalHoldStatus.DISABLED)) + verify(arrangement.legalHoldHandler) + .suspendFunction(arrangement.legalHoldHandler::handleNewMessage) + .with(any()) .wasNotInvoked() verify(arrangement.applicationMessageHandler) @@ -178,7 +179,7 @@ class NewMessageEventHandlerTest { @Test fun givenUnpackingSuccess_whenHandling_thenHandleContent() = runTest { val (arrangement, newMessageEventHandler) = Arrangement() - .withUpdateLegalHoldStatusSuccess() + .withHandleLegalHoldSuccess() .withMLSUnpackerReturning(Either.Right(listOf(applicationMessage))) .arrange() @@ -191,9 +192,9 @@ class NewMessageEventHandlerTest { .with(eq(newMessageEvent)) .wasInvoked(exactly = once) - verify(arrangement.conversationRepository) - .suspendFunction(arrangement.conversationRepository::updateLegalHoldStatus) - .with(any(), eq(Conversation.LegalHoldStatus.DISABLED)) + verify(arrangement.legalHoldHandler) + .suspendFunction(arrangement.legalHoldHandler::handleNewMessage) + .with(eq(applicationMessage)) .wasInvoked(exactly = once) verify(arrangement.applicationMessageHandler) @@ -205,7 +206,7 @@ class NewMessageEventHandlerTest { @Test fun givenEphemeralMessageFromSelf_whenHandling_thenEnqueueForSelfDelete() = runTest { val (arrangement, newMessageEventHandler) = Arrangement() - .withUpdateLegalHoldStatusSuccess() + .withHandleLegalHoldSuccess() .withProteusUnpackerReturning( Either.Right( applicationMessage.copy( @@ -237,7 +238,7 @@ class NewMessageEventHandlerTest { @Test fun givenEphemeralMessage_whenHandling_thenDoNotEnqueueForSelfDelete() = runTest { val (arrangement, newMessageEventHandler) = Arrangement() - .withUpdateLegalHoldStatusSuccess() + .withHandleLegalHoldSuccess() .withProteusUnpackerReturning(Either.Right(applicationMessage)) .arrange() @@ -264,7 +265,7 @@ class NewMessageEventHandlerTest { @Test fun givenAMessageWithUnknownLegalHoldStatus_whenHandlingIt_thenDoNotUpdateCurrentLegalHold() = runTest { val (arrangement, newMessageEventHandler) = Arrangement() - .withUpdateLegalHoldStatusSuccess() + .withHandleLegalHoldSuccess() .withProteusUnpackerReturning( Either.Right( applicationMessage.copy( @@ -285,16 +286,16 @@ class NewMessageEventHandlerTest { .with(eq(newMessageEvent)) .wasInvoked(exactly = once) - verify(arrangement.conversationRepository) - .suspendFunction(arrangement.conversationRepository::updateLegalHoldStatus) - .with(eq(conversationID), eq(Conversation.LegalHoldStatus.UNKNOWN)) + verify(arrangement.legalHoldHandler) + .suspendFunction(arrangement.legalHoldHandler::handleNewMessage) + .with(any()) .wasNotInvoked() } @Test fun givenMessageFromSelf_whenHandling_thenDoNotEnqueueForSelfDelete() = runTest { val (arrangement, newMessageEventHandler) = Arrangement() - .withUpdateLegalHoldStatusSuccess() + .withHandleLegalHoldSuccess() .withProteusUnpackerReturning(Either.Right(applicationMessage)) .arrange() @@ -307,9 +308,9 @@ class NewMessageEventHandlerTest { .with(eq(newMessageEvent)) .wasInvoked(exactly = once) - verify(arrangement.conversationRepository) - .suspendFunction(arrangement.conversationRepository::updateLegalHoldStatus) - .with(eq(conversationID), eq(Conversation.LegalHoldStatus.DISABLED)) + verify(arrangement.legalHoldHandler) + .suspendFunction(arrangement.legalHoldHandler::handleNewMessage) + .with(eq(applicationMessage)) .wasInvoked(exactly = once) verify(arrangement.applicationMessageHandler) @@ -366,9 +367,6 @@ class NewMessageEventHandlerTest { @Mock val mlsMessageUnpacker = mock(classOf()) - @Mock - val conversationRepository = mock(classOf()) - @Mock val applicationMessageHandler = configure(mock(classOf())) { stubsUnitByDefault = true @@ -380,11 +378,14 @@ class NewMessageEventHandlerTest { @Mock val ephemeralMessageDeletionHandler = mock(EphemeralMessageDeletionHandler::class) + @Mock + val legalHoldHandler = mock(LegalHoldHandler::class) + private val newMessageEventHandler: NewMessageEventHandler = NewMessageEventHandlerImpl( proteusMessageUnpacker, mlsMessageUnpacker, - conversationRepository, applicationMessageHandler, + legalHoldHandler, { conversationId, messageId -> ephemeralMessageDeletionHandler.startSelfDeletion( conversationId, @@ -402,10 +403,10 @@ class NewMessageEventHandlerTest { .thenReturn(result) } - fun withUpdateLegalHoldStatusSuccess() = apply { - given(conversationRepository) - .suspendFunction(conversationRepository::updateLegalHoldStatus) - .whenInvokedWith(any(), any()) + fun withHandleLegalHoldSuccess() = apply { + given(legalHoldHandler) + .suspendFunction(legalHoldHandler::handleNewMessage) + .whenInvokedWith(any()) .thenReturn(Either.Right(Unit)) } diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt index 4b3d7d0665b..ea7484e77fc 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt @@ -19,14 +19,25 @@ package com.wire.kalium.logic.sync.receiver.handler.legalhold import com.wire.kalium.logic.configuration.UserConfigRepository import com.wire.kalium.logic.data.conversation.ClientId +import com.wire.kalium.logic.data.conversation.Conversation +import com.wire.kalium.logic.data.conversation.ConversationRepository import com.wire.kalium.logic.data.event.Event +import com.wire.kalium.logic.data.message.MessageContent +import com.wire.kalium.logic.data.message.ProtoContent +import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.feature.client.FetchSelfClientsFromRemoteUseCase import com.wire.kalium.logic.feature.client.PersistOtherUserClientsUseCase import com.wire.kalium.logic.feature.client.SelfClientsResult import com.wire.kalium.logic.feature.legalhold.LegalHoldState +import com.wire.kalium.logic.feature.legalhold.MembersHavingLegalHoldClientUseCase import com.wire.kalium.logic.feature.legalhold.ObserveLegalHoldStateForUserUseCase +import com.wire.kalium.logic.framework.TestConversation import com.wire.kalium.logic.framework.TestUser import com.wire.kalium.logic.functional.Either +import com.wire.kalium.logic.sync.receiver.conversation.message.MessageUnpackResult +import com.wire.kalium.logic.test_util.TestKaliumDispatcher +import com.wire.kalium.util.DateTimeUtil.toIsoDateTimeString +import com.wire.kalium.util.KaliumDispatcher import io.mockative.Mock import io.mockative.any import io.mockative.configure @@ -38,10 +49,10 @@ import io.mockative.verify import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.flowOf -import kotlinx.coroutines.test.StandardTestDispatcher import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.setMain +import kotlinx.datetime.Instant import kotlin.test.BeforeTest import kotlin.test.Test @@ -50,7 +61,7 @@ class LegalHoldHandlerTest { @OptIn(ExperimentalCoroutinesApi::class) @BeforeTest fun setup() { - Dispatchers.setMain(StandardTestDispatcher()) + Dispatchers.setMain(testDispatchers.default) } @OptIn(ExperimentalCoroutinesApi::class) @@ -107,7 +118,7 @@ class LegalHoldHandlerTest { handler.handleEnable(legalHoldEventEnabled) // then verify(arrangement.legalHoldSystemMessagesHandler) - .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnable) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForUser) .with(any()) .wasInvoked() } @@ -123,7 +134,7 @@ class LegalHoldHandlerTest { handler.handleEnable(legalHoldEventEnabled) // then verify(arrangement.legalHoldSystemMessagesHandler) - .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnable) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForUser) .with(any()) .wasNotInvoked() } @@ -139,7 +150,7 @@ class LegalHoldHandlerTest { handler.handleDisable(legalHoldEventDisabled) // then verify(arrangement.legalHoldSystemMessagesHandler) - .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisable) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisabledForUser) .with(any()) .wasInvoked() } @@ -154,7 +165,7 @@ class LegalHoldHandlerTest { handler.handleDisable(legalHoldEventDisabled) // then verify(arrangement.legalHoldSystemMessagesHandler) - .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisable) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisabledForUser) .with(any()) .wasNotInvoked() } @@ -255,6 +266,162 @@ class LegalHoldHandlerTest { .wasNotInvoked() } + @Test + fun givenConversationWithNoMoreUsersUnderLegalHold_whenHandlingDisable_thenHandleDisabledForConversation() = runTest { + // given + val (arrangement, handler) = Arrangement() + .withObserveLegalHoldStateForUserSuccess(LegalHoldState.Enabled) + .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) + .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.ENABLED) + .arrange() + // when + handler.handleDisable(legalHoldEventDisabled) + // then + verify(arrangement.legalHoldSystemMessagesHandler) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisabledForConversation) + .with(any()) + .wasInvoked(exactly = once) + } + @Test + fun givenConversationWithStillUsersUnderLegalHold_whenHandlingDisable_thenDoNotHandleDisabledForConversation() = runTest { + // given + val (arrangement, handler) = Arrangement() + .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) + .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.ENABLED) + .arrange() + // when + handler.handleDisable(legalHoldEventDisabled.copy(userId = TestUser.OTHER_USER_ID)) + // then + verify(arrangement.legalHoldSystemMessagesHandler) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisabledForConversation) + .with(any()) + .wasNotInvoked() + } + @Test + fun givenConversationLegalHoldAlreadyDisabled_whenHandlingDisable_thenDoNotHandleDisabledForConversation() = runTest { + // given + val (arrangement, handler) = Arrangement() + .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) + .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.DISABLED) + .arrange() + // when + handler.handleDisable(legalHoldEventDisabled.copy(userId = TestUser.OTHER_USER_ID)) + // then + verify(arrangement.legalHoldSystemMessagesHandler) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisabledForConversation) + .with(any()) + .wasNotInvoked() + } + @Test + fun givenFirstUserUnderLegalHoldAppeared_whenHandlingEnable_thenHandleEnabledForConversation() = runTest { + // given + val (arrangement, handler) = Arrangement() + .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) + .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.DISABLED) + .withMembersHavingLegalHoldClientSuccess(listOf(TestUser.OTHER_USER_ID)) + .arrange() + // when + handler.handleEnable(legalHoldEventEnabled.copy(userId = TestUser.OTHER_USER_ID)) + // then + verify(arrangement.legalHoldSystemMessagesHandler) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForConversation) + .with(any()) + .wasInvoked(exactly = once) + } + @Test + fun givenNextUsersUnderLegalHoldAppeared_whenHandlingEnable_thenDoNotHandleEnabledForConversation() = runTest { + // given + val (arrangement, handler) = Arrangement() + .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) + .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.ENABLED) + .withMembersHavingLegalHoldClientSuccess(listOf(TestUser.OTHER_USER_ID)) + .arrange() + // when + handler.handleEnable(legalHoldEventEnabled.copy(userId = TestUser.OTHER_USER_ID_2)) + // then + verify(arrangement.legalHoldSystemMessagesHandler) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForConversation) + .with(any()) + .wasNotInvoked() + } + @Test + fun givenConversationLegalHoldAlreadyEnabled_whenHandlingEnable_thenDoNotHandleEnabledForConversation() = runTest { + // given + val (arrangement, handler) = Arrangement() + .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) + .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.ENABLED) + .arrange() + // when + handler.handleEnable(legalHoldEventEnabled.copy(userId = TestUser.OTHER_USER_ID)) + // then + verify(arrangement.legalHoldSystemMessagesHandler) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForConversation) + .with(any()) + .wasNotInvoked() + } + + @Test + fun givenConversationWithLegalHoldDisabled_whenNewMessageWithLegalHoldDisabled_thenDoNotHandleDisabledForConversation() = runTest { + // given + val (arrangement, handler) = Arrangement() + .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) + .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.DISABLED) + .arrange() + // when + handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.DISABLED)) + // then + verify(arrangement.legalHoldSystemMessagesHandler) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisabledForConversation) + .with(any()) + .wasNotInvoked() + } + @Test + fun givenConversationWithLegalHoldDisabled_whenNewMessageWithLegalHoldEnabled_thenHandleEnabledForConversation() = runTest { + // given + val (arrangement, handler) = Arrangement() + .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) + .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.DISABLED) + .arrange() + // when + handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.ENABLED)) + // then + verify(arrangement.legalHoldSystemMessagesHandler) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForConversation) + .with(eq(TestConversation.CONVERSATION.id)) + .wasInvoked() + } + @Test + fun givenConversationWithLegalHoldEnabled_whenNewMessageWithLegalHoldEnabled_thenDoNotHandleDisabledForConversation() = runTest { + // given + val (arrangement, handler) = Arrangement() + .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) + .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.ENABLED) + .arrange() + // when + handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.ENABLED)) + // then + verify(arrangement.legalHoldSystemMessagesHandler) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForConversation) + .with(any()) + .wasNotInvoked() + } + @Test + fun givenConversationWithLegalHoldEnabled_whenNewMessageWithLegalHoldDisabled_thenHandleDisabledForConversation() = runTest { + // given + val (arrangement, handler) = Arrangement() + .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) + .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.ENABLED) + .arrange() + // when + handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.DISABLED)) + // then + verify(arrangement.legalHoldSystemMessagesHandler) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisabledForConversation) + .with(eq(TestConversation.CONVERSATION.id)) + .wasInvoked() + } + + private class Arrangement { @Mock @@ -266,9 +433,15 @@ class LegalHoldHandlerTest { @Mock val observeLegalHoldStateForUser = mock(ObserveLegalHoldStateForUserUseCase::class) + @Mock + val membersHavingLegalHoldClient = mock(MembersHavingLegalHoldClientUseCase::class) + @Mock val userConfigRepository = mock(UserConfigRepository::class) + @Mock + val conversationRepository = mock(ConversationRepository::class) + @Mock val legalHoldSystemMessagesHandler = configure(mock(LegalHoldSystemMessagesHandler::class)) { stubsUnitByDefault = true } @@ -276,6 +449,9 @@ class LegalHoldHandlerTest { withObserveLegalHoldStateForUserSuccess(LegalHoldState.Disabled) withFetchSelfClientsFromRemoteSuccess() withDeleteLegalHoldRequestSuccess() + withGetConversationsByUserIdSuccess(emptyList()) + withMembersHavingLegalHoldClientSuccess(emptyList()) + withUpdateLegalHoldStatusSuccess() } fun arrange() = @@ -284,8 +460,11 @@ class LegalHoldHandlerTest { persistOtherUserClients = persistOtherUserClients, fetchSelfClientsFromRemote = fetchSelfClientsFromRemote, observeLegalHoldStateForUser = observeLegalHoldStateForUser, + membersHavingLegalHoldClient = membersHavingLegalHoldClient, + conversationRepository = conversationRepository, userConfigRepository = userConfigRepository, legalHoldSystemMessagesHandler = legalHoldSystemMessagesHandler, + kaliumDispatcher = testDispatchers, ) fun withDeleteLegalHoldSuccess() = apply { @@ -322,20 +501,58 @@ class LegalHoldHandlerTest { .whenInvoked() .thenReturn(Either.Right(Unit)) } + fun withMembersHavingLegalHoldClientSuccess(result: List) = apply { + given(membersHavingLegalHoldClient) + .suspendFunction(membersHavingLegalHoldClient::invoke) + .whenInvokedWith(any()) + .thenReturn(Either.Right(result)) + } + fun withUpdateLegalHoldStatusSuccess() = apply { + given(conversationRepository) + .suspendFunction(conversationRepository::updateLegalHoldStatus) + .whenInvokedWith(any(), any()) + .thenReturn(Either.Right(Unit)) + } + fun withGetConversationsByUserIdSuccess(conversations: List = emptyList()) = apply { + given(conversationRepository) + .suspendFunction(conversationRepository::getConversationsByUserId) + .whenInvokedWith(any()) + .thenReturn(Either.Right(conversations)) + } + fun withObserveLegalHoldForConversationSuccess(status: Conversation.LegalHoldStatus) = apply { + given(conversationRepository) + .suspendFunction(conversationRepository::observeLegalHoldForConversation) + .whenInvokedWith(any()) + .thenReturn(flowOf(Either.Right(status))) + } } companion object { - val legalHoldEventEnabled = Event.User.LegalHoldEnabled( + private val testDispatchers: KaliumDispatcher = TestKaliumDispatcher + private val legalHoldEventEnabled = Event.User.LegalHoldEnabled( transient = false, live = false, id = "id-1", userId = TestUser.SELF.id, ) - val legalHoldEventDisabled = Event.User.LegalHoldDisabled( + private val legalHoldEventDisabled = Event.User.LegalHoldDisabled( transient = false, live = false, id = "id-2", userId = TestUser.OTHER_USER_ID ) + private fun applicationMessage(legalHoldStatus: Conversation.LegalHoldStatus) = MessageUnpackResult.ApplicationMessage( + conversationId = TestConversation.CONVERSATION.id, + timestampIso = Instant.DISTANT_PAST.toIsoDateTimeString(), + senderUserId = TestUser.SELF.id, + senderClientId = ClientId("clientID"), + content = ProtoContent.Readable( + messageUid = "messageUID", + messageContent = MessageContent.Text(value = "messageContent"), + expectsReadConfirmation = false, + legalHoldStatus = legalHoldStatus, + expiresAfterMillis = null + ) + ) } } diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessageHandlerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessageHandlerTest.kt index b504ba2b346..57f185547fc 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessageHandlerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessageHandlerTest.kt @@ -20,21 +20,17 @@ package com.wire.kalium.logic.sync.receiver.handler.legalhold import com.wire.kalium.logic.configuration.UserConfigRepository import com.wire.kalium.logic.data.conversation.Conversation import com.wire.kalium.logic.data.conversation.ConversationRepository -import com.wire.kalium.logic.data.event.Event import com.wire.kalium.logic.data.id.ConversationId import com.wire.kalium.logic.data.message.Message import com.wire.kalium.logic.data.message.MessageContent import com.wire.kalium.logic.data.message.MessageRepository import com.wire.kalium.logic.data.message.PersistMessageUseCase -import com.wire.kalium.logic.data.user.UserId -import com.wire.kalium.logic.feature.legalhold.MembersHavingLegalHoldClientUseCase import com.wire.kalium.logic.framework.TestConversation import com.wire.kalium.logic.framework.TestMessage import com.wire.kalium.logic.framework.TestUser import com.wire.kalium.logic.functional.Either import io.mockative.Mock import io.mockative.any -import io.mockative.eq import io.mockative.given import io.mockative.matching import io.mockative.mock @@ -56,14 +52,14 @@ class LegalHoldSystemMessagesHandlerTest { Dispatchers.setMain(StandardTestDispatcher()) } @Test - fun givenNoLastLegalHoldEnabledMessageForConversation_whenHandlingEnable_thenCreateNewSystemMessage() = runTest { + fun givenNoLastLegalHoldEnabledMessageForConversation_whenHandlingEnableForUser_thenCreateNewSystemMessage() = runTest { // given val (arrangement, handler) = Arrangement() .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) .withGetLastMessagesForConversationIdsSuccess(mapOf(TestConversation.CONVERSATION.id to TestMessage.TEXT_MESSAGE)) .arrange() // when - handler.handleEnable(userId = TestUser.OTHER_USER_ID) + handler.handleEnabledForUser(userId = TestUser.OTHER_USER_ID) // then verify(arrangement.persistMessage) .suspendFunction(arrangement.persistMessage::invoke) @@ -78,7 +74,7 @@ class LegalHoldSystemMessagesHandlerTest { .wasNotInvoked() } @Test - fun givenLastLegalHoldEnabledMessageForConversation_whenHandlingEnable_thenUpdateExistingSystemMessage() = runTest { + fun givenLastLegalHoldEnabledMessageForConversation_whenHandlingEnableForUser_thenUpdateExistingSystemMessage() = runTest { // given val legalHoldMessage = testLegalHoldSystemMessage(MessageContent.LegalHold.ForMembers.Enabled(listOf(TestUser.OTHER_USER_ID_2))) val (arrangement, handler) = Arrangement() @@ -86,7 +82,7 @@ class LegalHoldSystemMessagesHandlerTest { .withGetLastMessagesForConversationIdsSuccess(mapOf(TestConversation.CONVERSATION.id to legalHoldMessage)) .arrange() // when - handler.handleEnable(userId = TestUser.OTHER_USER_ID) + handler.handleEnabledForUser(userId = TestUser.OTHER_USER_ID) // then verify(arrangement.persistMessage) .suspendFunction(arrangement.persistMessage::invoke) @@ -98,48 +94,14 @@ class LegalHoldSystemMessagesHandlerTest { .wasInvoked(exactly = once) } @Test - fun givenConversationLegalHoldStateIsDisabled_whenHandlingEnable_thenUpdateState() = runTest { - // given - val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess( - listOf(TestConversation.CONVERSATION.copy(legalHoldStatus = Conversation.LegalHoldStatus.DISABLED)) - ) - .withMembersHavingLegalHoldClientSuccess(listOf(TestUser.USER_ID)) - .arrange() - // when - handler.handleEnable(userId = TestUser.OTHER_USER_ID) - // then - verify(arrangement.conversationRepository) - .suspendFunction(arrangement.conversationRepository::updateLegalHoldStatus) - .with(eq(TestConversation.CONVERSATION.id), eq(Conversation.LegalHoldStatus.ENABLED)) - .wasInvoked(exactly = once) - } - @Test - fun givenConversationLegalHoldStateIsEnabled_whenHandlingEnable_thenDoNotUpdateState() = runTest { - // given - val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess( - listOf(TestConversation.CONVERSATION.copy(legalHoldStatus = Conversation.LegalHoldStatus.ENABLED)) - ) - .withMembersHavingLegalHoldClientSuccess(listOf(TestUser.USER_ID)) - .arrange() - // when - handler.handleEnable(userId = TestUser.OTHER_USER_ID) - // then - verify(arrangement.conversationRepository) - .suspendFunction(arrangement.conversationRepository::updateLegalHoldStatus) - .with(any(), any()) - .wasNotInvoked() - } - @Test - fun givenNoLastLegalHoldDisabledMessageForConversation_whenHandlingDisable_thenCreateNewSystemMessage() = runTest { + fun givenNoLastLegalHoldDisabledMessageForConversation_whenHandlingDisableForUser_thenCreateNewSystemMessage() = runTest { // given val (arrangement, handler) = Arrangement() .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) .withGetLastMessagesForConversationIdsSuccess(mapOf(TestConversation.CONVERSATION.id to TestMessage.TEXT_MESSAGE)) .arrange() // when - handler.handleDisable(userId = TestUser.OTHER_USER_ID) + handler.handleDisabledForUser(userId = TestUser.OTHER_USER_ID) // then verify(arrangement.persistMessage) .suspendFunction(arrangement.persistMessage::invoke) @@ -154,7 +116,7 @@ class LegalHoldSystemMessagesHandlerTest { .wasNotInvoked() } @Test - fun givenLastLegalHoldDisabledMessageForConversation_whenHandlingDisable_thenUpdateExistingSystemMessage() = runTest { + fun givenLastLegalHoldDisabledMessageForConversation_whenHandlingDisableForUser_thenUpdateExistingSystemMessage() = runTest { // given val legalHoldMessage = testLegalHoldSystemMessage(MessageContent.LegalHold.ForMembers.Disabled(listOf(TestUser.OTHER_USER_ID_2))) val (arrangement, handler) = Arrangement() @@ -162,7 +124,7 @@ class LegalHoldSystemMessagesHandlerTest { .withGetLastMessagesForConversationIdsSuccess(mapOf(TestConversation.CONVERSATION.id to legalHoldMessage)) .arrange() // when - handler.handleDisable(userId = TestUser.OTHER_USER_ID) + handler.handleDisabledForUser(userId = TestUser.OTHER_USER_ID) // then verify(arrangement.persistMessage) .suspendFunction(arrangement.persistMessage::invoke) @@ -174,142 +136,34 @@ class LegalHoldSystemMessagesHandlerTest { .wasInvoked(exactly = once) } @Test - fun givenConversationLegalHoldStateIsEnabled_whenHandlingDisable_thenUpdateState() = runTest { - // given - val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess( - listOf(TestConversation.CONVERSATION.copy(legalHoldStatus = Conversation.LegalHoldStatus.ENABLED)) - ) - .arrange() - // when - handler.handleDisable(userId = TestUser.OTHER_USER_ID) - // then - verify(arrangement.conversationRepository) - .suspendFunction(arrangement.conversationRepository::updateLegalHoldStatus) - .with(eq(TestConversation.CONVERSATION.id), eq(Conversation.LegalHoldStatus.DISABLED)) - .wasInvoked(exactly = once) - } - @Test - fun givenConversationLegalHoldStateIsDisabled_whenHandlingDisable_thenDoNotUpdateState() = runTest { - // given - val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess( - listOf(TestConversation.CONVERSATION.copy(legalHoldStatus = Conversation.LegalHoldStatus.DISABLED)) - ) - .arrange() - // when - handler.handleDisable(userId = TestUser.OTHER_USER_ID) - // then - verify(arrangement.conversationRepository) - .suspendFunction(arrangement.conversationRepository::updateLegalHoldStatus) - .with(any(), any()) - .wasNotInvoked() - } - @Test - fun givenNoMoreUsersUnderLegalHold_whenHandlingDisable_thenCreateLegalHoldForConversationDisabledMessage() = runTest { + fun givenConversationId_whenHandlingEnableForConversation_thenCreateNewSystemMessage() = runTest { // given val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess( - listOf(TestConversation.CONVERSATION.copy(legalHoldStatus = Conversation.LegalHoldStatus.ENABLED)) - ) .arrange() // when - handler.handleDisable(userId = TestUser.OTHER_USER_ID) + handler.handleEnabledForConversation(conversationId = TestConversation.CONVERSATION.id) // then verify(arrangement.persistMessage) .suspendFunction(arrangement.persistMessage::invoke) - .with(matching { it.content is MessageContent.LegalHold.ForConversation.Disabled }) + .with(matching { it.content is MessageContent.LegalHold.ForConversation.Enabled }) .wasInvoked(exactly = once) } @Test - fun givenStillAreUsersUnderLegalHold_whenHandlingDisable_thenDoNotCreateLegalHoldForConversationDisabledMessage() = runTest { - // given - val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess( - listOf(TestConversation.CONVERSATION.copy(legalHoldStatus = Conversation.LegalHoldStatus.ENABLED)) - ) - .withMembersHavingLegalHoldClientSuccess(listOf(TestUser.USER_ID)) - .arrange() - // when - handler.handleDisable(userId = TestUser.OTHER_USER_ID) - // then - verify(arrangement.persistMessage) - .suspendFunction(arrangement.persistMessage::invoke) - .with(matching { it.content is MessageContent.LegalHold.ForConversation.Disabled }) - .wasNotInvoked() - } - @Test - fun givenConversationLegalHoldAlreadyDisabled_whenHandlingDisable_thenDoNotCreateLegalHoldForConversationDisabledMessage() = runTest { + fun givenConversationId_whenHandlingDisableForConversation_thenCreateNewSystemMessage() = runTest { // given val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess( - listOf(TestConversation.CONVERSATION.copy(legalHoldStatus = Conversation.LegalHoldStatus.DISABLED)) - ) .arrange() // when - handler.handleDisable(userId = TestUser.OTHER_USER_ID) + handler.handleDisabledForConversation(conversationId = TestConversation.CONVERSATION.id) // then verify(arrangement.persistMessage) .suspendFunction(arrangement.persistMessage::invoke) .with(matching { it.content is MessageContent.LegalHold.ForConversation.Disabled }) - .wasNotInvoked() - } - @Test - fun givenFirstUserUnderLegalHoldAppeared_whenHandlingEnable_thenCreateLegalHoldForConversationEnabledMessage() = runTest { - // given - val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess( - listOf(TestConversation.CONVERSATION.copy(legalHoldStatus = Conversation.LegalHoldStatus.DISABLED)) - ) - .withMembersHavingLegalHoldClientSuccess(listOf(TestUser.OTHER_USER_ID)) - .arrange() - // when - handler.handleEnable(userId = TestUser.OTHER_USER_ID) - // then - verify(arrangement.persistMessage) - .suspendFunction(arrangement.persistMessage::invoke) - .with(matching { it.content is MessageContent.LegalHold.ForConversation.Enabled }) .wasInvoked(exactly = once) } - @Test - fun givenNextUsersUnderLegalHoldAppeared_whenHandlingEnable_thenDoNotCreateLegalHoldForConversationEnabledMessage() = runTest { - // given - val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess( - listOf(TestConversation.CONVERSATION.copy(legalHoldStatus = Conversation.LegalHoldStatus.ENABLED)) - ) - .withMembersHavingLegalHoldClientSuccess(listOf(TestUser.OTHER_USER_ID, TestUser.USER_ID)) - .arrange() - // when - handler.handleDisable(userId = TestUser.OTHER_USER_ID) - // then - verify(arrangement.persistMessage) - .suspendFunction(arrangement.persistMessage::invoke) - .with(matching { it.content is MessageContent.LegalHold.ForConversation.Enabled }) - .wasNotInvoked() - } - @Test - fun givenConversationLegalHoldAlreadyEnabled_whenHandlingEnable_thenDoNotCreateLegalHoldForConversationEnabledMessage() = runTest { - // given - val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess( - listOf(TestConversation.CONVERSATION.copy(legalHoldStatus = Conversation.LegalHoldStatus.ENABLED)) - ) - .withMembersHavingLegalHoldClientSuccess(listOf(TestUser.OTHER_USER_ID)) - .arrange() - // when - handler.handleEnable(userId = TestUser.OTHER_USER_ID) - // then - verify(arrangement.persistMessage) - .suspendFunction(arrangement.persistMessage::invoke) - .with(matching { it.content is MessageContent.LegalHold.ForConversation.Enabled }) - .wasNotInvoked() - } private class Arrangement { - @Mock - val membersHavingLegalHoldClient = mock(MembersHavingLegalHoldClientUseCase::class) @Mock val persistMessage = mock(PersistMessageUseCase::class) @@ -326,8 +180,6 @@ class LegalHoldSystemMessagesHandlerTest { init { withGetConversationsByUserIdSuccess(emptyList()) withGetLastMessagesForConversationIdsSuccess(mapOf()) - withMembersHavingLegalHoldClientSuccess(emptyList()) - withUpdateLegalHoldStatusSuccess() withUpdateLegalHoldMessageMembersSuccess() withPersistMessageSuccess() withDeleteLegalHoldRequestSuccess() @@ -336,7 +188,6 @@ class LegalHoldSystemMessagesHandlerTest { fun arrange() = this to LegalHoldSystemMessagesHandlerImpl( selfUserId = TestUser.SELF.id, - membersHavingLegalHoldClient = membersHavingLegalHoldClient, persistMessage = persistMessage, conversationRepository = conversationRepository, messageRepository = messageRepository, @@ -367,18 +218,7 @@ class LegalHoldSystemMessagesHandlerTest { .whenInvokedWith(any()) .thenReturn(Either.Right(Unit)) } - fun withMembersHavingLegalHoldClientSuccess(result: List) = apply { - given(membersHavingLegalHoldClient) - .suspendFunction(membersHavingLegalHoldClient::invoke) - .whenInvokedWith(any()) - .thenReturn(Either.Right(result)) - } - fun withUpdateLegalHoldStatusSuccess() = apply { - given(conversationRepository) - .suspendFunction(conversationRepository::updateLegalHoldStatus) - .whenInvokedWith(any(), any()) - .thenReturn(Either.Right(Unit)) - } + fun withDeleteLegalHoldRequestSuccess() = apply { given(userConfigRepository) .suspendFunction(userConfigRepository::deleteLegalHoldRequest) diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/util/DebounceBufferTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/util/DebounceBufferTest.kt new file mode 100644 index 00000000000..4962d678627 --- /dev/null +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/util/DebounceBufferTest.kt @@ -0,0 +1,108 @@ +/* + * Wire + * Copyright (C) 2023 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.kalium.logic.util + +import app.cash.turbine.test +import com.wire.kalium.logic.test_util.TestKaliumDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.test.setMain +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.seconds + +@OptIn(ExperimentalCoroutinesApi::class) +class DebounceBufferTest { + + @BeforeTest + fun before() { + Dispatchers.setMain(dispatcher.default) + } + + @Test + fun givenNewItem_whenObserving_thenEmitListAfterTimeout() = runTest { + val (_, debounceBuffer) = Arrangement().arrange() + advanceUntilIdle() + + debounceBuffer.observe().test { + debounceBuffer.add("1") + expectNoEvents() + + advanceTimeBy(3.seconds) + assertEquals(listOf("1"), expectMostRecentItem()) + + cancel() + } + } + + @Test + fun givenNewItemsKeepIncoming_whenObserving_thenEmitListAfterMaxSize() = runTest { + val (_, debounceBuffer) = Arrangement().arrange() + advanceUntilIdle() + + debounceBuffer.observe().test { + expectNoEvents() + + for (i in 1..5) { + debounceBuffer.add(i.toString()) + if (i < 5) expectNoEvents() + else assertEquals((1..5).map { it.toString() }, awaitItem()) + delay(1.seconds) + } + + cancel() + } + } + + @Test + fun givenNewItemsKeepIncomingAndStop_whenObserving_thenEmitListAfterMaxSizeAndTheRestAfterTimeout() = runTest { + val (_, debounceBuffer) = Arrangement().arrange() + advanceUntilIdle() + + debounceBuffer.observe().test { + expectNoEvents() + + for (i in 1..7) { + delay(1.seconds) + debounceBuffer.add(i.toString()) + if (i != 5) expectNoEvents() + else assertEquals((1..5).map { it.toString() }, awaitItem()) + } + + advanceTimeBy(3.seconds) + assertEquals((6..7).map { it.toString() }, awaitItem()) + + cancel() + } + } + + internal class Arrangement { + private val debounceBuffer = DebounceBuffer(capacity = 5, timeout = 2.seconds, scope = CoroutineScope(dispatcher.default)) + fun arrange() = this to debounceBuffer + } + + companion object { + private val dispatcher = TestKaliumDispatcher + } +} From b9d65fb6bacc0a483bda0b4f1494bfd9010c75da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Saleniuk?= Date: Wed, 20 Dec 2023 16:26:10 +0100 Subject: [PATCH 02/10] removed unused code --- .../logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt | 2 +- .../receiver/conversation/message/NewMessageEventHandlerTest.kt | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt index 72e7ae8b12d..0a3f1f61b67 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt @@ -58,7 +58,7 @@ internal class LegalHoldHandlerImpl internal constructor( private val userConfigRepository: UserConfigRepository, private val conversationRepository: ConversationRepository, private val legalHoldSystemMessagesHandler: LegalHoldSystemMessagesHandler, - private val kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl, + kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl, ) : LegalHoldHandler { private val scope = CoroutineScope(kaliumDispatcher.default) private val conversationsWithUpdatedLegalHoldStatus = diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandlerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandlerTest.kt index 24ef5f275d3..00d575920ff 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandlerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandlerTest.kt @@ -24,7 +24,6 @@ import com.wire.kalium.logic.MLSFailure import com.wire.kalium.logic.ProteusFailure import com.wire.kalium.logic.data.conversation.ClientId import com.wire.kalium.logic.data.conversation.Conversation -import com.wire.kalium.logic.data.conversation.ConversationRepository import com.wire.kalium.logic.data.id.ConversationId import com.wire.kalium.logic.data.message.MessageContent import com.wire.kalium.logic.data.message.ProtoContent @@ -431,7 +430,6 @@ class NewMessageEventHandlerTest { private companion object { val SELF_USER_ID = UserId("selfUserId", "selfDomain") - val conversationID = ConversationId("conversationID", "domain") val applicationMessage = MessageUnpackResult.ApplicationMessage( ConversationId("conversationID", "domain"), Instant.DISTANT_PAST.toIsoDateTimeString(), From 6de28364802f65e934a20744a5974e0fde4346b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Saleniuk?= Date: Wed, 20 Dec 2023 18:01:15 +0100 Subject: [PATCH 03/10] handle only once for each user id --- .../handler/legalhold/LegalHoldHandler.kt | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt index 0a3f1f61b67..23b3923d33d 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt @@ -30,6 +30,8 @@ import com.wire.kalium.logic.feature.legalhold.LegalHoldState import com.wire.kalium.logic.feature.legalhold.MembersHavingLegalHoldClientUseCase import com.wire.kalium.logic.feature.legalhold.ObserveLegalHoldStateForUserUseCase import com.wire.kalium.logic.functional.Either +import com.wire.kalium.logic.functional.flatMap +import com.wire.kalium.logic.functional.foldToEitherWhileRight import com.wire.kalium.logic.functional.getOrNull import com.wire.kalium.logic.functional.map import com.wire.kalium.logic.kaliumLogger @@ -40,6 +42,7 @@ import com.wire.kalium.util.KaliumDispatcherImpl import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.firstOrNull import kotlinx.coroutines.launch +import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds internal interface LegalHoldHandler { @@ -59,15 +62,17 @@ internal class LegalHoldHandlerImpl internal constructor( private val conversationRepository: ConversationRepository, private val legalHoldSystemMessagesHandler: LegalHoldSystemMessagesHandler, kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl, + debounceBufferCapacity: Int = DEBOUNCE_BUFFER_CAPACITY, + devounceBufferTimeout: Duration = DEBOUNCE_BUFFER_TIMEOUT, ) : LegalHoldHandler { private val scope = CoroutineScope(kaliumDispatcher.default) private val conversationsWithUpdatedLegalHoldStatus = - DebounceBuffer(DEBOUNCE_BUFFER_MAX_SIZE, DEBOUNCE_BUFFER_TIMEOUT, scope) + DebounceBuffer(debounceBufferCapacity, devounceBufferTimeout, scope) init { scope.launch { conversationsWithUpdatedLegalHoldStatus.observe() - .collect {handleUpdatedBufferedConversations(it) } + .collect { handleUpdatedBufferedConversations(it) } } } @@ -160,10 +165,18 @@ internal class LegalHoldHandlerImpl internal constructor( } private suspend fun handleUpdatedBufferedConversations(conversationIds: List) { - conversationIds.forEach { - conversationRepository.getConversationMembers(it).map { memberIds -> - memberIds.forEach { userId -> - val userHasBeenUnderLegalHold = isUserUnderLegalHold(userId) + conversationIds + .foldToEitherWhileRight(mapOf()) { conversationId, acc -> + conversationRepository.getConversationMembers(conversationId) + .flatMap { members -> + membersHavingLegalHoldClient(conversationId) + .map { membersHavingLegalHoldClient -> + (acc + members.map { it to membersHavingLegalHoldClient.contains(it) }) + } + } + } + .map { + it.forEach { (userId, userHasBeenUnderLegalHold) -> // TODO: to be optimized - send empty message and handle legal hold discovery after sending a message processEvent(selfUserId, userId) val userIsNowUnderLegalHold = isUserUnderLegalHold(userId) @@ -176,11 +189,10 @@ internal class LegalHoldHandlerImpl internal constructor( } } } - } } companion object { - private const val DEBOUNCE_BUFFER_MAX_SIZE = 100 + private const val DEBOUNCE_BUFFER_CAPACITY = 100 private val DEBOUNCE_BUFFER_TIMEOUT = 3.seconds } } From d4669a73fdaab943c4fc152bd7200defdeb0aad4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Saleniuk?= Date: Thu, 21 Dec 2023 12:06:41 +0100 Subject: [PATCH 04/10] reduce db queries --- .../handler/legalhold/LegalHoldHandler.kt | 32 ++++++------- .../message/NewMessageEventHandlerTest.kt | 2 +- .../handler/legalhold/LegalHoldHandlerTest.kt | 45 +++++++------------ 3 files changed, 34 insertions(+), 45 deletions(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt index 23b3923d33d..5f84182c6a3 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt @@ -32,7 +32,7 @@ import com.wire.kalium.logic.feature.legalhold.ObserveLegalHoldStateForUserUseCa import com.wire.kalium.logic.functional.Either import com.wire.kalium.logic.functional.flatMap import com.wire.kalium.logic.functional.foldToEitherWhileRight -import com.wire.kalium.logic.functional.getOrNull +import com.wire.kalium.logic.functional.getOrElse import com.wire.kalium.logic.functional.map import com.wire.kalium.logic.kaliumLogger import com.wire.kalium.logic.sync.receiver.conversation.message.MessageUnpackResult @@ -136,22 +136,22 @@ internal class LegalHoldHandlerImpl internal constructor( private suspend fun isUserUnderLegalHold(userId: UserId): Boolean = observeLegalHoldStateForUser(userId).firstOrNull() == LegalHoldState.Enabled - private suspend fun handleForConversation(conversationId: ConversationId, newStatus: Conversation.LegalHoldStatus): Boolean { - val currentStatus = conversationRepository.observeLegalHoldForConversation(conversationId).firstOrNull()?.getOrNull() - val isChanged = currentStatus != newStatus - if (isChanged && newStatus != Conversation.LegalHoldStatus.UNKNOWN) { - // if conversation legal hold status has changed, update it and create system message for it + private suspend fun handleForConversation(conversationId: ConversationId, newStatus: Conversation.LegalHoldStatus): Boolean = + if (newStatus != Conversation.LegalHoldStatus.UNKNOWN) { conversationRepository.updateLegalHoldStatus(conversationId, newStatus) - when (newStatus) { - Conversation.LegalHoldStatus.DISABLED -> - legalHoldSystemMessagesHandler.handleDisabledForConversation(conversationId) - Conversation.LegalHoldStatus.ENABLED -> - legalHoldSystemMessagesHandler.handleEnabledForConversation(conversationId) - else -> { /* do nothing */ } - } - } - return isChanged - } + .getOrElse(false) + .also { isChanged -> // if conversation legal hold status has changed, create system message for it + if (isChanged) { + when (newStatus) { + Conversation.LegalHoldStatus.DISABLED -> + legalHoldSystemMessagesHandler.handleDisabledForConversation(conversationId) + Conversation.LegalHoldStatus.ENABLED -> + legalHoldSystemMessagesHandler.handleEnabledForConversation(conversationId) + else -> { /* do nothing */ } + } + } + } + } else false private suspend fun handleConversationsForUser(userId: UserId) { conversationRepository.getConversationsByUserId(userId).map { conversations -> diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandlerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandlerTest.kt index 39242f081bd..00d575920ff 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandlerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandlerTest.kt @@ -406,7 +406,7 @@ class NewMessageEventHandlerTest { given(legalHoldHandler) .suspendFunction(legalHoldHandler::handleNewMessage) .whenInvokedWith(any()) - .thenReturn(Either.Right(true)) + .thenReturn(Either.Right(Unit)) } fun withMLSUnpackerReturning(result: Either>) = diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt index ea7484e77fc..c693174a5df 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt @@ -271,8 +271,7 @@ class LegalHoldHandlerTest { // given val (arrangement, handler) = Arrangement() .withObserveLegalHoldStateForUserSuccess(LegalHoldState.Enabled) - .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) - .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.ENABLED) + .withGetConversationsByUserIdSuccess(listOf(conversation(legalHoldStatus = Conversation.LegalHoldStatus.ENABLED))) .arrange() // when handler.handleDisable(legalHoldEventDisabled) @@ -286,8 +285,7 @@ class LegalHoldHandlerTest { fun givenConversationWithStillUsersUnderLegalHold_whenHandlingDisable_thenDoNotHandleDisabledForConversation() = runTest { // given val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) - .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.ENABLED) + .withGetConversationsByUserIdSuccess(listOf(conversation(legalHoldStatus = Conversation.LegalHoldStatus.ENABLED))) .arrange() // when handler.handleDisable(legalHoldEventDisabled.copy(userId = TestUser.OTHER_USER_ID)) @@ -301,8 +299,7 @@ class LegalHoldHandlerTest { fun givenConversationLegalHoldAlreadyDisabled_whenHandlingDisable_thenDoNotHandleDisabledForConversation() = runTest { // given val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) - .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.DISABLED) + .withGetConversationsByUserIdSuccess(listOf(conversation(legalHoldStatus = Conversation.LegalHoldStatus.DISABLED))) .arrange() // when handler.handleDisable(legalHoldEventDisabled.copy(userId = TestUser.OTHER_USER_ID)) @@ -316,8 +313,7 @@ class LegalHoldHandlerTest { fun givenFirstUserUnderLegalHoldAppeared_whenHandlingEnable_thenHandleEnabledForConversation() = runTest { // given val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) - .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.DISABLED) + .withGetConversationsByUserIdSuccess(listOf(conversation(legalHoldStatus = Conversation.LegalHoldStatus.DISABLED))) .withMembersHavingLegalHoldClientSuccess(listOf(TestUser.OTHER_USER_ID)) .arrange() // when @@ -332,8 +328,8 @@ class LegalHoldHandlerTest { fun givenNextUsersUnderLegalHoldAppeared_whenHandlingEnable_thenDoNotHandleEnabledForConversation() = runTest { // given val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) - .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.ENABLED) + .withGetConversationsByUserIdSuccess(listOf(conversation(legalHoldStatus = Conversation.LegalHoldStatus.ENABLED))) + .withUpdateLegalHoldStatusSuccess(false) .withMembersHavingLegalHoldClientSuccess(listOf(TestUser.OTHER_USER_ID)) .arrange() // when @@ -348,8 +344,7 @@ class LegalHoldHandlerTest { fun givenConversationLegalHoldAlreadyEnabled_whenHandlingEnable_thenDoNotHandleEnabledForConversation() = runTest { // given val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) - .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.ENABLED) + .withGetConversationsByUserIdSuccess(listOf(conversation(legalHoldStatus = Conversation.LegalHoldStatus.ENABLED))) .arrange() // when handler.handleEnable(legalHoldEventEnabled.copy(userId = TestUser.OTHER_USER_ID)) @@ -364,8 +359,8 @@ class LegalHoldHandlerTest { fun givenConversationWithLegalHoldDisabled_whenNewMessageWithLegalHoldDisabled_thenDoNotHandleDisabledForConversation() = runTest { // given val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) - .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.DISABLED) + .withGetConversationsByUserIdSuccess(listOf(conversation(legalHoldStatus = Conversation.LegalHoldStatus.DISABLED))) + .withUpdateLegalHoldStatusSuccess(false) .arrange() // when handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.DISABLED)) @@ -379,8 +374,7 @@ class LegalHoldHandlerTest { fun givenConversationWithLegalHoldDisabled_whenNewMessageWithLegalHoldEnabled_thenHandleEnabledForConversation() = runTest { // given val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) - .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.DISABLED) + .withGetConversationsByUserIdSuccess(listOf(conversation(legalHoldStatus = Conversation.LegalHoldStatus.DISABLED))) .arrange() // when handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.ENABLED)) @@ -394,8 +388,8 @@ class LegalHoldHandlerTest { fun givenConversationWithLegalHoldEnabled_whenNewMessageWithLegalHoldEnabled_thenDoNotHandleDisabledForConversation() = runTest { // given val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) - .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.ENABLED) + .withGetConversationsByUserIdSuccess(listOf(conversation(legalHoldStatus = Conversation.LegalHoldStatus.DISABLED))) + .withUpdateLegalHoldStatusSuccess(false) .arrange() // when handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.ENABLED)) @@ -409,8 +403,7 @@ class LegalHoldHandlerTest { fun givenConversationWithLegalHoldEnabled_whenNewMessageWithLegalHoldDisabled_thenHandleDisabledForConversation() = runTest { // given val (arrangement, handler) = Arrangement() - .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) - .withObserveLegalHoldForConversationSuccess(Conversation.LegalHoldStatus.ENABLED) + .withGetConversationsByUserIdSuccess(listOf(conversation(legalHoldStatus = Conversation.LegalHoldStatus.ENABLED))) .arrange() // when handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.DISABLED)) @@ -507,11 +500,11 @@ class LegalHoldHandlerTest { .whenInvokedWith(any()) .thenReturn(Either.Right(result)) } - fun withUpdateLegalHoldStatusSuccess() = apply { + fun withUpdateLegalHoldStatusSuccess(isChanged: Boolean = true) = apply { given(conversationRepository) .suspendFunction(conversationRepository::updateLegalHoldStatus) .whenInvokedWith(any(), any()) - .thenReturn(Either.Right(Unit)) + .thenReturn(Either.Right(isChanged)) } fun withGetConversationsByUserIdSuccess(conversations: List = emptyList()) = apply { given(conversationRepository) @@ -519,12 +512,6 @@ class LegalHoldHandlerTest { .whenInvokedWith(any()) .thenReturn(Either.Right(conversations)) } - fun withObserveLegalHoldForConversationSuccess(status: Conversation.LegalHoldStatus) = apply { - given(conversationRepository) - .suspendFunction(conversationRepository::observeLegalHoldForConversation) - .whenInvokedWith(any()) - .thenReturn(flowOf(Either.Right(status))) - } } companion object { @@ -541,6 +528,8 @@ class LegalHoldHandlerTest { id = "id-2", userId = TestUser.OTHER_USER_ID ) + private fun conversation(legalHoldStatus: Conversation.LegalHoldStatus) = + TestConversation.CONVERSATION.copy(legalHoldStatus = legalHoldStatus) private fun applicationMessage(legalHoldStatus: Conversation.LegalHoldStatus) = MessageUnpackResult.ApplicationMessage( conversationId = TestConversation.CONVERSATION.id, timestampIso = Instant.DISTANT_PAST.toIsoDateTimeString(), From c473a92a8824d70345ca3e7e1addcacb06825219 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Saleniuk?= Date: Thu, 21 Dec 2023 13:47:12 +0100 Subject: [PATCH 05/10] use date of received message to create system message for conversation --- .../handler/legalhold/LegalHoldHandler.kt | 19 ++++++---- .../LegalHoldSystemMessagesHandler.kt | 36 ++++++++++++------- .../handler/legalhold/LegalHoldHandlerTest.kt | 36 +++++++++++++------ .../LegalHoldSystemMessageHandlerTest.kt | 16 ++++----- 4 files changed, 69 insertions(+), 38 deletions(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt index 5f84182c6a3..7c3fbda0f87 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt @@ -37,6 +37,7 @@ import com.wire.kalium.logic.functional.map import com.wire.kalium.logic.kaliumLogger import com.wire.kalium.logic.sync.receiver.conversation.message.MessageUnpackResult import com.wire.kalium.logic.util.DebounceBuffer +import com.wire.kalium.util.DateTimeUtil import com.wire.kalium.util.KaliumDispatcher import com.wire.kalium.util.KaliumDispatcherImpl import kotlinx.coroutines.CoroutineScope @@ -114,8 +115,10 @@ internal class LegalHoldHandlerImpl internal constructor( override suspend fun handleNewMessage(message: MessageUnpackResult.ApplicationMessage): Either { val isStatusChangedForConversation = when (message.content.legalHoldStatus) { - Conversation.LegalHoldStatus.ENABLED -> handleForConversation(message.conversationId, Conversation.LegalHoldStatus.ENABLED) - Conversation.LegalHoldStatus.DISABLED -> handleForConversation(message.conversationId, Conversation.LegalHoldStatus.DISABLED) + Conversation.LegalHoldStatus.ENABLED -> + handleForConversation(message.conversationId, Conversation.LegalHoldStatus.ENABLED, message.timestampIso) + Conversation.LegalHoldStatus.DISABLED -> + handleForConversation(message.conversationId, Conversation.LegalHoldStatus.DISABLED, message.timestampIso) else -> false } if (isStatusChangedForConversation) { @@ -136,7 +139,11 @@ internal class LegalHoldHandlerImpl internal constructor( private suspend fun isUserUnderLegalHold(userId: UserId): Boolean = observeLegalHoldStateForUser(userId).firstOrNull() == LegalHoldState.Enabled - private suspend fun handleForConversation(conversationId: ConversationId, newStatus: Conversation.LegalHoldStatus): Boolean = + private suspend fun handleForConversation( + conversationId: ConversationId, + newStatus: Conversation.LegalHoldStatus, + systemMessageTimestampIso: String = DateTimeUtil.currentIsoDateTimeString(), + ): Boolean = if (newStatus != Conversation.LegalHoldStatus.UNKNOWN) { conversationRepository.updateLegalHoldStatus(conversationId, newStatus) .getOrElse(false) @@ -144,9 +151,9 @@ internal class LegalHoldHandlerImpl internal constructor( if (isChanged) { when (newStatus) { Conversation.LegalHoldStatus.DISABLED -> - legalHoldSystemMessagesHandler.handleDisabledForConversation(conversationId) + legalHoldSystemMessagesHandler.handleDisabledForConversation(conversationId, systemMessageTimestampIso) Conversation.LegalHoldStatus.ENABLED -> - legalHoldSystemMessagesHandler.handleEnabledForConversation(conversationId) + legalHoldSystemMessagesHandler.handleEnabledForConversation(conversationId, systemMessageTimestampIso) else -> { /* do nothing */ } } } @@ -171,7 +178,7 @@ internal class LegalHoldHandlerImpl internal constructor( .flatMap { members -> membersHavingLegalHoldClient(conversationId) .map { membersHavingLegalHoldClient -> - (acc + members.map { it to membersHavingLegalHoldClient.contains(it) }) + members.associateWith { membersHavingLegalHoldClient.contains(it) } + acc } } } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessagesHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessagesHandler.kt index 3f1c6df140c..f41a975c58f 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessagesHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessagesHandler.kt @@ -33,8 +33,8 @@ import com.wire.kalium.util.DateTimeUtil internal interface LegalHoldSystemMessagesHandler { suspend fun handleEnabledForUser(userId: UserId) suspend fun handleDisabledForUser(userId: UserId) - suspend fun handleEnabledForConversation(conversationId: ConversationId) - suspend fun handleDisabledForConversation(conversationId: ConversationId) + suspend fun handleEnabledForConversation(conversationId: ConversationId, systemMessageTimestampIso: String) + suspend fun handleDisabledForConversation(conversationId: ConversationId, systemMessageTimestampIso: String) } internal class LegalHoldSystemMessagesHandlerImpl( @@ -56,18 +56,24 @@ internal class LegalHoldSystemMessagesHandlerImpl( createNew = { MessageContent.LegalHold.ForMembers.Disabled(members = listOf(userId)) } ) - override suspend fun handleEnabledForConversation(conversationId: ConversationId) = - handleSystemMessageForConversation(conversationId, Conversation.LegalHoldStatus.ENABLED) + override suspend fun handleEnabledForConversation(conversationId: ConversationId, systemMessageTimestampIso: String) = + handleSystemMessageForConversation(conversationId, Conversation.LegalHoldStatus.ENABLED, systemMessageTimestampIso) - override suspend fun handleDisabledForConversation(conversationId: ConversationId) = - handleSystemMessageForConversation(conversationId, Conversation.LegalHoldStatus.DISABLED) + override suspend fun handleDisabledForConversation(conversationId: ConversationId, systemMessageTimestampIso: String) = + handleSystemMessageForConversation(conversationId, Conversation.LegalHoldStatus.DISABLED, systemMessageTimestampIso) - private suspend fun handleSystemMessageForConversation(conversationId: ConversationId, newStatus: Conversation.LegalHoldStatus) { + private suspend fun handleSystemMessageForConversation( + conversationId: ConversationId, + newStatus: Conversation.LegalHoldStatus, + systemMessageTimestampIso: String = DateTimeUtil.currentIsoDateTimeString() + ) { when (newStatus) { - Conversation.LegalHoldStatus.DISABLED -> - persistMessage(createSystemMessage(MessageContent.LegalHold.ForConversation.Disabled, conversationId)) - Conversation.LegalHoldStatus.ENABLED -> - persistMessage(createSystemMessage(MessageContent.LegalHold.ForConversation.Enabled, conversationId)) + Conversation.LegalHoldStatus.DISABLED -> persistMessage( + createSystemMessage(MessageContent.LegalHold.ForConversation.Disabled, conversationId, systemMessageTimestampIso) + ) + Conversation.LegalHoldStatus.ENABLED -> persistMessage( + createSystemMessage(MessageContent.LegalHold.ForConversation.Enabled, conversationId, systemMessageTimestampIso) + ) else -> { /* do nothing */ } } } @@ -99,12 +105,16 @@ internal class LegalHoldSystemMessagesHandlerImpl( } } - private fun createSystemMessage(content: MessageContent.LegalHold, conversationId: ConversationId): Message.System = + private fun createSystemMessage( + content: MessageContent.LegalHold, + conversationId: ConversationId, + date: String = DateTimeUtil.currentIsoDateTimeString(), + ): Message.System = Message.System( id = uuid4().toString(), content = content, conversationId = conversationId, - date = DateTimeUtil.currentIsoDateTimeString(), + date = date, senderUserId = selfUserId, status = Message.Status.Sent, visibility = Message.Visibility.VISIBLE, diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt index c693174a5df..2961d7b250c 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt @@ -278,7 +278,7 @@ class LegalHoldHandlerTest { // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisabledForConversation) - .with(any()) + .with(any(), any()) .wasInvoked(exactly = once) } @Test @@ -292,7 +292,7 @@ class LegalHoldHandlerTest { // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisabledForConversation) - .with(any()) + .with(any(), any()) .wasNotInvoked() } @Test @@ -306,7 +306,7 @@ class LegalHoldHandlerTest { // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisabledForConversation) - .with(any()) + .with(any(), any()) .wasNotInvoked() } @Test @@ -321,7 +321,7 @@ class LegalHoldHandlerTest { // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForConversation) - .with(any()) + .with(any(), any()) .wasInvoked(exactly = once) } @Test @@ -337,7 +337,7 @@ class LegalHoldHandlerTest { // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForConversation) - .with(any()) + .with(any(), any()) .wasNotInvoked() } @Test @@ -351,7 +351,7 @@ class LegalHoldHandlerTest { // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForConversation) - .with(any()) + .with(any(), any()) .wasNotInvoked() } @@ -367,7 +367,7 @@ class LegalHoldHandlerTest { // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisabledForConversation) - .with(any()) + .with(any(), any()) .wasNotInvoked() } @Test @@ -381,7 +381,7 @@ class LegalHoldHandlerTest { // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForConversation) - .with(eq(TestConversation.CONVERSATION.id)) + .with(eq(TestConversation.CONVERSATION.id), any()) .wasInvoked() } @Test @@ -396,7 +396,7 @@ class LegalHoldHandlerTest { // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForConversation) - .with(any()) + .with(any(), any()) .wasNotInvoked() } @Test @@ -410,10 +410,24 @@ class LegalHoldHandlerTest { // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisabledForConversation) - .with(eq(TestConversation.CONVERSATION.id)) + .with(eq(TestConversation.CONVERSATION.id), any()) + .wasInvoked() + } + @Test + fun givenConversation_whenHandlingNewMessageWithChangedLegalHold_thenUseTimestampOfThatMessageToCreateSystemMessage() = runTest { + // given + val (arrangement, handler) = Arrangement() + .withGetConversationsByUserIdSuccess(listOf(conversation(legalHoldStatus = Conversation.LegalHoldStatus.DISABLED))) + .arrange() + val message = applicationMessage(Conversation.LegalHoldStatus.ENABLED) + // when + handler.handleNewMessage(message) + // then + verify(arrangement.legalHoldSystemMessagesHandler) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForConversation) + .with(eq(TestConversation.CONVERSATION.id), eq(message.timestampIso)) .wasInvoked() } - private class Arrangement { diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessageHandlerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessageHandlerTest.kt index 57f185547fc..c6068446ba9 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessageHandlerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessageHandlerTest.kt @@ -138,27 +138,27 @@ class LegalHoldSystemMessagesHandlerTest { @Test fun givenConversationId_whenHandlingEnableForConversation_thenCreateNewSystemMessage() = runTest { // given - val (arrangement, handler) = Arrangement() - .arrange() + val timestampIso = "2022-03-30T15:36:00.000Z" + val (arrangement, handler) = Arrangement().arrange() // when - handler.handleEnabledForConversation(conversationId = TestConversation.CONVERSATION.id) + handler.handleEnabledForConversation(conversationId = TestConversation.CONVERSATION.id, timestampIso) // then verify(arrangement.persistMessage) .suspendFunction(arrangement.persistMessage::invoke) - .with(matching { it.content is MessageContent.LegalHold.ForConversation.Enabled }) + .with(matching { it.content is MessageContent.LegalHold.ForConversation.Enabled && it.date == timestampIso }) .wasInvoked(exactly = once) } @Test fun givenConversationId_whenHandlingDisableForConversation_thenCreateNewSystemMessage() = runTest { // given - val (arrangement, handler) = Arrangement() - .arrange() + val timestampIso = "2022-03-30T15:36:00.000Z" + val (arrangement, handler) = Arrangement().arrange() // when - handler.handleDisabledForConversation(conversationId = TestConversation.CONVERSATION.id) + handler.handleDisabledForConversation(conversationId = TestConversation.CONVERSATION.id, timestampIso) // then verify(arrangement.persistMessage) .suspendFunction(arrangement.persistMessage::invoke) - .with(matching { it.content is MessageContent.LegalHold.ForConversation.Disabled }) + .with(matching { it.content is MessageContent.LegalHold.ForConversation.Disabled && it.date == timestampIso }) .wasInvoked(exactly = once) } From 7be3799a82e953ff7eea6df8038a553cc4815e3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Saleniuk?= Date: Thu, 21 Dec 2023 15:53:20 +0100 Subject: [PATCH 06/10] replace DebounceBuffer with simpler TriggerBuffer --- .../kalium/logic/feature/UserSessionScope.kt | 1 + .../handler/legalhold/LegalHoldHandler.kt | 17 +-- .../{DebounceBuffer.kt => TriggerBuffer.kt} | 20 ++-- .../handler/legalhold/LegalHoldHandlerTest.kt | 49 ++++++++ .../kalium/logic/util/DebounceBufferTest.kt | 108 ------------------ .../kalium/logic/util/TriggerBufferTest.kt | 97 ++++++++++++++++ 6 files changed, 160 insertions(+), 132 deletions(-) rename logic/src/commonMain/kotlin/com/wire/kalium/logic/util/{DebounceBuffer.kt => TriggerBuffer.kt} (71%) delete mode 100644 logic/src/commonTest/kotlin/com/wire/kalium/logic/util/DebounceBufferTest.kt create mode 100644 logic/src/commonTest/kotlin/com/wire/kalium/logic/util/TriggerBufferTest.kt diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt index 17705f196b2..e479264db7a 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt @@ -1402,6 +1402,7 @@ class UserSessionScope internal constructor( membersHavingLegalHoldClient = membersHavingLegalHoldClient, userConfigRepository = userConfigRepository, conversationRepository = conversationRepository, + observeSyncState = observeSyncState, legalHoldSystemMessagesHandler = legalHoldSystemMessagesHandler, ) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt index 7c3fbda0f87..fd103db2df3 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt @@ -23,6 +23,7 @@ import com.wire.kalium.logic.data.conversation.Conversation import com.wire.kalium.logic.data.conversation.ConversationRepository import com.wire.kalium.logic.data.event.Event import com.wire.kalium.logic.data.id.ConversationId +import com.wire.kalium.logic.data.sync.SyncState import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.feature.client.FetchSelfClientsFromRemoteUseCase import com.wire.kalium.logic.feature.client.PersistOtherUserClientsUseCase @@ -35,16 +36,16 @@ import com.wire.kalium.logic.functional.foldToEitherWhileRight import com.wire.kalium.logic.functional.getOrElse import com.wire.kalium.logic.functional.map import com.wire.kalium.logic.kaliumLogger +import com.wire.kalium.logic.sync.ObserveSyncStateUseCase import com.wire.kalium.logic.sync.receiver.conversation.message.MessageUnpackResult -import com.wire.kalium.logic.util.DebounceBuffer +import com.wire.kalium.logic.util.TriggerBuffer import com.wire.kalium.util.DateTimeUtil import com.wire.kalium.util.KaliumDispatcher import com.wire.kalium.util.KaliumDispatcherImpl import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.firstOrNull +import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch -import kotlin.time.Duration -import kotlin.time.Duration.Companion.seconds internal interface LegalHoldHandler { suspend fun handleEnable(legalHoldEnabled: Event.User.LegalHoldEnabled): Either @@ -59,16 +60,15 @@ internal class LegalHoldHandlerImpl internal constructor( private val fetchSelfClientsFromRemote: FetchSelfClientsFromRemoteUseCase, private val observeLegalHoldStateForUser: ObserveLegalHoldStateForUserUseCase, private val membersHavingLegalHoldClient: MembersHavingLegalHoldClientUseCase, + private val observeSyncState: ObserveSyncStateUseCase, private val userConfigRepository: UserConfigRepository, private val conversationRepository: ConversationRepository, private val legalHoldSystemMessagesHandler: LegalHoldSystemMessagesHandler, kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl, - debounceBufferCapacity: Int = DEBOUNCE_BUFFER_CAPACITY, - devounceBufferTimeout: Duration = DEBOUNCE_BUFFER_TIMEOUT, ) : LegalHoldHandler { private val scope = CoroutineScope(kaliumDispatcher.default) private val conversationsWithUpdatedLegalHoldStatus = - DebounceBuffer(debounceBufferCapacity, devounceBufferTimeout, scope) + TriggerBuffer(observeSyncState().map { it == SyncState.Live }, scope) init { scope.launch { @@ -197,9 +197,4 @@ internal class LegalHoldHandlerImpl internal constructor( } } } - - companion object { - private const val DEBOUNCE_BUFFER_CAPACITY = 100 - private val DEBOUNCE_BUFFER_TIMEOUT = 3.seconds - } } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/util/DebounceBuffer.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/util/TriggerBuffer.kt similarity index 71% rename from logic/src/commonMain/kotlin/com/wire/kalium/logic/util/DebounceBuffer.kt rename to logic/src/commonMain/kotlin/com/wire/kalium/logic/util/TriggerBuffer.kt index bf1ec4a923b..5a7c2cb2abf 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/util/DebounceBuffer.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/util/TriggerBuffer.kt @@ -23,36 +23,30 @@ import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharingStarted -import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock -import kotlin.time.Duration -import kotlin.time.Duration.Companion.seconds /** - * A buffer that will collect items and emit list of all items buffered since last emitted list - * only when it has reached size of [capacity] or when [timeout] has passed since last item added. + * A buffer that will collect items and emit list of all items buffered since last emitted list only when trigger emits true. */ @OptIn(FlowPreview::class) -internal class DebounceBuffer(private val capacity: Int, private val timeout: Duration, scope: CoroutineScope) { +internal class TriggerBuffer(trigger: Flow, scope: CoroutineScope) { private val buffer = mutableListOf() private val mutex = Mutex() - private val trigger = MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) - private val sharedFlow = trigger - .debounce { it } + private val newItemFlow = MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + private val sharedFlow = combine(newItemFlow, trigger, ::Pair) + .filter {(_, trigger) -> trigger } .map { getAllAndClear() } .shareIn(scope, SharingStarted.Eagerly, 1) suspend fun add(value: T) = mutex.withLock { if (!buffer.contains(value)) { buffer.add(value) - trigger.emit( - if (buffer.size >= capacity) 0.seconds.inWholeMilliseconds - else timeout.inWholeMilliseconds - ) + newItemFlow.emit(Unit) } } diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt index 2961d7b250c..fcc7c4912dc 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt @@ -24,6 +24,7 @@ import com.wire.kalium.logic.data.conversation.ConversationRepository import com.wire.kalium.logic.data.event.Event import com.wire.kalium.logic.data.message.MessageContent import com.wire.kalium.logic.data.message.ProtoContent +import com.wire.kalium.logic.data.sync.SyncState import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.feature.client.FetchSelfClientsFromRemoteUseCase import com.wire.kalium.logic.feature.client.PersistOtherUserClientsUseCase @@ -34,12 +35,14 @@ import com.wire.kalium.logic.feature.legalhold.ObserveLegalHoldStateForUserUseCa import com.wire.kalium.logic.framework.TestConversation import com.wire.kalium.logic.framework.TestUser import com.wire.kalium.logic.functional.Either +import com.wire.kalium.logic.sync.ObserveSyncStateUseCase import com.wire.kalium.logic.sync.receiver.conversation.message.MessageUnpackResult import com.wire.kalium.logic.test_util.TestKaliumDispatcher import com.wire.kalium.util.DateTimeUtil.toIsoDateTimeString import com.wire.kalium.util.KaliumDispatcher import io.mockative.Mock import io.mockative.any +import io.mockative.anything import io.mockative.configure import io.mockative.eq import io.mockative.given @@ -48,6 +51,8 @@ import io.mockative.once import io.mockative.verify import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.runTest @@ -428,6 +433,34 @@ class LegalHoldHandlerTest { .with(eq(TestConversation.CONVERSATION.id), eq(message.timestampIso)) .wasInvoked() } + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun givenNewMessageWithChangedLegalHoldState_whenHandling_thenBufferAndHandleItWhenSyncStateIsLive() = runTest { + // given + val syncStatesFlow = MutableStateFlow(SyncState.GatheringPendingEvents) + val (arrangement, handler) = Arrangement() + .withGetConversationsByUserIdSuccess(listOf(conversation(legalHoldStatus = Conversation.LegalHoldStatus.DISABLED))) + .withGetConversationMembersSuccess(listOf(TestUser.OTHER_USER_ID)) + .withMembersHavingLegalHoldClientSuccess(emptyList()) // checked before legal hold state change so empty + .withObserveLegalHoldStateForUserSuccess(LegalHoldState.Enabled) // checked after legal hold state change, that's why enabled + .withSetLegalHoldChangeNotifiedSuccess() + .withSyncStates(syncStatesFlow) + .arrange() + advanceUntilIdle() + // when + handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.ENABLED)) + // then + verify(arrangement.legalHoldSystemMessagesHandler) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForUser) + .with(any()) + .wasNotInvoked() + syncStatesFlow.emit(SyncState.Live) + advanceUntilIdle() + verify(arrangement.legalHoldSystemMessagesHandler) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForUser) + .with(eq(TestUser.OTHER_USER_ID)) + .wasInvoked() + } private class Arrangement { @@ -452,6 +485,9 @@ class LegalHoldHandlerTest { @Mock val legalHoldSystemMessagesHandler = configure(mock(LegalHoldSystemMessagesHandler::class)) { stubsUnitByDefault = true } + @Mock + val observeSyncState = mock(ObserveSyncStateUseCase::class) + init { withObserveLegalHoldStateForUserSuccess(LegalHoldState.Disabled) withFetchSelfClientsFromRemoteSuccess() @@ -471,6 +507,7 @@ class LegalHoldHandlerTest { conversationRepository = conversationRepository, userConfigRepository = userConfigRepository, legalHoldSystemMessagesHandler = legalHoldSystemMessagesHandler, + observeSyncState = observeSyncState, kaliumDispatcher = testDispatchers, ) @@ -526,6 +563,18 @@ class LegalHoldHandlerTest { .whenInvokedWith(any()) .thenReturn(Either.Right(conversations)) } + fun withGetConversationMembersSuccess(members: List) = apply { + given(conversationRepository) + .suspendFunction(conversationRepository::getConversationMembers) + .whenInvokedWith(anything()) + .then { Either.Right(members) } + } + fun withSyncStates(syncStates: Flow) = apply { + given(observeSyncState) + .function(observeSyncState::invoke) + .whenInvoked() + .thenReturn(syncStates) + } } companion object { diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/util/DebounceBufferTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/util/DebounceBufferTest.kt deleted file mode 100644 index 4962d678627..00000000000 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/util/DebounceBufferTest.kt +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Wire - * Copyright (C) 2023 Wire Swiss GmbH - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see http://www.gnu.org/licenses/. - */ -package com.wire.kalium.logic.util - -import app.cash.turbine.test -import com.wire.kalium.logic.test_util.TestKaliumDispatcher -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.delay -import kotlinx.coroutines.test.advanceTimeBy -import kotlinx.coroutines.test.advanceUntilIdle -import kotlinx.coroutines.test.runTest -import kotlinx.coroutines.test.setMain -import kotlin.test.BeforeTest -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.time.Duration.Companion.seconds - -@OptIn(ExperimentalCoroutinesApi::class) -class DebounceBufferTest { - - @BeforeTest - fun before() { - Dispatchers.setMain(dispatcher.default) - } - - @Test - fun givenNewItem_whenObserving_thenEmitListAfterTimeout() = runTest { - val (_, debounceBuffer) = Arrangement().arrange() - advanceUntilIdle() - - debounceBuffer.observe().test { - debounceBuffer.add("1") - expectNoEvents() - - advanceTimeBy(3.seconds) - assertEquals(listOf("1"), expectMostRecentItem()) - - cancel() - } - } - - @Test - fun givenNewItemsKeepIncoming_whenObserving_thenEmitListAfterMaxSize() = runTest { - val (_, debounceBuffer) = Arrangement().arrange() - advanceUntilIdle() - - debounceBuffer.observe().test { - expectNoEvents() - - for (i in 1..5) { - debounceBuffer.add(i.toString()) - if (i < 5) expectNoEvents() - else assertEquals((1..5).map { it.toString() }, awaitItem()) - delay(1.seconds) - } - - cancel() - } - } - - @Test - fun givenNewItemsKeepIncomingAndStop_whenObserving_thenEmitListAfterMaxSizeAndTheRestAfterTimeout() = runTest { - val (_, debounceBuffer) = Arrangement().arrange() - advanceUntilIdle() - - debounceBuffer.observe().test { - expectNoEvents() - - for (i in 1..7) { - delay(1.seconds) - debounceBuffer.add(i.toString()) - if (i != 5) expectNoEvents() - else assertEquals((1..5).map { it.toString() }, awaitItem()) - } - - advanceTimeBy(3.seconds) - assertEquals((6..7).map { it.toString() }, awaitItem()) - - cancel() - } - } - - internal class Arrangement { - private val debounceBuffer = DebounceBuffer(capacity = 5, timeout = 2.seconds, scope = CoroutineScope(dispatcher.default)) - fun arrange() = this to debounceBuffer - } - - companion object { - private val dispatcher = TestKaliumDispatcher - } -} diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/util/TriggerBufferTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/util/TriggerBufferTest.kt new file mode 100644 index 00000000000..4e79c892c0d --- /dev/null +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/util/TriggerBufferTest.kt @@ -0,0 +1,97 @@ +/* + * Wire + * Copyright (C) 2023 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.kalium.logic.util + +import app.cash.turbine.test +import com.wire.kalium.logic.test_util.TestKaliumDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.test.setMain +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals + +@OptIn(ExperimentalCoroutinesApi::class) +class TriggerBufferTest { + + @BeforeTest + fun before() { + Dispatchers.setMain(dispatcher.default) + } + + @Test + fun givenNewItemsAndTriggerIsFalse_whenObserving_thenDoNotEmit() = runTest { + val (_, triggerBuffer) = Arrangement(false).arrange() + advanceUntilIdle() + + triggerBuffer.observe().test { + triggerBuffer.add("1") + triggerBuffer.add("2") + expectNoEvents() + + cancel() + } + } + + @Test + fun givenNewItemsAndTriggerIsTrue_whenObserving_thenEmitRightAway() = runTest { + val (_, triggerBuffer) = Arrangement(true).arrange() + advanceUntilIdle() + + triggerBuffer.observe().test { + triggerBuffer.add("1") + assertEquals(listOf("1"), awaitItem()) + + triggerBuffer.add("2") + assertEquals(listOf("2"), awaitItem()) + + cancel() + } + } + + @Test + fun givenNewItemsAndTriggerIsFalse_whenObservingAndTriggerChanges_thenEmitAfterTriggerChange() = runTest { + val (arrangement, triggerBuffer) = Arrangement(false).arrange() + advanceUntilIdle() + + triggerBuffer.observe().test { + triggerBuffer.add("1") + triggerBuffer.add("2") + expectNoEvents() + + arrangement.trigger.emit(true) + assertEquals(listOf("1", "2"), awaitItem()) + + cancel() + } + } + + internal class Arrangement(initialTriggerValue: Boolean) { + val trigger = MutableStateFlow(initialTriggerValue) + private val triggerBuffer = TriggerBuffer(trigger, scope = CoroutineScope(dispatcher.default)) + fun arrange() = this to triggerBuffer + } + + companion object { + private val dispatcher = TestKaliumDispatcher + } +} From 431551c41721102de95b04415bdc574e8c23bbd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Saleniuk?= Date: Thu, 21 Dec 2023 16:11:27 +0100 Subject: [PATCH 07/10] handle live messages right away and use message timestamp for new system messages --- .../message/NewMessageEventHandler.kt | 4 +- .../handler/legalhold/LegalHoldHandler.kt | 27 ++++++++------ .../LegalHoldSystemMessagesHandler.kt | 13 ++++--- .../handler/legalhold/LegalHoldHandlerTest.kt | 37 +++++++++++++++---- .../LegalHoldSystemMessageHandlerTest.kt | 16 +++++--- 5 files changed, 66 insertions(+), 31 deletions(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandler.kt index 35397daa65d..812ad549f08 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/conversation/message/NewMessageEventHandler.kt @@ -87,7 +87,7 @@ internal class NewMessageEventHandlerImpl( }.onSuccess { if (it is MessageUnpackResult.ApplicationMessage) { if (it.content.legalHoldStatus != Conversation.LegalHoldStatus.UNKNOWN) { - legalHoldHandler.handleNewMessage(it) + legalHoldHandler.handleNewMessage(it, event.live) } handleSuccessfulResult(it) onMessageInserted(it) @@ -141,7 +141,7 @@ internal class NewMessageEventHandlerImpl( it.forEach { if (it is MessageUnpackResult.ApplicationMessage) { if (it.content.legalHoldStatus != Conversation.LegalHoldStatus.UNKNOWN) { - legalHoldHandler.handleNewMessage(it) + legalHoldHandler.handleNewMessage(it, event.live) } handleSuccessfulResult(it) onMessageInserted(it) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt index fd103db2df3..47a650d8028 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt @@ -43,6 +43,7 @@ import com.wire.kalium.util.DateTimeUtil import com.wire.kalium.util.KaliumDispatcher import com.wire.kalium.util.KaliumDispatcherImpl import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.firstOrNull import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch @@ -50,7 +51,7 @@ import kotlinx.coroutines.launch internal interface LegalHoldHandler { suspend fun handleEnable(legalHoldEnabled: Event.User.LegalHoldEnabled): Either suspend fun handleDisable(legalHoldDisabled: Event.User.LegalHoldDisabled): Either - suspend fun handleNewMessage(message: MessageUnpackResult.ApplicationMessage): Either + suspend fun handleNewMessage(message: MessageUnpackResult.ApplicationMessage, live: Boolean): Either } @Suppress("LongParameterList") @@ -60,20 +61,20 @@ internal class LegalHoldHandlerImpl internal constructor( private val fetchSelfClientsFromRemote: FetchSelfClientsFromRemoteUseCase, private val observeLegalHoldStateForUser: ObserveLegalHoldStateForUserUseCase, private val membersHavingLegalHoldClient: MembersHavingLegalHoldClientUseCase, - private val observeSyncState: ObserveSyncStateUseCase, private val userConfigRepository: UserConfigRepository, private val conversationRepository: ConversationRepository, private val legalHoldSystemMessagesHandler: LegalHoldSystemMessagesHandler, + observeSyncState: ObserveSyncStateUseCase, kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl, ) : LegalHoldHandler { private val scope = CoroutineScope(kaliumDispatcher.default) private val conversationsWithUpdatedLegalHoldStatus = - TriggerBuffer(observeSyncState().map { it == SyncState.Live }, scope) + TriggerBuffer(observeSyncState().distinctUntilChanged().map { it == SyncState.Live }, scope) init { scope.launch { conversationsWithUpdatedLegalHoldStatus.observe() - .collect { handleUpdatedBufferedConversations(it) } + .collect { handleUpdatedConversations(it) } } } @@ -89,7 +90,7 @@ internal class LegalHoldHandlerImpl internal constructor( userConfigRepository.setLegalHoldChangeNotified(false) } handleConversationsForUser(legalHoldEnabled.userId) - legalHoldSystemMessagesHandler.handleEnabledForUser(legalHoldEnabled.userId) + legalHoldSystemMessagesHandler.handleEnabledForUser(legalHoldEnabled.userId, DateTimeUtil.currentIsoDateTimeString()) } return Either.Right(Unit) @@ -107,13 +108,13 @@ internal class LegalHoldHandlerImpl internal constructor( userConfigRepository.setLegalHoldChangeNotified(false) } handleConversationsForUser(legalHoldDisabled.userId) - legalHoldSystemMessagesHandler.handleDisabledForUser(legalHoldDisabled.userId) + legalHoldSystemMessagesHandler.handleDisabledForUser(legalHoldDisabled.userId, DateTimeUtil.currentIsoDateTimeString()) } return Either.Right(Unit) } - override suspend fun handleNewMessage(message: MessageUnpackResult.ApplicationMessage): Either { + override suspend fun handleNewMessage(message: MessageUnpackResult.ApplicationMessage, live: Boolean): Either { val isStatusChangedForConversation = when (message.content.legalHoldStatus) { Conversation.LegalHoldStatus.ENABLED -> handleForConversation(message.conversationId, Conversation.LegalHoldStatus.ENABLED, message.timestampIso) @@ -122,7 +123,8 @@ internal class LegalHoldHandlerImpl internal constructor( else -> false } if (isStatusChangedForConversation) { - conversationsWithUpdatedLegalHoldStatus.add(message.conversationId) + if (live) handleUpdatedConversations(listOf(message.conversationId), message.timestampIso) // handle it right away + else conversationsWithUpdatedLegalHoldStatus.add(message.conversationId) // buffer and handle after sync } return Either.Right(Unit) } @@ -171,7 +173,10 @@ internal class LegalHoldHandlerImpl internal constructor( } } - private suspend fun handleUpdatedBufferedConversations(conversationIds: List) { + private suspend fun handleUpdatedConversations( + conversationIds: List, + systemMessageTimestampIso: String = DateTimeUtil.currentIsoDateTimeString(), + ) { conversationIds .foldToEitherWhileRight(mapOf()) { conversationId, acc -> conversationRepository.getConversationMembers(conversationId) @@ -191,8 +196,8 @@ internal class LegalHoldHandlerImpl internal constructor( if (selfUserId == userId) { // notify only for self user userConfigRepository.setLegalHoldChangeNotified(false) } - if (userIsNowUnderLegalHold) legalHoldSystemMessagesHandler.handleEnabledForUser(userId) - else legalHoldSystemMessagesHandler.handleDisabledForUser(userId) + if (userIsNowUnderLegalHold) legalHoldSystemMessagesHandler.handleEnabledForUser(userId, systemMessageTimestampIso) + else legalHoldSystemMessagesHandler.handleDisabledForUser(userId, systemMessageTimestampIso) } } } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessagesHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessagesHandler.kt index f41a975c58f..59a6230a41b 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessagesHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessagesHandler.kt @@ -31,8 +31,8 @@ import com.wire.kalium.logic.functional.map import com.wire.kalium.util.DateTimeUtil internal interface LegalHoldSystemMessagesHandler { - suspend fun handleEnabledForUser(userId: UserId) - suspend fun handleDisabledForUser(userId: UserId) + suspend fun handleEnabledForUser(userId: UserId, systemMessageTimestampIso: String) + suspend fun handleDisabledForUser(userId: UserId, systemMessageTimestampIso: String) suspend fun handleEnabledForConversation(conversationId: ConversationId, systemMessageTimestampIso: String) suspend fun handleDisabledForConversation(conversationId: ConversationId, systemMessageTimestampIso: String) } @@ -44,14 +44,16 @@ internal class LegalHoldSystemMessagesHandlerImpl( private val messageRepository: MessageRepository, ) : LegalHoldSystemMessagesHandler { - override suspend fun handleEnabledForUser(userId: UserId) = handleSystemMessagesForUser( + override suspend fun handleEnabledForUser(userId: UserId, systemMessageTimestampIso: String) = handleSystemMessagesForUser( userId = userId, + newSystemMessageTimestampIso = systemMessageTimestampIso, update = { members -> (members + userId).distinct() }, createNew = { MessageContent.LegalHold.ForMembers.Enabled(members = listOf(userId)) } ) - override suspend fun handleDisabledForUser(userId: UserId) = handleSystemMessagesForUser( + override suspend fun handleDisabledForUser(userId: UserId, systemMessageTimestampIso: String) = handleSystemMessagesForUser( userId = userId, + newSystemMessageTimestampIso = systemMessageTimestampIso, update = { members -> (members + userId).distinct() }, createNew = { MessageContent.LegalHold.ForMembers.Disabled(members = listOf(userId)) } ) @@ -88,6 +90,7 @@ internal class LegalHoldSystemMessagesHandlerImpl( private suspend inline fun handleSystemMessagesForUser( userId: UserId, + newSystemMessageTimestampIso: String = DateTimeUtil.currentIsoDateTimeString(), crossinline update: (List) -> List, crossinline createNew: () -> T, ) { @@ -99,7 +102,7 @@ internal class LegalHoldSystemMessagesHandlerImpl( // create or update system messages for members lastMessagesMap[conversation.id]?.let { (lastMessageId, lastMessageContent) -> messageRepository.updateLegalHoldMessageMembers(lastMessageId, conversation.id, update(lastMessageContent.members)) - } ?: persistMessage(createSystemMessage(createNew(), conversation.id)) + } ?: persistMessage(createSystemMessage(createNew(), conversation.id, newSystemMessageTimestampIso)) } } } diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt index fcc7c4912dc..8acbe380534 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandlerTest.kt @@ -368,7 +368,7 @@ class LegalHoldHandlerTest { .withUpdateLegalHoldStatusSuccess(false) .arrange() // when - handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.DISABLED)) + handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.DISABLED), false) // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisabledForConversation) @@ -382,7 +382,7 @@ class LegalHoldHandlerTest { .withGetConversationsByUserIdSuccess(listOf(conversation(legalHoldStatus = Conversation.LegalHoldStatus.DISABLED))) .arrange() // when - handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.ENABLED)) + handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.ENABLED), false) // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForConversation) @@ -397,7 +397,7 @@ class LegalHoldHandlerTest { .withUpdateLegalHoldStatusSuccess(false) .arrange() // when - handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.ENABLED)) + handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.ENABLED), false) // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForConversation) @@ -411,7 +411,7 @@ class LegalHoldHandlerTest { .withGetConversationsByUserIdSuccess(listOf(conversation(legalHoldStatus = Conversation.LegalHoldStatus.ENABLED))) .arrange() // when - handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.DISABLED)) + handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.DISABLED), false) // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleDisabledForConversation) @@ -426,7 +426,7 @@ class LegalHoldHandlerTest { .arrange() val message = applicationMessage(Conversation.LegalHoldStatus.ENABLED) // when - handler.handleNewMessage(message) + handler.handleNewMessage(message, false) // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForConversation) @@ -435,7 +435,7 @@ class LegalHoldHandlerTest { } @OptIn(ExperimentalCoroutinesApi::class) @Test - fun givenNewMessageWithChangedLegalHoldState_whenHandling_thenBufferAndHandleItWhenSyncStateIsLive() = runTest { + fun givenNewMessageWithChangedLegalHoldStateAndSyncing_whenHandling_thenBufferAndHandleItWhenSyncStateIsLive() = runTest { // given val syncStatesFlow = MutableStateFlow(SyncState.GatheringPendingEvents) val (arrangement, handler) = Arrangement() @@ -448,7 +448,7 @@ class LegalHoldHandlerTest { .arrange() advanceUntilIdle() // when - handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.ENABLED)) + handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.ENABLED), false) // then verify(arrangement.legalHoldSystemMessagesHandler) .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForUser) @@ -462,6 +462,28 @@ class LegalHoldHandlerTest { .wasInvoked() } + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun givenNewMessageWithChangedLegalHoldStateAndSynced_whenHandling_thenHandleItRightAway() = runTest { + // given + val (arrangement, handler) = Arrangement() + .withGetConversationsByUserIdSuccess(listOf(conversation(legalHoldStatus = Conversation.LegalHoldStatus.DISABLED))) + .withGetConversationMembersSuccess(listOf(TestUser.OTHER_USER_ID)) + .withMembersHavingLegalHoldClientSuccess(emptyList()) // checked before legal hold state change so empty + .withObserveLegalHoldStateForUserSuccess(LegalHoldState.Enabled) // checked after legal hold state change, that's why enabled + .withSetLegalHoldChangeNotifiedSuccess() + .withSyncStates(flowOf(SyncState.Live)) + .arrange() + advanceUntilIdle() + // when + handler.handleNewMessage(applicationMessage(Conversation.LegalHoldStatus.ENABLED), true) + // then + verify(arrangement.legalHoldSystemMessagesHandler) + .suspendFunction(arrangement.legalHoldSystemMessagesHandler::handleEnabledForUser) + .with(eq(TestUser.OTHER_USER_ID)) + .wasInvoked() + } + private class Arrangement { @Mock @@ -495,6 +517,7 @@ class LegalHoldHandlerTest { withGetConversationsByUserIdSuccess(emptyList()) withMembersHavingLegalHoldClientSuccess(emptyList()) withUpdateLegalHoldStatusSuccess() + withSyncStates(flowOf(SyncState.GatheringPendingEvents)) } fun arrange() = diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessageHandlerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessageHandlerTest.kt index c6068446ba9..2a6da6a841a 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessageHandlerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldSystemMessageHandlerTest.kt @@ -54,17 +54,18 @@ class LegalHoldSystemMessagesHandlerTest { @Test fun givenNoLastLegalHoldEnabledMessageForConversation_whenHandlingEnableForUser_thenCreateNewSystemMessage() = runTest { // given + val timestampIso = "2022-03-30T15:36:00.000Z" val (arrangement, handler) = Arrangement() .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) .withGetLastMessagesForConversationIdsSuccess(mapOf(TestConversation.CONVERSATION.id to TestMessage.TEXT_MESSAGE)) .arrange() // when - handler.handleEnabledForUser(userId = TestUser.OTHER_USER_ID) + handler.handleEnabledForUser(userId = TestUser.OTHER_USER_ID, timestampIso) // then verify(arrangement.persistMessage) .suspendFunction(arrangement.persistMessage::invoke) .with(matching { - it.content is MessageContent.LegalHold.ForMembers.Enabled + it.content is MessageContent.LegalHold.ForMembers.Enabled && it.date == timestampIso && (it.content as MessageContent.LegalHold.ForMembers.Enabled).members == listOf(TestUser.OTHER_USER_ID) }) .wasInvoked(exactly = once) @@ -76,13 +77,14 @@ class LegalHoldSystemMessagesHandlerTest { @Test fun givenLastLegalHoldEnabledMessageForConversation_whenHandlingEnableForUser_thenUpdateExistingSystemMessage() = runTest { // given + val timestampIso = "2022-03-30T15:36:00.000Z" val legalHoldMessage = testLegalHoldSystemMessage(MessageContent.LegalHold.ForMembers.Enabled(listOf(TestUser.OTHER_USER_ID_2))) val (arrangement, handler) = Arrangement() .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) .withGetLastMessagesForConversationIdsSuccess(mapOf(TestConversation.CONVERSATION.id to legalHoldMessage)) .arrange() // when - handler.handleEnabledForUser(userId = TestUser.OTHER_USER_ID) + handler.handleEnabledForUser(userId = TestUser.OTHER_USER_ID, timestampIso) // then verify(arrangement.persistMessage) .suspendFunction(arrangement.persistMessage::invoke) @@ -96,17 +98,18 @@ class LegalHoldSystemMessagesHandlerTest { @Test fun givenNoLastLegalHoldDisabledMessageForConversation_whenHandlingDisableForUser_thenCreateNewSystemMessage() = runTest { // given + val timestampIso = "2022-03-30T15:36:00.000Z" val (arrangement, handler) = Arrangement() .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) .withGetLastMessagesForConversationIdsSuccess(mapOf(TestConversation.CONVERSATION.id to TestMessage.TEXT_MESSAGE)) .arrange() // when - handler.handleDisabledForUser(userId = TestUser.OTHER_USER_ID) + handler.handleDisabledForUser(userId = TestUser.OTHER_USER_ID, timestampIso) // then verify(arrangement.persistMessage) .suspendFunction(arrangement.persistMessage::invoke) .with(matching { - it.content is MessageContent.LegalHold.ForMembers.Disabled + it.content is MessageContent.LegalHold.ForMembers.Disabled && it.date == timestampIso && (it.content as MessageContent.LegalHold.ForMembers.Disabled).members == listOf(TestUser.OTHER_USER_ID) }) .wasInvoked(exactly = once) @@ -118,13 +121,14 @@ class LegalHoldSystemMessagesHandlerTest { @Test fun givenLastLegalHoldDisabledMessageForConversation_whenHandlingDisableForUser_thenUpdateExistingSystemMessage() = runTest { // given + val timestampIso = "2022-03-30T15:36:00.000Z" val legalHoldMessage = testLegalHoldSystemMessage(MessageContent.LegalHold.ForMembers.Disabled(listOf(TestUser.OTHER_USER_ID_2))) val (arrangement, handler) = Arrangement() .withGetConversationsByUserIdSuccess(listOf(TestConversation.CONVERSATION)) .withGetLastMessagesForConversationIdsSuccess(mapOf(TestConversation.CONVERSATION.id to legalHoldMessage)) .arrange() // when - handler.handleDisabledForUser(userId = TestUser.OTHER_USER_ID) + handler.handleDisabledForUser(userId = TestUser.OTHER_USER_ID, timestampIso) // then verify(arrangement.persistMessage) .suspendFunction(arrangement.persistMessage::invoke) From e02ac6def8f335885119851d0555610f83ab05f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Saleniuk?= Date: Thu, 21 Dec 2023 16:48:50 +0100 Subject: [PATCH 08/10] change name of the buffer --- .../sync/receiver/handler/legalhold/LegalHoldHandler.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt index 47a650d8028..74618433e38 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt @@ -68,12 +68,12 @@ internal class LegalHoldHandlerImpl internal constructor( kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl, ) : LegalHoldHandler { private val scope = CoroutineScope(kaliumDispatcher.default) - private val conversationsWithUpdatedLegalHoldStatus = + private val bufferedUpdatedConversationIds = TriggerBuffer(observeSyncState().distinctUntilChanged().map { it == SyncState.Live }, scope) init { scope.launch { - conversationsWithUpdatedLegalHoldStatus.observe() + bufferedUpdatedConversationIds.observe() .collect { handleUpdatedConversations(it) } } } @@ -124,7 +124,7 @@ internal class LegalHoldHandlerImpl internal constructor( } if (isStatusChangedForConversation) { if (live) handleUpdatedConversations(listOf(message.conversationId), message.timestampIso) // handle it right away - else conversationsWithUpdatedLegalHoldStatus.add(message.conversationId) // buffer and handle after sync + else bufferedUpdatedConversationIds.add(message.conversationId) // buffer and handle after sync } return Either.Right(Unit) } From 7479aebeb05d1b1d6c9563394179fe430df0e6f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Saleniuk?= Date: Thu, 21 Dec 2023 16:53:13 +0100 Subject: [PATCH 09/10] fix detekt --- .../kotlin/com/wire/kalium/logic/util/TriggerBuffer.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/util/TriggerBuffer.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/util/TriggerBuffer.kt index 5a7c2cb2abf..b3da0b08fdd 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/util/TriggerBuffer.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/util/TriggerBuffer.kt @@ -40,7 +40,7 @@ internal class TriggerBuffer(trigger: Flow, scope: CoroutineScope) { private val mutex = Mutex() private val newItemFlow = MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) private val sharedFlow = combine(newItemFlow, trigger, ::Pair) - .filter {(_, trigger) -> trigger } + .filter { (_, trigger) -> trigger } .map { getAllAndClear() } .shareIn(scope, SharingStarted.Eagerly, 1) suspend fun add(value: T) = mutex.withLock { From a008822e64b9f596137936f6416b9446204d9c07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Saleniuk?= Date: Fri, 22 Dec 2023 10:35:54 +0100 Subject: [PATCH 10/10] change after review --- .../sync/receiver/handler/legalhold/LegalHoldHandler.kt | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt index 74618433e38..d2fe61709c8 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/legalhold/LegalHoldHandler.kt @@ -115,11 +115,9 @@ internal class LegalHoldHandlerImpl internal constructor( } override suspend fun handleNewMessage(message: MessageUnpackResult.ApplicationMessage, live: Boolean): Either { - val isStatusChangedForConversation = when (message.content.legalHoldStatus) { - Conversation.LegalHoldStatus.ENABLED -> - handleForConversation(message.conversationId, Conversation.LegalHoldStatus.ENABLED, message.timestampIso) - Conversation.LegalHoldStatus.DISABLED -> - handleForConversation(message.conversationId, Conversation.LegalHoldStatus.DISABLED, message.timestampIso) + val isStatusChangedForConversation = when (val legalHoldStatus = message.content.legalHoldStatus) { + Conversation.LegalHoldStatus.ENABLED, Conversation.LegalHoldStatus.DISABLED -> + handleForConversation(message.conversationId, legalHoldStatus, message.timestampIso) else -> false } if (isStatusChangedForConversation) {