Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-16199 Multi-source event mediator - Subscription (WIP) #4616

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
038934d
CORE-16199 Multi-source event mediator implementation
mbrkic-r3 Sep 11, 2023
951a6b6
CORE-16199 Using the latest version of StateManager API
mbrkic-r3 Sep 11, 2023
3eebf02
CORE-16177 Adding CordaMessage, MessageProducer, and KafkaMessageProd…
ben-millar Sep 7, 2023
a9f0936
CORE-16177 Adding kdocs to CordaMessage
ben-millar Sep 7, 2023
73dacd2
CORE-16177 Added helper function for message conversion, fixed naming…
ben-millar Sep 7, 2023
3e15193
CORE-16177 Modified MessageProducer to return a Kotlin Deferred (coro…
ben-millar Sep 8, 2023
a234604
CORE-16177 Implemented KafkaMessageProducer factory
ben-millar Sep 8, 2023
ccfbb72
CORE-16177 Adding CordaMessage unit tests
ben-millar Sep 11, 2023
d4dba85
CORE-16177 Adding KafkaMessageProducerImpl unit tests
ben-millar Sep 11, 2023
0b2b869
CORE-16177 Removing DBMessageProducerImpl
ben-millar Sep 11, 2023
3dea3df
CORE-16177 Include the kotlinx.coroutine JAR within message0-bus JAR
ben-millar Sep 12, 2023
0817b6a
CORE-16177 Addressing PR comments
ben-millar Sep 12, 2023
a147437
CORE-16177 Reverting changes to CordaKafkaProducerImpl
ben-millar Sep 12, 2023
d635421
CORE-16177 Removing MessageProducerFactory
ben-millar Sep 12, 2023
a6a094d
CORE-16177 Tidied up MessageProducerImpl::send()
ben-millar Sep 12, 2023
87df137
CORE-16177 Updating comment in CordaMessage.kt
ben-millar Sep 14, 2023
da6f7b7
CORE-16177 Ensuring 'Deferred' is completed in all cases before retur…
ben-millar Sep 14, 2023
eb544fc
CORE-16177 Modifying how the kotlinx.coroutine dependency is packaged
ben-millar Sep 14, 2023
6238bd3
CORE-16177: Align Kotlin coroutines with other wrapped bundles. (#4657)
chrisr3 Sep 17, 2023
b532ca0
CORE-16177 Refactored to address PR comments
mbrkic-r3 Sep 18, 2023
15f50d7
CORE-16177 Fixed dependencies
mbrkic-r3 Sep 18, 2023
e9b7653
CORE-16199 Implementation of MessageRouter and related classes, imple…
mbrkic-r3 Sep 18, 2023
b2a7c72
CORE-16199 Refactoring
mbrkic-r3 Sep 18, 2023
09ed3a5
CORE-16177 Reverted build file for kafka-message-bus-impl
mbrkic-r3 Sep 18, 2023
153048d
CORE-16199 Refactoring
mbrkic-r3 Sep 18, 2023
a8511b1
CORE-16199 Refactoring
mbrkic-r3 Sep 18, 2023
a102c69
CORE-16199 Refactoring
mbrkic-r3 Sep 18, 2023
04afcd5
Merge remote-tracking branch 'origin/bm/CORE-16177' into mbrkic-r3/CO…
mbrkic-r3 Sep 19, 2023
f38f2a2
CORE-16199 Refactoring
mbrkic-r3 Sep 19, 2023
4a31710
CORE-16199 Refactoring, using Deferred instead of CompletableFuture i…
mbrkic-r3 Sep 20, 2023
0280425
CORE-16199 Refactoring
mbrkic-r3 Sep 22, 2023
e76fc6c
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Sep 22, 2023
e91df9d
CORE-16199 Renaming MediatorProducer to MessagingClient
mbrkic-r3 Sep 22, 2023
96a9b98
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Sep 22, 2023
dc0695b
CORE-16199 Fixed compile errors after merge with latest from 5.1
mbrkic-r3 Sep 22, 2023
cb5f5e6
CORE-16199 Fixed bug with persisting states
mbrkic-r3 Sep 27, 2023
f89d6e5
CORE-16199 Removed unnecessary build task
mbrkic-r3 Sep 27, 2023
1000241
CORE-16199 Added missing events to flow engine router and enabled Flo…
mbrkic-r3 Sep 27, 2023
d513bee
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Sep 27, 2023
f3996eb
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Sep 27, 2023
7a20c15
CORE-16199 Refactoring
mbrkic-r3 Sep 27, 2023
76650d2
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Sep 28, 2023
71d0b56
CORE-16199 Removed package api/mediator/statemanager, using interface…
mbrkic-r3 Sep 28, 2023
f2d1e87
CORE-16199 Refactoring
mbrkic-r3 Sep 28, 2023
31f88c4
CORE-16199 Refactored MessageBusClient
mbrkic-r3 Sep 29, 2023
6be23dc
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Sep 29, 2023
8db1cb4
CORE-16199 Made polling consumers async
mbrkic-r3 Sep 29, 2023
9388869
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Sep 29, 2023
5b47423
CORE-16199 Added unit test testPollWithError
mbrkic-r3 Sep 29, 2023
a14250e
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Sep 29, 2023
7166868
CORE-16199 Added factories for creating Multi-Source Event Mediators …
mbrkic-r3 Oct 3, 2023
4b9ef74
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Oct 3, 2023
cd0581b
CORE-16199 Fixed compile errors after merge with latest from 5.1
mbrkic-r3 Oct 3, 2023
99a744d
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Oct 7, 2023
c0d48c5
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Oct 7, 2023
a5a387d
CORE-16199 Fixed event processing bugs
mbrkic-r3 Oct 9, 2023
28225ea
CORE-16199 Fixed unit tests.
mbrkic-r3 Oct 9, 2023
a493250
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Oct 9, 2023
d829ea5
CORE-16199 Fixed unit tests.
mbrkic-r3 Oct 10, 2023
4a36056
CORE-16199 Added logging to analyse flaky test
mbrkic-r3 Oct 10, 2023
c975f4b
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Oct 10, 2023
c7a159f
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Oct 10, 2023
1908794
CORE-16199 Added logging to analyse flaky test
mbrkic-r3 Oct 10, 2023
f291084
CORE-16199 Added logging to analyse flaky test
mbrkic-r3 Oct 10, 2023
a31fd68
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Oct 10, 2023
3333442
CORE-16199 Added routing for TokenPoolCacheEvent
mbrkic-r3 Oct 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import net.corda.data.flow.event.mapper.FlowMapperEvent
import net.corda.data.flow.output.FlowStatus
import net.corda.data.flow.state.checkpoint.Checkpoint
import net.corda.data.ledger.persistence.LedgerPersistenceRequest
import net.corda.data.ledger.utxo.token.selection.event.TokenPoolCacheEvent
import net.corda.data.persistence.EntityRequest
import net.corda.data.uniqueness.UniquenessCheckRequestAvro
import net.corda.flow.pipeline.factory.FlowEventProcessorFactory
Expand All @@ -27,6 +28,7 @@ import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC
import net.corda.schema.Schemas.Persistence.PERSISTENCE_ENTITY_PROCESSOR_TOPIC
import net.corda.schema.Schemas.Persistence.PERSISTENCE_LEDGER_PROCESSOR_TOPIC
import net.corda.schema.Schemas.Services.TOKEN_CACHE_EVENT
import net.corda.schema.Schemas.UniquenessChecker.UNIQUENESS_CHECK_TOPIC
import net.corda.schema.Schemas.Verification.VERIFICATION_LEDGER_PROCESSOR_TOPIC
import org.osgi.service.component.annotations.Activate
Expand Down Expand Up @@ -94,6 +96,7 @@ class FlowEventMediatorFactoryImpl @Activate constructor(
is FlowOpsRequest -> routeTo(messageBusClient, FLOW_OPS_MESSAGE_TOPIC)
is FlowStatus -> routeTo(messageBusClient, FLOW_STATUS_TOPIC)
is LedgerPersistenceRequest -> routeTo(messageBusClient, PERSISTENCE_LEDGER_PROCESSOR_TOPIC)
is TokenPoolCacheEvent -> routeTo(messageBusClient, TOKEN_CACHE_EVENT)
is TransactionVerificationRequest -> routeTo(messageBusClient, VERIFICATION_LEDGER_PROCESSOR_TOPIC)
is UniquenessCheckRequestAvro -> routeTo(messageBusClient, UNIQUENESS_CHECK_TOPIC)
else -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ import net.corda.schema.configuration.MessagingConfig.MAX_ALLOWED_MSG_SIZE
import net.corda.schema.configuration.MessagingConfig.Subscription.PROCESSOR_TIMEOUT
import net.corda.utilities.trace
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
import org.osgi.service.component.annotations.Reference
import org.slf4j.LoggerFactory

@Suppress("LongParameterList")
@Component(service = [FlowExecutor::class])
// TODO @Component(service = [FlowExecutor::class])
class FlowExecutorImpl constructor(
coordinatorFactory: LifecycleCoordinatorFactory,
private val subscriptionFactory: SubscriptionFactory,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package net.corda.flow.service

import com.typesafe.config.ConfigValueFactory
import net.corda.data.flow.event.FlowEvent
import net.corda.data.flow.state.checkpoint.Checkpoint
import net.corda.flow.messaging.mediator.FlowEventMediatorFactory
import net.corda.libs.configuration.SmartConfig
import net.corda.libs.configuration.helper.getConfig
import net.corda.lifecycle.LifecycleCoordinatorFactory
import net.corda.lifecycle.LifecycleEvent
import net.corda.lifecycle.LifecycleStatus
import net.corda.lifecycle.RegistrationHandle
import net.corda.lifecycle.StartEvent
import net.corda.lifecycle.StopEvent
import net.corda.lifecycle.createCoordinator
import net.corda.messaging.api.mediator.MultiSourceEventMediator
import net.corda.schema.configuration.ConfigKeys.FLOW_CONFIG
import net.corda.schema.configuration.ConfigKeys.MESSAGING_CONFIG
import net.corda.schema.configuration.MessagingConfig.MAX_ALLOWED_MSG_SIZE
import net.corda.schema.configuration.MessagingConfig.Subscription.PROCESSOR_TIMEOUT
import net.corda.utilities.trace
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
import org.osgi.service.component.annotations.Reference
import org.slf4j.LoggerFactory

// TODO @Component(service = [FlowExecutor::class])
@Component(service = [FlowExecutor::class])
class FlowExecutorMediatorImpl (
coordinatorFactory: LifecycleCoordinatorFactory,
private val flowEventMediatorFactory: FlowEventMediatorFactory,
private val toMessagingConfig: (Map<String, SmartConfig>) -> SmartConfig,
) : FlowExecutor {

@Activate
constructor(
@Reference(service = LifecycleCoordinatorFactory::class)
coordinatorFactory: LifecycleCoordinatorFactory,
@Reference(service = FlowEventMediatorFactory::class)
flowEventMediatorFactory: FlowEventMediatorFactory,
) : this(
coordinatorFactory,
flowEventMediatorFactory,
{ cfg -> cfg.getConfig(MESSAGING_CONFIG) }
)

companion object {
private val log = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

private val coordinator = coordinatorFactory.createCoordinator<FlowExecutor> { event, _ -> eventHandler(event) }
private var subscriptionRegistrationHandle: RegistrationHandle? = null
private var multiSourceEventMediator: MultiSourceEventMediator<String, Checkpoint, FlowEvent>? = null

override fun onConfigChange(config: Map<String, SmartConfig>) {
try {
val messagingConfig = toMessagingConfig(config)
val updatedConfigs = updateConfigsWithFlowConfig(config, messagingConfig)

// close the lifecycle registration first to prevent down being signaled
subscriptionRegistrationHandle?.close()
multiSourceEventMediator?.close()

multiSourceEventMediator = flowEventMediatorFactory.create(
updatedConfigs,
messagingConfig,
)

subscriptionRegistrationHandle = coordinator.followStatusChangesByName(
setOf(multiSourceEventMediator!!.subscriptionName)
)

multiSourceEventMediator?.start()
} catch (ex: Exception) {
val reason = "Failed to configure the flow executor using '${config}'"
log.error(reason, ex)
coordinator.updateStatus(LifecycleStatus.ERROR, reason)
}
}

override val isRunning: Boolean
get() = coordinator.isRunning

override fun start() {
coordinator.start()
}

override fun stop() {
coordinator.stop()
}

private fun updateConfigsWithFlowConfig(
initialConfigs: Map<String, SmartConfig>,
messagingConfig: SmartConfig
): Map<String, SmartConfig> {
val flowConfig = initialConfigs.getConfig(FLOW_CONFIG)
val updatedFlowConfig = flowConfig
.withValue(PROCESSOR_TIMEOUT, ConfigValueFactory.fromAnyRef(messagingConfig.getLong(PROCESSOR_TIMEOUT)))
.withValue(MAX_ALLOWED_MSG_SIZE, ConfigValueFactory.fromAnyRef(messagingConfig.getLong(MAX_ALLOWED_MSG_SIZE)))

return initialConfigs.mapValues {
if (it.key == FLOW_CONFIG) {
updatedFlowConfig
} else {
it.value
}
}
}

private fun eventHandler(event: LifecycleEvent) {
when (event) {
is StartEvent -> {
coordinator.updateStatus(LifecycleStatus.UP)
}
is StopEvent -> {
log.trace { "Flow executor is stopping..." }
subscriptionRegistrationHandle?.close()
multiSourceEventMediator?.close()
log.trace { "Flow executor stopped" }
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,26 +152,38 @@ class MultiSourceEventMediatorImpl<K : Any, S : Any, E : Any>(

private fun processEvents() {
log.debug { "Polling and processing events" }
log.info("Polling consumers")
val messages = pollConsumers()
log.info("Polling consumers done")
if (messages.isNotEmpty()) {
val msgGroups = messages.groupBy { it.key }
log.info("Retrieving states from StateManager")
val persistedStates = stateManager.get(msgGroups.keys.map { it.toString() })
log.info("Creating message processor tasks")
var msgProcessorTasks = taskManagerHelper.createMessageProcessorTasks(
msgGroups, persistedStates, config.messageProcessor
)
do {
log.info("Executing processor tasks")
val processingResults = taskManagerHelper.executeProcessorTasks(msgProcessorTasks)
log.info("Persisting states")
val conflictingStates = stateManagerHelper.persistStates(processingResults)
log.info("Splitting successful/failed states")
val (successResults, failResults) = processingResults.partition {
!conflictingStates.contains(it.key.toString())
}
log.info("Creating client tasks")
val clientTasks = taskManagerHelper.createClientTasks(successResults, messageRouter)
log.info("Executing client tasks")
val clientResults = taskManagerHelper.executeClientTasks(clientTasks)
log.info("Generating new tasks")
msgProcessorTasks =
taskManagerHelper.createMessageProcessorTasks(clientResults) +
taskManagerHelper.createMessageProcessorTasks(failResults, conflictingStates)
} while (msgProcessorTasks.isNotEmpty())
log.info("Committing offsets")
commitOffsets()
log.info("Committing offsets done")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@ class TaskManagerImpl @Activate constructor() : TaskManager {
}

private fun <T> executeShortRunning(command: () -> T): CompletableFuture<T> {
val result = CompletableFuture<T>()
val resultFuture = CompletableFuture<T>()
executorService.execute {
result.complete(command())
try {
val result = command()
resultFuture.complete(result)
} catch (t: Throwable) {
resultFuture.completeExceptionally(t)
}
}
return result
return resultFuture
}

private fun <T> executeLongRunning(command: () -> T): CompletableFuture<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,21 @@ import net.corda.messaging.api.processor.StateAndEventProcessor
import net.corda.messaging.api.records.Record
import net.corda.test.util.waitWhile
import org.junit.jupiter.api.BeforeEach
// import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Test
import org.mockito.kotlin.any
import org.mockito.kotlin.anyOrNull
import org.mockito.kotlin.atLeast
import org.mockito.kotlin.mock
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.concurrent.CompletableFuture

class MultiSourceEventMediatorImplTest {
companion object {
private val log = LoggerFactory.getLogger(this::class.java.enclosingClass)
private const val TEST_TIMEOUT_SECONDS = 20L
const val KEY1 = "key1"
const val KEY2 = "key2"
Expand Down Expand Up @@ -119,8 +121,7 @@ class MultiSourceEventMediatorImplTest {
)
}

//@Test
// TODO Test temporarily disabled as it seems to be flaky
@Test
fun `mediator processes multiples events by key`() {
val events = (1..6).map { "event$it" }
val eventBatches = listOf(
Expand All @@ -142,26 +143,39 @@ class MultiSourceEventMediatorImplTest {
whenever(consumer.poll(any())).thenAnswer {
CompletableDeferred(
if (batchNumber < eventBatches.size) {
log.info("Polling mock for batch number [$batchNumber]")
eventBatches[batchNumber++]
} else {
log.info("Polling mock, all batches consumed")
Thread.sleep(10)
emptyList()
}
)
}

log.info("Starting mediator")
mediator.start()
waitWhile(Duration.ofSeconds(TEST_TIMEOUT_SECONDS)) { batchNumber < eventBatches.size }
log.info("Closing mediator")
mediator.close()

log.info("verify(mediatorConsumerFactory)")
verify(mediatorConsumerFactory).create(any<MediatorConsumerConfig<Any, Any>>())
log.info("verify(messagingClientFactory)")
verify(messagingClientFactory).create(any<MessagingClientConfig>())
log.info("verify(messageRouterFactory)")
verify(messageRouterFactory).create(any<MessagingClientFinder>())
log.info("verify(messageProcessor)")
verify(messageProcessor, times(events.size)).onNext(anyOrNull(), any())
log.info("verify(stateManager).get")
verify(stateManager, times(eventBatches.size)).get(any())
log.info("verify(stateManager).create")
verify(stateManager, times(eventBatches.size)).create(any())
log.info("verify(consumer).poll")
verify(consumer, atLeast(eventBatches.size)).poll(any())
log.info("verify(consumer).asyncCommitOffsets")
verify(consumer, times(eventBatches.size)).asyncCommitOffsets()
log.info("verify(messagingClient)")
verify(messagingClient, times(events.size)).send(any())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import net.corda.messaging.api.mediator.MessageRouter
import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_KEY
import net.corda.messaging.api.mediator.taskmanager.TaskManager
import net.corda.messaging.api.processor.StateAndEventProcessor
import net.corda.messaging.api.records.Record
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.mockito.kotlin.mock
import net.corda.messaging.api.records.Record
import org.mockito.Mockito.`when`
import org.mockito.kotlin.any
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.mock
import org.mockito.kotlin.times
import org.mockito.kotlin.verify

Expand Down