Skip to content

Commit

Permalink
Java client support for the column-as-list feature
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Dec 2, 2024
1 parent 4ba36f0 commit aa45bcd
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 12 deletions.
1 change: 0 additions & 1 deletion cpp-client/deephaven/tests/src/ticking_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ class WaitForPopulatedTableCallback final : public CommonBase {
};

TEST_CASE("Ticking Table: all the data is eventually present", "[ticking]") {
if (true) return;
const int64_t target = 10;
auto client = TableMakerForTests::CreateClient();
auto tm = client.GetManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ protected DefaultChunkReaderFactory() {
public <T extends WritableChunk<Values>> ChunkReader<T> newReader(
@NotNull final BarrageTypeInfo typeInfo,
@NotNull final BarrageOptions options) {
return newReader(typeInfo, options, true);
}

public <T extends WritableChunk<Values>> ChunkReader<T> newReader(
@NotNull final BarrageTypeInfo typeInfo,
@NotNull final BarrageOptions options,
final boolean isTopLevel) {
// TODO (deephaven/deephaven-core#6033): Run-End Support
// TODO (deephaven/deephaven-core#6034): Dictionary Support

Expand Down Expand Up @@ -231,7 +238,12 @@ public <T extends WritableChunk<Values>> ChunkReader<T> newReader(
} else {
kernel = ArrayExpansionKernel.makeExpansionKernel(chunkType, componentTypeInfo.type());
}
final ChunkReader<WritableChunk<Values>> componentReader = newReader(componentTypeInfo, options);
final ChunkReader<WritableChunk<Values>> componentReader = newReader(componentTypeInfo, options, false);

if (isTopLevel && options.columnsAsList()) {
// noinspection unchecked
return (ChunkReader<T>) new SingleElementListHeaderReader<>(kernel, componentReader);
}

// noinspection unchecked
return (ChunkReader<T>) new ListChunkReader<>(mode, fixedSizeLength, kernel, componentReader);
Expand All @@ -242,8 +254,8 @@ public <T extends WritableChunk<Values>> ChunkReader<T> newReader(
final BarrageTypeInfo keyTypeInfo = BarrageUtil.getDefaultType(structField.getChildren().get(0));
final BarrageTypeInfo valueTypeInfo = BarrageUtil.getDefaultType(structField.getChildren().get(1));

final ChunkReader<WritableChunk<Values>> keyReader = newReader(keyTypeInfo, options);
final ChunkReader<WritableChunk<Values>> valueReader = newReader(valueTypeInfo, options);
final ChunkReader<WritableChunk<Values>> keyReader = newReader(keyTypeInfo, options, false);
final ChunkReader<WritableChunk<Values>> valueReader = newReader(valueTypeInfo, options, false);

// noinspection unchecked
return (ChunkReader<T>) new MapChunkReader<>(keyReader, valueReader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public <T extends Chunk<Values>> ChunkWriter<T> newWriter(
// TODO (deephaven/deephaven-core#6033): Run-End Support
// TODO (deephaven/deephaven-core#6034): Dictionary Support

final Field field = Field.convertField(typeInfo.arrowField());
final Field field = Field.convertField(typeInfo.arrowField());

final ArrowType.ArrowTypeID typeId = field.getType().getTypeID();
final boolean isSpecialType = DefaultChunkReaderFactory.SPECIAL_TYPES.contains(typeId);
Expand Down Expand Up @@ -262,7 +262,7 @@ public <T extends Chunk<Values>> ChunkWriter<T> newWriter(
}

// TODO: if (typeId == ArrowType.ArrowTypeID.Struct) {
// expose transformer API of Map<String, Chunk<Values>> -> T
// expose transformer API of Map<String, Chunk<Values>> -> T

if (typeId == ArrowType.ArrowTypeID.Union) {
final ArrowType.Union unionType = (ArrowType.Union) field.getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Context(
offsets = WritableIntChunk.makeWritableChunk(numOffsets);
offsets.add(0);
for (int ii = 0; ii < chunk.size(); ++ii) {
numInnerElements += ((Map<?, ?>)chunk.get(ii)).size();
numInnerElements += ((Map<?, ?>) chunk.get(ii)).size();
offsets.add(numInnerElements);
}

Expand All @@ -79,7 +79,7 @@ public Context(
WritableObjectChunk.makeWritableChunk(numInnerElements);
valueObjChunk.setSize(0);
for (int ii = 0; ii < chunk.size(); ++ii) {
((Map<?, ?>)chunk.get(ii)).forEach((key, value) -> {
((Map<?, ?>) chunk.get(ii)).forEach((key, value) -> {
keyObjChunk.add(key);
valueObjChunk.add(value);
});
Expand All @@ -92,8 +92,8 @@ public Context(
} else {
// note that we do not close the unboxer since we steal the inner chunk and pass to key context
// noinspection unchecked
keyChunk = (WritableChunk<Values>)
ChunkUnboxer.getUnboxer(keyWriterChunkType, keyObjChunk.capacity()).unbox(keyObjChunk);
keyChunk = (WritableChunk<Values>) ChunkUnboxer.getUnboxer(keyWriterChunkType, keyObjChunk.capacity())
.unbox(keyObjChunk);
keyObjChunk.close();
}
keyContext = keyWriter.makeContext(keyChunk, 0);
Expand All @@ -105,8 +105,8 @@ public Context(
} else {
// note that we do not close the unboxer since we steal the inner chunk and pass to value context
// noinspection unchecked
valueChunk = (WritableChunk<Values>)
ChunkUnboxer.getUnboxer(valueWriterChunkType, valueObjChunk.capacity()).unbox(valueObjChunk);
valueChunk = (WritableChunk<Values>) ChunkUnboxer
.getUnboxer(valueWriterChunkType, valueObjChunk.capacity()).unbox(valueObjChunk);
valueObjChunk.close();
}
valueContext = valueWriter.makeContext(valueChunk, 0);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.DataInput;
import java.io.IOException;
import java.util.Iterator;
import java.util.PrimitiveIterator;

/**
 * A {@link ChunkReader} that strips the synthetic single-element list wrapper produced when a
 * top-level column is transmitted "as a list" (see {@code BarrageOptions#columnsAsList()}), then
 * delegates the remaining payload to the component reader.
 *
 * <p>The wrapper contributes one field node plus a validity buffer and an offsets buffer; both
 * buffers are skipped rather than decoded, since the wrapper is known to contain exactly one
 * (non-null) list spanning all component elements.
 *
 * @param <READ_CHUNK_TYPE> the chunk type produced by the delegated component reader
 */
public class SingleElementListHeaderReader<READ_CHUNK_TYPE extends WritableChunk<Values>>
        extends BaseChunkReader<READ_CHUNK_TYPE> {
    private static final String DEBUG_NAME = "SingleElementListHeaderReader";

    // NOTE(review): `kernel` is stored but never consulted in this class; presumably retained for
    // symmetry with ListChunkReader or future use — confirm whether it can be dropped.
    private final ExpansionKernel<?> kernel;
    private final ChunkReader<READ_CHUNK_TYPE> componentReader;

    /**
     * @param kernel the expansion kernel associated with the wrapped list (currently unused here)
     * @param componentReader the reader used to decode the list's component payload
     */
    public SingleElementListHeaderReader(
            final ExpansionKernel<?> kernel,
            final ChunkReader<READ_CHUNK_TYPE> componentReader) {
        this.componentReader = componentReader;
        this.kernel = kernel;
    }

    @Override
    public READ_CHUNK_TYPE readChunk(
            @NotNull final Iterator<ChunkWriter.FieldNodeInfo> fieldNodeIter,
            @NotNull final PrimitiveIterator.OfLong bufferInfoIter,
            @NotNull final DataInput is,
            @Nullable final WritableChunk<Values> outChunk,
            final int outOffset,
            final int totalRows) throws IOException {
        // Consume the wrapper's field node and its two buffer-length entries.
        final ChunkWriter.FieldNodeInfo nodeInfo = fieldNodeIter.next();
        final long validityBufferLength = bufferInfoIter.nextLong();
        final long offsetsBufferLength = bufferInfoIter.nextLong();

        if (nodeInfo.numElements == 0) {
            // Empty wrapper: skip both buffers at once, then still delegate so the component's
            // field node / buffer entries are consumed consistently.
            is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBufferLength + offsetsBufferLength));
            return componentReader.readChunk(fieldNodeIter, bufferInfoIter, is, null, 0, 0);
        }

        // Skip the validity buffer; the single wrapping list is never null.
        // NOTE(review): DataInput#skipBytes may skip fewer bytes than requested; this assumes the
        // underlying stream honors the full skip — confirm against the stream implementation used.
        if (validityBufferLength > 0) {
            is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBufferLength));
        }

        // Skip the offsets buffer; with one list spanning all elements the offsets are implied.
        if (offsetsBufferLength > 0) {
            is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, offsetsBufferLength));
        }

        return componentReader.readChunk(fieldNodeIter, bufferInfoIter, is, null, 0, 0);
    }
}

0 comments on commit aa45bcd

Please sign in to comment.