Skip to content

Commit

Permalink
refactor: Barrage stream reading into Chunks (deephaven#5692)
Browse files Browse the repository at this point in the history
Like deephaven#5552, applies some after-the-fact review of the design for reading
Barrage/Flight stream, in anticipation of sharing this code with
JavaScript clients.

 * Removes unused BarrageChunkAppendingMarshaller
 * Adds more error detail when stream contents don't match metadata
 * Removes an unused parameter when parsing Flight messages
 * Inlines width of various types into their readers
 * Introduces an interface for reading data into chunks, and a factory
interface to allow JS clients to supply their own implementations.

Partial deephaven#188
  • Loading branch information
niloc132 authored Jul 18, 2024
1 parent 2985690 commit 90b9283
Show file tree
Hide file tree
Showing 35 changed files with 2,222 additions and 2,245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
Expand All @@ -15,14 +14,10 @@
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.WritableByteChunk;
import io.deephaven.chunk.WritableLongChunk;
import org.jetbrains.annotations.Nullable;

import java.io.DataInput;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.PrimitiveIterator;

import static io.deephaven.util.QueryConstants.*;

Expand Down Expand Up @@ -153,130 +148,4 @@ public int drainTo(final OutputStream outputStream) throws IOException {
return LongSizedDataStructure.intSize(DEBUG_NAME, bytesWritten);
}
}

@FunctionalInterface
public interface ByteConversion {
byte apply(byte in);

ByteConversion IDENTITY = (byte a) -> a;
}

static WritableChunk<Values> extractChunkFromInputStream(
final StreamReaderOptions options,
final Iterator<FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int outOffset,
final int totalRows) throws IOException {
return extractChunkFromInputStreamWithConversion(
options, ByteConversion.IDENTITY, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}

static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
final StreamReaderOptions options,
final ByteConversion conversion,
final Iterator<FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int outOffset,
final int totalRows) throws IOException {

final FieldNodeInfo nodeInfo = fieldNodeIter.next();
final long validityBuffer = bufferInfoIter.nextLong();
final long payloadBuffer = bufferInfoIter.nextLong();

final WritableByteChunk<Values> chunk;
if (outChunk != null) {
chunk = outChunk.asWritableByteChunk();
} else {
final int numRows = Math.max(totalRows, nodeInfo.numElements);
chunk = WritableByteChunk.makeWritableChunk(numRows);
chunk.setSize(numRows);
}

if (nodeInfo.numElements == 0) {
return chunk;
}

final int numValidityLongs = (nodeInfo.numElements + 63) / 64;
try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) {
int jj = 0;
for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
isValid.set(jj, is.readLong());
}
final long valBufRead = jj * 8L;
if (valBufRead < validityBuffer) {
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
}
// we support short validity buffers
for (; jj < numValidityLongs; ++jj) {
isValid.set(jj, -1); // -1 is bit-wise representation of all ones
}
// consumed entire validity buffer by here

final int numPayloadBytesNeeded = (int) ((nodeInfo.numElements + 7L) / 8L);
if (payloadBuffer < numPayloadBytesNeeded) {
throw new IllegalStateException("payload buffer is too short for expected number of elements");
}

// cannot use deephaven nulls as booleans are not nullable
useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid);

// flight requires that the payload buffer be padded to multiples of 8 bytes
final long payloadRead = getNumLongsForBitPackOfSize(nodeInfo.numElements) * 8L;
final long overhangPayload = payloadBuffer - payloadRead;
if (overhangPayload > 0) {
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload));
}
}

return chunk;
}

private static void useValidityBuffer(
final ByteConversion conversion,
final DataInput is,
final FieldNodeInfo nodeInfo,
final WritableByteChunk<Values> chunk,
final int offset,
final WritableLongChunk<Values> isValid) throws IOException {
final int numElements = nodeInfo.numElements;
final int numValidityWords = (numElements + 63) / 64;

int ei = 0;
int pendingSkips = 0;

for (int vi = 0; vi < numValidityWords; ++vi) {
int bitsLeftInThisWord = Math.min(64, numElements - vi * 64);
long validityWord = isValid.get(vi);
long payloadWord = is.readLong();
do {
if ((validityWord & 1) == 1) {
if (pendingSkips > 0) {
chunk.fillWithNullValue(offset + ei, pendingSkips);
ei += pendingSkips;
pendingSkips = 0;
}
final byte value = (payloadWord & 1) == 1 ? BooleanUtils.TRUE_BOOLEAN_AS_BYTE
: BooleanUtils.FALSE_BOOLEAN_AS_BYTE;
chunk.set(offset + ei++, conversion.apply(value));
validityWord >>= 1;
payloadWord >>= 1;
bitsLeftInThisWord--;
} else {
final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord);
pendingSkips += skips;
validityWord >>= skips;
payloadWord >>= skips;
bitsLeftInThisWord -= skips;
}
} while (bitsLeftInThisWord > 0);
}

