diff --git a/pom.xml b/pom.xml index 6bcdad2b73..a9be6d64a1 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,7 @@ 1.26.0 2.16.1 3.14.0 + 1.3.2 2.12.0 1.12.0 1.0.0 @@ -171,6 +172,11 @@ commons-io ${commons-io.version} + + commons-logging + commons-logging + ${commons-logging.version} + com.fasterxml.jackson.core jackson-annotations diff --git a/streampipes-client/pom.xml b/streampipes-client/pom.xml index af5127564e..e132a4fad7 100644 --- a/streampipes-client/pom.xml +++ b/streampipes-client/pom.xml @@ -72,6 +72,11 @@ org.apache.httpcomponents fluent-hc + + + commons-logging + commons-logging + diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/pom.xml b/streampipes-extensions/streampipes-processors-enricher-jvm/pom.xml index bc17948896..c9177f7fca 100644 --- a/streampipes-extensions/streampipes-processors-enricher-jvm/pom.xml +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/pom.xml @@ -46,6 +46,12 @@ js-scriptengine + + org.apache.commons + commons-jexl3 + 3.4.0 + + org.apache.streampipes diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherExtensionModuleExport.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherExtensionModuleExport.java index f26795f7ac..a762e777ca 100644 --- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherExtensionModuleExport.java +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherExtensionModuleExport.java @@ -22,6 +22,7 @@ import org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport; import org.apache.streampipes.extensions.api.migration.IModelMigrator; import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement; +import org.apache.streampipes.processors.enricher.jvm.processor.expression.MathExpressionProcessor; import org.apache.streampipes.processors.enricher.jvm.processor.jseval.JSEvalProcessor; import org.apache.streampipes.processors.enricher.jvm.processor.limitsalert.SensorLimitAlertProcessor; import org.apache.streampipes.processors.enricher.jvm.processor.limitsenrichment.QualityControlLimitsEnrichmentProcessor; @@ -48,7 +49,8 @@ public List> pipelineElements() { new MathOpProcessor(), new StaticMathOpProcessor(), new TrigonometryProcessor(), - new ValueChangeProcessor() + new ValueChangeProcessor(), + new MathExpressionProcessor() ); } diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/JexlContextGenerator.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/JexlContextGenerator.java new file mode 100644 index 0000000000..0994a25976 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/JexlContextGenerator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.enricher.jvm.processor.expression; + +import org.apache.streampipes.model.runtime.Event; + +import org.apache.commons.jexl3.MapContext; + +public class JexlContextGenerator { + + private final MathExpressionFieldExtractor extractor; + private final MapContext mapContext; + + public JexlContextGenerator(MathExpressionFieldExtractor extractor) { + this.extractor = extractor; + this.mapContext = makeInitialContext(); + } + + private MapContext makeInitialContext() { + var ctx = new MapContext(); + ctx.set("Math", Math.class); + extractor.getInputProperties().forEach(ep -> + ctx.set(ep.getRuntimeName(), 0) + ); + return ctx; + } + + public MapContext makeContext(Event event) { + event.getRaw().forEach(mapContext::set); + return mapContext; + } +} diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/JexlDescription.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/JexlDescription.java new file mode 100644 index 0000000000..df7c065612 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/JexlDescription.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.enricher.jvm.processor.expression; + +import org.apache.streampipes.model.schema.EventPropertyPrimitive; + +public record JexlDescription(EventPropertyPrimitive ep, + String script) { + + public String getFieldName() { + return ep.getRuntimeName(); + } +} diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/JexlEngineProvider.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/JexlEngineProvider.java new file mode 100644 index 0000000000..b360d613bb --- /dev/null +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/JexlEngineProvider.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.enricher.jvm.processor.expression; + +import org.apache.commons.jexl3.JexlBuilder; +import org.apache.commons.jexl3.JexlEngine; +import org.apache.commons.jexl3.JexlFeatures; +import org.apache.commons.jexl3.introspection.JexlPermissions; + +public class JexlEngineProvider { + + public JexlEngine getEngine() { + var features = new JexlFeatures() + .loops(false) + .sideEffect(false) + .sideEffectGlobal(false); + + var permissions = new JexlPermissions.ClassPermissions(java.lang.Math.class); + + return new JexlBuilder().features(features).permissions(permissions).create(); + } +} diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/JexlEvaluator.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/JexlEvaluator.java new file mode 100644 index 0000000000..5e232c828e --- /dev/null +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/JexlEvaluator.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.enricher.jvm.processor.expression; + +import org.apache.streampipes.model.runtime.Event; + +import org.apache.commons.jexl3.JexlEngine; +import org.apache.commons.jexl3.JexlScript; +import org.apache.commons.jexl3.MapContext; + +public class JexlEvaluator { + + private final JexlDescription jexlDescription; + private final JexlScript script; + + public JexlEvaluator(JexlDescription jexlDescription, + JexlEngine engine) { + this.jexlDescription = jexlDescription; + this.script = engine.createScript(jexlDescription.script()); + } + + public void evaluate(MapContext context, + Event event) { + event.addField(jexlDescription.getFieldName(), script.execute(context)); + } +} diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/MathExpressionFieldExtractor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/MathExpressionFieldExtractor.java new file mode 100644 index 0000000000..ed2df9b041 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/MathExpressionFieldExtractor.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.enricher.jvm.processor.expression; + +import org.apache.streampipes.extensions.api.extractor.IDataProcessorParameterExtractor; +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.schema.EventProperty; +import org.apache.streampipes.model.staticproperty.CodeInputStaticProperty; +import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty; +import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; +import org.apache.streampipes.sdk.helpers.EpProperties; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.vocabulary.SO; + +import java.util.List; + +import static org.apache.streampipes.processors.enricher.jvm.processor.expression.MathExpressionProcessor.ENRICHED_FIELDS; +import static org.apache.streampipes.processors.enricher.jvm.processor.expression.MathExpressionProcessor.EXPRESSION; +import static org.apache.streampipes.processors.enricher.jvm.processor.expression.MathExpressionProcessor.FIELD_NAME; + +public class MathExpressionFieldExtractor { + + private final DataProcessorInvocation processingElement; + private final IDataProcessorParameterExtractor extractor; + + public MathExpressionFieldExtractor(DataProcessorInvocation processingElement) { + this.processingElement = processingElement; + this.extractor = ProcessingElementParameterExtractor.from(processingElement); + } + + public MathExpressionFieldExtractor(DataProcessorInvocation processingElement, + IDataProcessorParameterExtractor extractor) { + this.processingElement = processingElement; + this.extractor = extractor; + } + + public List getAdditionalFields() { + return extractor + .collectionMembersAsGroup(ENRICHED_FIELDS) + .stream().map(group -> { + var runtimeName = extractor.extractGroupMember(FIELD_NAME, group).as(FreeTextStaticProperty.class).getValue(); + var expression = extractor.extractGroupMember(EXPRESSION, group).as(CodeInputStaticProperty.class).getValue(); + return new JexlDescription(EpProperties.doubleEp(Labels.empty(), runtimeName, SO.NUMBER), expression); + }).toList(); + } + + public List getInputProperties() { + var inputStreams = processingElement.getInputStreams(); + if (!inputStreams.isEmpty()) { + return inputStreams.get(0).getEventSchema().getEventProperties(); + } else { + return List.of(); + } + } +} + diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/MathExpressionProcessor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/MathExpressionProcessor.java new file mode 100644 index 0000000000..bff0417c75 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/MathExpressionProcessor.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.enricher.jvm.processor.expression; + +import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor; +import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration; +import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; +import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters; +import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; +import org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOutputStrategy; +import org.apache.streampipes.model.DataProcessorType; +import org.apache.streampipes.model.extensions.ExtensionAssetType; +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.model.schema.EventSchema; +import org.apache.streampipes.model.staticproperty.CollectionStaticProperty; +import org.apache.streampipes.sdk.StaticProperties; +import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; +import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration; +import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; +import org.apache.streampipes.sdk.helpers.CodeLanguage; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.helpers.OutputStrategies; +import org.apache.streampipes.sdk.utils.Datatypes; + +import org.apache.commons.jexl3.JexlException; + +import java.util.List; + +public class MathExpressionProcessor implements + IStreamPipesDataProcessor, + ResolvesContainerProvidedOutputStrategy { + + private static final String ID = "org.apache.streampipes.processors.enricher.jvm.processor.expression"; + static final String ENRICHED_FIELDS = "enriched-fields"; + static final String FIELD_NAME = "field-name"; + static final String EXPRESSION = "expression"; + + private List jexlEvaluators; + private JexlContextGenerator jexlContextGenerator; + private EventProcessorRuntimeContext context; + + @Override + public IDataProcessorConfiguration declareConfig() { + return DataProcessorConfiguration.create( + MathExpressionProcessor::new, + ProcessingElementBuilder.create(ID, 0) + .category(DataProcessorType.ENRICH) + .withLocales(Locales.EN) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) + .requiredStream(StreamRequirementsBuilder.create() + .requiredProperty(EpRequirements.numberReq()) + .build()) + .requiredStaticProperty(makeCollection()) + .outputStrategy(OutputStrategies.customTransformation()) + .build() + ); + } + + private CollectionStaticProperty makeCollection() { + return StaticProperties.collection( + Labels.withId(ENRICHED_FIELDS), + false, + StaticProperties.freeTextProperty(Labels.withId(FIELD_NAME), Datatypes.String), + StaticProperties.codeStaticProperty(Labels.withId(EXPRESSION), CodeLanguage.None, getJexlComment())); + } + + private String getJexlComment() { + return "## Provide JEXL syntax here"; + } + + @Override + public void onPipelineStarted(IDataProcessorParameters params, + SpOutputCollector collector, + EventProcessorRuntimeContext runtimeContext) { + var extractor = new MathExpressionFieldExtractor(params.getModel()); + var engine = new JexlEngineProvider().getEngine(); + var scripts = extractor.getAdditionalFields(); + jexlEvaluators = scripts.stream().map(script -> new JexlEvaluator(script, engine)).toList(); + jexlContextGenerator = new JexlContextGenerator(extractor); + context = runtimeContext; + } + + @Override + public void onEvent(Event event, SpOutputCollector collector) { + var ctx = jexlContextGenerator.makeContext(event); + jexlEvaluators.forEach(evaluator -> { + try { + evaluator.evaluate(ctx, event); + } catch (JexlException e) { + context.getLogger().error(e); + } + }); + collector.collect(event); + } + + @Override + public void onPipelineStopped() { + + } + + + @Override + public EventSchema resolveOutputStrategy(DataProcessorInvocation processingElement, + ProcessingElementParameterExtractor parameterExtractor) { + var fieldExtractor = new MathExpressionFieldExtractor(processingElement, parameterExtractor); + var existingFields = fieldExtractor.getInputProperties(); + var additionalFields = fieldExtractor.getAdditionalFields(); + existingFields.addAll(additionalFields.stream().map(JexlDescription::ep).toList()); + return new EventSchema(processingElement.getInputStreams().get(0).getEventSchema()); + } +} diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.expression/documentation.md b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.expression/documentation.md new file mode 100644 index 0000000000..3545bb1c06 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.expression/documentation.md @@ -0,0 +1,55 @@ + + +## Math Expression Evaluator + +

