Skip to content

Commit

Permalink
More BugFixes
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Dec 5, 2024
1 parent c5000e7 commit 8875c0b
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,17 @@ public int available() throws IOException {
}

/**
* There are two cases we don't send a validity buffer: - the simplest case is following the arrow flight spec,
* which says that if there are no nulls present, the buffer is optional. - Our implementation of nullCount()
* for primitive types will return zero if the useDeephavenNulls flag is set, so the buffer will also be omitted
* in that case. The client's marshaller does not need to be aware of deephaven nulls but in this mode we assume
* the consumer understands which value is the assigned NULL.
* @formatter:off
* There are two cases we don't send a validity buffer:
* - the simplest case is following the arrow flight spec, which says that if there are no nulls present, the
* buffer is optional.
* - Our implementation of nullCount() for primitive types will return zero if the useDeephavenNulls flag is
* set, so the buffer will also be omitted in that case. The client's marshaller does not need to be aware of
* deephaven nulls but in this mode we assume the consumer understands which value is the assigned NULL.
* @formatter:on
*/
protected boolean sendValidityBuffer() {
return !fieldNullable || nullCount() != 0;
return fieldNullable && nullCount() != 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
//
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.WritableByteChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.util.BooleanUtils;
import io.deephaven.util.datastructures.LongSizedDataStructure;
Expand All @@ -16,6 +18,7 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.PrimitiveIterator;
import java.util.function.Function;

import static io.deephaven.extensions.barrage.chunk.BaseChunkWriter.getNumLongsForBitPackOfSize;

Expand All @@ -39,6 +42,33 @@ public BooleanChunkReader(ByteConversion conversion) {
this.conversion = conversion;
}

public <T> ChunkReader<WritableObjectChunk<T, Values>> transform(Function<Byte, T> transform) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> {
try (final WritableByteChunk<Values> inner = BooleanChunkReader.this.readChunk(
fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

final WritableObjectChunk<T, Values> chunk = castOrCreateChunk(
outChunk,
Math.max(totalRows, inner.size()),
WritableObjectChunk::makeWritableChunk,
WritableChunk::asWritableObjectChunk);

if (outChunk == null) {
// if we're not given an output chunk then we better be writing at the front of the new one
Assert.eqZero(outOffset, "outOffset");
}

for (int ii = 0; ii < inner.size(); ++ii) {
byte value = inner.get(ii);
chunk.set(outOffset + ii, transform.apply(value));
}
chunk.setSize(outOffset + inner.size());

return chunk;
}
};
}

@Override
public WritableByteChunk<Values> readChunk(
@NotNull final Iterator<ChunkWriter.FieldNodeInfo> fieldNodeIter,
Expand Down Expand Up @@ -99,7 +129,6 @@ public WritableByteChunk<Values> readChunk(
return chunk;
}


private static void useValidityBuffer(
final ByteConversion conversion,
final DataInput is,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.BooleanUtils;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.TypeUtils;
import io.deephaven.vector.Vector;
Expand Down Expand Up @@ -275,7 +276,17 @@ public <T extends WritableChunk<Values>> ChunkReader<T> newReaderPojo(

if (typeId == ArrowType.ArrowTypeID.Union) {
final ArrowType.Union unionType = (ArrowType.Union) field.getType();
final List<ChunkReader<WritableChunk<Values>>> innerReaders = new ArrayList<>();
final List<ChunkReader<? extends WritableChunk<Values>>> innerReaders = new ArrayList<>();

for (int ii = 0; ii < field.getChildren().size(); ++ii) {
final Field childField = field.getChildren().get(ii);
final BarrageTypeInfo<Field> childTypeInfo = BarrageUtil.getDefaultType(childField);
ChunkReader<? extends WritableChunk<Values>> childReader = newReaderPojo(childTypeInfo, options, false);
if (childField.getType().getTypeID() == ArrowType.ArrowTypeID.Bool) {
childReader = ((BooleanChunkReader) childReader).transform(BooleanUtils::byteAsBoolean);
}
innerReaders.add(childReader);
}

// noinspection unchecked
return (ChunkReader<T>) new UnionChunkReader<T>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public OutputChunkType readChunk(
for (int ii = 0; ii < wireValues.size(); ++ii) {
transformFunction.apply(wireValues, chunk, ii, outOffset + ii);
}
chunk.setSize(outOffset + wireValues.size());
return chunk;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ public static Mode mode(UnionMode mode) {
private static final String DEBUG_NAME = "UnionChunkReader";

private final Mode mode;
private final List<ChunkReader<WritableChunk<Values>>> readers;
private final List<ChunkReader<? extends WritableChunk<Values>>> readers;

public UnionChunkReader(
final Mode mode,
final List<ChunkReader<WritableChunk<Values>>> readers) {
final List<ChunkReader<? extends WritableChunk<Values>>> readers) {
this.mode = mode;
this.readers = readers;
// the specification doesn't allow the union column to have more than signed byte number of types
Expand All @@ -65,7 +65,7 @@ public WritableObjectChunk<T, Values> readChunk(
int numRows = nodeInfo.numElements;
if (numRows == 0) {
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, coiBufferLength + offsetsBufferLength));
for (final ChunkReader<WritableChunk<Values>> reader : readers) {
for (final ChunkReader<? extends WritableChunk<Values>> reader : readers) {
// noinspection EmptyTryBlock
try (final SafeCloseable ignored = reader.readChunk(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
// do nothing; we need each reader to consume fieldNodeIter and bufferInfoIter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ public static BarrageTable make(
return value instanceof Boolean && (Boolean) value;
};

schema.attributes.put(Table.BARRAGE_SCHEMA_ATTRIBUTE, schema.arrowSchema);
if (getAttribute.test(Table.BLINK_TABLE_ATTRIBUTE)) {
final LinkedHashMap<String, ColumnSource<?>> finalColumns = makeColumns(schema, writableSources);
table = new BarrageBlinkTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,12 @@ public BarrageMessage safelyParseFrom(final BarrageOptions options,
}

// fill the chunk with data and assign back into the array
chunk = readers.get(ci).readChunk(fieldNodeIter, bufferInfoIter, ois, chunk, chunk.size(),
final int origSize = chunk.size();
chunk = readers.get(ci).readChunk(fieldNodeIter, bufferInfoIter, ois, chunk, origSize,
(int) batch.length());
acd.data.set(lastChunkIndex, chunk);
if (!options.columnsAsList()) {
chunk.setSize(chunk.size() + (int) batch.length());
chunk.setSize(origSize + (int) batch.length());
}
}

Expand Down

0 comments on commit 8875c0b

Please sign in to comment.