Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sync): pull feature flags when online [WPB-5403] #2208

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ import com.wire.kalium.logic.feature.conversation.keyingmaterials.KeyingMaterial
import com.wire.kalium.logic.feature.debug.DebugScope
import com.wire.kalium.logic.feature.e2ei.EnrollE2EIUseCase
import com.wire.kalium.logic.feature.e2ei.EnrollE2EIUseCaseImpl
import com.wire.kalium.logic.feature.featureConfig.FeatureFlagSyncWorkerImpl
import com.wire.kalium.logic.feature.featureConfig.FeatureFlagsSyncWorker
import com.wire.kalium.logic.feature.featureConfig.SyncFeatureConfigsUseCase
import com.wire.kalium.logic.feature.featureConfig.SyncFeatureConfigsUseCaseImpl
import com.wire.kalium.logic.feature.keypackage.KeyPackageManager
Expand Down Expand Up @@ -1248,6 +1250,13 @@ class UserSessionScope internal constructor(
)
}

private val featureFlagsSyncWorker: FeatureFlagsSyncWorker by lazy {
FeatureFlagSyncWorkerImpl(
incrementalSyncRepository = incrementalSyncRepository,
syncFeatureConfigs = syncFeatureConfigsUseCase,
)
}

private val keyPackageRepository: KeyPackageRepository
get() = KeyPackageDataSource(
clientIdProvider, authenticatedNetworkContainer.keyPackageApi, mlsClientProvider, userId
Expand Down Expand Up @@ -1569,6 +1578,10 @@ class UserSessionScope internal constructor(
launch {
proteusSyncWorker.execute()
}

launch {
featureFlagsSyncWorker.execute()
}
}

fun onDestroy() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Wire
* Copyright (C) 2023 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.feature.featureConfig

import com.wire.kalium.logic.data.sync.IncrementalSyncRepository
import com.wire.kalium.logic.data.sync.IncrementalSyncStatus
import kotlinx.coroutines.flow.filter
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes

/**
* Worker that periodically syncs feature flags.
*/
internal interface FeatureFlagsSyncWorker {
suspend fun execute()
}

internal class FeatureFlagSyncWorkerImpl(
private val incrementalSyncRepository: IncrementalSyncRepository,
private val syncFeatureConfigs: SyncFeatureConfigsUseCase,
private val minIntervalBetweenPulls: Duration = MIN_INTERVAL_BETWEEN_PULLS,
private val clock: Clock = Clock.System
) : FeatureFlagsSyncWorker {

private var lastPullInstant: Instant? = null

override suspend fun execute() {
incrementalSyncRepository.incrementalSyncState.filter {
it is IncrementalSyncStatus.Live
}.collect {
syncFeatureFlagsIfNeeded()
}
}

private suspend fun FeatureFlagSyncWorkerImpl.syncFeatureFlagsIfNeeded() {
val now = clock.now()
val wasLastPullRecent = lastPullInstant?.let { lastPull ->
lastPull + minIntervalBetweenPulls > now
} ?: false
if (!wasLastPullRecent) {
syncFeatureConfigs()
lastPullInstant = now
}
}

private companion object {
val MIN_INTERVAL_BETWEEN_PULLS = 60.minutes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,20 @@ internal interface ProteusSyncWorker {
* @param incrementalSyncRepository The incremental sync repository.
* @param proteusPreKeyRefiller The proteus pre-key refiller.
* @param preKeyRepository The pre-key repository.
* @param minInterValBetweenRefills The minimum interval between prekey refills.
* @param minIntervalBetweenRefills The minimum interval between prekey refills.
*/
internal class ProteusSyncWorkerImpl(
private val incrementalSyncRepository: IncrementalSyncRepository,
private val proteusPreKeyRefiller: ProteusPreKeyRefiller,
private val preKeyRepository: PreKeyRepository,
private val minInterValBetweenRefills: Duration = MIN_INTEVAL_BETWEEN_REFILLS
private val minIntervalBetweenRefills: Duration = MIN_INTERVAL_BETWEEN_REFILLS
) : ProteusSyncWorker {

override suspend fun execute() {
preKeyRepository.lastPreKeyRefillCheckInstantFlow()
.collectLatest { lastRefill ->
val now = Clock.System.now()
val nextCheckTime = lastRefill?.plus(minInterValBetweenRefills) ?: now
val nextCheckTime = lastRefill?.plus(minIntervalBetweenRefills) ?: now
val delayUntilNextCheck = nextCheckTime - now
delay(delayUntilNextCheck)
waitUntilLiveAndRefillPreKeysIfNeeded()
Expand All @@ -74,6 +74,6 @@ internal class ProteusSyncWorkerImpl(
}

private companion object {
val MIN_INTEVAL_BETWEEN_REFILLS = 1.days
val MIN_INTERVAL_BETWEEN_REFILLS = 1.days
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Wire
* Copyright (C) 2023 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.feature.featureConfig

import com.wire.kalium.logic.data.sync.IncrementalSyncStatus
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.util.arrangement.IncrementalSyncRepositoryArrangement
import com.wire.kalium.logic.util.arrangement.IncrementalSyncRepositoryArrangementImpl
import io.mockative.Mock
import io.mockative.classOf
import io.mockative.given
import io.mockative.mock
import io.mockative.once
import io.mockative.verify
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlin.test.Test
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes

class FeatureFlagSyncWorkerTest {

@Test
fun givenSyncIsLive_thenShouldCallFeatureConfigsUseCase() = runTest {
val (arrangement, featureFlagSyncWorker) = arrange {
withIncrementalSyncState(flowOf(IncrementalSyncStatus.Live))
}
val job = launch {
featureFlagSyncWorker.execute()
}

advanceUntilIdle()
verify(arrangement.syncFeatureConfigsUseCase)
.suspendFunction(arrangement.syncFeatureConfigsUseCase::invoke)
.wasInvoked(exactly = once)
job.cancel()
}

@Test
fun givenSyncIsLiveTwiceInAShortInterval_thenShouldCallFeatureConfigsUseCaseOnlyOnce() = runTest {
val minimumInterval = 5.minutes
val stateChannel = Channel<IncrementalSyncStatus>(capacity = Channel.UNLIMITED)

val (arrangement, featureFlagSyncWorker) = arrange {
minimumIntervalBetweenPulls = minimumInterval
withIncrementalSyncState(stateChannel.consumeAsFlow())
}
val job = launch {
featureFlagSyncWorker.execute()
}
stateChannel.send(IncrementalSyncStatus.Live)
stateChannel.send(IncrementalSyncStatus.Pending)
advanceUntilIdle()
stateChannel.send(IncrementalSyncStatus.Live)
advanceUntilIdle() // Not enough to run twice
verify(arrangement.syncFeatureConfigsUseCase)
.suspendFunction(arrangement.syncFeatureConfigsUseCase::invoke)
.wasInvoked(exactly = once)

job.cancel()
}

@Test
fun givenSyncIsLiveAgainAfterMinInterval_thenShouldCallFeatureConfigsUseCaseTwice() = runTest {
val minInterval = 5.minutes
val now = Clock.System.now()
val stateTimes = mapOf(
now to IncrementalSyncStatus.Live,
now + minInterval + 1.milliseconds to IncrementalSyncStatus.Pending,
now + minInterval + 2.milliseconds to IncrementalSyncStatus.Live
)
val fakeClock = object: Clock {
var callCount = 0
override fun now(): Instant {
return stateTimes.keys.toList()[callCount].also { callCount++ }
}
}
val stateChannel = Channel<IncrementalSyncStatus>(capacity = Channel.UNLIMITED)
val (arrangement, featureFlagSyncWorker) = arrange {
minimumIntervalBetweenPulls = minInterval
withIncrementalSyncState(stateChannel.consumeAsFlow())
clock = fakeClock
}
stateChannel.send(stateTimes.values.toList()[0])
val job = launch {
featureFlagSyncWorker.execute()
}
advanceUntilIdle()

verify(arrangement.syncFeatureConfigsUseCase)
.suspendFunction(arrangement.syncFeatureConfigsUseCase::invoke)
.wasInvoked(exactly = once)
stateChannel.send(stateTimes.values.toList()[1])
advanceUntilIdle()

stateChannel.send(stateTimes.values.toList()[2])
advanceUntilIdle()

verify(arrangement.syncFeatureConfigsUseCase)
.suspendFunction(arrangement.syncFeatureConfigsUseCase::invoke)
.wasInvoked(exactly = once)
job.cancel()
}

private class Arrangement(
private val configure: Arrangement.() -> Unit
) : IncrementalSyncRepositoryArrangement by IncrementalSyncRepositoryArrangementImpl() {

@Mock
val syncFeatureConfigsUseCase: SyncFeatureConfigsUseCase = mock(classOf<SyncFeatureConfigsUseCase>())

var minimumIntervalBetweenPulls: Duration = 1.minutes

var clock: Clock = Clock.System

init {
given(syncFeatureConfigsUseCase)
.suspendFunction(syncFeatureConfigsUseCase::invoke)
.whenInvoked()
.thenReturn(Either.Right(Unit))
}

fun arrange(): Pair<Arrangement, FeatureFlagSyncWorkerImpl> = run {
configure()
this@Arrangement to FeatureFlagSyncWorkerImpl(
incrementalSyncRepository = incrementalSyncRepository,
syncFeatureConfigs = syncFeatureConfigsUseCase,
minIntervalBetweenPulls = minimumIntervalBetweenPulls,
clock = clock
)
}
}

private companion object {
fun arrange(configure: Arrangement.() -> Unit) = Arrangement(configure).arrange()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class ProteusSyncWorkerTest {
incrementalSyncRepository = incrementalSyncRepository,
proteusPreKeyRefiller = proteusPreKeyRefiller,
preKeyRepository = preKeyRepository,
minInterValBetweenRefills = minIntervalBetweenRefills
minIntervalBetweenRefills = minIntervalBetweenRefills
)
}
}
Expand Down
Loading