Skip to content

Commit

Permalink
feat: Add additional properties to Kafka connectors, improve logging (#…
Browse files Browse the repository at this point in the history
…3378)

* feat: Add additional properties to Kafka connectors, improve logging

* Fix checkstyle

* Fix test
  • Loading branch information
dominikriemer authored Dec 18, 2024
1 parent 727d146 commit 88396aa
Show file tree
Hide file tree
Showing 15 changed files with 236 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
import org.apache.streampipes.extensions.connectors.kafka.migration.KafkaAdapterMigrationV1;
import org.apache.streampipes.extensions.connectors.kafka.migration.KafkaAdapterMigrationV2;
import org.apache.streampipes.extensions.connectors.kafka.migration.KafkaSinkMigrationV1;
import org.apache.streampipes.extensions.connectors.kafka.migration.KafkaSinkMigrationV2;
import org.apache.streampipes.extensions.connectors.kafka.sink.KafkaPublishSink;

import java.util.List;
Expand All @@ -48,7 +50,9 @@ public List<IStreamPipesPipelineElement<?>> pipelineElements() {
public List<IModelMigrator<?, ?>> migrators() {
return List.of(
new KafkaAdapterMigrationV1(),
new KafkaSinkMigrationV1()
new KafkaSinkMigrationV1(),
new KafkaAdapterMigrationV2(),
new KafkaSinkMigrationV2()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;

import org.apache.kafka.clients.consumer.Consumer;
Expand All @@ -61,6 +62,7 @@
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -71,8 +73,8 @@

public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig {

private static final Logger logger = LoggerFactory.getLogger(KafkaProtocol.class);
KafkaAdapterConfig config;
private static final Logger LOG = LoggerFactory.getLogger(KafkaProtocol.class);
private KafkaAdapterConfig config;

public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.kafka";

Expand Down Expand Up @@ -131,9 +133,9 @@ public StaticProperty resolveConfiguration(String staticPropertyInternalName,
config.setOptions(topics.stream().map(Option::new).collect(Collectors.toList()));

return config;
} catch (KafkaException e) {
} catch (Exception e) {
var message = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
throw new SpConfigurationException(message, e);
throw new SpConfigurationException(message, e.getCause());
}
}

Expand All @@ -144,7 +146,7 @@ public IAdapterConfiguration declareConfig() {
latestAlternative.setSelected(true);

return AdapterConfigurationBuilder
.create(ID, 1, KafkaProtocol::new)
.create(ID, 2, KafkaProtocol::new)
.withSupportedParsers(Parsers.defaultParsers())
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
Expand Down Expand Up @@ -172,6 +174,10 @@ public IAdapterConfiguration declareConfig() {
KafkaConfigProvider.getAlternativesEarliest(),
latestAlternative,
KafkaConfigProvider.getAlternativesNone())
.requiredCodeblock(
Labels.withId(KafkaConfigProvider.ADDITIONAL_PROPERTIES),
"# key=value, comments are ignored"
)
.buildConfiguration();
}

Expand Down Expand Up @@ -201,16 +207,16 @@ public void onAdapterStopped(IAdapterParameterExtractor extractor,
try {
kafkaConsumer.disconnect();
} catch (SpRuntimeException e) {
e.printStackTrace();
LOG.warn("Runtime exception when disconnecting from Kafka", e);
}

try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
LOG.warn("Interrupted exception when stopping thread", e);
}

logger.info("Kafka Adapter was sucessfully stopped");
LOG.info("Kafka Adapter was sucessfully stopped");
thread.interrupt();
}

Expand Down Expand Up @@ -241,7 +247,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> collection) {

while (true) {
final ConsumerRecords<byte[], byte[]> consumerRecords =
consumer.poll(1000);
consumer.poll(Duration.ofMillis(1000));

consumerRecords.forEach(record -> nEventsByte.add(record.value()));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.connectors.kafka.migration;

import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
import org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.migration.MigrationResult;
import org.apache.streampipes.model.migration.ModelMigratorConfig;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.helpers.CodeLanguage;
import org.apache.streampipes.sdk.helpers.Labels;

public class KafkaAdapterMigrationV2 implements IAdapterMigrator {
@Override
public ModelMigratorConfig config() {
return new ModelMigratorConfig(
KafkaProtocol.ID,
SpServiceTagPrefix.ADAPTER,
1,
2
);
}

@Override
public MigrationResult<AdapterDescription> migrate(AdapterDescription element,
IStaticPropertyExtractor extractor) throws RuntimeException {
element.getConfig().add(
StaticProperties
.codeStaticProperty(Labels.withId(KafkaConfigProvider.ADDITIONAL_PROPERTIES),
CodeLanguage.None,
"# key=value, comments are ignored")
);
return MigrationResult.success(element);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.connectors.kafka.migration;

import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
import org.apache.streampipes.extensions.connectors.kafka.sink.KafkaPublishSink;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.migration.MigrationResult;
import org.apache.streampipes.model.migration.ModelMigratorConfig;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.helpers.CodeLanguage;
import org.apache.streampipes.sdk.helpers.Labels;

public class KafkaSinkMigrationV2 implements IDataSinkMigrator {

@Override
public ModelMigratorConfig config() {
return new ModelMigratorConfig(
KafkaPublishSink.ID,
SpServiceTagPrefix.DATA_SINK,
1,
2
);
}

@Override
public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation element,
IDataSinkParameterExtractor extractor) throws RuntimeException {
element.getStaticProperties().add(
StaticProperties
.codeStaticProperty(Labels.withId(KafkaConfigProvider.ADDITIONAL_PROPERTIES),
CodeLanguage.None,
"# key=value, comments are ignored")
);
return MigrationResult.success(element);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig;
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.apache.streampipes.messaging.kafka.config.SimpleConfigAppender;
import org.apache.streampipes.messaging.kafka.security.KafkaSecurityProtocolConfigAppender;
import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslConfigAppender;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;

import org.apache.kafka.common.security.auth.SecurityProtocol;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.ACCESS_MODE;
import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.ADDITIONAL_PROPERTIES;
import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.CONSUMER_GROUP;
import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.GROUP_ID_INPUT;
Expand Down Expand Up @@ -101,6 +107,9 @@ private <T extends KafkaBaseConfig> T extractCommonConfigs(IParameterExtractor e

configAppenders.add(new KafkaSecuritySaslConfigAppender(mechanism, username, password));
}
configAppenders.add(new SimpleConfigAppender(
parseAdditionalProperties(extractor.codeblockValue(ADDITIONAL_PROPERTIES)))
);
config.setConfigAppenders(configAppenders);

return config;
Expand All @@ -118,4 +127,22 @@ public SecurityProtocol getSecurityProtocol(String selectedSecurityConfiguration
default -> SecurityProtocol.PLAINTEXT;
};
}

public static Map<String, String> parseAdditionalProperties(String text) {
if (text == null || text.isEmpty()) {
return Map.of();
} else {
return Arrays.stream(text.split("\\R"))
.map(String::trim)
.filter(line -> !line.isEmpty() && !line.startsWith("#"))
.filter(line -> line.contains("="))
.map(line -> line.split("=", 2))
.collect(Collectors.toMap(
parts -> parts[0].trim(),
parts -> parts[1].trim(),
(existing, replacement) -> replacement,
LinkedHashMap::new
));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class KafkaConfigProvider {
public static final String RANDOM_GROUP_ID = "random-group-id";
public static final String GROUP_ID = "group-id";
public static final String GROUP_ID_INPUT = "group-id-input";
public static final String ADDITIONAL_PROPERTIES = "additional-properties";


private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.CodeLanguage;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
Expand All @@ -58,7 +59,7 @@ public KafkaPublishSink() {
public IDataSinkConfiguration declareConfig() {
return DataSinkConfiguration.create(
KafkaPublishSink::new,
DataSinkBuilder.create(ID, 1)
DataSinkBuilder.create(ID, 2)
.category(DataSinkType.MESSAGING)
.withLocales(Locales.EN)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
Expand All @@ -76,6 +77,11 @@ public IDataSinkConfiguration declareConfig() {
KafkaConfigProvider.getAlternativeUnauthenticatedSSL(),
KafkaConfigProvider.getAlternativesSaslPlain(),
KafkaConfigProvider.getAlternativesSaslSSL())
.requiredCodeblock(Labels.withId(
KafkaConfigProvider.ADDITIONAL_PROPERTIES),
CodeLanguage.None,
"# key=value, comments are ignored"
)
.build()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,6 @@ latest.description=Offsets are initialized to the Latest

none.title=None
none.description=Consumer throws exceptions

additional-properties.title=Additional configurations
additional-properties.description=Additional Kafka consumer configurations in the form key=value
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ The Kafka broker URL indicates the URL of the broker (e.g., localhost), the port
The topic where events should be sent to.


## Output
### Additional configurations

(not applicable for data sinks)
Can be used to provide additional Kafka producer configurations. Input must be in form of key-value pairs, e.g.
buffer.memory=33554432
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ security-mechanism.title=Security Mechanism
security-mechanism.description=SASL mechanism used for authentication. Corresponds to Kafka Client sasl.mechanism property

username-group.title=Username and password

additional-properties.title=Additional configurations
additional-properties.description=Additional Kafka producer configurations in the form key=value
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public IAdapterConfiguration prepareAdapter() throws AdapterException {
// Set format to Json
((StaticPropertyAlternatives) (desc)
.getConfig()
.get(7))
.get(8))
.getAlternatives()
.get(0)
.setSelected(true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.messaging.kafka.config;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;

import java.util.Map;
import java.util.Properties;

public class SimpleConfigAppender implements KafkaConfigAppender {

private final Map<String, String> configs;

public SimpleConfigAppender(Map<String, String> configs) {
this.configs = configs;
}

@Override
public void appendConfig(Properties props) throws SpRuntimeException {
configs.forEach(props::setProperty);
}
}
Loading

0 comments on commit 88396aa

Please sign in to comment.