Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test job #3330

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.streampipes.processors.enricher.jvm.processor.limitsalert.SensorLimitAlertProcessor;
import org.apache.streampipes.processors.enricher.jvm.processor.limitsenrichment.QualityControlLimitsEnrichmentProcessor;
import org.apache.streampipes.processors.enricher.jvm.processor.math.MathOpProcessor;
import org.apache.streampipes.processors.enricher.jvm.processor.math.SigmaOpProcessor;
import org.apache.streampipes.processors.enricher.jvm.processor.math.staticmathop.StaticMathOpProcessor;
import org.apache.streampipes.processors.enricher.jvm.processor.trigonometry.TrigonometryProcessor;
import org.apache.streampipes.processors.enricher.jvm.processor.valuechange.ValueChangeProcessor;
Expand All @@ -50,7 +51,9 @@ public List<IStreamPipesPipelineElement<?>> pipelineElements() {
new StaticMathOpProcessor(),
new TrigonometryProcessor(),
new ValueChangeProcessor(),
new MathExpressionProcessor()
new MathExpressionProcessor(),
// New processor added to perform sigma operation
new SigmaOpProcessor()
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/


package org.apache.streampipes.processors.enricher.jvm.processor.math;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

public class SigmaOpProcessor extends StreamPipesDataProcessor {

protected static final String FIELD_ID = "field";
private static final String RESULT_FIELD = "sigmaResult";

private Double fieldSigmaValue;
private String selectedFieldName;

@Override
public DataProcessorDescription declareModel() {
return ProcessingElementBuilder
.create("org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop", 0)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
.category(DataProcessorType.ALGORITHM)
.requiredStream(StreamRequirementsBuilder.create()
.requiredPropertyWithUnaryMapping(
EpRequirements.numberReq(),
Labels.withId(FIELD_ID),
PropertyScope.NONE)
.build())
.outputStrategy(
OutputStrategies.append(
EpProperties.numberEp(Labels.empty(), RESULT_FIELD, SO.NUMBER)))
.build();
}

@Override
public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector,
EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
ProcessingElementParameterExtractor extractor = parameters.extractor();
this.selectedFieldName = extractor.mappingPropertyValue(FIELD_ID);
this.fieldSigmaValue = 0.0;
}

@Override
public void onEvent(Event in, SpOutputCollector out) throws SpRuntimeException {
Double fieldValue = in.getFieldBySelector(selectedFieldName).getAsPrimitive().getAsDouble();

fieldSigmaValue += fieldValue;
in.addField(RESULT_FIELD, fieldSigmaValue);

out.collect(in);
}

@Override
public void onDetach() throws SpRuntimeException {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

## Sigma

<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
</p>

***

## Description

This processor performs sigma operation on the selected number field i.e. addition of a sequence of numbers.

***

## Required input

A number field is required in the data stream and can be selected with the field mapping.

### Number Field

The number field to be summed.

***

## Configuration

(no further configuration required)

## Output

* [sigmaResult] Sum of the last value and new value

The event is emitted whenever new value comes in stream.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop.title=Sigma ∑
org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop.description=Performs sigma operation on the selected number field

field.title=Number Field
field.description=The field of the number to sigma
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.enricher.jvm.processor.math;

import org.apache.streampipes.test.executors.PrefixStrategy;
import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
import org.apache.streampipes.test.executors.StreamPrefix;
import org.apache.streampipes.test.executors.TestConfiguration;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

class SigmaOpProcessorTest {

private SigmaOpProcessor processor;

@BeforeEach
public void setup() {
processor = new SigmaOpProcessor();
}


static Stream<Arguments> arguments() {
return Stream.of(
Arguments.of(
List.of(Map.of("mass_flow", 10.0), Map.of("mass_flow", 20.0), Map.of("mass_flow", 30.0)),
List.of(Map.of("sigmaResult", 10.0), Map.of("sigmaResult", 30.0), Map.of("sigmaResult", 60.0))
)
);
}

@ParameterizedTest
@MethodSource("arguments")
public void testProcessor(
List<Map<String, Object>> inputEvents,
List<Map<String, Object>> outputEvents
) {

var configuration = TestConfiguration
.builder()
.config(SigmaOpProcessor.FIELD_ID, StreamPrefix.s0("mass_flow"))
.prefixStrategy(PrefixStrategy.SAME_PREFIX)
.build();

var testExecutor = new ProcessingElementTestExecutor(processor, configuration);

testExecutor.run(inputEvents, outputEvents);
}
}
Loading