if (pendingSkips > 0) {
chunk.fillWithNullValue(offset + ei, pendingSkips);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.chunk.WritableByteChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.util.BooleanUtils;
import io.deephaven.util.datastructures.LongSizedDataStructure;

import java.io.DataInput;
import java.io.IOException;
import java.util.Iterator;
import java.util.PrimitiveIterator;

import static io.deephaven.extensions.barrage.chunk.BaseChunkInputStreamGenerator.getNumLongsForBitPackOfSize;

public class BooleanChunkReader implements ChunkReader {
private static final String DEBUG_NAME = "BooleanChunkReader";

@FunctionalInterface
public interface ByteConversion {
byte apply(byte in);

ByteConversion IDENTITY = (byte a) -> a;
}

private final ByteConversion conversion;

public BooleanChunkReader() {
this(ByteConversion.IDENTITY);
}

public BooleanChunkReader(ByteConversion conversion) {
this.conversion = conversion;
}

@Override
public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {
final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next();
final long validityBuffer = bufferInfoIter.nextLong();
final long payloadBuffer = bufferInfoIter.nextLong();

final WritableByteChunk<Values> chunk;
if (outChunk != null) {
chunk = outChunk.asWritableByteChunk();
} else {
final int numRows = Math.max(totalRows, nodeInfo.numElements);
chunk = WritableByteChunk.makeWritableChunk(numRows);
chunk.setSize(numRows);
}

if (nodeInfo.numElements == 0) {
return chunk;
}

final int numValidityLongs = (nodeInfo.numElements + 63) / 64;
try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) {
int jj = 0;
for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
isValid.set(jj, is.readLong());
}
final long valBufRead = jj * 8L;
if (valBufRead < validityBuffer) {
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
}
// we support short validity buffers
for (; jj < numValidityLongs; ++jj) {
isValid.set(jj, -1); // -1 is bit-wise representation of all ones
}
// consumed entire validity buffer by here

final int numPayloadBytesNeeded = (int) ((nodeInfo.numElements + 7L) / 8L);
if (payloadBuffer < numPayloadBytesNeeded) {
throw new IllegalStateException("payload buffer is too short for expected number of elements");
}

// cannot use deephaven nulls as booleans are not nullable
useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid);

// flight requires that the payload buffer be padded to multiples of 8 bytes
final long payloadRead = getNumLongsForBitPackOfSize(nodeInfo.numElements) * 8L;
final long overhangPayload = payloadBuffer - payloadRead;
if (overhangPayload > 0) {
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload));
}
}

return chunk;
}


private static void useValidityBuffer(
final ByteConversion conversion,
final DataInput is,
final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo,
final WritableByteChunk<Values> chunk,
final int offset,
final WritableLongChunk<Values> isValid) throws IOException {
final int numElements = nodeInfo.numElements;
final int numValidityWords = (numElements + 63) / 64;

int ei = 0;
int pendingSkips = 0;

for (int vi = 0; vi < numValidityWords; ++vi) {
int bitsLeftInThisWord = Math.min(64, numElements - vi * 64);
long validityWord = isValid.get(vi);
long payloadWord = is.readLong();
do {
if ((validityWord & 1) == 1) {
if (pendingSkips > 0) {
chunk.fillWithNullValue(offset + ei, pendingSkips);
ei += pendingSkips;
pendingSkips = 0;
}
final byte value = (payloadWord & 1) == 1 ? BooleanUtils.TRUE_BOOLEAN_AS_BYTE
: BooleanUtils.FALSE_BOOLEAN_AS_BYTE;
chunk.set(offset + ei++, conversion.apply(value));
validityWord >>= 1;
payloadWord >>= 1;
bitsLeftInThisWord--;
} else {
final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord);
pendingSkips += skips;
validityWord >>= skips;
payloadWord >>= skips;
bitsLeftInThisWord -= skips;
}
} while (bitsLeftInThisWord > 0);
}

if (pendingSkips > 0) {
chunk.fillWithNullValue(offset + ei, pendingSkips);
}
}
}
Loading

0 comments on commit 90b9283

Please sign in to comment.