diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java index da01403d..40102a50 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java @@ -19,11 +19,11 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cdc.AbstractCdcEventsApplier; @@ -152,13 +152,13 @@ public AbstractKafkaToIgniteCdcStreamer(Properties kafkaProps, KafkaToIgniteCdcS protected void runAppliers() { AtomicBoolean stopped = new AtomicBoolean(); - Set caches = null; + Set caches = new HashSet<>(); if (!F.isEmpty(streamerCfg.getCaches())) { checkCaches(streamerCfg.getCaches()); - caches = streamerCfg.getCaches().stream() - .map(CU::cacheId).collect(Collectors.toSet()); + streamerCfg.getCaches().stream() + .map(CU::cacheId).forEach(caches::add); } KafkaToIgniteMetadataUpdater metaUpdr = new KafkaToIgniteMetadataUpdater(