Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*DO NOT MERGE* Topology changes - One consumer for multiple topics #2 #5043

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import net.corda.flow.mapper.impl.executor.SessionEventExecutor
import net.corda.flow.mapper.impl.executor.SessionInitProcessor
import net.corda.flow.mapper.impl.executor.StartFlowExecutor
import net.corda.libs.configuration.SmartConfig
import net.corda.schema.Schemas.Flow.FLOW_START
import net.corda.schema.Schemas
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
import org.osgi.service.component.annotations.Reference
Expand Down Expand Up @@ -66,7 +66,8 @@ class FlowMapperEventExecutorFactoryImpl @Activate constructor(
}
}

is StartFlow -> StartFlowExecutor(eventKey, FLOW_START, flowMapperEventPayload, state)
is StartFlow -> StartFlowExecutor(eventKey,
Schemas.Flow.FLOW_MAPPER_SESSION_IN, flowMapperEventPayload, state)
is ExecuteCleanup -> ExecuteCleanupEventExecutor(eventKey)
is ScheduleCleanup -> ScheduleCleanupEventExecutor(eventKey, flowMapperEventPayload, state)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package net.corda.session.mapper.messaging.mediator

import net.corda.data.flow.event.FlowEvent
import net.corda.data.flow.event.StartFlow
import net.corda.data.flow.event.mapper.FlowMapperEvent
import net.corda.data.flow.state.mapper.FlowMapperState
import net.corda.data.p2p.app.AppMessage
Expand All @@ -18,9 +17,7 @@ import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory
import net.corda.messaging.api.processor.StateAndEventProcessor
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_IN
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_START
import net.corda.schema.Schemas.Flow.FLOW_SESSION
import net.corda.schema.Schemas.Flow.FLOW_START
import net.corda.schema.Schemas.P2P.P2P_OUT_TOPIC
import net.corda.schema.configuration.FlowConfig
import net.corda.session.mapper.service.executor.FlowMapperMessageProcessor
Expand Down Expand Up @@ -68,7 +65,7 @@ class FlowMapperEventMediatorFactoryImpl @Activate constructor(
.consumerFactories(
mediatorConsumerFactoryFactory.createMessageBusConsumerFactory(
listOf(
FLOW_MAPPER_START,
// FLOW_MAPPER_START,
FLOW_MAPPER_SESSION_IN,
FLOW_MAPPER_SESSION_OUT,
),
Expand All @@ -94,13 +91,7 @@ class FlowMapperEventMediatorFactoryImpl @Activate constructor(
MessageRouter { message ->
when (val event = message.payload) {
is AppMessage -> routeTo(messageBusClient, P2P_OUT_TOPIC)
is FlowEvent -> {
if (event.payload is StartFlow) {
routeTo(messageBusClient, FLOW_START)
} else {
routeTo(messageBusClient, FLOW_SESSION)
}
}
is FlowEvent -> routeTo(messageBusClient, FLOW_SESSION)
is FlowMapperEvent -> routeTo(messageBusClient, FLOW_MAPPER_SESSION_IN)
else -> {
val eventType = event?.let { it::class.java }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import net.corda.rest.response.ResponseEntity
import net.corda.rest.security.CURRENT_REST_CONTEXT
import net.corda.rest.ws.DuplexChannel
import net.corda.rest.ws.WebSocketValidationException
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_START
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_IN
import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC
import net.corda.tracing.TraceTag
import net.corda.tracing.addTraceContextToRecord
Expand Down Expand Up @@ -202,7 +202,7 @@ class FlowRestResourceImpl @Activate constructor(
val status = messageFactory.createStartFlowStatus(clientRequestId, vNode, flowClassName)

val records = listOf(
addTraceContextToRecord(Record(FLOW_MAPPER_START, status.key.toString(), startEvent)),
addTraceContextToRecord(Record(FLOW_MAPPER_SESSION_IN, status.key.toString(), startEvent)),
Record(FLOW_STATUS_TOPIC, status.key, status),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import net.corda.messaging.api.processor.StateAndEventProcessor
import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT
import net.corda.schema.Schemas.Flow.FLOW_SESSION
import net.corda.schema.Schemas.Flow.FLOW_START
import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC
import net.corda.schema.configuration.BootConfig.CRYPTO_WORKER_REST_ENDPOINT
import net.corda.schema.configuration.BootConfig.PERSISTENCE_WORKER_REST_ENDPOINT
Expand Down Expand Up @@ -95,7 +94,7 @@ class FlowEventMediatorFactoryImpl @Activate constructor(
.consumerFactories(
mediatorConsumerFactoryFactory.createMessageBusConsumerFactory(
listOf(
FLOW_START,
// FLOW_START,
FLOW_SESSION,
FLOW_EVENT_TOPIC,
),
Expand Down