+ +

+ +*** + +## Description +A pipeline element that evaluates Math expressions using the Apache Commons JEXL library. + +*** + +## Required input +This processor works with any input stream that contains numerical values. + +*** + +## Configuration +A math expression can be defined using the JEXL syntax (see https://commons.apache.org/proper/commons-jexl/index.html). + +Example: + +``` +flow_rate*2 +``` + +It is also possible to use methods from `java.lang.Math`: + +``` +Math.pow(flow_rate^2) +``` + +All fields from th einput stream are available as variables. + +## Output +For each expression, an additional field is created in the output stream. Field names are user-defined. diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.expression/icon.png b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.expression/icon.png new file mode 100644 index 0000000000..e86193ad0b Binary files /dev/null and b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.expression/icon.png differ diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.expression/strings.en b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.expression/strings.en new file mode 100644 index 0000000000..0243cfa1ea --- /dev/null +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.expression/strings.en @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +org.apache.streampipes.processors.enricher.jvm.processor.expression.title=Math Expression +org.apache.streampipes.processors.enricher.jvm.processor.expression.description=Evaluates math expressions + +enriched-fields.title=Additional Fields +enriched-fields.description=Additional fields + +field-name.title=Field name +field-name.description=Field description + +expression.title=Expression +expression.description=The Math expression to apply. All numerical input fields can be used as variables. diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/JexlEvaluatorTest.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/JexlEvaluatorTest.java new file mode 100644 index 0000000000..09455e153c --- /dev/null +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/JexlEvaluatorTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.enricher.jvm.processor.expression; + +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.model.schema.EventPropertyPrimitive; +import org.apache.streampipes.sdk.helpers.EpProperties; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.vocabulary.SO; + +import org.apache.commons.jexl3.JexlEngine; +import org.apache.commons.jexl3.MapContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class JexlEvaluatorTest { + + private JexlEngine engine; + + @BeforeEach + void setup() { + this.engine = new JexlEngineProvider().getEngine(); + } + + @ParameterizedTest + @MethodSource("provideTestArguments") + void testEvaluate(JexlDescription jexlDescription, + Object expectedResult, + MapContext context) { + var evaluator = new JexlEvaluator(jexlDescription, engine); + var event = makeBaseEvent(); + evaluator.evaluate(context, event); + + assertEquals(4, event.getRaw().size()); + assertEquals(expectedResult, event.getRaw().get("result1")); + + } + + static Stream provideTestArguments() { + MapContext context = new MapContext(); + context.set("a", 10); + context.set("b", 20.0); + context.set("c", 2); + context.set("Math", Math.class); + + return Stream.of( + Arguments.of(number("result1", "a + 5"), 15, context), + Arguments.of(number("result1", "b + 5"), 25.0, context), + Arguments.of(number("result1", "a + b * 5"), 110.0, context), + Arguments.of(number("result1", "a + b * 5"), 110.0, context), + Arguments.of(number("result1", "Math.pow(a, c)"), 100.0, context) + ); + } + + private static JexlDescription number(String runtimeName, + String script) { + return new JexlDescription(numberEp(runtimeName), script); + } + + private static EventPropertyPrimitive numberEp(String runtimeName) { + return EpProperties.doubleEp(Labels.empty(), runtimeName, SO.NUMBER); + } + + private static Event makeBaseEvent() { + var event = new Event(); + event.addField("a", 10); + event.addField("b", 20.0); + event.addField("c", 2); + return event; + } + + +} diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/MathExpressionProcessorTest.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/MathExpressionProcessorTest.java new file mode 100644 index 0000000000..bf69c18890 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/expression/MathExpressionProcessorTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.enricher.jvm.processor.expression; + +import org.apache.streampipes.test.executors.PrefixStrategy; +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.TestConfiguration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +public class MathExpressionProcessorTest { + + + private MathExpressionProcessor processor; + + @BeforeEach + public void setup() { + processor = new MathExpressionProcessor(); + } + + @Test + public void testProcessor() { + + List> inputEvents = List.of(Map.of( + "temperature", 10.1, + "flowrate", 2 + )); + + List> outputEvents = List.of(Map.of( + "temperature", 10.1, + "flowrate", 2, + "result1", 22.1, + "result2", 20.2 + )); + + var config = Map.of("members", List.of(Map.of( + MathExpressionProcessor.ENRICHED_FIELDS, + Map.of("staticProperties", Map.of( + MathExpressionProcessor.FIELD_NAME, "result1", + MathExpressionProcessor.EXPRESSION, "temperature+12" + ))), + Map.of( + MathExpressionProcessor.ENRICHED_FIELDS, + Map.of("staticProperties", Map.of( + MathExpressionProcessor.FIELD_NAME, "result2", + MathExpressionProcessor.EXPRESSION, "temperature*flowrate" + ))) + )); + + var configuration = TestConfiguration + .builder() + .config(MathExpressionProcessor.ENRICHED_FIELDS, config) + .prefixStrategy(PrefixStrategy.SAME_PREFIX) + .build(); + + var testExecutor = new ProcessingElementTestExecutor(processor, configuration); + + testExecutor.run(inputEvents, outputEvents); + } +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/StaticPropertyGroup.java b/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/StaticPropertyGroup.java index e783d5e946..506f246d83 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/StaticPropertyGroup.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/StaticPropertyGroup.java @@ -29,7 +29,7 @@ public class StaticPropertyGroup extends StaticProperty { private Boolean showLabel; - private boolean horizontalRendering; + private boolean horizontalRendering = true; public StaticPropertyGroup() { super(StaticPropertyType.StaticPropertyGroup); diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java index 9dd2df2b4d..2ad5f735e5 100644 --- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java +++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java @@ -32,7 +32,6 @@ import org.apache.streampipes.model.staticproperty.RuntimeResolvableOneOfStaticProperty; import org.apache.streampipes.model.staticproperty.RuntimeResolvableTreeInputStaticProperty; import org.apache.streampipes.model.staticproperty.SecretStaticProperty; -import org.apache.streampipes.model.staticproperty.SelectionStaticProperty; import org.apache.streampipes.model.staticproperty.StaticProperty; import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative; import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives; @@ -239,14 +238,11 @@ public static SecretStaticProperty secretValue(Label label) { label.getLabel(), label.getDescription()); } - public static CollectionStaticProperty collection(Label label, StaticProperty... sp) { - for (StaticProperty staticProperty : sp) { - setHorizontalRendering(staticProperty); - } + public static CollectionStaticProperty collection(Label label, boolean horizontalAlignment, StaticProperty... sp) { if (sp.length > 1) { StaticPropertyGroup group = StaticProperties.group(label); - group.setHorizontalRendering(true); + group.setHorizontalRendering(horizontalAlignment); group.setStaticProperties(Arrays.asList(sp)); return new CollectionStaticProperty(label.getInternalId(), label.getLabel(), @@ -257,6 +253,10 @@ public static CollectionStaticProperty collection(Label label, StaticProperty... } } + public static CollectionStaticProperty collection(Label label, StaticProperty... sp) { + return collection(label, true, sp); + } + public static CodeInputStaticProperty codeStaticProperty(Label label, CodeLanguage codeLanguage, String defaultSkeleton) { @@ -266,21 +266,4 @@ public static CodeInputStaticProperty codeStaticProperty(Label label, codeInputStaticProperty.setCodeTemplate(defaultSkeleton); return codeInputStaticProperty; } - - private static StaticProperty setHorizontalRendering(StaticProperty sp) { - if (sp instanceof StaticPropertyGroup) { - ((StaticPropertyGroup) sp).setHorizontalRendering(true); - ((StaticPropertyGroup) sp).getStaticProperties().stream() - .forEach(property -> setHorizontalRendering(property)); - } else if (sp instanceof SelectionStaticProperty) { - ((SelectionStaticProperty) sp).setHorizontalRendering(true); - } else if (sp instanceof StaticPropertyAlternatives) { - ((StaticPropertyAlternatives) sp).getAlternatives().stream() - .forEach(property -> setHorizontalRendering(property.getStaticProperty())); - - } - - return sp; - } - } diff --git a/ui/src/app/core-ui/pipeline/pipeline-started-status/pipeline-started-status.component.html b/ui/src/app/core-ui/pipeline/pipeline-started-status/pipeline-started-status.component.html index 85f64484d8..45e70af034 100644 --- a/ui/src/app/core-ui/pipeline/pipeline-started-status/pipeline-started-status.component.html +++ b/ui/src/app/core-ui/pipeline/pipeline-started-status/pipeline-started-status.component.html @@ -82,6 +82,7 @@ class="w-100" > diff --git a/ui/src/app/core-ui/static-properties/static-property-util.service.ts b/ui/src/app/core-ui/static-properties/static-property-util.service.ts index bed7c81cd8..b85882cd26 100644 --- a/ui/src/app/core-ui/static-properties/static-property-util.service.ts +++ b/ui/src/app/core-ui/static-properties/static-property-util.service.ts @@ -19,6 +19,7 @@ import { Injectable } from '@angular/core'; import { AnyStaticProperty, + CodeInputStaticProperty, CollectionStaticProperty, ColorPickerStaticProperty, FileStaticProperty, @@ -77,6 +78,12 @@ export class StaticPropertyUtilService { clone = new ColorPickerStaticProperty(); clone.id = id; clone.selectedProperty = val.selectedColor; + } else if (val instanceof CodeInputStaticProperty) { + clone = new CodeInputStaticProperty(); + clone.elementId = id; + clone.codeTemplate = val.codeTemplate; + clone.value = val.value; + clone.language = val.language; } else if (val instanceof StaticPropertyGroup) { clone = new StaticPropertyGroup(); clone.elementId = id; diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts index 7efa549cf1..58e0bee821 100644 --- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts +++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts @@ -296,7 +296,7 @@ export class SavePipelineComponent implements OnInit { if (this.shepherdService.isTourActive()) { this.shepherdService.hideCurrentStep(); } - if (this.storageOptions.navigateToPipelineOverview) { + if (this.storageOptions.navigateToPipelineOverview && status?.success) { this.navigateToPipelineOverview(); } } diff --git a/ui/src/app/pipeline-details/components/pipeline-details-expansion-panel/pipeline-details-expansion-panel.component.html b/ui/src/app/pipeline-details/components/pipeline-details-expansion-panel/pipeline-details-expansion-panel.component.html index 848ae64550..0f65683cf6 100644 --- a/ui/src/app/pipeline-details/components/pipeline-details-expansion-panel/pipeline-details-expansion-panel.component.html +++ b/ui/src/app/pipeline-details/components/pipeline-details-expansion-panel/pipeline-details-expansion-panel.component.html @@ -28,7 +28,7 @@ > diff --git a/ui/src/app/pipeline-details/components/pipeline-details-expansion-panel/pipeline-element-details-row/pipeline-element-details-row.component.html b/ui/src/app/pipeline-details/components/pipeline-details-expansion-panel/pipeline-element-details-row/pipeline-element-details-row.component.html index 284166d2dd..4c3a6cf22e 100644 --- a/ui/src/app/pipeline-details/components/pipeline-details-expansion-panel/pipeline-element-details-row/pipeline-element-details-row.component.html +++ b/ui/src/app/pipeline-details/components/pipeline-details-expansion-panel/pipeline-element-details-row/pipeline-element-details-row.component.html @@ -27,7 +27,7 @@ mat-icon-button color="accent" matTooltip="Logs" - [disabled]="!pipelineRunning" + [disabled]="!pipelineRunning || logInfo.length === 0" > topic diff --git a/ui/src/app/pipeline-details/dialogs/pipeline-logs/pipeline-logs-dialog.component.html b/ui/src/app/pipeline-details/dialogs/pipeline-logs/pipeline-logs-dialog.component.html index a473730836..8edd5242bb 100644 --- a/ui/src/app/pipeline-details/dialogs/pipeline-logs/pipeline-logs-dialog.component.html +++ b/ui/src/app/pipeline-details/dialogs/pipeline-logs/pipeline-logs-dialog.component.html @@ -19,7 +19,7 @@
-
+