Skip to content

Commit

Permalink
chore(#3165): Remove support for configurable data formats (#3166)
Browse files Browse the repository at this point in the history
* chore: Remove obsolete fields from models

* Fix tests, add header

* Refactor python model

* Migrate model

* chore(#3165): Remove support for configurable data formats

* Merge typescript model
  • Loading branch information
dominikriemer authored Aug 27, 2024
1 parent 9a8ef70 commit 787c6fb
Show file tree
Hide file tree
Showing 90 changed files with 114 additions and 1,614 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@
#set( $symbol_escape = '\' )
package ${package};

import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
Expand Down Expand Up @@ -56,11 +52,6 @@ public SpServiceDefinition provideServiceDefinition() {
.registerPipelineElement(new ${classNamePrefix}DataSink())
.registerAdapter(new ${classNamePrefix}GenericAdapter())
.registerAdapter(new ${classNamePrefix}SpecificAdapter())
.registerMessagingFormats(
new JsonDataFormatFactory(),
new CborDataFormatFactory(),
new SmileDataFormatFactory(),
new FstDataFormatFactory())
.registerMessagingProtocols(
new SpKafkaProtocolFactory(),
new SpJmsProtocolFactory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@

import ${package}.pe.processor.${packageName}.${classNamePrefix}Controller;

import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
Expand All @@ -51,11 +47,6 @@ public SpServiceDefinition provideServiceDefinition() {
"",
8090)
.registerPipelineElement(new ${classNamePrefix}Controller())
.registerMessagingFormats(
new JsonDataFormatFactory(),
new CborDataFormatFactory(),
new SmileDataFormatFactory(),
new FstDataFormatFactory())
.registerMessagingProtocols(
new SpKafkaProtocolFactory(),
new SpJmsProtocolFactory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@
#set( $symbol_escape = '\' )
package ${package};

import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
Expand All @@ -50,11 +46,6 @@ public SpServiceDefinition provideServiceDefinition() {
"",
8090)
.registerPipelineElement(new ${classNamePrefix}Controller())
.registerMessagingFormats(
new JsonDataFormatFactory(),
new CborDataFormatFactory(),
new SmileDataFormatFactory(),
new FstDataFormatFactory())
.registerMessagingProtocols(
new SpKafkaProtocolFactory(),
new SpJmsProtocolFactory(),
Expand Down
20 changes: 0 additions & 20 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
<file-management.version>3.1.0</file-management.version>
<flink.version>1.13.5</flink.version>
<fogsy-qudt.version>1.0</fogsy-qudt.version>
<fst.version>2.57</fst.version>
<geojson-jackson.version>1.14</geojson-jackson.version>
<google-maps-services.version>2.2.0</google-maps-services.version>
<graalvm.js.version>23.0.0</graalvm.js.version>
Expand Down Expand Up @@ -187,16 +186,6 @@
<artifactId>jackson-databind</artifactId>
<version>${jackson.databind.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
Expand Down Expand Up @@ -242,11 +231,6 @@
<artifactId>geojson-jackson</artifactId>
<version>${geojson-jackson.version}</version>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>${fst.version}</version>
</dependency>
<dependency>
<groupId>io.fogsy</groupId>
<artifactId>qudt</artifactId>
Expand Down Expand Up @@ -827,10 +811,6 @@
<module>streampipes-data-explorer-management</module>
<module>streampipes-data-export</module>
<module>streampipes-dataformat</module>
<module>streampipes-dataformat-cbor</module>
<module>streampipes-dataformat-fst</module>
<module>streampipes-dataformat-json</module>
<module>streampipes-dataformat-smile</module>
<module>streampipes-extensions</module>
<module>streampipes-integration-tests</module>
<module>streampipes-mail</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.io.Serializable;

public interface IStreamPipesClient extends Serializable {

@Deprecated(since = "0.97.0", forRemoval = true)
void registerDataFormat(SpDataFormatFactory spDataFormatFactory);

void registerProtocol(SpProtocolDefinitionFactory<?> spProtocolDefinitionFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@

package org.apache.streampipes.client.api.config;

import org.apache.streampipes.dataformat.SpDataFormatFactory;
import org.apache.streampipes.messaging.SpProtocolDefinitionFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

public interface IStreamPipesClientConfig {
ObjectMapper getSerializer();

void addDataFormat(SpDataFormatFactory spDataFormatFactory);

void addTransportProtocol(SpProtocolDefinitionFactory<?> protocolDefinitionFactory);

ClientConnectionUrlResolver getConnectionConfig();
Expand Down
12 changes: 1 addition & 11 deletions streampipes-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,7 @@
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-dataformat-json</artifactId>
<version>0.97.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-dataformat-cbor</artifactId>
<version>0.97.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-dataformat-fst</artifactId>
<artifactId>streampipes-dataformat</artifactId>
<version>0.97.0-SNAPSHOT</version>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@
import org.apache.streampipes.client.model.StreamPipesClientConnectionConfig;
import org.apache.streampipes.client.paths.ApiPath;
import org.apache.streampipes.dataformat.SpDataFormatFactory;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
import org.apache.streampipes.messaging.SpProtocolDefinitionFactory;
import org.apache.streampipes.model.mail.SpEmail;

Expand All @@ -52,9 +49,6 @@ public class StreamPipesClient implements

private StreamPipesClient(ClientConnectionUrlResolver connectionConfig) {
this.config = new StreamPipesClientConfig(connectionConfig);
this.registerDataFormat(new JsonDataFormatFactory());
this.registerDataFormat(new FstDataFormatFactory());
this.registerDataFormat(new CborDataFormatFactory());
}

private StreamPipesClient(String streamPipesHost,
Expand Down Expand Up @@ -128,11 +122,12 @@ public static StreamPipesClient create(String streamPipesHost,
/**
* Register a new data format that is used by the live API
*
* @deprecated
* @param spDataFormatFactory The data format factory
*/
@Deprecated(forRemoval = true, since = "0.97.0")
@Override
public void registerDataFormat(SpDataFormatFactory spDataFormatFactory) {
this.config.addDataFormat(spDataFormatFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.streampipes.client.live;

import org.apache.streampipes.client.api.live.IConfiguredEventProducer;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
import org.apache.streampipes.dataformat.SpDataFormatManager;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.messaging.SpProtocolManager;
Expand All @@ -39,7 +38,7 @@ public IConfiguredEventProducer makeProducer() {

return new ConfiguredEventProducer(
producer,
findFormatDefinition()
SpDataFormatManager.getFormatDefinition()
);
}

Expand All @@ -51,12 +50,4 @@ private EventProducer findProducer() {
.orElseThrow()
.getProducer(protocol);
}

private SpDataFormatDefinition findFormatDefinition() {
var format = grounding.getTransportFormats().get(0);
return SpDataFormatManager
.INSTANCE
.findDefinition(format)
.orElseThrow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,40 +57,31 @@ public SubscriptionManager(IBrokerConfigOverride brokerConfigOverride,
}

public ISubscription subscribe() {
var formatDefinitionOpt = SpDataFormatManager
.INSTANCE
.findDefinition(this.grounding.getTransportFormats().get(0));

try {
SpProtocolDefinition<TransportProtocol> protocolDefinition = findProtocol(getTransportProtocol());
final SpDataFormatDefinition converter = SpDataFormatManager.getFormatDefinition();

if (formatDefinitionOpt.isPresent()) {
final SpDataFormatDefinition converter = formatDefinitionOpt.get();

var protocol = getTransportProtocol();
if (overrideSettings) {
if (protocol instanceof KafkaTransportProtocol) {
brokerConfigOverride.overrideKafkaHostname((KafkaTransportProtocol) protocol);
}
brokerConfigOverride.overrideHostname(protocol);
brokerConfigOverride.overridePort(protocol);
var protocol = getTransportProtocol();
if (overrideSettings) {
if (protocol instanceof KafkaTransportProtocol) {
brokerConfigOverride.overrideKafkaHostname((KafkaTransportProtocol) protocol);
}
brokerConfigOverride.overrideHostname(protocol);
brokerConfigOverride.overridePort(protocol);
}

EventConsumer consumer = protocolDefinition.getConsumer(protocol);
consumer.connect(event -> {
try {
Event spEvent = EventFactory.fromMap(converter.toMap(event));
callback.onEvent(spEvent);
} catch (SpRuntimeException e) {
e.printStackTrace();
}
});
EventConsumer consumer = protocolDefinition.getConsumer(protocol);
consumer.connect(event -> {
try {
Event spEvent = EventFactory.fromMap(converter.toMap(event));
callback.onEvent(spEvent);
} catch (SpRuntimeException e) {
e.printStackTrace();
}
});

return new Subscription(consumer);
} else {
throw new SpRuntimeException(
"No converter found for data format - did you add a format factory (client.registerDataFormat)?");
}
return new Subscription(consumer);
} catch (NoSuchElementException e) {
throw new SpRuntimeException(
"Could not find an implementation for messaging protocol "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.streampipes.client.api.config.ClientConnectionUrlResolver;
import org.apache.streampipes.client.api.config.IStreamPipesClientConfig;
import org.apache.streampipes.dataformat.SpDataFormatFactory;
import org.apache.streampipes.dataformat.SpDataFormatManager;
import org.apache.streampipes.messaging.SpProtocolDefinitionFactory;
import org.apache.streampipes.messaging.SpProtocolManager;
import org.apache.streampipes.serializers.json.JacksonSerializer;
Expand All @@ -42,11 +40,6 @@ public ObjectMapper getSerializer() {
return serializer;
}

@Override
public void addDataFormat(SpDataFormatFactory spDataFormatFactory) {
SpDataFormatManager.INSTANCE.register(spDataFormatFactory);
}

@Override
public void addTransportProtocol(SpProtocolDefinitionFactory<?> protocolDefinitionFactory) {
SpProtocolManager.INSTANCE.register(protocolDefinitionFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.streampipes.model.grounding.TopicDefinition;
import org.apache.streampipes.model.grounding.TransportProtocol;

import java.util.Collections;
import java.util.UUID;

public class GroundingUtils {
Expand Down Expand Up @@ -82,9 +81,6 @@ public static EventGrounding createEventGrounding() {
);
}

eventGrounding.setTransportFormats(Collections
.singletonList(TransportFormatGenerator.getTransportFormat()));

return eventGrounding;
}

Expand Down

This file was deleted.

Loading

0 comments on commit 787c6fb

Please sign in to comment.