diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/pom.xml b/streampipes-extensions/streampipes-processors-filters-jvm/pom.xml index 98bfabefdb..09dc0887bf 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/pom.xml +++ b/streampipes-extensions/streampipes-processors-filters-jvm/pom.xml @@ -43,6 +43,12 @@ org.quartz-scheduler quartz + + org.apache.streampipes + streampipes-test-utils + 0.93.0-SNAPSHOT + test + diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/booleanfilter/TestBooleanFilterProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/booleanfilter/TestBooleanFilterProcessor.java new file mode 100644 index 0000000000..30a5358308 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/booleanfilter/TestBooleanFilterProcessor.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.filters.jvm.processor.booleanfilter; + +import org.apache.streampipes.model.graph.DataProcessorDescription; +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.model.runtime.EventFactory; +import org.apache.streampipes.model.runtime.SchemaInfo; +import org.apache.streampipes.model.runtime.SourceInfo; +import org.apache.streampipes.model.staticproperty.MappingPropertyUnary; +import org.apache.streampipes.model.staticproperty.OneOfStaticProperty; +import org.apache.streampipes.test.extensions.api.StoreEventCollector; +import org.apache.streampipes.test.generator.EventStreamGenerator; +import org.apache.streampipes.test.generator.InvocationGraphGenerator; +import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator; +import org.apache.streampipes.wrapper.params.compat.ProcessorParams; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public class TestBooleanFilterProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(TestBooleanFilterProcessor.class); + + @org.junit.runners.Parameterized.Parameters + public static Iterable data() { + List allTrue = Arrays.asList(true, true, true); + List allFalse = Arrays.asList(false, false, false); + List someTrueSomeFalse = Arrays.asList(true, false, true, false, true, false, true, false, true); + List empty = Arrays.asList(); + return Arrays.asList(new Object[][]{ + {"True", "Test", someTrueSomeFalse, 5}, + {"True", "Test", allTrue, 3}, + {"True", "Test", allFalse, 0}, + {"True", "Test", empty, 0}, + {"False", "Test", someTrueSomeFalse, 4}, + {"False", "Test", allTrue, 0}, + {"False", "Test", allFalse, 3}, + {"False", "Test", empty, 0}, + }); + } + + @org.junit.runners.Parameterized.Parameter + public String boolToKeep; + + @org.junit.runners.Parameterized.Parameter(1) + public String fieldName; + + @org.junit.runners.Parameterized.Parameter(2) + public List eventBooleans; + + @org.junit.runners.Parameterized.Parameter(3) + public int expectedFilteredBooleanCount; + + @Test + public void testBoolenFilter() { + BooleanFilterProcessor bfp = new BooleanFilterProcessor(); + DataProcessorDescription originalGraph = bfp.declareModel(); + originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); + + DataProcessorInvocation graph = + InvocationGraphGenerator.makeEmptyInvocation(originalGraph); + + graph.setInputStreams(Collections + .singletonList(EventStreamGenerator + .makeStreamWithProperties(Collections.singletonList(fieldName)))); + + graph.setOutputStream(EventStreamGenerator.makeStreamWithProperties(Collections.singletonList(fieldName))); + + graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition() + .setActualTopicName("output-topic"); + + graph.getStaticProperties().stream() + .filter(p -> p instanceof MappingPropertyUnary) + .map((p -> (MappingPropertyUnary) p)) + // Must hardcode since BOOLEAN_MAPPING is private + .filter(p -> p.getInternalName().equals("boolean-mapping")) + .findFirst().get().setSelectedProperty("s0::" + fieldName); + ProcessorParams params = new ProcessorParams(graph); + params.extractor().getStaticPropertyByName(BooleanFilterProcessor.VALUE, OneOfStaticProperty.class).getOptions() + .stream().filter(ot -> ot.getName().equals(boolToKeep)).findFirst() + .get().setSelected(true); + StoreEventCollector collector = new StoreEventCollector(); + + bfp.onInvocation(params, collector, null); + + int result = sendEvents(bfp, collector); + + LOG.info("Expected filtered boolean count is {}", expectedFilteredBooleanCount); + LOG.info("Actual filtered boolean count is {}", result); + assertEquals(expectedFilteredBooleanCount, result); + } + + private int sendEvents(BooleanFilterProcessor processor, StoreEventCollector collector) { + List events = makeEvents(); + for (Event event : events) { + LOG.info("Sending event with value " + + event.getFieldBySelector("s0::" + fieldName).getAsPrimitive().getAsBoolean()); + processor.onEvent(event, collector); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return collector.getEvents().size(); + } + + private List makeEvents() { + List events = new ArrayList<>(); + for (Boolean eventSetting : eventBooleans) { + events.add(makeEvent(eventSetting)); + } + return events; + } + + private Event makeEvent(Boolean value) { + Map map = new HashMap<>(); + map.put(fieldName, value); + return EventFactory.fromMap(map, new SourceInfo("test" + "-topic", "s0"), + new SchemaInfo(null, new ArrayList<>())); + } +} diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/value/change/TestChangedValueDetectionProcessor.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/value/change/TestChangedValueDetectionProcessor.java index 001991630f..f7c38a6c3a 100644 --- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/value/change/TestChangedValueDetectionProcessor.java +++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/value/change/TestChangedValueDetectionProcessor.java @@ -18,9 +18,6 @@ package org.apache.streampipes.processors.transformation.jvm.processor.value.change; -import org.apache.streampipes.commons.exceptions.SpRuntimeException; -import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; -import org.apache.streampipes.messaging.InternalEventProcessor; import org.apache.streampipes.model.graph.DataProcessorInvocation; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.model.runtime.EventFactory; @@ -31,6 +28,7 @@ import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder; import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder; import org.apache.streampipes.sdk.utils.Datatypes; +import org.apache.streampipes.test.extensions.api.StoreEventCollector; import org.apache.streampipes.test.generator.InvocationGraphGenerator; import org.apache.streampipes.wrapper.params.compat.ProcessorParams; @@ -38,7 +36,6 @@ import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -60,8 +57,8 @@ public void getDimensionKeyForOneDimension() { // Create event with no DIMENSION_PROPERTY event.getSchemaInfo() - .getEventSchema() - .getEventProperties().get(0) + .getEventSchema() + .getEventProperties().get(0) .setPropertyScope(PropertyScope.MEASUREMENT_PROPERTY.name()); assertEquals("l1", processor.getDimensionKey(event)); @@ -87,9 +84,9 @@ public void detectChangedValue() { DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(processor.declareModel()); graph.getStaticProperties().stream() - .filter(p -> p instanceof MappingPropertyUnary) - .map((p -> (MappingPropertyUnary) p)) - .findFirst().get().setSelectedProperty("s0::value"); + .filter(p -> p instanceof MappingPropertyUnary) + .map((p -> (MappingPropertyUnary) p)) + .findFirst().get().setSelectedProperty("s0::value"); ProcessorParams params = new ProcessorParams(graph); processor.onInvocation(params, null, null); @@ -117,9 +114,9 @@ public void detectChangedValueMultiDim() { DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(processor.declareModel()); graph.getStaticProperties().stream() - .filter(p -> p instanceof MappingPropertyUnary) - .map((p -> (MappingPropertyUnary) p)) - .findFirst().get().setSelectedProperty("s0::value"); + .filter(p -> p instanceof MappingPropertyUnary) + .map((p -> (MappingPropertyUnary) p)) + .findFirst().get().setSelectedProperty("s0::value"); ProcessorParams params = new ProcessorParams(graph); processor.onInvocation(params, null, null); @@ -170,31 +167,4 @@ private Event createTestEvent(Integer value, String location) { return EventFactory.fromMap(map, new SourceInfo("", "s0"), new SchemaInfo(eventSchema, new ArrayList<>())); } - - class StoreEventCollector implements SpOutputCollector { - - ArrayList events = new ArrayList(); - - @Override - public void registerConsumer(String routeId, InternalEventProcessor> consumer) {} - - @Override - public void unregisterConsumer(String routeId) {} - - @Override - public void connect() throws SpRuntimeException {} - - @Override - public void disconnect() throws SpRuntimeException {} - - @Override - public void collect(Event event) { - events.add(event); - } - - public List getEvents() { - return this.events; - } - - } } diff --git a/streampipes-test-utils/pom.xml b/streampipes-test-utils/pom.xml index 75c3a80d1a..de133e551b 100644 --- a/streampipes-test-utils/pom.xml +++ b/streampipes-test-utils/pom.xml @@ -41,6 +41,18 @@ junit test + + org.apache.streampipes + streampipes-messaging + 0.93.0-SNAPSHOT + compile + + + org.apache.streampipes + streampipes-extensions-api + 0.93.0-SNAPSHOT + compile + diff --git a/streampipes-test-utils/src/main/java/org/apache/streampipes/test/extensions/api/StoreEventCollector.java b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/extensions/api/StoreEventCollector.java new file mode 100644 index 0000000000..fd4080d8c9 --- /dev/null +++ b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/extensions/api/StoreEventCollector.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.test.extensions.api; + +import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; +import org.apache.streampipes.messaging.InternalEventProcessor; +import org.apache.streampipes.model.runtime.Event; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class StoreEventCollector implements SpOutputCollector { + private ArrayList events = new ArrayList(); + + @Override + public void registerConsumer(String routeId, InternalEventProcessor> consumer) { + } + + @Override + public void unregisterConsumer(String routeId) { + } + + @Override + public void connect() throws SpRuntimeException { + } + + @Override + public void disconnect() throws SpRuntimeException { + } + + @Override + public void collect(Event event) { + events.add(event); + } + + public List getEvents() { + return this.events; + } +} \ No newline at end of file