Skip to content

Commit

Permalink
feat: update out of sync conversation info when sending messages fails (
Browse files Browse the repository at this point in the history
#2288)

* feat: update out of sync conversation info when sending messages fails

* detekt

* detekt
  • Loading branch information
MohamadJaara authored Dec 7, 2023
1 parent 4e75095 commit c1b5925
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ interface ConversationRepository {
// TODO make all functions to have only logic models
suspend fun persistConversations(
conversations: List<ConversationResponse>,
selfUserTeamId: String?,
selfUserTeamId: TeamId?,
originatedFromEvent: Boolean = false,
invalidateMembers: Boolean = false
): Either<CoreFailure, Unit>
Expand Down Expand Up @@ -346,7 +346,7 @@ internal class ConversationDataSource internal constructor(
}
persistConversations(
conversations = conversations.conversationsFound,
selfUserTeamId = selfTeamIdProvider().getOrNull()?.value,
selfUserTeamId = selfTeamIdProvider().getOrNull(),
invalidateMembers = true
)

Expand Down Expand Up @@ -388,7 +388,7 @@ internal class ConversationDataSource internal constructor(

override suspend fun persistConversations(
conversations: List<ConversationResponse>,
selfUserTeamId: String?,
selfUserTeamId: TeamId?,
originatedFromEvent: Boolean,
invalidateMembers: Boolean
) = wrapStorageRequest {
Expand All @@ -409,7 +409,7 @@ internal class ConversationDataSource internal constructor(
conversations.forEach { conversationsResponse ->
// do the cleanup of members from conversation in case when self user rejoined conversation
// and may not received any member remove or leave events
if (invalidateMembers && conversationsResponse.type == ConversationResponse.Type.GROUP) {
if (invalidateMembers && conversationsResponse.toConversationType(selfUserTeamId) == ConversationEntity.Type.GROUP) {
memberDAO.updateFullMemberList(
memberMapper.fromApiModelToDaoModel(conversationsResponse.members),
idMapper.fromApiToDao(conversationsResponse.id)
Expand Down Expand Up @@ -496,7 +496,7 @@ internal class ConversationDataSource internal constructor(
val selfUserTeamId = selfTeamIdProvider().getOrNull()
persistConversations(
conversations = listOf(conversationResponse),
selfUserTeamId = selfUserTeamId?.value
selfUserTeamId = selfUserTeamId
).map { conversationResponse }
}.flatMap { response ->
baseInfoById(response.id.toModel())
Expand Down Expand Up @@ -566,7 +566,7 @@ internal class ConversationDataSource internal constructor(
conversationApi.fetchConversationDetails(conversationID.toApi())
}.flatMap {
val selfUserTeamId = selfTeamIdProvider().getOrNull()
persistConversations(listOf(it), selfUserTeamId?.value, invalidateMembers = true)
persistConversations(listOf(it), selfUserTeamId, invalidateMembers = true)
}
}

Expand All @@ -579,7 +579,7 @@ internal class ConversationDataSource internal constructor(
val conversation = it.copy(
type = ConversationResponse.Type.WAIT_FOR_CONNECTION,
)
persistConversations(listOf(conversation), selfUserTeamId?.value, invalidateMembers = true)
persistConversations(listOf(conversation), selfUserTeamId, invalidateMembers = true)
}
}

Expand Down Expand Up @@ -1052,7 +1052,7 @@ internal class ConversationDataSource internal constructor(
}.flatMap { updated ->
if (updated) {
val selfUserTeamId = selfTeamIdProvider().getOrNull()
persistConversations(listOf(conversationResponse), selfUserTeamId?.value, invalidateMembers = true)
persistConversations(listOf(conversationResponse), selfUserTeamId, invalidateMembers = true)
} else {
Either.Right(Unit)
}.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ import com.wire.kalium.logic.cache.SelfConversationIdProvider
import com.wire.kalium.logic.data.asset.AssetRepository
import com.wire.kalium.logic.data.client.ClientRepository
import com.wire.kalium.logic.data.client.MLSClientProvider
import com.wire.kalium.logic.data.client.ProteusClientProvider
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.conversation.MLSConversationRepository
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.logic.data.message.ProtoContentMapper
import com.wire.kalium.logic.data.message.ProtoContentMapperImpl
import com.wire.kalium.logic.data.message.SessionEstablisher
import com.wire.kalium.logic.data.message.SessionEstablisherImpl
import com.wire.kalium.logic.data.prekey.PreKeyRepository
import com.wire.kalium.logic.data.sync.SlowSyncRepository
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.client.ProteusClientProvider
import com.wire.kalium.logic.feature.message.MLSMessageCreator
import com.wire.kalium.logic.feature.message.MLSMessageCreatorImpl
import com.wire.kalium.logic.feature.message.MessageEnvelopeCreator
Expand All @@ -44,8 +46,6 @@ import com.wire.kalium.logic.feature.message.MessageSenderImpl
import com.wire.kalium.logic.feature.message.MessageSendingInterceptor
import com.wire.kalium.logic.feature.message.MessageSendingInterceptorImpl
import com.wire.kalium.logic.feature.message.MessageSendingScheduler
import com.wire.kalium.logic.data.message.SessionEstablisher
import com.wire.kalium.logic.data.message.SessionEstablisherImpl
import com.wire.kalium.logic.feature.message.StaleEpochVerifier
import com.wire.kalium.logic.feature.message.ephemeral.DeleteEphemeralMessageForSelfUserAsReceiverUseCaseImpl
import com.wire.kalium.logic.feature.message.ephemeral.DeleteEphemeralMessageForSelfUserAsSenderUseCaseImpl
Expand Down Expand Up @@ -109,7 +109,13 @@ class DebugScope internal constructor(
)

private val messageSendFailureHandler: MessageSendFailureHandler
get() = MessageSendFailureHandlerImpl(userRepository, clientRepository, messageRepository, messageSendingScheduler)
get() = MessageSendFailureHandlerImpl(
userRepository,
clientRepository,
messageRepository,
messageSendingScheduler,
conversationRepository
)

private val sessionEstablisher: SessionEstablisher
get() = SessionEstablisherImpl(proteusClientProvider, preKeyRepository)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,26 @@ import com.wire.kalium.logic.cache.SelfConversationIdProvider
import com.wire.kalium.logic.data.asset.AssetRepository
import com.wire.kalium.logic.data.client.ClientRepository
import com.wire.kalium.logic.data.client.MLSClientProvider
import com.wire.kalium.logic.data.client.ProteusClientProvider
import com.wire.kalium.logic.data.connection.ConnectionRepository
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.conversation.MLSConversationRepository
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.id.QualifiedID
import com.wire.kalium.logic.data.message.MessageMetadataRepository
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.logic.data.message.PersistMessageUseCase
import com.wire.kalium.logic.data.message.PersistMessageUseCaseImpl
import com.wire.kalium.logic.data.message.ProtoContentMapper
import com.wire.kalium.logic.data.message.SessionEstablisher
import com.wire.kalium.logic.data.message.SessionEstablisherImpl
import com.wire.kalium.logic.data.message.reaction.ReactionRepository
import com.wire.kalium.logic.data.message.receipt.ReceiptRepository
import com.wire.kalium.logic.data.prekey.PreKeyRepository
import com.wire.kalium.logic.data.properties.UserPropertyRepository
import com.wire.kalium.logic.data.sync.IncrementalSyncRepository
import com.wire.kalium.logic.data.sync.SlowSyncRepository
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.client.ProteusClientProvider
import com.wire.kalium.logic.data.message.SessionEstablisher
import com.wire.kalium.logic.data.message.SessionEstablisherImpl
import com.wire.kalium.logic.feature.asset.GetAssetMessagesForConversationUseCase
import com.wire.kalium.logic.feature.asset.GetAssetMessagesForConversationUseCaseImpl
import com.wire.kalium.logic.feature.asset.GetMessageAssetUseCase
Expand Down Expand Up @@ -99,7 +99,13 @@ class MessageScope internal constructor(
) {

private val messageSendFailureHandler: MessageSendFailureHandler
get() = MessageSendFailureHandlerImpl(userRepository, clientRepository, messageRepository, messageSendingScheduler)
get() = MessageSendFailureHandlerImpl(
userRepository,
clientRepository,
messageRepository,
messageSendingScheduler,
conversationRepository
)

private val sessionEstablisher: SessionEstablisher
get() = SessionEstablisherImpl(proteusClientProvider, preKeyRepository)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.wire.kalium.logic.NetworkFailure
import com.wire.kalium.logic.StorageFailure
import com.wire.kalium.logic.data.client.ClientRepository
import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.logic.data.user.UserId
Expand All @@ -42,7 +43,10 @@ interface MessageSendFailureHandler {
* @return Either.Left if can't recover from error
* @return Either.Right if the error was properly handled and a new attempt at sending message can be made
*/
suspend fun handleClientsHaveChangedFailure(sendFailure: ProteusSendMessageFailure): Either<CoreFailure, Unit>
suspend fun handleClientsHaveChangedFailure(
sendFailure: ProteusSendMessageFailure,
conversationId: ConversationId?
): Either<CoreFailure, Unit>

/**
* Handle a failure when attempting to send a message
Expand All @@ -66,20 +70,34 @@ class MessageSendFailureHandlerImpl internal constructor(
private val userRepository: UserRepository,
private val clientRepository: ClientRepository,
private val messageRepository: MessageRepository,
private val messageSendingScheduler: MessageSendingScheduler
private val messageSendingScheduler: MessageSendingScheduler,
private val conversationRepository: ConversationRepository,
) : MessageSendFailureHandler {

override suspend fun handleClientsHaveChangedFailure(sendFailure: ProteusSendMessageFailure): Either<CoreFailure, Unit> =
// TODO(optimization): add/remove members to/from conversation
override suspend fun handleClientsHaveChangedFailure(
sendFailure: ProteusSendMessageFailure,
conversationId: ConversationId?
): Either<CoreFailure, Unit> =
handleDeletedClients(sendFailure.deletedClientsOfUsers)
.map { usersWithNoClientsRemaining ->
sendFailure.missingClientsOfUsers.keys + usersWithNoClientsRemaining
}.flatMap { usersThatNeedInfoRefresh ->
syncUserIds(usersThatNeedInfoRefresh)
}.flatMap {
updateConversationInfo(sendFailure, conversationId)
}.flatMap {
addMissingClients(sendFailure.missingClientsOfUsers)
}

private suspend fun updateConversationInfo(
sendFailure: ProteusSendMessageFailure,
conversationId: ConversationId?
): Either<CoreFailure, Unit> = when {
(conversationId == null) -> Either.Right(Unit)
(sendFailure.deletedClientsOfUsers.isEmpty() && sendFailure.missingClientsOfUsers.isEmpty()) -> Either.Right(Unit)
else -> conversationRepository.fetchConversation(conversationId)
}

private suspend fun handleDeletedClients(deletedClient: Map<UserId, List<ClientId>>): Either<StorageFailure, Set<UserId>> {
return if (deletedClient.isEmpty()) Either.Right(emptySet())
else clientRepository.removeClientsAndReturnUsersWithNoClients(deletedClient).map { it.toSet() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ internal class MessageSenderImpl internal constructor(
messageRepository
.sendEnvelope(message.conversationId, envelope, messageTarget)
.fold({
handleProteusError(it, "Send", message.toLogString()) {
handleProteusError(it, "Send", message.toLogString(), message.conversationId) {
attemptToSendWithProteus(message, messageTarget)
}
}, { messageSent ->
Expand All @@ -375,7 +375,7 @@ internal class MessageSenderImpl internal constructor(
messageRepository
.broadcastEnvelope(envelope, option)
.fold({
handleProteusError(it, "Broadcast", message.toLogString()) {
handleProteusError(it, "Broadcast", message.toLogString(), null) {
attemptToBroadcastWithProteus(
message,
target
Expand All @@ -390,6 +390,7 @@ internal class MessageSenderImpl internal constructor(
failure: CoreFailure,
action: String, // Send or Broadcast
messageLogString: String,
conversationId: ConversationId?,
retry: suspend () -> Either<CoreFailure, String>
) =
when (failure) {
Expand All @@ -398,7 +399,7 @@ internal class MessageSenderImpl internal constructor(
"Proteus $action Failure: { \"message\" : \"${messageLogString}\", \"errorInfo\" : \"${failure}\" }"
)
messageSendFailureHandler
.handleClientsHaveChangedFailure(failure)
.handleClientsHaveChangedFailure(failure, conversationId)
.flatMap {
logger.w("Retrying After Proteus $action Failure: { \"message\" : \"${messageLogString}\"}")
retry()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.data.id.SelfTeamIdProvider
import com.wire.kalium.logic.data.id.TeamId
import com.wire.kalium.logic.framework.TestConversation
import com.wire.kalium.logic.framework.TestTeam
import com.wire.kalium.logic.framework.TestUser
Expand Down Expand Up @@ -137,7 +138,7 @@ class ConversationRepositoryTest {
.withSelfUserFlow(selfUserFlow)
.arrange()

conversationRepository.persistConversations(listOf(event.conversation), "teamId")
conversationRepository.persistConversations(listOf(event.conversation), TeamId("teamId"))

with(arrangement) {
verify(conversationDAO)
Expand Down Expand Up @@ -246,7 +247,7 @@ class ConversationRepositoryTest {

conversationRepository.persistConversations(
listOf(event.conversation),
"teamId",
TeamId("teamId"),
originatedFromEvent = true
)

Expand Down
Loading

0 comments on commit c1b5925

Please sign in to comment.