Skip to content

Commit

Permalink
feat: incremental sync logs, improv for foreground
Browse files Browse the repository at this point in the history
  • Loading branch information
yamilmedina committed Nov 1, 2024
1 parent 5a236bc commit a1b356d
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.wire.kalium.logger.KaliumLogger
import com.wire.kalium.logic.logStructuredJson
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlin.time.Duration

/**
* Logs the sync process by providing structured logs.
Expand All @@ -35,11 +36,7 @@ internal class SyncManagerLogger(
private val syncStartedMoment: Instant
) {

init {
logSyncStarted()
}

private fun logSyncStarted() {
fun logSyncStarted() {
logger.withFeatureId(KaliumLogger.Companion.ApplicationFlow.SYNC).logStructuredJson(
level = KaliumLogLevel.INFO,
leadingMessage = "Started sync process",
Expand All @@ -51,8 +48,7 @@ internal class SyncManagerLogger(
)
}

fun logSyncCompleted() {
val duration = Clock.System.now() - syncStartedMoment
fun logSyncCompleted(duration: Duration = Clock.System.now() - syncStartedMoment) {
val logMap = mapOf(
"syncId" to syncId,
"syncStatus" to SyncStatus.COMPLETED.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.wire.kalium.logic.sync.incremental

import com.benasher44.uuid.uuid4
import com.wire.kalium.logger.KaliumLogger
import com.wire.kalium.logger.KaliumLogger.Companion.ApplicationFlow.SYNC
import com.wire.kalium.logic.data.event.Event
Expand All @@ -28,6 +29,8 @@ import com.wire.kalium.logic.data.sync.SlowSyncRepository
import com.wire.kalium.logic.data.sync.SlowSyncStatus
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.logic.sync.SyncExceptionHandler
import com.wire.kalium.logic.sync.SyncType
import com.wire.kalium.logic.sync.provideNewSyncManagerStartedLogger
import com.wire.kalium.logic.sync.slow.SlowSyncManager
import com.wire.kalium.logic.util.ExponentialDurationHelper
import com.wire.kalium.logic.util.ExponentialDurationHelperImpl
Expand All @@ -41,12 +44,16 @@ import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.runningFold
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
Expand Down Expand Up @@ -181,16 +188,25 @@ internal class IncrementalSyncManager(
incrementalSyncWorker
.processEventsWhilePolicyAllowsFlow()
.cancellable()
.collect {
val newState = when (it) {
EventSource.PENDING -> IncrementalSyncStatus.FetchingPendingEvents
.runningFold(uuid4().toString() to Clock.System.now()) { syncData: Pair<String, Instant>, eventSource ->
val syncLogger = kaliumLogger.provideNewSyncManagerStartedLogger(SyncType.INCREMENTAL, syncData.first)
val newState = when (eventSource) {
EventSource.PENDING -> {
syncLogger.logSyncStarted()
IncrementalSyncStatus.FetchingPendingEvents
}

EventSource.LIVE -> {
syncLogger.logSyncCompleted(Clock.System.now() - syncData.second)
exponentialDurationHelper.reset()
IncrementalSyncStatus.Live
}
}
incrementalSyncRepository.updateIncrementalSyncState(newState)
}

// when the source is LIVE, we need to generate a new syncId since it means the previous one is done
if (eventSource == EventSource.LIVE) uuid4().toString() to Clock.System.now() else syncData
}.collect()
incrementalSyncRepository.updateIncrementalSyncState(IncrementalSyncStatus.Pending)
logger.i("$TAG IncrementalSync stopped.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@

package com.wire.kalium.logic.sync.incremental

import com.benasher44.uuid.uuid4
import com.wire.kalium.logger.KaliumLogger
import com.wire.kalium.logger.KaliumLogger.Companion.ApplicationFlow.SYNC
import com.wire.kalium.logic.data.sync.ConnectionPolicy
import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.logic.sync.KaliumSyncException
import com.wire.kalium.logic.sync.SyncType
import com.wire.kalium.logic.sync.provideNewSyncManagerStartedLogger
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.channelFlow
Expand Down Expand Up @@ -61,16 +58,13 @@ internal class IncrementalSyncWorkerImpl(
eventGatherer.currentSource.collect { send(it) }
}
launch {
val syncId = uuid4().toString()
val syncLogger = kaliumLogger.provideNewSyncManagerStartedLogger(SyncType.INCREMENTAL, syncId)
eventGatherer.gatherEvents().cancellable().collect {
// TODO make sure that event process is not cancel in a midway
eventProcessor.processEvent(it).onFailure { failure ->
throw KaliumSyncException("Failed to process event. Aborting Sync for a retry", failure)
}
}
// When events are all consumed, cancel the source job to complete the channelFlow
syncLogger.logSyncCompleted()
sourceJob.cancel()
logger.withFeatureId(SYNC).i("SYNC Finished gathering and processing events")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ internal class SlowSyncManager(

private suspend fun performSlowSync(migrationSteps: List<SyncMigrationStep>) {
val syncLogger = kaliumLogger.provideNewSyncManagerStartedLogger(SyncType.SLOW)
syncLogger.logSyncStarted()
logger.i("Starting SlowSync as all criteria are met and it wasn't performed recently")
slowSyncWorker.slowSyncStepsFlow(migrationSteps).cancellable().collect { step ->
logger.i("Performing SlowSyncStep $step")
Expand Down

0 comments on commit a1b356d

Please sign in to comment.