From d0ca52483729cf64896defdb5af7d7fc2756527f Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Tue, 24 Oct 2023 08:12:31 -0700 Subject: [PATCH] Add ObjectProcessor noop impl and ObjectProcessor tests (#4685) Unit test follow-up to #4648 --- extensions/kafka/build.gradle | 2 + .../deephaven/processor/ObjectProcessor.java | 16 ++ .../processor/ObjectProcessorNoop.java | 37 +++ .../processor/ObjectProcessorStrict.java | 12 +- .../processor/ObjectProcessorNoopTest.java | 130 ++++++++++ .../ObjectProcessorRowLimitedTest.java | 68 ++++++ .../processor/ObjectProcessorStrictTest.java | 223 ++++++++++++++++++ .../processor/ObjectProcessorTest.java | 50 ++++ .../ObjectProcessorFunctionsTest.java | 157 ++++++++++++ 9 files changed, 692 insertions(+), 3 deletions(-) create mode 100644 extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorNoop.java create mode 100644 extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorNoopTest.java create mode 100644 extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorRowLimitedTest.java create mode 100644 extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorStrictTest.java create mode 100644 extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorTest.java create mode 100644 extensions/kafka/src/test/java/io/deephaven/processor/functions/ObjectProcessorFunctionsTest.java diff --git a/extensions/kafka/build.gradle b/extensions/kafka/build.gradle index 55b44db5e97..0543ed01a0b 100644 --- a/extensions/kafka/build.gradle +++ b/extensions/kafka/build.gradle @@ -40,6 +40,8 @@ dependencies { project(path: ':configs'), project(path: ':test-configs') Classpaths.inheritSlf4j(project, 'slf4j-simple', 'testRuntimeOnly') + + Classpaths.inheritAssertJ(project) } spotless { diff --git a/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessor.java b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessor.java index 7cd68fe343e..3a5b46c6253 100644 --- 
a/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessor.java +++ b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessor.java @@ -66,6 +66,22 @@ static ObjectProcessor rowLimited(ObjectProcessor delegate, int rowLim return ObjectProcessorRowLimited.of(delegate, rowLimit); } + /** + * Creates a "no-operation" object processor that ignores the input object chunk. If {@code fillWithNullValue} is + * {@code true}, during {@link ObjectProcessor#processAll(ObjectChunk, List) processAll} + * {@link WritableChunk#fillWithNullValue(int, int) fillWithNullValue} will be invoked on each output chunk; + * otherwise, the output chunk contents will not be modified. In either case, the processing will increment the + * output chunks sizes. + * + * @param outputTypes the output types + * @param fillWithNullValue if the output chunks should be filled with the appropriate null value + * @return the no-op object processor + * @param the object type + */ + static ObjectProcessor noop(List> outputTypes, boolean fillWithNullValue) { + return new ObjectProcessorNoop<>(outputTypes, fillWithNullValue); + } + /** * The relationship between {@link #outputTypes() output types} and the {@link #processAll(ObjectChunk, List) * processAll out param} {@link WritableChunk#getChunkType()}. 
diff --git a/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorNoop.java b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorNoop.java new file mode 100644 index 00000000000..0bea17f1e9b --- /dev/null +++ b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorNoop.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.processor; + +import io.deephaven.chunk.ObjectChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.qst.type.Type; + +import java.util.List; + +final class ObjectProcessorNoop implements ObjectProcessor { + private final List> outputTypes; + private final boolean fillWithNullValue; + + ObjectProcessorNoop(List> outputTypes, boolean fillWithNullValue) { + this.outputTypes = List.copyOf(outputTypes); + this.fillWithNullValue = fillWithNullValue; + } + + @Override + public List> outputTypes() { + return outputTypes; + } + + @Override + public void processAll(ObjectChunk in, List> out) { + if (fillWithNullValue) { + for (WritableChunk chunk : out) { + chunk.fillWithNullValue(chunk.size(), in.size()); + } + } + for (WritableChunk chunk : out) { + chunk.setSize(chunk.size() + in.size()); + } + } +} diff --git a/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorStrict.java b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorStrict.java index a1a7b89282c..b6432931338 100644 --- a/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorStrict.java +++ b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorStrict.java @@ -12,7 +12,7 @@ import java.util.List; import java.util.Objects; -class ObjectProcessorStrict implements ObjectProcessor { +final class ObjectProcessorStrict implements ObjectProcessor { static ObjectProcessor create(ObjectProcessor delegate) { if (delegate instanceof ObjectProcessorStrict) { @@ -22,14 +22,20 @@ static ObjectProcessor 
create(ObjectProcessor delegate) { } private final ObjectProcessor delegate; + private final List> outputTypes; ObjectProcessorStrict(ObjectProcessor delegate) { this.delegate = Objects.requireNonNull(delegate); + this.outputTypes = List.copyOf(delegate.outputTypes()); } @Override public List> outputTypes() { - return delegate.outputTypes(); + final List> outputTypes = delegate.outputTypes(); + if (!this.outputTypes.equals(outputTypes)) { + throw new UncheckedDeephavenException("Implementation is returning a different list of outputTypes"); + } + return outputTypes; } @Override @@ -37,7 +43,7 @@ public void processAll(ObjectChunk in, List> ou final int numColumns = delegate.outputTypes().size(); if (numColumns != out.size()) { throw new IllegalArgumentException(String.format( - "Expected delegate.outputTypes().size() == out.size(). delegate.outputTypes().size()=%d, out.size()=%d", + "Improper number of out chunks. Expected delegate.outputTypes().size() == out.size(). delegate.outputTypes().size()=%d, out.size()=%d", numColumns, out.size())); } final int[] originalSizes = new int[numColumns]; diff --git a/extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorNoopTest.java b/extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorNoopTest.java new file mode 100644 index 00000000000..aa9ce50b2a3 --- /dev/null +++ b/extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorNoopTest.java @@ -0,0 +1,130 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.processor; + +import io.deephaven.chunk.WritableByteChunk; +import io.deephaven.chunk.WritableCharChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.WritableDoubleChunk; +import io.deephaven.chunk.WritableFloatChunk; +import io.deephaven.chunk.WritableIntChunk; +import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.chunk.WritableShortChunk; +import 
io.deephaven.qst.type.Type; +import io.deephaven.util.QueryConstants; +import org.junit.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ObjectProcessorNoopTest { + + @Test + public void testNoopNoFill() { + ObjectProcessor noop = ObjectProcessor.strict(ObjectProcessor.noop(List.of( + Type.booleanType(), + Type.charType(), + Type.byteType(), + Type.shortType(), + Type.intType(), + Type.longType(), + Type.floatType(), + Type.doubleType(), + Type.stringType()), false)); + try ( + WritableObjectChunk in = WritableObjectChunk.makeWritableChunk(1); + WritableByteChunk c1 = WritableByteChunk.makeWritableChunk(1); + WritableCharChunk c2 = WritableCharChunk.makeWritableChunk(1); + WritableByteChunk c3 = WritableByteChunk.makeWritableChunk(1); + WritableShortChunk c4 = WritableShortChunk.makeWritableChunk(1); + WritableIntChunk c5 = WritableIntChunk.makeWritableChunk(1); + WritableLongChunk c6 = WritableLongChunk.makeWritableChunk(1); + WritableFloatChunk c7 = WritableFloatChunk.makeWritableChunk(1); + WritableDoubleChunk c8 = WritableDoubleChunk.makeWritableChunk(1); + WritableObjectChunk c9 = WritableObjectChunk.makeWritableChunk(1)) { + List> out = List.of(c1, c2, c3, c4, c5, c6, c7, c8, c9); + for (WritableChunk c : out) { + c.setSize(0); + } + in.set(0, new Object()); + c1.set(0, (byte) 42); + c2.set(0, (char) 42); + c3.set(0, (byte) 42); + c4.set(0, (short) 42); + c5.set(0, 42); + c6.set(0, 42L); + c7.set(0, 42.0f); + c8.set(0, 42.0); + c9.set(0, "42"); + noop.processAll(in, out); + for (WritableChunk c : out) { + assertThat(c.size()).isEqualTo(1); + } + assertThat(c1.get(0)).isEqualTo((byte) 42); + assertThat(c2.get(0)).isEqualTo((char) 42); + assertThat(c3.get(0)).isEqualTo((byte) 42); + assertThat(c4.get(0)).isEqualTo((short) 42); + assertThat(c5.get(0)).isEqualTo(42); + assertThat(c6.get(0)).isEqualTo(42L); + assertThat(c7.get(0)).isEqualTo(42.0f); + assertThat(c8.get(0)).isEqualTo(42.0); + 
assertThat(c9.get(0)).isEqualTo("42"); + } + } + + @Test + public void testNoopNullFill() { + ObjectProcessor noop = ObjectProcessor.strict(ObjectProcessor.noop(List.of( + Type.booleanType(), + Type.charType(), + Type.byteType(), + Type.shortType(), + Type.intType(), + Type.longType(), + Type.floatType(), + Type.doubleType(), + Type.stringType()), true)); + try ( + WritableObjectChunk in = WritableObjectChunk.makeWritableChunk(1); + WritableByteChunk c1 = WritableByteChunk.makeWritableChunk(1); + WritableCharChunk c2 = WritableCharChunk.makeWritableChunk(1); + WritableByteChunk c3 = WritableByteChunk.makeWritableChunk(1); + WritableShortChunk c4 = WritableShortChunk.makeWritableChunk(1); + WritableIntChunk c5 = WritableIntChunk.makeWritableChunk(1); + WritableLongChunk c6 = WritableLongChunk.makeWritableChunk(1); + WritableFloatChunk c7 = WritableFloatChunk.makeWritableChunk(1); + WritableDoubleChunk c8 = WritableDoubleChunk.makeWritableChunk(1); + WritableObjectChunk c9 = WritableObjectChunk.makeWritableChunk(1)) { + List> out = List.of(c1, c2, c3, c4, c5, c6, c7, c8, c9); + for (WritableChunk c : out) { + c.setSize(0); + } + in.set(0, new Object()); + c1.set(0, (byte) 42); + c2.set(0, (char) 42); + c3.set(0, (byte) 42); + c4.set(0, (short) 42); + c5.set(0, 42); + c6.set(0, 42L); + c7.set(0, 42.0f); + c8.set(0, 42.0); + c9.set(0, "42"); + noop.processAll(in, out); + for (WritableChunk c : out) { + assertThat(c.size()).isEqualTo(1); + } + assertThat(c1.get(0)).isEqualTo(QueryConstants.NULL_BYTE); + assertThat(c2.get(0)).isEqualTo(QueryConstants.NULL_CHAR); + assertThat(c3.get(0)).isEqualTo(QueryConstants.NULL_BYTE); + assertThat(c4.get(0)).isEqualTo(QueryConstants.NULL_SHORT); + assertThat(c5.get(0)).isEqualTo(QueryConstants.NULL_INT); + assertThat(c6.get(0)).isEqualTo(QueryConstants.NULL_LONG); + assertThat(c7.get(0)).isEqualTo(QueryConstants.NULL_FLOAT); + assertThat(c8.get(0)).isEqualTo(QueryConstants.NULL_DOUBLE); + assertThat(c9.get(0)).isNull(); + } + } +} 
diff --git a/extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorRowLimitedTest.java b/extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorRowLimitedTest.java new file mode 100644 index 00000000000..89696c65bf2 --- /dev/null +++ b/extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorRowLimitedTest.java @@ -0,0 +1,68 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.processor; + +import io.deephaven.chunk.ObjectChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.WritableIntChunk; +import io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.qst.type.Type; +import org.junit.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ObjectProcessorRowLimitedTest { + + @Test + public void testRowLimit() { + for (int rowLimit : new int[] {1, 2, 3, 4, 7, 8, 9, 16, 32, 64, 127, 128, 129, 256}) { + for (int totalRows : new int[] {0, 1, 2, 3, 4, 7, 8, 9, 16, 32, 64, 127, 128, 129, 256}) { + doRowLimitCheck(rowLimit, totalRows); + } + } + } + + private static void doRowLimitCheck(int rowLimit, int totalRows) { + RowLimitedCheckerImpl checker = new RowLimitedCheckerImpl<>(rowLimit, totalRows); + ObjectProcessor rowLimited = ObjectProcessor.strict(ObjectProcessor.rowLimited(checker, rowLimit)); + try ( + WritableObjectChunk in = WritableObjectChunk.makeWritableChunk(totalRows); + WritableIntChunk c1 = WritableIntChunk.makeWritableChunk(totalRows)) { + c1.setSize(0); + rowLimited.processAll(in, List.of(c1)); + } + checker.assertDone(); + } + + private static class RowLimitedCheckerImpl implements ObjectProcessor { + private static final ObjectProcessor NOOP = ObjectProcessor.noop(List.of(Type.intType()), false); + private final int rowLimit; + private final int totalRows; + private int cumulativeInSize; + + public RowLimitedCheckerImpl(int rowLimit, int totalRows) { + this.rowLimit = rowLimit; + 
this.totalRows = totalRows; + } + + @Override + public List> outputTypes() { + return NOOP.outputTypes(); + } + + @Override + public void processAll(ObjectChunk in, List> out) { + final int expectedInSize = Math.min(totalRows - cumulativeInSize, rowLimit); + assertThat(in.size()).isEqualTo(expectedInSize); + cumulativeInSize += expectedInSize; + NOOP.processAll(in, out); + } + + public void assertDone() { + assertThat(cumulativeInSize).isEqualTo(totalRows); + } + } +} diff --git a/extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorStrictTest.java b/extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorStrictTest.java new file mode 100644 index 00000000000..674ba8241ce --- /dev/null +++ b/extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorStrictTest.java @@ -0,0 +1,223 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.processor; + +import io.deephaven.UncheckedDeephavenException; +import io.deephaven.chunk.ObjectChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.WritableDoubleChunk; +import io.deephaven.chunk.WritableIntChunk; +import io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.chunk.attributes.Any; +import io.deephaven.qst.type.Type; +import io.deephaven.util.QueryConstants; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; + +public class ObjectProcessorStrictTest { + + @Test + public void testNoDoubleWrapping() { + ObjectProcessor delegate = ObjectProcessor.noop(List.of(Type.intType()), false); + ObjectProcessor strict = ObjectProcessor.strict(delegate); + ObjectProcessor strict2 = ObjectProcessor.strict(strict); + assertThat(strict2).isSameAs(strict); + } + + @Test + public void testCorrectOutputTypes() { + ObjectProcessor delegate = 
ObjectProcessor.noop(List.of(Type.intType()), false); + ObjectProcessor strict = ObjectProcessor.strict(delegate); + assertThat(strict.outputTypes()).containsExactly(Type.intType()); + } + + @Test + public void testNpeOnNullIn() { + ObjectProcessor delegate = ObjectProcessor.noop(List.of(Type.intType()), false); + ObjectProcessor strict = ObjectProcessor.strict(delegate); + try (WritableIntChunk c1 = WritableIntChunk.makeWritableChunk(1)) { + c1.setSize(0); + try { + strict.processAll(null, List.of(c1)); + failBecauseExceptionWasNotThrown(NullPointerException.class); + } catch (NullPointerException e) { + // expected + } + } + } + + @Test + public void testNpeOnNullOut() { + ObjectProcessor delegate = ObjectProcessor.noop(List.of(Type.intType()), false); + ObjectProcessor strict = ObjectProcessor.strict(delegate); + try (WritableObjectChunk in = WritableObjectChunk.makeWritableChunk(1)) { + try { + strict.processAll(in, null); + failBecauseExceptionWasNotThrown(NullPointerException.class); + } catch (NullPointerException e) { + // expected + } + } + } + + @Test + public void testIncorrectNumOutChunks() { + ObjectProcessor delegate = ObjectProcessor.noop(List.of(Type.intType()), false); + ObjectProcessor strict = ObjectProcessor.strict(delegate); + try ( + WritableObjectChunk in = WritableObjectChunk.makeWritableChunk(1); + WritableIntChunk c1 = WritableIntChunk.makeWritableChunk(1); + WritableDoubleChunk c2 = WritableDoubleChunk.makeWritableChunk(1)) { + c1.setSize(0); + c2.setSize(0); + try { + strict.processAll(in, List.of(c1, c2)); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("Improper number of out chunks"); + } + } + } + + @Test + public void testIncorrectChunkType() { + ObjectProcessor delegate = ObjectProcessor.noop(List.of(Type.intType()), false); + ObjectProcessor strict = ObjectProcessor.strict(delegate); + try ( + WritableObjectChunk in = 
WritableObjectChunk.makeWritableChunk(1); + WritableDoubleChunk c1 = WritableDoubleChunk.makeWritableChunk(1)) { + c1.setSize(0); + try { + strict.processAll(in, List.of(c1)); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("Improper ChunkType"); + } + } + } + + @Test + public void testNotEnoughOutputSize() { + ObjectProcessor delegate = ObjectProcessor.noop(List.of(Type.intType()), false); + ObjectProcessor strict = ObjectProcessor.strict(delegate); + try ( + WritableObjectChunk in = WritableObjectChunk.makeWritableChunk(1); + WritableIntChunk c1 = WritableIntChunk.makeWritableChunk(1)) { + try { + c1.setSize(c1.capacity()); + strict.processAll(in, List.of(c1)); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("out chunk does not have enough remaining capacity"); + } + } + } + + @Test + public void testDifferentOutChunkSizes() { + // Note: the ObjectProcessor does *not* guarantee that the output chunk sizes will all be the same. + // In the future if we add additional guarantees about the ObjectProcessor in these regards, we'll update this + // test and the strict impl as necessary. 
+ ObjectProcessor delegate = ObjectProcessor.noop(List.of(Type.intType(), Type.intType()), true); + ObjectProcessor strict = ObjectProcessor.strict(delegate); + try ( + WritableObjectChunk in = WritableObjectChunk.makeWritableChunk(1); + WritableIntChunk c1 = WritableIntChunk.makeWritableChunk(2); + WritableIntChunk c2 = WritableIntChunk.makeWritableChunk(2)) { + + c1.setSize(0); + c1.set(0, 0); + + c2.setSize(1); + c2.set(1, 0); + + strict.processAll(in, List.of(c1, c2)); + + assertThat(c1.size()).isEqualTo(1); + assertThat(c1.get(0)).isEqualTo(QueryConstants.NULL_INT); + + assertThat(c2.size()).isEqualTo(2); + assertThat(c2.get(1)).isEqualTo(QueryConstants.NULL_INT); + } + } + + @Test + public void testNPEOnNullDelegateOutputTypes() { + try { + ObjectProcessor.strict(new ObjectProcessor<>() { + @Override + public List> outputTypes() { + return null; + } + + @Override + public void processAll(ObjectChunk in, List> out) { + + } + }); + failBecauseExceptionWasNotThrown(NullPointerException.class); + } catch (NullPointerException e) { + // expected + } + } + + @Test + public void testBadDelegateOutputTypes() { + ObjectProcessor strict = ObjectProcessor.strict(new ObjectProcessor<>() { + private final List> outputTypes = new ArrayList<>(List.of(Type.intType())); + + @Override + public List> outputTypes() { + try { + return List.copyOf(outputTypes); + } finally { + outputTypes.clear(); + } + } + + @Override + public void processAll(ObjectChunk in, List> out) { + // don't care about impl. 
+ } + }); + try { + strict.outputTypes(); + failBecauseExceptionWasNotThrown(UncheckedDeephavenException.class); + } catch (UncheckedDeephavenException e) { + assertThat(e).hasMessageContaining("Implementation is returning a different list of outputTypes"); + } + } + + @Test + public void testBadDelegateProcessAll() { + ObjectProcessor strict = ObjectProcessor.strict(new ObjectProcessor<>() { + @Override + public List> outputTypes() { + return List.of(Type.intType()); + } + + @Override + public void processAll(ObjectChunk in, List> out) { + // Bad impl + // don't increment out sizes as appropriate + } + }); + try ( + WritableObjectChunk in = WritableObjectChunk.makeWritableChunk(1); + WritableIntChunk c1 = WritableIntChunk.makeWritableChunk(1)) { + try { + c1.setSize(0); + strict.processAll(in, List.of(c1)); + failBecauseExceptionWasNotThrown(UncheckedDeephavenException.class); + } catch (UncheckedDeephavenException e) { + assertThat(e).hasMessageContaining("Implementation did not increment chunk size correctly"); + } + } + } +} diff --git a/extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorTest.java b/extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorTest.java new file mode 100644 index 00000000000..f87d9b10a68 --- /dev/null +++ b/extensions/kafka/src/test/java/io/deephaven/processor/ObjectProcessorTest.java @@ -0,0 +1,50 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.processor; + +import io.deephaven.chunk.ChunkType; +import io.deephaven.qst.type.PrimitiveType; +import io.deephaven.qst.type.Type; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ObjectProcessorTest { + + interface Foo { + } + + @Test + public void testChunkTypes() { + chunkType(Type.charType(), ChunkType.Char); + chunkType(Type.booleanType(), ChunkType.Byte); + chunkType(Type.byteType(), ChunkType.Byte); + chunkType(Type.shortType(), ChunkType.Short); + 
chunkType(Type.intType(), ChunkType.Int); + chunkType(Type.longType(), ChunkType.Long); + chunkType(Type.floatType(), ChunkType.Float); + chunkType(Type.doubleType(), ChunkType.Double); + + chunkType(Type.charType().boxedType(), ChunkType.Char); + chunkType(Type.booleanType().boxedType(), ChunkType.Byte); + chunkType(Type.byteType().boxedType(), ChunkType.Byte); + chunkType(Type.shortType().boxedType(), ChunkType.Short); + chunkType(Type.intType().boxedType(), ChunkType.Int); + chunkType(Type.longType().boxedType(), ChunkType.Long); + chunkType(Type.floatType().boxedType(), ChunkType.Float); + chunkType(Type.doubleType().boxedType(), ChunkType.Double); + + chunkType(Type.stringType(), ChunkType.Object); + chunkType(Type.instantType(), ChunkType.Long); + chunkType(Type.ofCustom(Foo.class), ChunkType.Object); + + PrimitiveType.instances() + .map(Type::arrayType) + .forEach(nativeArrayType -> chunkType(nativeArrayType, ChunkType.Object)); + } + + private static void chunkType(Type type, ChunkType expectedChunkType) { + assertThat(ObjectProcessor.chunkType(type)).isSameAs(expectedChunkType); + } +} diff --git a/extensions/kafka/src/test/java/io/deephaven/processor/functions/ObjectProcessorFunctionsTest.java b/extensions/kafka/src/test/java/io/deephaven/processor/functions/ObjectProcessorFunctionsTest.java new file mode 100644 index 00000000000..66cf7fa1853 --- /dev/null +++ b/extensions/kafka/src/test/java/io/deephaven/processor/functions/ObjectProcessorFunctionsTest.java @@ -0,0 +1,157 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.processor.functions; + +import io.deephaven.chunk.WritableByteChunk; +import io.deephaven.chunk.WritableCharChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.WritableDoubleChunk; +import io.deephaven.chunk.WritableFloatChunk; +import io.deephaven.chunk.WritableIntChunk; +import io.deephaven.chunk.WritableLongChunk; +import 
io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.chunk.WritableShortChunk; +import io.deephaven.functions.ToByteFunction; +import io.deephaven.functions.ToCharFunction; +import io.deephaven.functions.ToDoubleFunction; +import io.deephaven.functions.ToFloatFunction; +import io.deephaven.functions.ToIntFunction; +import io.deephaven.functions.ToLongFunction; +import io.deephaven.functions.ToObjectFunction; +import io.deephaven.functions.ToShortFunction; +import io.deephaven.processor.ObjectProcessor; +import io.deephaven.qst.type.Type; +import io.deephaven.util.BooleanUtils; +import org.junit.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ObjectProcessorFunctionsTest { + + @Test + public void testObjectProcessorFunctionsCorrectAndColumnOriented() { + int numRows = 128; + FunctionOrchestrator orchestrator = new FunctionOrchestrator(numRows); + ObjectProcessor functions = ObjectProcessor.strict(ObjectProcessorFunctions.of(List.of( + ToObjectFunction.of(orchestrator::toBoolean, Type.booleanType().boxedType()), + (ToCharFunction) orchestrator::toChar, + (ToByteFunction) orchestrator::toByte, + (ToShortFunction) orchestrator::toShort, + (ToIntFunction) orchestrator::toInt, + (ToLongFunction) orchestrator::toLong, + (ToFloatFunction) orchestrator::toFloat, + (ToDoubleFunction) orchestrator::toDouble, + ToObjectFunction.of(orchestrator::toString, Type.stringType())))); + try ( + WritableObjectChunk in = WritableObjectChunk.makeWritableChunk(numRows); + WritableByteChunk c1 = WritableByteChunk.makeWritableChunk(numRows); + WritableCharChunk c2 = WritableCharChunk.makeWritableChunk(numRows); + WritableByteChunk c3 = WritableByteChunk.makeWritableChunk(numRows); + WritableShortChunk c4 = WritableShortChunk.makeWritableChunk(numRows); + WritableIntChunk c5 = WritableIntChunk.makeWritableChunk(numRows); + WritableLongChunk c6 = WritableLongChunk.makeWritableChunk(numRows); + WritableFloatChunk c7 = 
WritableFloatChunk.makeWritableChunk(numRows); + WritableDoubleChunk c8 = WritableDoubleChunk.makeWritableChunk(numRows); + WritableObjectChunk c9 = WritableObjectChunk.makeWritableChunk(numRows)) { + List> out = List.of(c1, c2, c3, c4, c5, c6, c7, c8, c9); + for (WritableChunk c : out) { + c.setSize(0); + } + functions.processAll(in, out); + for (WritableChunk c : out) { + assertThat(c.size()).isEqualTo(numRows); + } + for (int i = 0; i < numRows; ++i) { + assertThat(c1.get(i)).isEqualTo(BooleanUtils.booleanAsByte(true)); + assertThat(c2.get(i)).isEqualTo((char) 42); + assertThat(c3.get(i)).isEqualTo((byte) 42); + assertThat(c4.get(i)).isEqualTo((short) 42); + assertThat(c5.get(i)).isEqualTo(42); + assertThat(c6.get(i)).isEqualTo(42L); + assertThat(c7.get(i)).isEqualTo(42.0f); + assertThat(c8.get(i)).isEqualTo(42.0); + assertThat(c9.get(i)).isEqualTo("42"); + } + } + orchestrator.assertDone(); + } + + private static class FunctionOrchestrator { + + private final int numRows; + + private int booleans; + private int chars; + private int bytes; + private int shorts; + private int ints; + private int longs; + private int floats; + private int doubles; + private int strings; + + public FunctionOrchestrator(int numRows) { + this.numRows = numRows; + } + + Boolean toBoolean(Object ignore) { + ++booleans; + return true; + } + + char toChar(Object ignore) { + assertThat(booleans).isEqualTo(numRows); + ++chars; + return (char) 42; + } + + byte toByte(Object ignore) { + assertThat(chars).isEqualTo(numRows); + ++bytes; + return (byte) 42; + } + + short toShort(Object ignore) { + assertThat(bytes).isEqualTo(numRows); + ++shorts; + return (short) 42; + } + + int toInt(Object ignore) { + assertThat(shorts).isEqualTo(numRows); + ++ints; + return (short) 42; + } + + long toLong(Object ignore) { + assertThat(ints).isEqualTo(numRows); + ++longs; + return 42L; + } + + float toFloat(Object ignore) { + assertThat(longs).isEqualTo(numRows); + ++floats; + return 42.0f; + } + + double 
toDouble(Object ignore) { + assertThat(floats).isEqualTo(numRows); + ++doubles; + return 42.0; + } + + String toString(Object ignore) { + assertThat(doubles).isEqualTo(numRows); + ++strings; + return "42"; + } + + void assertDone() { + assertThat(strings).isEqualTo(numRows); + } + } +}