Skip to content

Commit

Permalink
feat(debug): add use case for disabling event processing (#2096)
Browse files Browse the repository at this point in the history
  • Loading branch information
typfel authored Sep 27, 2023
1 parent 5455a6a commit 038ef90
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -784,8 +784,8 @@ class UserSessionScope internal constructor(

private val eventGatherer: EventGatherer get() = EventGathererImpl(eventRepository, incrementalSyncRepository)

private val eventProcessor: EventProcessor
get() = EventProcessorImpl(
private val eventProcessor: EventProcessor by lazy {
EventProcessorImpl(
eventRepository,
conversationEventReceiver,
userEventReceiver,
Expand All @@ -794,6 +794,7 @@ class UserSessionScope internal constructor(
userPropertiesEventReceiver,
federationEventReceiver
)
}

private val slowSyncCriteriaProvider: SlowSyncCriteriaProvider
get() = SlowSlowSyncCriteriaProviderImpl(clientRepository, logoutRepository)
Expand Down Expand Up @@ -1419,6 +1420,7 @@ class UserSessionScope internal constructor(
messageSendingScheduler,
selfConversationIdProvider,
staleEpochVerifier,
eventProcessor,
this
)
val messages: MessageScope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import com.wire.kalium.logic.feature.message.ephemeral.DeleteEphemeralMessageFor
import com.wire.kalium.logic.feature.message.ephemeral.DeleteEphemeralMessageForSelfUserAsSenderUseCaseImpl
import com.wire.kalium.logic.feature.message.ephemeral.EphemeralMessageDeletionHandlerImpl
import com.wire.kalium.logic.sync.SyncManager
import com.wire.kalium.logic.sync.incremental.EventProcessor
import com.wire.kalium.logic.util.MessageContentEncoder
import com.wire.kalium.util.KaliumDispatcher
import com.wire.kalium.util.KaliumDispatcherImpl
Expand All @@ -77,6 +78,7 @@ class DebugScope internal constructor(
private val messageSendingScheduler: MessageSendingScheduler,
private val selfConversationIdProvider: SelfConversationIdProvider,
private val staleEpochVerifier: StaleEpochVerifier,
private val eventProcessor: EventProcessor,
private val scope: CoroutineScope,
internal val dispatcher: KaliumDispatcher = KaliumDispatcherImpl
) {
Expand All @@ -101,6 +103,11 @@ class DebugScope internal constructor(
messageSender
)

val disableEventProcessing: DisableEventProcessingUseCase
get() = DisableEventProcessingUseCaseImpl(
eventProcessor = eventProcessor
)

private val messageSendFailureHandler: MessageSendFailureHandler
get() = MessageSendFailureHandlerImpl(userRepository, clientRepository, messageRepository, messageSendingScheduler)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Wire
* Copyright (C) 2023 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.feature.debug

import com.wire.kalium.logic.sync.incremental.EventProcessor

/**
* Disables processing of incoming events but still mark them as processed.
*
* This use case useful for testing error scenarios where messages have been lost,
* putting the client in an inconsistent state with the backend.
*/
interface DisableEventProcessingUseCase {
suspend operator fun invoke(disabled: Boolean)
}

internal class DisableEventProcessingUseCaseImpl(
private val eventProcessor: EventProcessor
) : DisableEventProcessingUseCase {

override suspend fun invoke(disabled: Boolean) {
eventProcessor.disableEventProcessing = disabled
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ import com.wire.kalium.util.serialization.toJsonElement
* @see [Event]
*/
internal interface EventProcessor {

/**
* When enabled events will be consumed but no event processing will occur.
*/
var disableEventProcessing: Boolean

/**
* Process the [event], and persist the last processed event ID if the event
* is not transient.
Expand All @@ -66,23 +72,31 @@ internal class EventProcessorImpl(
kaliumLogger.withFeatureId(EVENT_RECEIVER)
}

override suspend fun processEvent(event: Event): Either<CoreFailure, Unit> = when (event) {
is Event.Conversation -> conversationEventReceiver.onEvent(event)
is Event.User -> userEventReceiver.onEvent(event)
is Event.FeatureConfig -> featureConfigEventReceiver.onEvent(event)
is Event.Unknown -> {
kaliumLogger
.logEventProcessing(
EventLoggingStatus.SKIPPED,
event
)
// Skipping event = success
override var disableEventProcessing: Boolean = false

override suspend fun processEvent(event: Event): Either<CoreFailure, Unit> =
if (disableEventProcessing) {
logger.w("Skipping processing of $event due to debug option")
Either.Right(Unit)
}
} else {
when (event) {
is Event.Conversation -> conversationEventReceiver.onEvent(event)
is Event.User -> userEventReceiver.onEvent(event)
is Event.FeatureConfig -> featureConfigEventReceiver.onEvent(event)
is Event.Unknown -> {
kaliumLogger
.logEventProcessing(
EventLoggingStatus.SKIPPED,
event
)
// Skipping event = success
Either.Right(Unit)
}

is Event.Team -> teamEventReceiver.onEvent(event)
is Event.UserProperty -> userPropertiesEventReceiver.onEvent(event)
is Event.Federation -> federationEventReceiver.onEvent(event)
is Event.Team -> teamEventReceiver.onEvent(event)
is Event.UserProperty -> userPropertiesEventReceiver.onEvent(event)
is Event.Federation -> federationEventReceiver.onEvent(event)
}
}.onSuccess {
val logMap = mapOf<String, Any>(
"event" to event.toLogMap()
Expand Down

0 comments on commit 038ef90

Please sign in to comment.