Skip to content

Commit

Permalink
refactor: message send error handling (#2272)
Browse files Browse the repository at this point in the history
* handle message send errors

* add unit tests

* detekt
  • Loading branch information
MohamadJaara authored Nov 29, 2023
1 parent 7a6fd54 commit 02911ee
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.event.Event
import com.wire.kalium.logic.data.id.toApi
import com.wire.kalium.logic.data.id.toDao
import com.wire.kalium.logic.data.id.toModel
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.functional.Either
Expand Down Expand Up @@ -67,6 +68,11 @@ interface ClientRepository {
suspend fun observeClientsByUserIdAndClientId(userId: UserId, clientId: ClientId): Flow<Either<StorageFailure, Client>>
suspend fun storeUserClientListAndRemoveRedundantClients(clients: List<InsertClientParam>): Either<StorageFailure, Unit>
suspend fun storeUserClientIdList(userId: UserId, clients: List<ClientId>): Either<StorageFailure, Unit>
suspend fun storeMapOfUserToClientId(userToClientMap: Map<UserId, List<ClientId>>): Either<StorageFailure, Unit>
suspend fun removeClientsAndReturnUsersWithNoClients(
redundantClientsOfUsers: Map<UserId, List<ClientId>>
): Either<StorageFailure, List<UserId>>

suspend fun registerToken(body: PushTokenBody): Either<NetworkFailure, Unit>
suspend fun deregisterToken(token: String): Either<NetworkFailure, Unit>
suspend fun getClientsByUserId(userId: UserId): Either<StorageFailure, List<OtherUserClient>>
Expand Down Expand Up @@ -187,11 +193,35 @@ class ClientDataSource(
clientRegistrationStorage.hasRegisteredMLSClient()
}

override suspend fun storeUserClientIdList(userId: UserId, clients: List<ClientId>): Either<StorageFailure, Unit> =
override suspend fun storeUserClientIdList(
userId: UserId,
clients: List<ClientId>
): Either<StorageFailure, Unit> =
clientMapper.toInsertClientParam(userId, clients).let { clientEntityList ->
wrapStorageRequest { clientDAO.insertClients(clientEntityList) }
}

override suspend fun storeMapOfUserToClientId(
userToClientMap: Map<UserId, List<ClientId>>
): Either<StorageFailure, Unit> =
userToClientMap.flatMap { (userId, clients) ->
clientMapper.toInsertClientParam(userId, clients)
}.let { insertClientParamList ->
wrapStorageRequest { clientDAO.insertClients(insertClientParamList) }
}

override suspend fun removeClientsAndReturnUsersWithNoClients(
redundantClientsOfUsers: Map<UserId, List<ClientId>>
): Either<StorageFailure, List<UserId>> =
redundantClientsOfUsers.mapKeys { it.key.toDao() }
.mapValues { it.value.map { clientId -> clientId.value } }
.let { redundantClientsOfUsersDao ->
wrapStorageRequest { clientDAO.removeClientsAndReturnUsersWithNoClients(redundantClientsOfUsersDao) }
.map {
it.map { userId -> userId.toModel() }
}
}

override suspend fun storeUserClientListAndRemoveRedundantClients(
clients: List<InsertClientParam>
): Either<StorageFailure, Unit> = wrapStorageRequest { clientDAO.insertClientsAndRemoveRedundant(clients) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ interface SendMessageFailureMapper {
class SendMessageFailureMapperImpl : SendMessageFailureMapper {
override fun fromDTO(error: ProteusClientsChangedError): ProteusSendMessageFailure = with(error.errorBody) {
ProteusSendMessageFailure(
missing.fromNestedMapToSimpleMap(),
redundant.fromNestedMapToSimpleMap(),
deleted.fromNestedMapToSimpleMap(),
failedToConfirmClients?.fromNestedMapToSimpleMap()
missingClientsOfUsers = missing.fromNestedMapToSimpleMap(),
redundantClientsOfUsers = redundant.fromNestedMapToSimpleMap(),
deletedClientsOfUsers = deleted.fromNestedMapToSimpleMap(),
failedClientsOfUsers = failedToConfirmClients?.fromNestedMapToSimpleMap()
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ package com.wire.kalium.logic.feature.message

import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.NetworkFailure
import com.wire.kalium.logic.StorageFailure
import com.wire.kalium.logic.data.client.ClientRepository
import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.failure.ProteusSendMessageFailure
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.persistence.dao.message.MessageEntity

Expand Down Expand Up @@ -67,19 +70,31 @@ class MessageSendFailureHandlerImpl internal constructor(
) : MessageSendFailureHandler {

override suspend fun handleClientsHaveChangedFailure(sendFailure: ProteusSendMessageFailure): Either<CoreFailure, Unit> =
// TODO(optimization): add/remove members to/from conversation
// TODO(optimization): remove clients from conversation
userRepository
.fetchUsersByIds(sendFailure.missingClientsOfUsers.keys)
.flatMap {
sendFailure
.missingClientsOfUsers
.entries
.foldToEitherWhileRight(Unit) { entry, _ ->
clientRepository.storeUserClientIdList(entry.key, entry.value)
}
// TODO(optimization): add/remove members to/from conversation
handleDeletedClients(sendFailure.deletedClientsOfUsers)
.map { usersWithNoClientsRemaining ->
sendFailure.missingClientsOfUsers.keys + usersWithNoClientsRemaining
}.flatMap { usersThatNeedInfoRefresh ->
syncUserIds(usersThatNeedInfoRefresh)
}.flatMap {
addMissingClients(sendFailure.missingClientsOfUsers)
}

private suspend fun handleDeletedClients(deletedClient: Map<UserId, List<ClientId>>): Either<StorageFailure, Set<UserId>> {
return if (deletedClient.isEmpty()) Either.Right(emptySet())
else clientRepository.removeClientsAndReturnUsersWithNoClients(deletedClient).map { it.toSet() }
}

private suspend fun syncUserIds(userId: Set<UserId>): Either<CoreFailure, Unit> {
return if (userId.isEmpty()) Either.Right(Unit)
else userRepository.fetchUsersByIds(userId)
}

private suspend fun addMissingClients(missingClients: Map<UserId, List<ClientId>>): Either<CoreFailure, Unit> {
return if (missingClients.isEmpty()) Either.Right(Unit)
else clientRepository.storeMapOfUserToClientId(missingClients)
}

override suspend fun handleFailureAndUpdateMessageStatus(
failure: CoreFailure,
conversationId: ConversationId,
Expand All @@ -92,10 +107,12 @@ class MessageSendFailureHandlerImpl internal constructor(
kaliumLogger.e("Sending message of type $messageType failed due to federation context availability.")
messageRepository.updateMessageStatus(MessageEntity.Status.FAILED_REMOTELY, conversationId, messageId)
}

failure is NetworkFailure.NoNetworkConnection && scheduleResendIfNoNetwork -> {
kaliumLogger.i("Scheduling message for retrying in the future.")
messageSendingScheduler.scheduleSendingOfPendingMessages()
}

else -> {
messageRepository.updateMessageStatus(MessageEntity.Status.FAILED, conversationId, messageId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ fun <L, R> Either<L, R>.getOrElse(value: R): R =
* Returns the value from this `Right` or the result of `fn` if this is a `Left`.
* Right(12).getOrElse{ it + 3 } RETURNS 12 and Left(12).getOrElse{ it + 3 } RETURNS 15
*/
fun <L, R> Either<L, R>.getOrElse(fn: (L) -> (R)): R =
inline fun <L, R> Either<L, R>.getOrElse(fn: (L) -> (R)): R =
when (this) {
is Left -> fn(value)
is Right -> this.value
Expand Down
Loading

0 comments on commit 02911ee

Please sign in to comment.