Skip to content

Commit

Permalink
test job
Browse files Browse the repository at this point in the history
  • Loading branch information
salmanaslam25 committed Nov 12, 2024
1 parent 7349246 commit adde534
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/


package org.apache.streampipes.processors.enricher.jvm.processor.math;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

public class SigmaOpProcessor extends StreamPipesDataProcessor {

protected static final String FIELD_ID = "field";
private static final String RESULT_FIELD = "sigmaResult";

private Double fieldSigmaValue;
private String selectedFieldName;

@Override
public DataProcessorDescription declareModel() {
return ProcessingElementBuilder
.create("org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop", 0)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
.category(DataProcessorType.ALGORITHM)
.requiredStream(StreamRequirementsBuilder.create()
.requiredPropertyWithUnaryMapping(
EpRequirements.numberReq(),
Labels.withId(FIELD_ID),
PropertyScope.NONE)
.build())
.outputStrategy(
OutputStrategies.append(
EpProperties.numberEp(Labels.empty(), RESULT_FIELD, SO.NUMBER)))
.build();
}

@Override
public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector,
EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
ProcessingElementParameterExtractor extractor = parameters.extractor();
this.selectedFieldName = extractor.mappingPropertyValue(FIELD_ID);
this.fieldSigmaValue = 0.0;
}

@Override
public void onEvent(Event in, SpOutputCollector out) throws SpRuntimeException {
Double fieldValue = in.getFieldBySelector(selectedFieldName).getAsPrimitive().getAsDouble();

fieldSigmaValue += fieldValue;
in.addField(RESULT_FIELD, fieldSigmaValue);

out.collect(in);
}

@Override
public void onDetach() throws SpRuntimeException {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

## Sigma

<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
</p>

***

## Description

This processor performs sigma operation on the selected number field i.e. addition of a sequence of numbers.

***

## Required input

A number field is required in the data stream and can be selected with the field mapping.

### Number Field

The number field to be summed.

***

## Configuration

(no further configuration required)

## Output

* [sigmaResult] Sum of the last value and new value

The event is emitted whenever new value comes in stream.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop.title=Sigma ∑
org.apache.streampipes.processors.enricher.jvm.processor.math.sigmaop.description=Performs sigma operation on the selected number field

field.title=Number Field
field.description=The field of the number to sigma
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.enricher.jvm.processor.math;

import org.apache.streampipes.test.executors.PrefixStrategy;
import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
import org.apache.streampipes.test.executors.StreamPrefix;
import org.apache.streampipes.test.executors.TestConfiguration;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

class SigmaOpProcessorTest {

private SigmaOpProcessor processor;

@BeforeEach
public void setup() {
processor = new SigmaOpProcessor();
}


static Stream<Arguments> arguments() {
return Stream.of(
Arguments.of(
List.of(Map.of("mass_flow", 10.0), Map.of("mass_flow", 20.0), Map.of("mass_flow", 30.0)),
List.of(Map.of("sigmaResult", 10.0), Map.of("sigmaResult", 30.0), Map.of("sigmaResult", 60.0))
)
);
}

@ParameterizedTest
@MethodSource("arguments")
public void testProcessor(
List<Map<String, Object>> inputEvents,
List<Map<String, Object>> outputEvents
) {

var configuration = TestConfiguration
.builder()
.config(SigmaOpProcessor.FIELD_ID, StreamPrefix.s0("mass_flow"))
.prefixStrategy(PrefixStrategy.SAME_PREFIX)
.build();

var testExecutor = new ProcessingElementTestExecutor(processor, configuration);

testExecutor.run(inputEvents, outputEvents);
}
}

0 comments on commit adde534

Please sign in to comment.