Skip to content

Commit

Permalink
feat: update conversation info when out of sync
Browse files Browse the repository at this point in the history
  • Loading branch information
MohamadJaara committed Dec 5, 2023
1 parent ebd7489 commit 693fefb
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class DebugScope internal constructor(
)

private val messageSendFailureHandler: MessageSendFailureHandler
get() = MessageSendFailureHandlerImpl(userRepository, clientRepository, messageRepository, messageSendingScheduler)
get() = MessageSendFailureHandlerImpl(userRepository, clientRepository, conversationRepository, messageRepository, messageSendingScheduler)

private val sessionEstablisher: SessionEstablisher
get() = SessionEstablisherImpl(proteusClientProvider, preKeyRepository)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class MessageScope internal constructor(
) {

private val messageSendFailureHandler: MessageSendFailureHandler
get() = MessageSendFailureHandlerImpl(userRepository, clientRepository, messageRepository, messageSendingScheduler)
get() = MessageSendFailureHandlerImpl(userRepository, clientRepository, conversationRepository, messageRepository, messageSendingScheduler)

private val sessionEstablisher: SessionEstablisher
get() = SessionEstablisherImpl(proteusClientProvider, preKeyRepository)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package com.wire.kalium.logic.feature.message
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.NetworkFailure
import com.wire.kalium.logic.data.client.ClientRepository
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.logic.data.user.UserRepository
Expand All @@ -39,7 +40,7 @@ interface MessageSendFailureHandler {
* @return Either.Left if can't recover from error
* @return Either.Right if the error was properly handled and a new attempt at sending message can be made
*/
suspend fun handleClientsHaveChangedFailure(sendFailure: ProteusSendMessageFailure): Either<CoreFailure, Unit>
suspend fun handleClientsHaveChangedFailure(sendFailure: ProteusSendMessageFailure, conversationId: ConversationId?): Either<CoreFailure, Unit>

/**
* Handle a failure when attempting to send a message
Expand All @@ -62,15 +63,21 @@ interface MessageSendFailureHandler {
class MessageSendFailureHandlerImpl internal constructor(
private val userRepository: UserRepository,
private val clientRepository: ClientRepository,
private val conversationRepository: ConversationRepository,
private val messageRepository: MessageRepository,
private val messageSendingScheduler: MessageSendingScheduler
) : MessageSendFailureHandler {

override suspend fun handleClientsHaveChangedFailure(sendFailure: ProteusSendMessageFailure): Either<CoreFailure, Unit> =
override suspend fun handleClientsHaveChangedFailure(sendFailure: ProteusSendMessageFailure, conversationId: ConversationId?): Either<CoreFailure, Unit> =
// TODO(optimization): add/remove members to/from conversation
// TODO(optimization): remove clients from conversation
userRepository
.fetchUsersByIds(sendFailure.missingClientsOfUsers.keys)
.flatMap {
conversationId?.let {
conversationRepository.fetchConversation(conversationId)
} ?: Either.Right(Unit)
}
.flatMap {
sendFailure
.missingClientsOfUsers
Expand All @@ -80,6 +87,7 @@ class MessageSendFailureHandlerImpl internal constructor(
}
}


override suspend fun handleFailureAndUpdateMessageStatus(
failure: CoreFailure,
conversationId: ConversationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ internal class MessageSenderImpl internal constructor(
messageRepository
.sendEnvelope(message.conversationId, envelope, messageTarget)
.fold({
handleProteusError(it, "Send", message.toLogString()) {
handleProteusError(it, "Send", message.toLogString(), message.conversationId) {
attemptToSendWithProteus(message, messageTarget)
}
}, { messageSent ->
Expand All @@ -375,7 +375,7 @@ internal class MessageSenderImpl internal constructor(
messageRepository
.broadcastEnvelope(envelope, option)
.fold({
handleProteusError(it, "Broadcast", message.toLogString()) {
handleProteusError(it, "Broadcast", message.toLogString(), null) {
attemptToBroadcastWithProteus(
message,
target
Expand All @@ -390,6 +390,7 @@ internal class MessageSenderImpl internal constructor(
failure: CoreFailure,
action: String, // Send or Broadcast
messageLogString: String,
conversationId: ConversationId?,
retry: suspend () -> Either<CoreFailure, String>
) =
when (failure) {
Expand All @@ -398,7 +399,7 @@ internal class MessageSenderImpl internal constructor(
"Proteus $action Failure: { \"message\" : \"${messageLogString}\", \"errorInfo\" : \"${failure}\" }"
)
messageSendFailureHandler
.handleClientsHaveChangedFailure(failure)
.handleClientsHaveChangedFailure(failure, conversationId = conversationId)
.flatMap {
logger.w("Retrying After Proteus $action Failure: { \"message\" : \"${messageLogString}\"}")
retry()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import com.wire.kalium.logic.NetworkFailure
import com.wire.kalium.logic.StorageFailure
import com.wire.kalium.logic.data.client.ClientRepository
import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
Expand All @@ -45,13 +47,11 @@ import io.mockative.given
import io.mockative.mock
import io.mockative.once
import io.mockative.verify
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest
import okio.IOException
import kotlin.test.Test
import kotlin.test.assertEquals

@OptIn(ExperimentalCoroutinesApi::class)
class MessageSendFailureHandlerTest {

@Test
Expand All @@ -62,7 +62,7 @@ class MessageSendFailureHandlerTest {
.arrange()
val failureData = ProteusSendMessageFailure(mapOf(arrangement.userOne, arrangement.userTwo), mapOf(), mapOf(), null)

messageSendFailureHandler.handleClientsHaveChangedFailure(failureData)
messageSendFailureHandler.handleClientsHaveChangedFailure(failureData, null)

verify(arrangement.userRepository)
.suspendFunction(arrangement.userRepository::fetchUsersByIds)
Expand All @@ -78,7 +78,7 @@ class MessageSendFailureHandlerTest {
.arrange()
val failureData = ProteusSendMessageFailure(mapOf(arrangement.userOne, arrangement.userTwo), mapOf(), mapOf(), null)

messageSendFailureHandler.handleClientsHaveChangedFailure(failureData)
messageSendFailureHandler.handleClientsHaveChangedFailure(failureData, null)

verify(arrangement.clientRepository)
.suspendFunction(arrangement.clientRepository::storeUserClientIdList)
Expand All @@ -99,7 +99,7 @@ class MessageSendFailureHandlerTest {
.arrange()
val failureData = ProteusSendMessageFailure(mapOf(), mapOf(), mapOf(), null)

val result = messageSendFailureHandler.handleClientsHaveChangedFailure(failureData)
val result = messageSendFailureHandler.handleClientsHaveChangedFailure(failureData, null)
result.shouldFail()
assertEquals(Either.Left(failure), result)
}
Expand All @@ -113,7 +113,7 @@ class MessageSendFailureHandlerTest {
.arrange()
val failureData = ProteusSendMessageFailure(mapOf(arrangement.userOne), mapOf(), mapOf(), null)

val result = messageSendFailureHandler.handleClientsHaveChangedFailure(failureData)
val result = messageSendFailureHandler.handleClientsHaveChangedFailure(failureData, null)
result.shouldFail()
assertEquals(Either.Left(failure), result)
}
Expand Down Expand Up @@ -175,18 +175,74 @@ class MessageSendFailureHandlerTest {
.wasInvoked(once)
}

@Test
fun givenMissingClientsError_whenAConversationIdIsProvided_thenUpdateConversationInfo() = runTest {
val (arrangement, messageSendFailureHandler) = Arrangement()
.withFetchUsersByIdSuccess()
.withStoreUserClientIdListSuccess()
.withFetchConversation(Either.Right(Unit))
.arrange()
val failureData = ProteusSendMessageFailure(mapOf(arrangement.userOne, arrangement.userTwo), mapOf(), mapOf(), null)

messageSendFailureHandler.handleClientsHaveChangedFailure(failureData, arrangement.conversationId)

verify(arrangement.userRepository)
.suspendFunction(arrangement.userRepository::fetchUsersByIds)
.with(eq(failureData.missingClientsOfUsers.keys))
.wasInvoked(once)

verify(arrangement.conversationRepository)
.suspendFunction(arrangement.conversationRepository::fetchConversation)
.with(any())
.wasInvoked(exactly = once)
}

@Test
fun givenMissingClientsError_whenNoConversationIdIsProvided_thenUpdateConversationInfo() = runTest {
val (arrangement, messageSendFailureHandler) = Arrangement()
.withFetchUsersByIdSuccess()
.withStoreUserClientIdListSuccess()
.arrange()
val failureData = ProteusSendMessageFailure(mapOf(arrangement.userOne, arrangement.userTwo), mapOf(), mapOf(), null)

messageSendFailureHandler.handleClientsHaveChangedFailure(failureData, null)

verify(arrangement.userRepository)
.suspendFunction(arrangement.userRepository::fetchUsersByIds)
.with(eq(failureData.missingClientsOfUsers.keys))
.wasInvoked(once)

verify(arrangement.conversationRepository)
.suspendFunction(arrangement.conversationRepository::fetchConversation)
.with(any())
.wasNotInvoked()
}

class Arrangement {

@Mock
internal val clientRepository = mock(classOf<ClientRepository>())

@Mock
internal val userRepository = mock(classOf<UserRepository>())

@Mock
internal val messageRepository = mock(classOf<MessageRepository>())

@Mock
val messageSendingScheduler = configure(mock(MessageSendingScheduler::class)) { stubsUnitByDefault = true }

@Mock
val conversationRepository = configure(mock(ConversationRepository::class)) { stubsUnitByDefault = true }

private val messageSendFailureHandler: MessageSendFailureHandler =
MessageSendFailureHandlerImpl(userRepository, clientRepository, messageRepository, messageSendingScheduler)
MessageSendFailureHandlerImpl(
userRepository,
clientRepository,
conversationRepository,
messageRepository,
messageSendingScheduler
)
val userOne: Pair<UserId, List<ClientId>> =
UserId("userId1", "anta.wire") to listOf(ClientId("clientId"), ClientId("secondClientId"))
val userTwo: Pair<UserId, List<ClientId>> =
Expand All @@ -202,30 +258,41 @@ class MessageSendFailureHandlerTest {
.whenInvokedWith(any())
.thenReturn(Either.Right(Unit))
}

fun withFetchUsersByIdFailure(failure: CoreFailure) = apply {
given(userRepository)
.suspendFunction(userRepository::fetchUsersByIds)
.whenInvokedWith(any())
.thenReturn(Either.Left(failure))
}

fun withStoreUserClientIdListSuccess() = apply {
given(clientRepository)
.suspendFunction(clientRepository::storeUserClientIdList)
.whenInvokedWith(any(), any())
.thenReturn(Either.Right(Unit))
}

fun withStoreUserClientIdListFailure(failure: StorageFailure) = apply {
given(clientRepository)
.suspendFunction(clientRepository::storeUserClientIdList)
.whenInvokedWith(any(), any())
.thenReturn(Either.Left(failure))
}

fun withUpdateMessageStatusSuccess() = apply {
given(messageRepository)
.suspendFunction(messageRepository::updateMessageStatus)
.whenInvokedWith(any(), any(), any())
.thenReturn(Either.Right(Unit))
}

fun withFetchConversation(result: Either<CoreFailure, Unit>) = apply {
given(conversationRepository)
.suspendFunction(conversationRepository::fetchConversation)
.whenInvokedWith(any())
.thenReturn(result)
}
}

private companion object {
Expand Down

0 comments on commit 693fefb

Please sign in to comment.