Skip to content

Commit

Permalink
Merge branch 'dev' into 2002-harmonize-registration-of-adapters-and-p…
Browse files Browse the repository at this point in the history
…ipeline-elements
  • Loading branch information
bossenti committed Oct 10, 2023
2 parents d437edc + 8a565bc commit f74ae39
Show file tree
Hide file tree
Showing 76 changed files with 2,573 additions and 1,216 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
<rendersnake.version>1.9.0</rendersnake.version>
<roaster.version>2.29.0.Final</roaster.version>
<siddhi.version>5.1.27</siddhi.version>
<simple-java-mail.version>8.2.0</simple-java-mail.version>
<simple-java-mail.version>8.3.1</simple-java-mail.version>
<slf4j.version>2.0.6</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<snappy-java.version>1.1.10.4</snappy-java.version>
Expand All @@ -128,7 +128,7 @@

<!-- Test dependencies -->
<junit.version>4.13.2</junit.version>
<mockito.version>5.5.0</mockito.version>
<mockito.version>5.6.0</mockito.version>
<powermock.version>2.0.9</powermock.version>
<rest-assured.version>2.9.0</rest-assured.version>
<wiremock.version>2.27.2</wiremock.version>
Expand Down Expand Up @@ -186,7 +186,7 @@
<commons-collections4.version>4.4</commons-collections4.version>
<eclipse.milo.version>0.6.9</eclipse.milo.version>
<netty.version>4.1.72.Final</netty.version>
<nimbus-jose-jwt.version>9.35</nimbus-jose-jwt.version>
<nimbus-jose-jwt.version>9.36</nimbus-jose-jwt.version>
<opencsv.version>5.8</opencsv.version>
<plc4x.version>0.10.0</plc4x.version>
<protobuf.version>3.24.0</protobuf.version>
Expand Down
8 changes: 4 additions & 4 deletions streampipes-client-python/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion streampipes-client-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ ruff = "0.0.280"
pre-commit = "3.4.0"
pytest = "7.4.0"
pytest-cov = "4.1.0"
pyupgrade = "3.14.0"
pyupgrade = "3.15.0"

