Skip to content

Commit

Permalink
More Feedback + Tests + BugFixes
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Dec 16, 2024
1 parent ae71435 commit 0400086
Show file tree
Hide file tree
Showing 28 changed files with 446 additions and 1,733 deletions.
2 changes: 2 additions & 0 deletions engine/api/src/main/java/io/deephaven/engine/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ public interface Table extends
String BARRAGE_PERFORMANCE_KEY_ATTRIBUTE = "BarragePerformanceTableKey";
/**
* Set an Apache Arrow POJO Schema to this attribute to control the column encoding used for barrage serialization.
* <p>
* See {@code org.apache.arrow.vector.types.pojo.Schema}.
*/
String BARRAGE_SCHEMA_ATTRIBUTE = "BarrageSchema";

Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
* data, supporting various data types and logical structures. This interface is part of the Deephaven Barrage
* extensions for handling streamed data ingestion.
*
* @param <ReadChunkType> The type of chunk being read, extending {@link WritableChunk} with {@link Values}.
* @param <READ_CHUNK_TYPE> The type of chunk being read, extending {@link WritableChunk} with {@link Values}.
*/
public interface ChunkReader<ReadChunkType extends WritableChunk<Values>> {
public interface ChunkReader<READ_CHUNK_TYPE extends WritableChunk<Values>> {

/**
* Supports creation of {@link ChunkReader} instances to use when processing a flight stream. JVM implementations
Expand Down Expand Up @@ -55,7 +55,7 @@ <T extends WritableChunk<Values>> ChunkReader<T> newReader(
* @throws IOException if an error occurred while reading the stream
*/
@FinalDefault
default ReadChunkType readChunk(
default READ_CHUNK_TYPE readChunk(
@NotNull Iterator<ChunkWriter.FieldNodeInfo> fieldNodeIter,
@NotNull PrimitiveIterator.OfLong bufferInfoIter,
@NotNull DataInput is) throws IOException {
Expand All @@ -74,7 +74,7 @@ default ReadChunkType readChunk(
* @return a Chunk containing the data from the stream
* @throws IOException if an error occurred while reading the stream
*/
ReadChunkType readChunk(
READ_CHUNK_TYPE readChunk(
@NotNull Iterator<ChunkWriter.FieldNodeInfo> fieldNodeIter,
@NotNull PrimitiveIterator.OfLong bufferInfoIter,
@NotNull DataInput is,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,9 @@ private static ChunkReader<WritableObjectChunk<BigInteger, Values>> decimalToBig
}

BigInteger unscaledValue = new BigInteger(value);
if (scale == 0) {
return unscaledValue;
}
return unscaledValue.divide(BigInteger.TEN.pow(scale));
});
}
Expand Down Expand Up @@ -702,17 +705,22 @@ private static ChunkReader<WritableByteChunk<Values>> intToByte(
final BarrageOptions options) {
final ArrowType.Int intType = (ArrowType.Int) arrowType;
final int bitWidth = intType.getBitWidth();
final boolean unsigned = !intType.getIsSigned();

switch (bitWidth) {
case 8:
// note unsigned mappings to byte will overflow byte; but user has asked for this
// note unsigned mappings to byte will overflow; but user has asked for this
return new ByteChunkReader(options);
case 16:
// note shorts may overflow byte; but user has asked for this
if (unsigned) {
// note shorts may overflow; but user has asked for this
return ByteChunkReader.transformTo(new CharChunkReader(options),
(chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii)));
}
return ByteChunkReader.transformTo(new ShortChunkReader(options),
(chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii)));
case 32:
// note ints may overflow byte; but user has asked for this
// note ints may overflow; but user has asked for this
return ByteChunkReader.transformTo(new IntChunkReader(options),
(chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii)));
case 64:
Expand All @@ -735,17 +743,19 @@ private static ChunkReader<WritableShortChunk<Values>> intToShort(
switch (bitWidth) {
case 8:
return ShortChunkReader.transformTo(new ByteChunkReader(options),
(chunk, ii) -> maskIfOverflow(unsigned,
Byte.BYTES, QueryLanguageFunctionUtils.shortCast(chunk.get(ii))));
(chunk, ii) -> maskIfOverflow(unsigned, QueryLanguageFunctionUtils.shortCast(chunk.get(ii))));
case 16:
// note unsigned mappings to short will overflow short; but user has asked for this
if (unsigned) {
return ShortChunkReader.transformTo(new CharChunkReader(options),
(chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii)));
}
return new ShortChunkReader(options);
case 32:
// note ints may overflow short; but user has asked for this
// note ints may overflow; but user has asked for this
return ShortChunkReader.transformTo(new IntChunkReader(options),
(chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii)));
case 64:
// note longs may overflow short; but user has asked for this
// note longs may overflow; but user has asked for this
return ShortChunkReader.transformTo(new LongChunkReader(options),
(chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii)));
default:
Expand All @@ -767,6 +777,10 @@ private static ChunkReader<WritableIntChunk<Values>> intToInt(
(chunk, ii) -> maskIfOverflow(unsigned, Byte.BYTES,
QueryLanguageFunctionUtils.intCast(chunk.get(ii))));
case 16:
if (unsigned) {
return IntChunkReader.transformTo(new CharChunkReader(options),
(chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii)));
}
return IntChunkReader.transformTo(new ShortChunkReader(options), (chunk, ii) -> maskIfOverflow(unsigned,
Short.BYTES, QueryLanguageFunctionUtils.intCast(chunk.get(ii))));
case 32:
Expand Down Expand Up @@ -795,6 +809,10 @@ private static ChunkReader<WritableLongChunk<Values>> intToLong(
(chunk, ii) -> maskIfOverflow(unsigned, Byte.BYTES,
QueryLanguageFunctionUtils.longCast(chunk.get(ii))));
case 16:
if (unsigned) {
return LongChunkReader.transformTo(new CharChunkReader(options),
(chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii)));
}
return LongChunkReader.transformTo(new ShortChunkReader(options),
(chunk, ii) -> maskIfOverflow(unsigned,
Short.BYTES, QueryLanguageFunctionUtils.longCast(chunk.get(ii))));
Expand Down Expand Up @@ -822,6 +840,10 @@ private static ChunkReader<WritableObjectChunk<BigInteger, Values>> intToBigInt(
return transformToObject(new ByteChunkReader(options), (chunk, ii) -> toBigInt(maskIfOverflow(
unsigned, Byte.BYTES, QueryLanguageFunctionUtils.longCast(chunk.get(ii)))));
case 16:
if (unsigned) {
return transformToObject(new CharChunkReader(options),
(chunk, ii) -> toBigInt(QueryLanguageFunctionUtils.longCast(chunk.get(ii))));
}
return transformToObject(new ShortChunkReader(options), (chunk, ii) -> toBigInt(maskIfOverflow(
unsigned, Short.BYTES, QueryLanguageFunctionUtils.longCast(chunk.get(ii)))));
case 32:
Expand All @@ -848,6 +870,10 @@ private static ChunkReader<WritableFloatChunk<Values>> intToFloat(
return FloatChunkReader.transformTo(new ByteChunkReader(options),
(chunk, ii) -> floatCast(Byte.BYTES, signed, chunk.isNull(ii), chunk.get(ii)));
case 16:
if (!signed) {
return FloatChunkReader.transformTo(new CharChunkReader(options),
(chunk, ii) -> floatCast(Character.BYTES, signed, chunk.isNull(ii), chunk.get(ii)));
}
return FloatChunkReader.transformTo(new ShortChunkReader(options),
(chunk, ii) -> floatCast(Short.BYTES, signed, chunk.isNull(ii), chunk.get(ii)));
case 32:
Expand Down Expand Up @@ -898,6 +924,10 @@ private static ChunkReader<WritableDoubleChunk<Values>> intToDouble(
return DoubleChunkReader.transformTo(new ByteChunkReader(options),
(chunk, ii) -> doubleCast(Byte.BYTES, signed, chunk.isNull(ii), chunk.get(ii)));
case 16:
if (!signed) {
return DoubleChunkReader.transformTo(new CharChunkReader(options),
(chunk, ii) -> doubleCast(Character.BYTES, signed, chunk.isNull(ii), chunk.get(ii)));
}
return DoubleChunkReader.transformTo(new ShortChunkReader(options),
(chunk, ii) -> doubleCast(Short.BYTES, signed, chunk.isNull(ii), chunk.get(ii)));
case 32:
Expand Down Expand Up @@ -948,6 +978,10 @@ private static ChunkReader<WritableObjectChunk<BigDecimal, Values>> intToBigDeci
return transformToObject(new ByteChunkReader(options), (chunk, ii) -> toBigDecimal(maskIfOverflow(
unsigned, Byte.BYTES, QueryLanguageFunctionUtils.longCast(chunk.get(ii)))));
case 16:
if (unsigned) {
return transformToObject(new CharChunkReader(options), (chunk, ii) -> toBigDecimal(maskIfOverflow(
unsigned, Character.BYTES, QueryLanguageFunctionUtils.longCast(chunk.get(ii)))));
}
return transformToObject(new ShortChunkReader(options), (chunk, ii) -> toBigDecimal(maskIfOverflow(
unsigned, Short.BYTES, QueryLanguageFunctionUtils.longCast(chunk.get(ii)))));
case 32:
Expand Down Expand Up @@ -983,11 +1017,11 @@ private static ChunkReader<WritableCharChunk<Values>> intToChar(
(chunk, ii) -> QueryLanguageFunctionUtils.charCast(chunk.get(ii)));
}
case 32:
// note unsigned mappings to char will overflow short; but user has asked for this
// note int mappings to char will overflow; but user has asked for this
return CharChunkReader.transformTo(new IntChunkReader(options),
(chunk, ii) -> QueryLanguageFunctionUtils.charCast(chunk.get(ii)));
case 64:
// note unsigned mappings to short will overflow short; but user has asked for this
// note long mappings to short will overflow; but user has asked for this
return CharChunkReader.transformTo(new LongChunkReader(options),
(chunk, ii) -> QueryLanguageFunctionUtils.charCast(chunk.get(ii)));
default:
Expand Down Expand Up @@ -1248,16 +1282,17 @@ private static BigDecimal toBigDecimal(final long value) {
* <p>
* Special handling is included to preserve the value of null-equivalent constants and to skip masking for signed
* values.
* <p>
* Note that short can only be sign extended from byte so we don't need to consider other numByte configurations.
*
* @param unsigned Whether the value should be treated as unsigned.
* @param numBytes The number of bytes to constrain the value to (e.g., 1 for byte, 2 for short).
* @param value The input value to potentially mask.
* @return The masked value if unsigned and overflow occurs; otherwise, the original value.
*/
@SuppressWarnings("SameParameterValue")
private static short maskIfOverflow(final boolean unsigned, final int numBytes, short value) {
private static short maskIfOverflow(final boolean unsigned, short value) {
if (unsigned && value != QueryConstants.NULL_SHORT) {
value &= (short) ((1L << (numBytes * 8)) - 1);
value &= (short) ((1L << 8) - 1);
}
return value;
}
Expand Down Expand Up @@ -1332,13 +1367,13 @@ private static BigInteger maskIfOverflow(final boolean unsigned, final int numBy
return value;
}

private interface ToObjectTransformFunction<T, WireChunkType extends WritableChunk<Values>> {
T get(WireChunkType wireValues, int wireOffset);
private interface ToObjectTransformFunction<T, WIRE_CHUNK_TYPE extends WritableChunk<Values>> {
T get(WIRE_CHUNK_TYPE wireValues, int wireOffset);
}

private static <T, WireChunkType extends WritableChunk<Values>, CR extends ChunkReader<WireChunkType>> ChunkReader<WritableObjectChunk<T, Values>> transformToObject(
private static <T, WIRE_CHUNK_TYPE extends WritableChunk<Values>, CR extends ChunkReader<WIRE_CHUNK_TYPE>> ChunkReader<WritableObjectChunk<T, Values>> transformToObject(
final CR wireReader,
final ToObjectTransformFunction<T, WireChunkType> wireTransform) {
final ToObjectTransformFunction<T, WIRE_CHUNK_TYPE> wireTransform) {
return new TransformingChunkReader<>(
wireReader,
WritableObjectChunk::makeWritableChunk,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.ShortChunk;
import io.deephaven.chunk.WritableByteChunk;
import io.deephaven.chunk.WritableCharChunk;
import io.deephaven.chunk.WritableDoubleChunk;
import io.deephaven.chunk.WritableFloatChunk;
import io.deephaven.chunk.WritableIntChunk;
Expand Down Expand Up @@ -806,6 +807,15 @@ private static ChunkWriter<ByteChunk<Values>> intFromByte(
case 8:
return ByteChunkWriter.getIdentity(typeInfo.arrowField().isNullable());
case 16:
if (!intType.getIsSigned()) {
return new CharChunkWriter<>((ByteChunk<Values> source) -> {
final WritableCharChunk<Values> chunk = WritableCharChunk.makeWritableChunk(source.size());
for (int ii = 0; ii < source.size(); ++ii) {
chunk.set(ii, QueryLanguageFunctionUtils.charCast(source.get(ii)));
}
return chunk;
}, ByteChunk::getEmptyChunk, typeInfo.arrowField().isNullable());
}
return new ShortChunkWriter<>((ByteChunk<Values> source) -> {
final WritableShortChunk<Values> chunk = WritableShortChunk.makeWritableChunk(source.size());
for (int ii = 0; ii < source.size(); ++ii) {
Expand Down Expand Up @@ -849,6 +859,15 @@ private static ChunkWriter<ShortChunk<Values>> intFromShort(
return chunk;
}, ShortChunk::getEmptyChunk, typeInfo.arrowField().isNullable());
case 16:
if (!intType.getIsSigned()) {
return new CharChunkWriter<>((ShortChunk<Values> source) -> {
final WritableCharChunk<Values> chunk = WritableCharChunk.makeWritableChunk(source.size());
for (int ii = 0; ii < source.size(); ++ii) {
chunk.set(ii, QueryLanguageFunctionUtils.charCast(source.get(ii)));
}
return chunk;
}, ShortChunk::getEmptyChunk, typeInfo.arrowField().isNullable());
}
return ShortChunkWriter.getIdentity(typeInfo.arrowField().isNullable());
case 32:
return new IntChunkWriter<>((ShortChunk<Values> source) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
public class DoubleChunkReader extends BaseChunkReader<WritableDoubleChunk<Values>> {
private static final String DEBUG_NAME = "DoubleChunkReader";

public interface ToDoubleTransformFunction<WireChunkType extends WritableChunk<Values>> {
double get(WireChunkType wireValues, int wireOffset);
public interface ToDoubleTransformFunction<WIRE_CHUNK_TYPE extends WritableChunk<Values>> {
double get(WIRE_CHUNK_TYPE wireValues, int wireOffset);
}

public static <WireChunkType extends WritableChunk<Values>, T extends ChunkReader<WireChunkType>> ChunkReader<WritableDoubleChunk<Values>> transformTo(
public static <WIRE_CHUNK_TYPE extends WritableChunk<Values>, T extends ChunkReader<WIRE_CHUNK_TYPE>> ChunkReader<WritableDoubleChunk<Values>> transformTo(
final T wireReader,
final ToDoubleTransformFunction<WireChunkType> wireTransform) {
final ToDoubleTransformFunction<WIRE_CHUNK_TYPE> wireTransform) {
return new TransformingChunkReader<>(
wireReader,
WritableDoubleChunk::makeWritableChunk,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
public class FloatChunkReader extends BaseChunkReader<WritableFloatChunk<Values>> {
private static final String DEBUG_NAME = "FloatChunkReader";

public interface ToFloatTransformFunction<WireChunkType extends WritableChunk<Values>> {
float get(WireChunkType wireValues, int wireOffset);
public interface ToFloatTransformFunction<WIRE_CHUNK_TYPE extends WritableChunk<Values>> {
float get(WIRE_CHUNK_TYPE wireValues, int wireOffset);
}

public static <WireChunkType extends WritableChunk<Values>, T extends ChunkReader<WireChunkType>> ChunkReader<WritableFloatChunk<Values>> transformTo(
public static <WIRE_CHUNK_TYPE extends WritableChunk<Values>, T extends ChunkReader<WIRE_CHUNK_TYPE>> ChunkReader<WritableFloatChunk<Values>> transformTo(
final T wireReader,
final ToFloatTransformFunction<WireChunkType> wireTransform) {
final ToFloatTransformFunction<WIRE_CHUNK_TYPE> wireTransform) {
return new TransformingChunkReader<>(
wireReader,
WritableFloatChunk::makeWritableChunk,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.Iterator;
import java.util.PrimitiveIterator;

public class NullChunkReader<ReadChunkType extends WritableChunk<Values>> extends BaseChunkReader<ReadChunkType> {
public class NullChunkReader<READ_CHUNK_TYPE extends WritableChunk<Values>> extends BaseChunkReader<READ_CHUNK_TYPE> {

private final ChunkType resultType;

Expand All @@ -23,7 +23,7 @@ public NullChunkReader(Class<?> destType) {
}

@Override
public ReadChunkType readChunk(
public READ_CHUNK_TYPE readChunk(
@NotNull final Iterator<ChunkWriter.FieldNodeInfo> fieldNodeIter,
@NotNull final PrimitiveIterator.OfLong bufferInfoIter,
@NotNull final DataInput is,
Expand All @@ -42,6 +42,6 @@ public ReadChunkType readChunk(
chunk.fillWithNullValue(0, nodeInfo.numElements);

// noinspection unchecked
return (ReadChunkType) chunk;
return (READ_CHUNK_TYPE) chunk;
}
}
Loading

0 comments on commit 0400086

Please sign in to comment.