Skip to content

Commit

Permalink
feat: resolve active 1-1 after accepting a connection request (#2042)
Browse files Browse the repository at this point in the history
  • Loading branch information
typfel committed Oct 13, 2023
1 parent f4506ed commit fcb0e42
Show file tree
Hide file tree
Showing 14 changed files with 200 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1590,7 +1590,12 @@ class UserSessionScope internal constructor(
kaliumConfigs
)

val connection: ConnectionScope get() = ConnectionScope(connectionRepository, conversationRepository)
val connection: ConnectionScope
get() = ConnectionScope(
connectionRepository,
conversationRepository,
oneOnOneResolver
)

val observeSecurityClassificationLabel: ObserveSecurityClassificationLabelUseCase
get() = ObserveSecurityClassificationLabelUseCaseImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import com.wire.kalium.logic.data.connection.ConnectionRepository
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.user.ConnectionState
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.feature.conversation.mls.OneOnOneResolver
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.util.DateTimeUtil

Expand All @@ -44,16 +46,26 @@ fun interface AcceptConnectionRequestUseCase {
internal class AcceptConnectionRequestUseCaseImpl(
private val connectionRepository: ConnectionRepository,
private val conversationRepository: ConversationRepository,
private val oneOnOneResolver: OneOnOneResolver
) : AcceptConnectionRequestUseCase {

override suspend fun invoke(userId: UserId): AcceptConnectionRequestUseCaseResult {
return connectionRepository.updateConnectionStatus(userId, ConnectionState.ACCEPTED)
.flatMap {
conversationRepository.fetchConversation(it.qualifiedConversationId)
conversationRepository.updateConversationModifiedDate(it.qualifiedConversationId, DateTimeUtil.currentInstant())
.flatMap { connection ->
conversationRepository.fetchConversation(connection.qualifiedConversationId)
.flatMap {
conversationRepository.updateConversationModifiedDate(
connection.qualifiedConversationId,
DateTimeUtil.currentInstant()
)
}.flatMap {
oneOnOneResolver.resolveOneOnOneConversationWithUserId(
connection.qualifiedToId
).map { }
}
}
.fold({
kaliumLogger.e("An error occurred when accepting the connection request from $userId")
kaliumLogger.e("An error occurred when accepting the connection request from ${userId.toLogString()}: $it")
AcceptConnectionRequestUseCaseResult.Failure(it)
}, {
AcceptConnectionRequestUseCaseResult.Success
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@ package com.wire.kalium.logic.feature.connection

import com.wire.kalium.logic.data.connection.ConnectionRepository
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.feature.conversation.mls.OneOnOneResolver

class ConnectionScope(
private val connectionRepository: ConnectionRepository,
private val conversationRepository: ConversationRepository,
private val oneOnOneResolver: OneOnOneResolver
) {
val sendConnectionRequest: SendConnectionRequestUseCase get() = SendConnectionRequestUseCaseImpl(connectionRepository)

val acceptConnectionRequest: AcceptConnectionRequestUseCase
get() = AcceptConnectionRequestUseCaseImpl(
connectionRepository,
conversationRepository
conversationRepository,
oneOnOneResolver
)

val cancelConnectionRequest: CancelConnectionRequestUseCase get() = CancelConnectionRequestUseCaseImpl(connectionRepository)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ internal class JoinExistingMLSConversationUseCaseImpl(
if (failure.kaliumException.isMlsStaleMessage()) {
kaliumLogger.w("Epoch out of date for conversation ${conversation.id}, re-fetching and re-trying")
// Re-fetch current epoch and try again
conversationRepository.fetchConversation(conversation.id).flatMap {
if (conversation.type == Conversation.Type.ONE_ON_ONE) {
conversationRepository.getConversationMembers(conversation.id).flatMap {
conversationRepository.fetchMlsOneToOneConversation(it.first())
}
} else {
conversationRepository.fetchConversation(conversation.id)
}.flatMap {
conversationRepository.baseInfoById(conversation.id).flatMap { conversation ->
joinOrEstablishMLSGroup(conversation)
}
Expand Down Expand Up @@ -139,6 +145,7 @@ internal class JoinExistingMLSConversationUseCaseImpl(
}

type == Conversation.Type.ONE_ON_ONE -> {
kaliumLogger.i("Establish group for ${conversation.type}")
conversationRepository.getConversationMembers(conversation.id).flatMap { members ->
mlsConversationRepository.establishMLSGroup(
protocol.groupId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import com.wire.kalium.logic.feature.conversation.JoinExistingMLSConversationUse
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.kaliumLogger

/**
* Attempts to find an existing MLS-capable one-on-one conversation,
Expand Down Expand Up @@ -62,8 +63,10 @@ internal class MLSOneOnOneConversationResolverImpl(
}

if (initializedMLSOneOnOne != null) {
kaliumLogger.d("Already established mls group for one-on-one with ${userId.toLogString()}, skipping.")
Either.Right(initializedMLSOneOnOne.id)
} else {
kaliumLogger.d("Establishing mls group for one-on-one with ${userId.toLogString()}")
conversationRepository.fetchMlsOneToOneConversation(userId).flatMap { conversation ->
joinExistingMLSConversationUseCase(conversation.id).map { conversation.id }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.kaliumLogger

interface OneOnOneMigrator {
suspend fun migrateToProteus(user: OtherUser): Either<CoreFailure, ConversationId>
Expand Down Expand Up @@ -64,6 +65,7 @@ internal class OneOnOneMigratorImpl(
Either.Right(it)
}).flatMap { conversationId ->
if (user.activeOneOnOneConversationId != conversationId) {
kaliumLogger.d("resolved one-on-one to proteus, user = ${user.id.toLogString()}")
userRepository.updateActiveOneOnOneConversation(user.id, conversationId)
}
Either.Right(conversationId)
Expand All @@ -75,6 +77,9 @@ internal class OneOnOneMigratorImpl(
if (user.activeOneOnOneConversationId == mlsConversation) {
return@flatMap Either.Right(mlsConversation)
}

kaliumLogger.d("resolved one-on-one to MLS, user = ${user.id.toLogString()}")

migrateOneOnOneHistory(user, mlsConversation)
.flatMap {
userRepository.updateActiveOneOnOneConversation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package com.wire.kalium.logic.feature.conversation.mls

import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.StorageFailure
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.user.OtherUser
import com.wire.kalium.logic.data.user.SupportedProtocol
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.feature.protocol.OneOnOneProtocolSelector
import com.wire.kalium.logic.functional.Either
Expand All @@ -29,9 +31,11 @@ import com.wire.kalium.logic.functional.flatMapLeft
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.kaliumLogger
import kotlinx.coroutines.flow.firstOrNull

interface OneOnOneResolver {
suspend fun resolveAllOneOnOneConversations(): Either<CoreFailure, Unit>
suspend fun resolveOneOnOneConversationWithUserId(userId: UserId): Either<CoreFailure, ConversationId>
suspend fun resolveOneOnOneConversationWithUser(user: OtherUser): Either<CoreFailure, ConversationId>
}

Expand All @@ -49,8 +53,14 @@ internal class OneOnOneResolverImpl(
}
}

override suspend fun resolveOneOnOneConversationWithUser(user: OtherUser): Either<CoreFailure, ConversationId> =
oneOnOneProtocolSelector.getProtocolForUser(user.id).flatMap { supportedProtocol ->
override suspend fun resolveOneOnOneConversationWithUserId(userId: UserId): Either<CoreFailure, ConversationId> =
userRepository.getKnownUser(userId).firstOrNull()?.let {
resolveOneOnOneConversationWithUser(it)
} ?: Either.Left(StorageFailure.DataNotFound)

override suspend fun resolveOneOnOneConversationWithUser(user: OtherUser): Either<CoreFailure, ConversationId> {
kaliumLogger.i("Resolving one-on-one protocol for ${user.id.toLogString()}")
return oneOnOneProtocolSelector.getProtocolForUser(user.id).flatMap { supportedProtocol ->
when (supportedProtocol) {
SupportedProtocol.PROTEUS -> oneOnOneMigrator.migrateToProteus(user)
SupportedProtocol.MLS -> oneOnOneMigrator.migrateToMLS(user)
Expand All @@ -62,4 +72,5 @@ internal class OneOnOneResolverImpl(
}
Either.Left(it)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.kaliumLogger
import kotlinx.coroutines.flow.first

internal interface UserEventReceiver : EventReceiver<Event.User>

Expand Down Expand Up @@ -97,7 +96,7 @@ internal class UserEventReceiverImpl internal constructor(
private suspend fun handleNewConnection(event: Event.User.NewConnection): Either<CoreFailure, Unit> =
connectionRepository.insertConnectionFromEvent(event)
.flatMap {
resolveOneOnOneConversationUponConnectionAccepted(event.connection)
resolveActiveOneOnOneConversationUponConnectionAccepted(event.connection)
}
.onSuccess {
kaliumLogger
Expand All @@ -115,11 +114,9 @@ internal class UserEventReceiverImpl internal constructor(
)
}

private suspend fun resolveOneOnOneConversationUponConnectionAccepted(connection: Connection): Either<CoreFailure, Unit> =
private suspend fun resolveActiveOneOnOneConversationUponConnectionAccepted(connection: Connection): Either<CoreFailure, Unit> =
if (connection.status == ConnectionState.ACCEPTED) {
userRepository.getKnownUser(connection.qualifiedToId).first()?.let {
oneOnOneResolver.resolveOneOnOneConversationWithUser(it).map { }
} ?: Either.Right(Unit)
oneOnOneResolver.resolveOneOnOneConversationWithUserId(connection.qualifiedToId).map { }
} else {
Either.Right(Unit)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package com.wire.kalium.logic.sync.receiver.conversation

import com.wire.kalium.logger.obfuscateId
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.client.MLSClientProvider
import com.wire.kalium.logic.data.conversation.Conversation
Expand Down Expand Up @@ -57,29 +56,26 @@ internal class MLSWelcomeEventHandlerImpl(
client.processWelcomeMessage(event.message.decodeBase64Bytes())
}
}.flatMap { groupID ->
val groupIdLogPair = Pair("groupId", groupID.obfuscateId())

conversationRepository.fetchConversationIfUnknown(event.conversationId)
.flatMap {
markConversationAsEstablished(GroupID(groupID))
}.flatMap {
resolveConversationIfOneOnOne(event.conversationId)
}.onSuccess {
kaliumLogger
.logEventProcessing(
EventLoggingStatus.SUCCESS,
event,
Pair("info", "Established mls conversation from welcome message"),
groupIdLogPair
)
}.onFailure {
kaliumLogger
.logEventProcessing(
EventLoggingStatus.FAILURE,
event,
groupIdLogPair
)
}
}
}.onSuccess {
kaliumLogger
.logEventProcessing(
EventLoggingStatus.SUCCESS,
event,
Pair("info", "Established mls conversation from welcome message")
)
}.onFailure {
kaliumLogger
.logEventProcessing(
EventLoggingStatus.FAILURE,
event,
Pair("failure", it)
)
}

private suspend fun markConversationAsEstablished(groupID: GroupID): Either<CoreFailure, Unit> =
Expand Down
Loading

0 comments on commit fcb0e42

Please sign in to comment.