From cfb5f0ed7cd800f229b39bd7fd3d99a8ef0eaa65 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Tue, 17 Dec 2024 20:43:45 +0100 Subject: [PATCH] feat: Add additional properties to Kafka connectors, improve logging --- .../kafka/KafkaConnectorsModuleExport.java | 6 +- .../kafka/adapter/KafkaProtocol.java | 24 +++++--- .../migration/KafkaAdapterMigrationV2.java | 55 ++++++++++++++++++ .../kafka/migration/KafkaSinkMigrationV2.java | 56 +++++++++++++++++++ .../shared/kafka/KafkaConfigExtractor.java | 23 ++++++++ .../shared/kafka/KafkaConfigProvider.java | 1 + .../kafka/sink/KafkaPublishSink.java | 8 ++- .../strings.en | 3 + .../documentation.md | 5 +- .../strings.en | 3 + .../kafka/config/SimpleConfigAppender.java | 38 +++++++++++++ .../KafkaSecurityProtocolConfigAppender.java | 30 +++++----- .../PipelineElementTemplateVisitor.java | 1 + .../connect/RuntimeResolvableResource.java | 6 ++ 14 files changed, 232 insertions(+), 27 deletions(-) create mode 100644 streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV2.java create mode 100644 streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV2.java create mode 100644 streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/SimpleConfigAppender.java diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java index c79c075576..d434a9ecea 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java @@ -24,7 +24,9 @@ import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement; import org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol; import org.apache.streampipes.extensions.connectors.kafka.migration.KafkaAdapterMigrationV1; +import org.apache.streampipes.extensions.connectors.kafka.migration.KafkaAdapterMigrationV2; import org.apache.streampipes.extensions.connectors.kafka.migration.KafkaSinkMigrationV1; +import org.apache.streampipes.extensions.connectors.kafka.migration.KafkaSinkMigrationV2; import org.apache.streampipes.extensions.connectors.kafka.sink.KafkaPublishSink; import java.util.List; @@ -48,7 +50,9 @@ public List> pipelineElements() { public List> migrators() { return List.of( new KafkaAdapterMigrationV1(), - new KafkaSinkMigrationV1() + new KafkaSinkMigrationV1(), + new KafkaAdapterMigrationV2(), + new KafkaSinkMigrationV2() ); } } diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java index 490df5a089..780d454e60 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java @@ -47,6 +47,7 @@ import org.apache.streampipes.model.staticproperty.StaticProperty; import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative; import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder; +import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.helpers.Locales; import org.apache.kafka.clients.consumer.Consumer; @@ -61,6 +62,7 @@ import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -71,8 +73,8 @@ public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig { - private static final Logger logger = LoggerFactory.getLogger(KafkaProtocol.class); - KafkaAdapterConfig config; + private static final Logger LOG = LoggerFactory.getLogger(KafkaProtocol.class); + private KafkaAdapterConfig config; public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.kafka"; @@ -131,9 +133,9 @@ public StaticProperty resolveConfiguration(String staticPropertyInternalName, config.setOptions(topics.stream().map(Option::new).collect(Collectors.toList())); return config; - } catch (KafkaException e) { + } catch (Exception e) { var message = e.getCause() != null ? e.getCause().getMessage() : e.getMessage(); - throw new SpConfigurationException(message, e); + throw new SpConfigurationException(message, e.getCause()); } } @@ -144,7 +146,7 @@ public IAdapterConfiguration declareConfig() { latestAlternative.setSelected(true); return AdapterConfigurationBuilder - .create(ID, 1, KafkaProtocol::new) + .create(ID, 2, KafkaProtocol::new) .withSupportedParsers(Parsers.defaultParsers()) .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) .withLocales(Locales.EN) @@ -172,6 +174,10 @@ public IAdapterConfiguration declareConfig() { KafkaConfigProvider.getAlternativesEarliest(), latestAlternative, KafkaConfigProvider.getAlternativesNone()) + .requiredCodeblock( + Labels.withId(KafkaConfigProvider.ADDITIONAL_PROPERTIES), + "# key=value, comments are ignored" + ) .buildConfiguration(); } @@ -201,16 +207,16 @@ public void onAdapterStopped(IAdapterParameterExtractor extractor, try { kafkaConsumer.disconnect(); } catch (SpRuntimeException e) { - e.printStackTrace(); + LOG.warn("Runtime exception when disconnecting from Kafka", e); } try { Thread.sleep(5000); } catch (InterruptedException e) { - e.printStackTrace(); + LOG.warn("Interrupted exception when stopping thread", e); } - logger.info("Kafka Adapter was sucessfully stopped"); + LOG.info("Kafka Adapter was sucessfully stopped"); thread.interrupt(); } @@ -241,7 +247,7 @@ public void onPartitionsAssigned(Collection collection) { while (true) { final ConsumerRecords consumerRecords = - consumer.poll(1000); + consumer.poll(Duration.ofMillis(1000)); consumerRecords.forEach(record -> nEventsByte.add(record.value())); diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV2.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV2.java new file mode 100644 index 0000000000..77ca6669c0 --- /dev/null +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV2.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.connectors.kafka.migration; + +import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; +import org.apache.streampipes.extensions.api.migration.IAdapterMigrator; +import org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol; +import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; +import org.apache.streampipes.model.migration.MigrationResult; +import org.apache.streampipes.model.migration.ModelMigratorConfig; +import org.apache.streampipes.sdk.StaticProperties; +import org.apache.streampipes.sdk.helpers.CodeLanguage; +import org.apache.streampipes.sdk.helpers.Labels; + +public class KafkaAdapterMigrationV2 implements IAdapterMigrator { + @Override + public ModelMigratorConfig config() { + return new ModelMigratorConfig( + KafkaProtocol.ID, + SpServiceTagPrefix.ADAPTER, + 1, + 2 + ); + } + + @Override + public MigrationResult migrate(AdapterDescription element, + IStaticPropertyExtractor extractor) throws RuntimeException { + element.getConfig().add( + StaticProperties + .codeStaticProperty(Labels.withId(KafkaConfigProvider.ADDITIONAL_PROPERTIES), + CodeLanguage.None, + "# key=value, comments are ignored") + ); + return MigrationResult.success(element); + } +} diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV2.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV2.java new file mode 100644 index 0000000000..1323f659f6 --- /dev/null +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV2.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.connectors.kafka.migration; + +import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor; +import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator; +import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider; +import org.apache.streampipes.extensions.connectors.kafka.sink.KafkaPublishSink; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; +import org.apache.streampipes.model.graph.DataSinkInvocation; +import org.apache.streampipes.model.migration.MigrationResult; +import org.apache.streampipes.model.migration.ModelMigratorConfig; +import org.apache.streampipes.sdk.StaticProperties; +import org.apache.streampipes.sdk.helpers.CodeLanguage; +import org.apache.streampipes.sdk.helpers.Labels; + +public class KafkaSinkMigrationV2 implements IDataSinkMigrator { + + @Override + public ModelMigratorConfig config() { + return new ModelMigratorConfig( + KafkaPublishSink.ID, + SpServiceTagPrefix.DATA_SINK, + 1, + 2 + ); + } + + @Override + public MigrationResult migrate(DataSinkInvocation element, + IDataSinkParameterExtractor extractor) throws RuntimeException { + element.getStaticProperties().add( + StaticProperties + .codeStaticProperty(Labels.withId(KafkaConfigProvider.ADDITIONAL_PROPERTIES), + CodeLanguage.None, + "# key=value, comments are ignored") + ); + return MigrationResult.success(element); + } +} diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java index d06e544e74..399f42a8c9 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java @@ -23,6 +23,7 @@ import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig; import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender; +import org.apache.streampipes.messaging.kafka.config.SimpleConfigAppender; import org.apache.streampipes.messaging.kafka.security.KafkaSecurityProtocolConfigAppender; import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslConfigAppender; import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives; @@ -30,8 +31,13 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Collectors; import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.ACCESS_MODE; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.ADDITIONAL_PROPERTIES; import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.AUTO_OFFSET_RESET_CONFIG; import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.CONSUMER_GROUP; import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.GROUP_ID_INPUT; @@ -101,6 +107,9 @@ private T extractCommonConfigs(IParameterExtractor e configAppenders.add(new KafkaSecuritySaslConfigAppender(mechanism, username, password)); } + configAppenders.add(new SimpleConfigAppender( + parseAdditionalProperties(extractor.codeblockValue(ADDITIONAL_PROPERTIES))) + ); config.setConfigAppenders(configAppenders); return config; @@ -118,4 +127,18 @@ public SecurityProtocol getSecurityProtocol(String selectedSecurityConfiguration default -> SecurityProtocol.PLAINTEXT; }; } + + public static Map parseAdditionalProperties(String text) { + return Arrays.stream(text.split("\\R")) + .map(String::trim) + .filter(line -> !line.isEmpty() && !line.startsWith("#")) + .filter(line -> line.contains("=")) + .map(line -> line.split("=", 2)) + .collect(Collectors.toMap( + parts -> parts[0].trim(), + parts -> parts[1].trim(), + (existing, replacement) -> replacement, + LinkedHashMap::new + )); + } } diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java index 05f652f780..6981696b9a 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java @@ -51,6 +51,7 @@ public class KafkaConfigProvider { public static final String RANDOM_GROUP_ID = "random-group-id"; public static final String GROUP_ID = "group-id"; public static final String GROUP_ID_INPUT = "group-id-input"; + public static final String ADDITIONAL_PROPERTIES = "additional-properties"; private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics"; diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java index 61460413b3..9ac428a3bf 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java @@ -33,6 +33,7 @@ import org.apache.streampipes.sdk.builder.DataSinkBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration; +import org.apache.streampipes.sdk.helpers.CodeLanguage; import org.apache.streampipes.sdk.helpers.EpRequirements; import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.helpers.Locales; @@ -58,7 +59,7 @@ public KafkaPublishSink() { public IDataSinkConfiguration declareConfig() { return DataSinkConfiguration.create( KafkaPublishSink::new, - DataSinkBuilder.create(ID, 1) + DataSinkBuilder.create(ID, 2) .category(DataSinkType.MESSAGING) .withLocales(Locales.EN) .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) @@ -76,6 +77,11 @@ public IDataSinkConfiguration declareConfig() { KafkaConfigProvider.getAlternativeUnauthenticatedSSL(), KafkaConfigProvider.getAlternativesSaslPlain(), KafkaConfigProvider.getAlternativesSaslSSL()) + .requiredCodeblock(Labels.withId( + KafkaConfigProvider.ADDITIONAL_PROPERTIES), + CodeLanguage.None, + "# key=value, comments are ignored" + ) .build() ); } diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en index 4809237f5b..9e2d47b5be 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en @@ -86,3 +86,6 @@ latest.description=Offsets are initialized to the Latest none.title=None none.description=Consumer throws exceptions + +additional-properties.title=Additional configurations +additional-properties.description=Additional Kafka consumer configurations in the form key=value diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/documentation.md b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/documentation.md index aec97f992b..14b2b66e25 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/documentation.md +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/documentation.md @@ -50,6 +50,7 @@ The Kafka broker URL indicates the URL of the broker (e.g., localhost), the port The topic where events should be sent to. -## Output +### Additional configurations -(not applicable for data sinks) \ No newline at end of file +Can be used to provide additional Kafka producer configurations. Input must be in form of key-value pairs, e.g. +buffer.memory=33554432 diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en index c4cef7cf4e..3d70886857 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en @@ -55,3 +55,6 @@ security-mechanism.title=Security Mechanism security-mechanism.description=SASL mechanism used for authentication. Corresponds to Kafka Client sasl.mechanism property username-group.title=Username and password + +additional-properties.title=Additional configurations +additional-properties.description=Additional Kafka producer configurations in the form key=value diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/SimpleConfigAppender.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/SimpleConfigAppender.java new file mode 100644 index 0000000000..3b4dadadac --- /dev/null +++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/SimpleConfigAppender.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.messaging.kafka.config; + +import org.apache.streampipes.commons.exceptions.SpRuntimeException; + +import java.util.Map; +import java.util.Properties; + +public class SimpleConfigAppender implements KafkaConfigAppender { + + private final Map configs; + + public SimpleConfigAppender(Map configs) { + this.configs = configs; + } + + @Override + public void appendConfig(Properties props) throws SpRuntimeException { + configs.forEach(props::setProperty); + } +} diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java index 1a1a9bd92a..968b056b2f 100644 --- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java +++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java @@ -19,6 +19,7 @@ package org.apache.streampipes.messaging.kafka.security; import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.variable.EnvironmentVariable; import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender; import org.apache.kafka.clients.CommonClientConfigs; @@ -43,20 +44,13 @@ public void appendConfig(Properties props) { props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.toString()); if (isSslProtocol()) { - props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, env.getKeystoreType().getValueOrDefault()); - props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, env.getKeystoreFilename().getValueOrDefault()); - props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, env.getKeystorePassword().getValueOrDefault()); - - if (env.getKeyPassword().exists()) { - props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, env.getKeyPassword().getValueOrDefault()); - } - - props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, env.getTruststoreType().getValueOrDefault()); - props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, env.getTruststoreFilename().getValueOrDefault()); - - if (env.getTruststorePassword().exists()) { - props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, env.getTruststorePassword().getValueOrDefault()); - } + addConfigIfPresent(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, env.getKeystoreType()); + addConfigIfPresent(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, env.getKeystoreFilename()); + addConfigIfPresent(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, env.getKeystorePassword()); + addConfigIfPresent(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, env.getKeyPassword()); + addConfigIfPresent(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, env.getTruststoreType()); + addConfigIfPresent(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, env.getTruststoreFilename()); + addConfigIfPresent(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, env.getTruststorePassword()); if (env.getAllowSelfSignedCertificates().getValueOrDefault()) { props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); @@ -67,4 +61,12 @@ public void appendConfig(Properties props) { private boolean isSslProtocol() { return securityProtocol == SecurityProtocol.SSL || securityProtocol == SecurityProtocol.SASL_SSL; } + + private void addConfigIfPresent(Properties props, + String configKey, + EnvironmentVariable environmentVariable) { + if (environmentVariable.exists()) { + props.put(configKey, environmentVariable.getValueOrDefault()); + } + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java index 7e91b9ce3e..f2121d267b 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java @@ -172,6 +172,7 @@ public void visit(StaticPropertyAlternatives staticPropertyAlternatives) { if (hasConfig(staticPropertyAlternatives)) { Map values = getConfig(staticPropertyAlternatives); var selectedId = getConfigValueAsString(staticPropertyAlternatives); + staticPropertyAlternatives.getAlternatives().forEach(a -> a.setSelected(false)); staticPropertyAlternatives .getAlternatives() .stream() diff --git a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/RuntimeResolvableResource.java b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/RuntimeResolvableResource.java index 0412560c99..057e36dffd 100644 --- a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/RuntimeResolvableResource.java +++ b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/RuntimeResolvableResource.java @@ -28,6 +28,9 @@ import org.apache.streampipes.model.runtime.RuntimeOptionsResponse; import org.apache.streampipes.rest.shared.impl.AbstractSharedRestInterface; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; @@ -41,6 +44,8 @@ @RequestMapping("/api/v1/worker/resolvable") public class RuntimeResolvableResource extends AbstractSharedRestInterface { + private static final Logger LOG = LoggerFactory.getLogger(RuntimeResolvableResource.class); + @PostMapping( path = "{id}/configurations", consumes = MediaType.APPLICATION_JSON_VALUE, @@ -64,6 +69,7 @@ public ResponseEntity fetchConfigurations(@PathVariable("id") String elementI "This element does not support dynamic options - is the pipeline element description up to date?"); } } catch (SpConfigurationException e) { + LOG.warn("Error when fetching runtime configurations: {}", e.getMessage(), e); return ResponseEntity .status(HttpStatus.BAD_REQUEST) .body(e);