Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metrics for sync engine slow and incremental (WPB-11199) #3086

Merged
merged 9 commits into from
Nov 5, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Wire
* Copyright (C) 2024 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.sync

import com.benasher44.uuid.uuid4
import com.wire.kalium.logger.KaliumLogLevel
import com.wire.kalium.logger.KaliumLogger
import com.wire.kalium.logic.logStructuredJson
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlin.time.Duration

/**
* Logs the sync process by providing structured logs.
* It logs the sync process start and completion with the syncId as a unique identifier.
*/
internal class SyncManagerLogger(
private val logger: KaliumLogger,
private val syncId: String,
private val syncType: SyncType,
private val syncStartedMoment: Instant
) {

/**
* Logs the sync process start.
*/
fun logSyncStarted() {
logger.withFeatureId(KaliumLogger.Companion.ApplicationFlow.SYNC).logStructuredJson(
level = KaliumLogLevel.INFO,
leadingMessage = "Started sync process",
jsonStringKeyValues = mapOf(
"syncMetadata" to mapOf(
"id" to syncId,
"status" to SyncStatus.STARTED.name,
"type" to syncType.name
)
)
)
}

/**
* Logs the sync process completion.
* Optionally, it can pass the duration of the sync process,
* useful for incremental sync that can happen between collecting states.
*
* @param duration optional the duration of the sync process.
*/
fun logSyncCompleted(duration: Duration = Clock.System.now() - syncStartedMoment) {
val logMap = mapOf(
"id" to syncId,
"status" to SyncStatus.COMPLETED.name,
"type" to syncType.name,
"performanceData" to mapOf("timeTakenInMillis" to duration.inWholeMilliseconds)
)

logger.withFeatureId(KaliumLogger.Companion.ApplicationFlow.SYNC).logStructuredJson(
level = KaliumLogLevel.INFO,
leadingMessage = "Completed sync process",
jsonStringKeyValues = mapOf("syncMetadata" to logMap)
)
}
}

internal enum class SyncStatus {
STARTED,
COMPLETED
}

internal enum class SyncType {
SLOW,
INCREMENTAL
}

/**
* Provides a new [SyncManagerLogger] instance with the given parameters.
* @param syncType the [SyncType] that will log.
* @param syncId the unique identifier for the sync process.
* @param syncStartedMoment the moment when the sync process started.
*/
internal fun KaliumLogger.provideNewSyncManagerLogger(
syncType: SyncType,
syncId: String = uuid4().toString(),
syncStartedMoment: Instant = Clock.System.now()
) = SyncManagerLogger(this, syncId, syncType, syncStartedMoment)
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.wire.kalium.logic.sync.incremental

import com.benasher44.uuid.uuid4
import com.wire.kalium.logger.KaliumLogger
import com.wire.kalium.logger.KaliumLogger.Companion.ApplicationFlow.SYNC
import com.wire.kalium.logic.data.event.Event
Expand All @@ -28,6 +29,8 @@ import com.wire.kalium.logic.data.sync.SlowSyncRepository
import com.wire.kalium.logic.data.sync.SlowSyncStatus
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.logic.sync.SyncExceptionHandler
import com.wire.kalium.logic.sync.SyncType
import com.wire.kalium.logic.sync.provideNewSyncManagerLogger
import com.wire.kalium.logic.sync.slow.SlowSyncManager
import com.wire.kalium.logic.util.ExponentialDurationHelper
import com.wire.kalium.logic.util.ExponentialDurationHelperImpl
Expand All @@ -41,12 +44,15 @@ import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.runningFold
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select
import kotlinx.datetime.Clock
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
Expand Down Expand Up @@ -181,16 +187,25 @@ internal class IncrementalSyncManager(
incrementalSyncWorker
.processEventsWhilePolicyAllowsFlow()
.cancellable()
.collect {
val newState = when (it) {
EventSource.PENDING -> IncrementalSyncStatus.FetchingPendingEvents
.runningFold(uuid4().toString() to Clock.System.now()) { syncData, eventSource ->
val syncLogger = kaliumLogger.provideNewSyncManagerLogger(SyncType.INCREMENTAL, syncData.first)
val newState = when (eventSource) {
EventSource.PENDING -> {
syncLogger.logSyncStarted()
IncrementalSyncStatus.FetchingPendingEvents
}

EventSource.LIVE -> {
syncLogger.logSyncCompleted(duration = Clock.System.now() - syncData.second)
exponentialDurationHelper.reset()
IncrementalSyncStatus.Live
}
}
incrementalSyncRepository.updateIncrementalSyncState(newState)
}

// when the source is LIVE, we need to generate a new syncId since it means the previous one is done
if (eventSource == EventSource.LIVE) uuid4().toString() to Clock.System.now() else syncData
}.collect()
incrementalSyncRepository.updateIncrementalSyncState(IncrementalSyncStatus.Pending)
logger.i("$TAG IncrementalSync stopped.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import com.wire.kalium.logic.data.sync.SlowSyncStatus
import com.wire.kalium.logic.functional.combine
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.logic.sync.SyncExceptionHandler
import com.wire.kalium.logic.sync.SyncType
import com.wire.kalium.logic.sync.incremental.IncrementalSyncManager
import com.wire.kalium.logic.sync.provideNewSyncManagerLogger
import com.wire.kalium.logic.sync.slow.migration.SyncMigrationStepsProvider
import com.wire.kalium.logic.sync.slow.migration.steps.SyncMigrationStep
import com.wire.kalium.logic.util.ExponentialDurationHelper
Expand Down Expand Up @@ -187,11 +189,14 @@ internal class SlowSyncManager(
}

private suspend fun performSlowSync(migrationSteps: List<SyncMigrationStep>) {
val syncLogger = kaliumLogger.provideNewSyncManagerLogger(SyncType.SLOW)
syncLogger.logSyncStarted()
logger.i("Starting SlowSync as all criteria are met and it wasn't performed recently")
slowSyncWorker.slowSyncStepsFlow(migrationSteps).cancellable().collect { step ->
logger.i("Performing SlowSyncStep $step")
slowSyncRepository.updateSlowSyncStatus(SlowSyncStatus.Ongoing(step))
}
syncLogger.logSyncCompleted()
logger.i("SlowSync completed. Updating last completion instant")
slowSyncRepository.setSlowSyncVersion(CURRENT_VERSION)
slowSyncRepository.setLastSlowSyncCompletionInstant(DateTimeUtil.currentInstant())
Expand Down
Loading