diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java index 6ad3f61db3..8112b8fe14 100644 --- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java +++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java @@ -26,6 +26,7 @@ import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig; import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig; import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative; +import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives; import org.apache.streampipes.sdk.StaticProperties; import org.apache.streampipes.sdk.helpers.Alternatives; import org.apache.streampipes.sdk.helpers.Label; @@ -123,12 +124,20 @@ public static KafkaConfig getConfig(IStaticPropertyExtractor extractor, boolean new KafkaSecurityUnauthenticatedPlainConfig(); } + StaticPropertyAlternatives alternatives = extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG, + StaticPropertyAlternatives.class); + // Set default value if no value is provided. + if (alternatives == null) { + AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(KafkaConnectUtils.LATEST); - String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG); - AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(auto); + return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig); + } else { + String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG); + AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(auto); - return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig); + return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig); + } } private static boolean isUseSSL(String authentication) {