diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/SyncManagerLogger.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/SyncManagerLogger.kt index 12f5c70cea3..ffa0fc8a10e 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/SyncManagerLogger.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/SyncManagerLogger.kt @@ -23,6 +23,7 @@ import com.wire.kalium.logger.KaliumLogger import com.wire.kalium.logic.logStructuredJson import kotlinx.datetime.Clock import kotlinx.datetime.Instant +import kotlin.time.Duration /** * Logs the sync process by providing structured logs. @@ -35,11 +36,7 @@ internal class SyncManagerLogger( private val syncStartedMoment: Instant ) { - init { - logSyncStarted() - } - - private fun logSyncStarted() { + fun logSyncStarted() { logger.withFeatureId(KaliumLogger.Companion.ApplicationFlow.SYNC).logStructuredJson( level = KaliumLogLevel.INFO, leadingMessage = "Started sync process", @@ -51,8 +48,7 @@ internal class SyncManagerLogger( ) } - fun logSyncCompleted() { - val duration = Clock.System.now() - syncStartedMoment + fun logSyncCompleted(duration: Duration = Clock.System.now() - syncStartedMoment) { val logMap = mapOf( "syncId" to syncId, "syncStatus" to SyncStatus.COMPLETED.name, diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncManager.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncManager.kt index 6e7d2654006..1ea78b8a0b4 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncManager.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncManager.kt @@ -18,6 +18,7 @@ package com.wire.kalium.logic.sync.incremental +import com.benasher44.uuid.uuid4 import com.wire.kalium.logger.KaliumLogger import com.wire.kalium.logger.KaliumLogger.Companion.ApplicationFlow.SYNC import com.wire.kalium.logic.data.event.Event @@ -28,6 +29,8 @@ import com.wire.kalium.logic.data.sync.SlowSyncRepository import com.wire.kalium.logic.data.sync.SlowSyncStatus import com.wire.kalium.logic.kaliumLogger import com.wire.kalium.logic.sync.SyncExceptionHandler +import com.wire.kalium.logic.sync.SyncType +import com.wire.kalium.logic.sync.provideNewSyncManagerStartedLogger import com.wire.kalium.logic.sync.slow.SlowSyncManager import com.wire.kalium.logic.util.ExponentialDurationHelper import com.wire.kalium.logic.util.ExponentialDurationHelperImpl @@ -41,12 +44,16 @@ import kotlinx.coroutines.async import kotlinx.coroutines.cancelChildren import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.cancellable +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.drop import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.runningFold import kotlinx.coroutines.launch import kotlinx.coroutines.selects.select +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant import kotlin.time.Duration import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds @@ -181,16 +188,25 @@ internal class IncrementalSyncManager( incrementalSyncWorker .processEventsWhilePolicyAllowsFlow() .cancellable() - .collect { - val newState = when (it) { - EventSource.PENDING -> IncrementalSyncStatus.FetchingPendingEvents + .runningFold(uuid4().toString() to Clock.System.now()) { syncData: Pair, eventSource -> + val syncLogger = kaliumLogger.provideNewSyncManagerStartedLogger(SyncType.INCREMENTAL, syncData.first) + val newState = when (eventSource) { + EventSource.PENDING -> { + syncLogger.logSyncStarted() + IncrementalSyncStatus.FetchingPendingEvents + } + EventSource.LIVE -> { + syncLogger.logSyncCompleted(Clock.System.now() - syncData.second) exponentialDurationHelper.reset() IncrementalSyncStatus.Live } } incrementalSyncRepository.updateIncrementalSyncState(newState) - } + + // when the source is LIVE, we need to generate a new syncId since it means the previous one is done + if (eventSource == EventSource.LIVE) uuid4().toString() to Clock.System.now() else syncData + }.collect() incrementalSyncRepository.updateIncrementalSyncState(IncrementalSyncStatus.Pending) logger.i("$TAG IncrementalSync stopped.") } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncWorker.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncWorker.kt index edbce6a3651..785b48afde3 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncWorker.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncWorker.kt @@ -18,15 +18,12 @@ package com.wire.kalium.logic.sync.incremental -import com.benasher44.uuid.uuid4 import com.wire.kalium.logger.KaliumLogger import com.wire.kalium.logger.KaliumLogger.Companion.ApplicationFlow.SYNC import com.wire.kalium.logic.data.sync.ConnectionPolicy import com.wire.kalium.logic.functional.onFailure import com.wire.kalium.logic.kaliumLogger import com.wire.kalium.logic.sync.KaliumSyncException -import com.wire.kalium.logic.sync.SyncType -import com.wire.kalium.logic.sync.provideNewSyncManagerStartedLogger import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.cancellable import kotlinx.coroutines.flow.channelFlow @@ -61,8 +58,6 @@ internal class IncrementalSyncWorkerImpl( eventGatherer.currentSource.collect { send(it) } } launch { - val syncId = uuid4().toString() - val syncLogger = kaliumLogger.provideNewSyncManagerStartedLogger(SyncType.INCREMENTAL, syncId) eventGatherer.gatherEvents().cancellable().collect { // TODO make sure that event process is not cancel in a midway eventProcessor.processEvent(it).onFailure { failure -> @@ -70,7 +65,6 @@ internal class IncrementalSyncWorkerImpl( } } // When events are all consumed, cancel the source job to complete the channelFlow - syncLogger.logSyncCompleted() sourceJob.cancel() logger.withFeatureId(SYNC).i("SYNC Finished gathering and processing events") } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/slow/SlowSyncManager.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/slow/SlowSyncManager.kt index 66b017f462c..dc0f7c2abc6 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/slow/SlowSyncManager.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/slow/SlowSyncManager.kt @@ -190,6 +190,7 @@ internal class SlowSyncManager( private suspend fun performSlowSync(migrationSteps: List) { val syncLogger = kaliumLogger.provideNewSyncManagerStartedLogger(SyncType.SLOW) + syncLogger.logSyncStarted() logger.i("Starting SlowSync as all criteria are met and it wasn't performed recently") slowSyncWorker.slowSyncStepsFlow(migrationSteps).cancellable().collect { step -> logger.i("Performing SlowSyncStep $step")