From b51326d1fcaea9f1f93eb0ab6110ff60f627929b Mon Sep 17 00:00:00 2001 From: Stefan Obermeier Date: Thu, 5 Oct 2023 18:47:53 +0200 Subject: [PATCH] Set default Kafka auto offset explicitly if it is not provided in configuration (#1992) (#1993) --- .../config/kafka/kafka/KafkaConnectUtils.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java index 6ad3f61db3..8112b8fe14 100644 --- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java +++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java @@ -26,6 +26,7 @@ import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig; import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig; import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative; +import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives; import org.apache.streampipes.sdk.StaticProperties; import org.apache.streampipes.sdk.helpers.Alternatives; import org.apache.streampipes.sdk.helpers.Label; @@ -123,12 +124,20 @@ public static KafkaConfig getConfig(IStaticPropertyExtractor extractor, boolean new KafkaSecurityUnauthenticatedPlainConfig(); } + StaticPropertyAlternatives alternatives = extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG, + StaticPropertyAlternatives.class); + // Set default value if no value is provided. + if (alternatives == null) { + AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(KafkaConnectUtils.LATEST); - String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG); - AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(auto); + return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig); + } else { + String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG); + AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(auto); - return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig); + return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig); + } } private static boolean isUseSSL(String authentication) {