Skip to content

Commit

Permalink
feat: resolve one-on-one conversation to either proteus or mls (#2012)
Browse files Browse the repository at this point in the history
  • Loading branch information
typfel committed Sep 5, 2023
1 parent 7f191e7 commit 2f10cd9
Show file tree
Hide file tree
Showing 17 changed files with 822 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ enum class SlowSyncStep {
CONNECTIONS,
SELF_TEAM,
CONTACTS,
JOINING_MLS_CONVERSATIONS
JOINING_MLS_CONVERSATIONS,
RESOLVE_ONE_ON_ONE_PROTOCOLS
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ internal interface UserRepository {

suspend fun updateSupportedProtocols(protocols: Set<SupportedProtocol>): Either<CoreFailure, Unit>

suspend fun updateOneOnOneConversation(userId: UserId, conversationId: ConversationId): Either<CoreFailure, Unit>
suspend fun updateActiveOneOnOneConversation(userId: UserId, conversationId: ConversationId): Either<CoreFailure, Unit>
}

@Suppress("LongParameterList", "TooManyFunctions")
Expand Down Expand Up @@ -408,7 +408,7 @@ internal class UserDataSource internal constructor(
}
}

override suspend fun updateOneOnOneConversation(userId: UserId, conversationId: ConversationId): Either<CoreFailure, Unit> =
override suspend fun updateActiveOneOnOneConversation(userId: UserId, conversationId: ConversationId): Either<CoreFailure, Unit> =
wrapStorageRequest { userDAO.updateActiveOneOnOneConversation(userId.toDao(), conversationId.toDao()) }

override fun observeAllKnownUsersNotInConversation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ import com.wire.kalium.logic.feature.conversation.SyncConversationsUseCase
import com.wire.kalium.logic.feature.conversation.SyncConversationsUseCaseImpl
import com.wire.kalium.logic.feature.conversation.keyingmaterials.KeyingMaterialsManager
import com.wire.kalium.logic.feature.conversation.keyingmaterials.KeyingMaterialsManagerImpl
import com.wire.kalium.logic.feature.conversation.mls.MLSOneOnOneConversationResolver
import com.wire.kalium.logic.feature.conversation.mls.MLSOneOnOneConversationResolverImpl
import com.wire.kalium.logic.feature.conversation.mls.OneOnOneMigrator
import com.wire.kalium.logic.feature.conversation.mls.OneOnOneMigratorImpl
import com.wire.kalium.logic.feature.conversation.mls.OneOnOneResolver
import com.wire.kalium.logic.feature.conversation.mls.OneOnOneResolverImpl
import com.wire.kalium.logic.feature.debug.DebugScope
import com.wire.kalium.logic.feature.e2ei.EnrollE2EIUseCase
import com.wire.kalium.logic.feature.e2ei.EnrollE2EIUseCaseImpl
Expand Down Expand Up @@ -860,6 +866,27 @@ class UserSessionScope internal constructor(
clientIdProvider,
)

private val mlsOneOnOneConversationResolver: MLSOneOnOneConversationResolver
get() = MLSOneOnOneConversationResolverImpl(
conversationRepository,
joinExistingMLSConversationUseCase
)

private val oneOnOneMigrator: OneOnOneMigrator
get() = OneOnOneMigratorImpl(
mlsOneOnOneConversationResolver,
conversationGroupRepository,
conversationRepository,
messageRepository,
userRepository
)
private val oneOnOneResolver: OneOnOneResolver
get() = OneOnOneResolverImpl(
userRepository,
oneOnOneProtocolSelector,
oneOnOneMigrator
)

private val slowSyncWorker: SlowSyncWorker by lazy {
SlowSyncWorkerImpl(
syncSelfUser,
Expand All @@ -869,7 +896,8 @@ class UserSessionScope internal constructor(
syncConnections,
syncSelfTeamUseCase,
syncContacts,
joinExistingMLSConversations
joinExistingMLSConversations,
oneOnOneResolver
)
}

Expand Down Expand Up @@ -1344,7 +1372,7 @@ class UserSessionScope internal constructor(
team.isSelfATeamMember,
globalScope.serverConfigRepository,
userStorage,
oneOnOneProtocolSelector,
oneOnOneResolver,
this
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ import com.wire.kalium.logic.feature.conversation.keyingmaterials.UpdateKeyingMa
import com.wire.kalium.logic.feature.conversation.keyingmaterials.UpdateKeyingMaterialsUseCaseImpl
import com.wire.kalium.logic.feature.conversation.messagetimer.UpdateMessageTimerUseCase
import com.wire.kalium.logic.feature.conversation.messagetimer.UpdateMessageTimerUseCaseImpl
import com.wire.kalium.logic.feature.conversation.mls.OneOnOneResolver
import com.wire.kalium.logic.feature.message.MessageSender
import com.wire.kalium.logic.feature.message.SendConfirmationUseCase
import com.wire.kalium.logic.feature.protocol.OneOnOneProtocolSelector
import com.wire.kalium.logic.feature.team.DeleteTeamConversationUseCase
import com.wire.kalium.logic.feature.team.DeleteTeamConversationUseCaseImpl
import com.wire.kalium.logic.feature.team.GetSelfTeamUseCase
Expand Down Expand Up @@ -85,7 +85,7 @@ class ConversationScope internal constructor(
private val isSelfATeamMember: IsSelfATeamMemberUseCase,
private val serverConfigRepository: ServerConfigRepository,
private val userStorage: UserStorage,
private val oneOnOneProtocolSelector: OneOnOneProtocolSelector,
private val oneOnOneResolver: OneOnOneResolver,
private val scope: CoroutineScope
) {

Expand Down Expand Up @@ -154,8 +154,8 @@ class ConversationScope internal constructor(
val getOrCreateOneToOneConversationUseCase: GetOrCreateOneToOneConversationUseCase
get() = GetOrCreateOneToOneConversationUseCaseImpl(
conversationRepository,
conversationGroupRepository,
oneOnOneProtocolSelector
userRepository,
oneOnOneResolver
)

val updateConversationMutedStatus: UpdateConversationMutedStatusUseCase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ package com.wire.kalium.logic.feature.conversation
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.StorageFailure
import com.wire.kalium.logic.data.conversation.Conversation
import com.wire.kalium.logic.data.conversation.ConversationGroupRepository
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.user.SupportedProtocol
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.feature.protocol.OneOnOneProtocolSelector
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.feature.conversation.mls.OneOnOneResolver
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.fold
import kotlinx.coroutines.flow.first
Expand All @@ -44,26 +44,20 @@ interface GetOrCreateOneToOneConversationUseCase {

internal class GetOrCreateOneToOneConversationUseCaseImpl(
private val conversationRepository: ConversationRepository,
private val conversationGroupRepository: ConversationGroupRepository,
private val oneOnOneProtocolSelector: OneOnOneProtocolSelector,
private val userRepository: UserRepository,
private val oneOnOneResolver: OneOnOneResolver
) : GetOrCreateOneToOneConversationUseCase {
override suspend operator fun invoke(otherUserId: UserId): CreateConversationResult {
// TODO: filter out self user from the list (just in case of client bug that leads to self user to be included part of the list)
// TODO periodically re-resolve one-on-one
return conversationRepository.observeOneToOneConversationWithOtherUser(otherUserId)
.first()
.fold({ conversationFailure ->
if (conversationFailure is StorageFailure.DataNotFound) {
oneOnOneProtocolSelector.getProtocolForUser(otherUserId).flatMap { protocol ->
when (protocol) {
SupportedProtocol.MLS ->
conversationRepository.fetchMlsOneToOneConversation(otherUserId)
SupportedProtocol.PROTEUS ->
conversationGroupRepository.createGroupConversation(usersList = listOf(otherUserId))
}
}.fold(
CreateConversationResult::Failure,
CreateConversationResult::Success
)
resolveOneOnOneConversationWithUser(otherUserId)
.fold(
CreateConversationResult::Failure,
CreateConversationResult::Success
)
} else {
CreateConversationResult.Failure(conversationFailure)
}
Expand All @@ -72,6 +66,14 @@ internal class GetOrCreateOneToOneConversationUseCaseImpl(
})
}

private suspend fun resolveOneOnOneConversationWithUser(otherUserId: UserId): Either<CoreFailure, Conversation> =
(userRepository.getKnownUser(otherUserId).first()?.let { otherUser ->
// TODO support lazily establishing mls group for team 1-1
oneOnOneResolver.resolveOneOnOneConversationWithUser(otherUser).flatMap {
conversationRepository.getConversationById(it)?.let { Either.Right(it) } ?: Either.Left(StorageFailure.DataNotFound)
}
} ?: Either.Left(StorageFailure.DataNotFound))

}

sealed class CreateConversationResult {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Wire
* Copyright (C) 2023 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.feature.conversation.mls

import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.StorageFailure
import com.wire.kalium.logic.data.conversation.Conversation
import com.wire.kalium.logic.data.conversation.ConversationGroupRepository
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.logic.data.user.OtherUser
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.data.user.type.isTeammate
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import com.wire.kalium.logic.functional.map

interface OneOnOneMigrator {
suspend fun migrateToProteus(user: OtherUser): Either<CoreFailure, ConversationId>
suspend fun migrateToMLS(user: OtherUser): Either<CoreFailure, ConversationId>
}

internal class OneOnOneMigratorImpl(
private val getResolvedMLSOneOnOne: MLSOneOnOneConversationResolver,
private val conversationGroupRepository: ConversationGroupRepository,
private val conversationRepository: ConversationRepository,
private val messageRepository: MessageRepository,
private val userRepository: UserRepository
) : OneOnOneMigrator {

override suspend fun migrateToProteus(user: OtherUser): Either<CoreFailure, ConversationId> =
conversationRepository.getOneOnOneConversationsWithOtherUser(user.id, Conversation.Protocol.PROTEUS).flatMap { conversationIds ->
if (conversationIds.isNotEmpty()) {
val conversationId = conversationIds.first()
Either.Right(conversationId)
} else {
Either.Left(StorageFailure.DataNotFound)
}
}.fold({ failure ->
if (failure is StorageFailure.DataNotFound && user.userType.isTeammate()) {
conversationGroupRepository.createGroupConversation(usersList = listOf(user.id)).map { it.id }
} else {
Either.Left(failure)
}
}, {
Either.Right(it)
}).flatMap { conversationId ->
if (user.activeOneOnOneConversationId != conversationId) {
userRepository.updateActiveOneOnOneConversation(user.id, conversationId)
}
Either.Right(conversationId)
}

override suspend fun migrateToMLS(user: OtherUser): Either<CoreFailure, ConversationId> {
return getResolvedMLSOneOnOne(user.id)
.flatMap { mlsConversation ->
if (user.activeOneOnOneConversationId == mlsConversation) {
return@flatMap Either.Right(mlsConversation)
}
migrateOneOnOneHistory(user, mlsConversation)
.flatMap {
userRepository.updateActiveOneOnOneConversation(
conversationId = mlsConversation,
userId = user.id
).map {
mlsConversation
}
}
}
}

private suspend fun migrateOneOnOneHistory(user: OtherUser, targetConversation: ConversationId): Either<CoreFailure, Unit> {
return conversationRepository.getOneOnOneConversationsWithOtherUser(
otherUserId = user.id,
protocol = Conversation.Protocol.PROTEUS
).flatMap { proteusOneOnOneConversations ->
// We can theoretically have more than one proteus 1-1 conversation with
// team members since there was no backend safeguards against this
proteusOneOnOneConversations.foldToEitherWhileRight(Unit) { proteusOneOnOneConversation, _ ->
messageRepository.moveMessagesToAnotherConversation(
originalConversation = proteusOneOnOneConversation,
targetConversation = targetConversation
)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Wire
* Copyright (C) 2023 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.feature.conversation.mls

import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.user.OtherUser
import com.wire.kalium.logic.data.user.SupportedProtocol
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.feature.protocol.OneOnOneProtocolSelector
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.flatMapLeft
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.kaliumLogger

interface OneOnOneResolver {
suspend fun resolveAllOneOnOneConversations(): Either<CoreFailure, Unit>
suspend fun resolveOneOnOneConversationWithUser(user: OtherUser): Either<CoreFailure, ConversationId>
}

internal class OneOnOneResolverImpl(
private val userRepository: UserRepository,
private val oneOnOneProtocolSelector: OneOnOneProtocolSelector,
private val oneOnOneMigrator: OneOnOneMigrator,
) : OneOnOneResolver {

override suspend fun resolveAllOneOnOneConversations(): Either<CoreFailure, Unit> {
val usersWithOneOnOne = userRepository.getUsersWithOneOnOneConversation()
kaliumLogger.i("Resolving one-on-one protocol for ${usersWithOneOnOne.size} user(s)")
return usersWithOneOnOne.foldToEitherWhileRight(Unit) { item, _ ->
resolveOneOnOneConversationWithUser(item).map { }
}
}

override suspend fun resolveOneOnOneConversationWithUser(user: OtherUser): Either<CoreFailure, ConversationId> =
oneOnOneProtocolSelector.getProtocolForUser(user.id).flatMap { supportedProtocol ->
when (supportedProtocol) {
SupportedProtocol.PROTEUS -> oneOnOneMigrator.migrateToProteus(user)
SupportedProtocol.MLS -> oneOnOneMigrator.migrateToMLS(user)
}
}.flatMapLeft {
if (it is CoreFailure.NoCommonProtocolFound) {
// TODO mark conversation as read only
Either.Right(Unit)
}
Either.Left(it)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.wire.kalium.logic.data.sync.SlowSyncStep
import com.wire.kalium.logic.feature.connection.SyncConnectionsUseCase
import com.wire.kalium.logic.feature.conversation.JoinExistingMLSConversationsUseCase
import com.wire.kalium.logic.feature.conversation.SyncConversationsUseCase
import com.wire.kalium.logic.feature.conversation.mls.OneOnOneResolver
import com.wire.kalium.logic.feature.featureConfig.SyncFeatureConfigsUseCase
import com.wire.kalium.logic.feature.team.SyncSelfTeamUseCase
import com.wire.kalium.logic.feature.user.SyncContactsUseCase
Expand Down Expand Up @@ -58,7 +59,8 @@ internal class SlowSyncWorkerImpl(
private val syncConnections: SyncConnectionsUseCase,
private val syncSelfTeam: SyncSelfTeamUseCase,
private val syncContacts: SyncContactsUseCase,
private val joinMLSConversations: JoinExistingMLSConversationsUseCase
private val joinMLSConversations: JoinExistingMLSConversationsUseCase,
private val oneOnOneResolver: OneOnOneResolver,
) : SlowSyncWorker {

private val logger = kaliumLogger.withFeatureId(SYNC)
Expand All @@ -80,6 +82,7 @@ internal class SlowSyncWorkerImpl(
.continueWithStep(SlowSyncStep.SELF_TEAM, syncSelfTeam::invoke)
.continueWithStep(SlowSyncStep.CONTACTS, syncContacts::invoke)
.continueWithStep(SlowSyncStep.JOINING_MLS_CONVERSATIONS, joinMLSConversations::invoke)
.continueWithStep(SlowSyncStep.RESOLVE_ONE_ON_ONE_PROTOCOLS, oneOnOneResolver::resolveAllOneOnOneConversations)
.onFailure {
throw KaliumSyncException("Failure during SlowSync", it)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ class UserRepositoryTest {
.withUpdateOneOnOneConversationSuccess()
.arrange()

userRepository.updateOneOnOneConversation(
userRepository.updateActiveOneOnOneConversation(
userId,
conversationId
).shouldSucceed()
Expand All @@ -589,7 +589,7 @@ class UserRepositoryTest {
val userId = TestUser.USER_ID
val conversationId = TestConversation.CONVERSATION.id

connectionRepository.updateOneOnOneConversation(
connectionRepository.updateActiveOneOnOneConversation(
userId,
conversationId
).shouldFail {
Expand Down
Loading

0 comments on commit 2f10cd9

Please sign in to comment.