Skip to content

Commit

Permalink
FEAT: unit test for MergeByTimeProcessor class
Browse files Browse the repository at this point in the history
  • Loading branch information
rahulbis committed Oct 13, 2023
1 parent b9133a2 commit f4c714f
Showing 1 changed file with 207 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package org.apache.streampipes.processors.filters.jvm.processor.merge;

import static org.junit.Assert.assertTrue;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.output.CustomOutputStrategy;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.EventFactory;
import org.apache.streampipes.model.runtime.SchemaInfo;
import org.apache.streampipes.model.runtime.SourceInfo;
import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
import org.apache.streampipes.test.generator.EventStreamGenerator;
import org.apache.streampipes.test.generator.InvocationGraphGenerator;
import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator;
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
public class TestMergeByTimeProcessor {

private static final Logger LOG = LoggerFactory.getLogger(TestMergeByTimeProcessor.class);

private static final Integer timeInterval = 100;

@org.junit.runners.Parameterized.Parameters
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{"testWithInInterval", Arrays.asList("s0:0", "s1:90"), Arrays.asList("(90,0)")},
{"testNotWithInInterval", Arrays.asList("s0:0", "s1:110"), Arrays.asList()},
{"testWithInAndNotWithInInterval", Arrays.asList("s0:0", "s1:80", "s0:110", "s1:500"), Arrays.asList("(80,0)")},
{"testFigGvnInDocs", Arrays.asList("s1:0", "s0:10", "s0:110", "s1:115", "s0:120", "s1:230", "s0:340", "s0:500", "s1:510" ),
Arrays.asList("(0,10)", "(115,110)", "(510,500)")}
});
}

@org.junit.runners.Parameterized.Parameter
public String testName;

@org.junit.runners.Parameterized.Parameter(1)
public List<String> eventStrings;

@org.junit.runners.Parameterized.Parameter(2)
public List<String> expectedValue;


@Test
public void testMergeByTimeProcessor() {
MergeByTimeProcessor mergeByTimeProcessor = new MergeByTimeProcessor();
DataProcessorDescription originalGraph = mergeByTimeProcessor.declareModel();
originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());

DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph);
graph.setInputStreams(Arrays.asList(
EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stram0")),
EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stream1"))
));

graph.setOutputStream(
EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("out-stream"))
);
graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition()
.setActualTopicName("output-topic");

List<String> outputKeySelectors = graph.getOutputStrategies()
.stream()
.filter(CustomOutputStrategy.class::isInstance)
.map(o -> (CustomOutputStrategy) o)
.findFirst()
.map(CustomOutputStrategy::getSelectedPropertyKeys)
.orElse(new ArrayList<>());
outputKeySelectors.add("s0::timestamp_mapping_stream_1");
outputKeySelectors.add("s1::timestamp_mapping_stream_2");

List<MappingPropertyUnary> mappingPropertyUnaries = graph.getStaticProperties()
.stream()
.filter(p -> p instanceof MappingPropertyUnary)
.map((p -> (MappingPropertyUnary) p))
.filter(p -> Arrays.asList(
MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY,
MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY)
.contains(p.getInternalName()))
.collect(Collectors.toList());


assert mappingPropertyUnaries.size() == 2;
mappingPropertyUnaries.get(0).setSelectedProperty("s0::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY);
mappingPropertyUnaries.get(1).setSelectedProperty("s1::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY);

FreeTextStaticProperty fsp = graph.getStaticProperties().stream()
.filter(p -> p instanceof FreeTextStaticProperty)
.map((p -> (FreeTextStaticProperty) p ))
.filter(p -> p.getInternalName().equals(MergeByTimeProcessor.TIME_INTERVAL))
.findFirst().orElse(null);
assert fsp != null;
fsp.setValue(String.valueOf(timeInterval));

ProcessorParams params = new ProcessorParams(graph);

StoreEventCollector collector = new StoreEventCollector();

mergeByTimeProcessor.onInvocation(params, collector, null);
sendEvents(mergeByTimeProcessor, collector);

List<String> actualCollectedEvents = collector.getEvents().stream()
.map(e -> formatMergedEvent(e))
.collect(Collectors.toList());

LOG.info("Expected merged event is {}", expectedValue);
LOG.info("Actual merged event is {}", actualCollectedEvents);
assertTrue(eventsEquals(expectedValue, actualCollectedEvents));
}

private boolean eventsEquals(List<String> expectedValue, List<String> actualCollectedEvents) {
if (expectedValue.size() != actualCollectedEvents.size()) {
return false;
}
for (int i = 0; i < expectedValue.size(); i++) {
if (!expectedValue.get(i).equalsIgnoreCase(actualCollectedEvents.get(i))) {
return false;
}
}
return true;
}

private String formatMergedEvent(Event mergedEvent) {
return String.format("(%s)", mergedEvent.getFields().values().stream().
map(m -> m.getAsPrimitive().getAsString()).collect(Collectors.joining(",")));
}

private void sendEvents(MergeByTimeProcessor mergeByTimeProcessor, StoreEventCollector spOut) {
List<Event> events = makeEvents();
for (Event event : events) {
mergeByTimeProcessor.onEvent(event, spOut);
}

}

private List<Event> makeEvents() {
List<Event> events = Lists.newArrayList();
for (String eventString : eventStrings) {
events.add(makeEvent(eventString));
}
return events;
}

private Event makeEvent(String eventString) {
Map<String, Object> map = Maps.newHashMap();
String streamId = eventString.split(":")[0];
String timestamp = eventString.split(":")[1];
if (streamId.equals("s0")) {
map.put(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, timestamp);
} else {
map.put(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, timestamp);
}
return EventFactory.fromMap(map,
new SourceInfo("test", streamId),
new SchemaInfo(null, Lists.newArrayList()));
}

class StoreEventCollector implements SpOutputCollector {

ArrayList<Event> events = new ArrayList<Event>();

@Override
public void registerConsumer(String routeId, InternalEventProcessor<Map<String, Object>> consumer) {}

@Override
public void unregisterConsumer(String routeId) {}

@Override
public void connect() throws SpRuntimeException {}

@Override
public void disconnect() throws SpRuntimeException {}

@Override
public void collect(Event event) {
events.add(event);
}

public List<Event> getEvents() {
return this.events;
}
public void clearEvent() {
this.events.clear();
}

}
}

0 comments on commit f4c714f

Please sign in to comment.