From 1ca6d6120aa8b11f9835d02b0540072e206d089e Mon Sep 17 00:00:00 2001 From: rahulbis Date: Fri, 13 Oct 2023 07:57:11 +0530 Subject: [PATCH 1/7] FEAT: unit test for MergeByTimeProcessor class --- .../filters/jvm/processor/merge/MergeByTimeProcessor.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeProcessor.java index fcc5c7e229..05cf7a33cb 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeProcessor.java @@ -40,10 +40,10 @@ public class MergeByTimeProcessor extends StreamPipesDataProcessor { - private static final String TIMESTAMP_MAPPING_STREAM_1_KEY = "timestamp_mapping_stream_1"; - private static final String TIMESTAMP_MAPPING_STREAM_2_KEY = "timestamp_mapping_stream_2"; + protected static final String TIMESTAMP_MAPPING_STREAM_1_KEY = "timestamp_mapping_stream_1"; + protected static final String TIMESTAMP_MAPPING_STREAM_2_KEY = "timestamp_mapping_stream_2"; private static final String NUMBER_MAPPING = "number_mapping"; - private static final String TIME_INTERVAL = "time-interval"; + protected static final String TIME_INTERVAL = "time-interval"; private List outputKeySelectors; private String timestampFieldStream0; From 3c4ffb065be8b2a4c175a80c243c61ace71e1ffb Mon Sep 17 00:00:00 2001 From: rahulbis Date: Fri, 13 Oct 2023 08:15:41 +0530 Subject: [PATCH 2/7] FEAT: unit test for MergeByTimeProcessor class --- .../merge/TestMergeByTimeProcessor.java | 207 ++++++++++++++++++ 1 file changed, 207 insertions(+) create mode 100644 streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java new file mode 100644 index 0000000000..c1232bf147 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java @@ -0,0 +1,207 @@ +package org.apache.streampipes.processors.filters.jvm.processor.merge; + +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; +import org.apache.streampipes.messaging.InternalEventProcessor; +import org.apache.streampipes.model.graph.DataProcessorDescription; +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.output.CustomOutputStrategy; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.model.runtime.EventFactory; +import org.apache.streampipes.model.runtime.SchemaInfo; +import org.apache.streampipes.model.runtime.SourceInfo; +import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty; +import org.apache.streampipes.model.staticproperty.MappingPropertyUnary; +import org.apache.streampipes.test.generator.EventStreamGenerator; +import org.apache.streampipes.test.generator.InvocationGraphGenerator; +import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator; +import org.apache.streampipes.wrapper.params.compat.ProcessorParams; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class TestMergeByTimeProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(TestMergeByTimeProcessor.class); + + private static final Integer timeInterval = 100; + + @org.junit.runners.Parameterized.Parameters + public static Iterable data() { + return Arrays.asList(new Object[][] { + {"testWithInInterval", Arrays.asList("s0:0", "s1:90"), Arrays.asList("(90,0)")}, + {"testNotWithInInterval", Arrays.asList("s0:0", "s1:110"), Arrays.asList()}, + {"testWithInAndNotWithInInterval", Arrays.asList("s0:0", "s1:80", "s0:110", "s1:500"), Arrays.asList("(80,0)")}, + {"testFigGvnInDocs", Arrays.asList("s1:0", "s0:10", "s0:110", "s1:115", "s0:120", "s1:230", "s0:340", "s0:500", "s1:510" ), + Arrays.asList("(0,10)", "(115,110)", "(510,500)")} + }); + } + + @org.junit.runners.Parameterized.Parameter + public String testName; + + @org.junit.runners.Parameterized.Parameter(1) + public List eventStrings; + + @org.junit.runners.Parameterized.Parameter(2) + public List expectedValue; + + + @Test + public void testMergeByTimeProcessor() { + MergeByTimeProcessor mergeByTimeProcessor = new MergeByTimeProcessor(); + DataProcessorDescription originalGraph = mergeByTimeProcessor.declareModel(); + originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); + + DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph); + graph.setInputStreams(Arrays.asList( + EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stram0")), + EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stream1")) + )); + + graph.setOutputStream( + EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("out-stream")) + ); + graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition() + .setActualTopicName("output-topic"); + + List outputKeySelectors = graph.getOutputStrategies() + .stream() + .filter(CustomOutputStrategy.class::isInstance) + .map(o -> (CustomOutputStrategy) o) + .findFirst() + .map(CustomOutputStrategy::getSelectedPropertyKeys) + .orElse(new ArrayList<>()); + outputKeySelectors.add("s0::timestamp_mapping_stream_1"); + outputKeySelectors.add("s1::timestamp_mapping_stream_2"); + + List mappingPropertyUnaries = graph.getStaticProperties() + .stream() + .filter(p -> p instanceof MappingPropertyUnary) + .map((p -> (MappingPropertyUnary) p)) + .filter(p -> Arrays.asList( + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY) + .contains(p.getInternalName())) + .collect(Collectors.toList()); + + + assert mappingPropertyUnaries.size() == 2; + mappingPropertyUnaries.get(0).setSelectedProperty("s0::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY); + mappingPropertyUnaries.get(1).setSelectedProperty("s1::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY); + + FreeTextStaticProperty fsp = graph.getStaticProperties().stream() + .filter(p -> p instanceof FreeTextStaticProperty) + .map((p -> (FreeTextStaticProperty) p )) + .filter(p -> p.getInternalName().equals(MergeByTimeProcessor.TIME_INTERVAL)) + .findFirst().orElse(null); + assert fsp != null; + fsp.setValue(String.valueOf(timeInterval)); + + ProcessorParams params = new ProcessorParams(graph); + + StoreEventCollector collector = new StoreEventCollector(); + + mergeByTimeProcessor.onInvocation(params, collector, null); + sendEvents(mergeByTimeProcessor, collector); + + List actualCollectedEvents = collector.getEvents().stream() + .map(e -> formatMergedEvent(e)) + .collect(Collectors.toList()); + + LOG.info("Expected merged event is {}", expectedValue); + LOG.info("Actual merged event is {}", actualCollectedEvents); + assertTrue(eventsEquals(expectedValue, actualCollectedEvents)); + } + + private boolean eventsEquals(List expectedValue, List actualCollectedEvents) { + if (expectedValue.size() != actualCollectedEvents.size()) { + return false; + } + for (int i = 0; i < expectedValue.size(); i++) { + if (!expectedValue.get(i).equalsIgnoreCase(actualCollectedEvents.get(i))) { + return false; + } + } + return true; + } + + private String formatMergedEvent(Event mergedEvent) { + return String.format("(%s)", mergedEvent.getFields().values().stream(). + map(m -> m.getAsPrimitive().getAsString()).collect(Collectors.joining(","))); + } + + private void sendEvents(MergeByTimeProcessor mergeByTimeProcessor, StoreEventCollector spOut) { + List events = makeEvents(); + for (Event event : events) { + mergeByTimeProcessor.onEvent(event, spOut); + } + + } + + private List makeEvents() { + List events = Lists.newArrayList(); + for (String eventString : eventStrings) { + events.add(makeEvent(eventString)); + } + return events; + } + + private Event makeEvent(String eventString) { + Map map = Maps.newHashMap(); + String streamId = eventString.split(":")[0]; + String timestamp = eventString.split(":")[1]; + if (streamId.equals("s0")) { + map.put(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, timestamp); + } else { + map.put(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, timestamp); + } + return EventFactory.fromMap(map, + new SourceInfo("test", streamId), + new SchemaInfo(null, Lists.newArrayList())); + } + + class StoreEventCollector implements SpOutputCollector { + + ArrayList events = new ArrayList(); + + @Override + public void registerConsumer(String routeId, InternalEventProcessor> consumer) {} + + @Override + public void unregisterConsumer(String routeId) {} + + @Override + public void connect() throws SpRuntimeException {} + + @Override + public void disconnect() throws SpRuntimeException {} + + @Override + public void collect(Event event) { + events.add(event); + } + + public List getEvents() { + return this.events; + } + public void clearEvent() { + this.events.clear(); + } + + } +} + From d3a468b0bd3366eff6c7f7fc9b98b62b8025be04 Mon Sep 17 00:00:00 2001 From: rahulbis Date: Fri, 13 Oct 2023 09:36:47 +0530 Subject: [PATCH 3/7] REFACTOR: added License header --- .../merge/TestMergeByTimeProcessor.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java index c1232bf147..fadc94ee5b 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package org.apache.streampipes.processors.filters.jvm.processor.merge; import static org.junit.Assert.assertTrue; From 17d29bbb89d8f7ebab6d54d0dcdec24e1ad79713 Mon Sep 17 00:00:00 2001 From: rahulbis Date: Fri, 13 Oct 2023 12:14:57 +0530 Subject: [PATCH 4/7] REFACTOR: addressed review comments --- .gitignore | 1 + .../processor/merge/MergeByTimeProcessor.java | 2 +- .../merge/TestMergeByTimeProcessor.java | 34 +------------------ 3 files changed, 3 insertions(+), 34 deletions(-) diff --git a/.gitignore b/.gitignore index 5f713cb3f1..70ffc0e82c 100644 --- a/.gitignore +++ b/.gitignore @@ -31,6 +31,7 @@ .idea/dynamic.xml .idea/uiDesigner.xml .idea/* +my-remarks.txt # Gradle: .idea/gradle.xml diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeProcessor.java index 05cf7a33cb..0edb33b964 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeProcessor.java @@ -42,7 +42,7 @@ public class MergeByTimeProcessor extends StreamPipesDataProcessor { protected static final String TIMESTAMP_MAPPING_STREAM_1_KEY = "timestamp_mapping_stream_1"; protected static final String TIMESTAMP_MAPPING_STREAM_2_KEY = "timestamp_mapping_stream_2"; - private static final String NUMBER_MAPPING = "number_mapping"; + protected static final String NUMBER_MAPPING = "number_mapping"; protected static final String TIME_INTERVAL = "time-interval"; private List outputKeySelectors; diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java index fadc94ee5b..ea052f9cd7 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java @@ -27,9 +27,6 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.apache.streampipes.commons.exceptions.SpRuntimeException; -import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; -import org.apache.streampipes.messaging.InternalEventProcessor; import org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.graph.DataProcessorInvocation; import org.apache.streampipes.model.output.CustomOutputStrategy; @@ -39,6 +36,7 @@ import org.apache.streampipes.model.runtime.SourceInfo; import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty; import org.apache.streampipes.model.staticproperty.MappingPropertyUnary; +import org.apache.streampipes.test.extensions.api.StoreEventCollector; import org.apache.streampipes.test.generator.EventStreamGenerator; import org.apache.streampipes.test.generator.InvocationGraphGenerator; import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator; @@ -190,35 +188,5 @@ private Event makeEvent(String eventString) { new SourceInfo("test", streamId), new SchemaInfo(null, Lists.newArrayList())); } - - class StoreEventCollector implements SpOutputCollector { - - ArrayList events = new ArrayList(); - - @Override - public void registerConsumer(String routeId, InternalEventProcessor> consumer) {} - - @Override - public void unregisterConsumer(String routeId) {} - - @Override - public void connect() throws SpRuntimeException {} - - @Override - public void disconnect() throws SpRuntimeException {} - - @Override - public void collect(Event event) { - events.add(event); - } - - public List getEvents() { - return this.events; - } - public void clearEvent() { - this.events.clear(); - } - - } } From b92749046f807db154a54d9834d243791c649375 Mon Sep 17 00:00:00 2001 From: rahulbis Date: Fri, 13 Oct 2023 12:17:59 +0530 Subject: [PATCH 5/7] REFACTOR: .gitignore file --- .gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitignore b/.gitignore index 70ffc0e82c..5f713cb3f1 100644 --- a/.gitignore +++ b/.gitignore @@ -31,7 +31,6 @@ .idea/dynamic.xml .idea/uiDesigner.xml .idea/* -my-remarks.txt # Gradle: .idea/gradle.xml From 0a4319bcd61acc6ddf289739e6e0bcc3447e55f9 Mon Sep 17 00:00:00 2001 From: rahulbis Date: Sat, 14 Oct 2023 08:13:58 +0530 Subject: [PATCH 6/7] REFACTOR: addressed review comments --- .../merge/TestMergeByTimeProcessor.java | 192 ----------------- .../merge/TestMergeByTimeProcessor.java | 195 ++++++++++++++++++ 2 files changed, 195 insertions(+), 192 deletions(-) delete mode 100644 streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java create mode 100644 streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java deleted file mode 100644 index ea052f9cd7..0000000000 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/test/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.processors.filters.jvm.processor.merge; - -import static org.junit.Assert.assertTrue; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import org.apache.streampipes.model.graph.DataProcessorDescription; -import org.apache.streampipes.model.graph.DataProcessorInvocation; -import org.apache.streampipes.model.output.CustomOutputStrategy; -import org.apache.streampipes.model.runtime.Event; -import org.apache.streampipes.model.runtime.EventFactory; -import org.apache.streampipes.model.runtime.SchemaInfo; -import org.apache.streampipes.model.runtime.SourceInfo; -import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty; -import org.apache.streampipes.model.staticproperty.MappingPropertyUnary; -import org.apache.streampipes.test.extensions.api.StoreEventCollector; -import org.apache.streampipes.test.generator.EventStreamGenerator; -import org.apache.streampipes.test.generator.InvocationGraphGenerator; -import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator; -import org.apache.streampipes.wrapper.params.compat.ProcessorParams; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@RunWith(Parameterized.class) -public class TestMergeByTimeProcessor { - - private static final Logger LOG = LoggerFactory.getLogger(TestMergeByTimeProcessor.class); - - private static final Integer timeInterval = 100; - - @org.junit.runners.Parameterized.Parameters - public static Iterable data() { - return Arrays.asList(new Object[][] { - {"testWithInInterval", Arrays.asList("s0:0", "s1:90"), Arrays.asList("(90,0)")}, - {"testNotWithInInterval", Arrays.asList("s0:0", "s1:110"), Arrays.asList()}, - {"testWithInAndNotWithInInterval", Arrays.asList("s0:0", "s1:80", "s0:110", "s1:500"), Arrays.asList("(80,0)")}, - {"testFigGvnInDocs", Arrays.asList("s1:0", "s0:10", "s0:110", "s1:115", "s0:120", "s1:230", "s0:340", "s0:500", "s1:510" ), - Arrays.asList("(0,10)", "(115,110)", "(510,500)")} - }); - } - - @org.junit.runners.Parameterized.Parameter - public String testName; - - @org.junit.runners.Parameterized.Parameter(1) - public List eventStrings; - - @org.junit.runners.Parameterized.Parameter(2) - public List expectedValue; - - - @Test - public void testMergeByTimeProcessor() { - MergeByTimeProcessor mergeByTimeProcessor = new MergeByTimeProcessor(); - DataProcessorDescription originalGraph = mergeByTimeProcessor.declareModel(); - originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); - - DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph); - graph.setInputStreams(Arrays.asList( - EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stram0")), - EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stream1")) - )); - - graph.setOutputStream( - EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("out-stream")) - ); - graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition() - .setActualTopicName("output-topic"); - - List outputKeySelectors = graph.getOutputStrategies() - .stream() - .filter(CustomOutputStrategy.class::isInstance) - .map(o -> (CustomOutputStrategy) o) - .findFirst() - .map(CustomOutputStrategy::getSelectedPropertyKeys) - .orElse(new ArrayList<>()); - outputKeySelectors.add("s0::timestamp_mapping_stream_1"); - outputKeySelectors.add("s1::timestamp_mapping_stream_2"); - - List mappingPropertyUnaries = graph.getStaticProperties() - .stream() - .filter(p -> p instanceof MappingPropertyUnary) - .map((p -> (MappingPropertyUnary) p)) - .filter(p -> Arrays.asList( - MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, - MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY) - .contains(p.getInternalName())) - .collect(Collectors.toList()); - - - assert mappingPropertyUnaries.size() == 2; - mappingPropertyUnaries.get(0).setSelectedProperty("s0::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY); - mappingPropertyUnaries.get(1).setSelectedProperty("s1::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY); - - FreeTextStaticProperty fsp = graph.getStaticProperties().stream() - .filter(p -> p instanceof FreeTextStaticProperty) - .map((p -> (FreeTextStaticProperty) p )) - .filter(p -> p.getInternalName().equals(MergeByTimeProcessor.TIME_INTERVAL)) - .findFirst().orElse(null); - assert fsp != null; - fsp.setValue(String.valueOf(timeInterval)); - - ProcessorParams params = new ProcessorParams(graph); - - StoreEventCollector collector = new StoreEventCollector(); - - mergeByTimeProcessor.onInvocation(params, collector, null); - sendEvents(mergeByTimeProcessor, collector); - - List actualCollectedEvents = collector.getEvents().stream() - .map(e -> formatMergedEvent(e)) - .collect(Collectors.toList()); - - LOG.info("Expected merged event is {}", expectedValue); - LOG.info("Actual merged event is {}", actualCollectedEvents); - assertTrue(eventsEquals(expectedValue, actualCollectedEvents)); - } - - private boolean eventsEquals(List expectedValue, List actualCollectedEvents) { - if (expectedValue.size() != actualCollectedEvents.size()) { - return false; - } - for (int i = 0; i < expectedValue.size(); i++) { - if (!expectedValue.get(i).equalsIgnoreCase(actualCollectedEvents.get(i))) { - return false; - } - } - return true; - } - - private String formatMergedEvent(Event mergedEvent) { - return String.format("(%s)", mergedEvent.getFields().values().stream(). - map(m -> m.getAsPrimitive().getAsString()).collect(Collectors.joining(","))); - } - - private void sendEvents(MergeByTimeProcessor mergeByTimeProcessor, StoreEventCollector spOut) { - List events = makeEvents(); - for (Event event : events) { - mergeByTimeProcessor.onEvent(event, spOut); - } - - } - - private List makeEvents() { - List events = Lists.newArrayList(); - for (String eventString : eventStrings) { - events.add(makeEvent(eventString)); - } - return events; - } - - private Event makeEvent(String eventString) { - Map map = Maps.newHashMap(); - String streamId = eventString.split(":")[0]; - String timestamp = eventString.split(":")[1]; - if (streamId.equals("s0")) { - map.put(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, timestamp); - } else { - map.put(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, timestamp); - } - return EventFactory.fromMap(map, - new SourceInfo("test", streamId), - new SchemaInfo(null, Lists.newArrayList())); - } -} - diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java new file mode 100644 index 0000000000..b419e8e963 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.streampipes.processors.filters.jvm.processor.merge; + +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.streampipes.model.graph.DataProcessorDescription; +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.output.CustomOutputStrategy; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.model.runtime.EventFactory; +import org.apache.streampipes.model.runtime.SchemaInfo; +import org.apache.streampipes.model.runtime.SourceInfo; +import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty; +import org.apache.streampipes.model.staticproperty.MappingPropertyUnary; +import org.apache.streampipes.test.extensions.api.StoreEventCollector; +import org.apache.streampipes.test.generator.EventStreamGenerator; +import org.apache.streampipes.test.generator.InvocationGraphGenerator; +import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator; +import org.apache.streampipes.wrapper.params.compat.ProcessorParams; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class TestMergeByTimeProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(TestMergeByTimeProcessor.class); + + private static final Integer timeInterval = 100; + + @org.junit.runners.Parameterized.Parameters + public static Iterable data() { + return Arrays.asList(new Object[][]{ + {"testWithInInterval", Arrays.asList("s0:0", "s1:90"), Arrays.asList("(90,0)")}, + {"testNotWithInInterval", Arrays.asList("s0:0", "s1:110"), Arrays.asList()}, + {"testWithInAndNotWithInInterval", Arrays.asList("s0:0", "s1:80", "s0:110", "s1:500"), + Arrays.asList("(80,0)")}, + {"testFigGvnInDocs", + Arrays.asList("s1:0", "s0:10", "s0:110", "s1:115", "s0:120", "s1:230", "s0:340", "s0:500", + "s1:510"), + Arrays.asList("(0,10)", "(115,110)", "(510,500)")} + }); + } + + @org.junit.runners.Parameterized.Parameter + public String testName; + + @org.junit.runners.Parameterized.Parameter(1) + public List eventStrings; + + @org.junit.runners.Parameterized.Parameter(2) + public List expectedValue; + + + @Test + public void testMergeByTimeProcessor() { + MergeByTimeProcessor mergeByTimeProcessor = new MergeByTimeProcessor(); + DataProcessorDescription originalGraph = mergeByTimeProcessor.declareModel(); + originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); + + DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph); + graph.setInputStreams(Arrays.asList( + EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stram0")), + EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stream1")) + )); + + graph.setOutputStream( + EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("out-stream")) + ); + graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition() + .setActualTopicName("output-topic"); + + List outputKeySelectors = graph.getOutputStrategies() + .stream() + .filter(CustomOutputStrategy.class::isInstance) + .map(o -> (CustomOutputStrategy) o) + .findFirst() + .map(CustomOutputStrategy::getSelectedPropertyKeys) + .orElse(new ArrayList<>()); + outputKeySelectors.add("s0::timestamp_mapping_stream_1"); + outputKeySelectors.add("s1::timestamp_mapping_stream_2"); + + List mappingPropertyUnaries = graph.getStaticProperties() + .stream() + .filter(p -> p instanceof MappingPropertyUnary) + .map((p -> (MappingPropertyUnary) p)) + .filter(p -> Arrays.asList( + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY) + .contains(p.getInternalName())) + .collect(Collectors.toList()); + + assert mappingPropertyUnaries.size() == 2; + mappingPropertyUnaries.get(0) + .setSelectedProperty("s0::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY); + mappingPropertyUnaries.get(1) + .setSelectedProperty("s1::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY); + + FreeTextStaticProperty fsp = graph.getStaticProperties().stream() + .filter(p -> p instanceof FreeTextStaticProperty) + .map((p -> (FreeTextStaticProperty) p)) + .filter(p -> p.getInternalName().equals(MergeByTimeProcessor.TIME_INTERVAL)) + .findFirst().orElse(null); + assert fsp != null; + fsp.setValue(String.valueOf(timeInterval)); + + ProcessorParams params = new ProcessorParams(graph); + + StoreEventCollector collector = new StoreEventCollector(); + + mergeByTimeProcessor.onInvocation(params, collector, null); + sendEvents(mergeByTimeProcessor, collector); + + List actualCollectedEvents = collector.getEvents().stream() + .map(e -> formatMergedEvent(e)) + .collect(Collectors.toList()); + + LOG.info("Expected merged event is {}", expectedValue); + LOG.info("Actual merged event is {}", actualCollectedEvents); + assertTrue(eventsEquals(expectedValue, actualCollectedEvents)); + } + + private boolean eventsEquals(List expectedValue, List actualCollectedEvents) { + if (expectedValue.size() != actualCollectedEvents.size()) { + return false; + } + for (int i = 0; i < expectedValue.size(); i++) { + if (!expectedValue.get(i).equalsIgnoreCase(actualCollectedEvents.get(i))) { + return false; + } + } + return true; + } + + private String formatMergedEvent(Event mergedEvent) { + return String.format("(%s)", mergedEvent.getFields().values().stream(). + map(m -> m.getAsPrimitive().getAsString()).collect(Collectors.joining(","))); + } + + private void sendEvents(MergeByTimeProcessor mergeByTimeProcessor, StoreEventCollector spOut) { + List events = makeEvents(); + for (Event event : events) { + mergeByTimeProcessor.onEvent(event, spOut); + } + + } + + private List makeEvents() { + List events = Lists.newArrayList(); + for (String eventString : eventStrings) { + events.add(makeEvent(eventString)); + } + return events; + } + + private Event makeEvent(String eventString) { + Map map = Maps.newHashMap(); + String streamId = eventString.split(":")[0]; + String timestamp = eventString.split(":")[1]; + if (streamId.equals("s0")) { + map.put(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, timestamp); + } else { + map.put(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, timestamp); + } + return EventFactory.fromMap(map, + new SourceInfo("test", streamId), + new SchemaInfo(null, Lists.newArrayList())); + } +} \ No newline at end of file From eab6e31ab59897ee057256749ebe4fa2bbfded01 Mon Sep 17 00:00:00 2001 From: rahulbis Date: Sat, 14 Oct 2023 09:03:52 +0530 Subject: [PATCH 7/7] REFACTOR: fix checkstyle violations --- .../merge/TestMergeByTimeProcessor.java | 112 +++++++++--------- 1 file changed, 55 insertions(+), 57 deletions(-) diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java index b419e8e963..e2efbd6957 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java @@ -17,16 +17,6 @@ */ package org.apache.streampipes.processors.filters.jvm.processor.merge; -import static org.junit.Assert.assertTrue; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; import org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.graph.DataProcessorInvocation; import org.apache.streampipes.model.output.CustomOutputStrategy; @@ -41,42 +31,50 @@ import org.apache.streampipes.test.generator.InvocationGraphGenerator; import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator; import org.apache.streampipes.wrapper.params.compat.ProcessorParams; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertTrue; + @RunWith(Parameterized.class) public class TestMergeByTimeProcessor { private static final Logger LOG = LoggerFactory.getLogger(TestMergeByTimeProcessor.class); private static final Integer timeInterval = 100; - - @org.junit.runners.Parameterized.Parameters - public static Iterable data() { - return Arrays.asList(new Object[][]{ - {"testWithInInterval", Arrays.asList("s0:0", "s1:90"), Arrays.asList("(90,0)")}, - {"testNotWithInInterval", Arrays.asList("s0:0", "s1:110"), Arrays.asList()}, - {"testWithInAndNotWithInInterval", Arrays.asList("s0:0", "s1:80", "s0:110", "s1:500"), - Arrays.asList("(80,0)")}, - {"testFigGvnInDocs", - Arrays.asList("s1:0", "s0:10", "s0:110", "s1:115", "s0:120", "s1:230", "s0:340", "s0:500", - "s1:510"), - Arrays.asList("(0,10)", "(115,110)", "(510,500)")} - }); - } - @org.junit.runners.Parameterized.Parameter public String testName; - @org.junit.runners.Parameterized.Parameter(1) public List eventStrings; - @org.junit.runners.Parameterized.Parameter(2) public List expectedValue; + @org.junit.runners.Parameterized.Parameters + public static Iterable data() { + return Arrays.asList(new Object[][]{ + {"testWithInInterval", Arrays.asList("s0:0", "s1:90"), List.of("(90,0)")}, + {"testNotWithInInterval", Arrays.asList("s0:0", "s1:110"), List.of()}, + {"testWithInAndNotWithInInterval", Arrays.asList("s0:0", "s1:80", "s0:110", "s1:500"), + List.of("(80,0)")}, + {"testFigGvnInDocs", + Arrays.asList("s1:0", "s0:10", "s0:110", "s1:115", "s0:120", "s1:230", "s0:340", "s0:500", + "s1:510"), + Arrays.asList("(0,10)", "(115,110)", "(510,500)")} + }); + } @Test public void testMergeByTimeProcessor() { @@ -86,47 +84,47 @@ public void testMergeByTimeProcessor() { DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph); graph.setInputStreams(Arrays.asList( - EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stram0")), - EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stream1")) + EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stram0")), + EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stream1")) )); graph.setOutputStream( - EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("out-stream")) + EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("out-stream")) ); graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition() - .setActualTopicName("output-topic"); + .setActualTopicName("output-topic"); List outputKeySelectors = graph.getOutputStrategies() - .stream() - .filter(CustomOutputStrategy.class::isInstance) - .map(o -> (CustomOutputStrategy) o) - .findFirst() - .map(CustomOutputStrategy::getSelectedPropertyKeys) - .orElse(new ArrayList<>()); + .stream() + .filter(CustomOutputStrategy.class::isInstance) + .map(o -> (CustomOutputStrategy) o) + .findFirst() + .map(CustomOutputStrategy::getSelectedPropertyKeys) + .orElse(new ArrayList<>()); outputKeySelectors.add("s0::timestamp_mapping_stream_1"); outputKeySelectors.add("s1::timestamp_mapping_stream_2"); List mappingPropertyUnaries = graph.getStaticProperties() - .stream() - .filter(p -> p instanceof MappingPropertyUnary) - .map((p -> (MappingPropertyUnary) p)) - .filter(p -> Arrays.asList( - MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, - MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY) - .contains(p.getInternalName())) - .collect(Collectors.toList()); + .stream() + .filter(p -> p instanceof MappingPropertyUnary) + .map((p -> (MappingPropertyUnary) p)) + .filter(p -> Arrays.asList( + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY) + .contains(p.getInternalName())) + .collect(Collectors.toList()); assert mappingPropertyUnaries.size() == 2; mappingPropertyUnaries.get(0) - .setSelectedProperty("s0::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY); + .setSelectedProperty("s0::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY); mappingPropertyUnaries.get(1) - .setSelectedProperty("s1::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY); + .setSelectedProperty("s1::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY); FreeTextStaticProperty fsp = graph.getStaticProperties().stream() - .filter(p -> p instanceof FreeTextStaticProperty) - .map((p -> (FreeTextStaticProperty) p)) - .filter(p -> p.getInternalName().equals(MergeByTimeProcessor.TIME_INTERVAL)) - .findFirst().orElse(null); + .filter(p -> p instanceof FreeTextStaticProperty) + .map((p -> (FreeTextStaticProperty) p)) + .filter(p -> p.getInternalName().equals(MergeByTimeProcessor.TIME_INTERVAL)) + .findFirst().orElse(null); assert fsp != null; fsp.setValue(String.valueOf(timeInterval)); @@ -138,8 +136,8 @@ public void testMergeByTimeProcessor() { sendEvents(mergeByTimeProcessor, collector); List actualCollectedEvents = collector.getEvents().stream() - .map(e -> formatMergedEvent(e)) - .collect(Collectors.toList()); + .map(e -> formatMergedEvent(e)) + .collect(Collectors.toList()); LOG.info("Expected merged event is {}", expectedValue); LOG.info("Actual merged event is {}", actualCollectedEvents); @@ -159,8 +157,8 @@ private boolean eventsEquals(List expectedValue, List actualColl } private String formatMergedEvent(Event mergedEvent) { - return String.format("(%s)", mergedEvent.getFields().values().stream(). - map(m -> m.getAsPrimitive().getAsString()).collect(Collectors.joining(","))); + return String.format("(%s)", mergedEvent.getFields().values().stream() + .map(m -> m.getAsPrimitive().getAsString()).collect(Collectors.joining(","))); } private void sendEvents(MergeByTimeProcessor mergeByTimeProcessor, StoreEventCollector spOut) { @@ -189,7 +187,7 @@ private Event makeEvent(String eventString) { map.put(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, timestamp); } return EventFactory.fromMap(map, - new SourceInfo("test", streamId), - new SchemaInfo(null, Lists.newArrayList())); + new SourceInfo("test", streamId), + new SchemaInfo(null, Lists.newArrayList())); } } \ No newline at end of file