From faaad10db773598d76fa67ce9c41bce7a7aed579 Mon Sep 17 00:00:00 2001 From: Miljenko Brkic <97448832+mbrkic-r3@users.noreply.github.com> Date: Fri, 29 Sep 2023 09:44:44 +0100 Subject: [PATCH] CORE-17009 Manage Multi-Source Event Mediator Consumers and Clients (#4735) Managing consumers, messaging clients and message router for Multi-Source Event Mediator: - creating them before it starting to process messages - recreating them to recover from errors (when needed, based on error type) - closing them --- .../mediator/MultiSourceEventMediatorImpl.kt | 84 ++++++++- .../factory/MediatorComponentFactory.kt | 80 +++++++++ .../factory/MediatorComponentFactoryTest.kt | 167 ++++++++++++++++++ 3 files changed, 330 insertions(+), 1 deletion(-) create mode 100644 libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactory.kt create mode 100644 libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactoryTest.kt diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt index 7e3ebb36172..0fdaf7abba2 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt @@ -5,9 +5,17 @@ import net.corda.avro.serialization.CordaAvroSerializer import net.corda.libs.statemanager.api.StateManager import net.corda.lifecycle.LifecycleCoordinatorFactory import net.corda.lifecycle.LifecycleCoordinatorName +import net.corda.lifecycle.LifecycleStatus +import net.corda.messaging.api.exception.CordaMessageAPIIntermittentException +import net.corda.messaging.api.mediator.MediatorConsumer +import net.corda.messaging.api.mediator.MessageRouter +import net.corda.messaging.api.mediator.MessagingClient import net.corda.messaging.api.mediator.MultiSourceEventMediator import net.corda.messaging.api.mediator.config.EventMediatorConfig import net.corda.messaging.api.mediator.taskmanager.TaskManager +import net.corda.messaging.mediator.factory.MediatorComponentFactory +import org.slf4j.LoggerFactory +import java.util.UUID // TODO This will be implemented with CORE-15754 @Suppress("LongParameterList", "unused_parameter") @@ -18,7 +26,23 @@ class MultiSourceEventMediatorImpl( private val stateManager: StateManager, private val taskManager: TaskManager, lifecycleCoordinatorFactory: LifecycleCoordinatorFactory, -): MultiSourceEventMediator { +) : MultiSourceEventMediator { + + private val log = LoggerFactory.getLogger("${this.javaClass.name}-${config.name}") + + private var consumers = listOf>() + private var clients = listOf() + private lateinit var messageRouter: MessageRouter + private val mediatorComponentFactory = MediatorComponentFactory( + config.messageProcessor, config.consumerFactories, config.clientFactories, config.messageRouterFactory + ) + private val uniqueId = UUID.randomUUID().toString() + private val lifecycleCoordinatorName = LifecycleCoordinatorName( + "MultiSourceEventMediator--${config.name}", uniqueId + ) + private val lifecycleCoordinator = + lifecycleCoordinatorFactory.createCoordinator(lifecycleCoordinatorName) { _, _ -> } + override val subscriptionName: LifecycleCoordinatorName get() = TODO("Not yet implemented") @@ -26,7 +50,65 @@ class MultiSourceEventMediatorImpl( TODO("Not yet implemented") } + private fun stop() = Thread.currentThread().interrupt() + + private val stopped get() = Thread.currentThread().isInterrupted + override fun close() { + stop() + } + + private fun run() { + var attempts = 0 + + while (!stopped) { + attempts++ + try { + consumers = mediatorComponentFactory.createConsumers(::onSerializationError) + clients = mediatorComponentFactory.createClients(::onSerializationError) + messageRouter = mediatorComponentFactory.createRouter(clients) + + consumers.forEach { it.subscribe() } + lifecycleCoordinator.updateStatus(LifecycleStatus.UP) + + while (!stopped) { + // Poll and process events + TODO("Not yet implemented") + } + + } catch (exception: Exception) { + when (exception) { + is InterruptedException -> { + log.info("Multi-Source Event Mediator is stopped. Closing consumers and clients.") + } + + is CordaMessageAPIIntermittentException -> { + log.warn( + "${exception.message} Attempts: $attempts. Recreating consumers and clients and retrying.", + exception + ) + } + + else -> { + log.error( + "${exception.message} Attempts: $attempts. Closing Multi-Source Event Mediator.", exception + ) + lifecycleCoordinator.updateStatus(LifecycleStatus.ERROR, "Error: ${exception.message}") + stop() + } + } + } finally { + closeConsumersAndProducers() + } + } + } + + private fun onSerializationError(event: ByteArray) { TODO("Not yet implemented") } + + private fun closeConsumersAndProducers() { + consumers.forEach { it.close() } + clients.forEach { it.close() } + } } \ No newline at end of file diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactory.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactory.kt new file mode 100644 index 00000000000..9ba3e141c6c --- /dev/null +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactory.kt @@ -0,0 +1,80 @@ +package net.corda.messaging.mediator.factory + +import net.corda.messaging.api.mediator.MediatorConsumer +import net.corda.messaging.api.mediator.MessageRouter +import net.corda.messaging.api.mediator.MessagingClient +import net.corda.messaging.api.mediator.config.MediatorConsumerConfig +import net.corda.messaging.api.mediator.config.MessagingClientConfig +import net.corda.messaging.api.mediator.factory.MediatorConsumerFactory +import net.corda.messaging.api.mediator.factory.MessageRouterFactory +import net.corda.messaging.api.mediator.factory.MessagingClientFactory +import net.corda.messaging.api.processor.StateAndEventProcessor + +/** + * Factory for creating various components used by Multi-Source Event Mediator. + */ +internal class MediatorComponentFactory( + private val messageProcessor: StateAndEventProcessor, + private val consumerFactories: Collection, + private val clientFactories: Collection, + private val messageRouterFactory: MessageRouterFactory, +) { + + /** + * Creates message consumers. + * + * @param onSerializationError Function for handling serialization errors. + * @return List of created [MediatorConsumer]s. + */ + fun createConsumers( + onSerializationError: (ByteArray) -> Unit + ): List> { + check(consumerFactories.isNotEmpty()) { + "No consumer factory set in configuration" + } + return consumerFactories.map { consumerFactory -> + consumerFactory.create( + MediatorConsumerConfig( + messageProcessor.keyClass, + messageProcessor.eventValueClass, + onSerializationError + ) + ) + } + } + + /** + * Creates messaging clients. + * + * @param onSerializationError Function for handling serialization errors. + * @return List of created [MessagingClient]s. + */ + fun createClients( + onSerializationError: (ByteArray) -> Unit + ): List { + check(clientFactories.isNotEmpty()) { + "No client factory set in configuration" + } + return clientFactories.map { clientFactory -> + clientFactory.create( + MessagingClientConfig(onSerializationError) + ) + } + } + + /** + * Creates message router. + * + * @param clients Collection of [MessagingClient]s. + * @return Message router. + */ + fun createRouter( + clients: Collection + ): MessageRouter { + val clientsById = clients.associateBy { it.id } + return messageRouterFactory.create { id -> + clientsById[id] + ?: throw IllegalStateException("Messaging client with ID \"$id\" not found") + } + } +} \ No newline at end of file diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactoryTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactoryTest.kt new file mode 100644 index 00000000000..f111353d0be --- /dev/null +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactoryTest.kt @@ -0,0 +1,167 @@ +package net.corda.messaging.mediator.factory + +import net.corda.messaging.api.mediator.MediatorConsumer +import net.corda.messaging.api.mediator.MessageRouter +import net.corda.messaging.api.mediator.MessagingClient +import net.corda.messaging.api.mediator.config.MediatorConsumerConfig +import net.corda.messaging.api.mediator.config.MessagingClientConfig +import net.corda.messaging.api.mediator.factory.MediatorConsumerFactory +import net.corda.messaging.api.mediator.factory.MessageRouterFactory +import net.corda.messaging.api.mediator.factory.MessagingClientFactory +import net.corda.messaging.api.mediator.factory.MessagingClientFinder +import net.corda.messaging.api.processor.StateAndEventProcessor +import net.corda.messaging.api.records.Record +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.mockito.Mockito +import org.mockito.kotlin.any +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever + +class MediatorComponentFactoryTest { + private lateinit var mediatorComponentFactory: MediatorComponentFactory + private val messageProcessor = object : StateAndEventProcessor { + override fun onNext(state: String?, event: Record): StateAndEventProcessor.Response { + TODO("Not yet implemented") + } + override val keyClass get() = String::class.java + override val stateValueClass get() = String::class.java + override val eventValueClass get() = String::class.java + + } + private val consumerFactories = listOf( + mock(), + mock(), + ) + private val clientFactories = listOf( + mock(), + mock(), + ) + private val messageRouterFactory = mock() + + @BeforeEach + fun beforeEach() { + consumerFactories.forEach { + doReturn(mock>()).`when`(it).create( + any>() + ) + } + + clientFactories.forEach { + doReturn(mock()).`when`(it).create( + any() + ) + } + + doReturn(mock()).`when`(messageRouterFactory).create( + any() + ) + + mediatorComponentFactory = MediatorComponentFactory( + messageProcessor, + consumerFactories, + clientFactories, + messageRouterFactory, + ) + } + + @Test + fun `successfully creates consumers`() { + val onSerializationError: (ByteArray) -> Unit = {} + + val mediatorConsumers = mediatorComponentFactory.createConsumers(onSerializationError) + + assertEquals(consumerFactories.size, mediatorConsumers.size) + mediatorConsumers.forEach { + assertNotNull(it) + } + + consumerFactories.forEach { + val consumerConfigCaptor = argumentCaptor>() + verify(it).create(consumerConfigCaptor.capture()) + val consumerConfig = consumerConfigCaptor.firstValue + assertEquals(String::class.java, consumerConfig.keyClass) + assertEquals(String::class.java, consumerConfig.valueClass) + assertEquals(onSerializationError, consumerConfig.onSerializationError) + } + } + + @Test + fun `throws exception when consumer factory not provided`() { + val mediatorComponentFactory = MediatorComponentFactory( + messageProcessor, + emptyList(), + clientFactories, + messageRouterFactory, + ) + + assertThrows { + mediatorComponentFactory.createConsumers { } + } + } + + @Test + fun `successfully creates clients`() { + val onSerializationError: (ByteArray) -> Unit = {} + + val mediatorClients = mediatorComponentFactory.createClients(onSerializationError) + + assertEquals(clientFactories.size, mediatorClients.size) + mediatorClients.forEach { + assertNotNull(it) + } + + clientFactories.forEach { + val clientConfigCaptor = argumentCaptor() + verify(it).create(clientConfigCaptor.capture()) + val clientConfig = clientConfigCaptor.firstValue + assertEquals(onSerializationError, clientConfig.onSerializationError) + } + } + + @Test + fun `throws exception when client factory not provided`() { + val mediatorComponentFactory = MediatorComponentFactory( + messageProcessor, + consumerFactories, + emptyList(), + messageRouterFactory, + ) + + assertThrows { + mediatorComponentFactory.createClients { } + } + } + + @Test + fun `successfully creates message router`() { + val clients = listOf( + mock(), + mock(), + ) + clients.forEachIndexed { id, client -> + Mockito.doReturn(id.toString()).whenever(client).id + } + + val messageRouter = mediatorComponentFactory.createRouter(clients) + + assertNotNull(messageRouter) + + val messagingClientFinderCaptor = argumentCaptor() + verify(messageRouterFactory).create(messagingClientFinderCaptor.capture()) + val messagingClientFinder = messagingClientFinderCaptor.firstValue + + clients.forEachIndexed { id, client -> + assertEquals(client, messagingClientFinder.find(id.toString())) + } + assertThrows { + messagingClientFinder.find("unknownId") + } + } +} \ No newline at end of file