Skip to content

Commit

Permalink
Merge pull request #4985 from corda/vkolomeyko/e2e-timeout-investigation
Browse files Browse the repository at this point in the history
CORE-17768: Revert - Topology changes - Use new topics (#4931)
  • Loading branch information
Omar-awad authored Oct 27, 2023
2 parents 1de54e6 + 9738094 commit 9b4bf91
Show file tree
Hide file tree
Showing 32 changed files with 96 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import net.corda.libs.packaging.core.CpkMetadata
import net.corda.messaging.api.records.Record
import net.corda.osgi.api.Application
import net.corda.osgi.api.Shutdown
import net.corda.schema.Schemas.Flow.FLOW_START
import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC
import net.corda.schema.configuration.ConfigKeys.FLOW_CONFIG
import net.corda.schema.configuration.FlowConfig.PROCESSING_FLOW_CLEANUP_TIME
import net.corda.schema.configuration.FlowConfig.PROCESSING_MAX_FLOW_SLEEP_DURATION
Expand Down Expand Up @@ -172,11 +172,11 @@ class CordaVNode @Activate constructor(

val rpcStartFlow = createRPCStartFlow(clientId, vnodeInfo.toAvro())
val flowId = generateRandomId()
val record = Record(FLOW_START, flowId, FlowEvent(flowId, rpcStartFlow))
val record = Record(FLOW_EVENT_TOPIC, flowId, FlowEvent(flowId, rpcStartFlow))
flowEventProcessorFactory.create(mapOf(FLOW_CONFIG to smartConfig)).apply {
val result = onNext(null, record)
result.responseEvents.singleOrNull { evt ->
evt.topic == FLOW_START
evt.topic == FLOW_EVENT_TOPIC
}?.also { evt ->
@Suppress("unchecked_cast")
onNext(result.updatedState, evt as Record<String, FlowEvent>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ import kotlin.test.assertTrue
)
).thenReturn(
Record(
Schemas.Flow.FLOW_SESSION,
Schemas.Flow.FLOW_EVENT_TOPIC,
flowExternalEventContexts.get(it).flowId,
FlowEvent()
)
Expand All @@ -269,7 +269,7 @@ import kotlin.test.assertTrue
)
).thenReturn(
Record(
Schemas.Flow.FLOW_SESSION,
Schemas.Flow.FLOW_EVENT_TOPIC,
flowExternalEventContexts.get(it).flowId,
FlowEvent()
)
Expand All @@ -281,7 +281,7 @@ import kotlin.test.assertTrue
)
).thenReturn(
Record(
Schemas.Flow.FLOW_SESSION,
Schemas.Flow.FLOW_EVENT_TOPIC,
flowExternalEventContexts.get(it).flowId,
FlowEvent()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import net.corda.flow.mapper.impl.executor.SessionEventExecutor
import net.corda.flow.mapper.impl.executor.SessionInitProcessor
import net.corda.flow.mapper.impl.executor.StartFlowExecutor
import net.corda.libs.configuration.SmartConfig
import net.corda.schema.Schemas.Flow.FLOW_START
import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
import org.osgi.service.component.annotations.Reference
Expand Down Expand Up @@ -66,7 +66,7 @@ class FlowMapperEventExecutorFactoryImpl @Activate constructor(
}
}

is StartFlow -> StartFlowExecutor(eventKey, FLOW_START, flowMapperEventPayload, state)
is StartFlow -> StartFlowExecutor(eventKey, FLOW_EVENT_TOPIC, flowMapperEventPayload, state)
is ExecuteCleanup -> ExecuteCleanupEventExecutor(eventKey)
is ScheduleCleanup -> ScheduleCleanupEventExecutor(eventKey, flowMapperEventPayload, state)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ class RecordFactoryImpl @Activate constructor(

private fun getSessionEventOutputTopic(sessionEvent: SessionEvent): String {
return when (sessionEvent.messageDirection) {
MessageDirection.INBOUND -> Schemas.Flow.FLOW_SESSION
MessageDirection.INBOUND -> Schemas.Flow.FLOW_EVENT_TOPIC
MessageDirection.OUTBOUND -> {
if (isLocalCluster(sessionEvent)) {
Schemas.Flow.FLOW_MAPPER_SESSION_IN
Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC
} else {
Schemas.P2P.P2P_OUT_TOPIC
}
Expand All @@ -120,8 +120,8 @@ class RecordFactoryImpl @Activate constructor(
) : Record<*, *> {
val outputTopic = getSessionEventOutputTopic(sourceEvent)
val (newDirection, sessionId) = when (outputTopic) {
Schemas.Flow.FLOW_MAPPER_SESSION_IN -> Pair(MessageDirection.INBOUND, toggleSessionId(sourceEvent.sessionId))
Schemas.Flow.FLOW_SESSION -> Pair(MessageDirection.INBOUND, sourceEvent.sessionId)
Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC -> Pair(MessageDirection.INBOUND, toggleSessionId(sourceEvent.sessionId))
Schemas.Flow.FLOW_EVENT_TOPIC -> Pair(MessageDirection.INBOUND, sourceEvent.sessionId)
else -> Pair(MessageDirection.OUTBOUND, sourceEvent.sessionId)
}
val sequenceNumber = if (newPayload is SessionError) null else sourceEvent.sequenceNum
Expand All @@ -136,14 +136,14 @@ class RecordFactoryImpl @Activate constructor(
sourceEvent.contextSessionProperties
)
return when (outputTopic) {
Schemas.Flow.FLOW_SESSION -> {
Schemas.Flow.FLOW_EVENT_TOPIC -> {
if (flowId == null) {
throw IllegalArgumentException("Flow ID is required to forward an event back to the flow event" +
"topic, but it was not provided.")
}
Record(outputTopic, flowId, FlowEvent(flowId, sessionEvent))
}
Schemas.Flow.FLOW_MAPPER_SESSION_IN -> {
Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC -> {
Record(outputTopic, sessionEvent.sessionId, FlowMapperEvent(sessionEvent))
}
Schemas.P2P.P2P_OUT_TOPIC -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.mockito.kotlin.anyOrNull
import org.mockito.kotlin.mock
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import java.lang.IllegalArgumentException
import java.nio.ByteBuffer
import java.time.Instant

Expand Down Expand Up @@ -97,7 +98,7 @@ internal class RecordFactoryImplTest {
"my-flow-id"
)
assertThat(record).isNotNull
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_SESSION_IN)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC)
assertThat(record.value!!::class).isEqualTo(FlowMapperEvent::class)
verify(locallyHostedIdentitiesServiceSameCluster).isHostedLocally(bobId.toCorda())
val sessionOutput = (record.value as FlowMapperEvent).payload as SessionEvent
Expand Down Expand Up @@ -164,7 +165,7 @@ internal class RecordFactoryImplTest {
flowConfig,
FLOW_ID
)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_SESSION)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_EVENT_TOPIC)
assertThat(record.key).isEqualTo(FLOW_ID)
assertThat(record.value!!::class.java).isEqualTo(FlowEvent::class.java)
val sessionOutput = (record.value as FlowEvent).payload as SessionEvent
Expand Down Expand Up @@ -193,7 +194,7 @@ internal class RecordFactoryImplTest {
FLOW_ID
)
assertThat(record).isNotNull
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_SESSION_IN)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC)
assertThat(record.value!!::class).isEqualTo(FlowMapperEvent::class)
val sessionOutput = (record.value as FlowMapperEvent).payload as SessionEvent
assertThat(sessionOutput.sessionId).isEqualTo("$SESSION_ID-INITIATED")
Expand Down Expand Up @@ -248,7 +249,7 @@ internal class RecordFactoryImplTest {
flowConfig,
FLOW_ID
)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_SESSION)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_EVENT_TOPIC)
assertThat(record.key).isEqualTo(FLOW_ID)
assertThat(record.value!!::class.java).isEqualTo(FlowEvent::class.java)
val sessionOutput = (record.value as FlowEvent).payload as SessionEvent
Expand Down Expand Up @@ -315,7 +316,7 @@ internal class RecordFactoryImplTest {
timestamp,
flowConfig,
)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_SESSION_IN)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC)
assertThat(record.key).isEqualTo("$SESSION_ID-INITIATED")
assertThat(record.value!!::class).isEqualTo(FlowMapperEvent::class)
val sessionOutput = (record.value as FlowMapperEvent).payload as SessionEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class SessionInitProcessorTest {
flowId: String
): Record<*, *> {
return if (sourceEvent.messageDirection == MessageDirection.INBOUND) {
Record(Schemas.Flow.FLOW_SESSION, flowId, FlowEvent(flowId, sourceEvent))
Record(Schemas.Flow.FLOW_EVENT_TOPIC, flowId, FlowEvent(flowId, sourceEvent))
} else {
Record(Schemas.P2P.P2P_OUT_TOPIC, "sessionId", "")
}
Expand Down Expand Up @@ -79,7 +79,7 @@ class SessionInitProcessorTest {

Assertions.assertThat(outboundEvents.size).isEqualTo(1)
val outboundEvent = outboundEvents.first()
Assertions.assertThat(outboundEvent.topic).isEqualTo(Schemas.Flow.FLOW_SESSION)
Assertions.assertThat(outboundEvent.topic).isEqualTo(Schemas.Flow.FLOW_EVENT_TOPIC)
Assertions.assertThat(outboundEvent.key::class).isEqualTo(String::class)
Assertions.assertThat(outboundEvent.value!!::class).isEqualTo(FlowEvent::class)
Assertions.assertThat(payload.sessionId).isEqualTo("sessionId-INITIATED")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ import net.corda.messaging.api.subscription.config.SubscriptionConfig
import net.corda.messaging.api.subscription.factory.SubscriptionFactory
import net.corda.schema.Schemas.Config.CONFIG_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_CLEANUP_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_IN
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_START
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC
import net.corda.schema.Schemas.P2P.P2P_OUT_TOPIC
import net.corda.schema.Schemas.ScheduledTask
import net.corda.schema.configuration.BootConfig.BOOT_MAX_ALLOWED_MSG_SIZE
Expand Down Expand Up @@ -147,7 +145,7 @@ class FlowMapperServiceIntegrationTest {
}
}

//@Test
@Test
fun `Test first session event outbound sets up flow mapper state, verify subsequent messages received are passed to flow event topic`
() {
val testId = "test1"
Expand All @@ -158,7 +156,7 @@ class FlowMapperServiceIntegrationTest {

//send 2 session init
val sessionDataAndInitEvent = Record<Any, Any>(
FLOW_MAPPER_SESSION_OUT, testSessionId, FlowMapperEvent(
FLOW_MAPPER_EVENT_TOPIC, testSessionId, FlowMapperEvent(
buildSessionEvent(
MessageDirection.OUTBOUND, testSessionId, 1, SessionData(ByteBuffer.wrap("bytes".toByteArray()), SessionInit(
testCpiId, testFlowId, emptyKeyValuePairList(), emptyKeyValuePairList()
Expand All @@ -183,7 +181,7 @@ class FlowMapperServiceIntegrationTest {

//send data back
val sessionDataEvent = Record<Any, Any>(
FLOW_MAPPER_SESSION_IN, testSessionId, FlowMapperEvent(
FLOW_MAPPER_EVENT_TOPIC, testSessionId, FlowMapperEvent(
buildSessionEvent(
MessageDirection.INBOUND,
testSessionId,
Expand All @@ -209,7 +207,7 @@ class FlowMapperServiceIntegrationTest {
flowEventMediator.close()
}

//@Test
@Test
fun testStartRPCDuplicatesAndCleanup() {
val testId = "test2"
val publisher = publisherFactory.createPublisher(PublisherConfig(testId), messagingConfig)
Expand All @@ -230,7 +228,7 @@ class FlowMapperServiceIntegrationTest {
)

val startRPCEvent = Record<Any, Any>(
FLOW_MAPPER_START, testId, FlowMapperEvent(
FLOW_MAPPER_EVENT_TOPIC, testId, FlowMapperEvent(
StartFlow(
context,
""
Expand Down Expand Up @@ -279,14 +277,14 @@ class FlowMapperServiceIntegrationTest {
flowEventMediator.close()
}

//@Test
@Test
fun testNoStateForMapper() {
val testId = "test3"
val publisher = publisherFactory.createPublisher(PublisherConfig(testId), messagingConfig)

//send data, no state
val sessionDataEvent = Record<Any, Any>(
FLOW_MAPPER_SESSION_OUT, testId, FlowMapperEvent(
FLOW_MAPPER_EVENT_TOPIC, testId, FlowMapperEvent(
buildSessionEvent(
MessageDirection.OUTBOUND,
testId,
Expand Down Expand Up @@ -319,7 +317,7 @@ class FlowMapperServiceIntegrationTest {

//send 2 session init, 1 is duplicate
val sessionInitEvent = Record<Any, Any>(
FLOW_MAPPER_SESSION_OUT, testSessionId, FlowMapperEvent(
FLOW_MAPPER_EVENT_TOPIC, testSessionId, FlowMapperEvent(
buildSessionEvent(
MessageDirection.OUTBOUND, testSessionId, 1, SessionCounterpartyInfoRequest(SessionInit(
testCpiId, testFlowId, emptyKeyValuePairList(), emptyKeyValuePairList()
Expand Down Expand Up @@ -347,7 +345,7 @@ class FlowMapperServiceIntegrationTest {

//send data back
val sessionDataEvent = Record<Any, Any>(
FLOW_MAPPER_SESSION_IN, testSessionId, FlowMapperEvent(
FLOW_MAPPER_EVENT_TOPIC, testSessionId, FlowMapperEvent(
buildSessionEvent(
MessageDirection.INBOUND,
testSessionId,
Expand Down Expand Up @@ -375,14 +373,14 @@ class FlowMapperServiceIntegrationTest {
}


//@Test
@Test
fun `when the flow mapper receives an inbound session message for a non-existent session, an error is returned`() {
val testId = "test5"
val publisher = publisherFactory.createPublisher(PublisherConfig(testId), messagingConfig)

//send data, no state
val sessionDataEvent = Record<Any, Any>(
FLOW_MAPPER_SESSION_IN, testId, FlowMapperEvent(
FLOW_MAPPER_EVENT_TOPIC, testId, FlowMapperEvent(
buildSessionEvent(
MessageDirection.INBOUND,
testId,
Expand All @@ -398,7 +396,7 @@ class FlowMapperServiceIntegrationTest {
val mapperLatch = CountDownLatch(2) // The initial message and the error back.
val records = mutableListOf<SessionEvent>()
val mapperSub = subscriptionFactory.createPubSubSubscription(
SubscriptionConfig("$testId-mapper", FLOW_MAPPER_SESSION_IN),
SubscriptionConfig("$testId-mapper", FLOW_MAPPER_EVENT_TOPIC),
TestFlowMapperProcessor(mapperLatch, records),
messagingConfig
)
Expand All @@ -418,7 +416,7 @@ class FlowMapperServiceIntegrationTest {
assertThat(event.payload).isInstanceOf(SessionError::class.java)
}

//@Test
@Test
fun `mapper state cleanup correctly cleans up old states`() {

// Create a state in the state manager. Note the modified time has to be further in the past than the configured
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ import net.corda.messaging.api.mediator.factory.MessageRouterFactory
import net.corda.messaging.api.mediator.factory.MessagingClientFactoryFactory
import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory
import net.corda.messaging.api.processor.StateAndEventProcessor
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT
import net.corda.schema.Schemas.Flow.FLOW_SESSION
import net.corda.schema.Schemas.Flow.FLOW_START
import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC
import net.corda.schema.configuration.MessagingConfig
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
Expand Down Expand Up @@ -62,10 +61,7 @@ class TestFlowEventMediatorFactoryImpl @Activate constructor(
.messagingConfig(messagingConfig)
.consumerFactories(
mediatorConsumerFactoryFactory.createMessageBusConsumerFactory(
FLOW_START, CONSUMER_GROUP, messagingConfig
),
mediatorConsumerFactoryFactory.createMessageBusConsumerFactory(
FLOW_SESSION, CONSUMER_GROUP, messagingConfig
FLOW_EVENT_TOPIC, CONSUMER_GROUP, messagingConfig
),
)
.clientFactories(
Expand All @@ -85,7 +81,7 @@ class TestFlowEventMediatorFactoryImpl @Activate constructor(

MessageRouter { message ->
when (val event = message.payload) {
is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_SESSION_OUT)
is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_EVENT_TOPIC)
else -> {
val eventType = event?.let { it::class.java }
throw IllegalStateException("No route defined for event type [$eventType]")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package net.corda.session.mapper.messaging.mediator

import net.corda.data.flow.event.FlowEvent
import net.corda.data.flow.event.StartFlow
import net.corda.data.flow.event.mapper.FlowMapperEvent
import net.corda.data.flow.state.mapper.FlowMapperState
import net.corda.data.p2p.app.AppMessage
Expand All @@ -16,11 +15,8 @@ import net.corda.messaging.api.mediator.factory.MessageRouterFactory
import net.corda.messaging.api.mediator.factory.MessagingClientFactoryFactory
import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory
import net.corda.messaging.api.processor.StateAndEventProcessor
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_IN
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_START
import net.corda.schema.Schemas.Flow.FLOW_SESSION
import net.corda.schema.Schemas.Flow.FLOW_START
import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC
import net.corda.schema.Schemas.P2P.P2P_OUT_TOPIC
import net.corda.schema.configuration.FlowConfig
import net.corda.session.mapper.service.executor.FlowMapperMessageProcessor
Expand Down Expand Up @@ -67,13 +63,7 @@ class FlowMapperEventMediatorFactoryImpl @Activate constructor(
.messagingConfig(messagingConfig)
.consumerFactories(
mediatorConsumerFactoryFactory.createMessageBusConsumerFactory(
FLOW_MAPPER_START, CONSUMER_GROUP, messagingConfig
),
mediatorConsumerFactoryFactory.createMessageBusConsumerFactory(
FLOW_MAPPER_SESSION_IN, CONSUMER_GROUP, messagingConfig
),
mediatorConsumerFactoryFactory.createMessageBusConsumerFactory(
FLOW_MAPPER_SESSION_OUT, CONSUMER_GROUP, messagingConfig
FLOW_MAPPER_EVENT_TOPIC, CONSUMER_GROUP, messagingConfig
),
)
.clientFactories(
Expand All @@ -94,14 +84,8 @@ class FlowMapperEventMediatorFactoryImpl @Activate constructor(
MessageRouter { message ->
when (val event = message.payload) {
is AppMessage -> routeTo(messageBusClient, P2P_OUT_TOPIC)
is FlowEvent -> {
if (event.payload is StartFlow) {
routeTo(messageBusClient, FLOW_START)
} else {
routeTo(messageBusClient, FLOW_SESSION)
}
}
is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_SESSION_IN)
is FlowEvent -> routeTo(messageBusClient, FLOW_EVENT_TOPIC)
is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_EVENT_TOPIC)
else -> {
val eventType = event?.let { it::class.java }
throw IllegalStateException("No route defined for event type [$eventType]")
Expand Down
Loading

0 comments on commit 9b4bf91

Please sign in to comment.