From 8b57a3250ec86a3684cad44636774f83f6636098 Mon Sep 17 00:00:00 2001 From: Miljenko Brkic Date: Mon, 6 Nov 2023 09:03:22 +0000 Subject: [PATCH 1/2] CORE-17768 No dedicated topics for flow start event --- .../impl/FlowMapperEventExecutorFactoryImpl.kt | 5 +++-- .../mediator/FlowMapperEventMediatorFactoryImpl.kt | 13 ++----------- .../mediator/FlowEventMediatorFactoryImpl.kt | 3 +-- 3 files changed, 6 insertions(+), 15 deletions(-) diff --git a/components/flow/flow-mapper-impl/src/main/kotlin/net/corda/flow/mapper/impl/FlowMapperEventExecutorFactoryImpl.kt b/components/flow/flow-mapper-impl/src/main/kotlin/net/corda/flow/mapper/impl/FlowMapperEventExecutorFactoryImpl.kt index efd465ba2c9..5923cd39262 100644 --- a/components/flow/flow-mapper-impl/src/main/kotlin/net/corda/flow/mapper/impl/FlowMapperEventExecutorFactoryImpl.kt +++ b/components/flow/flow-mapper-impl/src/main/kotlin/net/corda/flow/mapper/impl/FlowMapperEventExecutorFactoryImpl.kt @@ -17,7 +17,7 @@ import net.corda.flow.mapper.impl.executor.SessionEventExecutor import net.corda.flow.mapper.impl.executor.SessionInitProcessor import net.corda.flow.mapper.impl.executor.StartFlowExecutor import net.corda.libs.configuration.SmartConfig -import net.corda.schema.Schemas.Flow.FLOW_START +import net.corda.schema.Schemas import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference @@ -66,7 +66,8 @@ class FlowMapperEventExecutorFactoryImpl @Activate constructor( } } - is StartFlow -> StartFlowExecutor(eventKey, FLOW_START, flowMapperEventPayload, state) + is StartFlow -> StartFlowExecutor(eventKey, + Schemas.Flow.FLOW_MAPPER_SESSION_IN, flowMapperEventPayload, state) is ExecuteCleanup -> ExecuteCleanupEventExecutor(eventKey) is ScheduleCleanup -> ScheduleCleanupEventExecutor(eventKey, flowMapperEventPayload, state) diff --git a/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt b/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt index ee0226c2491..5a9dba03a4c 100644 --- a/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt +++ b/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt @@ -1,7 +1,6 @@ package net.corda.session.mapper.messaging.mediator import net.corda.data.flow.event.FlowEvent -import net.corda.data.flow.event.StartFlow import net.corda.data.flow.event.mapper.FlowMapperEvent import net.corda.data.flow.state.mapper.FlowMapperState import net.corda.data.p2p.app.AppMessage @@ -18,9 +17,7 @@ import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory import net.corda.messaging.api.processor.StateAndEventProcessor import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_IN import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_START import net.corda.schema.Schemas.Flow.FLOW_SESSION -import net.corda.schema.Schemas.Flow.FLOW_START import net.corda.schema.Schemas.P2P.P2P_OUT_TOPIC import net.corda.schema.configuration.FlowConfig import net.corda.session.mapper.service.executor.FlowMapperMessageProcessor @@ -68,7 +65,7 @@ class FlowMapperEventMediatorFactoryImpl @Activate constructor( .consumerFactories( mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( listOf( - FLOW_MAPPER_START, + // FLOW_MAPPER_START, FLOW_MAPPER_SESSION_IN, FLOW_MAPPER_SESSION_OUT, ), @@ -94,13 +91,7 @@ class FlowMapperEventMediatorFactoryImpl @Activate constructor( MessageRouter { message -> when (val event = message.payload) { is AppMessage -> routeTo(messageBusClient, P2P_OUT_TOPIC) - is FlowEvent -> { - if (event.payload is StartFlow) { - routeTo(messageBusClient, FLOW_START) - } else { - routeTo(messageBusClient, FLOW_SESSION) - } - } + is FlowEvent -> routeTo(messageBusClient, FLOW_SESSION) is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_SESSION_IN) else -> { val eventType = event?.let { it::class.java } diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt index 6ec63d06d83..2c5c19aa12a 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt @@ -34,7 +34,6 @@ import net.corda.messaging.api.processor.StateAndEventProcessor import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT import net.corda.schema.Schemas.Flow.FLOW_SESSION -import net.corda.schema.Schemas.Flow.FLOW_START import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC import net.corda.schema.configuration.BootConfig.CRYPTO_WORKER_REST_ENDPOINT import net.corda.schema.configuration.BootConfig.PERSISTENCE_WORKER_REST_ENDPOINT @@ -95,7 +94,7 @@ class FlowEventMediatorFactoryImpl @Activate constructor( .consumerFactories( mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( listOf( - FLOW_START, + // FLOW_START, FLOW_SESSION, FLOW_EVENT_TOPIC, ), From 89d4fcb7a78611cd7bf84e2ade7fb6245d10a0ca Mon Sep 17 00:00:00 2001 From: Miljenko Brkic Date: Mon, 6 Nov 2023 10:36:19 +0000 Subject: [PATCH 2/2] CORE-17768 No dedicated topics for flow start event --- .../net/corda/flow/rest/impl/v1/FlowRestResourceImpl.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/flow/flow-rest-resource-service-impl/src/main/kotlin/net/corda/flow/rest/impl/v1/FlowRestResourceImpl.kt b/components/flow/flow-rest-resource-service-impl/src/main/kotlin/net/corda/flow/rest/impl/v1/FlowRestResourceImpl.kt index deadd7a07b3..203a62b746a 100644 --- a/components/flow/flow-rest-resource-service-impl/src/main/kotlin/net/corda/flow/rest/impl/v1/FlowRestResourceImpl.kt +++ b/components/flow/flow-rest-resource-service-impl/src/main/kotlin/net/corda/flow/rest/impl/v1/FlowRestResourceImpl.kt @@ -40,7 +40,7 @@ import net.corda.rest.response.ResponseEntity import net.corda.rest.security.CURRENT_REST_CONTEXT import net.corda.rest.ws.DuplexChannel import net.corda.rest.ws.WebSocketValidationException -import net.corda.schema.Schemas.Flow.FLOW_MAPPER_START +import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_IN import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC import net.corda.tracing.TraceTag import net.corda.tracing.addTraceContextToRecord @@ -202,7 +202,7 @@ class FlowRestResourceImpl @Activate constructor( val status = messageFactory.createStartFlowStatus(clientRequestId, vNode, flowClassName) val records = listOf( - addTraceContextToRecord(Record(FLOW_MAPPER_START, status.key.toString(), startEvent)), + addTraceContextToRecord(Record(FLOW_MAPPER_SESSION_IN, status.key.toString(), startEvent)), Record(FLOW_STATUS_TOPIC, status.key, status), )