Skip to content

Commit

Permalink
refactor(#2754): Fix test for SizeMeasureProcessor (#3051)
Browse files Browse the repository at this point in the history
  • Loading branch information
tenthe authored Jul 19, 2024
1 parent ddc476b commit 3c7d0a0
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-test-utils-executors</artifactId>
<version>0.97.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,36 +48,48 @@ public class SizeMeasureProcessor extends StreamPipesDataProcessor {
static final String KILOBYTE_SIZE = "KILOBYTE";
static final String MEGABYTE_SIZE = "MEGABYTE";

static final String BYTES_OPTION = "Bytes";
static final String KILO_BYTES_OPTION = "Kilobytes (1024 Bytes)";
static final String MEGA_BYTES_OPTION = "Megabytes (1024 Kilobytes)";

static final String EVENT_SIZE = "eventSize";

private String sizeUnit;

@Override
public DataProcessorDescription declareModel() {
return ProcessingElementBuilder.create("org.apache.streampipes.processors.enricher.jvm.sizemeasure")
.category(DataProcessorType.STRUCTURE_ANALYTICS)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
.requiredStream(StreamRequirementsBuilder
.create()
.requiredProperty(EpRequirements.anyProperty())
.build())
.requiredSingleValueSelection(Labels.withId(SIZE_UNIT),
Options.from(new Tuple2<>("Bytes", BYTE_SIZE),
new Tuple2<>("Kilobytes (1024 Bytes)", KILOBYTE_SIZE),
new Tuple2<>("Megabytes (1024 Kilobytes)", MEGABYTE_SIZE)))
.outputStrategy(OutputStrategies.append(EpProperties.doubleEp(
Labels.withId(EVENT_SIZE),
EVENT_SIZE,
"http://schema.org/contentSize")))
.build();
.category(DataProcessorType.STRUCTURE_ANALYTICS)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
.requiredStream(StreamRequirementsBuilder
.create()
.requiredProperty(EpRequirements.anyProperty())
.build())
.requiredSingleValueSelection(
Labels.withId(SIZE_UNIT),
Options.from(
new Tuple2<>(BYTES_OPTION, BYTE_SIZE),
new Tuple2<>(KILO_BYTES_OPTION, KILOBYTE_SIZE),
new Tuple2<>(MEGA_BYTES_OPTION, MEGABYTE_SIZE)
)
)
.outputStrategy(OutputStrategies.append(EpProperties.doubleEp(
Labels.withId(EVENT_SIZE),
EVENT_SIZE,
"http://schema.org/contentSize"
)))
.build();
}

@Override
public void onInvocation(ProcessorParams parameters,
SpOutputCollector spOutputCollector,
EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
this.sizeUnit = parameters.extractor().selectedSingleValueInternalName(SIZE_UNIT, String.class);
public void onInvocation(
ProcessorParams parameters,
SpOutputCollector spOutputCollector,
EventProcessorRuntimeContext runtimeContext
) throws SpRuntimeException {
this.sizeUnit = parameters.extractor()
.selectedSingleValueInternalName(SIZE_UNIT, String.class);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,99 +19,75 @@
package org.apache.streampipes.processors.enricher.jvm.processor.sizemeasure;


//@RunWith(Parameterized.class)
import org.apache.streampipes.test.executors.PrefixStrategy;
import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
import org.apache.streampipes.test.executors.TestConfiguration;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.apache.streampipes.processors.enricher.jvm.processor.sizemeasure.SizeMeasureProcessor.BYTES_OPTION;
import static org.apache.streampipes.processors.enricher.jvm.processor.sizemeasure.SizeMeasureProcessor.EVENT_SIZE;
import static org.apache.streampipes.processors.enricher.jvm.processor.sizemeasure.SizeMeasureProcessor.KILO_BYTES_OPTION;
import static org.apache.streampipes.processors.enricher.jvm.processor.sizemeasure.SizeMeasureProcessor.SIZE_UNIT;

public class TestSizeMeasureProcessor {
//
// private static final Logger LOG = LoggerFactory.getLogger(TestSizeMeasureProcessor.class);
//
// @org.junit.runners.Parameterized.Parameters
// public static Iterable<Object[]> data() {
//
// /*
// 249 bytes is the base size of an object when first tested.
// Allowing up to 250 bytes comparison error in case of inconsistent base sizes across platforms
// */
// return Arrays.asList(new Object[][]{
// { BYTE_SIZE, 10240 - 249, 10240.0, 250.0},
// { KILOBYTE_SIZE, 10240 - 249, 10.0, 0.025},
// { MEGABYTE_SIZE, (1024 * 1024) - 249, 1.0, 0.0025}
// });
// }
//
// @Parameterized.Parameter
// public String sizeUnit;
//
// @Parameterized.Parameter(1)
// public int numOfBytes;
//
// @Parameterized.Parameter(2)
// public double expectedSize;
//
// @Parameterized.Parameter(3)
// public double allowableError;
//
// @Test
// public void testSizeMeasureProcessor() {
// SizeMeasureProcessor processor = new SizeMeasureProcessor();
// DataProcessorDescription originalGraph = processor.declareModel();
// originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());
//
// DataProcessorInvocation graph =
// InvocationGraphGenerator.makeEmptyInvocation(originalGraph);
//
//
// ProcessorParams params = new ProcessorParams(graph);
// params.extractor().getStaticPropertyByName(SIZE_UNIT, OneOfStaticProperty.class).getOptions()
// .stream().filter(ot -> ot.getInternalName().equals(sizeUnit)).findFirst()
// .get().setSelected(true);
//
//
// SpOutputCollector eventCollector = new StoreEventCollector();
//
// processor.onInvocation(params, eventCollector, null);
//
// LOG.info("Sending event with numOfBytes "
// + numOfBytes + ", sizeUnit: " + sizeUnit);
// double size = sendEvents(processor, eventCollector);
//
// LOG.info("Expected size is {}", expectedSize);
// LOG.info("Actual size is {}", size);
// LOG.info("Allowable error is {}", allowableError);
// assertEquals(expectedSize, size, allowableError);
// }
//
// private Double sendEvents(SizeMeasureProcessor processor, SpOutputCollector spOut) {
// double size = 0.0;
// Event event = makeEvent(numOfBytes);
//
// processor.onEvent(event, spOut);
// try {
// Thread.sleep(100);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// try {
// size = event.getFieldBySelector(SizeMeasureProcessor.EVENT_SIZE)
// .getAsPrimitive()
// .getAsDouble();
// } catch (IllegalArgumentException e) {
// e.printStackTrace();
// }
// return size;
// }
// private Event makeEvent(int numOfBytes) {
// Map<String, Object> map = new HashMap<>();
// map.put("Test", new ObjectWithSize(numOfBytes));
// return EventFactory.fromMap(map,
// new SourceInfo("test" + "-topic", "s0"),
// new SchemaInfo(null, new ArrayList<>()));
// }
//
// public static class ObjectWithSize implements Serializable {
// private final byte[] data;
//
// public ObjectWithSize(int numOfBytes) {
// this.data = new byte[numOfBytes];
// }
// }
private static final String KEY = "key";
private static final String VALUE = "value";

private static Map<String, Object> inputEvent = Map.of(KEY, VALUE);

private SizeMeasureProcessor processor;

@BeforeEach
public void setup() {
processor = new SizeMeasureProcessor();
}


static Stream<Arguments> arguments() {
return Stream.of(
Arguments.of(
BYTES_OPTION,
inputEvent,
96.0
),
Arguments.of(
KILO_BYTES_OPTION,
inputEvent,
0.09375
)
);
}

@ParameterizedTest
@MethodSource("arguments")
public void testStringToState(
String sizeUnit,
Map<String, Object> intpuEvent,
double expectedventSize
) {

var inputEvents = List.of(intpuEvent);
List<Map<String, Object>> outputEvents = List.of(Map.of(
KEY, VALUE,
EVENT_SIZE, expectedventSize
));

var configuration = TestConfiguration
.builder()
.config(SIZE_UNIT, sizeUnit)
.prefixStrategy(PrefixStrategy.SAME_PREFIX)
.build();

var testExecutor = new ProcessingElementTestExecutor(processor, configuration);

testExecutor.run(inputEvents, outputEvents);
}
}

0 comments on commit 3c7d0a0

Please sign in to comment.