Skip to content

Commit

Permalink
refactor: include version in all data sink descriptions as required f…
Browse files Browse the repository at this point in the history
…or migrations (#2260)
  • Loading branch information
bossenti authored Dec 1, 2023
1 parent 709d6e3 commit 06a37b3
Show file tree
Hide file tree
Showing 26 changed files with 121 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public static String prepareString(String s) {

@Override
public DataSinkDescription declareModel() {
var builder = DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.influxdb")
var builder = DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.influxdb", 0)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.DATABASE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public KafkaPublishSink() {
public IDataSinkConfiguration declareConfig() {
return DataSinkConfiguration.create(
KafkaPublishSink::new,
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.kafka")
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.kafka", 0)
.category(DataSinkType.MESSAGING)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class MqttPublisherSink implements IStreamPipesDataSink {
public IDataSinkConfiguration declareConfig() {
return DataSinkConfiguration.create(
MqttPublisherSink::new,
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.mqtt")
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.mqtt", 0)
.category(DataSinkType.MESSAGING)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class NatsSink implements IStreamPipesDataSink {
public IDataSinkConfiguration declareConfig() {
return DataSinkConfiguration.create(
NatsSink::new,
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.nats")
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.nats", 0)
.category(DataSinkType.MESSAGING)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class OpcUaSink implements IStreamPipesDataSink, SupportsRuntimeConfig {

@Override
public IDataSinkConfiguration declareConfig() {
var builder = DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.opcua")
var builder = DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.opcua", 0)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.FORWARD)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public PulsarPublisherSink(ClientBuilder pulsarClientBuilder) {
public IDataSinkConfiguration declareConfig() {
return DataSinkConfiguration.create(
PulsarPublisherSink::new,
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.pulsar")
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.pulsar", 0)
.category(DataSinkType.MESSAGING)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public RocketMQPublisherSink(ClientServiceProvider provider) {
public IDataSinkConfiguration declareConfig() {
return DataSinkConfiguration.create(
RocketMQPublisherSink::new,
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rocketmq")
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rocketmq", 0)
.category(DataSinkType.MESSAGING)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ public class TubeMQPublisherSink implements IStreamPipesDataSink {
public IDataSinkConfiguration declareConfig() {
return DataSinkConfiguration.create(
TubeMQPublisherSink::new,
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.tubemq").category(DataSinkType.MESSAGING)
.withLocales(Locales.EN).withAssets(Assets.DOCUMENTATION, Assets.ICON)
.requiredStream(StreamRequirementsBuilder.create().requiredProperty(EpRequirements.anyProperty()).build())
.requiredTextParameter(Labels.withId(MASTER_HOST_AND_PORT_KEY)).requiredTextParameter(Labels.withId(TOPIC_KEY))
.build()
DataSinkBuilder
.create("org.apache.streampipes.sinks.brokers.jvm.tubemq", 0)
.category(DataSinkType.MESSAGING)
.withLocales(Locales.EN).withAssets(Assets.DOCUMENTATION, Assets.ICON)
.requiredStream(StreamRequirementsBuilder.create().requiredProperty(EpRequirements.anyProperty()).build())
.requiredTextParameter(Labels.withId(MASTER_HOST_AND_PORT_KEY)).requiredTextParameter(Labels.withId(TOPIC_KEY))
.build()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public class BufferRestPublisherSink extends StreamPipesDataSink implements Buff

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.bufferrest")
return DataSinkBuilder
.create("org.apache.streampipes.sinks.brokers.jvm.bufferrest", 0)
.category(DataSinkType.NOTIFICATION)
.withLocales(Locales.EN)
.requiredStream(StreamRequirementsBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class JmsPublisherSink extends StreamPipesDataSink {

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.jms")
return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.jms", 0)
.category(DataSinkType.MESSAGING)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class RabbitMqPublisherSink extends StreamPipesDataSink {

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rabbitmq")
return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rabbitmq", 0)
.category(DataSinkType.MESSAGING)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class RestSink extends StreamPipesDataSink {

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rest")
return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rest", 0)
.category(DataSinkType.FORWARD)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class WebsocketServerSink extends StreamPipesDataSink {

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.websocket")
return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.websocket", 0)
.category(DataSinkType.MESSAGING)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class CouchDbSink extends StreamPipesDataSink {

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.couchdb")
return DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.couchdb", 0)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.DATABASE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public class DittoSink extends StreamPipesDataSink {

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.databases.ditto")
return DataSinkBuilder
.create("org.apache.streampipes.sinks.databases.ditto", 0)
.category(DataSinkType.FORWARD)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,23 @@ public class IotDbSink extends StreamPipesDataSink {

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.iotdb").withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON).category(DataSinkType.DATABASE).requiredStream(
return DataSinkBuilder
.create("org.apache.streampipes.sinks.databases.jvm.iotdb", 0)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON).
category(DataSinkType.DATABASE)
.requiredStream(
StreamRequirementsBuilder.create()
.requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(), Labels.withId(TIMESTAMP_MAPPING_KEY),
PropertyScope.NONE).build()).requiredTextParameter(Labels.withId(HOST_KEY))
.requiredIntegerParameter(Labels.withId(PORT_KEY), 6667).requiredTextParameter(Labels.withId(USER_KEY), "root")
.requiredSecret(Labels.withId(PASSWORD_KEY)).requiredTextParameter(Labels.withId(DATABASE_KEY))
.requiredTextParameter(Labels.withId(DEVICE_KEY)).build();
PropertyScope.NONE).build()
)
.requiredTextParameter(Labels.withId(HOST_KEY))
.requiredIntegerParameter(Labels.withId(PORT_KEY), 6667)
.requiredTextParameter(Labels.withId(USER_KEY), "root")
.requiredSecret(Labels.withId(PASSWORD_KEY))
.requiredTextParameter(Labels.withId(DATABASE_KEY))
.requiredTextParameter(Labels.withId(DEVICE_KEY))
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public class PostgreSqlSink extends StreamPipesDataSink {

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.postgresql")
return DataSinkBuilder
.create("org.apache.streampipes.sinks.databases.jvm.postgresql", 0)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.DATABASE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public class RedisSink extends StreamPipesDataSink {

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.redis")
return DataSinkBuilder
.create("org.apache.streampipes.sinks.databases.jvm.redis", 0)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.DATABASE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public class DataLakeSink extends StreamPipesDataSink {

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.internal.jvm.datalake")
return DataSinkBuilder
.create("org.apache.streampipes.sinks.internal.jvm.datalake", 0)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.INTERNAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ private boolean shouldSendNotification() {

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.internal.jvm.notification")
return DataSinkBuilder
.create("org.apache.streampipes.sinks.internal.jvm.notification", 0)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.INTERNAL, DataSinkType.NOTIFICATION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public class EmailSink extends StreamPipesDataSink {

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.notifications.jvm.email")
return DataSinkBuilder
.create("org.apache.streampipes.sinks.notifications.jvm.email", 0)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.NOTIFICATION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,37 +87,38 @@ public void onEvent(Event event) {

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.notifications.jvm.msteams")
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.NOTIFICATION)
.requiredStream(
StreamRequirementsBuilder
.create()
.requiredProperty(EpRequirements.anyProperty())
.build()
)
.requiredSecret(Labels.withId(KEY_WEBHOOK_URL))
.requiredAlternatives(
Labels.withId(KEY_MESSAGE_TYPE_ALTERNATIVES),
Alternatives.from(
Labels.withId(KEY_MESSAGE_SIMPLE),
StaticProperties.stringFreeTextProperty(
Labels.withId(KEY_MESSAGE_SIMPLE_CONTENT),
true,
true
),
true),
Alternatives.from(
Labels.withId(KEY_MESSAGE_ADVANCED),
StaticProperties.stringFreeTextProperty(
Labels.withId(KEY_MESSAGE_ADVANCED_CONTENT),
true,
true
)
)
)
.build();
return DataSinkBuilder
.create("org.apache.streampipes.sinks.notifications.jvm.msteams", 0)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.NOTIFICATION)
.requiredStream(
StreamRequirementsBuilder
.create()
.requiredProperty(EpRequirements.anyProperty())
.build()
)
.requiredSecret(Labels.withId(KEY_WEBHOOK_URL))
.requiredAlternatives(
Labels.withId(KEY_MESSAGE_TYPE_ALTERNATIVES),
Alternatives.from(
Labels.withId(KEY_MESSAGE_SIMPLE),
StaticProperties.stringFreeTextProperty(
Labels.withId(KEY_MESSAGE_SIMPLE_CONTENT),
true,
true
),
true),
Alternatives.from(
Labels.withId(KEY_MESSAGE_ADVANCED),
StaticProperties.stringFreeTextProperty(
Labels.withId(KEY_MESSAGE_ADVANCED_CONTENT),
true,
true
)
)
)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public class OneSignalSink extends StreamPipesDataSink {

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.notifications.jvm.onesignal")
return DataSinkBuilder
.create("org.apache.streampipes.sinks.notifications.jvm.onesignal", 0)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.NOTIFICATION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public class SlackNotificationSink extends StreamPipesDataSink {
@Override
public DataSinkDescription declareModel() {

return DataSinkBuilder.create("org.apache.streampipes.sinks.notifications.jvm.slack")
return DataSinkBuilder
.create("org.apache.streampipes.sinks.notifications.jvm.slack", 0)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.NOTIFICATION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public class TelegramSink extends StreamPipesDataSink {

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.notifications.jvm.telegram")
return DataSinkBuilder
.create("org.apache.streampipes.sinks.notifications.jvm.telegram", 0)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.NOTIFICATION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,63 @@

public class DataSinkBuilder extends AbstractProcessingElementBuilder<DataSinkBuilder, DataSinkDescription> {

protected DataSinkBuilder(String id, String label, String description) {
protected DataSinkBuilder(
String id,
String label,
String description,
int version
) {
super(id, label, description, new DataSinkDescription());
this.elementDescription.setVersion(version);
}

protected DataSinkBuilder(String id) {
protected DataSinkBuilder(
String id,
int version
) {
super(id, new DataSinkDescription());
this.elementDescription.setVersion(version);
}

/**
* Creates a new data sink using the builder pattern.
*
* @deprecated
* This method is no longer recommend since we rely on a version for migration purposes.
* <p> Please adopt {@link #create(String, int)} instead.
* @param id A unique identifier of the new element, e.g., com.mycompany.sink.mynewdatasink
* @param label A human-readable name of the element.
* Will later be shown as the element name in the StreamPipes UI.
* @param description A human-readable description of the element.
*/
@Deprecated(since = "0.93.0", forRemoval = true)
public static DataSinkBuilder create(String id, String label, String description) {
return new DataSinkBuilder(id, label, description);
return new DataSinkBuilder(id, label, description, 0);
}

/**
* Creates a new data sink using the builder pattern. If no label and description is given
* for an element,
* Creates a new data sink using the builder pattern.
* {@link org.apache.streampipes.sdk.builder.AbstractProcessingElementBuilder#withLocales(Locales...)}
* must be called.
* must be called, to provide a label and description.
* @deprecated
* This method is no longer recommend since we rely on a version for migration purposes.
* <p> Please adopt {@link #create(String, int)} instead.
*
* @param id A unique identifier of the new element, e.g., com.mycompany.sink.mynewdatasink
*/
@Deprecated(since = "0.93.0", forRemoval = true)
public static DataSinkBuilder create(String id) {
return new DataSinkBuilder(id);
return new DataSinkBuilder(id, 0);
}

/**
* Creates a new data sink using the builder pattern.
* {@link org.apache.streampipes.sdk.builder.AbstractProcessingElementBuilder#withLocales(Locales...)}
* must be called, to provide a label and description.
*
* @param id A unique identifier of the new element, e.g., com.mycompany.sink.mynewdatasink
*/
public static DataSinkBuilder create(String id, int version) {
return new DataSinkBuilder(id, version);
}

public DataSinkBuilder category(DataSinkType... categories) {
Expand Down

0 comments on commit 06a37b3

Please sign in to comment.