From 038ef9063dd45400cd695610d85c967e089be92a Mon Sep 17 00:00:00 2001 From: Jacob Persson <7156+typfel@users.noreply.github.com> Date: Wed, 27 Sep 2023 21:31:03 +0200 Subject: [PATCH] feat(debug): add use case for disabling event processing (#2096) --- .../kalium/logic/feature/UserSessionScope.kt | 6 ++- .../kalium/logic/feature/debug/DebugScope.kt | 7 +++ .../debug/DisableEventProcessingUseCase.kt | 39 ++++++++++++++++ .../logic/sync/incremental/EventProcessor.kt | 44 ++++++++++++------- 4 files changed, 79 insertions(+), 17 deletions(-) create mode 100644 logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/debug/DisableEventProcessingUseCase.kt diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt index dd4304cc940..aacecae1a91 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt @@ -784,8 +784,8 @@ class UserSessionScope internal constructor( private val eventGatherer: EventGatherer get() = EventGathererImpl(eventRepository, incrementalSyncRepository) - private val eventProcessor: EventProcessor - get() = EventProcessorImpl( + private val eventProcessor: EventProcessor by lazy { + EventProcessorImpl( eventRepository, conversationEventReceiver, userEventReceiver, @@ -794,6 +794,7 @@ class UserSessionScope internal constructor( userPropertiesEventReceiver, federationEventReceiver ) + } private val slowSyncCriteriaProvider: SlowSyncCriteriaProvider get() = SlowSlowSyncCriteriaProviderImpl(clientRepository, logoutRepository) @@ -1419,6 +1420,7 @@ class UserSessionScope internal constructor( messageSendingScheduler, selfConversationIdProvider, staleEpochVerifier, + eventProcessor, this ) val messages: MessageScope diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/debug/DebugScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/debug/DebugScope.kt index cd6a8619440..84891a1c98c 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/debug/DebugScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/debug/DebugScope.kt @@ -51,6 +51,7 @@ import com.wire.kalium.logic.feature.message.ephemeral.DeleteEphemeralMessageFor import com.wire.kalium.logic.feature.message.ephemeral.DeleteEphemeralMessageForSelfUserAsSenderUseCaseImpl import com.wire.kalium.logic.feature.message.ephemeral.EphemeralMessageDeletionHandlerImpl import com.wire.kalium.logic.sync.SyncManager +import com.wire.kalium.logic.sync.incremental.EventProcessor import com.wire.kalium.logic.util.MessageContentEncoder import com.wire.kalium.util.KaliumDispatcher import com.wire.kalium.util.KaliumDispatcherImpl @@ -77,6 +78,7 @@ class DebugScope internal constructor( private val messageSendingScheduler: MessageSendingScheduler, private val selfConversationIdProvider: SelfConversationIdProvider, private val staleEpochVerifier: StaleEpochVerifier, + private val eventProcessor: EventProcessor, private val scope: CoroutineScope, internal val dispatcher: KaliumDispatcher = KaliumDispatcherImpl ) { @@ -101,6 +103,11 @@ class DebugScope internal constructor( messageSender ) + val disableEventProcessing: DisableEventProcessingUseCase + get() = DisableEventProcessingUseCaseImpl( + eventProcessor = eventProcessor + ) + private val messageSendFailureHandler: MessageSendFailureHandler get() = MessageSendFailureHandlerImpl(userRepository, clientRepository, messageRepository, messageSendingScheduler) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/debug/DisableEventProcessingUseCase.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/debug/DisableEventProcessingUseCase.kt new file mode 100644 index 00000000000..f6bb30dea45 --- /dev/null +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/debug/DisableEventProcessingUseCase.kt @@ -0,0 +1,39 @@ +/* + * Wire + * Copyright (C) 2023 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.kalium.logic.feature.debug + +import com.wire.kalium.logic.sync.incremental.EventProcessor + +/** + * Disables processing of incoming events but still mark them as processed. + * + * This use case useful for testing error scenarios where messages have been lost, + * putting the client in an inconsistent state with the backend. + */ +interface DisableEventProcessingUseCase { + suspend operator fun invoke(disabled: Boolean) +} + +internal class DisableEventProcessingUseCaseImpl( + private val eventProcessor: EventProcessor +) : DisableEventProcessingUseCase { + + override suspend fun invoke(disabled: Boolean) { + eventProcessor.disableEventProcessing = disabled + } +} diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/EventProcessor.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/EventProcessor.kt index 892fb75d5f0..ebc747a0c04 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/EventProcessor.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/EventProcessor.kt @@ -40,6 +40,12 @@ import com.wire.kalium.util.serialization.toJsonElement * @see [Event] */ internal interface EventProcessor { + + /** + * When enabled events will be consumed but no event processing will occur. + */ + var disableEventProcessing: Boolean + /** * Process the [event], and persist the last processed event ID if the event * is not transient. @@ -66,23 +72,31 @@ internal class EventProcessorImpl( kaliumLogger.withFeatureId(EVENT_RECEIVER) } - override suspend fun processEvent(event: Event): Either = when (event) { - is Event.Conversation -> conversationEventReceiver.onEvent(event) - is Event.User -> userEventReceiver.onEvent(event) - is Event.FeatureConfig -> featureConfigEventReceiver.onEvent(event) - is Event.Unknown -> { - kaliumLogger - .logEventProcessing( - EventLoggingStatus.SKIPPED, - event - ) - // Skipping event = success + override var disableEventProcessing: Boolean = false + + override suspend fun processEvent(event: Event): Either = + if (disableEventProcessing) { + logger.w("Skipping processing of $event due to debug option") Either.Right(Unit) - } + } else { + when (event) { + is Event.Conversation -> conversationEventReceiver.onEvent(event) + is Event.User -> userEventReceiver.onEvent(event) + is Event.FeatureConfig -> featureConfigEventReceiver.onEvent(event) + is Event.Unknown -> { + kaliumLogger + .logEventProcessing( + EventLoggingStatus.SKIPPED, + event + ) + // Skipping event = success + Either.Right(Unit) + } - is Event.Team -> teamEventReceiver.onEvent(event) - is Event.UserProperty -> userPropertiesEventReceiver.onEvent(event) - is Event.Federation -> federationEventReceiver.onEvent(event) + is Event.Team -> teamEventReceiver.onEvent(event) + is Event.UserProperty -> userPropertiesEventReceiver.onEvent(event) + is Event.Federation -> federationEventReceiver.onEvent(event) + } }.onSuccess { val logMap = mapOf( "event" to event.toLogMap()