From fa25df8d08e70c89c4968aac228d8ce21f1051f3 Mon Sep 17 00:00:00 2001 From: Stefan Obermeier Date: Tue, 3 Oct 2023 17:53:02 +0200 Subject: [PATCH] Set default Kafka auto offset explicitly if it is not provided in configuration (#1992) Set default Kafka auto offset explicitly if it is not defined (#1992) --- .../shared/config/kafka/kafka/KafkaConnectUtils.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java index 6ad3f61db3..1e34ab1b05 100644 --- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java +++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java @@ -123,10 +123,17 @@ public static KafkaConfig getConfig(IStaticPropertyExtractor extractor, boolean new KafkaSecurityUnauthenticatedPlainConfig(); } + // Set default value if no value is provided. + if (alternatives == null) { + AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(KafkaConnectUtils.LATEST); + return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig); + } else { + String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG); + AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(auto); - String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG); - AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(auto); + return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig); + } return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig); }