tmp some fixes

nbauernfeind committed Nov 21, 2024
1 parent 6526a9f commit d940d97
Showing 11 changed files with 81 additions and 19 deletions.
File 1 of 11: ColumnDefinition.java
@@ -400,6 +400,15 @@ public <Other> ColumnDefinition<Other> withDataType(@NotNull final Class<Other>
: fromGenericType(name, newDataType, componentType, columnType);
}

+ public <Other> ColumnDefinition<Other> withDataType(
+ @NotNull final Class<Other> newDataType,
+ @Nullable final Class<?> newComponentType) {
+ // noinspection unchecked
+ return dataType == newDataType && componentType == newComponentType
+ ? (ColumnDefinition<Other>) this
+ : fromGenericType(name, newDataType, newComponentType, columnType);
+ }
+
public ColumnDefinition<?> withName(@NotNull final String newName) {
return newName.equals(name) ? this : new ColumnDefinition<>(newName, dataType, componentType, columnType);
}
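
A hedged usage sketch of the new two-argument overload; the Instant-to-long pairing is illustrative, and ofTime is assumed from ColumnDefinition's public factory methods:

import io.deephaven.engine.table.ColumnDefinition;
import java.time.Instant;

ColumnDefinition<Instant> def = ColumnDefinition.ofTime("Timestamp");
// Swap the data type and component type in one call; when both already
// match, the same definition instance is returned rather than a new one.
ColumnDefinition<Long> asLong = def.withDataType(long.class, null);
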
File 2 of 11: ResettableReadOnlyChunk.java
@@ -10,7 +10,7 @@
* {@link Chunk} that may have its backing storage reset to a slice of that belonging to another {@link Chunk} or a
* native array.
*/
- public interface ResettableReadOnlyChunk<ATTR_BASE extends Any> extends ResettableChunk<ATTR_BASE>, PoolableChunk {
+ public interface ResettableReadOnlyChunk<ATTR_BASE extends Any> extends ResettableChunk<ATTR_BASE>, PoolableChunk<ATTR_BASE> {

/**
* Reset the data and bounds of this chunk to a range or sub-range of the specified {@link Chunk}.
File 3 of 11: ResettableWritableChunk.java
@@ -11,7 +11,7 @@
* {@link WritableChunk} or a native array.
*/
public interface ResettableWritableChunk<ATTR_BASE extends Any>
- extends ResettableChunk<ATTR_BASE>, WritableChunk<ATTR_BASE>, PoolableChunk {
+ extends ResettableChunk<ATTR_BASE>, WritableChunk<ATTR_BASE>, PoolableChunk<ATTR_BASE> {

@Override
<ATTR extends ATTR_BASE> WritableChunk<ATTR> resetFromChunk(WritableChunk<ATTR> other, int offset, int capacity);
File 4 of 11: WritableChunk.java
@@ -14,7 +14,7 @@
*
* @param <ATTR> Descriptive attribute that applies to the elements stored within this WritableChunk
*/
- public interface WritableChunk<ATTR extends Any> extends Chunk<ATTR>, PoolableChunk {
+ public interface WritableChunk<ATTR extends Any> extends Chunk<ATTR>, PoolableChunk<ATTR> {
@Override
WritableChunk<ATTR> slice(int offset, int capacity);

File 5 of 11: PoolableChunk.java
@@ -4,11 +4,12 @@
package io.deephaven.chunk.util.pools;

import io.deephaven.chunk.Chunk;
+ import io.deephaven.chunk.attributes.Any;
import io.deephaven.util.SafeCloseable;

/**
* Marker interface for {@link Chunk} subclasses that can be kept within a {@link ChunkPool}, and whose
* {@link #close()} method will return them to the appropriate pool.
*/
- public interface PoolableChunk extends SafeCloseable {
+ public interface PoolableChunk<ATTR extends Any> extends Chunk<ATTR>, SafeCloseable {
}
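
With the new type parameter, a PoolableChunk is itself a Chunk of the same attribute. A minimal pooling sketch; the factory method and the add/setSize calls are assumed from the chunk API, and the pool-return behavior is the one this javadoc describes:

import io.deephaven.chunk.WritableIntChunk;
import io.deephaven.chunk.attributes.Values;

try (final WritableIntChunk<Values> chunk = WritableIntChunk.makeWritableChunk(1024)) {
    chunk.setSize(0);
    chunk.add(42); // use the chunk while it is borrowed from its pool
} // close() returns the chunk to the appropriate ChunkPool instead of discarding it
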
File 6 of 11
@@ -1411,7 +1411,12 @@ private static boolean snapshotColumnsParallel(
final ExecutionContext executionContext,
@NotNull final BarrageMessage snapshot) {
final JobScheduler jobScheduler = new OperationInitializerJobScheduler();
- final CompletableFuture<Void> waitForParallelSnapshot = new CompletableFuture<>();
+ final CompletableFuture<Void> waitForParallelSnapshot = new CompletableFuture<>() {
+ @Override
+ public boolean completeExceptionally(Throwable ex) {
+ // This override only delegates to super; it appears to exist solely as a
+ // convenient breakpoint hook when debugging snapshot failures.
+ return super.completeExceptionally(ex);
+ }
+ };
jobScheduler.iterateParallel(
executionContext,
logOutput -> logOutput.append("snapshotColumnsParallel"),
File 7 of 11: ReinterpretUtils.java
@@ -4,6 +4,7 @@
package io.deephaven.engine.table.impl.sources;

import io.deephaven.chunk.ChunkType;
+ import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.WritableColumnSource;
import org.jetbrains.annotations.NotNull;
@@ -212,6 +213,27 @@ public static ColumnSource<?>[] maybeConvertToPrimitive(@NotNull final ColumnSou
return result;
}

+ /**
+ * If {@code columnDefinition.getDataType()} or {@code columnDefinition.getComponentType()} is something that we
+ * prefer to handle as a primitive, do the appropriate conversion.
+ *
+ * @param columnDefinition The column definition to convert
+ * @return if possible, {@code columnDefinition} converted to a primitive, otherwise {@code columnDefinition}
+ */
+ @NotNull
+ public static ColumnDefinition<?> maybeConvertToPrimitive(@NotNull final ColumnDefinition<?> columnDefinition) {
+ final Class<?> dataType = ReinterpretUtils.maybeConvertToPrimitiveDataType(columnDefinition.getDataType());
+ Class<?> componentType = columnDefinition.getComponentType();
+ if (componentType != null) {
+ componentType = ReinterpretUtils.maybeConvertToPrimitiveDataType(componentType);
+ }
+ if (columnDefinition.getDataType() == dataType
+ && columnDefinition.getComponentType() == componentType) {
+ return columnDefinition;
+ }
+ return columnDefinition.withDataType(dataType, componentType);
+ }
+
/**
* If {@code source} is something that we prefer to handle as a primitive, do the appropriate conversion.
*
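
A hedged sketch of the new definition-level conversion, assuming the Boolean-to-byte preference that maybeConvertToPrimitiveDataType applies for column sources:

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;

ColumnDefinition<Boolean> flag = ColumnDefinition.ofBoolean("Flag");
ColumnDefinition<?> converted = ReinterpretUtils.maybeConvertToPrimitive(flag);
// converted.getDataType() == byte.class; a definition with no primitive
// preference (e.g. a String column) is returned unchanged.
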
File 8 of 11
@@ -149,7 +149,8 @@ public <T extends WritableChunk<Values>> ChunkReader<T> newReader(
// TODO (deephaven/deephaven-core#6038): these arrow types require 64-bit offsets
if (typeId == ArrowType.ArrowTypeID.LargeUtf8
|| typeId == ArrowType.ArrowTypeID.LargeBinary
- || typeId == ArrowType.ArrowTypeID.LargeList) {
+ || typeId == ArrowType.ArrowTypeID.LargeList
+ || typeId == ArrowType.ArrowTypeID.LargeListView) {
throw new UnsupportedOperationException(String.format(
"No support for 64-bit offsets to map arrow type %s to %s.",
field.getType().toString(),
@@ -184,13 +185,15 @@
}

if (typeId == ArrowType.ArrowTypeID.List
+ || typeId == ArrowType.ArrowTypeID.ListView
|| typeId == ArrowType.ArrowTypeID.FixedSizeList) {

- // TODO (deephaven/deephaven-core#5947): Add SPARSE branch for ListView
int fixedSizeLength = 0;
final ListChunkReader.Mode mode;
if (typeId == ArrowType.ArrowTypeID.List) {
mode = ListChunkReader.Mode.DENSE;
+ } else if (typeId == ArrowType.ArrowTypeID.ListView) {
+ mode = ListChunkReader.Mode.SPARSE;
} else {
mode = ListChunkReader.Mode.FIXED;
fixedSizeLength = ((ArrowType.FixedSizeList) field.getType()).getListSize();
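
For reference, the mode distinction follows Arrow's layout spec: List stores its children densely behind a single monotonic offsets buffer, ListView carries independent offsets and sizes buffers (hence SPARSE), and FixedSizeList needs neither. A sketch for the logical value [[1, 2], [], [3]]:

// List (DENSE):      offsets = [0, 2, 2, 3], values = [1, 2, 3]
// ListView (SPARSE): offsets = [0, 2, 2], sizes = [2, 0, 1], values = [1, 2, 3]
// FixedSizeList (FIXED): no offsets buffer; every list holds exactly getListSize() children
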
File 9 of 11: DefaultChunkWriterFactory.java
@@ -36,6 +36,7 @@
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.jetbrains.annotations.NotNull;
+ import org.jpy.PyObject;

import java.io.DataOutput;
import java.io.IOException;
@@ -84,6 +85,7 @@ protected DefaultChunkWriterFactory() {
DefaultChunkWriterFactory::timestampFromZonedDateTime);
register(ArrowType.ArrowTypeID.Utf8, String.class, DefaultChunkWriterFactory::utf8FromString);
register(ArrowType.ArrowTypeID.Utf8, Object.class, DefaultChunkWriterFactory::utf8FromObject);
+ register(ArrowType.ArrowTypeID.Utf8, PyObject.class, DefaultChunkWriterFactory::utf8FromPyObject);
register(ArrowType.ArrowTypeID.Utf8, ArrayPreview.class, DefaultChunkWriterFactory::utf8FromObject);
register(ArrowType.ArrowTypeID.Utf8, DisplayWrapper.class, DefaultChunkWriterFactory::utf8FromObject);
register(ArrowType.ArrowTypeID.Duration, long.class, DefaultChunkWriterFactory::durationFromLong);
@@ -392,6 +394,12 @@ private static ChunkWriter<ObjectChunk<Object, Values>> utf8FromObject(
return new VarBinaryChunkWriter<>((out, item) -> out.write(item.toString().getBytes(StandardCharsets.UTF_8)));
}

+ private static ChunkWriter<ObjectChunk<PyObject, Values>> utf8FromPyObject(
+ final ArrowType arrowType,
+ final BarrageTypeInfo typeInfo) {
+ return new VarBinaryChunkWriter<>((out, item) -> out.write(item.toString().getBytes(StandardCharsets.UTF_8)));
+ }
+
private static ChunkWriter<LongChunk<Values>> durationFromLong(
final ArrowType arrowType,
final BarrageTypeInfo typeInfo) {
File 10 of 11
@@ -543,20 +543,31 @@ public ChunkReader<? extends Values>[] computeChunkReaders(
@NotNull final ChunkReader.Factory chunkReaderFactory,
@NotNull final org.apache.arrow.flatbuf.Schema schema,
@NotNull final BarrageOptions barrageOptions) {
+ return computeChunkReaders(chunkReaderFactory, schema, barrageOptions, false);
+ }
+
+ public ChunkReader<? extends Values>[] computePrimitiveChunkReaders(
+ @NotNull final ChunkReader.Factory chunkReaderFactory,
+ @NotNull final org.apache.arrow.flatbuf.Schema schema,
+ @NotNull final BarrageOptions barrageOptions) {
+ return computeChunkReaders(chunkReaderFactory, schema, barrageOptions, true);
+ }
+
+ private ChunkReader<? extends Values>[] computeChunkReaders(
+ @NotNull final ChunkReader.Factory chunkReaderFactory,
+ @NotNull final org.apache.arrow.flatbuf.Schema schema,
+ @NotNull final BarrageOptions barrageOptions,
+ final boolean convertToPrimitive) {
// noinspection unchecked
final ChunkReader<? extends Values>[] readers =
(ChunkReader<? extends Values>[]) new ChunkReader[tableDef.numColumns()];

final List<ColumnDefinition<?>> columns = tableDef.getColumns();
for (int ii = 0; ii < tableDef.numColumns(); ++ii) {
- // final ColumnDefinition<?> columnDefinition = columns.get(ii);
- // final BarrageTypeInfo typeInfo = typeInfo(
- // ReinterpretUtils.maybeConvertToWritablePrimitiveChunkType(columnDefinition.getDataType()),
- // columnDefinition.getDataType(),
- // columnDefinition.getComponentType(),
- // schema.fields(ii));
- // readers[ii] = DefaultChunkReadingFactory.INSTANCE.getReader(barrageOptions, factor, typeInfo);
- throw new UnsupportedOperationException("TODO NOCOMMIT NATE");
+ // Only reinterpret to the preferred primitive types when the caller asks for it.
+ final ColumnDefinition<?> columnDefinition = convertToPrimitive
+ ? ReinterpretUtils.maybeConvertToPrimitive(columns.get(ii))
+ : columns.get(ii);
+ final BarrageTypeInfo typeInfo = BarrageTypeInfo.make(
+ columnDefinition.getDataType(), columnDefinition.getComponentType(), schema.fields(ii));
+ readers[ii] = chunkReaderFactory.newReader(typeInfo, barrageOptions);
}

return readers;
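
A hedged usage sketch of the public split; schemaPlus stands for the enclosing converted-schema instance, as at the call sites in the next file:

// Readers that parse into the table's declared column types:
ChunkReader<? extends Values>[] declared =
        schemaPlus.computeChunkReaders(chunkReaderFactory, schema, barrageOptions);
// Readers that parse into the preferred primitive reinterpretations
// (e.g. an Instant column is read as long):
ChunkReader<? extends Values>[] primitive =
        schemaPlus.computePrimitiveChunkReaders(chunkReaderFactory, schema, barrageOptions);
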
File 11 of 11
@@ -21,6 +21,7 @@
import io.deephaven.engine.table.impl.chunkboxer.ChunkBoxer;
import io.deephaven.engine.table.impl.locations.*;
import io.deephaven.engine.table.impl.locations.impl.*;
+ import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.impl.sources.regioned.*;
import io.deephaven.extensions.barrage.BarrageOptions;
import io.deephaven.extensions.barrage.chunk.ChunkReader;
@@ -449,15 +450,17 @@ public List<WritableChunk<Values>> getColumnValues(
.reduce((a, b) -> a + ", " + b).orElse(""))));
return;
}
- if (!columnDefinition.isCompatible(schemaPlus.tableDef.getColumns().get(0))) {
+ final ColumnDefinition<?> dataColumn = ReinterpretUtils.maybeConvertToPrimitive(
+ schemaPlus.tableDef.getColumns().get(0));
+ if (!columnDefinition.isCompatible(dataColumn)) {
asyncState.setError(new IllegalArgumentException(String.format(
"Received incompatible column definition. Expected %s, but received %s.",
- columnDefinition, schemaPlus.tableDef.getColumns().get(0))));
+ columnDefinition, dataColumn)));
return;
}

final ArrayList<WritableChunk<Values>> resultChunks = new ArrayList<>(messages.length - 1);
- final ChunkReader<? extends Values> reader = schemaPlus.computeChunkReaders(
+ final ChunkReader<? extends Values> reader = schemaPlus.computePrimitiveChunkReaders(
chunkReaderFactory, schema, streamReaderOptions)[0];
int mi = 1;
try {
@@ -898,7 +901,7 @@ private class TableServiceGetRangeAdapter implements AppendOnlyRegionAccessor<Va
private final @NotNull ColumnDefinition<?> columnDefinition;

public TableServiceGetRangeAdapter(@NotNull ColumnDefinition<?> columnDefinition) {
- this.columnDefinition = columnDefinition;
+ this.columnDefinition = ReinterpretUtils.maybeConvertToPrimitive(columnDefinition);
}

@Override
