diff --git a/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/TestFlowEventMediatorFactoryImpl.kt b/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/TestFlowEventMediatorFactoryImpl.kt index f38136ef2aa..583da14c500 100644 --- a/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/TestFlowEventMediatorFactoryImpl.kt +++ b/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/TestFlowEventMediatorFactoryImpl.kt @@ -62,10 +62,12 @@ class TestFlowEventMediatorFactoryImpl @Activate constructor( .messagingConfig(messagingConfig) .consumerFactories( mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - FLOW_START, CONSUMER_GROUP, messagingConfig - ), - mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - FLOW_SESSION, CONSUMER_GROUP, messagingConfig + listOf( + FLOW_START, + FLOW_SESSION, + ), + CONSUMER_GROUP, + messagingConfig ), ) .clientFactories( diff --git a/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt b/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt index bc9be7607e6..ee0226c2491 100644 --- a/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt +++ b/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt @@ -67,13 +67,13 @@ class FlowMapperEventMediatorFactoryImpl @Activate constructor( .messagingConfig(messagingConfig) .consumerFactories( mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - FLOW_MAPPER_START, CONSUMER_GROUP, messagingConfig - ), - mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - FLOW_MAPPER_SESSION_IN, CONSUMER_GROUP, messagingConfig - ), - mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - FLOW_MAPPER_SESSION_OUT, CONSUMER_GROUP, messagingConfig + listOf( + FLOW_MAPPER_START, + FLOW_MAPPER_SESSION_IN, + FLOW_MAPPER_SESSION_OUT, + ), + CONSUMER_GROUP, + messagingConfig ), ) .clientFactories( diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt index af60b311404..6ec63d06d83 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt @@ -94,13 +94,13 @@ class FlowEventMediatorFactoryImpl @Activate constructor( .messagingConfig(messagingConfig) .consumerFactories( mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - FLOW_START, CONSUMER_GROUP, messagingConfig - ), - mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - FLOW_SESSION, CONSUMER_GROUP, messagingConfig - ), - mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - FLOW_EVENT_TOPIC, CONSUMER_GROUP, messagingConfig + listOf( + FLOW_START, + FLOW_SESSION, + FLOW_EVENT_TOPIC, + ), + CONSUMER_GROUP, + messagingConfig ), ) .clientFactories( diff --git a/libs/messaging/db-message-bus-impl/src/main/kotlin/net/corda/messagebus/db/consumer/ConsumerGroup.kt b/libs/messaging/db-message-bus-impl/src/main/kotlin/net/corda/messagebus/db/consumer/ConsumerGroup.kt index 92fcde64c04..f6dbd37d2b7 100644 --- a/libs/messaging/db-message-bus-impl/src/main/kotlin/net/corda/messagebus/db/consumer/ConsumerGroup.kt +++ b/libs/messaging/db-message-bus-impl/src/main/kotlin/net/corda/messagebus/db/consumer/ConsumerGroup.kt @@ -82,7 +82,7 @@ class ConsumerGroup( return } - consumersToRepartition.forEach { getInternalPartitionListFor(it).clear() } + consumersToRepartition.forEach { getInternalPartitionListFor(it).removeIf { partition -> partition.topic == topic } } var consumerIterator = consumersToRepartition.iterator() topicPartitionsToUpdate.forEach { topicPartition -> diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusConsumer.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusConsumer.kt index 4176b4f5e85..577887391bb 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusConsumer.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusConsumer.kt @@ -10,10 +10,10 @@ import java.time.Duration * Message bus consumer that reads messages from configured topic. */ class MessageBusConsumer( - private val topic: String, + private val topics: List, private val consumer: CordaConsumer, ): MediatorConsumer { - override fun subscribe() = consumer.subscribe(topic) + override fun subscribe() = consumer.subscribe(topics) override fun poll(timeout: Duration): List> = consumer.poll(timeout) diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MediatorConsumerFactoryFactoryImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MediatorConsumerFactoryFactoryImpl.kt index 4b581bab371..82866e379d8 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MediatorConsumerFactoryFactoryImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MediatorConsumerFactoryFactoryImpl.kt @@ -16,11 +16,11 @@ class MediatorConsumerFactoryFactoryImpl @Activate constructor( private val cordaConsumerBuilder: CordaConsumerBuilder, ): MediatorConsumerFactoryFactory { override fun createMessageBusConsumerFactory( - topicName: String, + topicNames: List, groupName: String, messageBusConfig: SmartConfig ) = MessageBusConsumerFactory( - topicName, + topicNames, groupName, messageBusConfig, cordaConsumerBuilder, diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MessageBusConsumerFactory.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MessageBusConsumerFactory.kt index e5519e805b4..871801ed3a8 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MessageBusConsumerFactory.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MessageBusConsumerFactory.kt @@ -14,13 +14,13 @@ import java.util.UUID /** * Factory for creating multi-source event mediator message bus consumers. * - * @param topicName Topic name. + * @param topicNames Topic names. * @param groupName Consumer group name. * @param messageBusConfig Message bus related configuration. * @param cordaConsumerBuilder [CordaConsumer] builder. */ class MessageBusConsumerFactory( - private val topicName: String, + private val topicNames: List, private val groupName: String, private val messageBusConfig: SmartConfig, private val cordaConsumerBuilder: CordaConsumerBuilder, @@ -29,7 +29,7 @@ class MessageBusConsumerFactory( override fun create(config: MediatorConsumerConfig): MediatorConsumer { val subscriptionType = "MultiSourceSubscription" val uniqueId = UUID.randomUUID().toString() - val clientId = "$subscriptionType--$groupName--$topicName--$uniqueId" + val clientId = "$subscriptionType--$groupName--$uniqueId" val eventConsumerConfig = ConsumerConfig( groupName, @@ -46,7 +46,7 @@ class MessageBusConsumerFactory( ) return MessageBusConsumer( - topicName, + topicNames, eventConsumer, ) } diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusConsumerTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusConsumerTest.kt index 20f4c5e45b0..5d25c75f986 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusConsumerTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusConsumerTest.kt @@ -26,14 +26,14 @@ class MessageBusConsumerTest { @BeforeEach fun setup() { cordaConsumer = mock() - mediatorConsumer = MessageBusConsumer(TOPIC, cordaConsumer) + mediatorConsumer = MessageBusConsumer(listOf(TOPIC), cordaConsumer) } @Test fun testSubscribe() { mediatorConsumer.subscribe() - verify(cordaConsumer).subscribe(eq(TOPIC), anyOrNull()) + verify(cordaConsumer).subscribe(eq(listOf(TOPIC)), anyOrNull()) } @Test diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MediatorConsumerFactoryFactoryTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MediatorConsumerFactoryFactoryTest.kt index 8a856f3d11a..792207f08fc 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MediatorConsumerFactoryFactoryTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MediatorConsumerFactoryFactoryTest.kt @@ -22,7 +22,7 @@ class MediatorConsumerFactoryFactoryTest { @Test fun testCreateMessageBusConsumerFactory() { val messageBusConsumerFactory = mediatorConsumerFactoryFactory.createMessageBusConsumerFactory( - "topic", + listOf("topic"), "consumerGroup", messageBusConfig, ) diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MessageBusConsumerFactoryTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MessageBusConsumerFactoryTest.kt index 350293a6bf2..0c15660fc7d 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MessageBusConsumerFactoryTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MessageBusConsumerFactoryTest.kt @@ -24,7 +24,7 @@ class MessageBusConsumerFactoryTest { any(), any(), any>(), any>(), any(), anyOrNull() ) messageBusConsumerFactory = MessageBusConsumerFactory( - "topic", + listOf("topic"), "group", messageBusConfig, cordaConsumerBuilder, diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/factory/MediatorConsumerFactoryFactory.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/factory/MediatorConsumerFactoryFactory.kt index 724b6efbde9..c8cedf56949 100644 --- a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/factory/MediatorConsumerFactoryFactory.kt +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/factory/MediatorConsumerFactoryFactory.kt @@ -9,12 +9,12 @@ interface MediatorConsumerFactoryFactory { /** * Creates a message bus consumer factory. * - * @param topicName Topic name. + * @param topicNames Topic names. * @param groupName Consumer group name. * @param messageBusConfig Message bus related configuration. */ fun createMessageBusConsumerFactory( - topicName: String, + topicNames: List, groupName: String, messageBusConfig: SmartConfig, ) : MediatorConsumerFactory