diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java index 1f3dafa51d..4cb96fc744 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java @@ -30,10 +30,10 @@ import io.cloudevents.jackson.JsonFormat; import io.cloudevents.kafka.CloudEventSerializer; import io.vertx.core.Future; -import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; @@ -76,10 +76,10 @@ public IngressProducerReconcilableStore( this.producerConfigs = producerConfigs; this.producerFactory = producerFactory; - this.ingressInfos = new HashMap<>(); - this.producerReferences = new HashMap<>(); - this.pathMapper = new HashMap<>(); - this.hostMapper = new HashMap<>(); + this.ingressInfos = new ConcurrentHashMap<>(); + this.producerReferences = new ConcurrentHashMap<>(); + this.pathMapper = new ConcurrentHashMap<>(); + this.hostMapper = new ConcurrentHashMap<>(); } public IngressProducer resolve(String host, String path) {