diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt index f787659532b..a5ebf568447 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt @@ -929,6 +929,7 @@ class UserSessionScope internal constructor( featureConfigEventReceiver, userPropertiesEventReceiver, federationEventReceiver, + this@UserSessionScope, userScopedLogger, ) } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/EventProcessor.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/EventProcessor.kt index 5a4be8d4395..666a8fce093 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/EventProcessor.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/EventProcessor.kt @@ -36,6 +36,10 @@ import com.wire.kalium.logic.sync.receiver.UserEventReceiver import com.wire.kalium.logic.sync.receiver.UserPropertiesEventReceiver import com.wire.kalium.logic.util.EventLoggingStatus import com.wire.kalium.logic.util.createEventProcessingLogger +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.async +import kotlinx.coroutines.withContext /** * Handles incoming events from remote. @@ -71,6 +75,7 @@ internal class EventProcessorImpl( private val featureConfigEventReceiver: FeatureConfigEventReceiver, private val userPropertiesEventReceiver: UserPropertiesEventReceiver, private val federationEventReceiver: FederationEventReceiver, + private val processingScope: CoroutineScope, logger: KaliumLogger = kaliumLogger, ) : EventProcessor { @@ -80,13 +85,23 @@ internal class EventProcessorImpl( override var disableEventProcessing: Boolean = false - override suspend fun processEvent(eventEnvelope: EventEnvelope): Either { + override suspend fun processEvent(eventEnvelope: EventEnvelope): Either = processingScope.async { val (event, deliveryInfo) = eventEnvelope if (disableEventProcessing) { logger.w("Skipping processing of ${event.toLogString()} due to debug option") - return Either.Right(Unit) + Either.Right(Unit) + } else { + withContext(NonCancellable) { + doProcess(event, deliveryInfo, eventEnvelope) + } } + }.await() + private suspend fun doProcess( + event: Event, + deliveryInfo: EventDeliveryInfo, + eventEnvelope: EventEnvelope + ): Either { return when (event) { is Event.Conversation -> conversationEventReceiver.onEvent(event, deliveryInfo) is Event.User -> userEventReceiver.onEvent(event, deliveryInfo) diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/incremental/EventProcessorTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/incremental/EventProcessorTest.kt index 20fb72d2e8e..b0a47b97a1c 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/incremental/EventProcessorTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/incremental/EventProcessorTest.kt @@ -34,14 +34,23 @@ import com.wire.kalium.logic.util.arrangement.eventHandler.FeatureConfigEventRec import com.wire.kalium.logic.util.shouldFail import io.mockative.Mock import io.mockative.any -import io.mockative.eq import io.mockative.coEvery import io.mockative.coVerify +import io.mockative.eq import io.mockative.mock import io.mockative.once +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancel +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.runTest +import kotlin.coroutines.cancellation.CancellationException import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertFalse class EventProcessorTest { @@ -50,7 +59,7 @@ class EventProcessorTest { // Given val event = TestEvent.memberJoin() - val (arrangement, eventProcessor) = Arrangement().arrange { + val (arrangement, eventProcessor) = Arrangement(this).arrange { withUpdateLastProcessedEventId(event.id, Either.Right(Unit)) } @@ -68,7 +77,7 @@ class EventProcessorTest { // Given val event = TestEvent.memberJoin() - val (arrangement, eventProcessor) = Arrangement() + val (arrangement, eventProcessor) = Arrangement(this) .withUpdateLastProcessedEventId(event.id, Either.Right(Unit)) .arrange() @@ -87,7 +96,7 @@ class EventProcessorTest { val event = TestEvent.memberJoin() val failure = CoreFailure.MissingClientRegistration - val (arrangement, eventProcessor) = Arrangement().arrange { + val (arrangement, eventProcessor) = Arrangement(this).arrange { withConversationEventReceiverFailingWith(failure) withUpdateLastProcessedEventId(event.id, Either.Right(Unit)) } @@ -107,7 +116,7 @@ class EventProcessorTest { // Given val event = TestEvent.newConnection() - val (arrangement, eventProcessor) = Arrangement().arrange { + val (arrangement, eventProcessor) = Arrangement(this).arrange { withUpdateLastProcessedEventId(event.id, Either.Right(Unit)) } @@ -126,7 +135,7 @@ class EventProcessorTest { val event = TestEvent.newConnection() val failure = CoreFailure.MissingClientRegistration - val (arrangement, eventProcessor) = Arrangement().arrange { + val (arrangement, eventProcessor) = Arrangement(this).arrange { withUserEventReceiverFailingWith(failure) withUpdateLastProcessedEventId(event.id, Either.Right(Unit)) } @@ -146,7 +155,7 @@ class EventProcessorTest { // Given val envelope = TestEvent.newConnection().wrapInEnvelope(isTransient = false) - val (arrangement, eventProcessor) = Arrangement().arrange { + val (arrangement, eventProcessor) = Arrangement(this).arrange { withUpdateLastProcessedEventId(envelope.event.id, Either.Right(Unit)) } @@ -164,7 +173,7 @@ class EventProcessorTest { // Given val event = TestEvent.newConnection().wrapInEnvelope(isTransient = true) - val (arrangement, eventProcessor) = Arrangement().arrange() + val (arrangement, eventProcessor) = Arrangement(this).arrange() // When eventProcessor.processEvent(event) @@ -180,7 +189,7 @@ class EventProcessorTest { // Given val event = TestEvent.userPropertyReadReceiptMode() - val (arrangement, eventProcessor) = Arrangement().arrange { + val (arrangement, eventProcessor) = Arrangement(this).arrange { withUpdateLastProcessedEventId(event.id, Either.Right(Unit)) } @@ -197,7 +206,7 @@ class EventProcessorTest { val event = TestEvent.userPropertyReadReceiptMode() val failure = CoreFailure.MissingClientRegistration - val (arrangement, eventProcessor) = Arrangement().arrange { + val (arrangement, eventProcessor) = Arrangement(this).arrange { withUserPropertiesEventReceiverFailingWith(failure) withUpdateLastProcessedEventId(event.id, Either.Right(Unit)) } @@ -212,7 +221,88 @@ class EventProcessorTest { }.wasNotInvoked() } - private class Arrangement : FeatureConfigEventReceiverArrangement by FeatureConfigEventReceiverArrangementImpl() { + @Test + fun givenEvent_whenCallerIsCancelled_thenShouldStillProcessNormally() = runTest { + val event = TestEvent.userPropertyReadReceiptMode() + + val callerScope = CoroutineScope(Job()) + + val (arrangement, eventProcessor) = Arrangement(this).arrange { + withUpdateLastProcessedEventId(event.id, Either.Right(Unit)) + withUserPropertiesEventReceiverInvoking { + callerScope.cancel() // Cancel during event processing + Either.Right(Unit) + } + } + + callerScope.launch { + eventProcessor.processEvent(event.wrapInEnvelope()) + }.join() + advanceUntilIdle() + assertFalse(callerScope.isActive) + // Then + coVerify { + arrangement.userPropertiesEventReceiver.onEvent(any(), any()) + }.wasInvoked(exactly = once) + coVerify { + arrangement.eventRepository.updateLastProcessedEventId(any()) + }.wasInvoked(exactly = once) + } + + @Test + fun givenEvent_whenProcessingScopeIsCancelledMidwayThrough_thenShouldProceedAnywayAndCancellationIsPropagated() = runTest { + val event = TestEvent.userPropertyReadReceiptMode() + + val processingScope = CoroutineScope(Job()) + + val (arrangement, eventProcessor) = Arrangement(processingScope).arrange { + withUpdateLastProcessedEventId(event.id, Either.Right(Unit)) + withUserPropertiesEventReceiverInvoking { + processingScope.cancel() // Cancel during event processing + Either.Right(Unit) + } + } + + assertFailsWith(CancellationException::class) { + eventProcessor.processEvent(event.wrapInEnvelope()) + advanceUntilIdle() + } + // Then + coVerify { + arrangement.userPropertiesEventReceiver.onEvent(any(), any()) + }.wasInvoked(exactly = once) + coVerify { + arrangement.eventRepository.updateLastProcessedEventId(any()) + }.wasInvoked(exactly = once) + } + + @Test + fun givenEvent_whenProcessingScopeIsAlreadyCancelled_thenShouldNotProcessAndPropagateCancellation() = runTest { + val event = TestEvent.userPropertyReadReceiptMode() + + val processingScope = CoroutineScope(Job()) + processingScope.cancel() + + val (arrangement, eventProcessor) = Arrangement(processingScope).arrange { + withUpdateLastProcessedEventId(event.id, Either.Right(Unit)) + } + + assertFailsWith(CancellationException::class) { + eventProcessor.processEvent(event.wrapInEnvelope()) + advanceUntilIdle() + } + // Then + coVerify { + arrangement.userPropertiesEventReceiver.onEvent(any(), any()) + }.wasNotInvoked() + coVerify { + arrangement.eventRepository.updateLastProcessedEventId(any()) + }.wasNotInvoked() + } + + private class Arrangement( + val processingScope: CoroutineScope + ) : FeatureConfigEventReceiverArrangement by FeatureConfigEventReceiverArrangementImpl() { @Mock val eventRepository = mock(EventRepository::class) @@ -276,6 +366,12 @@ class EventProcessorTest { }.returns(result) } + suspend fun withUserPropertiesEventReceiverInvoking(invocation: () -> Either) = apply { + coEvery { + userPropertiesEventReceiver.onEvent(any(), any()) + }.invokes(invocation) + } + suspend fun withUserPropertiesEventReceiverSucceeding() = withUserPropertiesEventReceiverReturning(Either.Right(Unit)) suspend fun withUserPropertiesEventReceiverFailingWith(failure: CoreFailure) = withUserPropertiesEventReceiverReturning( @@ -295,7 +391,8 @@ class EventProcessorTest { teamEventReceiver, featureConfigEventReceiver, userPropertiesEventReceiver, - federationEventReceiver + federationEventReceiver, + processingScope ) } }