Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*DO NOT MERGE* Topology changes - One consumer for multiple topics #4955

Closed
Show file tree
Hide file tree
Changes from 56 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
44a3635
CORE-17429 State metadata support in message processor (#4828)
mbrkic-r3 Oct 12, 2023
e928edb
CORE-16203 Removed coroutine usage from Multi-Source Event Mediator.
mbrkic-r3 Oct 13, 2023
4956ab7
CORE-16203 Replace State and Event pattern with Multi-Source Event Me…
mbrkic-r3 Oct 12, 2023
b5f29d6
CORE-16203 Set linger.ms to 0 for stateAndEvent producer in kafka-mes…
mbrkic-r3 Oct 14, 2023
6023e7c
CORE-17487 5.1 Performance integration - parallel processing (#4873)
mbrkic-r3 Oct 14, 2023
36e96e5
CORE-17661 5.1 Performance integration - Multi-Source Event Mediator …
mbrkic-r3 Oct 14, 2023
48df404
Merge pull request #4872 from corda/mbrkic-r3/feature/51-perf-integra…
driessamyn Oct 14, 2023
580044a
CORE-16203 5.1 Performance integration - FlowMapper using Multi-Sourc…
mbrkic-r3 Oct 15, 2023
1afb5c2
Merge pull request #4876 from corda/mbrkic-r3/feature/51-perf-integra…
Omar-awad Oct 15, 2023
a201644
CORE-17768 Using new topology Kafka topics (flow.start, flow.session,…
mbrkic-r3 Oct 15, 2023
036fff5
Merge branch 'feature/51-perf-integration' into mbrkic-r3/CORE-17768/…
mbrkic-r3 Oct 15, 2023
ffa5e83
CORE-17562 Metrics for Multi-Source Event Mediator
mbrkic-r3 Oct 15, 2023
1bb2c92
CORE-17562 Fixed unit test
mbrkic-r3 Oct 15, 2023
b5dab4c
Merge branch 'release/os/5.1' into feature/51-perf-integration
driessamyn Oct 16, 2023
ea06ea4
Update API version to fix build (#4882)
driessamyn Oct 16, 2023
1f0669d
Merge branch 'feature/51-perf-integration' into mbrkic-r3/CORE-17562/…
Omar-awad Oct 16, 2023
8542b55
Merge pull request #4879 from corda/mbrkic-r3/CORE-17562/mediator-met…
Omar-awad Oct 16, 2023
47c1f12
CORE-16242 - Synchronous RPC Pattern API Implementation for Crypto Wo…
thiagoviana Oct 16, 2023
ef0afe4
CORE-17822 Apply prefix to topic overrides (#4893)
davidcurrie Oct 17, 2023
0b169e0
CORE-17627: Add metadata to flow mapper states for mapper status (#4886)
JamesHR3 Oct 17, 2023
f6a278a
CORE-17843 Fixed stopping event mediator. Modified to work with even…
mbrkic-r3 Oct 17, 2023
0aff9f5
CORE-17388: Add session timeout metadata to store alongside the check…
JamesHR3 Oct 17, 2023
b88f1cf
CORE-16181 Implementing RPC client, routing external events through R…
ben-millar Oct 17, 2023
e81978f
CORE-16181 Fixed integration test
mbrkic-r3 Oct 17, 2023
28a4895
CORE-16181 Fixed integration test
mbrkic-r3 Oct 17, 2023
f59239e
CORE-16181 Fixed integration test
mbrkic-r3 Oct 17, 2023
19b0879
CORE-16181 Fixed integration test
mbrkic-r3 Oct 17, 2023
5d9a67d
Merge branch 'feature/51-perf-integration' into mbrkic-r3/CORE-17768/…
mbrkic-r3 Oct 17, 2023
f45f313
Merge remote-tracking branch 'origin/mbrkic-r3/CORE-17768/topoligy-ne…
mbrkic-r3 Oct 17, 2023
6e2f759
Merge remote-tracking branch 'origin/feature/51-perf-integration' int…
mbrkic-r3 Oct 17, 2023
921fbd2
CORE-17768 Optimized imports after merge
mbrkic-r3 Oct 17, 2023
061bf89
Merge branch 'feature/51-perf-integration' into mbrkic-r3/CORE-17843/…
mbrkic-r3 Oct 17, 2023
ea96da7
CORE-17626: Add integration test for flow mapper cleanup (#4907)
JamesHR3 Oct 18, 2023
f674a17
CORE-17882: Add a route for flow events (#4915)
JamesHR3 Oct 18, 2023
b30fc1c
Merge remote-tracking branch 'origin/feature/51-perf-integration' int…
mbrkic-r3 Oct 18, 2023
6643999
Merge remote-tracking branch 'origin/mbrkic-r3/CORE-17843/FlowMapperS…
mbrkic-r3 Oct 18, 2023
a141940
Merge branch 'release/os/5.1' into feature/51-perf-integration
driessamyn Oct 18, 2023
adac57c
Merge branch 'feature/51-perf-integration' into mbrkic-r3/CORE-17843/…
mbrkic-r3 Oct 18, 2023
91826e0
CORE-17843 Removed duplicate variable in TestStateManagerFactoryImpl …
mbrkic-r3 Oct 18, 2023
eb0945a
CORE-17768 Disabled smoke test "cluster configuration changes are pic…
mbrkic-r3 Oct 18, 2023
05741a6
CORE-17843 Disabled smoke test "cluster configuration changes are pic…
mbrkic-r3 Oct 18, 2023
48e04eb
Revert "CORE-17768 Disabled smoke test "cluster configuration changes…
mbrkic-r3 Oct 18, 2023
d4d30a8
CORE-17843 Minor fixed based on review feedback.
mbrkic-r3 Oct 18, 2023
6c5c5f2
Merge remote-tracking branch 'origin/feature/51-perf-integration' int…
mbrkic-r3 Oct 18, 2023
8ee72f8
Merge remote-tracking branch 'origin/mbrkic-r3/CORE-17843/FlowMapperS…
mbrkic-r3 Oct 18, 2023
127cfdd
CORE-17768 Fixed compile errors after merge
mbrkic-r3 Oct 18, 2023
9cefd49
CORE-17768 Fixed bad merge
mbrkic-r3 Oct 18, 2023
5bee79f
CORE-17768 Set API version to 5.1.0.37-alpha-1697718230127
mbrkic-r3 Oct 19, 2023
2542989
CORE-17768 Set API version to 5.1.0.37-alpha-1697725680919
mbrkic-r3 Oct 19, 2023
d78533b
CORE-17768 Added logging for debugging purposes
mbrkic-r3 Oct 19, 2023
425b66a
Revert "CORE-17768 Added logging for debugging purposes"
mbrkic-r3 Oct 19, 2023
81d9d54
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Oct 20, 2023
24d2464
CORE-17768 Routing FlowEvents that FlowEngine sends out to itself to …
mbrkic-r3 Oct 20, 2023
eb3dcd7
CORE-17768 Fixed failing FlowEventMediatorFactoryImplTest
mbrkic-r3 Oct 20, 2023
787c31b
CORE-17768 Using one consumer for multiple topics
mbrkic-r3 Oct 23, 2023
f0a10d2
CORE-17768 Fixed unit tests
mbrkic-r3 Oct 23, 2023
3567800
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Oct 26, 2023
17cc8d6
CORE-17768 Using beta version of API
mbrkic-r3 Oct 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import net.corda.libs.packaging.core.CpkMetadata
import net.corda.messaging.api.records.Record
import net.corda.osgi.api.Application
import net.corda.osgi.api.Shutdown
import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_START
import net.corda.schema.configuration.ConfigKeys.FLOW_CONFIG
import net.corda.schema.configuration.FlowConfig.PROCESSING_FLOW_CLEANUP_TIME
import net.corda.schema.configuration.FlowConfig.PROCESSING_MAX_FLOW_SLEEP_DURATION
Expand Down Expand Up @@ -172,11 +172,11 @@ class CordaVNode @Activate constructor(

val rpcStartFlow = createRPCStartFlow(clientId, vnodeInfo.toAvro())
val flowId = generateRandomId()
val record = Record(FLOW_EVENT_TOPIC, flowId, FlowEvent(flowId, rpcStartFlow))
val record = Record(FLOW_START, flowId, FlowEvent(flowId, rpcStartFlow))
flowEventProcessorFactory.create(mapOf(FLOW_CONFIG to smartConfig)).apply {
val result = onNext(null, record)
result.responseEvents.singleOrNull { evt ->
evt.topic == FLOW_EVENT_TOPIC
evt.topic == FLOW_START
}?.also { evt ->
@Suppress("unchecked_cast")
onNext(result.updatedState, evt as Record<String, FlowEvent>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ import kotlin.test.assertTrue
)
).thenReturn(
Record(
Schemas.Flow.FLOW_EVENT_TOPIC,
Schemas.Flow.FLOW_SESSION,
flowExternalEventContexts.get(it).flowId,
FlowEvent()
)
Expand All @@ -269,7 +269,7 @@ import kotlin.test.assertTrue
)
).thenReturn(
Record(
Schemas.Flow.FLOW_EVENT_TOPIC,
Schemas.Flow.FLOW_SESSION,
flowExternalEventContexts.get(it).flowId,
FlowEvent()
)
Expand All @@ -281,7 +281,7 @@ import kotlin.test.assertTrue
)
).thenReturn(
Record(
Schemas.Flow.FLOW_EVENT_TOPIC,
Schemas.Flow.FLOW_SESSION,
flowExternalEventContexts.get(it).flowId,
FlowEvent()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import net.corda.flow.mapper.impl.executor.SessionEventExecutor
import net.corda.flow.mapper.impl.executor.SessionInitProcessor
import net.corda.flow.mapper.impl.executor.StartFlowExecutor
import net.corda.libs.configuration.SmartConfig
import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_START
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
import org.osgi.service.component.annotations.Reference
Expand Down Expand Up @@ -66,7 +66,7 @@ class FlowMapperEventExecutorFactoryImpl @Activate constructor(
}
}

is StartFlow -> StartFlowExecutor(eventKey, FLOW_EVENT_TOPIC, flowMapperEventPayload, state)
is StartFlow -> StartFlowExecutor(eventKey, FLOW_START, flowMapperEventPayload, state)
is ExecuteCleanup -> ExecuteCleanupEventExecutor(eventKey)
is ScheduleCleanup -> ScheduleCleanupEventExecutor(eventKey, flowMapperEventPayload, state)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ class RecordFactoryImpl @Activate constructor(

private fun getSessionEventOutputTopic(sessionEvent: SessionEvent): String {
return when (sessionEvent.messageDirection) {
MessageDirection.INBOUND -> Schemas.Flow.FLOW_EVENT_TOPIC
MessageDirection.INBOUND -> Schemas.Flow.FLOW_SESSION
MessageDirection.OUTBOUND -> {
if (isLocalCluster(sessionEvent)) {
Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC
Schemas.Flow.FLOW_MAPPER_SESSION_IN
} else {
Schemas.P2P.P2P_OUT_TOPIC
}
Expand All @@ -120,8 +120,8 @@ class RecordFactoryImpl @Activate constructor(
) : Record<*, *> {
val outputTopic = getSessionEventOutputTopic(sourceEvent)
val (newDirection, sessionId) = when (outputTopic) {
Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC -> Pair(MessageDirection.INBOUND, toggleSessionId(sourceEvent.sessionId))
Schemas.Flow.FLOW_EVENT_TOPIC -> Pair(MessageDirection.INBOUND, sourceEvent.sessionId)
Schemas.Flow.FLOW_MAPPER_SESSION_IN -> Pair(MessageDirection.INBOUND, toggleSessionId(sourceEvent.sessionId))
Schemas.Flow.FLOW_SESSION -> Pair(MessageDirection.INBOUND, sourceEvent.sessionId)
else -> Pair(MessageDirection.OUTBOUND, sourceEvent.sessionId)
}
val sequenceNumber = if (newPayload is SessionError) null else sourceEvent.sequenceNum
Expand All @@ -136,14 +136,14 @@ class RecordFactoryImpl @Activate constructor(
sourceEvent.contextSessionProperties
)
return when (outputTopic) {
Schemas.Flow.FLOW_EVENT_TOPIC -> {
Schemas.Flow.FLOW_SESSION -> {
if (flowId == null) {
throw IllegalArgumentException("Flow ID is required to forward an event back to the flow event" +
"topic, but it was not provided.")
}
Record(outputTopic, flowId, FlowEvent(flowId, sessionEvent))
}
Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC -> {
Schemas.Flow.FLOW_MAPPER_SESSION_IN -> {
Record(outputTopic, sessionEvent.sessionId, FlowMapperEvent(sessionEvent))
}
Schemas.P2P.P2P_OUT_TOPIC -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import org.mockito.kotlin.anyOrNull
import org.mockito.kotlin.mock
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import java.lang.IllegalArgumentException
import java.nio.ByteBuffer
import java.time.Instant

Expand Down Expand Up @@ -98,7 +97,7 @@ internal class RecordFactoryImplTest {
"my-flow-id"
)
assertThat(record).isNotNull
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_SESSION_IN)
assertThat(record.value!!::class).isEqualTo(FlowMapperEvent::class)
verify(locallyHostedIdentitiesServiceSameCluster).isHostedLocally(bobId.toCorda())
val sessionOutput = (record.value as FlowMapperEvent).payload as SessionEvent
Expand Down Expand Up @@ -165,7 +164,7 @@ internal class RecordFactoryImplTest {
flowConfig,
FLOW_ID
)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_EVENT_TOPIC)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_SESSION)
assertThat(record.key).isEqualTo(FLOW_ID)
assertThat(record.value!!::class.java).isEqualTo(FlowEvent::class.java)
val sessionOutput = (record.value as FlowEvent).payload as SessionEvent
Expand Down Expand Up @@ -194,7 +193,7 @@ internal class RecordFactoryImplTest {
FLOW_ID
)
assertThat(record).isNotNull
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_SESSION_IN)
assertThat(record.value!!::class).isEqualTo(FlowMapperEvent::class)
val sessionOutput = (record.value as FlowMapperEvent).payload as SessionEvent
assertThat(sessionOutput.sessionId).isEqualTo("$SESSION_ID-INITIATED")
Expand Down Expand Up @@ -249,7 +248,7 @@ internal class RecordFactoryImplTest {
flowConfig,
FLOW_ID
)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_EVENT_TOPIC)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_SESSION)
assertThat(record.key).isEqualTo(FLOW_ID)
assertThat(record.value!!::class.java).isEqualTo(FlowEvent::class.java)
val sessionOutput = (record.value as FlowEvent).payload as SessionEvent
Expand Down Expand Up @@ -316,7 +315,7 @@ internal class RecordFactoryImplTest {
timestamp,
flowConfig,
)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_SESSION_IN)
assertThat(record.key).isEqualTo("$SESSION_ID-INITIATED")
assertThat(record.value!!::class).isEqualTo(FlowMapperEvent::class)
val sessionOutput = (record.value as FlowMapperEvent).payload as SessionEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class SessionInitProcessorTest {
flowId: String
): Record<*, *> {
return if (sourceEvent.messageDirection == MessageDirection.INBOUND) {
Record(Schemas.Flow.FLOW_EVENT_TOPIC, flowId, FlowEvent(flowId, sourceEvent))
Record(Schemas.Flow.FLOW_SESSION, flowId, FlowEvent(flowId, sourceEvent))
} else {
Record(Schemas.P2P.P2P_OUT_TOPIC, "sessionId", "")
}
Expand Down Expand Up @@ -79,7 +79,7 @@ class SessionInitProcessorTest {

Assertions.assertThat(outboundEvents.size).isEqualTo(1)
val outboundEvent = outboundEvents.first()
Assertions.assertThat(outboundEvent.topic).isEqualTo(Schemas.Flow.FLOW_EVENT_TOPIC)
Assertions.assertThat(outboundEvent.topic).isEqualTo(Schemas.Flow.FLOW_SESSION)
Assertions.assertThat(outboundEvent.key::class).isEqualTo(String::class)
Assertions.assertThat(outboundEvent.value!!::class).isEqualTo(FlowEvent::class)
Assertions.assertThat(payload.sessionId).isEqualTo("sessionId-INITIATED")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ import net.corda.messaging.api.subscription.config.SubscriptionConfig
import net.corda.messaging.api.subscription.factory.SubscriptionFactory
import net.corda.schema.Schemas.Config.CONFIG_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_CLEANUP_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_IN
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_START
import net.corda.schema.Schemas.P2P.P2P_OUT_TOPIC
import net.corda.schema.Schemas.ScheduledTask
import net.corda.schema.configuration.BootConfig.BOOT_MAX_ALLOWED_MSG_SIZE
Expand Down Expand Up @@ -156,7 +158,7 @@ class FlowMapperServiceIntegrationTest {

//send 2 session init
val sessionDataAndInitEvent = Record<Any, Any>(
FLOW_MAPPER_EVENT_TOPIC, testSessionId, FlowMapperEvent(
FLOW_MAPPER_SESSION_OUT, testSessionId, FlowMapperEvent(
buildSessionEvent(
MessageDirection.OUTBOUND, testSessionId, 1, SessionData(ByteBuffer.wrap("bytes".toByteArray()), SessionInit(
testCpiId, testFlowId, emptyKeyValuePairList(), emptyKeyValuePairList()
Expand All @@ -181,7 +183,7 @@ class FlowMapperServiceIntegrationTest {

//send data back
val sessionDataEvent = Record<Any, Any>(
FLOW_MAPPER_EVENT_TOPIC, testSessionId, FlowMapperEvent(
FLOW_MAPPER_SESSION_IN, testSessionId, FlowMapperEvent(
buildSessionEvent(
MessageDirection.INBOUND,
testSessionId,
Expand Down Expand Up @@ -228,7 +230,7 @@ class FlowMapperServiceIntegrationTest {
)

val startRPCEvent = Record<Any, Any>(
FLOW_MAPPER_EVENT_TOPIC, testId, FlowMapperEvent(
FLOW_MAPPER_START, testId, FlowMapperEvent(
StartFlow(
context,
""
Expand Down Expand Up @@ -284,7 +286,7 @@ class FlowMapperServiceIntegrationTest {

//send data, no state
val sessionDataEvent = Record<Any, Any>(
FLOW_MAPPER_EVENT_TOPIC, testId, FlowMapperEvent(
FLOW_MAPPER_SESSION_OUT, testId, FlowMapperEvent(
buildSessionEvent(
MessageDirection.OUTBOUND,
testId,
Expand Down Expand Up @@ -317,7 +319,7 @@ class FlowMapperServiceIntegrationTest {

//send 2 session init, 1 is duplicate
val sessionInitEvent = Record<Any, Any>(
FLOW_MAPPER_EVENT_TOPIC, testSessionId, FlowMapperEvent(
FLOW_MAPPER_SESSION_OUT, testSessionId, FlowMapperEvent(
buildSessionEvent(
MessageDirection.OUTBOUND, testSessionId, 1, SessionCounterpartyInfoRequest(SessionInit(
testCpiId, testFlowId, emptyKeyValuePairList(), emptyKeyValuePairList()
Expand Down Expand Up @@ -345,7 +347,7 @@ class FlowMapperServiceIntegrationTest {

//send data back
val sessionDataEvent = Record<Any, Any>(
FLOW_MAPPER_EVENT_TOPIC, testSessionId, FlowMapperEvent(
FLOW_MAPPER_SESSION_IN, testSessionId, FlowMapperEvent(
buildSessionEvent(
MessageDirection.INBOUND,
testSessionId,
Expand Down Expand Up @@ -380,7 +382,7 @@ class FlowMapperServiceIntegrationTest {

//send data, no state
val sessionDataEvent = Record<Any, Any>(
FLOW_MAPPER_EVENT_TOPIC, testId, FlowMapperEvent(
FLOW_MAPPER_SESSION_IN, testId, FlowMapperEvent(
buildSessionEvent(
MessageDirection.INBOUND,
testId,
Expand All @@ -396,7 +398,7 @@ class FlowMapperServiceIntegrationTest {
val mapperLatch = CountDownLatch(2) // The initial message and the error back.
val records = mutableListOf<SessionEvent>()
val mapperSub = subscriptionFactory.createPubSubSubscription(
SubscriptionConfig("$testId-mapper", FLOW_MAPPER_EVENT_TOPIC),
SubscriptionConfig("$testId-mapper", FLOW_MAPPER_SESSION_IN),
TestFlowMapperProcessor(mapperLatch, records),
messagingConfig
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ import net.corda.messaging.api.mediator.factory.MessageRouterFactory
import net.corda.messaging.api.mediator.factory.MessagingClientFactoryFactory
import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory
import net.corda.messaging.api.processor.StateAndEventProcessor
import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT
import net.corda.schema.Schemas.Flow.FLOW_SESSION
import net.corda.schema.Schemas.Flow.FLOW_START
import net.corda.schema.configuration.MessagingConfig
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
Expand Down Expand Up @@ -61,7 +62,12 @@ class TestFlowEventMediatorFactoryImpl @Activate constructor(
.messagingConfig(messagingConfig)
.consumerFactories(
mediatorConsumerFactoryFactory.createMessageBusConsumerFactory(
FLOW_EVENT_TOPIC, CONSUMER_GROUP, messagingConfig
listOf(
FLOW_START,
FLOW_SESSION,
),
CONSUMER_GROUP,
messagingConfig
),
)
.clientFactories(
Expand All @@ -81,7 +87,7 @@ class TestFlowEventMediatorFactoryImpl @Activate constructor(

MessageRouter { message ->
when (val event = message.payload) {
is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_EVENT_TOPIC)
is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_SESSION_OUT)
else -> {
val eventType = event?.let { it::class.java }
throw IllegalStateException("No route defined for event type [$eventType]")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package net.corda.session.mapper.messaging.mediator

import net.corda.data.flow.event.FlowEvent
import net.corda.data.flow.event.StartFlow
import net.corda.data.flow.event.mapper.FlowMapperEvent
import net.corda.data.flow.state.mapper.FlowMapperState
import net.corda.data.p2p.app.AppMessage
Expand All @@ -15,8 +16,11 @@ import net.corda.messaging.api.mediator.factory.MessageRouterFactory
import net.corda.messaging.api.mediator.factory.MessagingClientFactoryFactory
import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory
import net.corda.messaging.api.processor.StateAndEventProcessor
import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_IN
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_START
import net.corda.schema.Schemas.Flow.FLOW_SESSION
import net.corda.schema.Schemas.Flow.FLOW_START
import net.corda.schema.Schemas.P2P.P2P_OUT_TOPIC
import net.corda.schema.configuration.FlowConfig
import net.corda.session.mapper.service.executor.FlowMapperMessageProcessor
Expand Down Expand Up @@ -63,7 +67,13 @@ class FlowMapperEventMediatorFactoryImpl @Activate constructor(
.messagingConfig(messagingConfig)
.consumerFactories(
mediatorConsumerFactoryFactory.createMessageBusConsumerFactory(
FLOW_MAPPER_EVENT_TOPIC, CONSUMER_GROUP, messagingConfig
listOf(
FLOW_MAPPER_START,
FLOW_MAPPER_SESSION_IN,
FLOW_MAPPER_SESSION_OUT,
),
CONSUMER_GROUP,
messagingConfig
),
)
.clientFactories(
Expand All @@ -84,8 +94,14 @@ class FlowMapperEventMediatorFactoryImpl @Activate constructor(
MessageRouter { message ->
when (val event = message.payload) {
is AppMessage -> routeTo(messageBusClient, P2P_OUT_TOPIC)
is FlowEvent -> routeTo(messageBusClient, FLOW_EVENT_TOPIC)
is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_EVENT_TOPIC)
is FlowEvent -> {
if (event.payload is StartFlow) {
routeTo(messageBusClient, FLOW_START)
} else {
routeTo(messageBusClient, FLOW_SESSION)
}
}
is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_SESSION_IN)
else -> {
val eventType = event?.let { it::class.java }
throw IllegalStateException("No route defined for event type [$eventType]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import net.corda.data.flow.state.mapper.FlowMapperState
import net.corda.data.flow.state.mapper.FlowMapperStateType
import net.corda.messaging.api.records.Record
import net.corda.messaging.api.subscription.listener.StateAndEventListener
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_IN
import net.corda.utilities.debug
import net.corda.utilities.trace
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -38,7 +38,7 @@ class FlowMapperListener(
publisher?.publish(
listOf(
Record(
FLOW_MAPPER_EVENT_TOPIC, key, FlowMapperEvent(
FLOW_MAPPER_SESSION_IN, key, FlowMapperEvent(
ExecuteCleanup(listOf())
)
)
Expand Down Expand Up @@ -81,7 +81,7 @@ class FlowMapperListener(
executorService.schedule(
{
log.debug { "Clearing up mapper state for key $eventKey" }
publisher?.publish(listOf(Record(FLOW_MAPPER_EVENT_TOPIC, eventKey, FlowMapperEvent(ExecuteCleanup(listOf())))))
publisher?.publish(listOf(Record(FLOW_MAPPER_SESSION_IN, eventKey, FlowMapperEvent(ExecuteCleanup(listOf())))))
},
expiryTime - clock.millis(),
TimeUnit.MILLISECONDS
Expand Down
Loading