Skip to content

Commit

Permalink
Set default Kafka auto offset explicitly if it is not provided in con…
Browse files Browse the repository at this point in the history
…figuration (#1992) (#1993)
  • Loading branch information
obermeier authored Oct 5, 2023
1 parent e680d47 commit b51326d
Showing 1 changed file with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig;
import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.helpers.Alternatives;
import org.apache.streampipes.sdk.helpers.Label;
Expand Down Expand Up @@ -123,12 +124,20 @@ public static KafkaConfig getConfig(IStaticPropertyExtractor extractor, boolean
new KafkaSecurityUnauthenticatedPlainConfig();
}

StaticPropertyAlternatives alternatives = extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG,
StaticPropertyAlternatives.class);

// Set default value if no value is provided.
if (alternatives == null) {
AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(KafkaConnectUtils.LATEST);

String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG);
AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(auto);
return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig);
} else {
String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG);
AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(auto);

return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig);
return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig);
}
}

private static boolean isUseSSL(String authentication) {
Expand Down

0 comments on commit b51326d

Please sign in to comment.