From 693fefb97ded9eb170eeb478283342aab15337a4 Mon Sep 17 00:00:00 2001 From: MohamadJaara Date: Tue, 5 Dec 2023 12:49:01 +0100 Subject: [PATCH] feat: update conversation info when out of sync --- .../kalium/logic/feature/debug/DebugScope.kt | 2 +- .../logic/feature/message/MessageScope.kt | 2 +- .../message/MessageSendFailureHandler.kt | 12 ++- .../logic/feature/message/MessageSender.kt | 7 +- .../prekey/MessageSendFailureHandlerTest.kt | 81 +++++++++++++++++-- 5 files changed, 90 insertions(+), 14 deletions(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/debug/DebugScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/debug/DebugScope.kt index af321f5dac4..24bfb411f06 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/debug/DebugScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/debug/DebugScope.kt @@ -109,7 +109,7 @@ class DebugScope internal constructor( ) private val messageSendFailureHandler: MessageSendFailureHandler - get() = MessageSendFailureHandlerImpl(userRepository, clientRepository, messageRepository, messageSendingScheduler) + get() = MessageSendFailureHandlerImpl(userRepository, clientRepository, conversationRepository, messageRepository, messageSendingScheduler) private val sessionEstablisher: SessionEstablisher get() = SessionEstablisherImpl(proteusClientProvider, preKeyRepository) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageScope.kt index 5f51e6f5b2a..2cc2093c26a 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageScope.kt @@ -97,7 +97,7 @@ class MessageScope internal constructor( ) { private val messageSendFailureHandler: MessageSendFailureHandler - get() = MessageSendFailureHandlerImpl(userRepository, clientRepository, messageRepository, messageSendingScheduler) + get() = MessageSendFailureHandlerImpl(userRepository, clientRepository, conversationRepository, messageRepository, messageSendingScheduler) private val sessionEstablisher: SessionEstablisher get() = SessionEstablisherImpl(proteusClientProvider, preKeyRepository) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageSendFailureHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageSendFailureHandler.kt index 8140dd5723b..dd518056d8f 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageSendFailureHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageSendFailureHandler.kt @@ -22,6 +22,7 @@ package com.wire.kalium.logic.feature.message import com.wire.kalium.logic.CoreFailure import com.wire.kalium.logic.NetworkFailure import com.wire.kalium.logic.data.client.ClientRepository +import com.wire.kalium.logic.data.conversation.ConversationRepository import com.wire.kalium.logic.data.id.ConversationId import com.wire.kalium.logic.data.message.MessageRepository import com.wire.kalium.logic.data.user.UserRepository @@ -39,7 +40,7 @@ interface MessageSendFailureHandler { * @return Either.Left if can't recover from error * @return Either.Right if the error was properly handled and a new attempt at sending message can be made */ - suspend fun handleClientsHaveChangedFailure(sendFailure: ProteusSendMessageFailure): Either + suspend fun handleClientsHaveChangedFailure(sendFailure: ProteusSendMessageFailure, conversationId: ConversationId?): Either /** * Handle a failure when attempting to send a message @@ -62,15 +63,21 @@ interface MessageSendFailureHandler { class MessageSendFailureHandlerImpl internal constructor( private val userRepository: UserRepository, private val clientRepository: ClientRepository, + private val conversationRepository: ConversationRepository, private val messageRepository: MessageRepository, private val messageSendingScheduler: MessageSendingScheduler ) : MessageSendFailureHandler { - override suspend fun handleClientsHaveChangedFailure(sendFailure: ProteusSendMessageFailure): Either = + override suspend fun handleClientsHaveChangedFailure(sendFailure: ProteusSendMessageFailure, conversationId: ConversationId?): Either = // TODO(optimization): add/remove members to/from conversation // TODO(optimization): remove clients from conversation userRepository .fetchUsersByIds(sendFailure.missingClientsOfUsers.keys) + .flatMap { + conversationId?.let { + conversationRepository.fetchConversation(conversationId) + } ?: Either.Right(Unit) + } .flatMap { sendFailure .missingClientsOfUsers @@ -80,6 +87,7 @@ class MessageSendFailureHandlerImpl internal constructor( } } + override suspend fun handleFailureAndUpdateMessageStatus( failure: CoreFailure, conversationId: ConversationId, diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageSender.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageSender.kt index 9df54219c9e..6d608daf303 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageSender.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageSender.kt @@ -352,7 +352,7 @@ internal class MessageSenderImpl internal constructor( messageRepository .sendEnvelope(message.conversationId, envelope, messageTarget) .fold({ - handleProteusError(it, "Send", message.toLogString()) { + handleProteusError(it, "Send", message.toLogString(), message.conversationId) { attemptToSendWithProteus(message, messageTarget) } }, { messageSent -> @@ -375,7 +375,7 @@ internal class MessageSenderImpl internal constructor( messageRepository .broadcastEnvelope(envelope, option) .fold({ - handleProteusError(it, "Broadcast", message.toLogString()) { + handleProteusError(it, "Broadcast", message.toLogString(), null) { attemptToBroadcastWithProteus( message, target @@ -390,6 +390,7 @@ internal class MessageSenderImpl internal constructor( failure: CoreFailure, action: String, // Send or Broadcast messageLogString: String, + conversationId: ConversationId?, retry: suspend () -> Either ) = when (failure) { @@ -398,7 +399,7 @@ internal class MessageSenderImpl internal constructor( "Proteus $action Failure: { \"message\" : \"${messageLogString}\", \"errorInfo\" : \"${failure}\" }" ) messageSendFailureHandler - .handleClientsHaveChangedFailure(failure) + .handleClientsHaveChangedFailure(failure, conversationId = conversationId) .flatMap { logger.w("Retrying After Proteus $action Failure: { \"message\" : \"${messageLogString}\"}") retry() diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/prekey/MessageSendFailureHandlerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/prekey/MessageSendFailureHandlerTest.kt index 4051ab2a305..26ae1835b84 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/prekey/MessageSendFailureHandlerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/prekey/MessageSendFailureHandlerTest.kt @@ -23,6 +23,8 @@ import com.wire.kalium.logic.NetworkFailure import com.wire.kalium.logic.StorageFailure import com.wire.kalium.logic.data.client.ClientRepository import com.wire.kalium.logic.data.conversation.ClientId +import com.wire.kalium.logic.data.conversation.ConversationRepository +import com.wire.kalium.logic.data.id.ConversationId import com.wire.kalium.logic.data.message.MessageRepository import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.data.user.UserRepository @@ -45,13 +47,11 @@ import io.mockative.given import io.mockative.mock import io.mockative.once import io.mockative.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runTest import okio.IOException import kotlin.test.Test import kotlin.test.assertEquals -@OptIn(ExperimentalCoroutinesApi::class) class MessageSendFailureHandlerTest { @Test @@ -62,7 +62,7 @@ class MessageSendFailureHandlerTest { .arrange() val failureData = ProteusSendMessageFailure(mapOf(arrangement.userOne, arrangement.userTwo), mapOf(), mapOf(), null) - messageSendFailureHandler.handleClientsHaveChangedFailure(failureData) + messageSendFailureHandler.handleClientsHaveChangedFailure(failureData, null) verify(arrangement.userRepository) .suspendFunction(arrangement.userRepository::fetchUsersByIds) @@ -78,7 +78,7 @@ class MessageSendFailureHandlerTest { .arrange() val failureData = ProteusSendMessageFailure(mapOf(arrangement.userOne, arrangement.userTwo), mapOf(), mapOf(), null) - messageSendFailureHandler.handleClientsHaveChangedFailure(failureData) + messageSendFailureHandler.handleClientsHaveChangedFailure(failureData, null) verify(arrangement.clientRepository) .suspendFunction(arrangement.clientRepository::storeUserClientIdList) @@ -99,7 +99,7 @@ class MessageSendFailureHandlerTest { .arrange() val failureData = ProteusSendMessageFailure(mapOf(), mapOf(), mapOf(), null) - val result = messageSendFailureHandler.handleClientsHaveChangedFailure(failureData) + val result = messageSendFailureHandler.handleClientsHaveChangedFailure(failureData, null) result.shouldFail() assertEquals(Either.Left(failure), result) } @@ -113,7 +113,7 @@ class MessageSendFailureHandlerTest { .arrange() val failureData = ProteusSendMessageFailure(mapOf(arrangement.userOne), mapOf(), mapOf(), null) - val result = messageSendFailureHandler.handleClientsHaveChangedFailure(failureData) + val result = messageSendFailureHandler.handleClientsHaveChangedFailure(failureData, null) result.shouldFail() assertEquals(Either.Left(failure), result) } @@ -175,18 +175,74 @@ class MessageSendFailureHandlerTest { .wasInvoked(once) } + @Test + fun givenMissingClientsError_whenAConversationIdIsProvided_thenUpdateConversationInfo() = runTest { + val (arrangement, messageSendFailureHandler) = Arrangement() + .withFetchUsersByIdSuccess() + .withStoreUserClientIdListSuccess() + .withFetchConversation(Either.Right(Unit)) + .arrange() + val failureData = ProteusSendMessageFailure(mapOf(arrangement.userOne, arrangement.userTwo), mapOf(), mapOf(), null) + + messageSendFailureHandler.handleClientsHaveChangedFailure(failureData, arrangement.conversationId) + + verify(arrangement.userRepository) + .suspendFunction(arrangement.userRepository::fetchUsersByIds) + .with(eq(failureData.missingClientsOfUsers.keys)) + .wasInvoked(once) + + verify(arrangement.conversationRepository) + .suspendFunction(arrangement.conversationRepository::fetchConversation) + .with(any()) + .wasInvoked(exactly = once) + } + + @Test + fun givenMissingClientsError_whenNoConversationIdIsProvided_thenUpdateConversationInfo() = runTest { + val (arrangement, messageSendFailureHandler) = Arrangement() + .withFetchUsersByIdSuccess() + .withStoreUserClientIdListSuccess() + .arrange() + val failureData = ProteusSendMessageFailure(mapOf(arrangement.userOne, arrangement.userTwo), mapOf(), mapOf(), null) + + messageSendFailureHandler.handleClientsHaveChangedFailure(failureData, null) + + verify(arrangement.userRepository) + .suspendFunction(arrangement.userRepository::fetchUsersByIds) + .with(eq(failureData.missingClientsOfUsers.keys)) + .wasInvoked(once) + + verify(arrangement.conversationRepository) + .suspendFunction(arrangement.conversationRepository::fetchConversation) + .with(any()) + .wasNotInvoked() + } + class Arrangement { @Mock internal val clientRepository = mock(classOf()) + @Mock internal val userRepository = mock(classOf()) + @Mock internal val messageRepository = mock(classOf()) + @Mock val messageSendingScheduler = configure(mock(MessageSendingScheduler::class)) { stubsUnitByDefault = true } + + @Mock + val conversationRepository = configure(mock(ConversationRepository::class)) { stubsUnitByDefault = true } + private val messageSendFailureHandler: MessageSendFailureHandler = - MessageSendFailureHandlerImpl(userRepository, clientRepository, messageRepository, messageSendingScheduler) + MessageSendFailureHandlerImpl( + userRepository, + clientRepository, + conversationRepository, + messageRepository, + messageSendingScheduler + ) val userOne: Pair> = UserId("userId1", "anta.wire") to listOf(ClientId("clientId"), ClientId("secondClientId")) val userTwo: Pair> = @@ -202,30 +258,41 @@ class MessageSendFailureHandlerTest { .whenInvokedWith(any()) .thenReturn(Either.Right(Unit)) } + fun withFetchUsersByIdFailure(failure: CoreFailure) = apply { given(userRepository) .suspendFunction(userRepository::fetchUsersByIds) .whenInvokedWith(any()) .thenReturn(Either.Left(failure)) } + fun withStoreUserClientIdListSuccess() = apply { given(clientRepository) .suspendFunction(clientRepository::storeUserClientIdList) .whenInvokedWith(any(), any()) .thenReturn(Either.Right(Unit)) } + fun withStoreUserClientIdListFailure(failure: StorageFailure) = apply { given(clientRepository) .suspendFunction(clientRepository::storeUserClientIdList) .whenInvokedWith(any(), any()) .thenReturn(Either.Left(failure)) } + fun withUpdateMessageStatusSuccess() = apply { given(messageRepository) .suspendFunction(messageRepository::updateMessageStatus) .whenInvokedWith(any(), any(), any()) .thenReturn(Either.Right(Unit)) } + + fun withFetchConversation(result: Either) = apply { + given(conversationRepository) + .suspendFunction(conversationRepository::fetchConversation) + .whenInvokedWith(any()) + .thenReturn(result) + } } private companion object {