Skip to content

Commit

Permalink
fix: fallback to proteus if conversation already present but MLS is d…
Browse files Browse the repository at this point in the history
…efault (WPB-15191) (#3200)

* fix: fallback to proteus if conversation already present and no common protocol

* fix: test coverage

* fix: test coverage
  • Loading branch information
yamilmedina committed Jan 9, 2025
1 parent c5c2468 commit 768b5d4
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ import kotlinx.coroutines.flow.first
/**
* Operation that creates one-to-one Conversation with specific [UserId] (only if it is absent in local DB)
* and returns [Conversation] data.
*
* @param otherUserId [UserId] private conversation with which we are interested in.
* @return Result with [Conversation] in case of success, or [CoreFailure] if something went wrong:
* can't get data from local DB, or can't create a conversation.
*/
interface GetOrCreateOneToOneConversationUseCase {
suspend operator fun invoke(otherUserId: UserId): CreateConversationResult
Expand All @@ -47,6 +43,14 @@ internal class GetOrCreateOneToOneConversationUseCaseImpl(
private val userRepository: UserRepository,
private val oneOnOneResolver: OneOnOneResolver
) : GetOrCreateOneToOneConversationUseCase {

/**
* The use case operation operation params and return type.
*
* @param otherUserId [UserId] private conversation with which we are interested in.
* @return Result with [Conversation] in case of success, or [CoreFailure] if something went wrong:
* can't get data from local DB, or can't create a conversation.
*/
override suspend operator fun invoke(otherUserId: UserId): CreateConversationResult {
// TODO periodically re-resolve one-on-one
return conversationRepository.observeOneToOneConversationWithOtherUser(otherUserId)
Expand All @@ -66,6 +70,18 @@ internal class GetOrCreateOneToOneConversationUseCaseImpl(
})
}

/**
* Resolves one-on-one conversation with the user.
* Resolving conversations is the process of:
*
* - Intersecting the supported protocols of the self user and the other user.
* - Selecting the common protocol, based on the team settings with the highest priority.
* - Get or create a conversation with the other user.
* - If the protocol now is MLS, migrate the existing Proteus conversation to MLS.
* - Mark the conversation as active.
*
* If no common protocol is found, and we have existing Proteus conversations, we do best effort to use them as fallback.
*/
private suspend fun resolveOneOnOneConversationWithUser(otherUserId: UserId): Either<CoreFailure, Conversation> =
userRepository.userById(otherUserId).flatMap { otherUser ->
// TODO support lazily establishing mls group for team 1-1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,19 @@ import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.kaliumLogger

interface OneOnOneMigrator {
/**
* Migrates the user's one-on-one Proteus. Without creating a new one since MLS is the default, marking it as active.
*/
suspend fun migrateExistingProteus(user: OtherUser): Either<CoreFailure, ConversationId>

/**
* Get one-on-one conversation with the user, if not found, create a new one (Proteus still default) and mark it as active.
*/
suspend fun migrateToProteus(user: OtherUser): Either<CoreFailure, ConversationId>

/**
* Perform migration of Proteus to MLS keeping history and marking the new conversation as active.
*/
suspend fun migrateToMLS(user: OtherUser): Either<CoreFailure, ConversationId>
}

Expand Down Expand Up @@ -92,19 +104,35 @@ internal class OneOnOneMigratorImpl(
}
}

override suspend fun migrateExistingProteus(user: OtherUser): Either<CoreFailure, ConversationId> =
conversationRepository.getOneOnOneConversationsWithOtherUser(user.id, Conversation.Protocol.PROTEUS).flatMap { conversationIds ->
if (conversationIds.isNotEmpty()) {
val conversationId = conversationIds.first()
Either.Right(conversationId)
} else {
Either.Left(StorageFailure.DataNotFound)
}
}.flatMap { conversationId ->
if (user.activeOneOnOneConversationId != conversationId) {
kaliumLogger.d("resolved existing one-on-one to proteus, user = ${user.id.toLogString()}")
userRepository.updateActiveOneOnOneConversation(user.id, conversationId)
}
Either.Right(conversationId)
}

private suspend fun migrateOneOnOneHistory(user: OtherUser, targetConversation: ConversationId): Either<CoreFailure, Unit> {
return conversationRepository.getOneOnOneConversationsWithOtherUser(
otherUserId = user.id,
protocol = Conversation.Protocol.PROTEUS
).flatMap { proteusOneOnOneConversations ->
// We can theoretically have more than one proteus 1-1 conversation with
// team members since there was no backend safeguards against this
proteusOneOnOneConversations.foldToEitherWhileRight(Unit) { proteusOneOnOneConversation, _ ->
messageRepository.moveMessagesToAnotherConversation(
originalConversation = proteusOneOnOneConversation,
targetConversation = targetConversation
)
}
return conversationRepository.getOneOnOneConversationsWithOtherUser(
otherUserId = user.id,
protocol = Conversation.Protocol.PROTEUS
).flatMap { proteusOneOnOneConversations ->
// We can theoretically have more than one proteus 1-1 conversation with
// team members since there was no backend safeguards against this
proteusOneOnOneConversations.foldToEitherWhileRight(Unit) { proteusOneOnOneConversation, _ ->
messageRepository.moveMessagesToAnotherConversation(
originalConversation = proteusOneOnOneConversation,
targetConversation = targetConversation
)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import com.wire.kalium.logic.feature.protocol.OneOnOneProtocolSelector
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.flatMapLeft
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import com.wire.kalium.logic.functional.left
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.util.KaliumDispatcher
Expand Down Expand Up @@ -154,11 +156,18 @@ internal class OneOnOneResolverImpl(
if (invalidateCurrentKnownProtocols) {
userRepository.fetchUsersByIds(setOf(user.id))
}
return oneOnOneProtocolSelector.getProtocolForUser(user.id).flatMap { supportedProtocol ->
return oneOnOneProtocolSelector.getProtocolForUser(user.id).fold({ coreFailure ->
if (coreFailure is CoreFailure.NoCommonProtocolFound.OtherNeedToUpdate) {
kaliumLogger.i("Resolving existing proteus 1:1 as not matching protocol found with: ${user.id.toLogString()}")
oneOnOneMigrator.migrateExistingProteus(user)
} else {
coreFailure.left()
}
}, { supportedProtocol ->
when (supportedProtocol) {
SupportedProtocol.PROTEUS -> oneOnOneMigrator.migrateToProteus(user)
SupportedProtocol.MLS -> oneOnOneMigrator.migrateToMLS(user)
}
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ import com.wire.kalium.logic.util.arrangement.repository.UserRepositoryArrangeme
import com.wire.kalium.logic.util.shouldFail
import com.wire.kalium.logic.util.shouldSucceed
import io.mockative.any
import io.mockative.coVerify
import io.mockative.eq
import io.mockative.once
import io.mockative.verify
import io.mockative.verify
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals
Expand Down Expand Up @@ -133,6 +135,11 @@ class OneOnOneMigratorTest {
.suspendFunction(arrangement.messageRepository::moveMessagesToAnotherConversation)
.with(any(), any())
.wasNotInvoked()

verify(arrangement.systemMessageInserter)
.suspendFunction(arrangement.systemMessageInserter::insertProtocolChangedSystemMessage)
.with(any(), any(), any())
.wasNotInvoked()
}

@Test
Expand All @@ -142,7 +149,7 @@ class OneOnOneMigratorTest {
)
val failure = CoreFailure.MissingClientRegistration

val (_, oneOnOneMigrator) = arrange {
val (arrangement, oneOnOneMigrator) = arrange {
withResolveConversationReturning(Either.Left(failure))
}

Expand Down Expand Up @@ -173,6 +180,11 @@ class OneOnOneMigratorTest {
.suspendFunction(arrangement.userRepository::updateActiveOneOnOneConversation)
.with(any(), any())
.wasNotInvoked()

verify(arrangement.systemMessageInserter)
.suspendFunction(arrangement.systemMessageInserter::insertProtocolChangedSystemMessage)
.with(any(), any(), any())
.wasNotInvoked()
}

@Test
Expand Down Expand Up @@ -215,6 +227,11 @@ class OneOnOneMigratorTest {
.suspendFunction(arrangement.messageRepository::moveMessagesToAnotherConversation)
.with(eq(originalConversationId), eq(resolvedConversationId))
.wasInvoked(exactly = once)

verify(arrangement.systemMessageInserter)
.suspendFunction(arrangement.systemMessageInserter::insertProtocolChangedSystemMessage)
.with(any(), any(), any())
.wasInvoked(exactly = once)
}

@Test
Expand All @@ -238,23 +255,53 @@ class OneOnOneMigratorTest {
.suspendFunction(arrangement.userRepository::updateActiveOneOnOneConversation)
.with(eq(user.id), eq(resolvedConversationId))
.wasInvoked(exactly = once)

verify(arrangement.systemMessageInserter)
.suspendFunction(arrangement.systemMessageInserter::insertProtocolChangedSystemMessage)
.with(any(), any(), any())
.wasInvoked(exactly = once)
}

@Test
fun givenExistingTeamOneOnOne_whenMigratingToProteus_thenShouldNOTCreateGroupConversation() = runTest {
val user = TestUser.OTHER.copy(
activeOneOnOneConversationId = null
)

val (arrangement, oneOneMigrator) = arrange {
withGetOneOnOneConversationsWithOtherUserReturning(Either.Right(listOf(TestConversation.ONE_ON_ONE().id)))
withUpdateOneOnOneConversationReturning(Either.Right(Unit))
}

oneOneMigrator.migrateExistingProteus(user)
.shouldSucceed()

verify(arrangement.conversationGroupRepository)
.suspendFunction(arrangement.conversationGroupRepository::createGroupConversation)
.with(eq(null), eq(listOf(TestUser.OTHER.id)), eq(ConversationOptions()))
.wasNotInvoked()

verify(arrangement.userRepository)
.suspendFunction(arrangement.userRepository::updateActiveOneOnOneConversation)
.with(eq(TestUser.OTHER.id), eq(TestConversation.ONE_ON_ONE().id))
.wasInvoked(exactly = once)
}

private class Arrangement(private val block: Arrangement.() -> Unit) :
private class Arrangement(private val block: suspend Arrangement.() -> Unit) :
MLSOneOnOneConversationResolverArrangement by MLSOneOnOneConversationResolverArrangementImpl(),
MessageRepositoryArrangement by MessageRepositoryArrangementImpl(),
ConversationRepositoryArrangement by ConversationRepositoryArrangementImpl(),
ConversationGroupRepositoryArrangement by ConversationGroupRepositoryArrangementImpl(),
UserRepositoryArrangement by UserRepositoryArrangementImpl()
{
UserRepositoryArrangement by UserRepositoryArrangementImpl() {
fun arrange() = run {
block()
this@Arrangement to OneOnOneMigratorImpl(
getResolvedMLSOneOnOne = mlsOneOnOneConversationResolver,
conversationGroupRepository = conversationGroupRepository,
conversationRepository = conversationRepository,
messageRepository = messageRepository,
userRepository = userRepository
userRepository = userRepository,
systemMessageInserter = systemMessageInserter
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import com.wire.kalium.logic.data.user.SupportedProtocol
import com.wire.kalium.logic.framework.TestConversation
import com.wire.kalium.logic.framework.TestUser
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.left
import com.wire.kalium.logic.functional.right
import com.wire.kalium.logic.util.arrangement.IncrementalSyncRepositoryArrangement
import com.wire.kalium.logic.util.arrangement.IncrementalSyncRepositoryArrangementImpl
import com.wire.kalium.logic.util.arrangement.mls.OneOnOneMigratorArrangement
Expand All @@ -33,6 +35,7 @@ import com.wire.kalium.logic.util.arrangement.repository.UserRepositoryArrangeme
import com.wire.kalium.logic.util.shouldFail
import com.wire.kalium.logic.util.shouldSucceed
import io.mockative.any
import io.mockative.coVerify
import io.mockative.eq
import io.mockative.given
import io.mockative.matchers.OneOfMatcher
Expand Down Expand Up @@ -255,6 +258,24 @@ class OneOnOneResolverTest {
.wasInvoked(exactly = once)
}

@Test
fun givenProtocolResolvesToOtherNeedToUpdate_whenResolveOneOnOneConversationWithUser_thenMigrateExistingToProteus() = runTest {
// given
val (arrangement, resolver) = arrange {
withGetProtocolForUser(CoreFailure.NoCommonProtocolFound.OtherNeedToUpdate.left())
withMigrateExistingToProteusReturns(TestConversation.ID.right())
}

// when
resolver.resolveOneOnOneConversationWithUser(OTHER_USER, false).shouldSucceed()

// then
verify(arrangement.oneOnOneMigrator)
.suspendFunction(arrangement.oneOnOneMigrator::migrateExistingProteus)
.with(eq(OTHER_USER))
.wasInvoked(exactly = once)
}

private class Arrangement(private val block: Arrangement.() -> Unit) :
UserRepositoryArrangement by UserRepositoryArrangementImpl(),
OneOnOneProtocolSelectorArrangement by OneOnOneProtocolSelectorArrangementImpl(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ interface OneOnOneMigratorArrangement {
fun withMigrateToMLSReturns(result: Either<CoreFailure, ConversationId>)

fun withMigrateToProteusReturns(result: Either<CoreFailure, ConversationId>)

fun withMigrateExistingToProteusReturns(result: Either<CoreFailure, ConversationId>)
}

class OneOnOneMigratorArrangementImpl : OneOnOneMigratorArrangement {
Expand All @@ -53,4 +55,12 @@ class OneOnOneMigratorArrangementImpl : OneOnOneMigratorArrangement {
.whenInvokedWith(any())
.thenReturn(result)
}

override fun withMigrateExistingToProteusReturns(result: Either<CoreFailure, ConversationId>) {
given(oneOnOneMigrator)
.suspendFunction(oneOnOneMigrator::migrateExistingProteus)
.whenInvokedWith(any())
.thenReturn(result)
}
}
//,

0 comments on commit 768b5d4

Please sign in to comment.