Skip to content

Commit

Permalink
refactor(#2938): replace ExtensionsModelSubmitter with StreamPipesExt…
Browse files Browse the repository at this point in the history
…ensionsServiceBase (#2939)

* refactor(#2938): Replace ExtensionsModelSubmitter with StreamPipesExtensionsServiceBase

* refactor(#2938): Replace ExtensionsModelSubmitter in archetypes

* refactor(#2938): Remove deprecated class ExtensionsModelSubmitter

* refactor(#2938): Fix checkstyle
  • Loading branch information
tenthe authored Jun 18, 2024
1 parent 1a5073c commit 974abf1
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.messaging.nats.SpNatsProtocolFactory;
import org.apache.streampipes.messaging.pulsar.SpPulsarProtocolFactory;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
import org.apache.streampipes.service.extensions.StreamPipesExtensionsServiceBase;
import org.apache.streampipes.wrapper.standalone.runtime.StandaloneStreamPipesRuntimeProvider;

import ${package}.pe.${packageName}.${classNamePrefix}DataProcessor;
import ${package}.pe.${packageName}.${classNamePrefix}DataSink;
import ${package}.pe.${packageName}.${classNamePrefix}GenericAdapter;
import ${package}.pe.${packageName}.${classNamePrefix}SpecificAdapter;

public class Init extends ExtensionsModelSubmitter {
public class Init extends StreamPipesExtensionsServiceBase {

public static void main(String[] args) {
new Init().init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.messaging.pulsar.SpPulsarProtocolFactory;
import org.apache.streampipes.messaging.nats.SpNatsProtocolFactory;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
import org.apache.streampipes.service.extensions.StreamPipesExtensionsServiceBase;

public class Init extends ExtensionsModelSubmitter {
public class Init extends StreamPipesExtensionsServiceBase {

public static void main(String[] args) throws Exception {
new Init().init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.messaging.pulsar.SpPulsarProtocolFactory;
import org.apache.streampipes.messaging.nats.SpNatsProtocolFactory;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
import org.apache.streampipes.service.extensions.StreamPipesExtensionsServiceBase;

import ${package}.config.ConfigKeys;
import ${package}.pe.sink.${packageName}.${classNamePrefix}Controller;

public class Init extends ExtensionsModelSubmitter {
public class Init extends StreamPipesExtensionsServiceBase {

public static void main(String[] args) throws Exception {
new Init().init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@
import org.apache.streampipes.processors.filters.jvm.FilterExtensionModuleExport;
import org.apache.streampipes.processors.siddhi.SiddhiFilterExtensionModuleExport;
import org.apache.streampipes.processors.transformation.jvm.TransformationExtensionModuleExport;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
import org.apache.streampipes.service.extensions.StreamPipesExtensionsServiceBase;
import org.apache.streampipes.sinks.brokers.jvm.BrokerSinksExtensionModuleExport;
import org.apache.streampipes.sinks.databases.jvm.DatabaseSinksExtensionModuleExport;
import org.apache.streampipes.sinks.internal.jvm.InternalSinksExtensionModuleExports;
import org.apache.streampipes.sinks.notifications.jvm.NotificationsExtensionModuleExport;
import org.apache.streampipes.wrapper.standalone.runtime.StandaloneStreamPipesRuntimeProvider;

public class AllExtensionsIIoTInit extends ExtensionsModelSubmitter {
public class AllExtensionsIIoTInit extends StreamPipesExtensionsServiceBase {

public static void main(String[] args) {
new AllExtensionsIIoTInit().init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@
import org.apache.streampipes.processors.siddhi.SiddhiFilterExtensionModuleExport;
import org.apache.streampipes.processors.textmining.jvm.TextMiningExtensionModuleExport;
import org.apache.streampipes.processors.transformation.jvm.TransformationExtensionModuleExport;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
import org.apache.streampipes.service.extensions.StreamPipesExtensionsServiceBase;
import org.apache.streampipes.sinks.brokers.jvm.BrokerSinksExtensionModuleExport;
import org.apache.streampipes.sinks.databases.jvm.DatabaseSinksExtensionModuleExport;
import org.apache.streampipes.sinks.internal.jvm.InternalSinksExtensionModuleExports;
import org.apache.streampipes.sinks.notifications.jvm.NotificationsExtensionModuleExport;
import org.apache.streampipes.wrapper.standalone.runtime.StandaloneStreamPipesRuntimeProvider;


public class AllExtensionsInit extends ExtensionsModelSubmitter {
public class AllExtensionsInit extends StreamPipesExtensionsServiceBase {

public static void main(String[] args) {
new AllExtensionsInit().init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@
import org.apache.streampipes.processors.filters.jvm.FilterExtensionModuleExport;
import org.apache.streampipes.processors.siddhi.SiddhiFilterExtensionModuleExport;
import org.apache.streampipes.processors.transformation.jvm.TransformationExtensionModuleExport;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
import org.apache.streampipes.service.extensions.StreamPipesExtensionsServiceBase;
import org.apache.streampipes.sinks.brokers.jvm.BrokerSinksExtensionModuleExport;
import org.apache.streampipes.sinks.databases.jvm.DatabaseSinksExtensionModuleExport;
import org.apache.streampipes.sinks.internal.jvm.InternalSinksExtensionModuleExports;
import org.apache.streampipes.sinks.notifications.jvm.NotificationsExtensionModuleExport;
import org.apache.streampipes.wrapper.standalone.runtime.StandaloneStreamPipesRuntimeProvider;

public class ExtensionsIIoTMinimalInit extends ExtensionsModelSubmitter {
public class ExtensionsIIoTMinimalInit extends StreamPipesExtensionsServiceBase {

public static void main(String[] args) {
new ExtensionsIIoTMinimalInit().init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@
import org.apache.streampipes.pe.flink.processor.urldereferencing.UrlDereferencingController;
import org.apache.streampipes.pe.flink.processor.wordcount.WordCountController;
import org.apache.streampipes.pe.flink.sink.elasticsearch.ElasticSearchController;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
import org.apache.streampipes.service.extensions.StreamPipesExtensionsServiceBase;
import org.apache.streampipes.wrapper.flink.FlinkRuntimeProvider;


public class AllFlinkPipelineElementsInit extends ExtensionsModelSubmitter {
public class AllFlinkPipelineElementsInit extends StreamPipesExtensionsServiceBase {

public static void main(String[] args) {
new AllFlinkPipelineElementsInit().init();
Expand Down
2 changes: 1 addition & 1 deletion streampipes-maven-plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ The streampipes-maven-plugin can either be started from the command line or embe

### Prerequisites

The plugin must be started from a module which contains an `Init` class which inherits `ExtensionsModelSubmitter`.
The plugin must be started from a module which contains an `Init` class which inherits `StreamPipesExtensionsServiceBase`.
By default, the goal runs in the `package` phase.

### Command line
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import org.apache.streampipes.extensions.api.pe.config.IPipelineElementConfiguration;
import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
import org.apache.streampipes.service.extensions.StreamPipesExtensionsServiceBase;
import org.apache.streampipes.smp.constants.PeType;
import org.apache.streampipes.smp.model.AssetModel;

Expand All @@ -39,17 +39,23 @@ public class ExtensionsFinder {
private final ClassLoader loader;
private final String initClass;

public ExtensionsFinder(ClassLoader loader,
String initClass) {
public ExtensionsFinder(
ClassLoader loader,
String initClass
) {
this.loader = loader;
this.initClass = initClass;
}

public List<AssetModel> findExtensions()
throws MalformedURLException, DependencyResolutionRequiredException, ClassNotFoundException,
InstantiationException, IllegalAccessException {
InstantiationException, IllegalAccessException {
var extensions = new ArrayList<AssetModel>();
var serviceDef = ((ExtensionsModelSubmitter) loader.loadClass(initClass).newInstance()).provideServiceDefinition();
var serviceDef = (
(StreamPipesExtensionsServiceBase) loader
.loadClass(initClass)
.newInstance()
).provideServiceDefinition();

extensions.addAll(findAdapters(serviceDef));
extensions.addAll(findPipelineElements(serviceDef, IDataProcessorConfiguration.class, PeType.PROCESSOR));
Expand All @@ -58,20 +64,28 @@ public List<AssetModel> findExtensions()
return extensions;
}

private List<AssetModel> findPipelineElements(SpServiceDefinition serviceDef,
Class<? extends IPipelineElementConfiguration<?, ?>> configType,
PeType peType) {
private List<AssetModel> findPipelineElements(
SpServiceDefinition serviceDef,
Class<? extends IPipelineElementConfiguration<?, ?>> configType,
PeType peType
) {
return serviceDef.getDeclarers()
.stream()
.map(IStreamPipesPipelineElement::declareConfig)
.filter(configType::isInstance)
.map(config -> new AssetModel(config.getDescription().getAppId(), peType)).toList();
.stream()
.map(IStreamPipesPipelineElement::declareConfig)
.filter(configType::isInstance)
.map(config -> new AssetModel(config.getDescription()
.getAppId(), peType))
.toList();
}

private Collection<? extends AssetModel> findAdapters(SpServiceDefinition serviceDef) {
return serviceDef.getAdapters().stream().map(adapter -> {
var config = adapter.declareConfig();
return new AssetModel(config.getAdapterDescription().getAppId(), PeType.ADAPTER);
}).toList();
return serviceDef.getAdapters()
.stream()
.map(adapter -> {
var config = adapter.declareConfig();
return new AssetModel(config.getAdapterDescription()
.getAppId(), PeType.ADAPTER);
})
.toList();
}
}

This file was deleted.

0 comments on commit 974abf1

Please sign in to comment.