From 8f4a8f5809645a8db724d5ccbe1c8df4103167d5 Mon Sep 17 00:00:00 2001 From: Viktor Kolomeyko Date: Thu, 26 Oct 2023 16:12:54 +0100 Subject: [PATCH] Revert "CORE-17768 Topology changes - Use new topics (#4931)" This reverts commit c706f7c4850d9eb5925fc5a9bbd392d108bf36a1. --- .../net/corda/example/vnode/CordaVNode.kt | 6 ++-- .../bus/CryptoFlowOpsBusProcessorTests.kt | 6 ++-- .../FlowMapperEventExecutorFactoryImpl.kt | 4 +-- .../flow/mapper/impl/RecordFactoryImpl.kt | 12 ++++---- .../flow/mapper/impl/RecordFactoryImplTest.kt | 11 +++---- .../impl/executor/SessionInitProcessorTest.kt | 4 +-- .../FlowMapperServiceIntegrationTest.kt | 30 +++++++++---------- .../TestFlowEventMediatorFactoryImpl.kt | 12 +++----- .../FlowMapperEventMediatorFactoryImpl.kt | 26 ++++------------ .../service/executor/FlowMapperListener.kt | 6 ++-- .../FlowFilterServiceIntegrationTest.kt | 4 +-- .../flow/p2p/filter/FlowP2PFilterProcessor.kt | 6 ++-- .../flow/rest/impl/v1/FlowRestResourceImpl.kt | 4 +-- .../testing/context/FlowServiceTestContext.kt | 4 +-- .../testing/context/OutputAssertionsImpl.kt | 4 +-- .../events/CryptoFlowOpsTransformerService.kt | 2 +- .../mediator/FlowEventMediatorFactoryImpl.kt | 12 ++------ .../factory/impl/FlowRecordFactoryImpl.kt | 8 ++--- .../FlowEventMediatorFactoryImplTest.kt | 4 +-- .../factory/FlowRecordFactoryImplTest.kt | 10 +++---- .../FlowEventExceptionProcessorImplTest.kt | 2 +- .../impl/FlowEventProcessorImplTest.kt | 6 ++-- .../ExternalEventResponseFactoryImpl.kt | 4 +-- .../ExternalEventResponseFactoryImplTest.kt | 12 ++++---- .../resources/kafka-messaging-defaults.conf | 8 ++--- .../resources/kafka-messaging-enforced.conf | 5 ---- .../config/MessageBusConfigResolverTest.kt | 3 +- .../mediator/MultiSourceEventMediatorImpl.kt | 6 +--- .../messaging/api/mediator/MessageRouter.kt | 2 +- .../mediator/factory/MessageRouterFactory.kt | 2 +- .../crypto/tests/CryptoProcessorTests.kt | 4 +-- .../TestExternalEventResponseMonitor.kt | 2 +- 32 files changed, 96 insertions(+), 135 deletions(-) diff --git a/applications/examples/sandbox-app/src/main/kotlin/net/corda/example/vnode/CordaVNode.kt b/applications/examples/sandbox-app/src/main/kotlin/net/corda/example/vnode/CordaVNode.kt index f68705aa43c..beb905752af 100644 --- a/applications/examples/sandbox-app/src/main/kotlin/net/corda/example/vnode/CordaVNode.kt +++ b/applications/examples/sandbox-app/src/main/kotlin/net/corda/example/vnode/CordaVNode.kt @@ -19,7 +19,7 @@ import net.corda.libs.packaging.core.CpkMetadata import net.corda.messaging.api.records.Record import net.corda.osgi.api.Application import net.corda.osgi.api.Shutdown -import net.corda.schema.Schemas.Flow.FLOW_START +import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC import net.corda.schema.configuration.ConfigKeys.FLOW_CONFIG import net.corda.schema.configuration.FlowConfig.PROCESSING_FLOW_CLEANUP_TIME import net.corda.schema.configuration.FlowConfig.PROCESSING_MAX_FLOW_SLEEP_DURATION @@ -172,11 +172,11 @@ class CordaVNode @Activate constructor( val rpcStartFlow = createRPCStartFlow(clientId, vnodeInfo.toAvro()) val flowId = generateRandomId() - val record = Record(FLOW_START, flowId, FlowEvent(flowId, rpcStartFlow)) + val record = Record(FLOW_EVENT_TOPIC, flowId, FlowEvent(flowId, rpcStartFlow)) flowEventProcessorFactory.create(mapOf(FLOW_CONFIG to smartConfig)).apply { val result = onNext(null, record) result.responseEvents.singleOrNull { evt -> - evt.topic == FLOW_START + evt.topic == FLOW_EVENT_TOPIC }?.also { evt -> @Suppress("unchecked_cast") onNext(result.updatedState, evt as Record) diff --git a/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/CryptoFlowOpsBusProcessorTests.kt b/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/CryptoFlowOpsBusProcessorTests.kt index 9cf345b8f1c..ddcbb332345 100644 --- a/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/CryptoFlowOpsBusProcessorTests.kt +++ b/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/CryptoFlowOpsBusProcessorTests.kt @@ -257,7 +257,7 @@ import kotlin.test.assertTrue ) ).thenReturn( Record( - Schemas.Flow.FLOW_SESSION, + Schemas.Flow.FLOW_EVENT_TOPIC, flowExternalEventContexts.get(it).flowId, FlowEvent() ) @@ -269,7 +269,7 @@ import kotlin.test.assertTrue ) ).thenReturn( Record( - Schemas.Flow.FLOW_SESSION, + Schemas.Flow.FLOW_EVENT_TOPIC, flowExternalEventContexts.get(it).flowId, FlowEvent() ) @@ -281,7 +281,7 @@ import kotlin.test.assertTrue ) ).thenReturn( Record( - Schemas.Flow.FLOW_SESSION, + Schemas.Flow.FLOW_EVENT_TOPIC, flowExternalEventContexts.get(it).flowId, FlowEvent() ) diff --git a/components/flow/flow-mapper-impl/src/main/kotlin/net/corda/flow/mapper/impl/FlowMapperEventExecutorFactoryImpl.kt b/components/flow/flow-mapper-impl/src/main/kotlin/net/corda/flow/mapper/impl/FlowMapperEventExecutorFactoryImpl.kt index efd465ba2c9..453df0c1281 100644 --- a/components/flow/flow-mapper-impl/src/main/kotlin/net/corda/flow/mapper/impl/FlowMapperEventExecutorFactoryImpl.kt +++ b/components/flow/flow-mapper-impl/src/main/kotlin/net/corda/flow/mapper/impl/FlowMapperEventExecutorFactoryImpl.kt @@ -17,7 +17,7 @@ import net.corda.flow.mapper.impl.executor.SessionEventExecutor import net.corda.flow.mapper.impl.executor.SessionInitProcessor import net.corda.flow.mapper.impl.executor.StartFlowExecutor import net.corda.libs.configuration.SmartConfig -import net.corda.schema.Schemas.Flow.FLOW_START +import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference @@ -66,7 +66,7 @@ class FlowMapperEventExecutorFactoryImpl @Activate constructor( } } - is StartFlow -> StartFlowExecutor(eventKey, FLOW_START, flowMapperEventPayload, state) + is StartFlow -> StartFlowExecutor(eventKey, FLOW_EVENT_TOPIC, flowMapperEventPayload, state) is ExecuteCleanup -> ExecuteCleanupEventExecutor(eventKey) is ScheduleCleanup -> ScheduleCleanupEventExecutor(eventKey, flowMapperEventPayload, state) diff --git a/components/flow/flow-mapper-impl/src/main/kotlin/net/corda/flow/mapper/impl/RecordFactoryImpl.kt b/components/flow/flow-mapper-impl/src/main/kotlin/net/corda/flow/mapper/impl/RecordFactoryImpl.kt index 488a7940e64..d64f71a9f49 100644 --- a/components/flow/flow-mapper-impl/src/main/kotlin/net/corda/flow/mapper/impl/RecordFactoryImpl.kt +++ b/components/flow/flow-mapper-impl/src/main/kotlin/net/corda/flow/mapper/impl/RecordFactoryImpl.kt @@ -97,10 +97,10 @@ class RecordFactoryImpl @Activate constructor( private fun getSessionEventOutputTopic(sessionEvent: SessionEvent): String { return when (sessionEvent.messageDirection) { - MessageDirection.INBOUND -> Schemas.Flow.FLOW_SESSION + MessageDirection.INBOUND -> Schemas.Flow.FLOW_EVENT_TOPIC MessageDirection.OUTBOUND -> { if (isLocalCluster(sessionEvent)) { - Schemas.Flow.FLOW_MAPPER_SESSION_IN + Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC } else { Schemas.P2P.P2P_OUT_TOPIC } @@ -120,8 +120,8 @@ class RecordFactoryImpl @Activate constructor( ) : Record<*, *> { val outputTopic = getSessionEventOutputTopic(sourceEvent) val (newDirection, sessionId) = when (outputTopic) { - Schemas.Flow.FLOW_MAPPER_SESSION_IN -> Pair(MessageDirection.INBOUND, toggleSessionId(sourceEvent.sessionId)) - Schemas.Flow.FLOW_SESSION -> Pair(MessageDirection.INBOUND, sourceEvent.sessionId) + Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC -> Pair(MessageDirection.INBOUND, toggleSessionId(sourceEvent.sessionId)) + Schemas.Flow.FLOW_EVENT_TOPIC -> Pair(MessageDirection.INBOUND, sourceEvent.sessionId) else -> Pair(MessageDirection.OUTBOUND, sourceEvent.sessionId) } val sequenceNumber = if (newPayload is SessionError) null else sourceEvent.sequenceNum @@ -136,14 +136,14 @@ class RecordFactoryImpl @Activate constructor( sourceEvent.contextSessionProperties ) return when (outputTopic) { - Schemas.Flow.FLOW_SESSION -> { + Schemas.Flow.FLOW_EVENT_TOPIC -> { if (flowId == null) { throw IllegalArgumentException("Flow ID is required to forward an event back to the flow event" + "topic, but it was not provided.") } Record(outputTopic, flowId, FlowEvent(flowId, sessionEvent)) } - Schemas.Flow.FLOW_MAPPER_SESSION_IN -> { + Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC -> { Record(outputTopic, sessionEvent.sessionId, FlowMapperEvent(sessionEvent)) } Schemas.P2P.P2P_OUT_TOPIC -> { diff --git a/components/flow/flow-mapper-impl/src/test/kotlin/net/corda/flow/mapper/impl/RecordFactoryImplTest.kt b/components/flow/flow-mapper-impl/src/test/kotlin/net/corda/flow/mapper/impl/RecordFactoryImplTest.kt index f33d096ba6d..4ce1a362f65 100644 --- a/components/flow/flow-mapper-impl/src/test/kotlin/net/corda/flow/mapper/impl/RecordFactoryImplTest.kt +++ b/components/flow/flow-mapper-impl/src/test/kotlin/net/corda/flow/mapper/impl/RecordFactoryImplTest.kt @@ -27,6 +27,7 @@ import org.mockito.kotlin.anyOrNull import org.mockito.kotlin.mock import org.mockito.kotlin.verify import org.mockito.kotlin.whenever +import java.lang.IllegalArgumentException import java.nio.ByteBuffer import java.time.Instant @@ -97,7 +98,7 @@ internal class RecordFactoryImplTest { "my-flow-id" ) assertThat(record).isNotNull - assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_SESSION_IN) + assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC) assertThat(record.value!!::class).isEqualTo(FlowMapperEvent::class) verify(locallyHostedIdentitiesServiceSameCluster).isHostedLocally(bobId.toCorda()) val sessionOutput = (record.value as FlowMapperEvent).payload as SessionEvent @@ -164,7 +165,7 @@ internal class RecordFactoryImplTest { flowConfig, FLOW_ID ) - assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_SESSION) + assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_EVENT_TOPIC) assertThat(record.key).isEqualTo(FLOW_ID) assertThat(record.value!!::class.java).isEqualTo(FlowEvent::class.java) val sessionOutput = (record.value as FlowEvent).payload as SessionEvent @@ -193,7 +194,7 @@ internal class RecordFactoryImplTest { FLOW_ID ) assertThat(record).isNotNull - assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_SESSION_IN) + assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC) assertThat(record.value!!::class).isEqualTo(FlowMapperEvent::class) val sessionOutput = (record.value as FlowMapperEvent).payload as SessionEvent assertThat(sessionOutput.sessionId).isEqualTo("$SESSION_ID-INITIATED") @@ -248,7 +249,7 @@ internal class RecordFactoryImplTest { flowConfig, FLOW_ID ) - assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_SESSION) + assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_EVENT_TOPIC) assertThat(record.key).isEqualTo(FLOW_ID) assertThat(record.value!!::class.java).isEqualTo(FlowEvent::class.java) val sessionOutput = (record.value as FlowEvent).payload as SessionEvent @@ -315,7 +316,7 @@ internal class RecordFactoryImplTest { timestamp, flowConfig, ) - assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_SESSION_IN) + assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC) assertThat(record.key).isEqualTo("$SESSION_ID-INITIATED") assertThat(record.value!!::class).isEqualTo(FlowMapperEvent::class) val sessionOutput = (record.value as FlowMapperEvent).payload as SessionEvent diff --git a/components/flow/flow-mapper-impl/src/test/kotlin/net/corda/flow/mapper/impl/executor/SessionInitProcessorTest.kt b/components/flow/flow-mapper-impl/src/test/kotlin/net/corda/flow/mapper/impl/executor/SessionInitProcessorTest.kt index 208f8756f48..ee944955522 100644 --- a/components/flow/flow-mapper-impl/src/test/kotlin/net/corda/flow/mapper/impl/executor/SessionInitProcessorTest.kt +++ b/components/flow/flow-mapper-impl/src/test/kotlin/net/corda/flow/mapper/impl/executor/SessionInitProcessorTest.kt @@ -29,7 +29,7 @@ class SessionInitProcessorTest { flowId: String ): Record<*, *> { return if (sourceEvent.messageDirection == MessageDirection.INBOUND) { - Record(Schemas.Flow.FLOW_SESSION, flowId, FlowEvent(flowId, sourceEvent)) + Record(Schemas.Flow.FLOW_EVENT_TOPIC, flowId, FlowEvent(flowId, sourceEvent)) } else { Record(Schemas.P2P.P2P_OUT_TOPIC, "sessionId", "") } @@ -79,7 +79,7 @@ class SessionInitProcessorTest { Assertions.assertThat(outboundEvents.size).isEqualTo(1) val outboundEvent = outboundEvents.first() - Assertions.assertThat(outboundEvent.topic).isEqualTo(Schemas.Flow.FLOW_SESSION) + Assertions.assertThat(outboundEvent.topic).isEqualTo(Schemas.Flow.FLOW_EVENT_TOPIC) Assertions.assertThat(outboundEvent.key::class).isEqualTo(String::class) Assertions.assertThat(outboundEvent.value!!::class).isEqualTo(FlowEvent::class) Assertions.assertThat(payload.sessionId).isEqualTo("sessionId-INITIATED") diff --git a/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/FlowMapperServiceIntegrationTest.kt b/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/FlowMapperServiceIntegrationTest.kt index edc392a5064..1cea67f0b38 100644 --- a/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/FlowMapperServiceIntegrationTest.kt +++ b/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/FlowMapperServiceIntegrationTest.kt @@ -37,9 +37,7 @@ import net.corda.messaging.api.subscription.config.SubscriptionConfig import net.corda.messaging.api.subscription.factory.SubscriptionFactory import net.corda.schema.Schemas.Config.CONFIG_TOPIC import net.corda.schema.Schemas.Flow.FLOW_MAPPER_CLEANUP_TOPIC -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_IN -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_START +import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC import net.corda.schema.Schemas.P2P.P2P_OUT_TOPIC import net.corda.schema.Schemas.ScheduledTask import net.corda.schema.configuration.BootConfig.BOOT_MAX_ALLOWED_MSG_SIZE @@ -147,7 +145,7 @@ class FlowMapperServiceIntegrationTest { } } - //@Test + @Test fun `Test first session event outbound sets up flow mapper state, verify subsequent messages received are passed to flow event topic` () { val testId = "test1" @@ -158,7 +156,7 @@ class FlowMapperServiceIntegrationTest { //send 2 session init val sessionDataAndInitEvent = Record( - FLOW_MAPPER_SESSION_OUT, testSessionId, FlowMapperEvent( + FLOW_MAPPER_EVENT_TOPIC, testSessionId, FlowMapperEvent( buildSessionEvent( MessageDirection.OUTBOUND, testSessionId, 1, SessionData(ByteBuffer.wrap("bytes".toByteArray()), SessionInit( testCpiId, testFlowId, emptyKeyValuePairList(), emptyKeyValuePairList() @@ -183,7 +181,7 @@ class FlowMapperServiceIntegrationTest { //send data back val sessionDataEvent = Record( - FLOW_MAPPER_SESSION_IN, testSessionId, FlowMapperEvent( + FLOW_MAPPER_EVENT_TOPIC, testSessionId, FlowMapperEvent( buildSessionEvent( MessageDirection.INBOUND, testSessionId, @@ -209,7 +207,7 @@ class FlowMapperServiceIntegrationTest { flowEventMediator.close() } - //@Test + @Test fun testStartRPCDuplicatesAndCleanup() { val testId = "test2" val publisher = publisherFactory.createPublisher(PublisherConfig(testId), messagingConfig) @@ -230,7 +228,7 @@ class FlowMapperServiceIntegrationTest { ) val startRPCEvent = Record( - FLOW_MAPPER_START, testId, FlowMapperEvent( + FLOW_MAPPER_EVENT_TOPIC, testId, FlowMapperEvent( StartFlow( context, "" @@ -279,14 +277,14 @@ class FlowMapperServiceIntegrationTest { flowEventMediator.close() } - //@Test + @Test fun testNoStateForMapper() { val testId = "test3" val publisher = publisherFactory.createPublisher(PublisherConfig(testId), messagingConfig) //send data, no state val sessionDataEvent = Record( - FLOW_MAPPER_SESSION_OUT, testId, FlowMapperEvent( + FLOW_MAPPER_EVENT_TOPIC, testId, FlowMapperEvent( buildSessionEvent( MessageDirection.OUTBOUND, testId, @@ -319,7 +317,7 @@ class FlowMapperServiceIntegrationTest { //send 2 session init, 1 is duplicate val sessionInitEvent = Record( - FLOW_MAPPER_SESSION_OUT, testSessionId, FlowMapperEvent( + FLOW_MAPPER_EVENT_TOPIC, testSessionId, FlowMapperEvent( buildSessionEvent( MessageDirection.OUTBOUND, testSessionId, 1, SessionCounterpartyInfoRequest(SessionInit( testCpiId, testFlowId, emptyKeyValuePairList(), emptyKeyValuePairList() @@ -347,7 +345,7 @@ class FlowMapperServiceIntegrationTest { //send data back val sessionDataEvent = Record( - FLOW_MAPPER_SESSION_IN, testSessionId, FlowMapperEvent( + FLOW_MAPPER_EVENT_TOPIC, testSessionId, FlowMapperEvent( buildSessionEvent( MessageDirection.INBOUND, testSessionId, @@ -375,14 +373,14 @@ class FlowMapperServiceIntegrationTest { } - //@Test + @Test fun `when the flow mapper receives an inbound session message for a non-existent session, an error is returned`() { val testId = "test5" val publisher = publisherFactory.createPublisher(PublisherConfig(testId), messagingConfig) //send data, no state val sessionDataEvent = Record( - FLOW_MAPPER_SESSION_IN, testId, FlowMapperEvent( + FLOW_MAPPER_EVENT_TOPIC, testId, FlowMapperEvent( buildSessionEvent( MessageDirection.INBOUND, testId, @@ -398,7 +396,7 @@ class FlowMapperServiceIntegrationTest { val mapperLatch = CountDownLatch(2) // The initial message and the error back. val records = mutableListOf() val mapperSub = subscriptionFactory.createPubSubSubscription( - SubscriptionConfig("$testId-mapper", FLOW_MAPPER_SESSION_IN), + SubscriptionConfig("$testId-mapper", FLOW_MAPPER_EVENT_TOPIC), TestFlowMapperProcessor(mapperLatch, records), messagingConfig ) @@ -418,7 +416,7 @@ class FlowMapperServiceIntegrationTest { assertThat(event.payload).isInstanceOf(SessionError::class.java) } - //@Test + @Test fun `mapper state cleanup correctly cleans up old states`() { // Create a state in the state manager. Note the modified time has to be further in the past than the configured diff --git a/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/TestFlowEventMediatorFactoryImpl.kt b/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/TestFlowEventMediatorFactoryImpl.kt index f38136ef2aa..5ef089f6d79 100644 --- a/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/TestFlowEventMediatorFactoryImpl.kt +++ b/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/TestFlowEventMediatorFactoryImpl.kt @@ -14,9 +14,8 @@ import net.corda.messaging.api.mediator.factory.MessageRouterFactory import net.corda.messaging.api.mediator.factory.MessagingClientFactoryFactory import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory import net.corda.messaging.api.processor.StateAndEventProcessor -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT -import net.corda.schema.Schemas.Flow.FLOW_SESSION -import net.corda.schema.Schemas.Flow.FLOW_START +import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC +import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC import net.corda.schema.configuration.MessagingConfig import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component @@ -62,10 +61,7 @@ class TestFlowEventMediatorFactoryImpl @Activate constructor( .messagingConfig(messagingConfig) .consumerFactories( mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - FLOW_START, CONSUMER_GROUP, messagingConfig - ), - mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - FLOW_SESSION, CONSUMER_GROUP, messagingConfig + FLOW_EVENT_TOPIC, CONSUMER_GROUP, messagingConfig ), ) .clientFactories( @@ -85,7 +81,7 @@ class TestFlowEventMediatorFactoryImpl @Activate constructor( MessageRouter { message -> when (val event = message.payload) { - is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_SESSION_OUT) + is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_EVENT_TOPIC) else -> { val eventType = event?.let { it::class.java } throw IllegalStateException("No route defined for event type [$eventType]") diff --git a/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt b/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt index bc9be7607e6..888a6ea3d1a 100644 --- a/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt +++ b/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt @@ -1,7 +1,6 @@ package net.corda.session.mapper.messaging.mediator import net.corda.data.flow.event.FlowEvent -import net.corda.data.flow.event.StartFlow import net.corda.data.flow.event.mapper.FlowMapperEvent import net.corda.data.flow.state.mapper.FlowMapperState import net.corda.data.p2p.app.AppMessage @@ -16,11 +15,8 @@ import net.corda.messaging.api.mediator.factory.MessageRouterFactory import net.corda.messaging.api.mediator.factory.MessagingClientFactoryFactory import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory import net.corda.messaging.api.processor.StateAndEventProcessor -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_IN -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_START -import net.corda.schema.Schemas.Flow.FLOW_SESSION -import net.corda.schema.Schemas.Flow.FLOW_START +import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC +import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC import net.corda.schema.Schemas.P2P.P2P_OUT_TOPIC import net.corda.schema.configuration.FlowConfig import net.corda.session.mapper.service.executor.FlowMapperMessageProcessor @@ -67,13 +63,7 @@ class FlowMapperEventMediatorFactoryImpl @Activate constructor( .messagingConfig(messagingConfig) .consumerFactories( mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - FLOW_MAPPER_START, CONSUMER_GROUP, messagingConfig - ), - mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - FLOW_MAPPER_SESSION_IN, CONSUMER_GROUP, messagingConfig - ), - mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - FLOW_MAPPER_SESSION_OUT, CONSUMER_GROUP, messagingConfig + FLOW_MAPPER_EVENT_TOPIC, CONSUMER_GROUP, messagingConfig ), ) .clientFactories( @@ -94,14 +84,8 @@ class FlowMapperEventMediatorFactoryImpl @Activate constructor( MessageRouter { message -> when (val event = message.payload) { is AppMessage -> routeTo(messageBusClient, P2P_OUT_TOPIC) - is FlowEvent -> { - if (event.payload is StartFlow) { - routeTo(messageBusClient, FLOW_START) - } else { - routeTo(messageBusClient, FLOW_SESSION) - } - } - is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_SESSION_IN) + is FlowEvent -> routeTo(messageBusClient, FLOW_EVENT_TOPIC) + is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_EVENT_TOPIC) else -> { val eventType = event?.let { it::class.java } throw IllegalStateException("No route defined for event type [$eventType]") diff --git a/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/service/executor/FlowMapperListener.kt b/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/service/executor/FlowMapperListener.kt index a5e815dda40..67de181aee1 100644 --- a/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/service/executor/FlowMapperListener.kt +++ b/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/service/executor/FlowMapperListener.kt @@ -6,7 +6,7 @@ import net.corda.data.flow.state.mapper.FlowMapperState import net.corda.data.flow.state.mapper.FlowMapperStateType import net.corda.messaging.api.records.Record import net.corda.messaging.api.subscription.listener.StateAndEventListener -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_IN +import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC import net.corda.utilities.debug import net.corda.utilities.trace import org.slf4j.LoggerFactory @@ -38,7 +38,7 @@ class FlowMapperListener( publisher?.publish( listOf( Record( - FLOW_MAPPER_SESSION_IN, key, FlowMapperEvent( + FLOW_MAPPER_EVENT_TOPIC, key, FlowMapperEvent( ExecuteCleanup(listOf()) ) ) @@ -81,7 +81,7 @@ class FlowMapperListener( executorService.schedule( { log.debug { "Clearing up mapper state for key $eventKey" } - publisher?.publish(listOf(Record(FLOW_MAPPER_SESSION_IN, eventKey, FlowMapperEvent(ExecuteCleanup(listOf()))))) + publisher?.publish(listOf(Record(FLOW_MAPPER_EVENT_TOPIC, eventKey, FlowMapperEvent(ExecuteCleanup(listOf()))))) }, expiryTime - clock.millis(), TimeUnit.MILLISECONDS diff --git a/components/flow/flow-p2p-filter-service/src/integrationTest/kotlin/net/corda/flow/p2p/filter/integration/FlowFilterServiceIntegrationTest.kt b/components/flow/flow-p2p-filter-service/src/integrationTest/kotlin/net/corda/flow/p2p/filter/integration/FlowFilterServiceIntegrationTest.kt index 8bac03e4bfd..7b474acbd5b 100644 --- a/components/flow/flow-p2p-filter-service/src/integrationTest/kotlin/net/corda/flow/p2p/filter/integration/FlowFilterServiceIntegrationTest.kt +++ b/components/flow/flow-p2p-filter-service/src/integrationTest/kotlin/net/corda/flow/p2p/filter/integration/FlowFilterServiceIntegrationTest.kt @@ -27,7 +27,7 @@ import net.corda.messaging.api.records.Record import net.corda.messaging.api.subscription.config.SubscriptionConfig import net.corda.messaging.api.subscription.factory.SubscriptionFactory import net.corda.schema.Schemas.Config.CONFIG_TOPIC -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_IN +import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC import net.corda.schema.Schemas.P2P.P2P_IN_TOPIC import net.corda.schema.configuration.BootConfig.BOOT_MAX_ALLOWED_MSG_SIZE import net.corda.schema.configuration.BootConfig.INSTANCE_ID @@ -138,7 +138,7 @@ class FlowFilterServiceIntegrationTest { //validate mapper receives 2 inits val mapperLatch = CountDownLatch(2) val p2pOutSub = subscriptionFactory.createDurableSubscription( - SubscriptionConfig("$testId-flow-mapper", FLOW_MAPPER_SESSION_IN), + SubscriptionConfig("$testId-flow-mapper", FLOW_MAPPER_EVENT_TOPIC), TestFlowSessionFilterProcessor("$testId-INITIATED", mapperLatch, 2), bootConfig, null diff --git a/components/flow/flow-p2p-filter-service/src/main/kotlin/net/corda/flow/p2p/filter/FlowP2PFilterProcessor.kt b/components/flow/flow-p2p-filter-service/src/main/kotlin/net/corda/flow/p2p/filter/FlowP2PFilterProcessor.kt index 0eb0789c2fb..4099e22d74f 100644 --- a/components/flow/flow-p2p-filter-service/src/main/kotlin/net/corda/flow/p2p/filter/FlowP2PFilterProcessor.kt +++ b/components/flow/flow-p2p-filter-service/src/main/kotlin/net/corda/flow/p2p/filter/FlowP2PFilterProcessor.kt @@ -9,7 +9,7 @@ import net.corda.data.p2p.app.AppMessage import net.corda.data.p2p.app.AuthenticatedMessage import net.corda.messaging.api.processor.DurableProcessor import net.corda.messaging.api.records.Record -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_IN +import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC import net.corda.session.manager.Constants.Companion.FLOW_SESSION_SUBSYSTEM import net.corda.session.manager.Constants.Companion.INITIATED_SESSION_ID_SUFFIX import net.corda.tracing.traceEventProcessingNullableSingle @@ -21,7 +21,7 @@ import java.nio.ByteBuffer * Processes events from the P2P.in topic. * If events have a subsystem of "flowSession", payloads are parsed into SessionEvents. * SessionEvent sessionId's are flipped to that of the counterparty, as well as the event key sessionId. - * Messages are forwarded to the flow.mapper.session.in topic + * Messages are forwarded to the flow.mapper.event topic */ class FlowP2PFilterProcessor(cordaAvroSerializationFactory: CordaAvroSerializationFactory) : DurableProcessor { @@ -71,7 +71,7 @@ class FlowP2PFilterProcessor(cordaAvroSerializationFactory: CordaAvroSerializati sessionEvent.messageDirection = MessageDirection.INBOUND val sessionId = toggleSessionId(key) sessionEvent.sessionId = sessionId - Record(FLOW_MAPPER_SESSION_IN, sessionId, FlowMapperEvent(sessionEvent)) + Record(FLOW_MAPPER_EVENT_TOPIC, sessionId, FlowMapperEvent(sessionEvent)) } else { null } diff --git a/components/flow/flow-rest-resource-service-impl/src/main/kotlin/net/corda/flow/rest/impl/v1/FlowRestResourceImpl.kt b/components/flow/flow-rest-resource-service-impl/src/main/kotlin/net/corda/flow/rest/impl/v1/FlowRestResourceImpl.kt index deadd7a07b3..ce41da44409 100644 --- a/components/flow/flow-rest-resource-service-impl/src/main/kotlin/net/corda/flow/rest/impl/v1/FlowRestResourceImpl.kt +++ b/components/flow/flow-rest-resource-service-impl/src/main/kotlin/net/corda/flow/rest/impl/v1/FlowRestResourceImpl.kt @@ -40,7 +40,7 @@ import net.corda.rest.response.ResponseEntity import net.corda.rest.security.CURRENT_REST_CONTEXT import net.corda.rest.ws.DuplexChannel import net.corda.rest.ws.WebSocketValidationException -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_START +import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC import net.corda.tracing.TraceTag import net.corda.tracing.addTraceContextToRecord @@ -202,7 +202,7 @@ class FlowRestResourceImpl @Activate constructor( val status = messageFactory.createStartFlowStatus(clientRequestId, vNode, flowClassName) val records = listOf( - addTraceContextToRecord(Record(FLOW_MAPPER_START, status.key.toString(), startEvent)), + addTraceContextToRecord(Record(FLOW_MAPPER_EVENT_TOPIC, status.key.toString(), startEvent)), Record(FLOW_STATUS_TOPIC, status.key, status), ) diff --git a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/context/FlowServiceTestContext.kt b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/context/FlowServiceTestContext.kt index a5c3665cd45..6394cd846c1 100644 --- a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/context/FlowServiceTestContext.kt +++ b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/context/FlowServiceTestContext.kt @@ -58,7 +58,7 @@ import net.corda.messaging.api.processor.StateAndEventProcessor.State import net.corda.messaging.api.records.Record import net.corda.sandboxgroupcontext.SandboxGroupType.FLOW import net.corda.sandboxgroupcontext.VirtualNodeContext -import net.corda.schema.Schemas.Flow.FLOW_SESSION +import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC import net.corda.schema.configuration.ConfigKeys.FLOW_CONFIG import net.corda.schema.configuration.FlowConfig import net.corda.schema.configuration.MessagingConfig @@ -513,7 +513,7 @@ class FlowServiceTestContext @Activate constructor( } private fun createFlowEventRecord(key: String, payload: Any): Record { - return Record(FLOW_SESSION, key, FlowEvent(key, payload)) + return Record(FLOW_EVENT_TOPIC, key, FlowEvent(key, payload)) } private fun getCpiIdentifier(cpiId: String): CpiIdentifier { diff --git a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/context/OutputAssertionsImpl.kt b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/context/OutputAssertionsImpl.kt index 5136baf9ce9..246df3eaa6d 100644 --- a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/context/OutputAssertionsImpl.kt +++ b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/context/OutputAssertionsImpl.kt @@ -445,7 +445,7 @@ class OutputAssertionsImpl( response: StateAndEventProcessor.Response, ): List { return response.responseEvents - .filter { it.key == flowId || it.topic == Schemas.Flow.FLOW_SESSION || it.value is FlowEvent } + .filter { it.key == flowId || it.topic == Schemas.Flow.FLOW_EVENT_TOPIC || it.value is FlowEvent } .map { it.value as FlowEvent } } @@ -454,7 +454,7 @@ class OutputAssertionsImpl( ): List> { @Suppress("unchecked_cast") return response.responseEvents - .filter { it.topic == Schemas.Flow.FLOW_MAPPER_SESSION_OUT && it.value is FlowMapperEvent } + .filter { it.topic == Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC && it.value is FlowMapperEvent } as List> } diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/application/crypto/external/events/CryptoFlowOpsTransformerService.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/application/crypto/external/events/CryptoFlowOpsTransformerService.kt index ece2a786ef5..5c9c67afc1c 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/application/crypto/external/events/CryptoFlowOpsTransformerService.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/application/crypto/external/events/CryptoFlowOpsTransformerService.kt @@ -13,6 +13,6 @@ class CryptoFlowOpsTransformerService @Activate constructor( cryptoFlowOpsTransformerFactory: CryptoFlowOpsTransformerFactory, ) : CryptoFlowOpsTransformer by cryptoFlowOpsTransformerFactory.create( requestingComponent = "Flow worker", - responseTopic = Schemas.Flow.FLOW_SESSION, + responseTopic = Schemas.Flow.FLOW_EVENT_TOPIC, requestValidityWindowSeconds = 300 ) \ No newline at end of file diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt index af60b311404..8c2968b55e7 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt @@ -32,9 +32,7 @@ import net.corda.messaging.api.mediator.factory.MessagingClientFactoryFactory import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory import net.corda.messaging.api.processor.StateAndEventProcessor import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT -import net.corda.schema.Schemas.Flow.FLOW_SESSION -import net.corda.schema.Schemas.Flow.FLOW_START +import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC import net.corda.schema.configuration.BootConfig.CRYPTO_WORKER_REST_ENDPOINT import net.corda.schema.configuration.BootConfig.PERSISTENCE_WORKER_REST_ENDPOINT @@ -93,12 +91,6 @@ class FlowEventMediatorFactoryImpl @Activate constructor( .name("FlowEventMediator") .messagingConfig(messagingConfig) .consumerFactories( - mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - FLOW_START, CONSUMER_GROUP, messagingConfig - ), - mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - FLOW_SESSION, CONSUMER_GROUP, messagingConfig - ), mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( FLOW_EVENT_TOPIC, CONSUMER_GROUP, messagingConfig ), @@ -130,7 +122,7 @@ class FlowEventMediatorFactoryImpl @Activate constructor( MessageRouter { message -> when (val event = message.event()) { is EntityRequest -> routeTo(rpcClient, rpcEndpoint(PERSISTENCE_WORKER_REST_ENDPOINT, PERSISTENCE_PATH)) - is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_SESSION_OUT) + is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_EVENT_TOPIC) is FlowOpsRequest -> routeTo(rpcClient, rpcEndpoint(CRYPTO_WORKER_REST_ENDPOINT, CRYPTO_PATH)) is FlowStatus -> routeTo(messageBusClient, FLOW_STATUS_TOPIC) is LedgerPersistenceRequest -> routeTo(rpcClient, rpcEndpoint(PERSISTENCE_WORKER_REST_ENDPOINT, LEDGER_PATH)) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/factory/impl/FlowRecordFactoryImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/factory/impl/FlowRecordFactoryImpl.kt index 16c2ec4cd4f..caf71f1e4ae 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/factory/impl/FlowRecordFactoryImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/factory/impl/FlowRecordFactoryImpl.kt @@ -6,8 +6,8 @@ import net.corda.data.flow.event.mapper.FlowMapperEvent import net.corda.data.flow.output.FlowStatus import net.corda.flow.pipeline.factory.FlowRecordFactory import net.corda.messaging.api.records.Record -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT -import net.corda.schema.Schemas.Flow.FLOW_SESSION +import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC +import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC import org.osgi.service.component.annotations.Component @@ -16,7 +16,7 @@ class FlowRecordFactoryImpl : FlowRecordFactory { override fun createFlowEventRecord(flowId: String, payload: Any): Record { return Record( - topic = FLOW_SESSION, + topic = FLOW_EVENT_TOPIC, key = flowId, value = FlowEvent(flowId, payload) ) @@ -32,7 +32,7 @@ class FlowRecordFactoryImpl : FlowRecordFactory { override fun createFlowMapperEventRecord(key: String, payload: Any): Record<*, FlowMapperEvent> { return Record( - topic = FLOW_MAPPER_SESSION_OUT, + topic = FLOW_MAPPER_EVENT_TOPIC, key = key, value = FlowMapperEvent(payload) ) diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt index e7bb8f815a2..06c2a0b083d 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt @@ -29,7 +29,7 @@ import net.corda.messaging.api.mediator.factory.MessagingClientFactoryFactory import net.corda.messaging.api.mediator.factory.MessagingClientFinder import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT +import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC import net.corda.schema.configuration.ConfigKeys import net.corda.schema.configuration.FlowConfig @@ -95,7 +95,7 @@ class FlowEventMediatorFactoryImplTest { val router = config.messageRouterFactory.create(clientFinder) assertThat(router.getDestination(MediatorMessage(FlowEvent())).endpoint).isEqualTo(FLOW_EVENT_TOPIC) assertThat(router.getDestination(MediatorMessage(FlowMapperEvent())).endpoint) - .isEqualTo(FLOW_MAPPER_SESSION_OUT) + .isEqualTo(FLOW_MAPPER_EVENT_TOPIC) assertThat(router.getDestination(MediatorMessage(EntityRequest())).endpoint) .isEqualTo(endpoint(PERSISTENCE_PATH)) assertThat(router.getDestination(MediatorMessage(FlowOpsRequest())).endpoint) diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/factory/FlowRecordFactoryImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/factory/FlowRecordFactoryImplTest.kt index b4df3877be4..fad69ab4b68 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/factory/FlowRecordFactoryImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/factory/FlowRecordFactoryImplTest.kt @@ -9,8 +9,8 @@ import net.corda.data.flow.output.FlowStatus import net.corda.data.identity.HoldingIdentity import net.corda.flow.pipeline.factory.impl.FlowRecordFactoryImpl import net.corda.messaging.api.records.Record -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT -import net.corda.schema.Schemas.Flow.FLOW_SESSION +import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC +import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test @@ -19,7 +19,7 @@ class FlowRecordFactoryImplTest { @Test fun `create flow event record`() { - val expected = Record(FLOW_SESSION, "flowId", FlowEvent("flowId", 3)) + val expected = Record(FLOW_EVENT_TOPIC, "flowId", FlowEvent("flowId", 3)) assertThat(FlowRecordFactoryImpl().createFlowEventRecord("flowId", 3)).isEqualTo(expected) } @@ -33,14 +33,14 @@ class FlowRecordFactoryImplTest { @Test fun `create flow mapper event record with session event`() { val sessionEvent = SessionEvent().apply { sessionId = "id1" } - val expected = Record(FLOW_MAPPER_SESSION_OUT, sessionEvent.sessionId, FlowMapperEvent(sessionEvent)) + val expected = Record(FLOW_MAPPER_EVENT_TOPIC, sessionEvent.sessionId, FlowMapperEvent(sessionEvent)) assertThat(FlowRecordFactoryImpl().createFlowMapperEventRecord(sessionEvent.sessionId, sessionEvent)).isEqualTo(expected) } @Test fun `create flow mapper event record with schedule cleanup event`() { val cleanup = ScheduleCleanup(1000) - val expected = Record(FLOW_MAPPER_SESSION_OUT, "flowKey.toString", FlowMapperEvent(cleanup)) + val expected = Record(FLOW_MAPPER_EVENT_TOPIC, "flowKey.toString", FlowMapperEvent(cleanup)) assertThat(FlowRecordFactoryImpl().createFlowMapperEventRecord("flowKey.toString", cleanup)).isEqualTo(expected) } } diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt index 373db46dc0c..64d35755599 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt @@ -173,7 +173,7 @@ class FlowEventExceptionProcessorImplTest { val key = FlowKey() val flowStatusUpdateRecord = Record("", key, flowStatusUpdate) val flowMapperEvent = mock() - val flowMapperRecord = Record(Schemas.Flow.FLOW_MAPPER_SESSION_OUT, "key", flowMapperEvent) + val flowMapperRecord = Record(Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC, "key", flowMapperEvent) whenever( flowMessageFactory.createFlowFailedStatusMessage( diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventProcessorImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventProcessorImplTest.kt index a86febaafe7..a3208b8ca9a 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventProcessorImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventProcessorImplTest.kt @@ -33,7 +33,7 @@ import net.corda.flow.test.utils.buildFlowEventContext import net.corda.messaging.api.processor.StateAndEventProcessor import net.corda.messaging.api.processor.StateAndEventProcessor.State import net.corda.messaging.api.records.Record -import net.corda.schema.Schemas.Flow.FLOW_SESSION +import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC import net.corda.schema.configuration.ConfigKeys.FLOW_CONFIG import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach @@ -89,7 +89,7 @@ class FlowEventProcessorImplTest { private val flowState: FlowState = mock() private val flowStartContext: FlowStartContext = mock() private val externalEventState: ExternalEventState = mock() - private val outputRecords = listOf(Record(FLOW_SESSION, "key", "value")) + private val outputRecords = listOf(Record(FLOW_EVENT_TOPIC, "key", "value")) private val updatedContext = buildFlowEventContext( flowCheckpoint, payload, @@ -384,6 +384,6 @@ class FlowEventProcessorImplTest { } private fun getFlowEventRecord(flowEvent: FlowEvent?): Record { - return Record(FLOW_SESSION, flowKey, flowEvent) + return Record(FLOW_EVENT_TOPIC, flowKey, flowEvent) } } \ No newline at end of file diff --git a/libs/flows/external-event-responses-impl/src/main/kotlin/net/corda/flow/external/events/responses/impl/factory/ExternalEventResponseFactoryImpl.kt b/libs/flows/external-event-responses-impl/src/main/kotlin/net/corda/flow/external/events/responses/impl/factory/ExternalEventResponseFactoryImpl.kt index d28bbc5c29c..b6f4073d530 100644 --- a/libs/flows/external-event-responses-impl/src/main/kotlin/net/corda/flow/external/events/responses/impl/factory/ExternalEventResponseFactoryImpl.kt +++ b/libs/flows/external-event-responses-impl/src/main/kotlin/net/corda/flow/external/events/responses/impl/factory/ExternalEventResponseFactoryImpl.kt @@ -1,5 +1,6 @@ package net.corda.flow.external.events.responses.impl.factory +import java.nio.ByteBuffer import net.corda.avro.serialization.CordaAvroSerializationFactory import net.corda.avro.serialization.CordaAvroSerializer import net.corda.data.ExceptionEnvelope @@ -16,7 +17,6 @@ import net.corda.utilities.time.UTCClock import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference -import java.nio.ByteBuffer @Component(service = [ExternalEventResponseFactory::class]) class ExternalEventResponseFactoryImpl( @@ -128,7 +128,7 @@ class ExternalEventResponseFactoryImpl( response: ExternalEventResponse ): Record { return Record( - Schemas.Flow.FLOW_SESSION, + Schemas.Flow.FLOW_EVENT_TOPIC, flowId, FlowEvent(flowId, response) ) diff --git a/libs/flows/external-event-responses-impl/src/test/kotlin/net/corda/flow/eternal/events/responses/impl/factory/ExternalEventResponseFactoryImplTest.kt b/libs/flows/external-event-responses-impl/src/test/kotlin/net/corda/flow/eternal/events/responses/impl/factory/ExternalEventResponseFactoryImplTest.kt index 4994ca13923..a74c8918bd3 100644 --- a/libs/flows/external-event-responses-impl/src/test/kotlin/net/corda/flow/eternal/events/responses/impl/factory/ExternalEventResponseFactoryImplTest.kt +++ b/libs/flows/external-event-responses-impl/src/test/kotlin/net/corda/flow/eternal/events/responses/impl/factory/ExternalEventResponseFactoryImplTest.kt @@ -1,5 +1,8 @@ package net.corda.flow.eternal.events.responses.impl.factory +import java.nio.ByteBuffer +import java.time.Instant +import java.time.temporal.ChronoUnit import net.corda.avro.serialization.CordaAvroSerializer import net.corda.data.ExceptionEnvelope import net.corda.data.KeyValuePairList @@ -18,9 +21,6 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.mockito.kotlin.mock import org.mockito.kotlin.whenever -import java.nio.ByteBuffer -import java.time.Instant -import java.time.temporal.ChronoUnit class ExternalEventResponseFactoryImplTest { @@ -51,7 +51,7 @@ class ExternalEventResponseFactoryImplTest { val flowEvent = record.value!! val response = flowEvent.payload as ExternalEventResponse - assertEquals(Schemas.Flow.FLOW_SESSION, record.topic) + assertEquals(Schemas.Flow.FLOW_EVENT_TOPIC, record.topic) assertEquals(EXTERNAL_EVENT_CONTEXT.flowId, record.key) assertEquals(EXTERNAL_EVENT_CONTEXT.flowId, flowEvent.flowId) assertEquals(EXTERNAL_EVENT_CONTEXT.requestId, response.requestId) @@ -75,7 +75,7 @@ class ExternalEventResponseFactoryImplTest { val flowEvent = record.value!! val response = flowEvent.payload as ExternalEventResponse - assertEquals(Schemas.Flow.FLOW_SESSION, record.topic) + assertEquals(Schemas.Flow.FLOW_EVENT_TOPIC, record.topic) assertEquals(EXTERNAL_EVENT_CONTEXT.flowId, record.key) assertEquals(EXTERNAL_EVENT_CONTEXT.flowId, flowEvent.flowId) assertEquals(EXTERNAL_EVENT_CONTEXT.requestId, response.requestId) @@ -136,7 +136,7 @@ class ExternalEventResponseFactoryImplTest { val flowEvent = record.value!! val response = flowEvent.payload as ExternalEventResponse - assertEquals(Schemas.Flow.FLOW_SESSION, record.topic) + assertEquals(Schemas.Flow.FLOW_EVENT_TOPIC, record.topic) assertEquals(EXTERNAL_EVENT_CONTEXT.flowId, record.key) assertEquals(EXTERNAL_EVENT_CONTEXT.flowId, flowEvent.flowId) assertEquals(EXTERNAL_EVENT_CONTEXT.requestId, response.requestId) diff --git a/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-defaults.conf b/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-defaults.conf index 1d6fbfdd6f7..f105646011c 100644 --- a/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-defaults.conf +++ b/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-defaults.conf @@ -19,7 +19,7 @@ consumer = ${common} { enable.auto.commit = false # Retrieve 500 records maximum per poll. Note that poll will return immediately if any records are available, so a # batch may contain fewer than 500 records. - max.poll.records = 100 + max.poll.records = 500 # Time to allow between polls on a consumer before a rebalance occurs that removes this consumer's partitions. max.poll.interval.ms = 300000 # Timeout of heartbeats between the consumer and the broker. If no heartbeat is received in this timeframe, a @@ -29,7 +29,7 @@ consumer = ${common} { heartbeat.interval.ms = 20000 # The maximum amount of time kafka broker will wait before answering a consumer request if there hasn't been # an update to the topic - fetch.max.wait.ms = 20 + fetch.max.wait.ms = 1500 } # Defaults for all producers. @@ -78,10 +78,10 @@ roles { # within this pattern is transactional by default, and the transaction commit also causes a flush. producer = ${producer} { # Maximum time to wait for additional messages before sending the current batch. - linger.ms = 50 + linger.ms = 0 # Maximum amount of memory in bytes (not messages) that will be used for each batch. Can not be higher than # the value configured for "message.max.bytes" on the broker side (1mb by default). - batch.size = 204800 + batch.size = 750000 } } eventLog { diff --git a/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-enforced.conf b/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-enforced.conf index f61137ae30a..7c9de507d37 100644 --- a/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-enforced.conf +++ b/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-enforced.conf @@ -69,11 +69,6 @@ roles { eventConsumer = ${consumer} { # Need to be able to distinguish between the state and event consumers for this pattern. client.id = eventConsumer--${clientId} - # Extra suffix to prevent clashes within consumer groups with more than one member. - group.id = ${group}-cooperative - # Identical to StickyAssignor but supports cooperative rebalances (consumers can continue consuming from - # the partitions that are not reassigned). - partition.assignment.strategy = org.apache.kafka.clients.consumer.CooperativeStickyAssignor } producer = ${producer} } diff --git a/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolverTest.kt b/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolverTest.kt index e64b50d8a20..27ca3dc12eb 100644 --- a/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolverTest.kt +++ b/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolverTest.kt @@ -73,8 +73,7 @@ class MessageBusConfigResolverTest { mapOf( BOOTSTRAP_SERVERS_PROP to "kafka:1001", SSL_KEYSTORE_PROP to "foo/bar", - CLIENT_ID_PROP to "eventConsumer--$CLIENT_ID", - GROUP_ID_PROP to "group-cooperative" + CLIENT_ID_PROP to "eventConsumer--$CLIENT_ID" ) ), ConsumerRoles.EVENT_LOG to getExpectedConsumerProperties( diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt index 4751d951a48..c8a7f2da992 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt @@ -20,7 +20,6 @@ import net.corda.taskmanager.TaskManager import net.corda.utilities.debug import org.slf4j.LoggerFactory import java.lang.Thread.sleep -import java.time.Duration import java.util.UUID import java.util.concurrent.atomic.AtomicBoolean @@ -61,9 +60,6 @@ class MultiSourceEventMediatorImpl( private val stopped = AtomicBoolean(false) private val running = AtomicBoolean(false) - // TODO This timeout was set with CORE-17768 (changing configuration value would affect other messaging patterns) - // This should be reverted to use configuration value once event mediator polling is refactored (planned for 5.2) - private val pollTimeout = Duration.ofMillis(20) override fun start() { log.debug { "Starting multi-source event mediator with config: $config" } @@ -196,7 +192,7 @@ class MultiSourceEventMediatorImpl( private fun pollConsumers(): List> { return metrics.pollTimer.recordCallable { consumers.map { consumer -> - consumer.poll(pollTimeout) + consumer.poll(config.pollTimeout) }.flatten() }!! } diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/MessageRouter.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/MessageRouter.kt index b904982e68e..ccb69988a9c 100644 --- a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/MessageRouter.kt +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/MessageRouter.kt @@ -7,7 +7,7 @@ package net.corda.messaging.api.mediator * ``` * MessageRouter { message -> * when (message.payload) { - * is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_SESSION_OUT_TOPIC) + * is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_EVENT_TOPIC) * is FlowStatus -> routeTo(messageBusClient, FLOW_STATUS_TOPIC) * else -> throw IllegalStateException("No route defined for message $message") * } diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/factory/MessageRouterFactory.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/factory/MessageRouterFactory.kt index 7a3a7d165f9..5b83c951625 100644 --- a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/factory/MessageRouterFactory.kt +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/factory/MessageRouterFactory.kt @@ -18,7 +18,7 @@ fun interface MessageRouterFactory { * * MessageRouter { message -> * when (message.payload) { - * is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_SESSION_OUT_TOPIC) + * is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_EVENT_TOPIC) * is FlowStatus -> routeTo(messageBusClient, FLOW_STATUS_TOPIC) * else -> throw IllegalStateException("No route defined for message $message") * } diff --git a/processors/crypto-processor/src/integrationTest/kotlin/net/corda/processors/crypto/tests/CryptoProcessorTests.kt b/processors/crypto-processor/src/integrationTest/kotlin/net/corda/processors/crypto/tests/CryptoProcessorTests.kt index dda10c882d0..99748206a98 100644 --- a/processors/crypto-processor/src/integrationTest/kotlin/net/corda/processors/crypto/tests/CryptoProcessorTests.kt +++ b/processors/crypto-processor/src/integrationTest/kotlin/net/corda/processors/crypto/tests/CryptoProcessorTests.kt @@ -1,7 +1,6 @@ package net.corda.processors.crypto.tests import com.typesafe.config.ConfigRenderOptions -import net.corda.avro.serialization.CordaAvroSerializationFactory import net.corda.crypto.cipher.suite.CipherSchemeMetadata import net.corda.crypto.cipher.suite.SignatureSpecImpl import net.corda.crypto.cipher.suite.SignatureSpecs @@ -22,6 +21,7 @@ import net.corda.crypto.hes.EphemeralKeyPairEncryptor import net.corda.crypto.hes.HybridEncryptionParams import net.corda.crypto.hes.StableKeyPairDecryptor import net.corda.crypto.persistence.db.model.CryptoEntities +import net.corda.avro.serialization.CordaAvroSerializationFactory import net.corda.data.KeyValuePairList import net.corda.data.config.Configuration import net.corda.data.config.ConfigurationSchemaVersion @@ -248,7 +248,7 @@ class CryptoProcessorTests { flowOpsResponsesSub = subscriptionFactory.createDurableSubscription( subscriptionConfig = SubscriptionConfig( groupName = "TEST", - eventTopic = Schemas.Flow.FLOW_SESSION + eventTopic = Schemas.Flow.FLOW_EVENT_TOPIC ), processor = flowOpsResponses, messagingConfig = messagingConfig, diff --git a/testing/flow/external-events/src/main/kotlin/net/corda/test/flow/external/events/TestExternalEventResponseMonitor.kt b/testing/flow/external-events/src/main/kotlin/net/corda/test/flow/external/events/TestExternalEventResponseMonitor.kt index 9885eea1144..2bbeca21cd2 100644 --- a/testing/flow/external-events/src/main/kotlin/net/corda/test/flow/external/events/TestExternalEventResponseMonitor.kt +++ b/testing/flow/external-events/src/main/kotlin/net/corda/test/flow/external/events/TestExternalEventResponseMonitor.kt @@ -43,7 +43,7 @@ class TestExternalEventResponseMonitor( val responses = requestIds.associateWith { CompletableFuture() } val responseSubscription = subscriptionFactory.createCompactedSubscription( - SubscriptionConfig(SUBSCRIPTION_GROUP_NAME, Schemas.Flow.FLOW_SESSION), + SubscriptionConfig(SUBSCRIPTION_GROUP_NAME, Schemas.Flow.FLOW_EVENT_TOPIC), object : CompactedProcessor { override val keyClass = String::class.java override val valueClass = FlowEvent::class.java