Skip to content

Commit

Permalink
MODINV-986: remove a namespace from a topic name
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev committed Jun 6, 2024
1 parent 8383758 commit 52908a7
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ public class InstanceIngressConsumerVerticle extends KafkaConsumerVerticle {
public void start(Promise<Void> startPromise) {
var instanceIngressEventHandler = new InstanceIngressEventConsumer(vertx, getStorage(), getHttpClient(), getMappingMetadataCache());

//q remove namespace from a topic?
var consumerWrapper = createConsumer(INSTANCE_INGRESS_TOPIC);
var consumerWrapper = createConsumer(INSTANCE_INGRESS_TOPIC, false);

consumerWrapper.start(instanceIngressEventHandler, constructModuleName())
.onFailure(startPromise::fail)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static java.lang.Integer.parseInt;
import static java.lang.Long.*;
import static java.lang.String.format;
import static java.lang.String.join;
import static java.lang.System.getProperty;
import static java.util.Objects.isNull;
import static org.apache.commons.lang.StringUtils.isBlank;
Expand All @@ -12,7 +13,6 @@
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;
import static org.folio.kafka.KafkaTopicNameHelper.createSubscriptionDefinition;
import static org.folio.kafka.KafkaTopicNameHelper.getDefaultNameSpace;

import io.vertx.core.AbstractVerticle;
Expand All @@ -29,6 +29,7 @@
import org.folio.kafka.GlobalLoadSensor;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.KafkaTopicNameHelper;
import org.folio.kafka.SubscriptionDefinition;
import org.folio.okapi.common.GenericCompositeFuture;

Expand Down Expand Up @@ -61,13 +62,17 @@ public void stop(Promise<Void> stopPromise) {
protected abstract Logger getLogger();

protected KafkaConsumerWrapper<String, String> createConsumer(String eventType) {
return createConsumer(eventType, true);
}

protected KafkaConsumerWrapper<String, String> createConsumer(String eventType, boolean namespacedTopic) {
var kafkaConsumerWrapper = KafkaConsumerWrapper.<String, String>builder()
.context(context)
.vertx(vertx)
.kafkaConfig(getKafkaConfig())
.loadLimit(getLoadLimit())
.globalLoadSensor(new GlobalLoadSensor())
.subscriptionDefinition(getSubscriptionDefinition(getKafkaConfig().getEnvId(), eventType))
.subscriptionDefinition(getSubscriptionDefinition(getKafkaConfig().getEnvId(), eventType, namespacedTopic))
.build();
consumerWrappers.add(kafkaConsumerWrapper);
return kafkaConsumerWrapper;
Expand Down Expand Up @@ -137,8 +142,21 @@ private JsonObject getConfig() {
return config;
}

private SubscriptionDefinition getSubscriptionDefinition(String envId, String eventType) {
return createSubscriptionDefinition(envId, getDefaultNameSpace(), eventType);
private SubscriptionDefinition getSubscriptionDefinition(String envId, String eventType, boolean namespacedTopic) {
return namespacedTopic
? KafkaTopicNameHelper.createSubscriptionDefinition(envId, getDefaultNameSpace(), eventType)
: createSubscriptionDefinition(envId, eventType);
}

private SubscriptionDefinition createSubscriptionDefinition(String env, String eventType) {
return SubscriptionDefinition.builder()
.eventType(eventType)
.subscriptionPattern(formatSubscriptionPattern(env, eventType))
.build();
}

private String formatSubscriptionPattern(String env, String eventType) {
return join("\\.", env, "\\w{1,}", eventType);
}

private int getLoadLimit() {
Expand Down

0 comments on commit 52908a7

Please sign in to comment.