Skip to content

Commit

Permalink
CORE-17009 Manage Multi-Source Event Mediator Consumers and Clients (#…
Browse files Browse the repository at this point in the history
…4735)

Managing consumers, messaging clients and message router for Multi-Source Event Mediator:
- creating them before it starting to process messages
- recreating them to recover from errors (when needed, based on error type)
- closing them
  • Loading branch information
mbrkic-r3 authored Sep 29, 2023
1 parent 944efd7 commit faaad10
Show file tree
Hide file tree
Showing 3 changed files with 330 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@ import net.corda.avro.serialization.CordaAvroSerializer
import net.corda.libs.statemanager.api.StateManager
import net.corda.lifecycle.LifecycleCoordinatorFactory
import net.corda.lifecycle.LifecycleCoordinatorName
import net.corda.lifecycle.LifecycleStatus
import net.corda.messaging.api.exception.CordaMessageAPIIntermittentException
import net.corda.messaging.api.mediator.MediatorConsumer
import net.corda.messaging.api.mediator.MessageRouter
import net.corda.messaging.api.mediator.MessagingClient
import net.corda.messaging.api.mediator.MultiSourceEventMediator
import net.corda.messaging.api.mediator.config.EventMediatorConfig
import net.corda.messaging.api.mediator.taskmanager.TaskManager
import net.corda.messaging.mediator.factory.MediatorComponentFactory
import org.slf4j.LoggerFactory
import java.util.UUID

