diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageSnapshotPerformanceLoggerImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageSnapshotPerformanceLoggerImpl.java index ac26fe4f524..537698ad234 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageSnapshotPerformanceLoggerImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageSnapshotPerformanceLoggerImpl.java @@ -27,7 +27,7 @@ public BarrageSnapshotPerformanceLoggerImpl() { ExecutionContext.getContext().getUpdateGraph(), BarrageSnapshotPerformanceLoggerImpl.class.getName(), Map.of( - BaseTable.BARRAGE_PERFORMANCE_KEY_ATTRIBUTE, + Table.BARRAGE_PERFORMANCE_KEY_ATTRIBUTE, BarrageSnapshotPerformanceLogger.getDefaultTableName())); blink = adapter.table(); } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageSubscriptionPerformanceLoggerImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageSubscriptionPerformanceLoggerImpl.java index 024e7a6f141..1fd22d04e7b 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageSubscriptionPerformanceLoggerImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageSubscriptionPerformanceLoggerImpl.java @@ -28,7 +28,7 @@ public BarrageSubscriptionPerformanceLoggerImpl() { ExecutionContext.getContext().getUpdateGraph(), BarrageSubscriptionPerformanceLoggerImpl.class.getName(), Map.of( - BaseTable.BARRAGE_PERFORMANCE_KEY_ATTRIBUTE, + Table.BARRAGE_PERFORMANCE_KEY_ATTRIBUTE, BarrageSubscriptionPerformanceLogger.getDefaultTableName())); blink = adapter.table(); } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageTypeInfo.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageTypeInfo.java index d802730aef6..78edb33654e 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageTypeInfo.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageTypeInfo.java @@ -4,14 +4,13 @@ package io.deephaven.extensions.barrage; import io.deephaven.chunk.ChunkType; -import org.apache.arrow.flatbuf.Field; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** * Describes type info used by factory implementations when creating a ChunkReader. */ -public class BarrageTypeInfo { +public class BarrageTypeInfo { /** * Factory method to create a TypeInfo instance. * @@ -20,22 +19,22 @@ public class BarrageTypeInfo { * @param arrowField the Arrow type to be read into the chunk * @return a TypeInfo instance */ - public static BarrageTypeInfo make( + public static BarrageTypeInfo make( @NotNull final Class type, @Nullable final Class componentType, - @NotNull final Field arrowField) { - return new BarrageTypeInfo(type, componentType, arrowField); + @NotNull final FIELD_TYPE arrowField) { + return new BarrageTypeInfo<>(type, componentType, arrowField); } private final Class type; @Nullable private final Class componentType; - private final Field arrowField; + private final FIELD_TYPE arrowField; public BarrageTypeInfo( @NotNull final Class type, @Nullable final Class componentType, - @NotNull final Field arrowField) { + @NotNull final FIELD_TYPE arrowField) { this.type = type; this.componentType = componentType; this.arrowField = arrowField; @@ -50,7 +49,7 @@ public Class componentType() { return componentType; } - public Field arrowField() { + public FIELD_TYPE arrowField() { return arrowField; } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java index fb2bdf9e636..8e6f24d39f5 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java @@ -13,7 +13,8 @@ import io.deephaven.chunk.WritableLongChunk; import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.extensions.barrage.BarrageOptions;import io.deephaven.util.datastructures.LongSizedDataStructure; +import io.deephaven.extensions.barrage.BarrageOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkWriter.java index bf5b820e4c2..f15d0a0b7c6 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkWriter.java @@ -12,7 +12,8 @@ import io.deephaven.engine.rowset.RowSet; import com.google.common.io.LittleEndianDataOutputStream; import io.deephaven.UncheckedDeephavenException; -import io.deephaven.extensions.barrage.BarrageOptions;import io.deephaven.util.datastructures.LongSizedDataStructure; +import io.deephaven.extensions.barrage.BarrageOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.ByteChunk; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReader.java index 7d1ccdd3145..8ffa38fb445 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReader.java @@ -8,6 +8,7 @@ import io.deephaven.extensions.barrage.BarrageOptions; import io.deephaven.extensions.barrage.BarrageTypeInfo; import io.deephaven.util.annotations.FinalDefault; +import org.apache.arrow.flatbuf.Field; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -35,7 +36,7 @@ interface Factory { * @return a ChunkReader based on the given options, factory, and type to read */ > ChunkReader newReader( - @NotNull BarrageTypeInfo typeInfo, + @NotNull BarrageTypeInfo typeInfo, @NotNull BarrageOptions options); } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkWriter.java index d3a19f78a98..21efe610eaf 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkWriter.java @@ -13,6 +13,7 @@ import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.Chunk; import io.deephaven.util.referencecounting.ReferenceCounted; +import org.apache.arrow.flatbuf.Field; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -33,7 +34,7 @@ interface Factory { * @return a ChunkWriter based on the given options, factory, and type to write */ > ChunkWriter newWriter( - @NotNull BarrageTypeInfo typeInfo); + @NotNull BarrageTypeInfo typeInfo); } /** diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java index 4b6f3822599..6500ed749f8 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java @@ -73,7 +73,7 @@ public class DefaultChunkReaderFactory implements ChunkReader.Factory { protected interface ChunkReaderFactory { ChunkReader> make( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options); } @@ -133,24 +133,27 @@ protected DefaultChunkReaderFactory() { register(ArrowType.ArrowTypeID.Interval, Period.class, DefaultChunkReaderFactory::intervalToPeriod); register(ArrowType.ArrowTypeID.Interval, PeriodDuration.class, DefaultChunkReaderFactory::intervalToPeriodDuration); - // TODO NATE NOCOMMIT: Test each of Arrow's timezone formats in Instant -> ZonedDateTime } @Override public > ChunkReader newReader( - @NotNull final BarrageTypeInfo typeInfo, + @NotNull final BarrageTypeInfo typeInfo, @NotNull final BarrageOptions options) { - return newReader(typeInfo, options, true); + final BarrageTypeInfo fieldTypeInfo = new BarrageTypeInfo<>( + typeInfo.type(), + typeInfo.componentType(), + Field.convertField(typeInfo.arrowField())); + return newReaderPojo(fieldTypeInfo, options, true); } - public > ChunkReader newReader( - @NotNull final BarrageTypeInfo typeInfo, + public > ChunkReader newReaderPojo( + @NotNull final BarrageTypeInfo typeInfo, @NotNull final BarrageOptions options, final boolean isTopLevel) { // TODO (deephaven/deephaven-core#6033): Run-End Support // TODO (deephaven/deephaven-core#6034): Dictionary Support - final Field field = Field.convertField(typeInfo.arrowField()); + final Field field = typeInfo.arrowField(); final ArrowType.ArrowTypeID typeId = field.getType().getTypeID(); final boolean isSpecialType = SPECIAL_TYPES.contains(typeId); @@ -208,28 +211,28 @@ public > ChunkReader newReader( fixedSizeLength = ((ArrowType.FixedSizeList) field.getType()).getListSize(); } - final BarrageTypeInfo componentTypeInfo; + final BarrageTypeInfo componentTypeInfo; final boolean useVectorKernels = Vector.class.isAssignableFrom(typeInfo.type()); if (useVectorKernels) { final Class componentType = VectorExpansionKernel.getComponentType(typeInfo.type(), typeInfo.componentType()); - componentTypeInfo = new BarrageTypeInfo( + componentTypeInfo = new BarrageTypeInfo<>( componentType, componentType.getComponentType(), - typeInfo.arrowField().children(0)); + typeInfo.arrowField().getChildren().get(0)); } else if (typeInfo.type().isArray()) { final Class componentType = typeInfo.componentType(); // noinspection DataFlowIssue - componentTypeInfo = new BarrageTypeInfo( + componentTypeInfo = new BarrageTypeInfo<>( componentType, componentType.getComponentType(), - typeInfo.arrowField().children(0)); + typeInfo.arrowField().getChildren().get(0)); } else if (isTopLevel && options.columnsAsList()) { - final BarrageTypeInfo realTypeInfo = new BarrageTypeInfo( + final BarrageTypeInfo realTypeInfo = new BarrageTypeInfo<>( typeInfo.type(), typeInfo.componentType(), - typeInfo.arrowField().children(0)); - final ChunkReader> componentReader = newReader(realTypeInfo, options, false); + typeInfo.arrowField().getChildren().get(0)); + final ChunkReader> componentReader = newReaderPojo(realTypeInfo, options, false); // noinspection unchecked return (ChunkReader) new SingleElementListHeaderReader<>(componentReader); } else { @@ -246,7 +249,7 @@ public > ChunkReader newReader( } else { kernel = ArrayExpansionKernel.makeExpansionKernel(chunkType, componentTypeInfo.type()); } - final ChunkReader> componentReader = newReader(componentTypeInfo, options, false); + final ChunkReader> componentReader = newReaderPojo(componentTypeInfo, options, false); // noinspection unchecked return (ChunkReader) new ListChunkReader<>(mode, fixedSizeLength, kernel, componentReader); @@ -254,11 +257,11 @@ public > ChunkReader newReader( if (typeId == ArrowType.ArrowTypeID.Map) { final Field structField = field.getChildren().get(0); - final BarrageTypeInfo keyTypeInfo = BarrageUtil.getDefaultType(structField.getChildren().get(0)); - final BarrageTypeInfo valueTypeInfo = BarrageUtil.getDefaultType(structField.getChildren().get(1)); + final BarrageTypeInfo keyTypeInfo = BarrageUtil.getDefaultType(structField.getChildren().get(0)); + final BarrageTypeInfo valueTypeInfo = BarrageUtil.getDefaultType(structField.getChildren().get(1)); - final ChunkReader> keyReader = newReader(keyTypeInfo, options, false); - final ChunkReader> valueReader = newReader(valueTypeInfo, options, false); + final ChunkReader> keyReader = newReaderPojo(keyTypeInfo, options, false); + final ChunkReader> valueReader = newReaderPojo(valueTypeInfo, options, false); // noinspection unchecked return (ChunkReader) new MapChunkReader<>(keyReader, valueReader); @@ -357,7 +360,7 @@ private static long factorForTimeUnit(final TimeUnit unit) { private static ChunkReader> timestampToLong( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final long factor = factorForTimeUnit(((ArrowType.Timestamp) arrowType).getUnit()); return factor == 1 @@ -368,7 +371,7 @@ private static ChunkReader> timestampToLong( private static ChunkReader> timestampToInstant( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final long factor = factorForTimeUnit(((ArrowType.Timestamp) arrowType).getUnit()); return new FixedWidthChunkReader<>(Long.BYTES, true, options, io -> { @@ -382,7 +385,7 @@ private static ChunkReader> timestampToInst private static ChunkReader> timestampToZonedDateTime( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final ArrowType.Timestamp tsType = (ArrowType.Timestamp) arrowType; final String timezone = tsType.getTimezone(); @@ -399,7 +402,7 @@ private static ChunkReader> timestamp private static ChunkReader> timestampToLocalDateTime( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final ArrowType.Timestamp tsType = (ArrowType.Timestamp) arrowType; final ZoneId tz = DateTimeUtils.parseTimeZone(tsType.getTimezone()); @@ -416,14 +419,14 @@ private static ChunkReader> timestamp private static ChunkReader> utf8ToString( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return new VarBinaryChunkReader<>((buf, off, len) -> new String(buf, off, len, Charsets.UTF_8)); } private static ChunkReader> durationToLong( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final long factor = factorForTimeUnit(((ArrowType.Duration) arrowType).getUnit()); return factor == 1 @@ -434,7 +437,7 @@ private static ChunkReader> durationToLong( private static ChunkReader> durationToDuration( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final long factor = factorForTimeUnit(((ArrowType.Duration) arrowType).getUnit()); return transformToObject(new LongChunkReader(options), (chunk, ii) -> { @@ -445,21 +448,21 @@ private static ChunkReader> durationToDura private static ChunkReader> floatingPointToFloat( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return new FloatChunkReader(((ArrowType.FloatingPoint) arrowType).getPrecision().getFlatbufID(), options); } private static ChunkReader> floatingPointToDouble( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return new DoubleChunkReader(((ArrowType.FloatingPoint) arrowType).getPrecision().getFlatbufID(), options); } private static ChunkReader> floatingPointToBigDecimal( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return transformToObject( new DoubleChunkReader(((ArrowType.FloatingPoint) arrowType).getPrecision().getFlatbufID(), options), @@ -471,21 +474,21 @@ private static ChunkReader> floatingPoin private static ChunkReader> binaryToByteArray( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return new VarBinaryChunkReader<>((buf, off, len) -> Arrays.copyOfRange(buf, off, off + len)); } private static ChunkReader> binaryToBigInt( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return new VarBinaryChunkReader<>(BigInteger::new); } private static ChunkReader> binaryToBigDecimal( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return new VarBinaryChunkReader<>((final byte[] buf, final int offset, final int length) -> { // read the int scale value as little endian, arrow's endianness. @@ -500,14 +503,14 @@ private static ChunkReader> binaryToBigD private static ChunkReader> binaryToSchema( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return new VarBinaryChunkReader<>(ArrowIpcUtil::deserialize); } private static ChunkReader> timeToLong( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { // See timeToLocalTime's comment for more information on wire format. final ArrowType.Time timeType = (ArrowType.Time) arrowType; @@ -533,7 +536,7 @@ private static ChunkReader> timeToLong( private static ChunkReader> timeToLocalTime( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { /* * Time is either a 32-bit or 64-bit signed integer type representing an elapsed time since midnight, stored in @@ -574,7 +577,7 @@ private static ChunkReader> timeToLocalTi private static ChunkReader> decimalToByte( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return ByteChunkReader.transformTo(decimalToBigDecimal(arrowType, typeInfo, options), (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii))); @@ -582,7 +585,7 @@ private static ChunkReader> decimalToByte( private static ChunkReader> decimalToChar( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return CharChunkReader.transformTo(decimalToBigDecimal(arrowType, typeInfo, options), (chunk, ii) -> QueryLanguageFunctionUtils.charCast(chunk.get(ii))); @@ -590,7 +593,7 @@ private static ChunkReader> decimalToChar( private static ChunkReader> decimalToShort( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return ShortChunkReader.transformTo(decimalToBigDecimal(arrowType, typeInfo, options), (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii))); @@ -598,7 +601,7 @@ private static ChunkReader> decimalToShort( private static ChunkReader> decimalToInt( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return IntChunkReader.transformTo(decimalToBigDecimal(arrowType, typeInfo, options), (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii))); @@ -606,7 +609,7 @@ private static ChunkReader> decimalToInt( private static ChunkReader> decimalToLong( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return LongChunkReader.transformTo(decimalToBigDecimal(arrowType, typeInfo, options), (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii))); @@ -614,7 +617,7 @@ private static ChunkReader> decimalToLong( private static ChunkReader> decimalToBigInteger( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { // note this mapping is particularly useful if scale == 0 final ArrowType.Decimal decimalType = (ArrowType.Decimal) arrowType; @@ -643,7 +646,7 @@ private static ChunkReader> decimalToBig private static ChunkReader> decimalToFloat( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return FloatChunkReader.transformTo(decimalToBigDecimal(arrowType, typeInfo, options), (chunk, ii) -> QueryLanguageFunctionUtils.floatCast(chunk.get(ii))); @@ -651,7 +654,7 @@ private static ChunkReader> decimalToFloat( private static ChunkReader> decimalToDouble( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return DoubleChunkReader.transformTo(decimalToBigDecimal(arrowType, typeInfo, options), (chunk, ii) -> QueryLanguageFunctionUtils.doubleCast(chunk.get(ii))); @@ -659,7 +662,7 @@ private static ChunkReader> decimalToDouble( private static ChunkReader> decimalToBigDecimal( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final ArrowType.Decimal decimalType = (ArrowType.Decimal) arrowType; final int byteWidth = decimalType.getBitWidth() / 8; @@ -687,7 +690,7 @@ private static ChunkReader> decimalToBig private static ChunkReader> intToByte( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final ArrowType.Int intType = (ArrowType.Int) arrowType; final int bitWidth = intType.getBitWidth(); @@ -715,7 +718,7 @@ private static ChunkReader> intToByte( private static ChunkReader> intToShort( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final ArrowType.Int intType = (ArrowType.Int) arrowType; final int bitWidth = intType.getBitWidth(); @@ -744,7 +747,7 @@ private static ChunkReader> intToShort( private static ChunkReader> intToInt( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final ArrowType.Int intType = (ArrowType.Int) arrowType; final int bitWidth = intType.getBitWidth(); @@ -772,7 +775,7 @@ private static ChunkReader> intToInt( private static ChunkReader> intToLong( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final ArrowType.Int intType = (ArrowType.Int) arrowType; final int bitWidth = intType.getBitWidth(); @@ -800,7 +803,7 @@ private static ChunkReader> intToLong( private static ChunkReader> intToBigInt( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final ArrowType.Int intType = (ArrowType.Int) arrowType; final int bitWidth = intType.getBitWidth(); @@ -826,7 +829,7 @@ private static ChunkReader> intToBigInt( private static ChunkReader> intToFloat( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final ArrowType.Int intType = (ArrowType.Int) arrowType; final int bitWidth = intType.getBitWidth(); @@ -876,7 +879,7 @@ private static float floatCast( private static ChunkReader> intToDouble( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final ArrowType.Int intType = (ArrowType.Int) arrowType; final int bitWidth = intType.getBitWidth(); @@ -926,7 +929,7 @@ private static double doubleCast( private static ChunkReader> intToBigDecimal( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final ArrowType.Int intType = (ArrowType.Int) arrowType; final int bitWidth = intType.getBitWidth(); @@ -954,7 +957,7 @@ private static ChunkReader> intToBigDeci private static ChunkReader> intToChar( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final ArrowType.Int intType = (ArrowType.Int) arrowType; final int bitWidth = intType.getBitWidth(); @@ -987,14 +990,14 @@ private static ChunkReader> intToChar( private static ChunkReader> boolToBoolean( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { return new BooleanChunkReader(); } private static ChunkReader> fixedSizeBinaryToByteArray( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { final ArrowType.FixedSizeBinary fixedSizeBinary = (ArrowType.FixedSizeBinary) arrowType; final int elementWidth = fixedSizeBinary.getByteWidth(); @@ -1007,7 +1010,7 @@ private static ChunkReader> fixedSizeBinaryT private static ChunkReader> dateToInt( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { // see dateToLocalDate's comment for more information on wire format final ArrowType.Date dateType = (ArrowType.Date) arrowType; @@ -1027,7 +1030,7 @@ private static ChunkReader> dateToInt( private static ChunkReader> dateToLong( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { // see dateToLocalDate's comment for more information on wire format final ArrowType.Date dateType = (ArrowType.Date) arrowType; @@ -1048,7 +1051,7 @@ private static ChunkReader> dateToLong( private static ChunkReader> dateToLocalDate( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { /* * Date is either a 32-bit or 64-bit signed integer type representing an elapsed time since UNIX epoch @@ -1082,7 +1085,7 @@ private static ChunkReader> dateToLocalDa private static ChunkReader> intervalToDurationLong( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { // See intervalToPeriod's comment for more information on wire format. @@ -1111,7 +1114,7 @@ private static ChunkReader> intervalToDurationLong( private static ChunkReader> intervalToDuration( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { // See intervalToPeriod's comment for more information on wire format. @@ -1136,7 +1139,7 @@ private static ChunkReader> intervalToDura private static ChunkReader> intervalToPeriod( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { /* * A "calendar" interval which models types that don't necessarily have a precise duration without the context @@ -1189,7 +1192,7 @@ private static ChunkReader> intervalToPeriod private static ChunkReader> intervalToPeriodDuration( final ArrowType arrowType, - final BarrageTypeInfo typeInfo, + final BarrageTypeInfo typeInfo, final BarrageOptions options) { // See intervalToPeriod's comment for more information on wire format. diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java index ab4a5caf1b0..75f4cd2c624 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java @@ -53,6 +53,7 @@ import java.time.ZonedDateTime; import java.util.EnumMap; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -71,8 +72,7 @@ public class DefaultChunkWriterFactory implements ChunkWriter.Factory { */ protected interface ArrowTypeChunkWriterSupplier { ChunkWriter> make( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo); + final BarrageTypeInfo typeInfo); } private boolean toStringUnknownTypes = true; @@ -141,11 +141,20 @@ public void disableToStringUnknownTypes() { @Override public > ChunkWriter newWriter( - @NotNull final BarrageTypeInfo typeInfo) { + @NotNull final BarrageTypeInfo typeInfo) { + BarrageTypeInfo fieldTypeInfo = new BarrageTypeInfo<>( + typeInfo.type(), + typeInfo.componentType(), + Field.convertField(typeInfo.arrowField())); + return newWriterPojo(fieldTypeInfo); + } + + public > ChunkWriter newWriterPojo( + @NotNull final BarrageTypeInfo typeInfo) { // TODO (deephaven/deephaven-core#6033): Run-End Support // TODO (deephaven/deephaven-core#6034): Dictionary Support - final Field field = Field.convertField(typeInfo.arrowField()); + final Field field = typeInfo.arrowField(); final ArrowType.ArrowTypeID typeId = field.getType().getTypeID(); final boolean isSpecialType = DefaultChunkReaderFactory.SPECIAL_TYPES.contains(typeId); @@ -172,7 +181,7 @@ public > ChunkWriter newWriter( knownWriters == null ? null : knownWriters.get(typeInfo.type()); if (chunkWriterFactory != null) { // noinspection unchecked - final ChunkWriter writer = (ChunkWriter) chunkWriterFactory.make(field.getType(), typeInfo); + final ChunkWriter writer = (ChunkWriter) chunkWriterFactory.make(typeInfo); if (writer != null) { return writer; } @@ -211,22 +220,22 @@ public > ChunkWriter newWriter( fixedSizeLength = ((ArrowType.FixedSizeList) field.getType()).getListSize(); } - final BarrageTypeInfo componentTypeInfo; + final BarrageTypeInfo componentTypeInfo; final boolean useVectorKernels = Vector.class.isAssignableFrom(typeInfo.type()); if (useVectorKernels) { final Class componentType = VectorExpansionKernel.getComponentType(typeInfo.type(), typeInfo.componentType()); - componentTypeInfo = new BarrageTypeInfo( + componentTypeInfo = new BarrageTypeInfo<>( componentType, componentType.getComponentType(), - typeInfo.arrowField().children(0)); + field.getChildren().get(0)); } else if (typeInfo.type().isArray()) { final Class componentType = typeInfo.componentType(); // noinspection DataFlowIssue - componentTypeInfo = new BarrageTypeInfo( + componentTypeInfo = new BarrageTypeInfo<>( componentType, componentType.getComponentType(), - typeInfo.arrowField().children(0)); + field.getChildren().get(0)); } else { throw new UnsupportedOperationException(String.format( "No known ChunkWriter for arrow type %s from %s. Expected destination type to be an array.", @@ -241,7 +250,7 @@ public > ChunkWriter newWriter( } else { kernel = ArrayExpansionKernel.makeExpansionKernel(chunkType, componentTypeInfo.type()); } - final ChunkWriter> componentWriter = newWriter(componentTypeInfo); + final ChunkWriter> componentWriter = newWriterPojo(componentTypeInfo); // noinspection unchecked return (ChunkWriter) new ListChunkWriter<>(mode, fixedSizeLength, kernel, componentWriter); @@ -250,11 +259,11 @@ public > ChunkWriter newWriter( if (typeId == ArrowType.ArrowTypeID.Map) { // TODO: should we allow the user to supply the collector? final Field structField = field.getChildren().get(0); - final BarrageTypeInfo keyTypeInfo = BarrageUtil.getDefaultType(structField.getChildren().get(0)); - final BarrageTypeInfo valueTypeInfo = BarrageUtil.getDefaultType(structField.getChildren().get(1)); + final BarrageTypeInfo keyTypeInfo = BarrageUtil.getDefaultType(structField.getChildren().get(0)); + final BarrageTypeInfo valueTypeInfo = BarrageUtil.getDefaultType(structField.getChildren().get(1)); - final ChunkWriter> keyWriter = newWriter(keyTypeInfo); - final ChunkWriter> valueWriter = newWriter(valueTypeInfo); + final ChunkWriter> keyWriter = newWriterPojo(keyTypeInfo); + final ChunkWriter> valueWriter = newWriterPojo(valueTypeInfo); // noinspection unchecked return (ChunkWriter) new MapChunkWriter<>( @@ -266,6 +275,11 @@ public > ChunkWriter newWriter( if (typeId == ArrowType.ArrowTypeID.Union) { final ArrowType.Union unionType = (ArrowType.Union) field.getType(); + + final List>> childWriters = field.getChildren().stream() + .map(child -> newWriterPojo(BarrageUtil.getDefaultType(child))) + .collect(Collectors.toList()); + switch (unionType.getMode()) { case Sparse: // TODO NATE NOCOMMIT: implement @@ -298,37 +312,37 @@ protected void register( // if primitive automatically register the boxed version of this mapping, too if (deephavenType == byte.class) { registeredFactories.computeIfAbsent(arrowType, k -> new HashMap<>()) - .put(Byte.class, (at, typeInfo) -> new ByteChunkWriter>( + .put(Byte.class, typeInfo -> new ByteChunkWriter>( ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)))); } else if (deephavenType == short.class) { registeredFactories.computeIfAbsent(arrowType, k -> new HashMap<>()) - .put(Short.class, (at, typeInfo) -> new ShortChunkWriter>( + .put(Short.class, typeInfo -> new ShortChunkWriter>( ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)))); } else if (deephavenType == int.class) { registeredFactories.computeIfAbsent(arrowType, k -> new HashMap<>()) - .put(Integer.class, (at, typeInfo) -> new IntChunkWriter>( + .put(Integer.class, typeInfo -> new IntChunkWriter>( ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)))); } else if (deephavenType == long.class) { registeredFactories.computeIfAbsent(arrowType, k -> new HashMap<>()) - .put(Long.class, (at, typeInfo) -> new LongChunkWriter>( + .put(Long.class, typeInfo -> new LongChunkWriter>( ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)))); } else if (deephavenType == char.class) { registeredFactories.computeIfAbsent(arrowType, k -> new HashMap<>()) - .put(Character.class, (at, typeInfo) -> new CharChunkWriter>( + .put(Character.class, typeInfo -> new CharChunkWriter>( ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)))); } else if (deephavenType == float.class) { registeredFactories.computeIfAbsent(arrowType, k -> new HashMap<>()) - .put(Float.class, (at, typeInfo) -> new FloatChunkWriter>( + .put(Float.class, typeInfo -> new FloatChunkWriter>( ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)))); } else if (deephavenType == double.class) { registeredFactories.computeIfAbsent(arrowType, k -> new HashMap<>()) - .put(Double.class, (at, typeInfo) -> new DoubleChunkWriter>( + .put(Double.class, typeInfo -> new DoubleChunkWriter>( ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)))); } @@ -350,9 +364,8 @@ private static long factorForTimeUnit(final TimeUnit unit) { } private static ChunkWriter> timestampFromLong( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Timestamp tsType = (ArrowType.Timestamp) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Timestamp tsType = (ArrowType.Timestamp) typeInfo.arrowField().getType(); final long factor = factorForTimeUnit(tsType.getUnit()); // TODO (https://github.com/deephaven/deephaven-core/issues/5241): Inconsistent handling of ZonedDateTime // we do not know whether the incoming chunk source is a LongChunk or ObjectChunk @@ -377,9 +390,8 @@ private static ChunkWriter> timestampFromLong( } private static ChunkWriter> timestampFromInstant( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final long factor = factorForTimeUnit(((ArrowType.Timestamp) arrowType).getUnit()); + final BarrageTypeInfo typeInfo) { + final long factor = factorForTimeUnit(((ArrowType.Timestamp) typeInfo.arrowField().getType()).getUnit()); return new LongChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (source, offset) -> { final Instant value = source.get(offset); return value == null ? QueryConstants.NULL_LONG : DateTimeUtils.epochNanos(value) / factor; @@ -387,9 +399,8 @@ private static ChunkWriter> timestampFromInstant( } private static ChunkWriter> timestampFromZonedDateTime( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Timestamp tsType = (ArrowType.Timestamp) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Timestamp tsType = (ArrowType.Timestamp) typeInfo.arrowField().getType(); final long factor = factorForTimeUnit(tsType.getUnit()); return new LongChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (source, offset) -> { final ZonedDateTime value = source.get(offset); @@ -398,27 +409,23 @@ private static ChunkWriter> timestampFromZone } private static ChunkWriter> utf8FromString( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { return new VarBinaryChunkWriter<>((out, item) -> out.write(item.getBytes(StandardCharsets.UTF_8))); } private static ChunkWriter> utf8FromObject( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { return new VarBinaryChunkWriter<>((out, item) -> out.write(item.toString().getBytes(StandardCharsets.UTF_8))); } private static ChunkWriter> utf8FromPyObject( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { return new VarBinaryChunkWriter<>((out, item) -> out.write(item.toString().getBytes(StandardCharsets.UTF_8))); } private static ChunkWriter> durationFromLong( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final long factor = factorForTimeUnit(((ArrowType.Duration) arrowType).getUnit()); + final BarrageTypeInfo typeInfo) { + final long factor = factorForTimeUnit(((ArrowType.Duration) typeInfo.arrowField().getType()).getUnit()); return factor == 1 ? LongChunkWriter.IDENTITY_INSTANCE : new LongChunkWriter<>(LongChunk::isNull, LongChunk::getEmptyChunk, (source, offset) -> { @@ -428,9 +435,8 @@ private static ChunkWriter> durationFromLong( } private static ChunkWriter> durationFromDuration( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final long factor = factorForTimeUnit(((ArrowType.Duration) arrowType).getUnit()); + final BarrageTypeInfo typeInfo) { + final long factor = factorForTimeUnit(((ArrowType.Duration) typeInfo.arrowField().getType()).getUnit()); return new LongChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (source, offset) -> { final Duration value = source.get(offset); return value == null ? QueryConstants.NULL_LONG : value.toNanos() / factor; @@ -438,9 +444,8 @@ private static ChunkWriter> durationFromDuration( } private static ChunkWriter> floatingPointFromFloat( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.FloatingPoint fpType = (ArrowType.FloatingPoint) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.FloatingPoint fpType = (ArrowType.FloatingPoint) typeInfo.arrowField().getType(); switch (fpType.getPrecision()) { case HALF: return new ShortChunkWriter<>(FloatChunk::isNull, FloatChunk::getEmptyChunk, (source, offset) -> { @@ -463,9 +468,8 @@ private static ChunkWriter> floatingPointFromFloat( } private static ChunkWriter> floatingPointFromDouble( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.FloatingPoint fpType = (ArrowType.FloatingPoint) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.FloatingPoint fpType = (ArrowType.FloatingPoint) typeInfo.arrowField().getType(); switch (fpType.getPrecision()) { case HALF: return new ShortChunkWriter<>(DoubleChunk::isNull, DoubleChunk::getEmptyChunk, (source, offset) -> { @@ -487,9 +491,8 @@ private static ChunkWriter> floatingPointFromDouble( } private static ChunkWriter> floatingPointFromBigDecimal( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.FloatingPoint fpType = (ArrowType.FloatingPoint) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.FloatingPoint fpType = (ArrowType.FloatingPoint) typeInfo.arrowField().getType(); switch (fpType.getPrecision()) { case HALF: return new ShortChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (source, offset) -> { @@ -513,20 +516,17 @@ private static ChunkWriter> floatingPointFromBig } private static ChunkWriter> binaryFromByteArray( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { return new VarBinaryChunkWriter<>(OutputStream::write); } private static ChunkWriter> binaryFromBigInt( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { return new VarBinaryChunkWriter<>((out, item) -> out.write(item.toByteArray())); } private static ChunkWriter> binaryFromBigDecimal( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { return new VarBinaryChunkWriter<>((out, item) -> { final BigDecimal normal = item.stripTrailingZeros(); final int v = normal.scale(); @@ -540,16 +540,14 @@ private static ChunkWriter> binaryFromBigDecimal } private static ChunkWriter> binaryFromSchema( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { return new VarBinaryChunkWriter<>(ArrowIpcUtil::serialize); } private static ChunkWriter> timeFromLong( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { // See timeFromLocalTime's comment for more information on wire format. - final ArrowType.Time timeType = (ArrowType.Time) arrowType; + final ArrowType.Time timeType = (ArrowType.Time) typeInfo.arrowField().getType(); final int bitWidth = timeType.getBitWidth(); final long factor = factorForTimeUnit(timeType.getUnit()); switch (bitWidth) { @@ -573,8 +571,7 @@ private static ChunkWriter> timeFromLong( } private static ChunkWriter> timeFromLocalTime( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { /* * Time is either a 32-bit or 64-bit signed integer type representing an elapsed time since midnight, stored in * either of four units: seconds, milliseconds, microseconds or nanoseconds. @@ -591,7 +588,7 @@ private static ChunkWriter> timeFromLocalTime( * (for example by replacing the value 86400 with 86399). */ - final ArrowType.Time timeType = (ArrowType.Time) arrowType; + final ArrowType.Time timeType = (ArrowType.Time) typeInfo.arrowField().getType(); final int bitWidth = timeType.getBitWidth(); final long factor = factorForTimeUnit(timeType.getUnit()); switch (bitWidth) { @@ -615,9 +612,8 @@ private static ChunkWriter> timeFromLocalTime( } private static ChunkWriter> decimalFromByte( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Decimal decimalType = (ArrowType.Decimal) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Decimal decimalType = (ArrowType.Decimal) typeInfo.arrowField().getType(); final int byteWidth = decimalType.getBitWidth() / 8; final int scale = decimalType.getScale(); final byte[] nullValue = new byte[byteWidth]; @@ -639,9 +635,8 @@ private static ChunkWriter> decimalFromByte( } private static ChunkWriter> decimalFromChar( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Decimal decimalType = (ArrowType.Decimal) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Decimal decimalType = (ArrowType.Decimal) typeInfo.arrowField().getType(); final int byteWidth = decimalType.getBitWidth() / 8; final int scale = decimalType.getScale(); final byte[] nullValue = new byte[byteWidth]; @@ -663,9 +658,8 @@ private static ChunkWriter> decimalFromChar( } private static ChunkWriter> decimalFromShort( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Decimal decimalType = (ArrowType.Decimal) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Decimal decimalType = (ArrowType.Decimal) typeInfo.arrowField().getType(); final int byteWidth = decimalType.getBitWidth() / 8; final int scale = decimalType.getScale(); final byte[] nullValue = new byte[byteWidth]; @@ -687,9 +681,8 @@ private static ChunkWriter> decimalFromShort( } private static ChunkWriter> decimalFromInt( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Decimal decimalType = (ArrowType.Decimal) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Decimal decimalType = (ArrowType.Decimal) typeInfo.arrowField().getType(); final int byteWidth = decimalType.getBitWidth() / 8; final int scale = decimalType.getScale(); final byte[] nullValue = new byte[byteWidth]; @@ -711,9 +704,8 @@ private static ChunkWriter> decimalFromInt( } private static ChunkWriter> decimalFromLong( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Decimal decimalType = (ArrowType.Decimal) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Decimal decimalType = (ArrowType.Decimal) typeInfo.arrowField().getType(); final int byteWidth = decimalType.getBitWidth() / 8; final int scale = decimalType.getScale(); final byte[] nullValue = new byte[byteWidth]; @@ -735,9 +727,8 @@ private static ChunkWriter> decimalFromLong( } private static ChunkWriter> decimalFromBigInteger( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Decimal decimalType = (ArrowType.Decimal) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Decimal decimalType = (ArrowType.Decimal) typeInfo.arrowField().getType(); final int byteWidth = decimalType.getBitWidth() / 8; final int scale = decimalType.getScale(); final byte[] nullValue = new byte[byteWidth]; @@ -759,9 +750,8 @@ private static ChunkWriter> decimalFromBigIntege } private static ChunkWriter> decimalFromFloat( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Decimal decimalType = (ArrowType.Decimal) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Decimal decimalType = (ArrowType.Decimal) typeInfo.arrowField().getType(); final int byteWidth = decimalType.getBitWidth() / 8; final int scale = decimalType.getScale(); final byte[] nullValue = new byte[byteWidth]; @@ -783,9 +773,8 @@ private static ChunkWriter> decimalFromFloat( } private static ChunkWriter> decimalFromDouble( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Decimal decimalType = (ArrowType.Decimal) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Decimal decimalType = (ArrowType.Decimal) typeInfo.arrowField().getType(); final int byteWidth = decimalType.getBitWidth() / 8; final int scale = decimalType.getScale(); final byte[] nullValue = new byte[byteWidth]; @@ -807,9 +796,8 @@ private static ChunkWriter> decimalFromDouble( } private static ChunkWriter> decimalFromBigDecimal( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Decimal decimalType = (ArrowType.Decimal) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Decimal decimalType = (ArrowType.Decimal) typeInfo.arrowField().getType(); final int byteWidth = decimalType.getBitWidth() / 8; final int scale = decimalType.getScale(); final byte[] nullValue = new byte[byteWidth]; @@ -851,9 +839,8 @@ private static void writeBigDecimal( } private static ChunkWriter> intFromByte( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Int intType = (ArrowType.Int) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Int intType = (ArrowType.Int) typeInfo.arrowField().getType(); final int bitWidth = intType.getBitWidth(); switch (bitWidth) { @@ -874,9 +861,8 @@ private static ChunkWriter> intFromByte( } private static ChunkWriter> intFromShort( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Int intType = (ArrowType.Int) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Int intType = (ArrowType.Int) typeInfo.arrowField().getType(); final int bitWidth = intType.getBitWidth(); switch (bitWidth) { @@ -897,9 +883,8 @@ private static ChunkWriter> intFromShort( } private static ChunkWriter> intFromInt( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Int intType = (ArrowType.Int) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Int intType = (ArrowType.Int) typeInfo.arrowField().getType(); final int bitWidth = intType.getBitWidth(); switch (bitWidth) { @@ -920,9 +905,8 @@ private static ChunkWriter> intFromInt( } private static ChunkWriter> intFromLong( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Int intType = (ArrowType.Int) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Int intType = (ArrowType.Int) typeInfo.arrowField().getType(); final int bitWidth = intType.getBitWidth(); switch (bitWidth) { @@ -943,9 +927,8 @@ private static ChunkWriter> intFromLong( } private static ChunkWriter> intFromObject( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Int intType = (ArrowType.Int) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Int intType = (ArrowType.Int) typeInfo.arrowField().getType(); final int bitWidth = intType.getBitWidth(); switch (bitWidth) { @@ -967,9 +950,8 @@ private static ChunkWriter> intFromObject( } private static ChunkWriter> intFromChar( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Int intType = (ArrowType.Int) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Int intType = (ArrowType.Int) typeInfo.arrowField().getType(); final int bitWidth = intType.getBitWidth(); final boolean unsigned = !intType.getIsSigned(); @@ -996,9 +978,8 @@ private static ChunkWriter> intFromChar( } private static ChunkWriter> intFromFloat( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Int intType = (ArrowType.Int) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Int intType = (ArrowType.Int) typeInfo.arrowField().getType(); final int bitWidth = intType.getBitWidth(); switch (bitWidth) { @@ -1020,9 +1001,8 @@ private static ChunkWriter> intFromFloat( } private static ChunkWriter> intFromDouble( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.Int intType = (ArrowType.Int) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.Int intType = (ArrowType.Int) typeInfo.arrowField().getType(); final int bitWidth = intType.getBitWidth(); switch (bitWidth) { @@ -1044,15 +1024,13 @@ private static ChunkWriter> intFromDouble( } private static ChunkWriter> boolFromBoolean( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { return new BooleanChunkWriter(); } private static ChunkWriter> fixedSizeBinaryFromByteArray( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { - final ArrowType.FixedSizeBinary fixedSizeBinary = (ArrowType.FixedSizeBinary) arrowType; + final BarrageTypeInfo typeInfo) { + final ArrowType.FixedSizeBinary fixedSizeBinary = (ArrowType.FixedSizeBinary) typeInfo.arrowField().getType(); final int elementWidth = fixedSizeBinary.getByteWidth(); return new FixedWidthChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, elementWidth, false, (out, chunk, offset) -> { @@ -1067,10 +1045,9 @@ private static ChunkWriter> fixedSizeBinaryFromByteA } private static ChunkWriter> dateFromInt( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { // see dateFromLocalDate's comment for more information on wire format - final ArrowType.Date dateType = (ArrowType.Date) arrowType; + final ArrowType.Date dateType = (ArrowType.Date) typeInfo.arrowField().getType(); switch (dateType.getUnit()) { case DAY: return new IntChunkWriter<>(IntChunk::isNull, IntChunk::getEmptyChunk, @@ -1090,10 +1067,9 @@ private static ChunkWriter> dateFromInt( } private static ChunkWriter> dateFromLong( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { // see dateFromLocalDate's comment for more information on wire format - final ArrowType.Date dateType = (ArrowType.Date) arrowType; + final ArrowType.Date dateType = (ArrowType.Date) typeInfo.arrowField().getType(); switch (dateType.getUnit()) { case DAY: return new IntChunkWriter<>(LongChunk::isNull, LongChunk::getEmptyChunk, @@ -1113,8 +1089,7 @@ private static ChunkWriter> dateFromLong( } private static ChunkWriter> dateFromLocalDate( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { /* * Date is either a 32-bit or 64-bit signed integer type representing an elapsed time since UNIX epoch * (1970-01-01), stored in either of two units: @@ -1126,7 +1101,7 @@ private static ChunkWriter> dateFromLocalDate( * @formatter:on */ - final ArrowType.Date dateType = (ArrowType.Date) arrowType; + final ArrowType.Date dateType = (ArrowType.Date) typeInfo.arrowField().getType(); switch (dateType.getUnit()) { case DAY: return new IntChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (chunk, ii) -> { @@ -1145,11 +1120,10 @@ private static ChunkWriter> dateFromLocalDate( } private static ChunkWriter> intervalFromDurationLong( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { // See intervalFromPeriod's comment for more information on wire format. - final ArrowType.Interval intervalType = (ArrowType.Interval) arrowType; + final ArrowType.Interval intervalType = (ArrowType.Interval) typeInfo.arrowField().getType(); switch (intervalType.getUnit()) { case YEAR_MONTH: case MONTH_DAY_NANO: @@ -1179,11 +1153,10 @@ private static ChunkWriter> intervalFromDurationLong( } private static ChunkWriter> intervalFromDuration( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { // See intervalFromPeriod's comment for more information on wire format. - final ArrowType.Interval intervalType = (ArrowType.Interval) arrowType; + final ArrowType.Interval intervalType = (ArrowType.Interval) typeInfo.arrowField().getType(); switch (intervalType.getUnit()) { case YEAR_MONTH: case MONTH_DAY_NANO: @@ -1212,8 +1185,7 @@ private static ChunkWriter> intervalFromDuration( } private static ChunkWriter> intervalFromPeriod( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { /* * A "calendar" interval which models types that don't necessarily have a precise duration without the context * of a base timestamp (e.g. days can differ in length during day light savings time transitions). All integers @@ -1236,7 +1208,7 @@ private static ChunkWriter> intervalFromPeriod( * Note: Period does not handle the time portion of DAY_TIME and MONTH_DAY_NANO. Arrow stores these in * PeriodDuration pairs. */ - final ArrowType.Interval intervalType = (ArrowType.Interval) arrowType; + final ArrowType.Interval intervalType = (ArrowType.Interval) typeInfo.arrowField().getType(); switch (intervalType.getUnit()) { case YEAR_MONTH: return new IntChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (chunk, ii) -> { @@ -1278,11 +1250,10 @@ private static ChunkWriter> intervalFromPeriod( } private static ChunkWriter> intervalFromPeriodDuration( - final ArrowType arrowType, - final BarrageTypeInfo typeInfo) { + final BarrageTypeInfo typeInfo) { // See intervalToPeriod's comment for more information on wire format. - final ArrowType.Interval intervalType = (ArrowType.Interval) arrowType; + final ArrowType.Interval intervalType = (ArrowType.Interval) typeInfo.arrowField().getType(); switch (intervalType.getUnit()) { case YEAR_MONTH: return new IntChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (chunk, ii) -> { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java index 70af51ac0a7..6be0ccb88e6 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java @@ -12,7 +12,8 @@ import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.WritableLongChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.extensions.barrage.BarrageOptions;import io.deephaven.extensions.barrage.util.Float16; +import io.deephaven.extensions.barrage.BarrageOptions; +import io.deephaven.extensions.barrage.util.Float16; import io.deephaven.util.QueryConstants; import io.deephaven.util.datastructures.LongSizedDataStructure; import org.apache.arrow.flatbuf.Precision; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkWriter.java index 4c849042d29..965ae4d1f47 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkWriter.java @@ -12,7 +12,8 @@ import io.deephaven.engine.rowset.RowSet; import com.google.common.io.LittleEndianDataOutputStream; import io.deephaven.UncheckedDeephavenException; -import io.deephaven.extensions.barrage.BarrageOptions;import io.deephaven.util.datastructures.LongSizedDataStructure; +import io.deephaven.extensions.barrage.BarrageOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.DoubleChunk; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ExpansionKernel.java index 8c18bb552e2..d528c59a28e 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ExpansionKernel.java @@ -32,17 +32,9 @@ public interface ExpansionKernel { * {@code source.get(i).length} * @return an unrolled/flattened chunk of T */ - default WritableChunk expand( - @NotNull ObjectChunk source, - int fixedSizeLength, - @Nullable WritableIntChunk offsetDest) { - // TODO NATE NOCOMMII: implement fixed size list length restrictions! - return expand(source, offsetDest); - } - - // TODO NATE NOCOMMIT: THIS METHOD DOES NOT GET TO STAY WritableChunk expand( @NotNull ObjectChunk source, + int fixedSizeLength, @Nullable WritableIntChunk offsetDest); /** diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkWriter.java index 5bd066451ec..1b3b58689a0 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkWriter.java @@ -12,7 +12,8 @@ import io.deephaven.engine.rowset.RowSet; import com.google.common.io.LittleEndianDataOutputStream; import io.deephaven.UncheckedDeephavenException; -import io.deephaven.extensions.barrage.BarrageOptions;import io.deephaven.util.datastructures.LongSizedDataStructure; +import io.deephaven.extensions.barrage.BarrageOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.FloatChunk; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java index 8ec029e0858..8ec0d0ddc29 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java @@ -13,7 +13,8 @@ import io.deephaven.chunk.WritableLongChunk; import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.extensions.barrage.BarrageOptions;import io.deephaven.util.datastructures.LongSizedDataStructure; +import io.deephaven.extensions.barrage.BarrageOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkWriter.java index e200591c265..f598e8a629b 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkWriter.java @@ -12,7 +12,8 @@ import io.deephaven.engine.rowset.RowSet; import com.google.common.io.LittleEndianDataOutputStream; import io.deephaven.UncheckedDeephavenException; -import io.deephaven.extensions.barrage.BarrageOptions;import io.deephaven.util.datastructures.LongSizedDataStructure; +import io.deephaven.extensions.barrage.BarrageOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.IntChunk; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java index beda3d71e3a..8663d530da9 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java @@ -10,9 +10,11 @@ import io.deephaven.base.verify.Assert; import io.deephaven.chunk.WritableLongChunk; import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.WritableLongChunk; import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.extensions.barrage.BarrageOptions;import io.deephaven.util.datastructures.LongSizedDataStructure; +import io.deephaven.extensions.barrage.BarrageOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkWriter.java index 3d3b884f722..016a0ec4bb7 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkWriter.java @@ -12,7 +12,8 @@ import io.deephaven.engine.rowset.RowSet; import com.google.common.io.LittleEndianDataOutputStream; import io.deephaven.UncheckedDeephavenException; -import io.deephaven.extensions.barrage.BarrageOptions;import io.deephaven.util.datastructures.LongSizedDataStructure; +import io.deephaven.extensions.barrage.BarrageOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.LongChunk; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java index 09160dfea1f..6016025ecde 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java @@ -13,7 +13,8 @@ import io.deephaven.chunk.WritableLongChunk; import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.extensions.barrage.BarrageOptions;import io.deephaven.util.datastructures.LongSizedDataStructure; +import io.deephaven.extensions.barrage.BarrageOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkWriter.java index 23a5aeef5f2..eb257457b2c 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkWriter.java @@ -12,7 +12,8 @@ import io.deephaven.engine.rowset.RowSet; import com.google.common.io.LittleEndianDataOutputStream; import io.deephaven.UncheckedDeephavenException; -import io.deephaven.extensions.barrage.BarrageOptions;import io.deephaven.util.datastructures.LongSizedDataStructure; +import io.deephaven.extensions.barrage.BarrageOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.ShortChunk; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/BooleanArrayExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/BooleanArrayExpansionKernel.java index 3cee60be8a0..ca09b12d014 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/BooleanArrayExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/BooleanArrayExpansionKernel.java @@ -26,6 +26,7 @@ public class BooleanArrayExpansionKernel implements ArrayExpansionKernel WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -39,7 +40,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < typedSource.size(); ++ii) { final boolean[] row = typedSource.get(ii); - totalSize += row == null ? 0 : row.length; + int rowLen = row == null ? 0 : row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableByteChunk result = WritableByteChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -56,11 +61,15 @@ public WritableChunk expand( if (row == null) { continue; } - for (int j = 0; j < row.length; ++j) { + int rowLen = row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + for (int j = 0; j < rowLen; ++j) { final byte value = row[j] ? BooleanUtils.TRUE_BOOLEAN_AS_BYTE : BooleanUtils.FALSE_BOOLEAN_AS_BYTE; result.set(lenWritten + j, value); } - lenWritten += row.length; + lenWritten += rowLen; } if (offsetsDest != null) { offsetsDest.set(typedSource.size(), lenWritten); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/BoxedBooleanArrayExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/BoxedBooleanArrayExpansionKernel.java index c37a2c8fa74..0aae8b92bca 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/BoxedBooleanArrayExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/BoxedBooleanArrayExpansionKernel.java @@ -26,6 +26,7 @@ public class BoxedBooleanArrayExpansionKernel implements ArrayExpansionKernel WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -39,7 +40,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < typedSource.size(); ++ii) { final Boolean[] row = typedSource.get(ii); - totalSize += row == null ? 0 : row.length; + int rowLen = row == null ? 0 : row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableByteChunk result = WritableByteChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -56,11 +61,15 @@ public WritableChunk expand( if (row == null) { continue; } - for (int j = 0; j < row.length; ++j) { + int rowLen = row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + for (int j = 0; j < rowLen; ++j) { final byte value = BooleanUtils.booleanAsByte(row[j]); result.set(lenWritten + j, value); } - lenWritten += row.length; + lenWritten += rowLen; } if (offsetsDest != null) { offsetsDest.set(typedSource.size(), lenWritten); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/ByteArrayExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/ByteArrayExpansionKernel.java index df5edb14662..1312720d20e 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/ByteArrayExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/ByteArrayExpansionKernel.java @@ -29,6 +29,7 @@ public class ByteArrayExpansionKernel implements ArrayExpansionKernel { @Override public WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -40,7 +41,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < source.size(); ++ii) { final byte[] row = source.get(ii); - totalSize += row == null ? 0 : row.length; + int rowLen = row == null ? 0 : row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableByteChunk result = WritableByteChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -57,8 +62,12 @@ public WritableChunk expand( if (row == null) { continue; } - result.copyFromArray(row, 0, lenWritten, row.length); - lenWritten += row.length; + int rowLen = row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + result.copyFromArray(row, 0, lenWritten, rowLen); + lenWritten += rowLen; } if (offsetsDest != null) { offsetsDest.set(source.size(), lenWritten); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/CharArrayExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/CharArrayExpansionKernel.java index 3cd6749d0a7..06b51614b80 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/CharArrayExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/CharArrayExpansionKernel.java @@ -25,6 +25,7 @@ public class CharArrayExpansionKernel implements ArrayExpansionKernel { @Override public WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -36,7 +37,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < source.size(); ++ii) { final char[] row = source.get(ii); - totalSize += row == null ? 0 : row.length; + int rowLen = row == null ? 0 : row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableCharChunk result = WritableCharChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -53,8 +58,12 @@ public WritableChunk expand( if (row == null) { continue; } - result.copyFromArray(row, 0, lenWritten, row.length); - lenWritten += row.length; + int rowLen = row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + result.copyFromArray(row, 0, lenWritten, rowLen); + lenWritten += rowLen; } if (offsetsDest != null) { offsetsDest.set(source.size(), lenWritten); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/DoubleArrayExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/DoubleArrayExpansionKernel.java index be2cfcbfe27..5d201f9bcea 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/DoubleArrayExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/DoubleArrayExpansionKernel.java @@ -29,6 +29,7 @@ public class DoubleArrayExpansionKernel implements ArrayExpansionKernel WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -40,7 +41,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < source.size(); ++ii) { final double[] row = source.get(ii); - totalSize += row == null ? 0 : row.length; + int rowLen = row == null ? 0 : row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableDoubleChunk result = WritableDoubleChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -57,8 +62,12 @@ public WritableChunk expand( if (row == null) { continue; } - result.copyFromArray(row, 0, lenWritten, row.length); - lenWritten += row.length; + int rowLen = row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + result.copyFromArray(row, 0, lenWritten, rowLen); + lenWritten += rowLen; } if (offsetsDest != null) { offsetsDest.set(source.size(), lenWritten); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/FloatArrayExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/FloatArrayExpansionKernel.java index 7e227fa8861..e9853645ec5 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/FloatArrayExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/FloatArrayExpansionKernel.java @@ -29,6 +29,7 @@ public class FloatArrayExpansionKernel implements ArrayExpansionKernel @Override public WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -40,7 +41,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < source.size(); ++ii) { final float[] row = source.get(ii); - totalSize += row == null ? 0 : row.length; + int rowLen = row == null ? 0 : row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableFloatChunk result = WritableFloatChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -57,8 +62,12 @@ public WritableChunk expand( if (row == null) { continue; } - result.copyFromArray(row, 0, lenWritten, row.length); - lenWritten += row.length; + int rowLen = row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + result.copyFromArray(row, 0, lenWritten, rowLen); + lenWritten += rowLen; } if (offsetsDest != null) { offsetsDest.set(source.size(), lenWritten); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/IntArrayExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/IntArrayExpansionKernel.java index 53f6a3ba1a6..5e10b9d751e 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/IntArrayExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/IntArrayExpansionKernel.java @@ -29,6 +29,7 @@ public class IntArrayExpansionKernel implements ArrayExpansionKernel { @Override public WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -40,7 +41,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < source.size(); ++ii) { final int[] row = source.get(ii); - totalSize += row == null ? 0 : row.length; + int rowLen = row == null ? 0 : row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableIntChunk result = WritableIntChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -57,8 +62,12 @@ public WritableChunk expand( if (row == null) { continue; } - result.copyFromArray(row, 0, lenWritten, row.length); - lenWritten += row.length; + int rowLen = row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + result.copyFromArray(row, 0, lenWritten, rowLen); + lenWritten += rowLen; } if (offsetsDest != null) { offsetsDest.set(source.size(), lenWritten); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/LongArrayExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/LongArrayExpansionKernel.java index da031b5f882..99bbba564d4 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/LongArrayExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/LongArrayExpansionKernel.java @@ -29,6 +29,7 @@ public class LongArrayExpansionKernel implements ArrayExpansionKernel { @Override public WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -40,7 +41,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < source.size(); ++ii) { final long[] row = source.get(ii); - totalSize += row == null ? 0 : row.length; + int rowLen = row == null ? 0 : row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableLongChunk result = WritableLongChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -57,8 +62,12 @@ public WritableChunk expand( if (row == null) { continue; } - result.copyFromArray(row, 0, lenWritten, row.length); - lenWritten += row.length; + int rowLen = row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + result.copyFromArray(row, 0, lenWritten, rowLen); + lenWritten += rowLen; } if (offsetsDest != null) { offsetsDest.set(source.size(), lenWritten); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/ObjectArrayExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/ObjectArrayExpansionKernel.java index 28210c017c5..83e944acb7f 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/ObjectArrayExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/ObjectArrayExpansionKernel.java @@ -27,6 +27,7 @@ public ObjectArrayExpansionKernel(final Class componentType) { @Override public WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -40,7 +41,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < typedSource.size(); ++ii) { final T[] row = typedSource.get(ii); - totalSize += row == null ? 0 : row.length; + int rowLen = row == null ? 0 : row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableObjectChunk result = WritableObjectChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -57,8 +62,12 @@ public WritableChunk expand( if (row == null) { continue; } - result.copyFromArray(row, 0, lenWritten, row.length); - lenWritten += row.length; + int rowLen = row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + result.copyFromArray(row, 0, lenWritten, rowLen); + lenWritten += rowLen; } if (offsetsDest != null) { offsetsDest.set(typedSource.size(), lenWritten); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/ShortArrayExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/ShortArrayExpansionKernel.java index 29f81202b6e..51d51864ffb 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/ShortArrayExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/ShortArrayExpansionKernel.java @@ -29,6 +29,7 @@ public class ShortArrayExpansionKernel implements ArrayExpansionKernel @Override public WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -40,7 +41,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < source.size(); ++ii) { final short[] row = source.get(ii); - totalSize += row == null ? 0 : row.length; + int rowLen = row == null ? 0 : row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableShortChunk result = WritableShortChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -57,8 +62,12 @@ public WritableChunk expand( if (row == null) { continue; } - result.copyFromArray(row, 0, lenWritten, row.length); - lenWritten += row.length; + int rowLen = row.length; + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + result.copyFromArray(row, 0, lenWritten, rowLen); + lenWritten += rowLen; } if (offsetsDest != null) { offsetsDest.set(source.size(), lenWritten); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/ByteVectorExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/ByteVectorExpansionKernel.java index cc95e60e05d..582f6377d08 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/ByteVectorExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/ByteVectorExpansionKernel.java @@ -34,6 +34,7 @@ public class ByteVectorExpansionKernel implements VectorExpansionKernel WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -47,7 +48,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < typedSource.size(); ++ii) { final ByteVector row = typedSource.get(ii); - totalSize += row == null ? 0 : row.size(); + long rowLen = row == null ? 0 : row.size(); + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableByteChunk result = WritableByteChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -66,7 +71,11 @@ public WritableChunk expand( } final ByteConsumer consumer = result::add; try (final CloseablePrimitiveIteratorOfByte iter = row.iterator()) { - iter.forEachRemaining(consumer); + if (fixedSizeLength > 0) { + iter.stream().limit(fixedSizeLength).forEach(consumer::accept); + } else { + iter.forEachRemaining(consumer); + } } } if (offsetsDest != null) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/CharVectorExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/CharVectorExpansionKernel.java index b0d6089b2a5..3ab4037bb7b 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/CharVectorExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/CharVectorExpansionKernel.java @@ -30,6 +30,7 @@ public class CharVectorExpansionKernel implements VectorExpansionKernel WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -43,7 +44,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < typedSource.size(); ++ii) { final CharVector row = typedSource.get(ii); - totalSize += row == null ? 0 : row.size(); + long rowLen = row == null ? 0 : row.size(); + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableCharChunk result = WritableCharChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -62,7 +67,11 @@ public WritableChunk expand( } final CharConsumer consumer = result::add; try (final CloseablePrimitiveIteratorOfChar iter = row.iterator()) { - iter.forEachRemaining(consumer); + if (fixedSizeLength > 0) { + iter.stream().limit(fixedSizeLength).forEach(consumer::accept); + } else { + iter.forEachRemaining(consumer); + } } } if (offsetsDest != null) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/DoubleVectorExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/DoubleVectorExpansionKernel.java index bc0a726b560..37ebe5626dc 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/DoubleVectorExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/DoubleVectorExpansionKernel.java @@ -35,6 +35,7 @@ public class DoubleVectorExpansionKernel implements VectorExpansionKernel WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -48,7 +49,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < typedSource.size(); ++ii) { final DoubleVector row = typedSource.get(ii); - totalSize += row == null ? 0 : row.size(); + long rowLen = row == null ? 0 : row.size(); + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableDoubleChunk result = WritableDoubleChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -67,7 +72,11 @@ public WritableChunk expand( } final DoubleConsumer consumer = result::add; try (final CloseablePrimitiveIteratorOfDouble iter = row.iterator()) { - iter.forEachRemaining(consumer); + if (fixedSizeLength > 0) { + iter.stream().limit(fixedSizeLength).forEach(consumer::accept); + } else { + iter.forEachRemaining(consumer); + } } } if (offsetsDest != null) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/FloatVectorExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/FloatVectorExpansionKernel.java index 9e0ba1818d7..c2038e78859 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/FloatVectorExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/FloatVectorExpansionKernel.java @@ -34,6 +34,7 @@ public class FloatVectorExpansionKernel implements VectorExpansionKernel WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -47,7 +48,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < typedSource.size(); ++ii) { final FloatVector row = typedSource.get(ii); - totalSize += row == null ? 0 : row.size(); + long rowLen = row == null ? 0 : row.size(); + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableFloatChunk result = WritableFloatChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -66,7 +71,11 @@ public WritableChunk expand( } final FloatConsumer consumer = result::add; try (final CloseablePrimitiveIteratorOfFloat iter = row.iterator()) { - iter.forEachRemaining(consumer); + if (fixedSizeLength > 0) { + iter.stream().limit(fixedSizeLength).forEach(consumer::accept); + } else { + iter.forEachRemaining(consumer); + } } } if (offsetsDest != null) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/IntVectorExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/IntVectorExpansionKernel.java index 3369d81f1bb..f3fed8ad122 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/IntVectorExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/IntVectorExpansionKernel.java @@ -35,6 +35,7 @@ public class IntVectorExpansionKernel implements VectorExpansionKernel WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -48,7 +49,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < typedSource.size(); ++ii) { final IntVector row = typedSource.get(ii); - totalSize += row == null ? 0 : row.size(); + long rowLen = row == null ? 0 : row.size(); + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableIntChunk result = WritableIntChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -67,7 +72,11 @@ public WritableChunk expand( } final IntConsumer consumer = result::add; try (final CloseablePrimitiveIteratorOfInt iter = row.iterator()) { - iter.forEachRemaining(consumer); + if (fixedSizeLength > 0) { + iter.stream().limit(fixedSizeLength).forEach(consumer::accept); + } else { + iter.forEachRemaining(consumer); + } } } if (offsetsDest != null) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/LongVectorExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/LongVectorExpansionKernel.java index 4d208c394b0..c346ba3859a 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/LongVectorExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/LongVectorExpansionKernel.java @@ -35,6 +35,7 @@ public class LongVectorExpansionKernel implements VectorExpansionKernel WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -48,7 +49,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < typedSource.size(); ++ii) { final LongVector row = typedSource.get(ii); - totalSize += row == null ? 0 : row.size(); + long rowLen = row == null ? 0 : row.size(); + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableLongChunk result = WritableLongChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -67,7 +72,11 @@ public WritableChunk expand( } final LongConsumer consumer = result::add; try (final CloseablePrimitiveIteratorOfLong iter = row.iterator()) { - iter.forEachRemaining(consumer); + if (fixedSizeLength > 0) { + iter.stream().limit(fixedSizeLength).forEach(consumer::accept); + } else { + iter.forEachRemaining(consumer); + } } } if (offsetsDest != null) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/ObjectVectorExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/ObjectVectorExpansionKernel.java index ec081c7c9a3..d28b00d5064 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/ObjectVectorExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/ObjectVectorExpansionKernel.java @@ -31,6 +31,7 @@ public ObjectVectorExpansionKernel(final Class componentType) { @Override public WritableChunk expand( @NotNull final ObjectChunk, A> source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -44,7 +45,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < typedSource.size(); ++ii) { final ObjectVector row = typedSource.get(ii); - totalSize += row == null ? 0 : row.size(); + long rowLen = row == null ? 0 : row.size(); + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableObjectChunk result = WritableObjectChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -62,8 +67,13 @@ public WritableChunk expand( continue; } try (final CloseableIterator iter = row.iterator()) { - // noinspection unchecked - iter.forEachRemaining(v -> result.add((T) v)); + if (fixedSizeLength > 0) { + // noinspection unchecked + iter.stream().limit(fixedSizeLength).forEach(v -> result.add((T) v)); + } else { + // noinspection unchecked + iter.forEachRemaining(v -> result.add((T) v)); + } } } if (offsetsDest != null) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/ShortVectorExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/ShortVectorExpansionKernel.java index 9300aec814b..2f2ce6a244e 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/ShortVectorExpansionKernel.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/vector/ShortVectorExpansionKernel.java @@ -34,6 +34,7 @@ public class ShortVectorExpansionKernel implements VectorExpansionKernel WritableChunk expand( @NotNull final ObjectChunk source, + final int fixedSizeLength, @Nullable final WritableIntChunk offsetsDest) { if (source.size() == 0) { if (offsetsDest != null) { @@ -47,7 +48,11 @@ public WritableChunk expand( long totalSize = 0; for (int ii = 0; ii < typedSource.size(); ++ii) { final ShortVector row = typedSource.get(ii); - totalSize += row == null ? 0 : row.size(); + long rowLen = row == null ? 0 : row.size(); + if (fixedSizeLength > 0) { + rowLen = Math.min(rowLen, fixedSizeLength); + } + totalSize += rowLen; } final WritableShortChunk result = WritableShortChunk.makeWritableChunk( LongSizedDataStructure.intSize("ExpansionKernel", totalSize)); @@ -66,7 +71,11 @@ public WritableChunk expand( } final ShortConsumer consumer = result::add; try (final CloseablePrimitiveIteratorOfShort iter = row.iterator()) { - iter.forEachRemaining(consumer); + if (fixedSizeLength > 0) { + iter.stream().limit(fixedSizeLength).forEach(consumer::accept); + } else { + iter.forEachRemaining(consumer); + } } } if (offsetsDest != null) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java index 3ea376610ee..88899aba5a6 100755 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java @@ -82,6 +82,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -351,14 +352,9 @@ public static Stream columnDefinitionsToFields( @NotNull final Function> fieldMetadataFactory, @NotNull final Map attributes, final boolean columnsAsList) { - // Find the format columns - final Set formatColumns = new HashSet<>(); - columnDefinitions.stream().map(ColumnDefinition::getName) - .filter(ColumnFormatting::isFormattingColumn) - .forEach(formatColumns::add); // Find columns that are sortable - Set sortableColumns; + final Set sortableColumns; if (attributes.containsKey(GridAttributes.SORTABLE_COLUMNS_ATTRIBUTE)) { final String[] restrictedSortColumns = attributes.get(GridAttributes.SORTABLE_COLUMNS_ATTRIBUTE).toString().split(","); @@ -372,8 +368,12 @@ public static Stream columnDefinitionsToFields( .collect(Collectors.toSet()); } - // Build metadata for columns and add the fields - return columnDefinitions.stream().map((final ColumnDefinition column) -> { + final Schema targetSchema; + final Set formatColumns = new HashSet<>(); + final Map fieldMap = new LinkedHashMap<>(); + + final Function, Field> fieldFor = (final ColumnDefinition column) -> { + final Field field = fieldMap.get(column.getName()); final String name = column.getName(); Class dataType = column.getDataType(); Class componentType = column.getComponentType(); @@ -431,18 +431,51 @@ public static Stream columnDefinitionsToFields( dataType = Array.newInstance(dataType, 0).getClass(); } + if (field != null) { + final FieldType origType = field.getFieldType(); + // user defined metadata should override the default metadata + metadata.putAll(field.getMetadata()); + final FieldType newType = + new FieldType(origType.isNullable(), origType.getType(), origType.getDictionary(), + origType.getMetadata()); + return new Field(field.getName(), newType, field.getChildren()); + } + if (Vector.class.isAssignableFrom(dataType)) { return arrowFieldForVectorType(name, dataType, componentType, metadata); } return arrowFieldFor(name, dataType, componentType, metadata, columnsAsList); - }); + }; + + if (attributes.containsKey(Table.BARRAGE_SCHEMA_ATTRIBUTE)) { + targetSchema = (Schema) attributes.get(Table.BARRAGE_SCHEMA_ATTRIBUTE); + targetSchema.getFields().forEach(field -> fieldMap.put(field.getName(), field)); + + fieldMap.keySet().stream() + .filter(ColumnFormatting::isFormattingColumn) + .forEach(formatColumns::add); + + final Map> columnDefinitionMap = new LinkedHashMap<>(); + columnDefinitions.stream().filter(column -> fieldMap.containsKey(column.getName())) + .forEach(column -> columnDefinitionMap.put(column.getName(), column)); + + return fieldMap.keySet().stream().map(columnDefinitionMap::get).map(fieldFor); + } + + // Find the format columns + columnDefinitions.stream().map(ColumnDefinition::getName) + .filter(ColumnFormatting::isFormattingColumn) + .forEach(formatColumns::add); + + // Build metadata for columns and add the fields + return columnDefinitions.stream().map(fieldFor); } public static void putMetadata(final Map metadata, final String key, final String value) { metadata.put(ATTR_DH_PREFIX + key, value); } - public static BarrageTypeInfo getDefaultType(@NotNull final Field field) { + public static BarrageTypeInfo getDefaultType(@NotNull final Field field) { Class explicitClass = null; final String explicitClassName = field.getMetadata().get(ATTR_DH_PREFIX + ATTR_TYPE_TAG); @@ -464,19 +497,27 @@ public static BarrageTypeInfo getDefaultType(@NotNull final Field field) { } } - final Class columnType = getDefaultType(field.getType(), explicitClass); + if (field.getType().getTypeID() == ArrowType.ArrowTypeID.Map) { + return new BarrageTypeInfo<>(Map.class, null, + arrowFieldFor(field.getName(), Map.class, null, field.getMetadata(), false)); + } + + final Class columnType = getDefaultType(field, explicitClass); + if (columnComponentType == null && columnType.isArray()) { + columnComponentType = columnType.getComponentType(); + } - return new BarrageTypeInfo(columnType, columnComponentType, - flatbufFieldFor(field.getName(), columnType, columnComponentType, field.getMetadata())); + return new BarrageTypeInfo<>(columnType, columnComponentType, + arrowFieldFor(field.getName(), columnType, columnComponentType, field.getMetadata(), false)); } private static Class getDefaultType( - final ArrowType arrowType, + final Field arrowField, final Class explicitType) { - final String exMsg = "Schema did not include `" + ATTR_DH_PREFIX + ATTR_TYPE_TAG + "` metadata for field "; - switch (arrowType.getTypeID()) { + final String exMsg = "Schema did not include `" + ATTR_DH_PREFIX + ATTR_TYPE_TAG + "` metadata for field"; + switch (arrowField.getType().getTypeID()) { case Int: - final ArrowType.Int intType = (ArrowType.Int) arrowType; + final ArrowType.Int intType = (ArrowType.Int) arrowField.getType(); if (intType.getIsSigned()) { // SIGNED switch (intType.getBitWidth()) { @@ -509,7 +550,7 @@ private static Class getDefaultType( case Duration: return long.class; case Timestamp: - final ArrowType.Timestamp timestampType = (ArrowType.Timestamp) arrowType; + final ArrowType.Timestamp timestampType = (ArrowType.Timestamp) arrowField.getType(); final String tz = timestampType.getTimezone(); final TimeUnit timestampUnit = timestampType.getUnit(); if ((tz == null || "UTC".equals(tz))) { @@ -522,7 +563,7 @@ private static Class getDefaultType( " of timestampType(Timezone=" + tz + ", Unit=" + timestampUnit.toString() + ")"); case FloatingPoint: - final ArrowType.FloatingPoint floatingPointType = (ArrowType.FloatingPoint) arrowType; + final ArrowType.FloatingPoint floatingPointType = (ArrowType.FloatingPoint) arrowField.getType(); switch (floatingPointType.getPrecision()) { case SINGLE: return float.class; @@ -539,8 +580,12 @@ private static Class getDefaultType( if (explicitType != null) { return explicitType; } + if (arrowField.getType().getTypeID() == ArrowType.ArrowTypeID.List) { + final Class childType = getDefaultType(arrowField.getChildren().get(0), null); + return Array.newInstance(childType, 0).getClass(); + } throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg + - " of type " + arrowType.getTypeID().toString()); + " of type " + arrowField.getType().getTypeID().toString()); } } @@ -595,7 +640,7 @@ private ChunkReader[] computeChunkReaders( final List> columns = tableDef.getColumns(); for (int ii = 0; ii < tableDef.numColumns(); ++ii) { final ColumnDefinition columnDefinition = ReinterpretUtils.maybeConvertToPrimitive(columns.get(ii)); - final BarrageTypeInfo typeInfo = BarrageTypeInfo.make( + final BarrageTypeInfo typeInfo = BarrageTypeInfo.make( columnDefinition.getDataType(), columnDefinition.getComponentType(), schema.fields(ii)); readers[ii] = chunkReaderFactory.newReader(typeInfo, barrageOptions); } @@ -616,8 +661,7 @@ public static ConvertedArrowSchema convertArrowSchema( final org.apache.arrow.flatbuf.Schema schema) { return convertArrowSchema( schema.fieldsLength(), - i -> schema.fields(i).name(), - i -> ArrowType.getTypeForField(schema.fields(i)), + i -> Field.convertField(schema.fields(i)), i -> visitor -> { final org.apache.arrow.flatbuf.Field field = schema.fields(i); if (field.dictionary() != null) { @@ -640,8 +684,7 @@ public static ConvertedArrowSchema convertArrowSchema( public static ConvertedArrowSchema convertArrowSchema(final Schema schema) { return convertArrowSchema( schema.getFields().size(), - i -> schema.getFields().get(i).getName(), - i -> schema.getFields().get(i).getType(), + i -> schema.getFields().get(i), i -> visitor -> { schema.getFields().get(i).getMetadata().forEach(visitor); }, @@ -650,15 +693,15 @@ public static ConvertedArrowSchema convertArrowSchema(final Schema schema) { private static ConvertedArrowSchema convertArrowSchema( final int numColumns, - final IntFunction getName, - final IntFunction getArrowType, + final IntFunction getField, final IntFunction>> columnMetadataVisitor, final Consumer> tableMetadataVisitor) { final ConvertedArrowSchema result = new ConvertedArrowSchema(); final ColumnDefinition[] columns = new ColumnDefinition[numColumns]; for (int i = 0; i < numColumns; ++i) { - final String origName = getName.apply(i); + final Field field = getField.apply(i); + final String origName = field.getName(); final String name = NameValidator.legalizeColumnName(origName); final MutableObject> type = new MutableObject<>(); final MutableObject> componentType = new MutableObject<>(); @@ -680,7 +723,7 @@ private static ConvertedArrowSchema convertArrowSchema( }); // this has side effects such as type validation; must call even if dest type is well known - Class defaultType = getDefaultType(getArrowType.apply(i), type.getValue()); + Class defaultType = getDefaultType(field, type.getValue()); if (type.getValue() == null) { type.setValue(defaultType); @@ -902,12 +945,27 @@ public static void createAndSendStaticSnapshot( long snapshotTargetCellCount = MIN_SNAPSHOT_CELL_COUNT; double snapshotNanosPerCell = 0.0; + final Map fieldFor; + if (table.hasAttribute(Table.BARRAGE_SCHEMA_ATTRIBUTE)) { + fieldFor = new HashMap<>(); + final Schema targetSchema = (Schema) table.getAttribute(Table.BARRAGE_SCHEMA_ATTRIBUTE); + // noinspection DataFlowIssue + targetSchema.getFields().forEach(f -> { + final FlatBufferBuilder fbb = new FlatBufferBuilder(); + final int offset = f.getField(fbb); + fbb.finish(offset); + fieldFor.put(f.getName(), org.apache.arrow.flatbuf.Field.getRootAsField(fbb.dataBuffer())); + }); + } else { + fieldFor = null; + } + // noinspection unchecked final ChunkWriter>[] chunkWriters = table.getDefinition().getColumns().stream() .map(cd -> DefaultChunkWriterFactory.INSTANCE.newWriter(BarrageTypeInfo.make( ReinterpretUtils.maybeConvertToPrimitiveDataType(cd.getDataType()), cd.getComponentType(), - flatbufFieldFor(cd, Map.of())))) + fieldFor != null ? fieldFor.get(cd.getName()) : flatbufFieldFor(cd, Map.of())))) .toArray(ChunkWriter[]::new); final long columnCount = diff --git a/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java b/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java index ed1aa105830..81d44d55dc5 100644 --- a/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java +++ b/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java @@ -19,6 +19,7 @@ import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.BaseTable; import io.deephaven.engine.table.impl.TableCreatorImpl; import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; @@ -1395,7 +1396,8 @@ private static class CommandGetSqlInfoImpl extends CommandHandlerFixedBase ATTRIBUTES = Map.of(); + private static final Map ATTRIBUTES = Map.of( + Table.BARRAGE_SCHEMA_ATTRIBUTE, FlightSqlProducer.Schemas.GET_SQL_INFO_SCHEMA); private static final ByteString SCHEMA_BYTES = BarrageUtil.schemaBytesFromTableDefinition(DEFINITION, ATTRIBUTES, true); diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/WebChunkReaderFactory.java b/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/WebChunkReaderFactory.java index 0fedeb406cc..f1b037854fd 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/WebChunkReaderFactory.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/WebChunkReaderFactory.java @@ -38,6 +38,7 @@ import io.deephaven.web.client.api.LongWrapper; import org.apache.arrow.flatbuf.Date; import org.apache.arrow.flatbuf.DateUnit; +import org.apache.arrow.flatbuf.Field; import org.apache.arrow.flatbuf.FloatingPoint; import org.apache.arrow.flatbuf.Int; import org.apache.arrow.flatbuf.Precision; @@ -66,7 +67,7 @@ public class WebChunkReaderFactory implements ChunkReader.Factory { @SuppressWarnings("unchecked") @Override public > ChunkReader newReader( - @NotNull final BarrageTypeInfo typeInfo, + @NotNull final BarrageTypeInfo typeInfo, @NotNull final BarrageOptions options) { switch (typeInfo.arrowField().typeType()) { case Type.Int: { @@ -265,7 +266,8 @@ public > ChunkReader newReader( outChunk, outOffset, totalRows); } - final BarrageTypeInfo componentTypeInfo = new BarrageTypeInfo( + // noinspection DataFlowIssue + final BarrageTypeInfo componentTypeInfo = new BarrageTypeInfo<>( typeInfo.componentType(), typeInfo.componentType().getComponentType(), typeInfo.arrowField().children(0));