[tool.poetry.group.docs]
optional = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public class AdapterEventPreviewPipeline implements IAdapterPipeline {

public AdapterEventPreviewPipeline(AdapterEventPreview previewRequest) {
this.objectMapper = new ObjectMapper();
this.pipelineElements = new AdapterPipelineGeneratorBase().makeAdapterPipelineElements(previewRequest.getRules());
this.pipelineElements = new AdapterPipelineGeneratorBase()
.makeAdapterPipelineElements(previewRequest.getRules(), false);
this.event = previewRequest.getInputData();
}

Expand Down
10 changes: 10 additions & 0 deletions streampipes-connect-shared/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,21 @@
<version>0.93.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-sdk</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,93 +18,30 @@

package org.apache.streampipes.connect.shared;

import org.apache.streampipes.connect.shared.preprocessing.elements.AddTimestampPipelineElement;
import org.apache.streampipes.connect.shared.preprocessing.elements.AddValuePipelineElement;
import org.apache.streampipes.connect.shared.preprocessing.elements.TransformSchemaAdapterPipelineElement;
import org.apache.streampipes.connect.shared.preprocessing.elements.TransformValueAdapterPipelineElement;
import org.apache.streampipes.connect.shared.preprocessing.elements.AdapterTransformationPipelineElement;
import org.apache.streampipes.connect.shared.preprocessing.generator.StatefulTransformationRuleGeneratorVisitor;
import org.apache.streampipes.connect.shared.preprocessing.generator.StatelessTransformationRuleGeneratorVisitor;
import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
import org.apache.streampipes.model.connect.rules.TransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.schema.SchemaTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.stream.EventRateTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.stream.RemoveDuplicatesTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription;
import org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class AdapterPipelineGeneratorBase {

public List<IAdapterPipelineElement> makeAdapterPipelineElements(List<TransformationRuleDescription> rules) {
List<IAdapterPipelineElement> pipelineElements = new ArrayList<>();

// Must be before the schema transformations to ensure that user can move this event property
var timestampTransformationRuleDescription = getTimestampRule(rules);
if (timestampTransformationRuleDescription != null) {
pipelineElements.add(new AddTimestampPipelineElement(
timestampTransformationRuleDescription.getRuntimeKey()));
}

var valueTransformationRuleDescription = getAddValueRule(rules);
if (valueTransformationRuleDescription != null) {
pipelineElements.add(new AddValuePipelineElement(
valueTransformationRuleDescription.getRuntimeKey(),
valueTransformationRuleDescription.getStaticValue()));
}

// first transform schema before transforming vales
// value rules should use unique keys for of new schema
pipelineElements.add(new TransformSchemaAdapterPipelineElement(getSchemaRules(rules)));
pipelineElements.add(new TransformValueAdapterPipelineElement(getValueRules(rules)));

return pipelineElements;
}

protected RemoveDuplicatesTransformationRuleDescription getRemoveDuplicateRule(
List<TransformationRuleDescription> rules) {
return getRule(rules, RemoveDuplicatesTransformationRuleDescription.class);
}

protected EventRateTransformationRuleDescription getEventRateTransformationRule(
List<TransformationRuleDescription> rules) {
return getRule(rules, EventRateTransformationRuleDescription.class);
}

protected AddTimestampRuleDescription getTimestampRule(List<TransformationRuleDescription> rules) {
return getRule(rules, AddTimestampRuleDescription.class);
}

protected AddValueTransformationRuleDescription getAddValueRule(List<TransformationRuleDescription> rules) {
return getRule(rules, AddValueTransformationRuleDescription.class);
}

private <T extends TransformationRuleDescription> T getRule(List<TransformationRuleDescription> rules,
Class<T> type) {

if (rules != null) {
for (TransformationRuleDescription tr : rules) {
if (type.isInstance(tr)) {
return type.cast(tr);
}
}
public List<IAdapterPipelineElement> makeAdapterPipelineElements(List<TransformationRuleDescription> rules,
boolean includeStateful) {
var elements = new ArrayList<IAdapterPipelineElement>();
elements.add(new AdapterTransformationPipelineElement(
rules,
new StatelessTransformationRuleGeneratorVisitor())
);
if (includeStateful) {
elements.add(new AdapterTransformationPipelineElement(
rules,
new StatefulTransformationRuleGeneratorVisitor())
);
}

return null;
}

private List<TransformationRuleDescription> getValueRules(List<TransformationRuleDescription> rules) {
return rules
.stream()
.filter(r -> r instanceof ValueTransformationRuleDescription && !(r instanceof AddTimestampRuleDescription))
.collect(Collectors.toList());
}

private List<TransformationRuleDescription> getSchemaRules(List<TransformationRuleDescription> rules) {
return rules
.stream()
.filter(r -> r instanceof SchemaTransformationRuleDescription)
.collect(Collectors.toList());
return elements;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.connect.shared.preprocessing;

import org.apache.streampipes.connect.shared.preprocessing.transform.TransformationRule;

import java.util.List;
import java.util.Map;

public abstract class SupportsNestedTransformationRule implements TransformationRule {

@Override
public Map<String, Object> apply(Map<String, Object> event) {
return applyNested(event, getEventKeys());
}

protected Map<String, Object> applyNested(Map<String, Object> event,
List<String> eventKey) {
if (eventKey.size() == 1) {
applyTransformation(event, eventKey);
} else {
String key = eventKey.get(0);
List<String> newKeysTmpList = eventKey.subList(1, eventKey.size());

Map<String, Object> newSubEvent =
applyNested((Map<String, Object>) event.get(eventKey.get(0)), newKeysTmpList);

event.remove(key);
event.put(key, newSubEvent);
}
return event;
}

protected abstract List<String> getEventKeys();

protected abstract void applyTransformation(Map<String, Object> event,
List<String> eventKey);


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
*
*/

package org.apache.streampipes.connect.adapters.generic;
package org.apache.streampipes.connect.shared.preprocessing.convert;

public class Mock {
public static final int PORT = 8042;
import org.apache.streampipes.model.connect.rules.ITransformationRuleVisitor;
import org.apache.streampipes.model.schema.EventProperty;

public static final String HOST = "http://localhost:" + PORT;
import java.util.List;

public interface ProvidesConversionResult extends ITransformationRuleVisitor {

List<EventProperty> getTransformedProperties();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.connect.shared.preprocessing.convert;

import org.apache.streampipes.model.connect.rules.TransformationRuleDescription;
import org.apache.streampipes.model.schema.EventSchema;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class SchemaConverter {

public EventSchema toOriginalSchema(EventSchema transformedSchema,
List<TransformationRuleDescription> transformationRules) {

var converter = new ToOriginalSchemaConverter(
transformedSchema.getEventProperties()
);

return transform(transformationRules, converter, true);
}

public EventSchema toTransformedSchema(EventSchema originalSchema,
List<TransformationRuleDescription> transformationRules) {

var converter = new ToTransformedSchemaConverter(
originalSchema.getEventProperties()
);

return transform(transformationRules, converter, false);
}

private EventSchema transform(List<TransformationRuleDescription> transformationRules,
ProvidesConversionResult converter,
boolean reverseRules) {
var rules = transformationRules
.stream()
.sorted(Comparator.comparingInt(TransformationRuleDescription::getRulePriority))
.collect(Collectors.toCollection(ArrayList::new));

if (reverseRules) {
Collections.reverse(rules);
}

rules.forEach(rule -> rule.accept(converter));
return new EventSchema(converter.getTransformedProperties());
}
}
Loading

0 comments on commit f74ae39

Please sign in to comment.