Skip to content

Commit

Permalink
Add ObjectProcessor noop impl and ObjectProcessor tests (#4685)
Browse files Browse the repository at this point in the history
Unit test follow-up to #4648
  • Loading branch information
devinrsmith authored Oct 24, 2023
1 parent d90b968 commit d0ca524
Show file tree
Hide file tree
Showing 9 changed files with 692 additions and 3 deletions.
2 changes: 2 additions & 0 deletions extensions/kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ dependencies {
project(path: ':configs'),
project(path: ':test-configs')
Classpaths.inheritSlf4j(project, 'slf4j-simple', 'testRuntimeOnly')

Classpaths.inheritAssertJ(project)
}

spotless {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,22 @@ static <T> ObjectProcessor<T> rowLimited(ObjectProcessor<T> delegate, int rowLim
return ObjectProcessorRowLimited.of(delegate, rowLimit);
}

/**
* Creates a "no-operation" object processor that ignores the input object chunk. If {@code fillWithNullValue} is
* {@code true}, during {@link ObjectProcessor#processAll(ObjectChunk, List) processAll}
* {@link WritableChunk#fillWithNullValue(int, int) fillWithNullValue} will be invoked on each output chunk;
* otherwise, the output chunk contents will not be modified. In either case, the processing will increment the
* output chunks sizes.
*
* @param outputTypes the output types
* @param fillWithNullValue if the output chunks should be filled with the appropriate null value
* @return the no-op object processor
* @param <T> the object type
*/
static <T> ObjectProcessor<T> noop(List<Type<?>> outputTypes, boolean fillWithNullValue) {
return new ObjectProcessorNoop<>(outputTypes, fillWithNullValue);
}

/**
* The relationship between {@link #outputTypes() output types} and the {@link #processAll(ObjectChunk, List)
* processAll out param} {@link WritableChunk#getChunkType()}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.processor;

import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.qst.type.Type;

import java.util.List;

final class ObjectProcessorNoop<T> implements ObjectProcessor<T> {
private final List<Type<?>> outputTypes;
private final boolean fillWithNullValue;

ObjectProcessorNoop(List<Type<?>> outputTypes, boolean fillWithNullValue) {
this.outputTypes = List.copyOf(outputTypes);
this.fillWithNullValue = fillWithNullValue;
}

@Override
public List<Type<?>> outputTypes() {
return outputTypes;
}

@Override
public void processAll(ObjectChunk<? extends T, ?> in, List<WritableChunk<?>> out) {
if (fillWithNullValue) {
for (WritableChunk<?> chunk : out) {
chunk.fillWithNullValue(chunk.size(), in.size());
}
}
for (WritableChunk<?> chunk : out) {
chunk.setSize(chunk.size() + in.size());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import java.util.List;
import java.util.Objects;

class ObjectProcessorStrict<T> implements ObjectProcessor<T> {
final class ObjectProcessorStrict<T> implements ObjectProcessor<T> {

static <T> ObjectProcessor<T> create(ObjectProcessor<T> delegate) {
if (delegate instanceof ObjectProcessorStrict) {
Expand All @@ -22,22 +22,28 @@ static <T> ObjectProcessor<T> create(ObjectProcessor<T> delegate) {
}

private final ObjectProcessor<T> delegate;
private final List<Type<?>> outputTypes;

ObjectProcessorStrict(ObjectProcessor<T> delegate) {
this.delegate = Objects.requireNonNull(delegate);
this.outputTypes = List.copyOf(delegate.outputTypes());
}

@Override
public List<Type<?>> outputTypes() {
return delegate.outputTypes();
final List<Type<?>> outputTypes = delegate.outputTypes();
if (!this.outputTypes.equals(outputTypes)) {
throw new UncheckedDeephavenException("Implementation is returning a different list of outputTypes");
}
return outputTypes;
}

@Override
public void processAll(ObjectChunk<? extends T, ?> in, List<WritableChunk<?>> out) {
final int numColumns = delegate.outputTypes().size();
if (numColumns != out.size()) {
throw new IllegalArgumentException(String.format(
"Expected delegate.outputTypes().size() == out.size(). delegate.outputTypes().size()=%d, out.size()=%d",
"Improper number of out chunks. Expected delegate.outputTypes().size() == out.size(). delegate.outputTypes().size()=%d, out.size()=%d",
numColumns, out.size()));
}
final int[] originalSizes = new int[numColumns];
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.processor;

import io.deephaven.chunk.WritableByteChunk;
import io.deephaven.chunk.WritableCharChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableDoubleChunk;
import io.deephaven.chunk.WritableFloatChunk;
import io.deephaven.chunk.WritableIntChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.WritableShortChunk;
import io.deephaven.qst.type.Type;
import io.deephaven.util.QueryConstants;
import org.junit.Test;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

public class ObjectProcessorNoopTest {

@Test
public void testNoopNoFill() {
ObjectProcessor<Object> noop = ObjectProcessor.strict(ObjectProcessor.noop(List.of(
Type.booleanType(),
Type.charType(),
Type.byteType(),
Type.shortType(),
Type.intType(),
Type.longType(),
Type.floatType(),
Type.doubleType(),
Type.stringType()), false));
try (
WritableObjectChunk<Object, ?> in = WritableObjectChunk.makeWritableChunk(1);
WritableByteChunk<?> c1 = WritableByteChunk.makeWritableChunk(1);
WritableCharChunk<?> c2 = WritableCharChunk.makeWritableChunk(1);
WritableByteChunk<?> c3 = WritableByteChunk.makeWritableChunk(1);
WritableShortChunk<?> c4 = WritableShortChunk.makeWritableChunk(1);
WritableIntChunk<?> c5 = WritableIntChunk.makeWritableChunk(1);
WritableLongChunk<?> c6 = WritableLongChunk.makeWritableChunk(1);
WritableFloatChunk<?> c7 = WritableFloatChunk.makeWritableChunk(1);
WritableDoubleChunk<?> c8 = WritableDoubleChunk.makeWritableChunk(1);
WritableObjectChunk<String, ?> c9 = WritableObjectChunk.makeWritableChunk(1)) {
List<WritableChunk<?>> out = List.of(c1, c2, c3, c4, c5, c6, c7, c8, c9);
for (WritableChunk<?> c : out) {
c.setSize(0);
}
in.set(0, new Object());
c1.set(0, (byte) 42);
c2.set(0, (char) 42);
c3.set(0, (byte) 42);
c4.set(0, (short) 42);
c5.set(0, 42);
c6.set(0, 42L);
c7.set(0, 42.0f);
c8.set(0, 42.0);
c9.set(0, "42");
noop.processAll(in, out);
for (WritableChunk<?> c : out) {
assertThat(c.size()).isEqualTo(1);
}
assertThat(c1.get(0)).isEqualTo((byte) 42);
assertThat(c2.get(0)).isEqualTo((char) 42);
assertThat(c3.get(0)).isEqualTo((byte) 42);
assertThat(c4.get(0)).isEqualTo((short) 42);
assertThat(c5.get(0)).isEqualTo(42);
assertThat(c6.get(0)).isEqualTo(42L);
assertThat(c7.get(0)).isEqualTo(42.0f);
assertThat(c8.get(0)).isEqualTo(42.0);
assertThat(c9.get(0)).isEqualTo("42");
}
}

@Test
public void testNoopNullFill() {
ObjectProcessor<Object> noop = ObjectProcessor.strict(ObjectProcessor.noop(List.of(
Type.booleanType(),
Type.charType(),
Type.byteType(),
Type.shortType(),
Type.intType(),
Type.longType(),
Type.floatType(),
Type.doubleType(),
Type.stringType()), true));
try (
WritableObjectChunk<Object, ?> in = WritableObjectChunk.makeWritableChunk(1);
WritableByteChunk<?> c1 = WritableByteChunk.makeWritableChunk(1);
WritableCharChunk<?> c2 = WritableCharChunk.makeWritableChunk(1);
WritableByteChunk<?> c3 = WritableByteChunk.makeWritableChunk(1);
WritableShortChunk<?> c4 = WritableShortChunk.makeWritableChunk(1);
WritableIntChunk<?> c5 = WritableIntChunk.makeWritableChunk(1);
WritableLongChunk<?> c6 = WritableLongChunk.makeWritableChunk(1);
WritableFloatChunk<?> c7 = WritableFloatChunk.makeWritableChunk(1);
WritableDoubleChunk<?> c8 = WritableDoubleChunk.makeWritableChunk(1);
WritableObjectChunk<String, ?> c9 = WritableObjectChunk.makeWritableChunk(1)) {
List<WritableChunk<?>> out = List.of(c1, c2, c3, c4, c5, c6, c7, c8, c9);
for (WritableChunk<?> c : out) {
c.setSize(0);
}
in.set(0, new Object());
c1.set(0, (byte) 42);
c2.set(0, (char) 42);
c3.set(0, (byte) 42);
c4.set(0, (short) 42);
c5.set(0, 42);
c6.set(0, 42L);
c7.set(0, 42.0f);
c8.set(0, 42.0);
c9.set(0, "42");
noop.processAll(in, out);
for (WritableChunk<?> c : out) {
assertThat(c.size()).isEqualTo(1);
}
assertThat(c1.get(0)).isEqualTo(QueryConstants.NULL_BYTE);
assertThat(c2.get(0)).isEqualTo(QueryConstants.NULL_CHAR);
assertThat(c3.get(0)).isEqualTo(QueryConstants.NULL_BYTE);
assertThat(c4.get(0)).isEqualTo(QueryConstants.NULL_SHORT);
assertThat(c5.get(0)).isEqualTo(QueryConstants.NULL_INT);
assertThat(c6.get(0)).isEqualTo(QueryConstants.NULL_LONG);
assertThat(c7.get(0)).isEqualTo(QueryConstants.NULL_FLOAT);
assertThat(c8.get(0)).isEqualTo(QueryConstants.NULL_DOUBLE);
assertThat(c9.get(0)).isNull();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.processor;

import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableIntChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.qst.type.Type;
import org.junit.Test;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

public class ObjectProcessorRowLimitedTest {

@Test
public void testRowLimit() {
for (int rowLimit : new int[] {1, 2, 3, 4, 7, 8, 9, 16, 32, 64, 127, 128, 129, 256}) {
for (int totalRows : new int[] {0, 1, 2, 3, 4, 7, 8, 9, 16, 32, 64, 127, 128, 129, 256}) {
doRowLimitCheck(rowLimit, totalRows);
}
}
}

private static void doRowLimitCheck(int rowLimit, int totalRows) {
RowLimitedCheckerImpl<Object> checker = new RowLimitedCheckerImpl<>(rowLimit, totalRows);
ObjectProcessor<Object> rowLimited = ObjectProcessor.strict(ObjectProcessor.rowLimited(checker, rowLimit));
try (
WritableObjectChunk<Object, ?> in = WritableObjectChunk.makeWritableChunk(totalRows);
WritableIntChunk<?> c1 = WritableIntChunk.makeWritableChunk(totalRows)) {
c1.setSize(0);
rowLimited.processAll(in, List.of(c1));
}
checker.assertDone();
}

private static class RowLimitedCheckerImpl<T> implements ObjectProcessor<T> {
private static final ObjectProcessor<Object> NOOP = ObjectProcessor.noop(List.of(Type.intType()), false);
private final int rowLimit;
private final int totalRows;
private int cumulativeInSize;

public RowLimitedCheckerImpl(int rowLimit, int totalRows) {
this.rowLimit = rowLimit;
this.totalRows = totalRows;
}

@Override
public List<Type<?>> outputTypes() {
return NOOP.outputTypes();
}

@Override
public void processAll(ObjectChunk<? extends T, ?> in, List<WritableChunk<?>> out) {
final int expectedInSize = Math.min(totalRows - cumulativeInSize, rowLimit);
assertThat(in.size()).isEqualTo(expectedInSize);
cumulativeInSize += expectedInSize;
NOOP.processAll(in, out);
}

public void assertDone() {
assertThat(cumulativeInSize).isEqualTo(totalRows);
}
}
}
Loading

0 comments on commit d0ca524

Please sign in to comment.