From 8875c0b40260c75cee0161a02c01ac04f3c49838 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Wed, 4 Dec 2024 17:54:48 -0700 Subject: [PATCH] More BugFixes --- .../barrage/chunk/BaseChunkWriter.java | 15 +++++---- .../barrage/chunk/BooleanChunkReader.java | 31 ++++++++++++++++++- .../chunk/DefaultChunkReaderFactory.java | 13 +++++++- .../chunk/TransformingChunkReader.java | 1 + .../barrage/chunk/UnionChunkReader.java | 6 ++-- .../barrage/table/BarrageTable.java | 1 + .../util/BarrageMessageReaderImpl.java | 5 +-- 7 files changed, 59 insertions(+), 13 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkWriter.java index 33300a800b3..0a1e0f94046 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkWriter.java @@ -124,14 +124,17 @@ public int available() throws IOException { } /** - * There are two cases we don't send a validity buffer: - the simplest case is following the arrow flight spec, - * which says that if there are no nulls present, the buffer is optional. - Our implementation of nullCount() - * for primitive types will return zero if the useDeephavenNulls flag is set, so the buffer will also be omitted - * in that case. The client's marshaller does not need to be aware of deephaven nulls but in this mode we assume - * the consumer understands which value is the assigned NULL. + * @formatter:off + * There are two cases we don't send a validity buffer: + * - the simplest case is following the arrow flight spec, which says that if there are no nulls present, the + * buffer is optional. + * - Our implementation of nullCount() for primitive types will return zero if the useDeephavenNulls flag is + * set, so the buffer will also be omitted in that case. The client's marshaller does not need to be aware of + * deephaven nulls but in this mode we assume the consumer understands which value is the assigned NULL. + * @formatter:on */ protected boolean sendValidityBuffer() { - return !fieldNullable || nullCount() != 0; + return fieldNullable && nullCount() != 0; } @Override diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkReader.java index 2c289e4e859..239acab83cf 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkReader.java @@ -3,9 +3,11 @@ // package io.deephaven.extensions.barrage.chunk; +import io.deephaven.base.verify.Assert; import io.deephaven.chunk.WritableByteChunk; import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.util.BooleanUtils; import io.deephaven.util.datastructures.LongSizedDataStructure; @@ -16,6 +18,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.PrimitiveIterator; +import java.util.function.Function; import static io.deephaven.extensions.barrage.chunk.BaseChunkWriter.getNumLongsForBitPackOfSize; @@ -39,6 +42,33 @@ public BooleanChunkReader(ByteConversion conversion) { this.conversion = conversion; } + public ChunkReader> transform(Function transform) { + return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> { + try (final WritableByteChunk inner = BooleanChunkReader.this.readChunk( + fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { + + final WritableObjectChunk chunk = castOrCreateChunk( + outChunk, + Math.max(totalRows, inner.size()), + WritableObjectChunk::makeWritableChunk, + WritableChunk::asWritableObjectChunk); + + if (outChunk == null) { + // if we're not given an output chunk then we better be writing at the front of the new one + Assert.eqZero(outOffset, "outOffset"); + } + + for (int ii = 0; ii < inner.size(); ++ii) { + byte value = inner.get(ii); + chunk.set(outOffset + ii, transform.apply(value)); + } + chunk.setSize(outOffset + inner.size()); + + return chunk; + } + }; + } + @Override public WritableByteChunk readChunk( @NotNull final Iterator fieldNodeIter, @@ -99,7 +129,6 @@ public WritableByteChunk readChunk( return chunk; } - private static void useValidityBuffer( final ByteConversion conversion, final DataInput is, diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java index 9fe71ffdd0f..f17f64183df 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java @@ -25,6 +25,7 @@ import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; import io.deephaven.time.DateTimeUtils; +import io.deephaven.util.BooleanUtils; import io.deephaven.util.QueryConstants; import io.deephaven.util.type.TypeUtils; import io.deephaven.vector.Vector; @@ -275,7 +276,17 @@ public > ChunkReader newReaderPojo( if (typeId == ArrowType.ArrowTypeID.Union) { final ArrowType.Union unionType = (ArrowType.Union) field.getType(); - final List>> innerReaders = new ArrayList<>(); + final List>> innerReaders = new ArrayList<>(); + + for (int ii = 0; ii < field.getChildren().size(); ++ii) { + final Field childField = field.getChildren().get(ii); + final BarrageTypeInfo childTypeInfo = BarrageUtil.getDefaultType(childField); + ChunkReader> childReader = newReaderPojo(childTypeInfo, options, false); + if (childField.getType().getTypeID() == ArrowType.ArrowTypeID.Bool) { + childReader = ((BooleanChunkReader) childReader).transform(BooleanUtils::byteAsBoolean); + } + innerReaders.add(childReader); + } // noinspection unchecked return (ChunkReader) new UnionChunkReader( diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/TransformingChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/TransformingChunkReader.java index 931894da676..6689e9e45ac 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/TransformingChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/TransformingChunkReader.java @@ -63,6 +63,7 @@ public OutputChunkType readChunk( for (int ii = 0; ii < wireValues.size(); ++ii) { transformFunction.apply(wireValues, chunk, ii, outOffset + ii); } + chunk.setSize(outOffset + wireValues.size()); return chunk; } } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/UnionChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/UnionChunkReader.java index 294ea850892..e11f28d2abe 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/UnionChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/UnionChunkReader.java @@ -37,11 +37,11 @@ public static Mode mode(UnionMode mode) { private static final String DEBUG_NAME = "UnionChunkReader"; private final Mode mode; - private final List>> readers; + private final List>> readers; public UnionChunkReader( final Mode mode, - final List>> readers) { + final List>> readers) { this.mode = mode; this.readers = readers; // the specification doesn't allow the union column to have more than signed byte number of types @@ -65,7 +65,7 @@ public WritableObjectChunk readChunk( int numRows = nodeInfo.numElements; if (numRows == 0) { is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, coiBufferLength + offsetsBufferLength)); - for (final ChunkReader> reader : readers) { + for (final ChunkReader> reader : readers) { // noinspection EmptyTryBlock try (final SafeCloseable ignored = reader.readChunk(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { // do nothing; we need each reader to consume fieldNodeIter and bufferInfoIter diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java index c153155f830..7ebce9f43d6 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java @@ -460,6 +460,7 @@ public static BarrageTable make( return value instanceof Boolean && (Boolean) value; }; + schema.attributes.put(Table.BARRAGE_SCHEMA_ATTRIBUTE, schema.arrowSchema); if (getAttribute.test(Table.BLINK_TABLE_ATTRIBUTE)) { final LinkedHashMap> finalColumns = makeColumns(schema, writableSources); table = new BarrageBlinkTable( diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageMessageReaderImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageMessageReaderImpl.java index 81edb2241ee..952e27e36d0 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageMessageReaderImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageMessageReaderImpl.java @@ -265,11 +265,12 @@ public BarrageMessage safelyParseFrom(final BarrageOptions options, } // fill the chunk with data and assign back into the array - chunk = readers.get(ci).readChunk(fieldNodeIter, bufferInfoIter, ois, chunk, chunk.size(), + final int origSize = chunk.size(); + chunk = readers.get(ci).readChunk(fieldNodeIter, bufferInfoIter, ois, chunk, origSize, (int) batch.length()); acd.data.set(lastChunkIndex, chunk); if (!options.columnsAsList()) { - chunk.setSize(chunk.size() + (int) batch.length()); + chunk.setSize(origSize + (int) batch.length()); } }