Skip to content

Commit

Permalink
fix: disable status update for private users (#2202)
Browse files Browse the repository at this point in the history
* implement sync migration logic and migrate to 7

* add unit tests

* detekt

* detekt

* add test for the order of migration steps

* move migration to the top of the execution sync stack and add more tests

* fix test

* address PR comments

* detekt

* add docs
  • Loading branch information
MohamadJaara authored Nov 14, 2023
1 parent cdf5567 commit 83cdfef
Show file tree
Hide file tree
Showing 19 changed files with 720 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ fun interface CurrentClientIdProvider {
suspend operator fun invoke(): Either<CoreFailure, ClientId>
}

fun interface SelfTeamIdProvider {
internal fun interface SelfTeamIdProvider {
suspend operator fun invoke(): Either<CoreFailure, TeamId?>
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal interface SlowSyncRepository {
suspend fun observeLastSlowSyncCompletionInstant(): Flow<Instant?>
fun updateSlowSyncStatus(slowSyncStatus: SlowSyncStatus)
suspend fun setSlowSyncVersion(version: Int)
suspend fun getSlowSyncVersion(): Int
suspend fun getSlowSyncVersion(): Int?
}

internal class SlowSyncRepositoryImpl(private val metadataDao: MetadataDAO) : SlowSyncRepository {
Expand Down Expand Up @@ -97,9 +97,7 @@ internal class SlowSyncRepositoryImpl(private val metadataDao: MetadataDAO) : Sl
metadataDao.insertValue(value = version.toString(), key = SLOW_SYNC_VERSION_KEY)
}

override suspend fun getSlowSyncVersion(): Int {
return metadataDao.valueByKey(key = SLOW_SYNC_VERSION_KEY)?.toIntOrNull() ?: 0
}
override suspend fun getSlowSyncVersion(): Int? = metadataDao.valueByKey(key = SLOW_SYNC_VERSION_KEY)?.toInt()

companion object {
const val LAST_SLOW_SYNC_INSTANT_KEY = "lastSlowSyncInstant"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ enum class SlowSyncStep {
SELF_TEAM,
CONTACTS,
JOINING_MLS_CONVERSATIONS,
RESOLVE_ONE_ON_ONE_PROTOCOLS
RESOLVE_ONE_ON_ONE_PROTOCOLS,
MIGRATION
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.wire.kalium.logic.data.user

import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.NetworkFailure
import com.wire.kalium.logic.StorageFailure
import com.wire.kalium.logic.data.id.toDao
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.functional.Either
Expand All @@ -43,7 +44,7 @@ internal interface AccountRepository {
*/
suspend fun updateSelfEmail(email: String): Either<NetworkFailure, Boolean>
suspend fun updateLocalSelfUserHandle(handle: String): Either<CoreFailure, Unit>
suspend fun updateSelfUserAvailabilityStatus(status: UserAvailabilityStatus)
suspend fun updateSelfUserAvailabilityStatus(status: UserAvailabilityStatus): Either<StorageFailure, Unit>
}

internal class AccountRepositoryImpl(
Expand Down Expand Up @@ -76,7 +77,7 @@ internal class AccountRepositoryImpl(
userDAO.updateUserHandle(selfUserId.toDao(), handle)
}

override suspend fun updateSelfUserAvailabilityStatus(status: UserAvailabilityStatus) {
override suspend fun updateSelfUserAvailabilityStatus(status: UserAvailabilityStatus) = wrapStorageRequest {
userDAO.updateUserAvailabilityStatus(
selfUserId.toDao(),
availabilityStatusMapper.fromModelAvailabilityStatusToDao(status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ import com.wire.kalium.logic.sync.slow.SlowSyncRecoveryHandler
import com.wire.kalium.logic.sync.slow.SlowSyncRecoveryHandlerImpl
import com.wire.kalium.logic.sync.slow.SlowSyncWorker
import com.wire.kalium.logic.sync.slow.SlowSyncWorkerImpl
import com.wire.kalium.logic.sync.slow.migration.SyncMigrationStepsProvider
import com.wire.kalium.logic.sync.slow.migration.SyncMigrationStepsProviderImpl
import com.wire.kalium.logic.util.MessageContentEncoder
import com.wire.kalium.network.NetworkStateObserver
import com.wire.kalium.network.networkContainer.AuthenticatedNetworkContainer
Expand Down Expand Up @@ -954,13 +956,19 @@ class UserSessionScope internal constructor(
private val slowSyncRecoveryHandler: SlowSyncRecoveryHandler
get() = SlowSyncRecoveryHandlerImpl(logout)

private val syncMigrationStepsProvider: () -> SyncMigrationStepsProvider = {
SyncMigrationStepsProviderImpl(lazy { accountRepository }, selfTeamId)
}

private val slowSyncManager: SlowSyncManager by lazy {
SlowSyncManager(
slowSyncCriteriaProvider,
slowSyncRepository,
slowSyncWorker,
slowSyncRecoveryHandler,
networkStateObserver
networkStateObserver,
syncMigrationStepsProvider

)
}
private val mlsConversationsRecoveryManager: MLSConversationsRecoveryManager by lazy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import com.wire.kalium.logic.functional.combine
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.logic.sync.SyncExceptionHandler
import com.wire.kalium.logic.sync.incremental.IncrementalSyncManager
import com.wire.kalium.logic.sync.slow.migration.SyncMigrationStepsProvider
import com.wire.kalium.logic.sync.slow.migration.steps.SyncMigrationStep
import com.wire.kalium.logic.util.ExponentialDurationHelper
import com.wire.kalium.logic.util.ExponentialDurationHelperImpl
import com.wire.kalium.network.NetworkStateObserver
Expand Down Expand Up @@ -64,6 +66,7 @@ internal class SlowSyncManager(
private val slowSyncWorker: SlowSyncWorker,
private val slowSyncRecoveryHandler: SlowSyncRecoveryHandler,
private val networkStateObserver: NetworkStateObserver,
private val syncMigrationStepsProvider: () -> SyncMigrationStepsProvider,
kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl,
private val exponentialDurationHelper: ExponentialDurationHelper = ExponentialDurationHelperImpl(MIN_RETRY_DELAY, MAX_RETRY_DELAY)
) {
Expand Down Expand Up @@ -96,17 +99,29 @@ internal class SlowSyncManager(
startMonitoring()
}

private suspend fun isSlowSyncNeededFlow(): Flow<Boolean> = slowSyncRepository.observeLastSlowSyncCompletionInstant()
.map { lastTimeSlowSyncWasPerformed ->
lastTimeSlowSyncWasPerformed?.let {
val currentTime = DateTimeUtil.currentInstant()
logger.i("Last SlowSync was performed on '$lastTimeSlowSyncWasPerformed'")
val nextSlowSyncDateTime = lastTimeSlowSyncWasPerformed + MIN_TIME_BETWEEN_SLOW_SYNCS
logger.i("Next SlowSync should be performed on '$nextSlowSyncDateTime'")
val lastVersion = slowSyncRepository.getSlowSyncVersion()
logger.i("Last saved SlowSync version is $lastVersion, current is $CURRENT_VERSION")
currentTime > nextSlowSyncDateTime || CURRENT_VERSION > lastVersion
} ?: true
private suspend fun isSlowSyncNeededFlow(): Flow<SlowSyncParam> = slowSyncRepository.observeLastSlowSyncCompletionInstant()
.map { latestSlowSync ->
logger.i("Last SlowSync was performed on '$latestSlowSync'")
val lastVersion = slowSyncRepository.getSlowSyncVersion()
when {
(lastVersion != null) && (CURRENT_VERSION > lastVersion) -> {
logger.i("Last saved SlowSync version is $lastVersion, current is $CURRENT_VERSION")
SlowSyncParam.MigrationNeeded(oldVersion = lastVersion, newVersion = CURRENT_VERSION)
}

latestSlowSync == null -> {
SlowSyncParam.NotPerformedBefore
}

DateTimeUtil.currentInstant() > (latestSlowSync + MIN_TIME_BETWEEN_SLOW_SYNCS) -> {
logger.i("Slow sync too old - last slow sync was performed on '$latestSlowSync'")
SlowSyncParam.LastSlowSyncTooOld
}

else -> {
SlowSyncParam.Success
}
}
}

private fun startMonitoring() {
Expand All @@ -122,19 +137,33 @@ internal class SlowSyncManager(
}
}

private suspend fun handleCriteriaResolution(syncCriteriaResolution: SyncCriteriaResolution, isSlowSyncNeeded: Boolean) {
private suspend fun handleCriteriaResolution(syncCriteriaResolution: SyncCriteriaResolution, isSlowSyncNeeded: SlowSyncParam) {
if (syncCriteriaResolution is SyncCriteriaResolution.Ready) {
// START SYNC IF NEEDED
logger.i("SlowSync criteria ready, checking if SlowSync is needed or already performed")
if (isSlowSyncNeeded) {
logger.i("Starting SlowSync as all criteria are met and it wasn't performed recently")
performSlowSync()
logger.i("SlowSync completed. Updating last completion instant")
slowSyncRepository.setSlowSyncVersion(CURRENT_VERSION)
slowSyncRepository.setLastSlowSyncCompletionInstant(DateTimeUtil.currentInstant())
} else {
logger.i("No need to perform SlowSync. Marking as Complete")

when (isSlowSyncNeeded) {
SlowSyncParam.LastSlowSyncTooOld,
SlowSyncParam.NotPerformedBefore -> {
performSlowSync(emptyList())
}

is SlowSyncParam.MigrationNeeded -> {
val migrationSteps = syncMigrationStepsProvider()
.getMigrationSteps(
isSlowSyncNeeded.oldVersion,
isSlowSyncNeeded.newVersion
)
performSlowSync(
migrationSteps = migrationSteps
)
}

SlowSyncParam.Success -> {
logger.i("No need to perform SlowSync. Marking as Complete")
}
}

exponentialDurationHelper.reset()
slowSyncRepository.updateSlowSyncStatus(SlowSyncStatus.Complete)
} else {
Expand All @@ -144,11 +173,15 @@ internal class SlowSyncManager(
}
}

private suspend fun performSlowSync() {
slowSyncWorker.slowSyncStepsFlow().cancellable().collect { step ->
private suspend fun performSlowSync(migrationSteps: List<SyncMigrationStep>) {
logger.i("Starting SlowSync as all criteria are met and it wasn't performed recently")
slowSyncWorker.slowSyncStepsFlow(migrationSteps).cancellable().collect { step ->
logger.i("Performing SlowSyncStep $step")
slowSyncRepository.updateSlowSyncStatus(SlowSyncStatus.Ongoing(step))
}
logger.i("SlowSync completed. Updating last completion instant")
slowSyncRepository.setSlowSyncVersion(CURRENT_VERSION)
slowSyncRepository.setLastSlowSyncCompletionInstant(DateTimeUtil.currentInstant())
}

private companion object {
Expand All @@ -159,7 +192,7 @@ internal class SlowSyncManager(
* Useful when a new step is added to Slow Sync, or when we fix some bug in Slow Sync,
* and we'd like to get all users to take advantage of the fix.
*/
const val CURRENT_VERSION = 6
const val CURRENT_VERSION = 7

val MIN_RETRY_DELAY = 1.seconds
val MAX_RETRY_DELAY = 10.minutes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Wire
* Copyright (C) 2023 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.sync.slow

sealed interface SlowSyncParam {
data object Success : SlowSyncParam
data object NotPerformedBefore : SlowSyncParam
data object LastSlowSyncTooOld : SlowSyncParam
data class MigrationNeeded(val oldVersion: Int, val newVersion: Int) : SlowSyncParam
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ package com.wire.kalium.logic.sync.slow

import com.wire.kalium.logger.KaliumLogger.Companion.ApplicationFlow.SYNC
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.conversation.JoinExistingMLSConversationsUseCase
import com.wire.kalium.logic.data.event.EventRepository
import com.wire.kalium.logic.data.sync.SlowSyncStep
import com.wire.kalium.logic.feature.connection.SyncConnectionsUseCase
import com.wire.kalium.logic.data.conversation.JoinExistingMLSConversationsUseCase
import com.wire.kalium.logic.feature.conversation.SyncConversationsUseCase
import com.wire.kalium.logic.feature.conversation.mls.OneOnOneResolver
import com.wire.kalium.logic.feature.featureConfig.SyncFeatureConfigsUseCase
Expand All @@ -33,12 +33,14 @@ import com.wire.kalium.logic.feature.user.SyncSelfUserUseCase
import com.wire.kalium.logic.feature.user.UpdateSupportedProtocolsUseCase
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import com.wire.kalium.logic.functional.isRight
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.functional.nullableFold
import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.logic.sync.KaliumSyncException
import com.wire.kalium.logic.sync.slow.migration.steps.SyncMigrationStep
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.Flow
Expand All @@ -51,7 +53,7 @@ internal interface SlowSyncWorker {
* Performs all [SlowSyncStep] in the correct order,
* emits the current ongoing step.
*/
suspend fun slowSyncStepsFlow(): Flow<SlowSyncStep>
suspend fun slowSyncStepsFlow(migrationSteps: List<SyncMigrationStep>): Flow<SlowSyncStep>
}

@Suppress("LongParameterList")
Expand All @@ -70,7 +72,7 @@ internal class SlowSyncWorkerImpl(

private val logger = kaliumLogger.withFeatureId(SYNC)

override suspend fun slowSyncStepsFlow(): Flow<SlowSyncStep> = flow {
override suspend fun slowSyncStepsFlow(migrationSteps: List<SyncMigrationStep>): Flow<SlowSyncStep> = flow {

suspend fun Either<CoreFailure, Unit>.continueWithStep(
slowSyncStep: SlowSyncStep,
Expand All @@ -81,7 +83,12 @@ internal class SlowSyncWorkerImpl(

val lastProcessedEventIdToSaveOnSuccess = getLastProcessedEventIdToSaveOnSuccess()

performStep(SlowSyncStep.SELF_USER, syncSelfUser::invoke)
performStep(SlowSyncStep.MIGRATION) {
migrationSteps.foldToEitherWhileRight(Unit) { step, _ ->
step()
}
}
.continueWithStep(SlowSyncStep.SELF_USER, syncSelfUser::invoke)
.continueWithStep(SlowSyncStep.FEATURE_FLAGS, syncFeatureConfigs::invoke)
.continueWithStep(SlowSyncStep.UPDATE_SUPPORTED_PROTOCOLS) { updateSupportedProtocols.invoke().map { } }
.continueWithStep(SlowSyncStep.CONVERSATIONS, syncConversations::invoke)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Wire
* Copyright (C) 2023 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.sync.slow.migration

import com.wire.kalium.logic.data.id.SelfTeamIdProvider
import com.wire.kalium.logic.data.user.AccountRepository
import com.wire.kalium.logic.sync.slow.migration.steps.SyncMigrationStep
import com.wire.kalium.logic.sync.slow.migration.steps.SyncMigrationStep_6_7

internal interface SyncMigrationStepsProvider {
fun getMigrationSteps(fromVersion: Int, toVersion: Int): List<SyncMigrationStep>
}

@Suppress("MagicNumber")
internal class SyncMigrationStepsProviderImpl(
accountRepository: Lazy<AccountRepository>,
selfTeamIdProvider: SelfTeamIdProvider
) : SyncMigrationStepsProvider {

private val steps = mapOf(
7 to lazy { SyncMigrationStep_6_7(accountRepository, selfTeamIdProvider) }
)

override fun getMigrationSteps(fromVersion: Int, toVersion: Int): List<SyncMigrationStep> {
return steps
.filter { it.key in (fromVersion + 1)..toVersion }.values
.sortedBy { it.value.version }
.map { it.value }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Wire
* Copyright (C) 2023 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.sync.slow.migration.steps

import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.functional.Either

/**
* Migration step.
* this interface provide a way to migrate the sync version of the user.
* the logic is executed before sync itself
* keep in mind this logic can run multiple times
* since it runs before sync then if one of the sync steps after it failed,
* and we need to retry sync then this logic will run again
* @property version The sync version after executing this migration step.
* @property invoke The migration step itself
*/
internal interface SyncMigrationStep {
val version: Int
suspend operator fun invoke(): Either<CoreFailure, Unit>
}
Loading

0 comments on commit 83cdfef

Please sign in to comment.