Commit: Charles' Feedback

nbauernfeind committed Dec 16, 2024
1 parent 2ab1088 commit ae71435
Showing 88 changed files with 3,908 additions and 1,189 deletions.
6 changes: 6 additions & 0 deletions R/rdeephaven/inst/tests/testthat/test_table_handle_wrapper.R
@@ -56,6 +56,8 @@ test_that("is_static returns the correct value", {
})

test_that("nrow returns the correct number of rows", {
skip()

data <- setup()

expect_equal(nrow(data$th1), nrow(data$df1))
@@ -67,6 +69,8 @@ test_that("nrow returns the correct number of rows", {
})

test_that("ncol returns the correct number of columns", {
skip()

data <- setup()

expect_equal(ncol(data$th1), ncol(data$df1))
@@ -78,6 +82,8 @@ test_that("ncol returns the correct number of columns", {
})

test_that("dim returns the correct dimension", {
skip()

data <- setup()

expect_equal(dim(data$th1), dim(data$df1))
@@ -218,7 +218,7 @@ public interface Table extends
*/
String BARRAGE_PERFORMANCE_KEY_ATTRIBUTE = "BarragePerformanceTableKey";
/**
* Set this to control the schema used for barrage serialization.
* Set an Apache Arrow POJO Schema to this attribute to control the column encoding used for barrage serialization.
*/
String BARRAGE_SCHEMA_ATTRIBUTE = "BarrageSchema";

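A minimal sketch of how a caller might attach an Arrow POJO Schema through this attribute, assuming Deephaven's AttributeMap#withAttributes and a hand-built schema; the source table, column name, and encoding choice are illustrative and not part of this change:

import java.util.List;
import java.util.Map;
import io.deephaven.engine.table.Table;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

public final class BarrageSchemaExample {
    /** Attach an explicit wire schema to a table before publishing it over barrage. */
    public static Table withExplicitWireSchema(final Table sourceTable) {
        // Hypothetical encoding choice: single-precision floating point for "Price".
        final Schema wireSchema = new Schema(List.of(
                Field.nullable("Price", new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE))));
        return sourceTable.withAttributes(Map.of(Table.BARRAGE_SCHEMA_ATTRIBUTE, wireSchema));
    }
}
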
@@ -37,7 +37,7 @@ public WritableChunk<T> get() {

/**
* Ensure the underlying chunk has a capacity of at least {@code capacity}.
*
* <p>
* The data and size of the returned chunk are undefined.
*
* @param capacity the minimum capacity for the chunk.
@@ -56,9 +56,9 @@ public WritableChunk<T> ensureCapacity(int capacity) {

/**
* Ensure the underlying chunk has a capacity of at least {@code capacity}.
*
* <p>
* If the chunk has existing data, then it is copied to the new chunk.
*
* <p>
* If the underlying chunk already exists, then the size of the chunk is the original size. If the chunk did not
* exist, then the size of the returned chunk is zero.
*
@@ -361,7 +361,13 @@ public enum CopyAttributeOperation {
CopyAttributeOperation.Preview));

tempMap.put(BARRAGE_SCHEMA_ATTRIBUTE, EnumSet.of(
CopyAttributeOperation.Filter));
CopyAttributeOperation.Filter,
CopyAttributeOperation.FirstBy,
CopyAttributeOperation.Flatten,
CopyAttributeOperation.LastBy,
CopyAttributeOperation.PartitionBy,
CopyAttributeOperation.Reverse,
CopyAttributeOperation.Sort));

attributeToCopySet = Collections.unmodifiableMap(tempMap);
}
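
If the expanded copy set works as intended, a wire schema attribute set on a table should also appear on tables derived by the operations added above; a hedged illustration, with the table variable and column name hypothetical:

// Assumed behavior sketch, not from this commit: the attribute copies through e.g. a sort.
final Table sorted = tableWithSchema.sort("Price");
final Object copiedSchema = sorted.getAttribute(Table.BARRAGE_SCHEMA_ATTRIBUTE);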
@@ -1411,12 +1411,7 @@ private static boolean snapshotColumnsParallel(
final ExecutionContext executionContext,
@NotNull final BarrageMessage snapshot) {
final JobScheduler jobScheduler = new OperationInitializerJobScheduler();
final CompletableFuture<Void> waitForParallelSnapshot = new CompletableFuture<>() {
@Override
public boolean completeExceptionally(Throwable ex) {
return super.completeExceptionally(ex);
}
};
final CompletableFuture<Void> waitForParallelSnapshot = new CompletableFuture<>();
jobScheduler.iterateParallel(
executionContext,
logOutput -> logOutput.append("snapshotColumnsParallel"),
@@ -100,8 +100,10 @@ public static Builder newBuilder(final String name) {

public static final String DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP =
"PeriodicUpdateGraph.targetCycleDurationMillis";
public static final int DEFAULT_TARGET_CYCLE_DURATION_MILLIS =
Configuration.getInstance().getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000);

public static int getDefaultTargetCycleDurationMillis() {
return Configuration.getInstance().getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000);
}

private final long defaultTargetCycleDurationMillis;
private volatile long targetCycleDurationMillis;
@@ -255,7 +257,7 @@ public boolean isCycleOnBudget(long cycleTimeNanos) {
* Resets the run cycle time to the default target configured via the {@link Builder} setting.
*
* @implNote If the {@link Builder#targetCycleDurationMillis(long)} property is not set, this value defaults to
* {@link Builder#DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP} which defaults to 1000ms.
* {@link #DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP} which defaults to 1000ms.
*/
@SuppressWarnings("unused")
public void resetTargetCycleDuration() {
@@ -1169,7 +1171,7 @@ public static PeriodicUpdateGraph getInstance(final String name) {
public static final class Builder {
private final boolean allowUnitTestMode =
Configuration.getInstance().getBooleanWithDefault(ALLOW_UNIT_TEST_MODE_PROP, false);
private long targetCycleDurationMillis = DEFAULT_TARGET_CYCLE_DURATION_MILLIS;
private long targetCycleDurationMillis = getDefaultTargetCycleDurationMillis();
private long minimumCycleDurationToLogNanos = DEFAULT_MINIMUM_CYCLE_DURATION_TO_LOG_NANOSECONDS;

private String name;
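A hedged sketch of what the new accessor enables: the default target cycle duration is now read from configuration when a Builder is created, rather than captured once in a static final, so a property override takes effect for graphs built afterwards. The calls below mirror newBuilder and Builder#targetCycleDurationMillis from this file; the graph name and the 250 ms value are illustrative, and this assumes the Builder exposes build():

// Assumed usage, not from this commit: pick up an overridden default
// (-DPeriodicUpdateGraph.targetCycleDurationMillis=250) at build time,
// or set the cycle target explicitly for one graph.
final PeriodicUpdateGraph graph = PeriodicUpdateGraph.newBuilder("example-graph")
        .targetCycleDurationMillis(250)
        .build();
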
@@ -901,7 +901,7 @@ private void processBatches(Consumer<DefensiveDrainable> visitor, final RecordBa
}
}

private static int findWriterForOffset(final ChunkWriter.Context<?>[] chunks, final long offset) {
private static int findWriterForOffset(final ChunkWriter.Context[] chunks, final long offset) {
// fast path for smaller updates
if (chunks.length <= 1) {
return 0;
@@ -949,7 +949,7 @@ private int appendAddColumns(final RecordBatchMessageView view, final long start
endPos = Long.MAX_VALUE;
}
if (addColumnData[0].chunks().length != 0) {
final ChunkWriter.Context<?> writer = addColumnData[0].chunks()[chunkIdx];
final ChunkWriter.Context writer = addColumnData[0].chunks()[chunkIdx];
endPos = Math.min(endPos, writer.getLastRowOffset());
shift = -writer.getRowOffset();
}
@@ -981,9 +981,9 @@ private int appendAddColumns(final RecordBatchMessageView view, final long start
// Add the drainable last as it is allowed to immediately close a row set the visitors need
addStream.accept(drainableColumn);
} else {
final ChunkWriter.Context<Chunk<Values>> chunk = chunkListWriter.chunks()[chunkIdx];
final ChunkWriter.Context context = chunkListWriter.chunks()[chunkIdx];
final ChunkWriter.DrainableColumn drainableColumn = chunkListWriter.writer().getInputStream(
chunk,
context,
shift == 0 ? myAddedOffsets : adjustedOffsets,
view.options());
drainableColumn.visitFieldNodes(fieldNodeListener);
@@ -1008,18 +1008,18 @@ private int appendModColumns(final RecordBatchMessageView view, final long start
// adjust the batch size if we would cross a chunk boundary
for (int ii = 0; ii < modColumnData.length; ++ii) {
final ModColumnWriter mcd = modColumnData[ii];
final ChunkWriter.Context<?>[] chunks = mcd.chunkListWriter.chunks();
if (chunks.length == 0) {
final ChunkWriter.Context[] contexts = mcd.chunkListWriter.chunks();
if (contexts.length == 0) {
continue;
}

final RowSet modOffsets = view.modRowOffsets(ii);
// if all mods are being sent, then offsets yield an identity mapping
final long startPos = modOffsets != null ? modOffsets.get(startRange) : startRange;
if (startPos != RowSet.NULL_ROW_KEY) {
final int chunkIdx = findWriterForOffset(chunks, startPos);
if (chunkIdx < chunks.length - 1) {
maxLength = Math.min(maxLength, chunks[chunkIdx].getLastRowOffset() + 1 - startPos);
final int chunkIdx = findWriterForOffset(contexts, startPos);
if (chunkIdx < contexts.length - 1) {
maxLength = Math.min(maxLength, contexts[chunkIdx].getLastRowOffset() + 1 - startPos);
}
columnChunkIdx[ii] = chunkIdx;
}
@@ -1029,7 +1029,7 @@ private int appendModColumns(final RecordBatchMessageView view, final long start
long numRows = 0;
for (int ii = 0; ii < modColumnData.length; ++ii) {
final ModColumnWriter mcd = modColumnData[ii];
final ChunkWriter.Context<Chunk<Values>> chunk = mcd.chunkListWriter.chunks().length == 0
final ChunkWriter.Context context = mcd.chunkListWriter.chunks().length == 0
? null
: mcd.chunkListWriter.chunks()[columnChunkIdx[ii]];

@@ -1046,8 +1046,8 @@ private int appendModColumns(final RecordBatchMessageView view, final long start
// if all mods are being sent, then offsets yield an identity mapping
startPos = startRange;
endPos = startRange + maxLength - 1;
if (chunk != null) {
endPos = Math.min(endPos, chunk.getLastRowOffset());
if (context != null) {
endPos = Math.min(endPos, context.getLastRowOffset());
}
}

@@ -1065,7 +1065,7 @@ private int appendModColumns(final RecordBatchMessageView view, final long start
numRows = Math.max(numRows, myModOffsets.size());

try {
final int numElements = chunk == null ? 0 : myModOffsets.intSize("BarrageStreamWriterImpl");
final int numElements = context == null ? 0 : myModOffsets.intSize("BarrageStreamWriterImpl");
if (view.options().columnsAsList()) {
// if we are sending columns as a list, we need to add the list buffers before each column
final SingleElementListHeaderWriter listHeader =
@@ -1084,11 +1084,11 @@ private int appendModColumns(final RecordBatchMessageView view, final long start
// Add the drainable last as it is allowed to immediately close a row set the visitors need
addStream.accept(drainableColumn);
} else {
final long shift = -chunk.getRowOffset();
final long shift = -context.getRowOffset();
// normalize to the chunk offsets
try (final WritableRowSet adjustedOffsets = shift == 0 ? null : myModOffsets.shift(shift)) {
final ChunkWriter.DrainableColumn drainableColumn = mcd.chunkListWriter.writer().getInputStream(
chunk, shift == 0 ? myModOffsets : adjustedOffsets, view.options());
context, shift == 0 ? myModOffsets : adjustedOffsets, view.options());
drainableColumn.visitFieldNodes(fieldNodeListener);
drainableColumn.visitBuffers(bufferListener);
// Add the drainable last as it is allowed to immediately close a row set the visitors need
@@ -12,31 +12,30 @@
import java.io.IOException;
import java.util.List;

public class ChunkListWriter<SourceChunkType extends Chunk<Values>> implements SafeCloseable {
private final ChunkWriter<SourceChunkType> writer;
private final ChunkWriter.Context<SourceChunkType>[] contexts;
public class ChunkListWriter<SOURCE_CHUNK_TYPE extends Chunk<Values>> implements SafeCloseable {
private final ChunkWriter<SOURCE_CHUNK_TYPE> writer;
private final ChunkWriter.Context[] contexts;

public ChunkListWriter(
final ChunkWriter<SourceChunkType> writer,
final List<SourceChunkType> chunks) {
final ChunkWriter<SOURCE_CHUNK_TYPE> writer,
final List<SOURCE_CHUNK_TYPE> chunks) {
this.writer = writer;

// noinspection unchecked
this.contexts = (ChunkWriter.Context<SourceChunkType>[]) new ChunkWriter.Context[chunks.size()];
this.contexts = new ChunkWriter.Context[chunks.size()];

long rowOffset = 0;
for (int i = 0; i < chunks.size(); ++i) {
final SourceChunkType valuesChunk = chunks.get(i);
final SOURCE_CHUNK_TYPE valuesChunk = chunks.get(i);
this.contexts[i] = writer.makeContext(valuesChunk, rowOffset);
rowOffset += valuesChunk.size();
}
}

public ChunkWriter<SourceChunkType> writer() {
public ChunkWriter<SOURCE_CHUNK_TYPE> writer() {
return writer;
}

public ChunkWriter.Context<SourceChunkType>[] chunks() {
public ChunkWriter.Context[] chunks() {
return contexts;
}

@@ -46,8 +45,8 @@ public ChunkWriter.DrainableColumn empty(@NotNull final BarrageOptions options)

@Override
public void close() {
for (final ChunkWriter.Context<SourceChunkType> context : contexts) {
context.decrementReferenceCount();
for (final ChunkWriter.Context context : contexts) {
context.close();
}
}
}
@@ -5,8 +5,13 @@

import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import org.jetbrains.annotations.NotNull;

import java.io.DataInput;
import java.io.IOException;
import java.util.function.Function;
import java.util.function.IntFunction;

@@ -35,4 +40,25 @@ public static ChunkType getChunkTypeFor(final Class<?> dest) {
}
return ChunkType.fromElementType(dest);
}

protected static void readValidityBuffer(
@NotNull final DataInput is,
final int numValidityLongs,
final long validityBufferLength,
@NotNull final WritableLongChunk<Values> isValid,
@NotNull final String DEBUG_NAME) throws IOException {
// Read validity buffer:
int jj = 0;
for (; jj < Math.min(numValidityLongs, validityBufferLength / 8); ++jj) {
isValid.set(jj, is.readLong());
}
final long valBufRead = jj * 8L;
if (valBufRead < validityBufferLength) {
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBufferLength - valBufRead));
}
// we support short validity buffers
for (; jj < numValidityLongs; ++jj) {
isValid.set(jj, -1); // -1 is bit-wise representation of all ones
}
}
}
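
The helper added above treats the validity buffer as an Arrow-style bitmap packed 64 rows per long: it reads as many longs as were actually serialized, skips any remaining padding bytes, and back-fills the rest with -1 (all bits set, i.e. all rows valid) so that short validity buffers are supported. A hedged caller sketch from a hypothetical reader subclass; the input stream, buffer length, and row count are assumptions, not taken from this commit:

// Hypothetical decode sketch (is, validityBufferLength, and numRows assumed),
// e.g. inside a subclass of BaseChunkReader.
final int numRows = 1000;
final int numValidityLongs = (numRows + 63) / 64; // 64 validity bits per long
try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) {
    readValidityBuffer(is, numValidityLongs, validityBufferLength, isValid, "ExampleReader");
    final int row = 17;
    final boolean rowIsValid = (isValid.get(row >> 6) & (1L << (row & 63))) != 0;
}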