diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java index 6ad3f61db3..1e34ab1b05 100644 --- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java +++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java @@ -123,10 +123,17 @@ public static KafkaConfig getConfig(IStaticPropertyExtractor extractor, boolean new KafkaSecurityUnauthenticatedPlainConfig(); } + // Set default value if no value is provided. + if (alternatives == null) { + AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(KafkaConnectUtils.LATEST); + return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig); + } else { + String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG); + AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(auto); - String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG); - AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(auto); + return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig); + } return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig); }