From 52908a719f197d04ca34928200fd2951c459f2f2 Mon Sep 17 00:00:00 2001 From: pbobylev Date: Tue, 4 Jun 2024 18:59:58 +0500 Subject: [PATCH] MODINV-986: remove a namespace from a topic name --- .../InstanceIngressConsumerVerticle.java | 3 +-- .../support/KafkaConsumerVerticle.java | 26 ++++++++++++++++--- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java b/src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java index f234ead38..fb1835463 100644 --- a/src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java +++ b/src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java @@ -17,8 +17,7 @@ public class InstanceIngressConsumerVerticle extends KafkaConsumerVerticle { public void start(Promise startPromise) { var instanceIngressEventHandler = new InstanceIngressEventConsumer(vertx, getStorage(), getHttpClient(), getMappingMetadataCache()); - //q remove namespace from a topic? - var consumerWrapper = createConsumer(INSTANCE_INGRESS_TOPIC); + var consumerWrapper = createConsumer(INSTANCE_INGRESS_TOPIC, false); consumerWrapper.start(instanceIngressEventHandler, constructModuleName()) .onFailure(startPromise::fail) diff --git a/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java b/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java index 700b3a44c..04857b402 100644 --- a/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java +++ b/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java @@ -3,6 +3,7 @@ import static java.lang.Integer.parseInt; import static java.lang.Long.*; import static java.lang.String.format; +import static java.lang.String.join; import static java.lang.System.getProperty; import static java.util.Objects.isNull; import static org.apache.commons.lang.StringUtils.isBlank; @@ -12,7 +13,6 @@ import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT; import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR; import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL; -import static org.folio.kafka.KafkaTopicNameHelper.createSubscriptionDefinition; import static org.folio.kafka.KafkaTopicNameHelper.getDefaultNameSpace; import io.vertx.core.AbstractVerticle; @@ -29,6 +29,7 @@ import org.folio.kafka.GlobalLoadSensor; import org.folio.kafka.KafkaConfig; import org.folio.kafka.KafkaConsumerWrapper; +import org.folio.kafka.KafkaTopicNameHelper; import org.folio.kafka.SubscriptionDefinition; import org.folio.okapi.common.GenericCompositeFuture; @@ -61,13 +62,17 @@ public void stop(Promise stopPromise) { protected abstract Logger getLogger(); protected KafkaConsumerWrapper createConsumer(String eventType) { + return createConsumer(eventType, true); + } + + protected KafkaConsumerWrapper createConsumer(String eventType, boolean namespacedTopic) { var kafkaConsumerWrapper = KafkaConsumerWrapper.builder() .context(context) .vertx(vertx) .kafkaConfig(getKafkaConfig()) .loadLimit(getLoadLimit()) .globalLoadSensor(new GlobalLoadSensor()) - .subscriptionDefinition(getSubscriptionDefinition(getKafkaConfig().getEnvId(), eventType)) + .subscriptionDefinition(getSubscriptionDefinition(getKafkaConfig().getEnvId(), eventType, namespacedTopic)) .build(); consumerWrappers.add(kafkaConsumerWrapper); return kafkaConsumerWrapper; @@ -137,8 +142,21 @@ private JsonObject getConfig() { return config; } - private SubscriptionDefinition getSubscriptionDefinition(String envId, String eventType) { - return createSubscriptionDefinition(envId, getDefaultNameSpace(), eventType); + private SubscriptionDefinition getSubscriptionDefinition(String envId, String eventType, boolean namespacedTopic) { + return namespacedTopic + ? KafkaTopicNameHelper.createSubscriptionDefinition(envId, getDefaultNameSpace(), eventType) + : createSubscriptionDefinition(envId, eventType); + } + + private SubscriptionDefinition createSubscriptionDefinition(String env, String eventType) { + return SubscriptionDefinition.builder() + .eventType(eventType) + .subscriptionPattern(formatSubscriptionPattern(env, eventType)) + .build(); + } + + private String formatSubscriptionPattern(String env, String eventType) { + return join("\\.", env, "\\w{1,}", eventType); } private int getLoadLimit() {