diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherExtensionModuleExport.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherExtensionModuleExport.java index a762e777ca..f70a521d60 100644 --- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherExtensionModuleExport.java +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherExtensionModuleExport.java @@ -27,6 +27,7 @@ import org.apache.streampipes.processors.enricher.jvm.processor.limitsalert.SensorLimitAlertProcessor; import org.apache.streampipes.processors.enricher.jvm.processor.limitsenrichment.QualityControlLimitsEnrichmentProcessor; import org.apache.streampipes.processors.enricher.jvm.processor.math.MathOpProcessor; +import org.apache.streampipes.processors.enricher.jvm.processor.math.SigmaOpProcessor; import org.apache.streampipes.processors.enricher.jvm.processor.math.staticmathop.StaticMathOpProcessor; import org.apache.streampipes.processors.enricher.jvm.processor.trigonometry.TrigonometryProcessor; import org.apache.streampipes.processors.enricher.jvm.processor.valuechange.ValueChangeProcessor; @@ -50,7 +51,9 @@ public List> pipelineElements() { new StaticMathOpProcessor(), new TrigonometryProcessor(), new ValueChangeProcessor(), - new MathExpressionProcessor() + new MathExpressionProcessor(), + // New processor added to perform sigma operation + new SigmaOpProcessor() ); } diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/SigmaOpProcessor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/SigmaOpProcessor.java new file mode 100644 index 0000000000..7a509d047b --- /dev/null +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/SigmaOpProcessor.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +package org.apache.streampipes.processors.enricher.jvm.processor.math; + +import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; +import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; +import org.apache.streampipes.model.DataProcessorType; +import org.apache.streampipes.model.extensions.ExtensionAssetType; +import org.apache.streampipes.model.graph.DataProcessorDescription; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.model.schema.PropertyScope; +import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; +import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; +import org.apache.streampipes.sdk.helpers.EpProperties; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.helpers.OutputStrategies; +import org.apache.streampipes.vocabulary.SO; +import org.apache.streampipes.wrapper.params.compat.ProcessorParams; +import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor; + +public class SigmaOpProcessor extends StreamPipesDataProcessor { + + protected static final String FIELD_ID = "field"; + private static final String RESULT_FIELD = "sigmaResult"; + + private Double fieldSigmaValue; + private String selectedFieldName; + + @Override + public DataProcessorDescription declareModel() { + return ProcessingElementBuilder + .create("org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop", 0) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) + .withLocales(Locales.EN) + .category(DataProcessorType.ALGORITHM) + .requiredStream(StreamRequirementsBuilder.create() + .requiredPropertyWithUnaryMapping( + EpRequirements.numberReq(), + Labels.withId(FIELD_ID), + PropertyScope.NONE) + .build()) + .outputStrategy( + OutputStrategies.append( + EpProperties.numberEp(Labels.empty(), RESULT_FIELD, SO.NUMBER))) + .build(); + } + + @Override + public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, + EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException { + ProcessingElementParameterExtractor extractor = parameters.extractor(); + this.selectedFieldName = extractor.mappingPropertyValue(FIELD_ID); + this.fieldSigmaValue = 0.0; + } + + @Override + public void onEvent(Event in, SpOutputCollector out) throws SpRuntimeException { + Double fieldValue = in.getFieldBySelector(selectedFieldName).getAsPrimitive().getAsDouble(); + + fieldSigmaValue += fieldValue; + in.addField(RESULT_FIELD, fieldSigmaValue); + + out.collect(in); + } + + @Override + public void onDetach() throws SpRuntimeException { + + } +} diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop/documentation.md b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop/documentation.md new file mode 100644 index 0000000000..6e6d4a67ed --- /dev/null +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop/documentation.md @@ -0,0 +1,51 @@ + + +## Sigma + +

+ +

+ +*** + +## Description + +This processor performs sigma operation on the selected number field i.e. addition of a sequence of numbers. + +*** + +## Required input + +A number field is required in the data stream and can be selected with the field mapping. + +### Number Field + +The number field to be summed. + +*** + +## Configuration + +(no further configuration required) + +## Output + +* [sigmaResult] Sum of the last value and new value + +The event is emitted whenever new value comes in stream. \ No newline at end of file diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop/icon.png b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop/icon.png new file mode 100644 index 0000000000..a29ed83649 Binary files /dev/null and b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop/icon.png differ diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop/strings.en b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop/strings.en new file mode 100644 index 0000000000..85bfec298f --- /dev/null +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop/strings.en @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop.title=Sigma ∑ +org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop.description=Performs sigma operation on the selected number field + +field.title=Number Field +field.description=The field of the number to sigma \ No newline at end of file diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/math/SigmaOpProcessorTest.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/math/SigmaOpProcessorTest.java new file mode 100644 index 0000000000..abae5652f8 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/math/SigmaOpProcessorTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.enricher.jvm.processor.math; + +import org.apache.streampipes.test.executors.PrefixStrategy; +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.StreamPrefix; +import org.apache.streampipes.test.executors.TestConfiguration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +class SigmaOpProcessorTest { + + private SigmaOpProcessor processor; + + @BeforeEach + public void setup() { + processor = new SigmaOpProcessor(); + } + + + static Stream arguments() { + return Stream.of( + Arguments.of( + List.of(Map.of("mass_flow", 10.0), Map.of("mass_flow", 20.0), Map.of("mass_flow", 30.0)), + List.of(Map.of("sigmaResult", 10.0), Map.of("sigmaResult", 30.0), Map.of("sigmaResult", 60.0)) + ) + ); + } + + @ParameterizedTest + @MethodSource("arguments") + public void testProcessor( + List> inputEvents, + List> outputEvents + ) { + + var configuration = TestConfiguration + .builder() + .config(SigmaOpProcessor.FIELD_ID, StreamPrefix.s0("mass_flow")) + .prefixStrategy(PrefixStrategy.SAME_PREFIX) + .build(); + + var testExecutor = new ProcessingElementTestExecutor(processor, configuration); + + testExecutor.run(inputEvents, outputEvents); + } +} \ No newline at end of file