// TODO This will be implemented with CORE-15754
@Suppress("LongParameterList", "unused_parameter")
Expand All @@ -18,15 +26,89 @@ class MultiSourceEventMediatorImpl<K : Any, S : Any, E : Any>(
private val stateManager: StateManager,
private val taskManager: TaskManager,
lifecycleCoordinatorFactory: LifecycleCoordinatorFactory,
): MultiSourceEventMediator<K, S, E> {
) : MultiSourceEventMediator<K, S, E> {

private val log = LoggerFactory.getLogger("${this.javaClass.name}-${config.name}")

private var consumers = listOf<MediatorConsumer<K, E>>()
private var clients = listOf<MessagingClient>()
private lateinit var messageRouter: MessageRouter
private val mediatorComponentFactory = MediatorComponentFactory(
config.messageProcessor, config.consumerFactories, config.clientFactories, config.messageRouterFactory
)
private val uniqueId = UUID.randomUUID().toString()
private val lifecycleCoordinatorName = LifecycleCoordinatorName(
"MultiSourceEventMediator--${config.name}", uniqueId
)
private val lifecycleCoordinator =
lifecycleCoordinatorFactory.createCoordinator(lifecycleCoordinatorName) { _, _ -> }

override val subscriptionName: LifecycleCoordinatorName
get() = TODO("Not yet implemented")

override fun start() {
TODO("Not yet implemented")
}

private fun stop() = Thread.currentThread().interrupt()

private val stopped get() = Thread.currentThread().isInterrupted

override fun close() {
stop()
}

private fun run() {
var attempts = 0

while (!stopped) {
attempts++
try {
consumers = mediatorComponentFactory.createConsumers(::onSerializationError)
clients = mediatorComponentFactory.createClients(::onSerializationError)
messageRouter = mediatorComponentFactory.createRouter(clients)

consumers.forEach { it.subscribe() }
lifecycleCoordinator.updateStatus(LifecycleStatus.UP)

while (!stopped) {
// Poll and process events
TODO("Not yet implemented")
}

} catch (exception: Exception) {
when (exception) {
is InterruptedException -> {
log.info("Multi-Source Event Mediator is stopped. Closing consumers and clients.")
}

is CordaMessageAPIIntermittentException -> {
log.warn(
"${exception.message} Attempts: $attempts. Recreating consumers and clients and retrying.",
exception
)
}

else -> {
log.error(
"${exception.message} Attempts: $attempts. Closing Multi-Source Event Mediator.", exception
)
lifecycleCoordinator.updateStatus(LifecycleStatus.ERROR, "Error: ${exception.message}")
stop()
}
}
} finally {
closeConsumersAndProducers()
}
}
}

private fun onSerializationError(event: ByteArray) {
TODO("Not yet implemented")
}

private fun closeConsumersAndProducers() {
consumers.forEach { it.close() }
clients.forEach { it.close() }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package net.corda.messaging.mediator.factory

import net.corda.messaging.api.mediator.MediatorConsumer
import net.corda.messaging.api.mediator.MessageRouter
import net.corda.messaging.api.mediator.MessagingClient
import net.corda.messaging.api.mediator.config.MediatorConsumerConfig
import net.corda.messaging.api.mediator.config.MessagingClientConfig
import net.corda.messaging.api.mediator.factory.MediatorConsumerFactory
import net.corda.messaging.api.mediator.factory.MessageRouterFactory
import net.corda.messaging.api.mediator.factory.MessagingClientFactory
import net.corda.messaging.api.processor.StateAndEventProcessor

/**
* Factory for creating various components used by Multi-Source Event Mediator.
*/
internal class MediatorComponentFactory<K : Any, S : Any, E : Any>(
private val messageProcessor: StateAndEventProcessor<K, S, E>,
private val consumerFactories: Collection<MediatorConsumerFactory>,
private val clientFactories: Collection<MessagingClientFactory>,
private val messageRouterFactory: MessageRouterFactory,
) {

/**
* Creates message consumers.
*
* @param onSerializationError Function for handling serialization errors.
* @return List of created [MediatorConsumer]s.
*/
fun createConsumers(
onSerializationError: (ByteArray) -> Unit
): List<MediatorConsumer<K, E>> {
check(consumerFactories.isNotEmpty()) {
"No consumer factory set in configuration"
}
return consumerFactories.map { consumerFactory ->
consumerFactory.create(
MediatorConsumerConfig(
messageProcessor.keyClass,
messageProcessor.eventValueClass,
onSerializationError
)
)
}
}

/**
* Creates messaging clients.
*
* @param onSerializationError Function for handling serialization errors.
* @return List of created [MessagingClient]s.
*/
fun createClients(
onSerializationError: (ByteArray) -> Unit
): List<MessagingClient> {
check(clientFactories.isNotEmpty()) {
"No client factory set in configuration"
}
return clientFactories.map { clientFactory ->
clientFactory.create(
MessagingClientConfig(onSerializationError)
)
}
}

/**
* Creates message router.
*
* @param clients Collection of [MessagingClient]s.
* @return Message router.
*/
fun createRouter(
clients: Collection<MessagingClient>
): MessageRouter {
val clientsById = clients.associateBy { it.id }
return messageRouterFactory.create { id ->
clientsById[id]
?: throw IllegalStateException("Messaging client with ID \"$id\" not found")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package net.corda.messaging.mediator.factory

import net.corda.messaging.api.mediator.MediatorConsumer
import net.corda.messaging.api.mediator.MessageRouter
import net.corda.messaging.api.mediator.MessagingClient
import net.corda.messaging.api.mediator.config.MediatorConsumerConfig
import net.corda.messaging.api.mediator.config.MessagingClientConfig
import net.corda.messaging.api.mediator.factory.MediatorConsumerFactory
import net.corda.messaging.api.mediator.factory.MessageRouterFactory
import net.corda.messaging.api.mediator.factory.MessagingClientFactory
import net.corda.messaging.api.mediator.factory.MessagingClientFinder
import net.corda.messaging.api.processor.StateAndEventProcessor
import net.corda.messaging.api.records.Record
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.mockito.Mockito
import org.mockito.kotlin.any
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.mock
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever

class MediatorComponentFactoryTest {
private lateinit var mediatorComponentFactory: MediatorComponentFactory<String, String, String>
private val messageProcessor = object : StateAndEventProcessor<String, String, String> {
override fun onNext(state: String?, event: Record<String, String>): StateAndEventProcessor.Response<String> {
TODO("Not yet implemented")
}
override val keyClass get() = String::class.java
override val stateValueClass get() = String::class.java
override val eventValueClass get() = String::class.java

}
private val consumerFactories = listOf(
mock<MediatorConsumerFactory>(),
mock<MediatorConsumerFactory>(),
)
private val clientFactories = listOf(
mock<MessagingClientFactory>(),
mock<MessagingClientFactory>(),
)
private val messageRouterFactory = mock<MessageRouterFactory>()

@BeforeEach
fun beforeEach() {
consumerFactories.forEach {
doReturn(mock<MediatorConsumer<String, String>>()).`when`(it).create(
any<MediatorConsumerConfig<String, String>>()
)
}

clientFactories.forEach {
doReturn(mock<MessagingClient>()).`when`(it).create(
any<MessagingClientConfig>()
)
}

doReturn(mock<MessageRouter>()).`when`(messageRouterFactory).create(
any<MessagingClientFinder>()
)

mediatorComponentFactory = MediatorComponentFactory(
messageProcessor,
consumerFactories,
clientFactories,
messageRouterFactory,
)
}

@Test
fun `successfully creates consumers`() {
val onSerializationError: (ByteArray) -> Unit = {}

val mediatorConsumers = mediatorComponentFactory.createConsumers(onSerializationError)

assertEquals(consumerFactories.size, mediatorConsumers.size)
mediatorConsumers.forEach {
assertNotNull(it)
}

consumerFactories.forEach {
val consumerConfigCaptor = argumentCaptor<MediatorConsumerConfig<String, String>>()
verify(it).create(consumerConfigCaptor.capture())
val consumerConfig = consumerConfigCaptor.firstValue
assertEquals(String::class.java, consumerConfig.keyClass)
assertEquals(String::class.java, consumerConfig.valueClass)
assertEquals(onSerializationError, consumerConfig.onSerializationError)
}
}

@Test
fun `throws exception when consumer factory not provided`() {
val mediatorComponentFactory = MediatorComponentFactory(
messageProcessor,
emptyList(),
clientFactories,
messageRouterFactory,
)

assertThrows<IllegalStateException> {
mediatorComponentFactory.createConsumers { }
}
}

@Test
fun `successfully creates clients`() {
val onSerializationError: (ByteArray) -> Unit = {}

val mediatorClients = mediatorComponentFactory.createClients(onSerializationError)

assertEquals(clientFactories.size, mediatorClients.size)
mediatorClients.forEach {
assertNotNull(it)
}

clientFactories.forEach {
val clientConfigCaptor = argumentCaptor<MessagingClientConfig>()
verify(it).create(clientConfigCaptor.capture())
val clientConfig = clientConfigCaptor.firstValue
assertEquals(onSerializationError, clientConfig.onSerializationError)
}
}

@Test
fun `throws exception when client factory not provided`() {
val mediatorComponentFactory = MediatorComponentFactory(
messageProcessor,
consumerFactories,
emptyList(),
messageRouterFactory,
)

assertThrows<IllegalStateException> {
mediatorComponentFactory.createClients { }
}
}

@Test
fun `successfully creates message router`() {
val clients = listOf(
mock<MessagingClient>(),
mock<MessagingClient>(),
)
clients.forEachIndexed { id, client ->
Mockito.doReturn(id.toString()).whenever(client).id
}

val messageRouter = mediatorComponentFactory.createRouter(clients)

assertNotNull(messageRouter)

val messagingClientFinderCaptor = argumentCaptor<MessagingClientFinder>()
verify(messageRouterFactory).create(messagingClientFinderCaptor.capture())
val messagingClientFinder = messagingClientFinderCaptor.firstValue

clients.forEachIndexed { id, client ->
assertEquals(client, messagingClientFinder.find(id.toString()))
}
assertThrows<IllegalStateException> {
messagingClientFinder.find("unknownId")
}
}
}

0 comments on commit faaad10

Please sign in to comment.