Skip to content

Commit

Permalink
fix: prevent cancellation midway through event processing
Browse files Browse the repository at this point in the history
  • Loading branch information
vitorhugods committed Aug 15, 2024
1 parent 58d7ee8 commit c226ba3
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,7 @@ class UserSessionScope internal constructor(
featureConfigEventReceiver,
userPropertiesEventReceiver,
federationEventReceiver,
this@UserSessionScope,
userScopedLogger,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ import com.wire.kalium.logic.sync.receiver.UserEventReceiver
import com.wire.kalium.logic.sync.receiver.UserPropertiesEventReceiver
import com.wire.kalium.logic.util.EventLoggingStatus
import com.wire.kalium.logic.util.createEventProcessingLogger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.async
import kotlinx.coroutines.withContext

/**
* Handles incoming events from remote.
Expand Down Expand Up @@ -71,6 +75,7 @@ internal class EventProcessorImpl(
private val featureConfigEventReceiver: FeatureConfigEventReceiver,
private val userPropertiesEventReceiver: UserPropertiesEventReceiver,
private val federationEventReceiver: FederationEventReceiver,
private val processingScope: CoroutineScope,
logger: KaliumLogger = kaliumLogger,
) : EventProcessor {

Expand All @@ -80,13 +85,23 @@ internal class EventProcessorImpl(

override var disableEventProcessing: Boolean = false

override suspend fun processEvent(eventEnvelope: EventEnvelope): Either<CoreFailure, Unit> {
override suspend fun processEvent(eventEnvelope: EventEnvelope): Either<CoreFailure, Unit> = processingScope.async {
val (event, deliveryInfo) = eventEnvelope
if (disableEventProcessing) {
logger.w("Skipping processing of ${event.toLogString()} due to debug option")
return Either.Right(Unit)
Either.Right(Unit)
} else {
withContext(NonCancellable) {
doProcess(event, deliveryInfo, eventEnvelope)
}
}
}.await()

private suspend fun doProcess(
event: Event,
deliveryInfo: EventDeliveryInfo,
eventEnvelope: EventEnvelope
): Either<CoreFailure, Unit> {
return when (event) {
is Event.Conversation -> conversationEventReceiver.onEvent(event, deliveryInfo)
is Event.User -> userEventReceiver.onEvent(event, deliveryInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,23 @@ import com.wire.kalium.logic.util.arrangement.eventHandler.FeatureConfigEventRec
import com.wire.kalium.logic.util.shouldFail
import io.mockative.Mock
import io.mockative.any
import io.mockative.eq
import io.mockative.coEvery
import io.mockative.coVerify
import io.mockative.eq
import io.mockative.mock
import io.mockative.once
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import kotlin.coroutines.cancellation.CancellationException
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse

class EventProcessorTest {

Expand All @@ -50,7 +59,7 @@ class EventProcessorTest {
// Given
val event = TestEvent.memberJoin()

val (arrangement, eventProcessor) = Arrangement().arrange {
val (arrangement, eventProcessor) = Arrangement(this).arrange {
withUpdateLastProcessedEventId(event.id, Either.Right(Unit))
}

Expand All @@ -68,7 +77,7 @@ class EventProcessorTest {
// Given
val event = TestEvent.memberJoin()

val (arrangement, eventProcessor) = Arrangement()
val (arrangement, eventProcessor) = Arrangement(this)
.withUpdateLastProcessedEventId(event.id, Either.Right(Unit))
.arrange()

Expand All @@ -87,7 +96,7 @@ class EventProcessorTest {
val event = TestEvent.memberJoin()
val failure = CoreFailure.MissingClientRegistration

val (arrangement, eventProcessor) = Arrangement().arrange {
val (arrangement, eventProcessor) = Arrangement(this).arrange {
withConversationEventReceiverFailingWith(failure)
withUpdateLastProcessedEventId(event.id, Either.Right(Unit))
}
Expand All @@ -107,7 +116,7 @@ class EventProcessorTest {
// Given
val event = TestEvent.newConnection()

val (arrangement, eventProcessor) = Arrangement().arrange {
val (arrangement, eventProcessor) = Arrangement(this).arrange {
withUpdateLastProcessedEventId(event.id, Either.Right(Unit))
}

Expand All @@ -126,7 +135,7 @@ class EventProcessorTest {
val event = TestEvent.newConnection()
val failure = CoreFailure.MissingClientRegistration

val (arrangement, eventProcessor) = Arrangement().arrange {
val (arrangement, eventProcessor) = Arrangement(this).arrange {
withUserEventReceiverFailingWith(failure)
withUpdateLastProcessedEventId(event.id, Either.Right(Unit))
}
Expand All @@ -146,7 +155,7 @@ class EventProcessorTest {
// Given
val envelope = TestEvent.newConnection().wrapInEnvelope(isTransient = false)

val (arrangement, eventProcessor) = Arrangement().arrange {
val (arrangement, eventProcessor) = Arrangement(this).arrange {
withUpdateLastProcessedEventId(envelope.event.id, Either.Right(Unit))
}

Expand All @@ -164,7 +173,7 @@ class EventProcessorTest {
// Given
val event = TestEvent.newConnection().wrapInEnvelope(isTransient = true)

val (arrangement, eventProcessor) = Arrangement().arrange()
val (arrangement, eventProcessor) = Arrangement(this).arrange()

// When
eventProcessor.processEvent(event)
Expand All @@ -180,7 +189,7 @@ class EventProcessorTest {
// Given
val event = TestEvent.userPropertyReadReceiptMode()

val (arrangement, eventProcessor) = Arrangement().arrange {
val (arrangement, eventProcessor) = Arrangement(this).arrange {
withUpdateLastProcessedEventId(event.id, Either.Right(Unit))
}

Expand All @@ -197,7 +206,7 @@ class EventProcessorTest {
val event = TestEvent.userPropertyReadReceiptMode()
val failure = CoreFailure.MissingClientRegistration

val (arrangement, eventProcessor) = Arrangement().arrange {
val (arrangement, eventProcessor) = Arrangement(this).arrange {
withUserPropertiesEventReceiverFailingWith(failure)
withUpdateLastProcessedEventId(event.id, Either.Right(Unit))
}
Expand All @@ -212,7 +221,88 @@ class EventProcessorTest {
}.wasNotInvoked()
}

private class Arrangement : FeatureConfigEventReceiverArrangement by FeatureConfigEventReceiverArrangementImpl() {
@Test
fun givenEvent_whenCallerIsCancelled_thenShouldStillProcessNormally() = runTest {
val event = TestEvent.userPropertyReadReceiptMode()

val callerScope = CoroutineScope(Job())

val (arrangement, eventProcessor) = Arrangement(this).arrange {
withUpdateLastProcessedEventId(event.id, Either.Right(Unit))
withUserPropertiesEventReceiverInvoking {
callerScope.cancel() // Cancel during event processing
Either.Right(Unit)
}
}

callerScope.launch {
eventProcessor.processEvent(event.wrapInEnvelope())
}.join()
advanceUntilIdle()
assertFalse(callerScope.isActive)
// Then
coVerify {
arrangement.userPropertiesEventReceiver.onEvent(any(), any())
}.wasInvoked(exactly = once)
coVerify {
arrangement.eventRepository.updateLastProcessedEventId(any())
}.wasInvoked(exactly = once)
}

@Test
fun givenEvent_whenProcessingScopeIsCancelledMidwayThrough_thenShouldProceedAnywayAndCancellationIsPropagated() = runTest {
val event = TestEvent.userPropertyReadReceiptMode()

val processingScope = CoroutineScope(Job())

val (arrangement, eventProcessor) = Arrangement(processingScope).arrange {
withUpdateLastProcessedEventId(event.id, Either.Right(Unit))
withUserPropertiesEventReceiverInvoking {
processingScope.cancel() // Cancel during event processing
Either.Right(Unit)
}
}

assertFailsWith(CancellationException::class) {
eventProcessor.processEvent(event.wrapInEnvelope())
advanceUntilIdle()
}
// Then
coVerify {
arrangement.userPropertiesEventReceiver.onEvent(any(), any())
}.wasInvoked(exactly = once)
coVerify {
arrangement.eventRepository.updateLastProcessedEventId(any())
}.wasInvoked(exactly = once)
}

@Test
fun givenEvent_whenProcessingScopeIsAlreadyCancelled_thenShouldNotProcessAndPropagateCancellation() = runTest {
val event = TestEvent.userPropertyReadReceiptMode()

val processingScope = CoroutineScope(Job())
processingScope.cancel()

val (arrangement, eventProcessor) = Arrangement(processingScope).arrange {
withUpdateLastProcessedEventId(event.id, Either.Right(Unit))
}

assertFailsWith(CancellationException::class) {
eventProcessor.processEvent(event.wrapInEnvelope())
advanceUntilIdle()
}
// Then
coVerify {
arrangement.userPropertiesEventReceiver.onEvent(any(), any())
}.wasNotInvoked()
coVerify {
arrangement.eventRepository.updateLastProcessedEventId(any())
}.wasNotInvoked()
}

private class Arrangement(
val processingScope: CoroutineScope
) : FeatureConfigEventReceiverArrangement by FeatureConfigEventReceiverArrangementImpl() {

@Mock
val eventRepository = mock(EventRepository::class)
Expand Down Expand Up @@ -276,6 +366,12 @@ class EventProcessorTest {
}.returns(result)
}

suspend fun withUserPropertiesEventReceiverInvoking(invocation: () -> Either<CoreFailure, Unit>) = apply {
coEvery {
userPropertiesEventReceiver.onEvent(any(), any())
}.invokes(invocation)
}

suspend fun withUserPropertiesEventReceiverSucceeding() = withUserPropertiesEventReceiverReturning(Either.Right(Unit))

suspend fun withUserPropertiesEventReceiverFailingWith(failure: CoreFailure) = withUserPropertiesEventReceiverReturning(
Expand All @@ -295,7 +391,8 @@ class EventProcessorTest {
teamEventReceiver,
featureConfigEventReceiver,
userPropertiesEventReceiver,
federationEventReceiver
federationEventReceiver,
processingScope
)
}
}
Expand Down

0 comments on commit c226ba3

Please sign in to comment.