diff --git a/engine/api/src/main/java/io/deephaven/engine/table/ColumnDefinition.java b/engine/api/src/main/java/io/deephaven/engine/table/ColumnDefinition.java index f5dbdd8cf4b..47b79d9e683 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/ColumnDefinition.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/ColumnDefinition.java @@ -400,6 +400,15 @@ public ColumnDefinition withDataType(@NotNull final Class : fromGenericType(name, newDataType, componentType, columnType); } + public ColumnDefinition withDataType( + @NotNull final Class newDataType, + @Nullable final Class newComponentType) { + // noinspection unchecked + return dataType == newDataType && componentType == newComponentType + ? (ColumnDefinition) this + : fromGenericType(name, newDataType, newComponentType, columnType); + } + public ColumnDefinition withName(@NotNull final String newName) { return newName.equals(name) ? this : new ColumnDefinition<>(newName, dataType, componentType, columnType); } diff --git a/engine/chunk/src/main/java/io/deephaven/chunk/ResettableReadOnlyChunk.java b/engine/chunk/src/main/java/io/deephaven/chunk/ResettableReadOnlyChunk.java index 71bb522b9ad..eab6c4216ed 100644 --- a/engine/chunk/src/main/java/io/deephaven/chunk/ResettableReadOnlyChunk.java +++ b/engine/chunk/src/main/java/io/deephaven/chunk/ResettableReadOnlyChunk.java @@ -10,7 +10,7 @@ * {@link Chunk} that may have its backing storage reset to a slice of that belonging to another {@link Chunk} or a * native array. */ -public interface ResettableReadOnlyChunk extends ResettableChunk, PoolableChunk { +public interface ResettableReadOnlyChunk extends ResettableChunk, PoolableChunk { /** * Reset the data and bounds of this chunk to a range or sub-range of the specified {@link Chunk}. 
diff --git a/engine/chunk/src/main/java/io/deephaven/chunk/ResettableWritableChunk.java b/engine/chunk/src/main/java/io/deephaven/chunk/ResettableWritableChunk.java index 0c24d2cafe4..4aa25479103 100644 --- a/engine/chunk/src/main/java/io/deephaven/chunk/ResettableWritableChunk.java +++ b/engine/chunk/src/main/java/io/deephaven/chunk/ResettableWritableChunk.java @@ -11,7 +11,7 @@ * {@link WritableChunk} or a native array. */ public interface ResettableWritableChunk - extends ResettableChunk, WritableChunk, PoolableChunk { + extends ResettableChunk, WritableChunk, PoolableChunk { @Override WritableChunk resetFromChunk(WritableChunk other, int offset, int capacity); diff --git a/engine/chunk/src/main/java/io/deephaven/chunk/WritableChunk.java b/engine/chunk/src/main/java/io/deephaven/chunk/WritableChunk.java index 43da8c2c351..dc4a2f7a344 100644 --- a/engine/chunk/src/main/java/io/deephaven/chunk/WritableChunk.java +++ b/engine/chunk/src/main/java/io/deephaven/chunk/WritableChunk.java @@ -14,7 +14,7 @@ * * @param Descriptive attribute that applies to the elements stored within this WritableChunk */ -public interface WritableChunk extends Chunk, PoolableChunk { +public interface WritableChunk extends Chunk, PoolableChunk { @Override WritableChunk slice(int offset, int capacity); diff --git a/engine/chunk/src/main/java/io/deephaven/chunk/util/pools/PoolableChunk.java b/engine/chunk/src/main/java/io/deephaven/chunk/util/pools/PoolableChunk.java index 9d4545df4de..d6c8df997ad 100644 --- a/engine/chunk/src/main/java/io/deephaven/chunk/util/pools/PoolableChunk.java +++ b/engine/chunk/src/main/java/io/deephaven/chunk/util/pools/PoolableChunk.java @@ -4,11 +4,12 @@ package io.deephaven.chunk.util.pools; import io.deephaven.chunk.Chunk; +import io.deephaven.chunk.attributes.Any; import io.deephaven.util.SafeCloseable; /** * Marker interface for {@link Chunk} subclasses that can be kept with in a {@link ChunkPool}, and whose * {@link #close()} method will return them to the 
appropriate pool. */ -public interface PoolableChunk extends SafeCloseable { +public interface PoolableChunk extends Chunk, SafeCloseable { } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java index 88d05fdbf92..94544aeec38 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java @@ -1411,7 +1411,12 @@ private static boolean snapshotColumnsParallel( final ExecutionContext executionContext, @NotNull final BarrageMessage snapshot) { final JobScheduler jobScheduler = new OperationInitializerJobScheduler(); - final CompletableFuture waitForParallelSnapshot = new CompletableFuture<>(); + final CompletableFuture waitForParallelSnapshot = new CompletableFuture<>() { + @Override + public boolean completeExceptionally(Throwable ex) { + return super.completeExceptionally(ex); + } + }; jobScheduler.iterateParallel( executionContext, logOutput -> logOutput.append("snapshotColumnsParallel"), diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ReinterpretUtils.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ReinterpretUtils.java index 012f783c53c..058d48a267f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ReinterpretUtils.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ReinterpretUtils.java @@ -4,6 +4,7 @@ package io.deephaven.engine.table.impl.sources; import io.deephaven.chunk.ChunkType; +import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.WritableColumnSource; import org.jetbrains.annotations.NotNull; @@ -212,6 +213,27 @@ public static ColumnSource[] maybeConvertToPrimitive(@NotNull final ColumnSou return result; } 
+ /** + * If {@code columnDefinition.getDataType()} or {@code columnDefinition.getComponentType()} are something that we + * prefer to handle as a primitive, do the appropriate conversion. + * + * @param columnDefinition The column definition to convert + * @return if possible, {@code columnDefinition} converted to a primitive, otherwise {@code columnDefinition} + */ + @NotNull + public static ColumnDefinition maybeConvertToPrimitive(@NotNull final ColumnDefinition columnDefinition) { + final Class dataType = ReinterpretUtils.maybeConvertToPrimitiveDataType(columnDefinition.getDataType()); + Class componentType = columnDefinition.getComponentType(); + if (componentType != null) { + componentType = ReinterpretUtils.maybeConvertToPrimitiveDataType(componentType); + } + if (columnDefinition.getDataType() == dataType + && columnDefinition.getComponentType() == componentType) { + return columnDefinition; + } + return columnDefinition.withDataType(dataType, componentType); + } + /** * If {@code source} is something that we prefer to handle as a primitive, do the appropriate conversion. 
* diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java index 6188cf18440..f07d7ac3f3e 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java @@ -149,7 +149,8 @@ public > ChunkReader newReader( // TODO (deephaven/deephaven-core#6038): these arrow types require 64-bit offsets if (typeId == ArrowType.ArrowTypeID.LargeUtf8 || typeId == ArrowType.ArrowTypeID.LargeBinary - || typeId == ArrowType.ArrowTypeID.LargeList) { + || typeId == ArrowType.ArrowTypeID.LargeList + || typeId == ArrowType.ArrowTypeID.LargeListView) { throw new UnsupportedOperationException(String.format( "No support for 64-bit offsets to map arrow type %s to %s.", field.getType().toString(), @@ -184,13 +185,15 @@ public > ChunkReader newReader( } if (typeId == ArrowType.ArrowTypeID.List + || typeId == ArrowType.ArrowTypeID.ListView || typeId == ArrowType.ArrowTypeID.FixedSizeList) { - // TODO (deephaven/deephaven-core#5947): Add SPARSE branch for ListView int fixedSizeLength = 0; final ListChunkReader.Mode mode; if (typeId == ArrowType.ArrowTypeID.List) { mode = ListChunkReader.Mode.DENSE; + } else if (typeId == ArrowType.ArrowTypeID.ListView) { + mode = ListChunkReader.Mode.SPARSE; } else { mode = ListChunkReader.Mode.FIXED; fixedSizeLength = ((ArrowType.FixedSizeList) field.getType()).getListSize(); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java index 2bd62751123..446777145f8 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java +++ 
b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java @@ -36,6 +36,7 @@ import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; import org.jetbrains.annotations.NotNull; +import org.jpy.PyObject; import java.io.DataOutput; import java.io.IOException; @@ -84,6 +85,7 @@ protected DefaultChunkWriterFactory() { DefaultChunkWriterFactory::timestampFromZonedDateTime); register(ArrowType.ArrowTypeID.Utf8, String.class, DefaultChunkWriterFactory::utf8FromString); register(ArrowType.ArrowTypeID.Utf8, Object.class, DefaultChunkWriterFactory::utf8FromObject); + register(ArrowType.ArrowTypeID.Utf8, PyObject.class, DefaultChunkWriterFactory::utf8FromPyObject); register(ArrowType.ArrowTypeID.Utf8, ArrayPreview.class, DefaultChunkWriterFactory::utf8FromObject); register(ArrowType.ArrowTypeID.Utf8, DisplayWrapper.class, DefaultChunkWriterFactory::utf8FromObject); register(ArrowType.ArrowTypeID.Duration, long.class, DefaultChunkWriterFactory::durationFromLong); @@ -392,6 +394,12 @@ private static ChunkWriter> utf8FromObject( return new VarBinaryChunkWriter<>((out, item) -> out.write(item.toString().getBytes(StandardCharsets.UTF_8))); } + private static ChunkWriter> utf8FromPyObject( + final ArrowType arrowType, + final BarrageTypeInfo typeInfo) { + return new VarBinaryChunkWriter<>((out, item) -> out.write(item.toString().getBytes(StandardCharsets.UTF_8))); + } + private static ChunkWriter> durationFromLong( final ArrowType arrowType, final BarrageTypeInfo typeInfo) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java index 32c6578d578..894fae3692d 100755 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java @@ -543,20 
+543,31 @@ public ChunkReader[] computeChunkReaders( @NotNull final ChunkReader.Factory chunkReaderFactory, @NotNull final org.apache.arrow.flatbuf.Schema schema, @NotNull final BarrageOptions barrageOptions) { + return computeChunkReaders(chunkReaderFactory, schema, barrageOptions, false); + } + + public ChunkReader[] computePrimitiveChunkReaders( + @NotNull final ChunkReader.Factory chunkReaderFactory, + @NotNull final org.apache.arrow.flatbuf.Schema schema, + @NotNull final BarrageOptions barrageOptions) { + return computeChunkReaders(chunkReaderFactory, schema, barrageOptions, true); + } + + private ChunkReader[] computeChunkReaders( + @NotNull final ChunkReader.Factory chunkReaderFactory, + @NotNull final org.apache.arrow.flatbuf.Schema schema, + @NotNull final BarrageOptions barrageOptions, + final boolean convertToPrimitive) { // noinspection unchecked final ChunkReader[] readers = (ChunkReader[]) new ChunkReader[tableDef.numColumns()]; final List> columns = tableDef.getColumns(); for (int ii = 0; ii < tableDef.numColumns(); ++ii) { - // final ColumnDefinition columnDefinition = columns.get(ii); - // final BarrageTypeInfo typeInfo = typeInfo( - // ReinterpretUtils.maybeConvertToWritablePrimitiveChunkType(columnDefinition.getDataType()), - // columnDefinition.getDataType(), - // columnDefinition.getComponentType(), - // schema.fields(ii)); - // readers[ii] = DefaultChunkReadingFactory.INSTANCE.getReader(barrageOptions, factor, typeInfo); - throw new UnsupportedOperationException("TODO NOCOMMIT NATE"); + final ColumnDefinition columnDefinition = ReinterpretUtils.maybeConvertToPrimitive(columns.get(ii)); + final BarrageTypeInfo typeInfo = BarrageTypeInfo.make( + columnDefinition.getDataType(), columnDefinition.getComponentType(), schema.fields(ii)); + readers[ii] = chunkReaderFactory.newReader(typeInfo, barrageOptions); } return readers; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java 
b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java index 395ea92972a..c4904ee7951 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java @@ -21,6 +21,7 @@ import io.deephaven.engine.table.impl.chunkboxer.ChunkBoxer; import io.deephaven.engine.table.impl.locations.*; import io.deephaven.engine.table.impl.locations.impl.*; +import io.deephaven.engine.table.impl.sources.ReinterpretUtils; import io.deephaven.engine.table.impl.sources.regioned.*; import io.deephaven.extensions.barrage.BarrageOptions; import io.deephaven.extensions.barrage.chunk.ChunkReader; @@ -449,15 +450,17 @@ public List> getColumnValues( .reduce((a, b) -> a + ", " + b).orElse("")))); return; } - if (!columnDefinition.isCompatible(schemaPlus.tableDef.getColumns().get(0))) { + final ColumnDefinition dataColumn = ReinterpretUtils.maybeConvertToPrimitive( + schemaPlus.tableDef.getColumns().get(0)); + if (!columnDefinition.isCompatible(dataColumn)) { asyncState.setError(new IllegalArgumentException(String.format( "Received incompatible column definition. Expected %s, but received %s.", - columnDefinition, schemaPlus.tableDef.getColumns().get(0)))); + columnDefinition, dataColumn))); return; } final ArrayList> resultChunks = new ArrayList<>(messages.length - 1); - final ChunkReader reader = schemaPlus.computeChunkReaders( + final ChunkReader reader = schemaPlus.computePrimitiveChunkReaders( chunkReaderFactory, schema, streamReaderOptions)[0]; int mi = 1; try { @@ -898,7 +901,7 @@ private class TableServiceGetRangeAdapter implements AppendOnlyRegionAccessor columnDefinition; public TableServiceGetRangeAdapter(@NotNull ColumnDefinition columnDefinition) { - this.columnDefinition = columnDefinition; + this.columnDefinition = ReinterpretUtils.maybeConvertToPrimitive(columnDefinition); } @Override