diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt index 6b90dd93361..5408e207373 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt @@ -7,6 +7,7 @@ import net.corda.data.flow.event.mapper.FlowMapperEvent import net.corda.data.flow.output.FlowStatus import net.corda.data.flow.state.checkpoint.Checkpoint import net.corda.data.ledger.persistence.LedgerPersistenceRequest +import net.corda.data.ledger.utxo.token.selection.event.TokenPoolCacheEvent import net.corda.data.persistence.EntityRequest import net.corda.data.uniqueness.UniquenessCheckRequestAvro import net.corda.flow.pipeline.factory.FlowEventProcessorFactory @@ -27,6 +28,7 @@ import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC import net.corda.schema.Schemas.Persistence.PERSISTENCE_ENTITY_PROCESSOR_TOPIC import net.corda.schema.Schemas.Persistence.PERSISTENCE_LEDGER_PROCESSOR_TOPIC +import net.corda.schema.Schemas.Services.TOKEN_CACHE_EVENT import net.corda.schema.Schemas.UniquenessChecker.UNIQUENESS_CHECK_TOPIC import net.corda.schema.Schemas.Verification.VERIFICATION_LEDGER_PROCESSOR_TOPIC import org.osgi.service.component.annotations.Activate @@ -94,6 +96,7 @@ class FlowEventMediatorFactoryImpl @Activate constructor( is FlowOpsRequest -> routeTo(messageBusClient, FLOW_OPS_MESSAGE_TOPIC) is FlowStatus -> routeTo(messageBusClient, FLOW_STATUS_TOPIC) is LedgerPersistenceRequest -> routeTo(messageBusClient, PERSISTENCE_LEDGER_PROCESSOR_TOPIC) + is TokenPoolCacheEvent -> routeTo(messageBusClient, TOKEN_CACHE_EVENT) is TransactionVerificationRequest -> routeTo(messageBusClient, VERIFICATION_LEDGER_PROCESSOR_TOPIC) is UniquenessCheckRequestAvro -> routeTo(messageBusClient, UNIQUENESS_CHECK_TOPIC) else -> { diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/service/FlowExecutorImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/service/FlowExecutorImpl.kt index cb2dab9393b..f625576cefd 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/service/FlowExecutorImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/service/FlowExecutorImpl.kt @@ -28,12 +28,11 @@ import net.corda.schema.configuration.MessagingConfig.MAX_ALLOWED_MSG_SIZE import net.corda.schema.configuration.MessagingConfig.Subscription.PROCESSOR_TIMEOUT import net.corda.utilities.trace import org.osgi.service.component.annotations.Activate -import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference import org.slf4j.LoggerFactory @Suppress("LongParameterList") -@Component(service = [FlowExecutor::class]) +// TODO @Component(service = [FlowExecutor::class]) class FlowExecutorImpl constructor( coordinatorFactory: LifecycleCoordinatorFactory, private val subscriptionFactory: SubscriptionFactory, diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/service/FlowExecutorMediatorImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/service/FlowExecutorMediatorImpl.kt new file mode 100644 index 00000000000..3427fd40d40 --- /dev/null +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/service/FlowExecutorMediatorImpl.kt @@ -0,0 +1,123 @@ +package net.corda.flow.service + +import com.typesafe.config.ConfigValueFactory +import net.corda.data.flow.event.FlowEvent +import net.corda.data.flow.state.checkpoint.Checkpoint +import net.corda.flow.messaging.mediator.FlowEventMediatorFactory +import net.corda.libs.configuration.SmartConfig +import net.corda.libs.configuration.helper.getConfig +import net.corda.lifecycle.LifecycleCoordinatorFactory +import net.corda.lifecycle.LifecycleEvent +import net.corda.lifecycle.LifecycleStatus +import net.corda.lifecycle.RegistrationHandle +import net.corda.lifecycle.StartEvent +import net.corda.lifecycle.StopEvent +import net.corda.lifecycle.createCoordinator +import net.corda.messaging.api.mediator.MultiSourceEventMediator +import net.corda.schema.configuration.ConfigKeys.FLOW_CONFIG +import net.corda.schema.configuration.ConfigKeys.MESSAGING_CONFIG +import net.corda.schema.configuration.MessagingConfig.MAX_ALLOWED_MSG_SIZE +import net.corda.schema.configuration.MessagingConfig.Subscription.PROCESSOR_TIMEOUT +import net.corda.utilities.trace +import org.osgi.service.component.annotations.Activate +import org.osgi.service.component.annotations.Component +import org.osgi.service.component.annotations.Reference +import org.slf4j.LoggerFactory + +// TODO @Component(service = [FlowExecutor::class]) +@Component(service = [FlowExecutor::class]) +class FlowExecutorMediatorImpl ( + coordinatorFactory: LifecycleCoordinatorFactory, + private val flowEventMediatorFactory: FlowEventMediatorFactory, + private val toMessagingConfig: (Map) -> SmartConfig, +) : FlowExecutor { + + @Activate + constructor( + @Reference(service = LifecycleCoordinatorFactory::class) + coordinatorFactory: LifecycleCoordinatorFactory, + @Reference(service = FlowEventMediatorFactory::class) + flowEventMediatorFactory: FlowEventMediatorFactory, + ) : this( + coordinatorFactory, + flowEventMediatorFactory, + { cfg -> cfg.getConfig(MESSAGING_CONFIG) } + ) + + companion object { + private val log = LoggerFactory.getLogger(this::class.java.enclosingClass) + } + + private val coordinator = coordinatorFactory.createCoordinator { event, _ -> eventHandler(event) } + private var subscriptionRegistrationHandle: RegistrationHandle? = null + private var multiSourceEventMediator: MultiSourceEventMediator? = null + + override fun onConfigChange(config: Map) { + try { + val messagingConfig = toMessagingConfig(config) + val updatedConfigs = updateConfigsWithFlowConfig(config, messagingConfig) + + // close the lifecycle registration first to prevent down being signaled + subscriptionRegistrationHandle?.close() + multiSourceEventMediator?.close() + + multiSourceEventMediator = flowEventMediatorFactory.create( + updatedConfigs, + messagingConfig, + ) + + subscriptionRegistrationHandle = coordinator.followStatusChangesByName( + setOf(multiSourceEventMediator!!.subscriptionName) + ) + + multiSourceEventMediator?.start() + } catch (ex: Exception) { + val reason = "Failed to configure the flow executor using '${config}'" + log.error(reason, ex) + coordinator.updateStatus(LifecycleStatus.ERROR, reason) + } + } + + override val isRunning: Boolean + get() = coordinator.isRunning + + override fun start() { + coordinator.start() + } + + override fun stop() { + coordinator.stop() + } + + private fun updateConfigsWithFlowConfig( + initialConfigs: Map, + messagingConfig: SmartConfig + ): Map { + val flowConfig = initialConfigs.getConfig(FLOW_CONFIG) + val updatedFlowConfig = flowConfig + .withValue(PROCESSOR_TIMEOUT, ConfigValueFactory.fromAnyRef(messagingConfig.getLong(PROCESSOR_TIMEOUT))) + .withValue(MAX_ALLOWED_MSG_SIZE, ConfigValueFactory.fromAnyRef(messagingConfig.getLong(MAX_ALLOWED_MSG_SIZE))) + + return initialConfigs.mapValues { + if (it.key == FLOW_CONFIG) { + updatedFlowConfig + } else { + it.value + } + } + } + + private fun eventHandler(event: LifecycleEvent) { + when (event) { + is StartEvent -> { + coordinator.updateStatus(LifecycleStatus.UP) + } + is StopEvent -> { + log.trace { "Flow executor is stopping..." } + subscriptionRegistrationHandle?.close() + multiSourceEventMediator?.close() + log.trace { "Flow executor stopped" } + } + } + } +} diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt index 32023c08925..df901d57273 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt @@ -152,26 +152,38 @@ class MultiSourceEventMediatorImpl( private fun processEvents() { log.debug { "Polling and processing events" } + log.info("Polling consumers") val messages = pollConsumers() + log.info("Polling consumers done") if (messages.isNotEmpty()) { val msgGroups = messages.groupBy { it.key } + log.info("Retrieving states from StateManager") val persistedStates = stateManager.get(msgGroups.keys.map { it.toString() }) + log.info("Creating message processor tasks") var msgProcessorTasks = taskManagerHelper.createMessageProcessorTasks( msgGroups, persistedStates, config.messageProcessor ) do { + log.info("Executing processor tasks") val processingResults = taskManagerHelper.executeProcessorTasks(msgProcessorTasks) + log.info("Persisting states") val conflictingStates = stateManagerHelper.persistStates(processingResults) + log.info("Splitting successful/failed states") val (successResults, failResults) = processingResults.partition { !conflictingStates.contains(it.key.toString()) } + log.info("Creating client tasks") val clientTasks = taskManagerHelper.createClientTasks(successResults, messageRouter) + log.info("Executing client tasks") val clientResults = taskManagerHelper.executeClientTasks(clientTasks) + log.info("Generating new tasks") msgProcessorTasks = taskManagerHelper.createMessageProcessorTasks(clientResults) + taskManagerHelper.createMessageProcessorTasks(failResults, conflictingStates) } while (msgProcessorTasks.isNotEmpty()) + log.info("Committing offsets") commitOffsets() + log.info("Committing offsets done") } } diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/taskmanager/TaskManagerImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/taskmanager/TaskManagerImpl.kt index dec4a07876a..98a1d8a6c75 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/taskmanager/TaskManagerImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/taskmanager/TaskManagerImpl.kt @@ -21,11 +21,16 @@ class TaskManagerImpl @Activate constructor() : TaskManager { } private fun executeShortRunning(command: () -> T): CompletableFuture { - val result = CompletableFuture() + val resultFuture = CompletableFuture() executorService.execute { - result.complete(command()) + try { + val result = command() + resultFuture.complete(result) + } catch (t: Throwable) { + resultFuture.completeExceptionally(t) + } } - return result + return resultFuture } private fun executeLongRunning(command: () -> T): CompletableFuture { diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt index 48f558cb4ff..a837691fc7e 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt @@ -26,7 +26,7 @@ import net.corda.messaging.api.processor.StateAndEventProcessor import net.corda.messaging.api.records.Record import net.corda.test.util.waitWhile import org.junit.jupiter.api.BeforeEach -// import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Test import org.mockito.kotlin.any import org.mockito.kotlin.anyOrNull import org.mockito.kotlin.atLeast @@ -34,11 +34,13 @@ import org.mockito.kotlin.mock import org.mockito.kotlin.times import org.mockito.kotlin.verify import org.mockito.kotlin.whenever +import org.slf4j.LoggerFactory import java.time.Duration import java.util.concurrent.CompletableFuture class MultiSourceEventMediatorImplTest { companion object { + private val log = LoggerFactory.getLogger(this::class.java.enclosingClass) private const val TEST_TIMEOUT_SECONDS = 20L const val KEY1 = "key1" const val KEY2 = "key2" @@ -119,8 +121,7 @@ class MultiSourceEventMediatorImplTest { ) } - //@Test - // TODO Test temporarily disabled as it seems to be flaky + @Test fun `mediator processes multiples events by key`() { val events = (1..6).map { "event$it" } val eventBatches = listOf( @@ -142,26 +143,39 @@ class MultiSourceEventMediatorImplTest { whenever(consumer.poll(any())).thenAnswer { CompletableDeferred( if (batchNumber < eventBatches.size) { + log.info("Polling mock for batch number [$batchNumber]") eventBatches[batchNumber++] } else { + log.info("Polling mock, all batches consumed") Thread.sleep(10) emptyList() } ) } + log.info("Starting mediator") mediator.start() waitWhile(Duration.ofSeconds(TEST_TIMEOUT_SECONDS)) { batchNumber < eventBatches.size } + log.info("Closing mediator") mediator.close() + log.info("verify(mediatorConsumerFactory)") verify(mediatorConsumerFactory).create(any>()) + log.info("verify(messagingClientFactory)") verify(messagingClientFactory).create(any()) + log.info("verify(messageRouterFactory)") verify(messageRouterFactory).create(any()) + log.info("verify(messageProcessor)") verify(messageProcessor, times(events.size)).onNext(anyOrNull(), any()) + log.info("verify(stateManager).get") verify(stateManager, times(eventBatches.size)).get(any()) + log.info("verify(stateManager).create") verify(stateManager, times(eventBatches.size)).create(any()) + log.info("verify(consumer).poll") verify(consumer, atLeast(eventBatches.size)).poll(any()) + log.info("verify(consumer).asyncCommitOffsets") verify(consumer, times(eventBatches.size)).asyncCommitOffsets() + log.info("verify(messagingClient)") verify(messagingClient, times(events.size)).send(any()) } diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/TaskManagerHelperTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/TaskManagerHelperTest.kt index 1fbc942b174..5a664098766 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/TaskManagerHelperTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/TaskManagerHelperTest.kt @@ -7,13 +7,13 @@ import net.corda.messaging.api.mediator.MessageRouter import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_KEY import net.corda.messaging.api.mediator.taskmanager.TaskManager import net.corda.messaging.api.processor.StateAndEventProcessor +import net.corda.messaging.api.records.Record import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test -import org.mockito.kotlin.mock -import net.corda.messaging.api.records.Record import org.mockito.Mockito.`when` import org.mockito.kotlin.any import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.mock import org.mockito.kotlin.times import org.mockito.kotlin.verify