diff --git a/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java b/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java index b467203177..47e910e837 100644 --- a/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java +++ b/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java @@ -21,10 +21,6 @@ #set( $symbol_escape = '\' ) package ${package}; -import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory; -import org.apache.streampipes.dataformat.fst.FstDataFormatFactory; -import org.apache.streampipes.dataformat.json.JsonDataFormatFactory; -import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory; import org.apache.streampipes.extensions.management.model.SpServiceDefinition; import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder; import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory; @@ -56,11 +52,6 @@ public SpServiceDefinition provideServiceDefinition() { .registerPipelineElement(new ${classNamePrefix}DataSink()) .registerAdapter(new ${classNamePrefix}GenericAdapter()) .registerAdapter(new ${classNamePrefix}SpecificAdapter()) - .registerMessagingFormats( - new JsonDataFormatFactory(), - new CborDataFormatFactory(), - new SmileDataFormatFactory(), - new FstDataFormatFactory()) .registerMessagingProtocols( new SpKafkaProtocolFactory(), new SpJmsProtocolFactory(), diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java index 71e371dc77..1a53b264a8 100644 --- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java +++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java @@ -25,10 +25,6 @@ import ${package}.pe.processor.${packageName}.${classNamePrefix}Controller; -import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory; -import org.apache.streampipes.dataformat.fst.FstDataFormatFactory; -import org.apache.streampipes.dataformat.json.JsonDataFormatFactory; -import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory; import org.apache.streampipes.extensions.management.model.SpServiceDefinition; import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder; import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory; @@ -51,11 +47,6 @@ public SpServiceDefinition provideServiceDefinition() { "", 8090) .registerPipelineElement(new ${classNamePrefix}Controller()) - .registerMessagingFormats( - new JsonDataFormatFactory(), - new CborDataFormatFactory(), - new SmileDataFormatFactory(), - new FstDataFormatFactory()) .registerMessagingProtocols( new SpKafkaProtocolFactory(), new SpJmsProtocolFactory(), diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java index b320c7a3ca..32f521f419 100644 --- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java +++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java @@ -21,10 +21,6 @@ #set( $symbol_escape = '\' ) package ${package}; -import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory; -import org.apache.streampipes.dataformat.fst.FstDataFormatFactory; -import org.apache.streampipes.dataformat.json.JsonDataFormatFactory; -import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory; import org.apache.streampipes.extensions.management.model.SpServiceDefinition; import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder; import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory; @@ -50,11 +46,6 @@ public SpServiceDefinition provideServiceDefinition() { "", 8090) .registerPipelineElement(new ${classNamePrefix}Controller()) - .registerMessagingFormats( - new JsonDataFormatFactory(), - new CborDataFormatFactory(), - new SmileDataFormatFactory(), - new FstDataFormatFactory()) .registerMessagingProtocols( new SpKafkaProtocolFactory(), new SpJmsProtocolFactory(), diff --git a/pom.xml b/pom.xml index 7b63eb5497..6bcdad2b73 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,6 @@ 3.1.0 1.13.5 1.0 - 2.57 1.14 2.2.0 23.0.0 @@ -187,16 +186,6 @@ jackson-databind ${jackson.databind.version} - - com.fasterxml.jackson.dataformat - jackson-dataformat-cbor - ${jackson.version} - - - com.fasterxml.jackson.dataformat - jackson-dataformat-smile - ${jackson.version} - com.fasterxml.jackson.dataformat jackson-dataformat-xml @@ -242,11 +231,6 @@ geojson-jackson ${geojson-jackson.version} - - de.ruedigermoeller - fst - ${fst.version} - io.fogsy qudt @@ -827,10 +811,6 @@ streampipes-data-explorer-management streampipes-data-export streampipes-dataformat - streampipes-dataformat-cbor - streampipes-dataformat-fst - streampipes-dataformat-json - streampipes-dataformat-smile streampipes-extensions streampipes-integration-tests streampipes-mail diff --git a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IStreamPipesClient.java b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IStreamPipesClient.java index b326b362b9..8e91c1ce45 100644 --- a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IStreamPipesClient.java +++ b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IStreamPipesClient.java @@ -28,6 +28,8 @@ import java.io.Serializable; public interface IStreamPipesClient extends Serializable { + + @Deprecated(since = "0.97.0", forRemoval = true) void registerDataFormat(SpDataFormatFactory spDataFormatFactory); void registerProtocol(SpProtocolDefinitionFactory spProtocolDefinitionFactory); diff --git a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/config/IStreamPipesClientConfig.java b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/config/IStreamPipesClientConfig.java index 4238eec3d7..5fef7a6ba4 100644 --- a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/config/IStreamPipesClientConfig.java +++ b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/config/IStreamPipesClientConfig.java @@ -18,7 +18,6 @@ package org.apache.streampipes.client.api.config; -import org.apache.streampipes.dataformat.SpDataFormatFactory; import org.apache.streampipes.messaging.SpProtocolDefinitionFactory; import com.fasterxml.jackson.databind.ObjectMapper; @@ -26,8 +25,6 @@ public interface IStreamPipesClientConfig { ObjectMapper getSerializer(); - void addDataFormat(SpDataFormatFactory spDataFormatFactory); - void addTransportProtocol(SpProtocolDefinitionFactory protocolDefinitionFactory); ClientConnectionUrlResolver getConnectionConfig(); diff --git a/streampipes-client/pom.xml b/streampipes-client/pom.xml index 3707a3e700..af5127564e 100644 --- a/streampipes-client/pom.xml +++ b/streampipes-client/pom.xml @@ -36,17 +36,7 @@ org.apache.streampipes - streampipes-dataformat-json - 0.97.0-SNAPSHOT - - - org.apache.streampipes - streampipes-dataformat-cbor - 0.97.0-SNAPSHOT - - - org.apache.streampipes - streampipes-dataformat-fst + streampipes-dataformat 0.97.0-SNAPSHOT diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java index 1eee9f475d..a8dfa12b14 100644 --- a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java +++ b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java @@ -37,9 +37,6 @@ import org.apache.streampipes.client.model.StreamPipesClientConnectionConfig; import org.apache.streampipes.client.paths.ApiPath; import org.apache.streampipes.dataformat.SpDataFormatFactory; -import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory; -import org.apache.streampipes.dataformat.fst.FstDataFormatFactory; -import org.apache.streampipes.dataformat.json.JsonDataFormatFactory; import org.apache.streampipes.messaging.SpProtocolDefinitionFactory; import org.apache.streampipes.model.mail.SpEmail; @@ -52,9 +49,6 @@ public class StreamPipesClient implements private StreamPipesClient(ClientConnectionUrlResolver connectionConfig) { this.config = new StreamPipesClientConfig(connectionConfig); - this.registerDataFormat(new JsonDataFormatFactory()); - this.registerDataFormat(new FstDataFormatFactory()); - this.registerDataFormat(new CborDataFormatFactory()); } private StreamPipesClient(String streamPipesHost, @@ -128,11 +122,12 @@ public static StreamPipesClient create(String streamPipesHost, /** * Register a new data format that is used by the live API * + * @deprecated * @param spDataFormatFactory The data format factory */ + @Deprecated(forRemoval = true, since = "0.97.0") @Override public void registerDataFormat(SpDataFormatFactory spDataFormatFactory) { - this.config.addDataFormat(spDataFormatFactory); } @Override diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/live/ProducerManager.java b/streampipes-client/src/main/java/org/apache/streampipes/client/live/ProducerManager.java index 6cf97026d1..1452d6cfd1 100644 --- a/streampipes-client/src/main/java/org/apache/streampipes/client/live/ProducerManager.java +++ b/streampipes-client/src/main/java/org/apache/streampipes/client/live/ProducerManager.java @@ -19,7 +19,6 @@ package org.apache.streampipes.client.live; import org.apache.streampipes.client.api.live.IConfiguredEventProducer; -import org.apache.streampipes.dataformat.SpDataFormatDefinition; import org.apache.streampipes.dataformat.SpDataFormatManager; import org.apache.streampipes.messaging.EventProducer; import org.apache.streampipes.messaging.SpProtocolManager; @@ -39,7 +38,7 @@ public IConfiguredEventProducer makeProducer() { return new ConfiguredEventProducer( producer, - findFormatDefinition() + SpDataFormatManager.getFormatDefinition() ); } @@ -51,12 +50,4 @@ private EventProducer findProducer() { .orElseThrow() .getProducer(protocol); } - - private SpDataFormatDefinition findFormatDefinition() { - var format = grounding.getTransportFormats().get(0); - return SpDataFormatManager - .INSTANCE - .findDefinition(format) - .orElseThrow(); - } } diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/live/SubscriptionManager.java b/streampipes-client/src/main/java/org/apache/streampipes/client/live/SubscriptionManager.java index 961d9d3006..947e190f60 100644 --- a/streampipes-client/src/main/java/org/apache/streampipes/client/live/SubscriptionManager.java +++ b/streampipes-client/src/main/java/org/apache/streampipes/client/live/SubscriptionManager.java @@ -57,40 +57,31 @@ public SubscriptionManager(IBrokerConfigOverride brokerConfigOverride, } public ISubscription subscribe() { - var formatDefinitionOpt = SpDataFormatManager - .INSTANCE - .findDefinition(this.grounding.getTransportFormats().get(0)); try { SpProtocolDefinition protocolDefinition = findProtocol(getTransportProtocol()); + final SpDataFormatDefinition converter = SpDataFormatManager.getFormatDefinition(); - if (formatDefinitionOpt.isPresent()) { - final SpDataFormatDefinition converter = formatDefinitionOpt.get(); - - var protocol = getTransportProtocol(); - if (overrideSettings) { - if (protocol instanceof KafkaTransportProtocol) { - brokerConfigOverride.overrideKafkaHostname((KafkaTransportProtocol) protocol); - } - brokerConfigOverride.overrideHostname(protocol); - brokerConfigOverride.overridePort(protocol); + var protocol = getTransportProtocol(); + if (overrideSettings) { + if (protocol instanceof KafkaTransportProtocol) { + brokerConfigOverride.overrideKafkaHostname((KafkaTransportProtocol) protocol); } + brokerConfigOverride.overrideHostname(protocol); + brokerConfigOverride.overridePort(protocol); + } - EventConsumer consumer = protocolDefinition.getConsumer(protocol); - consumer.connect(event -> { - try { - Event spEvent = EventFactory.fromMap(converter.toMap(event)); - callback.onEvent(spEvent); - } catch (SpRuntimeException e) { - e.printStackTrace(); - } - }); + EventConsumer consumer = protocolDefinition.getConsumer(protocol); + consumer.connect(event -> { + try { + Event spEvent = EventFactory.fromMap(converter.toMap(event)); + callback.onEvent(spEvent); + } catch (SpRuntimeException e) { + e.printStackTrace(); + } + }); - return new Subscription(consumer); - } else { - throw new SpRuntimeException( - "No converter found for data format - did you add a format factory (client.registerDataFormat)?"); - } + return new Subscription(consumer); } catch (NoSuchElementException e) { throw new SpRuntimeException( "Could not find an implementation for messaging protocol " diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java b/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java index 716ed50b21..88e22d56b5 100644 --- a/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java +++ b/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java @@ -19,8 +19,6 @@ import org.apache.streampipes.client.api.config.ClientConnectionUrlResolver; import org.apache.streampipes.client.api.config.IStreamPipesClientConfig; -import org.apache.streampipes.dataformat.SpDataFormatFactory; -import org.apache.streampipes.dataformat.SpDataFormatManager; import org.apache.streampipes.messaging.SpProtocolDefinitionFactory; import org.apache.streampipes.messaging.SpProtocolManager; import org.apache.streampipes.serializers.json.JacksonSerializer; @@ -42,11 +40,6 @@ public ObjectMapper getSerializer() { return serializer; } - @Override - public void addDataFormat(SpDataFormatFactory spDataFormatFactory) { - SpDataFormatManager.INSTANCE.register(spDataFormatFactory); - } - @Override public void addTransportProtocol(SpProtocolDefinitionFactory protocolDefinitionFactory) { SpProtocolManager.INSTANCE.register(protocolDefinitionFactory); diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/GroundingUtils.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/GroundingUtils.java index 74c771358e..92870f490c 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/GroundingUtils.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/GroundingUtils.java @@ -29,7 +29,6 @@ import org.apache.streampipes.model.grounding.TopicDefinition; import org.apache.streampipes.model.grounding.TransportProtocol; -import java.util.Collections; import java.util.UUID; public class GroundingUtils { @@ -82,9 +81,6 @@ public static EventGrounding createEventGrounding() { ); } - eventGrounding.setTransportFormats(Collections - .singletonList(TransportFormatGenerator.getTransportFormat())); - return eventGrounding; } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/TransportFormatGenerator.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/TransportFormatGenerator.java deleted file mode 100644 index 7038866af5..0000000000 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/TransportFormatGenerator.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.connect.management.util; - -import org.apache.streampipes.model.configuration.SpDataFormat; -import org.apache.streampipes.model.grounding.TransportFormat; -import org.apache.streampipes.sdk.helpers.SupportedFormats; -import org.apache.streampipes.vocabulary.MessageFormat; - -import java.util.Arrays; -import java.util.List; - -public class TransportFormatGenerator { - - public static TransportFormat getTransportFormat() { - var cfg = Utils.getCoreConfigStorage().get(); - List supportedFormats = - cfg.getMessagingSettings().getPrioritizedFormats(); - - if (supportedFormats.size() > 0) { - return new TransportFormat(supportedFormats.get(0).getMessageFormat()); - } else { - return new TransportFormat(MessageFormat.JSON); - } - } - - public static List getAllFormats() { - return Arrays.asList(SupportedFormats.cborFormat(), - SupportedFormats.jsonFormat(), - SupportedFormats.fstFormat(), - SupportedFormats.smileFormat()); - } -} diff --git a/streampipes-dataformat-cbor/pom.xml b/streampipes-dataformat-cbor/pom.xml deleted file mode 100644 index 9193047547..0000000000 --- a/streampipes-dataformat-cbor/pom.xml +++ /dev/null @@ -1,59 +0,0 @@ - - - - - - streampipes-parent - org.apache.streampipes - 0.97.0-SNAPSHOT - - 4.0.0 - - streampipes-dataformat-cbor - - - - - org.apache.streampipes - streampipes-dataformat - 0.97.0-SNAPSHOT - - - org.apache.streampipes - streampipes-vocabulary - 0.97.0-SNAPSHOT - - - - - com.fasterxml.jackson.dataformat - jackson-dataformat-cbor - - - - - - - org.apache.maven.plugins - maven-checkstyle-plugin - - - - - diff --git a/streampipes-dataformat-cbor/src/main/java/org/apache/streampipes/dataformat/cbor/CborDataFormatDefinition.java b/streampipes-dataformat-cbor/src/main/java/org/apache/streampipes/dataformat/cbor/CborDataFormatDefinition.java deleted file mode 100644 index 95464b59e8..0000000000 --- a/streampipes-dataformat-cbor/src/main/java/org/apache/streampipes/dataformat/cbor/CborDataFormatDefinition.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.dataformat.cbor; - -import org.apache.streampipes.commons.exceptions.SpRuntimeException; -import org.apache.streampipes.dataformat.SpDataFormatDefinition; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.cbor.CBORFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -public class CborDataFormatDefinition implements SpDataFormatDefinition { - - private ObjectMapper objectMapper; - - public CborDataFormatDefinition() { - this.objectMapper = new ObjectMapper(new CBORFactory()); - } - - @Override - public Map toMap(byte[] event) throws SpRuntimeException { - try { - return objectMapper.readValue(event, HashMap.class); - } catch (IOException e) { - throw new SpRuntimeException("Could not convert event to map data structure"); - } - } - - @Override - public byte[] fromMap(Map event) throws SpRuntimeException { - try { - return objectMapper.writeValueAsBytes(event); - } catch (JsonProcessingException e) { - throw new SpRuntimeException("Could not convert map data structure to JSON string"); - } - } -} diff --git a/streampipes-dataformat-cbor/src/main/java/org/apache/streampipes/dataformat/cbor/CborDataFormatFactory.java b/streampipes-dataformat-cbor/src/main/java/org/apache/streampipes/dataformat/cbor/CborDataFormatFactory.java deleted file mode 100644 index 5814a4fb5c..0000000000 --- a/streampipes-dataformat-cbor/src/main/java/org/apache/streampipes/dataformat/cbor/CborDataFormatFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.dataformat.cbor; - -import org.apache.streampipes.dataformat.SpDataFormatDefinition; -import org.apache.streampipes.dataformat.SpDataFormatFactory; -import org.apache.streampipes.vocabulary.MessageFormat; - -public class CborDataFormatFactory extends SpDataFormatFactory { - - @Override - public String getTransportFormatRdfUri() { - return MessageFormat.CBOR; - } - - @Override - public SpDataFormatDefinition createInstance() { - return new CborDataFormatDefinition(); - } -} diff --git a/streampipes-dataformat-fst/pom.xml b/streampipes-dataformat-fst/pom.xml deleted file mode 100644 index 951a89e185..0000000000 --- a/streampipes-dataformat-fst/pom.xml +++ /dev/null @@ -1,59 +0,0 @@ - - - - - - streampipes-parent - org.apache.streampipes - 0.97.0-SNAPSHOT - - 4.0.0 - - streampipes-dataformat-fst - - - - - org.apache.streampipes - streampipes-dataformat - 0.97.0-SNAPSHOT - - - org.apache.streampipes - streampipes-vocabulary - 0.97.0-SNAPSHOT - - - - - de.ruedigermoeller - fst - - - - - - - org.apache.maven.plugins - maven-checkstyle-plugin - - - - - diff --git a/streampipes-dataformat-fst/src/main/java/org/apache/streampipes/dataformat/fst/FstDataFormatDefinition.java b/streampipes-dataformat-fst/src/main/java/org/apache/streampipes/dataformat/fst/FstDataFormatDefinition.java deleted file mode 100644 index 39095143fc..0000000000 --- a/streampipes-dataformat-fst/src/main/java/org/apache/streampipes/dataformat/fst/FstDataFormatDefinition.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.dataformat.fst; - -import org.apache.streampipes.commons.exceptions.SpRuntimeException; -import org.apache.streampipes.dataformat.SpDataFormatDefinition; - -import org.nustaq.serialization.FSTConfiguration; - -import java.util.Map; - -public class FstDataFormatDefinition implements SpDataFormatDefinition { - - private static FSTConfiguration conf = FSTConfiguration.createDefaultConfiguration(); - - public FstDataFormatDefinition() { - - } - - @Override - public Map toMap(byte[] event) throws SpRuntimeException { - return (Map) conf.asObject(event); - } - - @Override - public byte[] fromMap(Map event) throws SpRuntimeException { - return conf.asByteArray(event); - } -} diff --git a/streampipes-dataformat-fst/src/main/java/org/apache/streampipes/dataformat/fst/FstDataFormatFactory.java b/streampipes-dataformat-fst/src/main/java/org/apache/streampipes/dataformat/fst/FstDataFormatFactory.java deleted file mode 100644 index 285ea9cc25..0000000000 --- a/streampipes-dataformat-fst/src/main/java/org/apache/streampipes/dataformat/fst/FstDataFormatFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.dataformat.fst; - -import org.apache.streampipes.dataformat.SpDataFormatDefinition; -import org.apache.streampipes.dataformat.SpDataFormatFactory; -import org.apache.streampipes.vocabulary.MessageFormat; - -public class FstDataFormatFactory extends SpDataFormatFactory { - - @Override - public String getTransportFormatRdfUri() { - return MessageFormat.FST; - } - - @Override - public SpDataFormatDefinition createInstance() { - return new FstDataFormatDefinition(); - } -} diff --git a/streampipes-dataformat-json/pom.xml b/streampipes-dataformat-json/pom.xml deleted file mode 100644 index 493a3197d9..0000000000 --- a/streampipes-dataformat-json/pom.xml +++ /dev/null @@ -1,63 +0,0 @@ - - - - - - streampipes-parent - org.apache.streampipes - 0.97.0-SNAPSHOT - - 4.0.0 - - streampipes-dataformat-json - - - - - org.apache.streampipes - streampipes-dataformat - 0.97.0-SNAPSHOT - - - org.apache.streampipes - streampipes-vocabulary - 0.97.0-SNAPSHOT - - - - - com.fasterxml.jackson.core - jackson-databind - - - com.fasterxml.jackson.core - jackson-core - - - - - - - org.apache.maven.plugins - maven-checkstyle-plugin - - - - - diff --git a/streampipes-dataformat-json/src/main/java/org/apache/streampipes/dataformat/json/JsonDataFormatFactory.java b/streampipes-dataformat-json/src/main/java/org/apache/streampipes/dataformat/json/JsonDataFormatFactory.java deleted file mode 100644 index 87821cd37c..0000000000 --- a/streampipes-dataformat-json/src/main/java/org/apache/streampipes/dataformat/json/JsonDataFormatFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.dataformat.json; - -import org.apache.streampipes.dataformat.SpDataFormatDefinition; -import org.apache.streampipes.dataformat.SpDataFormatFactory; -import org.apache.streampipes.vocabulary.MessageFormat; - -public class JsonDataFormatFactory extends SpDataFormatFactory { - - @Override - public String getTransportFormatRdfUri() { - return MessageFormat.JSON; - } - - @Override - public SpDataFormatDefinition createInstance() { - return new JsonDataFormatDefinition(); - } -} diff --git a/streampipes-dataformat-smile/pom.xml b/streampipes-dataformat-smile/pom.xml deleted file mode 100644 index e8825fd645..0000000000 --- a/streampipes-dataformat-smile/pom.xml +++ /dev/null @@ -1,59 +0,0 @@ - - - - - - streampipes-parent - org.apache.streampipes - 0.97.0-SNAPSHOT - - 4.0.0 - - streampipes-dataformat-smile - - - - - org.apache.streampipes - streampipes-dataformat - 0.97.0-SNAPSHOT - - - org.apache.streampipes - streampipes-vocabulary - 0.97.0-SNAPSHOT - - - - - com.fasterxml.jackson.dataformat - jackson-dataformat-smile - - - - - - - org.apache.maven.plugins - maven-checkstyle-plugin - - - - - diff --git a/streampipes-dataformat-smile/src/main/java/org/apache/streampipes/dataformat/smile/SmileDataFormatDefinition.java b/streampipes-dataformat-smile/src/main/java/org/apache/streampipes/dataformat/smile/SmileDataFormatDefinition.java deleted file mode 100644 index 6c6bbdf26f..0000000000 --- a/streampipes-dataformat-smile/src/main/java/org/apache/streampipes/dataformat/smile/SmileDataFormatDefinition.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.dataformat.smile; - -import org.apache.streampipes.commons.exceptions.SpRuntimeException; -import org.apache.streampipes.dataformat.SpDataFormatDefinition; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.smile.SmileFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -public class SmileDataFormatDefinition implements SpDataFormatDefinition { - - private ObjectMapper objectMapper; - - public SmileDataFormatDefinition() { - this.objectMapper = new ObjectMapper(new SmileFactory()); - } - - @Override - public Map toMap(byte[] event) throws SpRuntimeException { - try { - return objectMapper.readValue(event, HashMap.class); - } catch (IOException e) { - throw new SpRuntimeException("Could not convert event to map data structure"); - } - } - - @Override - public byte[] fromMap(Map event) throws SpRuntimeException { - try { - return objectMapper.writeValueAsBytes(event); - } catch (JsonProcessingException e) { - throw new SpRuntimeException("Could not convert map data structure to JSON string"); - } - } -} diff --git a/streampipes-dataformat-smile/src/main/java/org/apache/streampipes/dataformat/smile/SmileDataFormatFactory.java b/streampipes-dataformat-smile/src/main/java/org/apache/streampipes/dataformat/smile/SmileDataFormatFactory.java deleted file mode 100644 index d04b15197f..0000000000 --- a/streampipes-dataformat-smile/src/main/java/org/apache/streampipes/dataformat/smile/SmileDataFormatFactory.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.dataformat.smile; - -import org.apache.streampipes.dataformat.SpDataFormatDefinition; -import org.apache.streampipes.dataformat.SpDataFormatFactory; -import org.apache.streampipes.vocabulary.MessageFormat; - -public class SmileDataFormatFactory extends SpDataFormatFactory { - @Override - public String getTransportFormatRdfUri() { - return MessageFormat.SMILE; - } - - @Override - public SpDataFormatDefinition createInstance() { - return new SmileDataFormatDefinition(); - } -} diff --git a/streampipes-dataformat/pom.xml b/streampipes-dataformat/pom.xml index f35330a3d6..daefc05026 100644 --- a/streampipes-dataformat/pom.xml +++ b/streampipes-dataformat/pom.xml @@ -34,6 +34,14 @@ streampipes-model 0.97.0-SNAPSHOT + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-core + diff --git a/streampipes-dataformat-json/src/main/java/org/apache/streampipes/dataformat/json/JsonDataFormatDefinition.java b/streampipes-dataformat/src/main/java/org/apache/streampipes/dataformat/JsonDataFormatDefinition.java similarity index 92% rename from streampipes-dataformat-json/src/main/java/org/apache/streampipes/dataformat/json/JsonDataFormatDefinition.java rename to streampipes-dataformat/src/main/java/org/apache/streampipes/dataformat/JsonDataFormatDefinition.java index f3bbacaf06..7433202edb 100644 --- a/streampipes-dataformat-json/src/main/java/org/apache/streampipes/dataformat/json/JsonDataFormatDefinition.java +++ b/streampipes-dataformat/src/main/java/org/apache/streampipes/dataformat/JsonDataFormatDefinition.java @@ -16,10 +16,9 @@ * */ -package org.apache.streampipes.dataformat.json; +package org.apache.streampipes.dataformat; import org.apache.streampipes.commons.exceptions.SpRuntimeException; -import org.apache.streampipes.dataformat.SpDataFormatDefinition; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -30,7 +29,7 @@ public class JsonDataFormatDefinition implements SpDataFormatDefinition { - private ObjectMapper objectMapper; + private final ObjectMapper objectMapper; public JsonDataFormatDefinition() { this.objectMapper = new ObjectMapper(); diff --git a/streampipes-dataformat/src/main/java/org/apache/streampipes/dataformat/SpDataFormatFactory.java b/streampipes-dataformat/src/main/java/org/apache/streampipes/dataformat/SpDataFormatFactory.java index b805a23d0d..ca7d9afc66 100644 --- a/streampipes-dataformat/src/main/java/org/apache/streampipes/dataformat/SpDataFormatFactory.java +++ b/streampipes-dataformat/src/main/java/org/apache/streampipes/dataformat/SpDataFormatFactory.java @@ -18,15 +18,6 @@ package org.apache.streampipes.dataformat; -import org.apache.streampipes.model.grounding.TransportFormat; - public abstract class SpDataFormatFactory { - public TransportFormat getTransportFormat() { - return new TransportFormat(getTransportFormatRdfUri()); - } - - public abstract String getTransportFormatRdfUri(); - - public abstract SpDataFormatDefinition createInstance(); } diff --git a/streampipes-dataformat/src/main/java/org/apache/streampipes/dataformat/SpDataFormatManager.java b/streampipes-dataformat/src/main/java/org/apache/streampipes/dataformat/SpDataFormatManager.java index 342a9b40d8..6b8ff4d7ef 100644 --- a/streampipes-dataformat/src/main/java/org/apache/streampipes/dataformat/SpDataFormatManager.java +++ b/streampipes-dataformat/src/main/java/org/apache/streampipes/dataformat/SpDataFormatManager.java @@ -18,43 +18,9 @@ package org.apache.streampipes.dataformat; -import org.apache.streampipes.model.grounding.TransportFormat; +public class SpDataFormatManager { -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -public enum SpDataFormatManager { - - INSTANCE; - - private List availableDataFormats; - - SpDataFormatManager() { - this.availableDataFormats = new ArrayList<>(); - } - - public void register(SpDataFormatFactory dataFormatDefinition) { - availableDataFormats.add(dataFormatDefinition); + public static SpDataFormatDefinition getFormatDefinition() { + return new JsonDataFormatDefinition(); } - - public List getAvailableDataFormats() { - return availableDataFormats; - } - - public Optional findDefinition(TransportFormat transportFormat) { - // TODO why is transportFormat.getRdfType a list? - return this.availableDataFormats - .stream() - .filter - (adf -> transportFormat - .getRdfType() - .stream() - .anyMatch(tf -> tf.toString().equals(adf - .getTransportFormatRdfUri()))) - .map(SpDataFormatFactory::createInstance) - .findFirst(); - - } - } diff --git a/streampipes-extensions-management/pom.xml b/streampipes-extensions-management/pom.xml index d4ca517c8c..3c0655c595 100644 --- a/streampipes-extensions-management/pom.xml +++ b/streampipes-extensions-management/pom.xml @@ -48,11 +48,6 @@ streampipes-dataformat 0.97.0-SNAPSHOT - - org.apache.streampipes - streampipes-dataformat-smile - 0.97.0-SNAPSHOT - org.apache.streampipes streampipes-messaging diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java index 66983df357..d63cddc8f1 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java @@ -20,15 +20,14 @@ import org.apache.streampipes.commons.environment.Environment; import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.dataformat.SpDataFormatDefinition; +import org.apache.streampipes.dataformat.SpDataFormatManager; import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement; import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; -import org.apache.streampipes.extensions.management.connect.adapter.util.TransportFormatSelector; import org.apache.streampipes.extensions.management.monitoring.ExtensionsLogger; import org.apache.streampipes.messaging.EventProducer; import org.apache.streampipes.messaging.SpProtocolManager; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.grounding.KafkaTransportProtocol; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; import java.util.Map; @@ -54,13 +53,7 @@ public SendToBrokerAdapterSink(AdapterDescription adapterDescription) { if (producerOpt.isPresent()) { this.producer = producerOpt.get().getProducer(this.protocol); - TransportFormat transportFormat = adapterDescription - .getEventGrounding() - .getTransportFormats() - .get(0); - - this.dataFormatDefinition = - new TransportFormatSelector(transportFormat).getDataFormatDefinition(); + this.dataFormatDefinition = SpDataFormatManager.getFormatDefinition(); producer.connect(); } else { diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/util/TransportFormatSelector.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/util/TransportFormatSelector.java deleted file mode 100644 index 40d985c6a6..0000000000 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/util/TransportFormatSelector.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.extensions.management.connect.adapter.util; - -import org.apache.streampipes.dataformat.SpDataFormatDefinition; -import org.apache.streampipes.dataformat.cbor.CborDataFormatDefinition; -import org.apache.streampipes.dataformat.fst.FstDataFormatDefinition; -import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition; -import org.apache.streampipes.dataformat.smile.SmileDataFormatDefinition; -import org.apache.streampipes.model.grounding.TransportFormat; -import org.apache.streampipes.vocabulary.MessageFormat; - -public class TransportFormatSelector { - - private TransportFormat transportFormat; - - public TransportFormatSelector(TransportFormat format) { - this.transportFormat = format; - } - - public SpDataFormatDefinition getDataFormatDefinition() { - if (isJsonFormat(transportFormat)) { - return new JsonDataFormatDefinition(); - } else if (isCborFormat(transportFormat)) { - return new CborDataFormatDefinition(); - } else if (isFstFormat(transportFormat)) { - return new FstDataFormatDefinition(); - } else if (isSmileFormat(transportFormat)) { - return new SmileDataFormatDefinition(); - } else { - throw new IllegalArgumentException("Wrong transport format: " + makeError(transportFormat)); - } - } - - private boolean isSmileFormat(TransportFormat transportFormat) { - return isFormat(MessageFormat.SMILE, transportFormat); - } - - private boolean isFstFormat(TransportFormat transportFormat) { - return isFormat(MessageFormat.FST, transportFormat); - } - - private boolean isCborFormat(TransportFormat transportFormat) { - return isFormat(MessageFormat.CBOR, transportFormat); - } - - private Boolean isJsonFormat(TransportFormat transportFormat) { - return isFormat(MessageFormat.JSON, transportFormat); - } - - private boolean isFormat(String format, TransportFormat transportFormat) { - return transportFormat.getRdfType().stream().anyMatch(tf -> tf.toString().equals(format)); - } - - private String makeError(TransportFormat transportFormat) { - StringBuilder builder = new StringBuilder(); - transportFormat.getRdfType().forEach(type -> builder.append(type.toString()).append(", ")); - return builder.toString(); - } -} diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/DeclarersSingleton.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/DeclarersSingleton.java index fed5a0baa4..311e15ced7 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/DeclarersSingleton.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/DeclarersSingleton.java @@ -18,8 +18,6 @@ package org.apache.streampipes.extensions.management.init; -import org.apache.streampipes.dataformat.SpDataFormatFactory; -import org.apache.streampipes.dataformat.SpDataFormatManager; import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter; import org.apache.streampipes.extensions.api.declarer.IStreamPipesFunctionDeclarer; import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor; @@ -30,15 +28,10 @@ import org.apache.streampipes.extensions.management.model.SpServiceDefinition; import org.apache.streampipes.messaging.SpProtocolDefinitionFactory; import org.apache.streampipes.messaging.SpProtocolManager; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; import org.apache.streampipes.model.util.Cloner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -48,7 +41,6 @@ public class DeclarersSingleton implements IDeclarersSingleton { - private static final Logger LOG = LoggerFactory.getLogger(DeclarersSingleton.class); private static final String Http = "http://"; private static final String Colon = ":"; private static final String Slash = "/"; @@ -61,8 +53,6 @@ public class DeclarersSingleton implements IDeclarersSingleton { private final Map functions; private final Map supportedProtocols; - private final Map supportedFormats; - private final Map adapters; private List runtimeProviders; @@ -80,7 +70,6 @@ private DeclarersSingleton() { this.dataSinks = new HashMap<>(); this.dataStreams = new HashMap<>(); this.supportedProtocols = new HashMap<>(); - this.supportedFormats = new HashMap<>(); this.adapters = new HashMap<>(); this.functions = new HashMap<>(); this.runtimeProviders = new ArrayList<>(); @@ -103,7 +92,6 @@ public void populate(String host, Integer port, SpServiceDefinition serviceDef) this.serviceId = serviceDef.getServiceId(); this.serviceGroup = serviceDef.getServiceGroup(); this.registerProtocols(serviceDef.getProtocolDefinitionFactories()); - this.registerDataFormats(serviceDef.getDataFormatFactories()); this.runtimeProviders = serviceDef.getRuntimeProviders(); serviceDef.getAdapters().forEach(a -> this.adapters.put(a.declareConfig().getAdapterDescription().getAppId(), a)); serviceDef.getFunctions().forEach(f -> this.functions.put(f.getFunctionConfig().getFunctionId().getId(), f)); @@ -134,7 +122,6 @@ public Map> getDeclarers() { result.putAll(dataProcessors); result.putAll(dataStreams); result.putAll(dataSinks); - //result.putAll(pipelineTemplateDeclarers); return result; } @@ -148,20 +135,6 @@ public void registerProtocols(List> protocols) { protocols.forEach(this::registerProtocol); } - public void registerDataFormat(SpDataFormatFactory dataFormatDefinition) { - SpDataFormatManager.INSTANCE.register(dataFormatDefinition); - this.supportedFormats.put(dataFormatDefinition.getTransportFormatRdfUri(), - dataFormatDefinition.getTransportFormat()); - } - - public void registerDataFormats(SpDataFormatFactory... dataFormatDefinitions) { - registerDataFormats(Arrays.asList(dataFormatDefinitions)); - } - - public void registerDataFormats(List dataFormatDefinitions) { - dataFormatDefinitions.forEach(this::registerDataFormat); - } - private void addDataProcessor(IStreamPipesDataProcessor dataProcessor) { dataProcessors.put(dataProcessor.declareConfig().getDescription().getAppId(), dataProcessor); } @@ -195,13 +168,6 @@ public Collection getSupportedProtocols() { .collect(Collectors.toList()); } - public Collection getSupportedFormats() { - return this.supportedFormats.values() - .stream() - .map(TransportFormat::new) - .collect(Collectors.toList()); - } - public int getPort() { return this.port; } @@ -260,15 +226,6 @@ private void checkAndStartExecutableStreams(IStreamPipesDataStream declarer) { } } - public SpServiceDefinition toServiceDefinition(String serviceId) { - SpServiceDefinition serviceDef = new SpServiceDefinition(); - serviceDef.setServiceId(serviceId); - serviceDef.setDefaultPort(this.getPort()); - - // TODO create complete service definition - return serviceDef; - } - public String getServiceGroup() { return serviceGroup; } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinition.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinition.java index 3c7f3af96e..42bd2dcc18 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinition.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinition.java @@ -17,7 +17,6 @@ */ package org.apache.streampipes.extensions.management.model; -import org.apache.streampipes.dataformat.SpDataFormatFactory; import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter; import org.apache.streampipes.extensions.api.declarer.IStreamPipesFunctionDeclarer; import org.apache.streampipes.extensions.api.migration.IModelMigrator; @@ -41,7 +40,6 @@ public class SpServiceDefinition { private Integer defaultPort; private List> pipelineElements; - private List dataFormatFactories; private List> protocolDefinitionFactories; private List functions; @@ -55,7 +53,6 @@ public class SpServiceDefinition { public SpServiceDefinition() { this.serviceId = UUID.randomUUID().toString(); this.pipelineElements = new ArrayList<>(); - this.dataFormatFactories = new ArrayList<>(); this.protocolDefinitionFactories = new ArrayList<>(); this.kvConfigs = new ArrayList<>(); this.functions = new ArrayList<>(); @@ -120,14 +117,6 @@ public void setDeclarers(List> pipelineElements) this.pipelineElements = pipelineElements; } - public void addDataFormatFactory(SpDataFormatFactory factory) { - this.dataFormatFactories.add(factory); - } - - public void addDataFormatFactories(List factories) { - this.dataFormatFactories.addAll(factories); - } - public void addConfig(ConfigItem configItem) { this.kvConfigs.add(configItem); } @@ -140,14 +129,6 @@ public void addAdapters(List adapters) { this.adapters.addAll(adapters); } - public List getDataFormatFactories() { - return dataFormatFactories; - } - - public void setDataFormatFactories(List dataFormatFactories) { - this.dataFormatFactories = dataFormatFactories; - } - public void addProtocolDefinitionFactory(SpProtocolDefinitionFactory factory) { this.protocolDefinitionFactories.add(factory); } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinitionBuilder.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinitionBuilder.java index 1cb2bb004d..ac02d4533b 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinitionBuilder.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinitionBuilder.java @@ -117,13 +117,19 @@ public SpServiceDefinitionBuilder registerAdapter(StreamPipesAdapter adapter) { } + /** + * @deprecated data format registration is no longer required + */ + @Deprecated(since = "0.97.0", forRemoval = true) public SpServiceDefinitionBuilder registerMessagingFormat(SpDataFormatFactory dataFormatDefinition) { - this.serviceDefinition.addDataFormatFactory(dataFormatDefinition); return this; } + /** + * @deprecated data format registration is no longer required + */ + @Deprecated(since = "0.97.0", forRemoval = true) public SpServiceDefinitionBuilder registerMessagingFormats(SpDataFormatFactory... dataFormatDefinitions) { - this.serviceDefinition.addDataFormatFactories(Arrays.asList(dataFormatDefinitions)); return this; } diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java index c84a780488..238bbb8048 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java @@ -19,7 +19,7 @@ package org.apache.streampipes.extensions.connectors.kafka.sink; import org.apache.streampipes.commons.exceptions.SpRuntimeException; -import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition; +import org.apache.streampipes.dataformat.JsonDataFormatDefinition; import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink; import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration; import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttClient.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttClient.java index a780653c27..21b5e786df 100644 --- a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttClient.java +++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttClient.java @@ -18,7 +18,7 @@ package org.apache.streampipes.extensions.connectors.mqtt.sink.common; import org.apache.streampipes.commons.exceptions.SpRuntimeException; -import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition; +import org.apache.streampipes.dataformat.JsonDataFormatDefinition; import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters; import org.apache.streampipes.model.runtime.Event; diff --git a/streampipes-extensions/streampipes-connectors-nats/src/main/java/org/apache/streampipes/extensions/connectors/nats/sink/NatsSink.java b/streampipes-extensions/streampipes-connectors-nats/src/main/java/org/apache/streampipes/extensions/connectors/nats/sink/NatsSink.java index ceed64e61a..2a2c097063 100644 --- a/streampipes-extensions/streampipes-connectors-nats/src/main/java/org/apache/streampipes/extensions/connectors/nats/sink/NatsSink.java +++ b/streampipes-extensions/streampipes-connectors-nats/src/main/java/org/apache/streampipes/extensions/connectors/nats/sink/NatsSink.java @@ -19,7 +19,7 @@ package org.apache.streampipes.extensions.connectors.nats.sink; import org.apache.streampipes.commons.exceptions.SpRuntimeException; -import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition; +import org.apache.streampipes.dataformat.JsonDataFormatDefinition; import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink; import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration; import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; diff --git a/streampipes-extensions/streampipes-connectors-pulsar/src/main/java/org/apache/streampipes/extensions/connectors/pulsar/sink/PulsarPublisherSink.java b/streampipes-extensions/streampipes-connectors-pulsar/src/main/java/org/apache/streampipes/extensions/connectors/pulsar/sink/PulsarPublisherSink.java index 5be4894b8a..af5d35793a 100644 --- a/streampipes-extensions/streampipes-connectors-pulsar/src/main/java/org/apache/streampipes/extensions/connectors/pulsar/sink/PulsarPublisherSink.java +++ b/streampipes-extensions/streampipes-connectors-pulsar/src/main/java/org/apache/streampipes/extensions/connectors/pulsar/sink/PulsarPublisherSink.java @@ -19,7 +19,7 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.dataformat.SpDataFormatDefinition; -import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition; +import org.apache.streampipes.dataformat.JsonDataFormatDefinition; import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink; import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration; import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; diff --git a/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/sink/RocketMQPublisherSink.java b/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/sink/RocketMQPublisherSink.java index 58aabf4e76..4e1df94272 100644 --- a/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/sink/RocketMQPublisherSink.java +++ b/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/sink/RocketMQPublisherSink.java @@ -20,7 +20,7 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.dataformat.SpDataFormatDefinition; -import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition; +import org.apache.streampipes.dataformat.JsonDataFormatDefinition; import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink; import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration; import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; diff --git a/streampipes-extensions/streampipes-connectors-ros/src/main/java/org/apache/streampipes/connectors/ros/sink/RosBridgeSink.java b/streampipes-extensions/streampipes-connectors-ros/src/main/java/org/apache/streampipes/connectors/ros/sink/RosBridgeSink.java index cac8ef5f6e..4e5e9c6393 100644 --- a/streampipes-extensions/streampipes-connectors-ros/src/main/java/org/apache/streampipes/connectors/ros/sink/RosBridgeSink.java +++ b/streampipes-extensions/streampipes-connectors-ros/src/main/java/org/apache/streampipes/connectors/ros/sink/RosBridgeSink.java @@ -19,7 +19,7 @@ package org.apache.streampipes.connectors.ros.sink; import org.apache.streampipes.dataformat.SpDataFormatDefinition; -import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition; +import org.apache.streampipes.dataformat.JsonDataFormatDefinition; import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink; import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration; import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; diff --git a/streampipes-extensions/streampipes-connectors-tubemq/src/main/java/org/apache/streampipes/extensions/connectors/tubemq/sink/TubeMQPublisherSink.java b/streampipes-extensions/streampipes-connectors-tubemq/src/main/java/org/apache/streampipes/extensions/connectors/tubemq/sink/TubeMQPublisherSink.java index b9a2dbf72d..1037c26ec2 100644 --- a/streampipes-extensions/streampipes-connectors-tubemq/src/main/java/org/apache/streampipes/extensions/connectors/tubemq/sink/TubeMQPublisherSink.java +++ b/streampipes-extensions/streampipes-connectors-tubemq/src/main/java/org/apache/streampipes/extensions/connectors/tubemq/sink/TubeMQPublisherSink.java @@ -20,7 +20,7 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.dataformat.SpDataFormatDefinition; -import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition; +import org.apache.streampipes.dataformat.JsonDataFormatDefinition; import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink; import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration; import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; diff --git a/streampipes-extensions/streampipes-extensions-all-iiot/src/main/java/org/apache/streampipes/extensions/all/iiot/AllExtensionsIIoTInit.java b/streampipes-extensions/streampipes-extensions-all-iiot/src/main/java/org/apache/streampipes/extensions/all/iiot/AllExtensionsIIoTInit.java index 66187857a0..f0b49c1831 100644 --- a/streampipes-extensions/streampipes-extensions-all-iiot/src/main/java/org/apache/streampipes/extensions/all/iiot/AllExtensionsIIoTInit.java +++ b/streampipes-extensions/streampipes-extensions-all-iiot/src/main/java/org/apache/streampipes/extensions/all/iiot/AllExtensionsIIoTInit.java @@ -20,10 +20,6 @@ import org.apache.streampipes.connect.iiot.IIoTAdaptersExtensionModuleExport; import org.apache.streampipes.connectors.ros.RosConnectorsModuleExport; -import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory; -import org.apache.streampipes.dataformat.fst.FstDataFormatFactory; -import org.apache.streampipes.dataformat.json.JsonDataFormatFactory; -import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory; import org.apache.streampipes.extensions.connectors.influx.InfluxConnectorsModuleExport; import org.apache.streampipes.extensions.connectors.kafka.KafkaConnectorsModuleExport; import org.apache.streampipes.extensions.connectors.mqtt.MqttConnectorsModuleExport; @@ -89,11 +85,6 @@ public SpServiceDefinition provideServiceDefinition() { new NotificationsExtensionModuleExport() ) .registerRuntimeProvider(new StandaloneStreamPipesRuntimeProvider()) - .registerMessagingFormats( - new JsonDataFormatFactory(), - new CborDataFormatFactory(), - new SmileDataFormatFactory(), - new FstDataFormatFactory()) .registerMessagingProtocols( new SpKafkaProtocolFactory(), new SpJmsProtocolFactory(), diff --git a/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java b/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java index 19c7e551c1..8510863038 100644 --- a/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java +++ b/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java @@ -20,10 +20,6 @@ import org.apache.streampipes.connect.GeneralAdaptersExtensionModuleExport; import org.apache.streampipes.connect.iiot.IIoTAdaptersExtensionModuleExport; import org.apache.streampipes.connectors.ros.RosConnectorsModuleExport; -import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory; -import org.apache.streampipes.dataformat.fst.FstDataFormatFactory; -import org.apache.streampipes.dataformat.json.JsonDataFormatFactory; -import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory; import org.apache.streampipes.extensions.connectors.influx.InfluxConnectorsModuleExport; import org.apache.streampipes.extensions.connectors.kafka.KafkaConnectorsModuleExport; import org.apache.streampipes.extensions.connectors.mqtt.MqttConnectorsModuleExport; @@ -97,11 +93,6 @@ public SpServiceDefinition provideServiceDefinition() { new NotificationsExtensionModuleExport() ) .registerRuntimeProvider(new StandaloneStreamPipesRuntimeProvider()) - .registerMessagingFormats( - new JsonDataFormatFactory(), - new CborDataFormatFactory(), - new SmileDataFormatFactory(), - new FstDataFormatFactory()) .registerMessagingProtocols( new SpKafkaProtocolFactory(), new SpJmsProtocolFactory(), diff --git a/streampipes-extensions/streampipes-extensions-iiot-minimal/src/main/java/org/apache/streampipes/extensions/iiot/minimal/ExtensionsIIoTMinimalInit.java b/streampipes-extensions/streampipes-extensions-iiot-minimal/src/main/java/org/apache/streampipes/extensions/iiot/minimal/ExtensionsIIoTMinimalInit.java index 80db365175..b0e038d549 100644 --- a/streampipes-extensions/streampipes-extensions-iiot-minimal/src/main/java/org/apache/streampipes/extensions/iiot/minimal/ExtensionsIIoTMinimalInit.java +++ b/streampipes-extensions/streampipes-extensions-iiot-minimal/src/main/java/org/apache/streampipes/extensions/iiot/minimal/ExtensionsIIoTMinimalInit.java @@ -19,10 +19,6 @@ package org.apache.streampipes.extensions.iiot.minimal; import org.apache.streampipes.connect.iiot.IIoTAdaptersExtensionModuleExport; -import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory; -import org.apache.streampipes.dataformat.fst.FstDataFormatFactory; -import org.apache.streampipes.dataformat.json.JsonDataFormatFactory; -import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory; import org.apache.streampipes.extensions.connectors.influx.InfluxConnectorsModuleExport; import org.apache.streampipes.extensions.connectors.mqtt.MqttConnectorsModuleExport; import org.apache.streampipes.extensions.connectors.nats.NatsConnectorsModuleExport; @@ -74,11 +70,6 @@ public SpServiceDefinition provideServiceDefinition() { new InternalSinksExtensionModuleExports(), new NotificationsExtensionModuleExport()) .registerRuntimeProvider(new StandaloneStreamPipesRuntimeProvider()) - .registerMessagingFormats( - new JsonDataFormatFactory(), - new CborDataFormatFactory(), - new SmileDataFormatFactory(), - new FstDataFormatFactory()) .registerMessagingProtocols( new SpNatsProtocolFactory(), new SpMqttProtocolFactory() diff --git a/streampipes-extensions/streampipes-pipeline-elements-experimental-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java b/streampipes-extensions/streampipes-pipeline-elements-experimental-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java index 033e658399..3344fa55b1 100644 --- a/streampipes-extensions/streampipes-pipeline-elements-experimental-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java +++ b/streampipes-extensions/streampipes-pipeline-elements-experimental-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java @@ -17,10 +17,6 @@ */ package org.apache.streampipes.pe.flink; -import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory; -import org.apache.streampipes.dataformat.fst.FstDataFormatFactory; -import org.apache.streampipes.dataformat.json.JsonDataFormatFactory; -import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory; import org.apache.streampipes.extensions.management.model.SpServiceDefinition; import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder; import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory; @@ -83,11 +79,6 @@ public SpServiceDefinition provideServiceDefinition() { new BoilerplateController(), new ElasticSearchController()) .registerRuntimeProvider(new FlinkRuntimeProvider()) - .registerMessagingFormats( - new JsonDataFormatFactory(), - new CborDataFormatFactory(), - new SmileDataFormatFactory(), - new FstDataFormatFactory()) .registerMessagingProtocols( new SpKafkaProtocolFactory(), new SpJmsProtocolFactory(), diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java index aea49f3ab7..95b794e5aa 100644 --- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java @@ -19,8 +19,8 @@ package org.apache.streampipes.sinks.brokers.jvm.bufferrest; import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.dataformat.JsonDataFormatDefinition; import org.apache.streampipes.dataformat.SpDataFormatDefinition; -import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition; import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; import org.apache.streampipes.model.DataSinkType; import org.apache.streampipes.model.graph.DataSinkDescription; diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java index 60f88237bd..3d2fa15cbb 100644 --- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java @@ -20,7 +20,7 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException; -import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition; +import org.apache.streampipes.dataformat.JsonDataFormatDefinition; import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; import org.apache.streampipes.messaging.jms.ActiveMQPublisher; import org.apache.streampipes.model.DataSinkType; diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java index fe1b264ee8..2e7de12842 100644 --- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java @@ -19,7 +19,7 @@ package org.apache.streampipes.sinks.brokers.jvm.rabbitmq; import org.apache.streampipes.commons.exceptions.SpRuntimeException; -import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition; +import org.apache.streampipes.dataformat.JsonDataFormatDefinition; import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; import org.apache.streampipes.model.DataSinkType; import org.apache.streampipes.model.extensions.ExtensionAssetType; diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java index 666bfb8be7..e09f4f7f81 100644 --- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java @@ -20,7 +20,7 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException; -import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition; +import org.apache.streampipes.dataformat.JsonDataFormatDefinition; import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; import org.apache.streampipes.model.DataSinkType; import org.apache.streampipes.model.extensions.ExtensionAssetType; diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/websocket/SocketServer.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/websocket/SocketServer.java index babd6f8a3f..65b817e95e 100644 --- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/websocket/SocketServer.java +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/websocket/SocketServer.java @@ -18,7 +18,7 @@ package org.apache.streampipes.sinks.brokers.jvm.websocket; -import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition; +import org.apache.streampipes.dataformat.JsonDataFormatDefinition; import org.apache.streampipes.model.runtime.Event; import org.java_websocket.WebSocket; diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataTesterBase.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataTesterBase.java index 857191ca41..873a3092b5 100644 --- a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataTesterBase.java +++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataTesterBase.java @@ -25,9 +25,7 @@ import org.apache.streampipes.integration.utils.Utils; import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.grounding.EventGrounding; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; -import org.apache.streampipes.vocabulary.MessageFormat; import org.testcontainers.shaded.com.google.common.collect.Maps; @@ -89,7 +87,6 @@ private SpDataStream makeDataStream() { private EventGrounding makeEventGrounding() { var grounding = new EventGrounding(); grounding.setTransportProtocol(makeProtocol()); - grounding.setTransportFormats(List.of(new TransportFormat(MessageFormat.JSON))); return grounding; } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/DefaultMessagingSettings.java b/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/DefaultMessagingSettings.java index 457ec36b14..6ba4dd0865 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/DefaultMessagingSettings.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/DefaultMessagingSettings.java @@ -50,7 +50,6 @@ public MessagingSettings make() { 5000012, 20, 2, - Arrays.asList(SpDataFormat.JSON, SpDataFormat.CBOR, SpDataFormat.FST, SpDataFormat.SMILE), protocolList); defaultSettings.setJmsHost("activemq"); diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/MessagingSettings.java b/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/MessagingSettings.java index edad5c1a50..874c200750 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/MessagingSettings.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/MessagingSettings.java @@ -30,7 +30,6 @@ public class MessagingSettings { private Integer lingerMs; private Integer acks; - private List prioritizedFormats; private List prioritizedProtocols; private String jmsHost; @@ -56,13 +55,11 @@ public MessagingSettings(Integer batchSize, Integer messageMaxBytes, Integer lingerMs, Integer acks, - List prioritizedFormats, List prioritizedProtocols) { this.batchSize = batchSize; this.messageMaxBytes = messageMaxBytes; this.lingerMs = lingerMs; this.acks = acks; - this.prioritizedFormats = prioritizedFormats; this.prioritizedProtocols = prioritizedProtocols; this.supportedProtocols = new ArrayList<>(); } @@ -103,14 +100,6 @@ public void setAcks(Integer acks) { this.acks = acks; } - public List getPrioritizedFormats() { - return prioritizedFormats; - } - - public void setPrioritizedFormats(List prioritizedFormats) { - this.prioritizedFormats = prioritizedFormats; - } - public List getPrioritizedProtocols() { return prioritizedProtocols; } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/SpDataFormat.java b/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/SpDataFormat.java deleted file mode 100644 index a2c5b6bf38..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/SpDataFormat.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.model.configuration; - - -import org.apache.streampipes.vocabulary.MessageFormat; - -public enum SpDataFormat { - - CBOR("Cbor", MessageFormat.CBOR), - JSON("JSON", MessageFormat.JSON), - FST("Fast-Serializer", MessageFormat.FST), - SMILE("Smile", MessageFormat.SMILE); - - private String name; - private String messageFormat; - - SpDataFormat(String name, String messageFormat) { - this.name = name; - this.messageFormat = messageFormat; - } - - public String getName() { - return name; - } - - public String getMessageFormat() { - return messageFormat; - } -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/EventGrounding.java b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/EventGrounding.java index ccadf9652c..f22a632b11 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/EventGrounding.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/EventGrounding.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -33,24 +32,18 @@ public class EventGrounding { private List transportProtocols; - private List transportFormats; - public EventGrounding() { super(); - this.transportFormats = new ArrayList<>(); this.transportProtocols = new ArrayList<>(); } - public EventGrounding(TransportProtocol transportProtocol, TransportFormat transportFormat) { + public EventGrounding(TransportProtocol transportProtocol) { this(); - this.transportFormats = new ArrayList<>(); - this.transportFormats.add(transportFormat); - this.transportProtocols = Arrays.asList(transportProtocol); + this.transportProtocols = Collections.singletonList(transportProtocol); } public EventGrounding(EventGrounding other) { this.transportProtocols = new Cloner().protocols(other.getTransportProtocols()); - this.transportFormats = new Cloner().transportFormats(other.getTransportFormats()); } public List getTransportProtocols() { @@ -63,7 +56,7 @@ public void setTransportProtocols(List transportProtocols) { @JsonIgnore public TransportProtocol getTransportProtocol() { - if (transportProtocols.size() == 0) { + if (transportProtocols.isEmpty()) { return null; } else { return transportProtocols.get(0); @@ -73,13 +66,4 @@ public TransportProtocol getTransportProtocol() { public void setTransportProtocol(TransportProtocol transportProtocol) { this.transportProtocols = Collections.singletonList(transportProtocol); } - - public List getTransportFormats() { - return transportFormats; - } - - public void setTransportFormats(List transportFormats) { - this.transportFormats = transportFormats; - } - } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java index ebdf536ce2..853415270c 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java @@ -30,7 +30,6 @@ import org.apache.streampipes.model.grounding.PulsarTransportProtocol; import org.apache.streampipes.model.grounding.SimpleTopicDefinition; import org.apache.streampipes.model.grounding.TopicDefinition; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; import org.apache.streampipes.model.grounding.WildcardTopicDefinition; import org.apache.streampipes.model.grounding.WildcardTopicMapping; @@ -222,11 +221,6 @@ public List staticProperties( } } - public List transportFormats( - List transportFormats) { - return transportFormats.stream().map(t -> new TransportFormat(t)).collect(Collectors.toList()); - } - public List properties(List eventProperties) { return eventProperties.stream().map(o -> new Cloner().property(o)).collect(Collectors.toList()); } diff --git a/streampipes-pipeline-management/pom.xml b/streampipes-pipeline-management/pom.xml index ef233e90f7..24d59b59e3 100644 --- a/streampipes-pipeline-management/pom.xml +++ b/streampipes-pipeline-management/pom.xml @@ -34,22 +34,7 @@ org.apache.streampipes - streampipes-dataformat-cbor - 0.97.0-SNAPSHOT - - - org.apache.streampipes - streampipes-dataformat-fst - 0.97.0-SNAPSHOT - - - org.apache.streampipes - streampipes-dataformat-json - 0.97.0-SNAPSHOT - - - org.apache.streampipes - streampipes-dataformat-smile + streampipes-dataformat 0.97.0-SNAPSHOT diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/FormatSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/FormatSelector.java deleted file mode 100644 index 7b38301179..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/FormatSelector.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.matching; - -import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import org.apache.streampipes.model.base.NamedStreamPipesEntity; -import org.apache.streampipes.model.configuration.SpDataFormat; -import org.apache.streampipes.model.grounding.TransportFormat; -import org.apache.streampipes.storage.management.StorageDispatcher; -import org.apache.streampipes.vocabulary.MessageFormat; - -import java.net.URI; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -public class FormatSelector extends GroundingSelector { - - public FormatSelector(NamedStreamPipesEntity source, Set targets) { - super(source, targets); - } - - public TransportFormat getTransportFormat() { - - if (source instanceof SpDataStream) { - return ((SpDataStream) source) - .getEventGrounding() - .getTransportFormats() - .get(0); - } else { - List prioritizedFormats = - StorageDispatcher - .INSTANCE - .getNoSqlStore() - .getSpCoreConfigurationStorage() - .get() - .getMessagingSettings() - .getPrioritizedFormats(); - - List supportedFormats = prioritizedFormats - .stream() - .filter(pf -> supportsFormat(pf.getMessageFormat())).toList(); - - if (supportedFormats.size() > 0) { - return new TransportFormat(supportedFormats.get(0).getMessageFormat()); - } else { - return new TransportFormat(MessageFormat.JSON); - } - } - } - - public boolean supportsFormat(String format) { - List elements = buildInvocables(); - return elements - .stream() - .allMatch(e -> e - .getSupportedGrounding() - .getTransportFormats() - .stream() - .anyMatch(s -> rdfTypesAsString(s.getRdfType()).contains(format))); - } - - private List rdfTypesAsString(List uri) { - return uri.stream().map(URI::toString).collect(Collectors.toList()); - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingBuilder.java index ee683afd93..4e711c11af 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingBuilder.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingBuilder.java @@ -21,7 +21,6 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.base.NamedStreamPipesEntity; import org.apache.streampipes.model.grounding.EventGrounding; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; import java.util.Collections; @@ -39,15 +38,10 @@ public GroundingBuilder(NamedStreamPipesEntity source, Set { - - public FormatMatch() { - super(MatchingResultType.FORMAT_MATCH); - } - - @Override - public boolean match(TransportFormat offer, TransportFormat requirement, List errorLog) { - return MatchingUtils.nullCheck(offer, requirement) - || requirement.getRdfType().containsAll(offer.getRdfType()); - - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/GroundingMatch.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/GroundingMatch.java index 94bd5d3255..27d403a5e2 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/GroundingMatch.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/GroundingMatch.java @@ -22,7 +22,6 @@ import org.apache.streampipes.model.client.matching.MatchingResultMessage; import org.apache.streampipes.model.client.matching.MatchingResultType; import org.apache.streampipes.model.grounding.EventGrounding; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; import java.util.List; @@ -36,8 +35,7 @@ public GroundingMatch() { @Override public boolean match(EventGrounding offer, EventGrounding requirement, List errorLog) { return MatchingUtils.nullCheckRightNullDisallowed(offer, requirement) - || (matchProtocols(offer.getTransportProtocols(), requirement.getTransportProtocols(), errorLog) - && matchFormats(offer.getTransportFormats(), requirement.getTransportFormats(), errorLog)); + || (matchProtocols(offer.getTransportProtocols(), requirement.getTransportProtocols(), errorLog)); } private boolean matchProtocols(List offer, List requirement, @@ -52,18 +50,4 @@ private boolean matchProtocols(List offer, List offer, List requirement, - List errorLog) { - boolean match = MatchingUtils.nullCheckBothNullDisallowed(offer, requirement) - && requirement - .stream() - .anyMatch(req -> offer.stream().anyMatch(of -> new FormatMatch().match(of, req, errorLog))); - - if (!match) { - buildErrorMessage(errorLog, MatchingResultType.FORMAT_MATCH, "Could not find matching format"); - } - return match; - } - } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/DataStreamRuntimeInfoProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/DataStreamRuntimeInfoProvider.java index 7221339d13..3fec8e42ae 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/DataStreamRuntimeInfoProvider.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/DataStreamRuntimeInfoProvider.java @@ -20,11 +20,11 @@ import org.apache.streampipes.commons.environment.Environment; import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.dataformat.SpDataFormatManager; import org.apache.streampipes.messaging.EventConsumer; import org.apache.streampipes.messaging.SpProtocolManager; import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.grounding.KafkaTransportProtocol; -import org.apache.streampipes.model.grounding.TransportFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +60,7 @@ public void startConsuming() throws SpRuntimeException { } } - var converter = new SpDataFormatConverterGenerator(getTransportFormat(dataStream)).makeConverter(); + var converter = new SpDataFormatConverter(SpDataFormatManager.getFormatDefinition()); var protocolDefinitionOpt = SpProtocolManager .INSTANCE .findDefinition(dataStream.getEventGrounding().getTransportProtocol()); @@ -80,10 +80,6 @@ public void startConsuming() throws SpRuntimeException { }); } - private TransportFormat getTransportFormat(SpDataStream spDataStream) { - return spDataStream.getEventGrounding().getTransportFormats().get(0); - } - public Map> getLatestEvents() { return latestEvents; } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/SpDataFormatConverterGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/SpDataFormatConverterGenerator.java deleted file mode 100644 index 2d4e57ab9d..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/SpDataFormatConverterGenerator.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.manager.runtime; - -import org.apache.streampipes.dataformat.cbor.CborDataFormatDefinition; -import org.apache.streampipes.dataformat.fst.FstDataFormatDefinition; -import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition; -import org.apache.streampipes.dataformat.smile.SmileDataFormatDefinition; -import org.apache.streampipes.model.grounding.TransportFormat; -import org.apache.streampipes.vocabulary.MessageFormat; - -public class SpDataFormatConverterGenerator { - - private TransportFormat transportFormat; - - public SpDataFormatConverterGenerator(TransportFormat transportFormat) { - this.transportFormat = transportFormat; - } - - public SpDataFormatConverter makeConverter() { - if (isJsonFormat(transportFormat)) { - return new SpDataFormatConverter(new JsonDataFormatDefinition()); - } else if (isCborFormat(transportFormat)) { - return new SpDataFormatConverter(new CborDataFormatDefinition()); - } else if (isFstFormat(transportFormat)) { - return new SpDataFormatConverter(new FstDataFormatDefinition()); - } else if (isSmileFormat(transportFormat)) { - return new SpDataFormatConverter(new SmileDataFormatDefinition()); - } else { - throw new IllegalArgumentException("Wrong transport format: " + makeError(transportFormat)); - } - } - - private boolean isSmileFormat(TransportFormat transportFormat) { - return isFormat(MessageFormat.SMILE, transportFormat); - } - - private boolean isFstFormat(TransportFormat transportFormat) { - return isFormat(MessageFormat.FST, transportFormat); - } - - private boolean isCborFormat(TransportFormat transportFormat) { - return isFormat(MessageFormat.CBOR, transportFormat); - } - - private Boolean isJsonFormat(TransportFormat transportFormat) { - return isFormat(MessageFormat.JSON, transportFormat); - } - - private boolean isFormat(String format, TransportFormat transportFormat) { - return transportFormat.getRdfType().stream().anyMatch(tf -> tf.toString().equals(format)); - } - - private String makeError(TransportFormat transportFormat) { - StringBuilder builder = new StringBuilder(); - transportFormat.getRdfType().forEach(type -> builder.append(type.toString()).append(", ")); - return builder.toString(); - } -} diff --git a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestFormatMatch.java b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestFormatMatch.java deleted file mode 100644 index e753961da0..0000000000 --- a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestFormatMatch.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.matching.v2; - -import org.apache.streampipes.model.client.matching.MatchingResultMessage; -import org.apache.streampipes.model.grounding.TransportFormat; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.List; - -public class TestFormatMatch { - - @Test - public void testPositiveFormatMatch() { - - TransportFormat offeredJson = TestUtils.jsonFormat(); - TransportFormat requiredJson = TestUtils.jsonFormat(); - - List errorLog = new ArrayList<>(); - - boolean matches = new FormatMatch().match(offeredJson, requiredJson, errorLog); - Assertions.assertTrue(matches); - } - - @Test - public void testNegativeFormatMatch() { - - TransportFormat offeredJson = TestUtils.jsonFormat(); - TransportFormat requiredThrift = TestUtils.thriftFormat(); - - List errorLog = new ArrayList<>(); - - boolean matches = new FormatMatch().match(offeredJson, requiredThrift, errorLog); - Assertions.assertFalse(matches); - } -} diff --git a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestGroundingMatch.java b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestGroundingMatch.java index 67e55e5c1b..2bc0553141 100644 --- a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestGroundingMatch.java +++ b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestGroundingMatch.java @@ -20,7 +20,6 @@ import org.apache.streampipes.model.client.matching.MatchingResultMessage; import org.apache.streampipes.model.grounding.EventGrounding; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; import org.junit.jupiter.api.Assertions; @@ -37,11 +36,8 @@ public void testPositiveGroundingMatch() { TransportProtocol offerProtocol = TestUtils.kafkaProtocol(); TransportProtocol requirementProtocol = TestUtils.kafkaProtocol(); - TransportFormat offerFormat = TestUtils.jsonFormat(); - TransportFormat requirementFormat = TestUtils.jsonFormat(); - - EventGrounding offeredGrounding = new EventGrounding(offerProtocol, offerFormat); - EventGrounding requiredGrounding = new EventGrounding(requirementProtocol, requirementFormat); + EventGrounding offeredGrounding = new EventGrounding(offerProtocol); + EventGrounding requiredGrounding = new EventGrounding(requirementProtocol); List errorLog = new ArrayList<>(); @@ -55,29 +51,8 @@ public void testNegativeGroundingMatchProtocol() { TransportProtocol offerProtocol = TestUtils.kafkaProtocol(); TransportProtocol requirementProtocol = TestUtils.jmsProtocol(); - TransportFormat offerFormat = TestUtils.jsonFormat(); - TransportFormat requirementFormat = TestUtils.jsonFormat(); - - EventGrounding offeredGrounding = new EventGrounding(offerProtocol, offerFormat); - EventGrounding requiredGrounding = new EventGrounding(requirementProtocol, requirementFormat); - - List errorLog = new ArrayList<>(); - - boolean matches = new GroundingMatch().match(offeredGrounding, requiredGrounding, errorLog); - Assertions.assertFalse(matches); - } - - @Test - public void testNegativeGroundingMatchFormat() { - - TransportProtocol offerProtocol = TestUtils.kafkaProtocol(); - TransportProtocol requirementProtocol = TestUtils.kafkaProtocol(); - - TransportFormat offerFormat = TestUtils.jsonFormat(); - TransportFormat requirementFormat = TestUtils.thriftFormat(); - - EventGrounding offeredGrounding = new EventGrounding(offerProtocol, offerFormat); - EventGrounding requiredGrounding = new EventGrounding(requirementProtocol, requirementFormat); + EventGrounding offeredGrounding = new EventGrounding(offerProtocol); + EventGrounding requiredGrounding = new EventGrounding(requirementProtocol); List errorLog = new ArrayList<>(); @@ -91,11 +66,8 @@ public void testNegativeGroundingMatchBoth() { TransportProtocol offerProtocol = TestUtils.kafkaProtocol(); TransportProtocol requirementProtocol = TestUtils.jmsProtocol(); - TransportFormat offerFormat = TestUtils.jsonFormat(); - TransportFormat requirementFormat = TestUtils.thriftFormat(); - - EventGrounding offeredGrounding = new EventGrounding(offerProtocol, offerFormat); - EventGrounding requiredGrounding = new EventGrounding(requirementProtocol, requirementFormat); + EventGrounding offeredGrounding = new EventGrounding(offerProtocol); + EventGrounding requiredGrounding = new EventGrounding(requirementProtocol); List errorLog = new ArrayList<>(); diff --git a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java index d1e31832e4..78d291a369 100644 --- a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java +++ b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java @@ -20,9 +20,7 @@ import org.apache.streampipes.model.grounding.JmsTransportProtocol; import org.apache.streampipes.model.grounding.KafkaTransportProtocol; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; -import org.apache.streampipes.vocabulary.MessageFormat; public class TestUtils { @@ -33,13 +31,4 @@ public static TransportProtocol kafkaProtocol() { public static TransportProtocol jmsProtocol() { return new JmsTransportProtocol("localhost", 61616, "abc"); } - - public static TransportFormat jsonFormat() { - return new TransportFormat(MessageFormat.JSON); - } - - public static TransportFormat thriftFormat() { - return new TransportFormat(MessageFormat.THRIFT); - } - } diff --git a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/AbstractPipelineElementResource.java b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/AbstractPipelineElementResource.java index e2bd86a454..b16f23b023 100644 --- a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/AbstractPipelineElementResource.java +++ b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/AbstractPipelineElementResource.java @@ -26,7 +26,6 @@ import org.apache.streampipes.model.base.ConsumableStreamPipesEntity; import org.apache.streampipes.model.base.NamedStreamPipesEntity; import org.apache.streampipes.model.grounding.EventGrounding; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; import com.google.common.base.Charsets; @@ -125,13 +124,11 @@ protected NamedStreamPipesEntity rewrite(NamedStreamPipesEntity desc) { if (desc instanceof ConsumableStreamPipesEntity) { Collection supportedProtocols = DeclarersSingleton.getInstance().getSupportedProtocols(); - Collection supportedFormats = - DeclarersSingleton.getInstance().getSupportedFormats(); - if (supportedProtocols.size() > 0 && supportedFormats.size() > 0) { + if (!supportedProtocols.isEmpty()) { // Overwrite existing grounding from default provided by declarers singleton ((ConsumableStreamPipesEntity) desc) - .setSupportedGrounding(makeGrounding(supportedProtocols, supportedFormats)); + .setSupportedGrounding(makeGrounding(supportedProtocols)); } } } @@ -139,11 +136,9 @@ protected NamedStreamPipesEntity rewrite(NamedStreamPipesEntity desc) { return desc; } - private EventGrounding makeGrounding(Collection supportedProtocols, - Collection supportedFormats) { + private EventGrounding makeGrounding(Collection supportedProtocols) { EventGrounding grounding = new EventGrounding(); grounding.setTransportProtocols(new ArrayList<>(supportedProtocols)); - grounding.setTransportFormats(new ArrayList<>(supportedFormats)); return grounding; } diff --git a/streampipes-sdk-bundle/pom.xml b/streampipes-sdk-bundle/pom.xml index a3c70711cb..4a767c9b33 100644 --- a/streampipes-sdk-bundle/pom.xml +++ b/streampipes-sdk-bundle/pom.xml @@ -39,22 +39,7 @@ org.apache.streampipes - streampipes-dataformat-json - 0.97.0-SNAPSHOT - - - org.apache.streampipes - streampipes-dataformat-cbor - 0.97.0-SNAPSHOT - - - org.apache.streampipes - streampipes-dataformat-smile - 0.97.0-SNAPSHOT - - - org.apache.streampipes - streampipes-dataformat-fst + streampipes-dataformat 0.97.0-SNAPSHOT diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java index d99618eec1..b6d3a94492 100644 --- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java +++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java @@ -145,11 +145,13 @@ public K unaryMappingPropertyWithoutRequirement(Label label, PropertyScope prope * Assigns supported transport formats to the pipeline elements that can be handled at runtime (e.g., * JSON or XMl). * + * @deprecated format assignment is no longer necessary * @param format An arbitrary number of supported {@link org.apache.streampipes.model.grounding.TransportFormat}s. Use * {@link org.apache.streampipes.sdk.helpers.SupportedFormats} to assign formats from some pre-defined * ones or create your own by following the developer guide. * @return this */ + @Deprecated(forRemoval = true, since = "0.97.0") public K supportedFormats(TransportFormat... format) { return supportedFormats(Arrays.asList(format)); } @@ -158,13 +160,14 @@ public K supportedFormats(TransportFormat... format) { * Assigns supported transport formats to the pipeline elements that can be handled at runtime (e.g., * JSON or XMl). * + * @deprecated format assignment is no longer necessary * @param formats A list of supported {@link org.apache.streampipes.model.grounding.TransportFormat}s. Use * {@link org.apache.streampipes.sdk.helpers.SupportedFormats} to assign formats from some pre-defined * ones or create your own by following the developer guide. * @return this */ + @Deprecated(forRemoval = true, since = "0.97.0") public K supportedFormats(List formats) { - this.supportedGrounding.setTransportFormats(formats); return me(); } diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/DataStreamBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/DataStreamBuilder.java index e1f606c497..613a6e8eb4 100644 --- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/DataStreamBuilder.java +++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/DataStreamBuilder.java @@ -20,13 +20,11 @@ import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.grounding.EventGrounding; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; import org.apache.streampipes.model.schema.EventProperty; import org.apache.streampipes.model.schema.EventSchema; import java.util.ArrayList; -import java.util.Collections; import java.util.List; public class DataStreamBuilder extends AbstractPipelineElementBuilder { @@ -108,19 +106,6 @@ public DataStreamBuilder protocol(TransportProtocol protocol) { return this; } - /** - * Assigns a new {@link org.apache.streampipes.model.grounding.TransportFormat} to the stream definition. - * - * @param format The transport format of the stream at runtime (e.g., JSON or Thrift). - * Use {@link org.apache.streampipes.sdk.helpers.Formats} to use some pre-defined formats - * (or create a new format as described in the developer guide). - * @return this - */ - public DataStreamBuilder format(TransportFormat format) { - this.eventGrounding.setTransportFormats(Collections.singletonList(format)); - return this; - } - @Override protected DataStreamBuilder me() { return this; diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Formats.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Formats.java deleted file mode 100644 index 88107cb2d3..0000000000 --- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Formats.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.sdk.helpers; - -import org.apache.streampipes.model.grounding.TransportFormat; -import org.apache.streampipes.vocabulary.MessageFormat; - -public class Formats { - - /** - * Defines the transport format JSON used by a data stream at runtime. - * - * @return The {@link org.apache.streampipes.model.grounding.TransportFormat} of type JSON. - */ - public static TransportFormat jsonFormat() { - return new TransportFormat(MessageFormat.JSON); - } - - /** - * Defines the transport format CBOR used by a data stream at runtime. - * - * @return The {@link org.apache.streampipes.model.grounding.TransportFormat} of type CBOR. - */ - public static TransportFormat cborFormat() { - return new TransportFormat(MessageFormat.CBOR); - } - - /** - * Defines the transport format Fast-Serializer used by a data stream at runtime. - * - * @return The {@link org.apache.streampipes.model.grounding.TransportFormat} of type FST. - */ - public static TransportFormat fstFormat() { - return new TransportFormat(MessageFormat.FST); - } - - /** - * Defines the transport format SMILE used by a data stream at runtime. - * - * @return The {@link org.apache.streampipes.model.grounding.TransportFormat} of type SMILE. - */ - public static TransportFormat smileFormat() { - return new TransportFormat(MessageFormat.SMILE); - } - - /** - * Defines the transport format Apache Thrift used by a data stream at runtime. - * - * @return The {@link org.apache.streampipes.model.grounding.TransportFormat} of type Thrift. - */ - public static TransportFormat thriftFormat() { - return new TransportFormat(MessageFormat.THRIFT); - } -} diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/SupportedFormats.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/SupportedFormats.java index 0f904c4cdb..e70839e182 100644 --- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/SupportedFormats.java +++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/SupportedFormats.java @@ -21,6 +21,10 @@ import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.vocabulary.MessageFormat; +/** + * @deprecated formats are no longer necessary + */ +@Deprecated(forRemoval = true, since = "0.97.0") public class SupportedFormats { /** diff --git a/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/grounding/EventGroundingGenerator.java b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/grounding/EventGroundingGenerator.java index 4f98071bfc..3e0199882f 100644 --- a/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/grounding/EventGroundingGenerator.java +++ b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/grounding/EventGroundingGenerator.java @@ -18,16 +18,12 @@ package org.apache.streampipes.test.generator.grounding; import org.apache.streampipes.model.grounding.EventGrounding; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.test.generator.grounding.protocol.ProtocolGenerator; -import java.util.Arrays; - public class EventGroundingGenerator { public static EventGrounding makeDummyGrounding() { EventGrounding grounding = new EventGrounding(); - grounding.setTransportFormats(Arrays.asList(new TransportFormat())); grounding.setTransportProtocol(ProtocolGenerator.makeDummyProtocol()); return grounding; diff --git a/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java b/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java index 9e9b0578ce..1dcaeff0b7 100644 --- a/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java +++ b/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java @@ -33,7 +33,6 @@ import org.apache.streampipes.model.grounding.JmsTransportProtocol; import org.apache.streampipes.model.grounding.KafkaTransportProtocol; import org.apache.streampipes.model.grounding.MqttTransportProtocol; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; import org.apache.streampipes.wrapper.runtime.PipelineElementRuntime; @@ -61,8 +60,8 @@ protected Properties getProducerProperties(KafkaTransportProtocol protocol) { return new ProducerConfigFactory(protocol).makeDefaultProperties(); } - protected SpDataFormatDefinition getDataFormatDefinition(TransportFormat transportFormat) { - return SpDataFormatManager.INSTANCE.findDefinition(transportFormat).get(); + protected SpDataFormatDefinition getDataFormatDefinition() { + return SpDataFormatManager.getFormatDefinition(); } protected String getTopic(SpDataStream stream) { diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java index 27c8614dfe..71ad6bafe2 100644 --- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java +++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java @@ -80,7 +80,7 @@ protected void appendExecutionConfig(IDataProcessorProgram program, EventGrounding outputGrounding = getOutputStream().getEventGrounding(); SpDataFormatDefinition outputDataFormatDefinition = - getDataFormatDefinition(outputGrounding.getTransportFormats().get(0)); + getDataFormatDefinition(); ByteArraySerializer serializer = new ByteArraySerializer(outputDataFormatDefinition); diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java index 5b77768f4d..022668816f 100644 --- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java +++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java @@ -34,7 +34,6 @@ import org.apache.streampipes.model.grounding.KafkaTransportProtocol; import org.apache.streampipes.model.grounding.MqttTransportProtocol; import org.apache.streampipes.model.grounding.SimpleTopicDefinition; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.wrapper.distributed.runtime.DistributedRuntime; @@ -136,8 +135,7 @@ private SourceFunction> getStreamSource(int i) { SpDataStream stream = runtimeParameters.getModel().getInputStreams().get(i); if (stream != null) { TransportProtocol protocol = stream.getEventGrounding().getTransportProtocol(); - TransportFormat format = stream.getEventGrounding().getTransportFormats().get(0); - SpDataFormatDefinition dataFormatDefinition = getDataFormatDefinition(format); + SpDataFormatDefinition dataFormatDefinition = getDataFormatDefinition(); if (protocol instanceof KafkaTransportProtocol) { return getKafkaConsumer((KafkaTransportProtocol) protocol, dataFormatDefinition); } else if (protocol instanceof JmsTransportProtocol) { diff --git a/streampipes-wrapper-kafka-streams/pom.xml b/streampipes-wrapper-kafka-streams/pom.xml index 579b32bf99..fe6affe34c 100644 --- a/streampipes-wrapper-kafka-streams/pom.xml +++ b/streampipes-wrapper-kafka-streams/pom.xml @@ -34,6 +34,11 @@ streampipes-wrapper-distributed 0.97.0-SNAPSHOT + + org.apache.streampipes + streampipes-dataformat + 0.97.0-SNAPSHOT + diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java index a76552063d..fed80cc1f1 100644 --- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java +++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java @@ -103,10 +103,7 @@ protected void afterStop() { } private SpDataFormatDefinition getDataFormatConverter() { - return SpDataFormatManager - .INSTANCE - .findDefinition(pipelineElementInvocation.getOutputStream().getEventGrounding().getTransportFormats().get(0)) - .orElseThrow(); + return SpDataFormatManager.getFormatDefinition(); } private SpProtocolDefinition getOutputProtocol() { diff --git a/streampipes-wrapper-standalone/pom.xml b/streampipes-wrapper-standalone/pom.xml index f4ac11cb32..e80cce593c 100644 --- a/streampipes-wrapper-standalone/pom.xml +++ b/streampipes-wrapper-standalone/pom.xml @@ -34,6 +34,11 @@ streampipes-wrapper 0.97.0-SNAPSHOT + + org.apache.streampipes + streampipes-dataformat + 0.97.0-SNAPSHOT + @@ -43,4 +48,4 @@ - \ No newline at end of file + diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java index bd3b788c97..857a26312f 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java @@ -152,7 +152,6 @@ private Map getOutputCollectors(FunctionId functionId uniqueStreamId, ProtocolManager.makeOutputCollector( value.getEventGrounding().getTransportProtocol(), - value.getEventGrounding().getTransportFormats().get(0), uniqueStreamId)); }); @@ -169,7 +168,7 @@ private Map getInputCollectors(FunctionId functionId, GroundingDebugUtils.modifyGrounding(is.getEventGrounding()); } inputCollectors.put(uniqueStreamId, ProtocolManager.findInputCollector(is.getEventGrounding() - .getTransportProtocol(), is.getEventGrounding().getTransportFormats().get(0), false)); + .getTransportProtocol(), false)); } return inputCollectors; } diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/PManager.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/PManager.java index 1701566651..b94f63ea94 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/PManager.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/PManager.java @@ -23,7 +23,6 @@ import org.apache.streampipes.dataformat.SpDataFormatManager; import org.apache.streampipes.messaging.SpProtocolDefinition; import org.apache.streampipes.messaging.SpProtocolManager; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; import java.util.Optional; @@ -34,7 +33,7 @@ public static Optional> ge return SpProtocolManager.INSTANCE.findDefinition(protocol); } - public static Optional getDataFormat(TransportFormat format) throws SpRuntimeException { - return SpDataFormatManager.INSTANCE.findDefinition(format); + public static SpDataFormatDefinition getDataFormat() throws SpRuntimeException { + return SpDataFormatManager.getFormatDefinition(); } } diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java index 19c9b1b79d..9b5a4f3e43 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java @@ -19,7 +19,6 @@ package org.apache.streampipes.wrapper.standalone.manager; import org.apache.streampipes.commons.exceptions.SpRuntimeException; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; import org.apache.streampipes.wrapper.standalone.routing.StandaloneSpInputCollector; import org.apache.streampipes.wrapper.standalone.routing.StandaloneSpOutputCollector; @@ -41,14 +40,13 @@ public class ProtocolManager { // in empire serializers public static StandaloneSpInputCollector findInputCollector(T protocol, - TransportFormat format, Boolean singletonEngine) throws SpRuntimeException { if (consumers.containsKey(topicName(protocol))) { return consumers.get(topicName(protocol)); } else { - consumers.put(topicName(protocol), makeInputCollector(protocol, format, singletonEngine)); + consumers.put(topicName(protocol), makeInputCollector(protocol, singletonEngine)); LOG.info("Adding new consumer to consumer map (size=" + consumers.size() + "): " + topicName(protocol)); return consumers.get(topicName(protocol)); } @@ -56,14 +54,13 @@ public static StandaloneSpInputCollector findInput } public static StandaloneSpOutputCollector findOutputCollector(T protocol, - TransportFormat format, String resourceId) throws SpRuntimeException { if (producers.containsKey(topicName(protocol))) { return producers.get(topicName(protocol)); } else { - producers.put(topicName(protocol), makeOutputCollector(protocol, format, resourceId)); + producers.put(topicName(protocol), makeOutputCollector(protocol, resourceId)); LOG.info("Adding new producer to producer map (size=" + producers.size() + "): " + topicName (protocol)); return producers.get(topicName(protocol)); @@ -72,17 +69,15 @@ public static StandaloneSpOutputCollector findOutp } private static StandaloneSpInputCollector makeInputCollector(T protocol, - TransportFormat format, Boolean singletonEngine) throws SpRuntimeException { - return new StandaloneSpInputCollector<>(protocol, format, singletonEngine); + return new StandaloneSpInputCollector<>(protocol, singletonEngine); } public static StandaloneSpOutputCollector makeOutputCollector(T protocol, - TransportFormat format, String resourceId) throws SpRuntimeException { - return new StandaloneSpOutputCollector<>(protocol, format, resourceId); + return new StandaloneSpOutputCollector<>(protocol, resourceId); } private static String topicName(TransportProtocol protocol) { diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java index a459506149..5a00389200 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java @@ -22,7 +22,6 @@ import org.apache.streampipes.dataformat.SpDataFormatDefinition; import org.apache.streampipes.extensions.api.pe.routing.PipelineElementCollector; import org.apache.streampipes.messaging.SpProtocolDefinition; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; import org.apache.streampipes.wrapper.standalone.manager.PManager; @@ -37,18 +36,15 @@ public abstract class StandaloneSpCollector impl protected T transportProtocol; protected SpProtocolDefinition protocolDefinition; - protected TransportFormat transportFormat; protected SpDataFormatDefinition dataFormatDefinition; protected String topic; - public StandaloneSpCollector(T protocol, TransportFormat format) throws SpRuntimeException { + public StandaloneSpCollector(T protocol) throws SpRuntimeException { this.transportProtocol = protocol; this.protocolDefinition = PManager.getProtocolDefinition(protocol).orElseThrow(() -> new SpRuntimeException("Could not find protocol")); - this.transportFormat = format; - this.dataFormatDefinition = PManager.getDataFormat(format).orElseThrow(() -> new - SpRuntimeException("Could not find format")); + this.dataFormatDefinition = PManager.getDataFormat(); this.consumers = new ConcurrentHashMap<>(); this.topic = transportProtocol.getTopicDefinition().getActualTopicName(); } diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java index 890f16da20..f2ce71ee36 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java @@ -23,7 +23,6 @@ import org.apache.streampipes.extensions.api.pe.routing.SpInputCollector; import org.apache.streampipes.messaging.EventConsumer; import org.apache.streampipes.messaging.InternalEventProcessor; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager; @@ -36,9 +35,8 @@ public class StandaloneSpInputCollector extends private final EventConsumer consumer; public StandaloneSpInputCollector(T protocol, - TransportFormat format, Boolean singletonEngine) throws SpRuntimeException { - super(protocol, format); + super(protocol); this.consumer = protocolDefinition.getConsumer(protocol); this.singletonEngine = singletonEngine; } @@ -66,7 +64,7 @@ public void connect() throws SpRuntimeException { @Override public void disconnect() throws SpRuntimeException { if (consumer.isConnected()) { - if (consumers.size() == 0) { + if (consumers.isEmpty()) { consumer.disconnect(); ProtocolManager.removeInputCollector(transportProtocol); } diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java index 671de27820..a228245069 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java @@ -24,7 +24,6 @@ import org.apache.streampipes.extensions.management.monitoring.ExtensionsLogger; import org.apache.streampipes.messaging.EventProducer; import org.apache.streampipes.messaging.InternalEventProcessor; -import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.model.runtime.EventConverter; @@ -46,9 +45,8 @@ public class StandaloneSpOutputCollector extends private final ExtensionsLogger extensionsLogger; public StandaloneSpOutputCollector(T protocol, - TransportFormat format, String resourceId) throws SpRuntimeException { - super(protocol, format); + super(protocol); this.producer = protocolDefinition.getProducer(protocol); this.resourceId = resourceId; this.extensionsLogger = new ExtensionsLogger(resourceId); diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java index 356f57b8c7..b9ce761f59 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java @@ -58,12 +58,6 @@ public SpOutputCollector getOutputCollector() throws SpRuntimeException { .getOutputStream() .getEventGrounding() .getTransportProtocol(), - runtimeParameters - .getModel() - .getOutputStream() - .getEventGrounding() - .getTransportFormats() - .get(0), this.instanceId); } diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java index a8e48596c0..e906161004 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java @@ -93,7 +93,7 @@ protected List getInputCollectors(List inputStre List inputCollectors = new ArrayList<>(); for (SpDataStream is : inputStreams) { inputCollectors.add(ProtocolManager.findInputCollector(is.getEventGrounding() - .getTransportProtocol(), is.getEventGrounding().getTransportFormats().get(0), + .getTransportProtocol(), false)); } return inputCollectors; diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index 700f825675..c84f23c159 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -20,7 +20,7 @@ /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.2.1263 on 2024-08-20 13:02:30. +// Generated using typescript-generator version 3.2.1263 on 2024-08-27 18:35:51. export class NamedStreamPipesEntity implements Storable { '@class': @@ -1456,7 +1456,6 @@ export class ErrorMessage extends Message { } export class EventGrounding { - transportFormats: TransportFormat[]; transportProtocols: TransportProtocolUnion[]; static fromData( @@ -1467,9 +1466,6 @@ export class EventGrounding { return data; } const instance = target || new EventGrounding(); - instance.transportFormats = __getCopyArrayFn(TransportFormat.fromData)( - data.transportFormats, - ); instance.transportProtocols = __getCopyArrayFn( TransportProtocol.fromDataUnion, )(data.transportProtocols); @@ -2287,7 +2283,6 @@ export class MessagingSettings { mqttPort: number; natsHost: string; natsPort: number; - prioritizedFormats: SpDataFormat[]; prioritizedProtocols: SpProtocol[]; pulsarUrl: string; supportedProtocols: string[]; @@ -2314,9 +2309,6 @@ export class MessagingSettings { instance.mqttPort = data.mqttPort; instance.natsHost = data.natsHost; instance.natsPort = data.natsPort; - instance.prioritizedFormats = __getCopyArrayFn( - __identity(), - )(data.prioritizedFormats); instance.prioritizedProtocols = __getCopyArrayFn( __identity(), )(data.prioritizedProtocols); @@ -3828,22 +3820,6 @@ export class TransformOutputStrategy extends OutputStrategy { } } -export class TransportFormat { - rdfType: string[]; - - static fromData( - data: TransportFormat, - target?: TransportFormat, - ): TransportFormat { - if (!data) { - return data; - } - const instance = target || new TransportFormat(); - instance.rdfType = __getCopyArrayFn(__identity())(data.rdfType); - return instance; - } -} - export class TreeInputNode { children: TreeInputNode[]; dataNode: boolean; @@ -4049,8 +4025,6 @@ export type SelectionStaticPropertyUnion = | AnyStaticProperty | OneOfStaticProperty; -export type SpDataFormat = 'CBOR' | 'JSON' | 'FST' | 'SMILE'; - export type SpLogLevel = 'INFO' | 'WARN' | 'ERROR'; export type SpProtocol = 'KAFKA' | 'JMS' | 'MQTT' | 'NATS' | 'PULSAR'; diff --git a/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html b/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html index f31c5fba3f..38c5eee17f 100644 --- a/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html +++ b/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html @@ -86,48 +86,6 @@ - -
-
-
- {{ format }} -
-
-
- -
-
-
- - - this.getMessagingSettings()); } - drop(event: CdkDragDrop) { - moveItemInArray( - this.messagingSettings.prioritizedFormats, - event.previousIndex, - event.currentIndex, - ); - } - dropProtocol(event: CdkDragDrop) { moveItemInArray( this.messagingSettings.prioritizedProtocols,