Skip to content

Commit

Permalink
feat(#3179): new processor sensorlimitalert (#3180)
Browse files Browse the repository at this point in the history
* feat(#3179): Add processor suffix to QualityControlLimitsEnrichmentProcessor

* feat(#3179): Add processor sensor limit alert

* feat(#3180): Change to interface IStreamPipesDataProcessor
  • Loading branch information
tenthe authored Aug 29, 2024
1 parent c818ff1 commit 2c997f0
Show file tree
Hide file tree
Showing 8 changed files with 475 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import org.apache.streampipes.extensions.api.migration.IModelMigrator;
import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import org.apache.streampipes.processors.enricher.jvm.processor.jseval.JSEvalProcessor;
import org.apache.streampipes.processors.enricher.jvm.processor.limitsenrichment.QualityControlLimitsEnrichment;
import org.apache.streampipes.processors.enricher.jvm.processor.limitsalert.SensorLimitAlertProcessor;
import org.apache.streampipes.processors.enricher.jvm.processor.limitsenrichment.QualityControlLimitsEnrichmentProcessor;
import org.apache.streampipes.processors.enricher.jvm.processor.math.MathOpProcessor;
import org.apache.streampipes.processors.enricher.jvm.processor.math.staticmathop.StaticMathOpProcessor;
import org.apache.streampipes.processors.enricher.jvm.processor.trigonometry.TrigonometryProcessor;
Expand All @@ -42,7 +43,8 @@ public List<StreamPipesAdapter> adapters() {
public List<IStreamPipesPipelineElement<?>> pipelineElements() {
return List.of(
new JSEvalProcessor(),
new QualityControlLimitsEnrichment(),
new QualityControlLimitsEnrichmentProcessor(),
new SensorLimitAlertProcessor(),
new MathOpProcessor(),
new StaticMathOpProcessor(),
new TrigonometryProcessor(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.enricher.jvm.processor.limitsalert;

import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.vocabulary.SO;

public class SensorLimitAlertProcessor implements IStreamPipesDataProcessor {

protected static final String SENSOR_VALUE_LABEL = "sensorValue";
protected static final String UPPER_CONTROL_LIMIT_LABEL = "upperControlLimit";
protected static final String UPPER_WARNING_LIMIT_LABEL = "upperWarningLimit";
protected static final String LOWER_WARNING_LIMIT_LABEL = "lowerWarningLimit";
protected static final String LOWER_CONTROL_LIMIT_LABEL = "lowerControlLimit";

// Property names that are appended to the resulting event
protected static final String ALERT_STATUS = "alertStatus";
protected static final String LIMIT_BREACHED = "limitBreached";

protected static final String ALERT = "ALERT";
protected static final String WARNING = "WARNING";
protected static final String UPPER_LIMIT = "UPPER_LIMIT";
protected static final String LOWER_LIMIT = "LOWER_LIMIT";

private String sensorField;
private String upperControlLimitField;
private String upperWarningLimitField;
private String lowerWarningLimitField;
private String lowerControlLimitField;


@Override
public IDataProcessorConfiguration declareConfig() {
return DataProcessorConfiguration.create(
SensorLimitAlertProcessor::new,
ProcessingElementBuilder
.create("org.apache.streampipes.processors.enricher.jvm.processor.limitsalert", 0)
.category(DataProcessorType.ENRICH)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
.requiredStream(StreamRequirementsBuilder
.create()
.requiredPropertyWithUnaryMapping(
EpRequirements.numberReq(),
Labels.withId(SENSOR_VALUE_LABEL),
PropertyScope.MEASUREMENT_PROPERTY
)
.requiredPropertyWithUnaryMapping(
EpRequirements.numberReq(),
Labels.withId(UPPER_CONTROL_LIMIT_LABEL),
PropertyScope.MEASUREMENT_PROPERTY
)
.requiredPropertyWithUnaryMapping(
EpRequirements.numberReq(),
Labels.withId(UPPER_WARNING_LIMIT_LABEL),
PropertyScope.MEASUREMENT_PROPERTY
)
.requiredPropertyWithUnaryMapping(
EpRequirements.numberReq(),
Labels.withId(LOWER_WARNING_LIMIT_LABEL),
PropertyScope.MEASUREMENT_PROPERTY
)
.requiredPropertyWithUnaryMapping(
EpRequirements.numberReq(),
Labels.withId(LOWER_CONTROL_LIMIT_LABEL),
PropertyScope.MEASUREMENT_PROPERTY
)
.build())
.outputStrategy(
OutputStrategies.append(
EpProperties.stringEp(Labels.empty(), ALERT_STATUS, SO.TEXT),
EpProperties.stringEp(Labels.empty(), LIMIT_BREACHED, SO.TEXT)
))
.build()
);
}

@Override
public void onPipelineStarted(
IDataProcessorParameters params,
SpOutputCollector collector,
EventProcessorRuntimeContext runtimeContext
) {
var extractor = params.extractor();

sensorField = extractor.mappingPropertyValue(SENSOR_VALUE_LABEL);
upperControlLimitField = extractor.mappingPropertyValue(UPPER_CONTROL_LIMIT_LABEL);
upperWarningLimitField = extractor.mappingPropertyValue(UPPER_WARNING_LIMIT_LABEL);
lowerWarningLimitField = extractor.mappingPropertyValue(LOWER_WARNING_LIMIT_LABEL);
lowerControlLimitField = extractor.mappingPropertyValue(LOWER_CONTROL_LIMIT_LABEL);
}

@Override
public void onEvent(Event event, SpOutputCollector collector) {
var sensorValue = event.getFieldBySelector(sensorField)
.getAsPrimitive()
.getAsDouble();
var upperControlLimit = event.getFieldBySelector(upperControlLimitField)
.getAsPrimitive()
.getAsDouble();
var upperWarningLimit = event.getFieldBySelector(upperWarningLimitField)
.getAsPrimitive()
.getAsDouble();
var lowerWarningLimit = event.getFieldBySelector(lowerWarningLimitField)
.getAsPrimitive()
.getAsDouble();
var lowerControlLimit = event.getFieldBySelector(lowerControlLimitField)
.getAsPrimitive()
.getAsDouble();

String alertStatus = null;
String limitBreached = null;

if (sensorValue > upperControlLimit) {
alertStatus = ALERT;
limitBreached = UPPER_LIMIT;
} else if (sensorValue > upperWarningLimit) {
alertStatus = WARNING;
limitBreached = UPPER_LIMIT;
} else if (sensorValue < lowerControlLimit) {
alertStatus = ALERT;
limitBreached = LOWER_LIMIT;
} else if (sensorValue < lowerWarningLimit) {
alertStatus = WARNING;
limitBreached = LOWER_LIMIT;
}

if (alertStatus != null && limitBreached != null) {
event.addField(ALERT_STATUS, alertStatus);
event.addField(LIMIT_BREACHED, limitBreached);
collector.collect(event);
}

}

@Override
public void onPipelineStopped() {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

public class QualityControlLimitsEnrichment extends StreamPipesDataProcessor {
public class QualityControlLimitsEnrichmentProcessor extends StreamPipesDataProcessor {


protected static final String UPPER_CONTROL_LIMIT_LABEL = "upperControlLimitInput";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

## Sensor Limit Alert

<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
</p>

***

## Description

The Sensor Limit Alert processor monitors sensor values in real-time and triggers alerts when these values exceed user-defined control or warning limits. This processor is useful in scenarios where continuous monitoring of critical parameters is required, and immediate action is needed when values go out of acceptable ranges.

***

## Required Input

This processor accepts any event stream containing sensor data. The events must include fields for sensor values and the corresponding upper and lower limits.

***

## Configuration

#### Sensor Value

Select the sensor value to be monitored. This is the primary measurement that will be checked against the defined limits.

#### Upper Control Limit

Specify the upper control limit for the sensor. This value defines the maximum threshold, beyond which an alert is triggered.

#### Upper Warning Limit

Specify the upper warning limit for the sensor. This value indicates when the sensor value is approaching the upper control limit, triggering a warning.

#### Lower Warning Limit

Specify the lower warning limit for the sensor. This value indicates when the sensor value is approaching the lower control limit, triggering a warning.

#### Lower Control Limit

Specify the lower control limit for the sensor. This value defines the minimum threshold, below which an alert is triggered.

***

## Output

The processor emits events only when the sensor value exceeds the specified limits. The output event includes the original sensor data along with additional fields that indicate:
- **Alert Status**: Whether the sensor value breached a WARNING or control LIMIT.
- **Limit Breached**: Which specific limit was breached (e.g., "UPPER_CONTROL_LIMIT" or "LOWER_WARNING_LIMIT").

These output events can be used for triggering notifications or other actions in downstream processing.

***

## Example

### User Configuration
- Mapping fields for:
- **Sensor Value**
- **Upper Control Limit**
- **Upper Warning Limit**
- **Lower Warning Limit**
- **Lower Control Limit**

### Input Event
```
{
"timestamp": 1627891234000,
"sensorValue": 105.0,
"upperControlLimit": 100.0,
"upperWarningLimit": 90.0,
"lowerWarningLimit": 10.0,
"lowerControlLimit": 0.0
}
```

### Output Event
```
{
"timestamp": 1627891234000,
"sensorValue": 105.0,
"upperControlLimit": 100.0,
"upperWarningLimit": 90.0,
"lowerWarningLimit": 10.0,
"lowerControlLimit": 0.0,
"alertStatus": "ALERT",
"limitBreached": "UPPER_CONTROL_LIMIT"
}
```
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.streampipes.processors.enricher.jvm.processor.limitsalert.title=Sensor Limit Alert
org.apache.streampipes.processors.enricher.jvm.processor.limitsalert.description=Monitors sensor values and triggers alerts when they exceed defined control or warning limits.

sensorValue.title=Sensor Value
sensorValue.description=Select the sensor value to be monitored.

upperControlLimit.title=Upper Control Limit
upperControlLimit.description=Set the upper control limit for the sensor value.

upperWarningLimit.title=Upper Warning Limit
upperWarningLimit.description=Set the upper warning limit for the sensor value.

lowerWarningLimit.title=Lower Warning Limit
lowerWarningLimit.description=Set the lower warning limit for the sensor value.

lowerControlLimit.title=Lower Control Limit
lowerControlLimit.description=Set the lower control limit for the sensor value.

alertStatus.title=Alert Status
alertStatus.description=Indicates whether the sensor value has breached a warning or control limit.

limitBreached.title=Limit Breached
limitBreached.description=Specifies which limit (e.g., Upper Control Limit, Lower Warning Limit) was breached by the sensor value.
Loading

0 comments on commit 2c997f0

Please sign in to comment.