Skip to content

Commit

Permalink
Resolving some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Jun 4, 2024
1 parent 7c7fa46 commit 0c87091
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,18 @@ public interface MemoizableOperation<T extends DynamicNode & NotificationStepRec
static boolean FORCE_PARALLEL_SELECT_AND_UPDATE =
Configuration.getInstance().getBooleanWithDefault("QueryTable.forceParallelSelectAndUpdate", false);

/**
* Whether parallel processing may be used when constructing a snapshot; defaults to {@code true}. Controlled by
* the {@code QueryTable.enableParallelSnapshot} configuration property.
*/
public static boolean ENABLE_PARALLEL_SNAPSHOT =
Configuration.getInstance().getBooleanWithDefault("QueryTable.enableParallelSnapshot", true);

/**
* Minimum number of snapshot rows before columns are read in parallel; defaults to 2^20 (~1 million). Controlled
* by the {@code QueryTable.minimumParallelSnapshotRows} configuration property.
*/
public static long MINIMUM_PARALLEL_SNAPSHOT_ROWS =
Configuration.getInstance().getLongWithDefault("QueryTable.minimumParallelSnapshotRows", 1L << 20);

// Whether we should track the entire RowSet of firstBy and lastBy operations
@VisibleForTesting
public static boolean TRACKED_LAST_BY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.SnapshotUnsuccessfulException;
import io.deephaven.engine.table.impl.sources.InMemoryColumnSource;
import io.deephaven.engine.updategraph.*;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.SharedContext;
Expand Down Expand Up @@ -48,6 +49,9 @@

import io.deephaven.chunk.attributes.Values;

import static io.deephaven.engine.table.impl.QueryTable.ENABLE_PARALLEL_SNAPSHOT;
import static io.deephaven.engine.table.impl.QueryTable.MINIMUM_PARALLEL_SNAPSHOT_ROWS;

/**
* A Set of static utilities for computing values from a table while avoiding the use of an update graph lock. This
* class supports snapshots in both position space and key space.
Expand Down Expand Up @@ -86,6 +90,7 @@ public NoSnapshotAllowedException(String reason) {
public static final int SNAPSHOT_CHUNK_SIZE = Configuration.getInstance()
.getIntegerWithDefault("ConstructSnapshot.snapshotChunkSize", 1 << 24);

private static final ArrayList<Chunk<Values>> EMPTY_CHUNK_LIST = new ArrayList<>();

public interface State {

Expand Down Expand Up @@ -1520,18 +1525,17 @@ private static boolean serializeAllTable(
// if (executionContext.getOperationInitializer() instanceof PoisonedOperationInitializer) {
// jobScheduler = new ImmediateJobScheduler();
// }
if (!executionContext.getOperationInitializer().canParallelize()) {
jobScheduler = new ImmediateJobScheduler();
} else {
if (ENABLE_PARALLEL_SNAPSHOT && executionContext.getOperationInitializer().canParallelize() &&
(snapshot.rowsIncluded.size() > MINIMUM_PARALLEL_SNAPSHOT_ROWS ||
!allColumnSourcesInMemory(table, columnSources, columnsToSerialize))) {
// We can parallelize the snapshot
jobScheduler = new OperationInitializerJobScheduler();
} else {
jobScheduler = new ImmediateJobScheduler();
}

final CompletableFuture<Boolean> waitForResult = new CompletableFuture<>();
final AtomicReference<Exception> exception = new AtomicReference<>();
final Consumer<Exception> onError = err -> {
exception.set(err);
waitForResult.complete(Boolean.FALSE);
};
jobScheduler.iterateParallel(
executionContext,
logOutput -> logOutput.append("serializeAllTable"),
Expand All @@ -1540,7 +1544,10 @@ private static boolean serializeAllTable(
(context, colIdx, nestedErrorConsumer) -> serializeColumn(columnSources, colIdx, usePrev, snapshot,
table, logIdentityObject, columnsToSerialize),
() -> waitForResult.complete(Boolean.TRUE),
onError);
err -> {
exception.set(err);
waitForResult.complete(Boolean.FALSE);
});
try {
final boolean ret = waitForResult.get();
if (!ret) {
Expand Down Expand Up @@ -1569,7 +1576,26 @@ private static boolean serializeAllTable(
}
logEntry.append(", usePrev=").append(usePrev).endl();
}
return true;
}


/**
 * Check whether every required column source of {@code table} is an {@link InMemoryColumnSource}.
 *
 * @param table the table whose column sources are inspected
 * @param columnSources the column names to look up on {@code table}
 * @param columnsToCheck if non-null, only the column indexes whose bits are set are examined; a null bitset means
 *        examine every column in {@code columnSources}
 * @return {@code true} if all examined column sources are in-memory, {@code false} otherwise
 */
private static boolean allColumnSourcesInMemory(
        @NotNull final Table table,
        @NotNull final String[] columnSources,
        @Nullable final BitSet columnsToCheck) {
    final int numColumns = columnSources.length;
    for (int colIdx = 0; colIdx < numColumns; ++colIdx) {
        // Skip columns that the caller did not ask us to check
        final boolean includeColumn = columnsToCheck == null || columnsToCheck.get(colIdx);
        if (includeColumn
                && !(table.getColumnSource(columnSources[colIdx]) instanceof InMemoryColumnSource)) {
            return false;
        }
    }
    return true;
}

Expand Down Expand Up @@ -1615,7 +1641,7 @@ private static void serializeColumn(
final BarrageMessage.ModColumnData mcd = new BarrageMessage.ModColumnData();
snapshot.modColumnData[colIdx] = mcd;
mcd.rowsModified = RowSetFactory.empty();
mcd.data = getSnapshotDataAsChunkList(sourceToUse, null, RowSetFactory.empty(), usePrev);
mcd.data = EMPTY_CHUNK_LIST;
mcd.type = acd.type;
mcd.componentType = acd.componentType;
mcd.chunkType = sourceToUse.getChunkType();
Expand Down Expand Up @@ -1676,14 +1702,13 @@ private static <T> ArrayList<Chunk<Values>> getSnapshotDataAsChunkList(
@Nullable final SharedContext sharedContext,
@NotNull final RowSet rowSet,
final boolean usePrev) {
long offset = 0;
final long size = rowSet.size();
final ArrayList<Chunk<Values>> result = new ArrayList<>();

if (size == 0) {
return result;
return EMPTY_CHUNK_LIST;
}

long offset = 0;
final ArrayList<Chunk<Values>> result = new ArrayList<>();
final int maxChunkSize = (int) Math.min(size, SNAPSHOT_CHUNK_SIZE);

try (final ColumnSource.FillContext context = columnSource.makeFillContext(maxChunkSize, sharedContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2718,16 +2718,23 @@ public void testUngroupConstructSnapshotOfBoxedNull() {
testRefreshingTable(i(0).toTracking())
.update("X = new Integer[]{null, 2, 3}", "Z = new Integer[]{4, 5, null}");
final Table ungrouped = t.ungroup();

try (final BarrageMessage snap = ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable<?>) ungrouped)) {
assertEquals(snap.rowsAdded, i(0, 1, 2));
assertEquals(snap.addColumnData[0].data.get(0).asIntChunk().get(0),
io.deephaven.util.QueryConstants.NULL_INT);
assertEquals(snap.addColumnData[1].data.get(0).asIntChunk().get(2),
io.deephaven.util.QueryConstants.NULL_INT);
testUngroupConstructSnashotHelper(snap);
}
final Table selected = ungrouped.select(); // Will convert column sources to in memory
try (final BarrageMessage snap = ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable<?>) selected)) {
testUngroupConstructSnashotHelper(snap);
}
}

/**
 * Assert that a backplane snapshot of the ungrouped boxed-Integer table contains three added rows and surfaces
 * the boxed nulls as {@link QueryConstants#NULL_INT} in the snapshot chunks.
 *
 * <p>
 * NOTE(review): method name contains a typo ("Snashot"); left as-is to avoid touching callers.
 */
private static void testUngroupConstructSnashotHelper(@NotNull final BarrageMessage snap) {
    // The ungrouped table produces rows 0..2
    assertEquals(snap.rowsAdded, i(0, 1, 2));
    // First element of column X was a boxed null Integer
    final int firstValueOfX = snap.addColumnData[0].data.get(0).asIntChunk().get(0);
    assertEquals(firstValueOfX, QueryConstants.NULL_INT);
    // Last element of column Z was a boxed null Integer
    final int lastValueOfZ = snap.addColumnData[1].data.get(0).asIntChunk().get(2);
    assertEquals(lastValueOfZ, QueryConstants.NULL_INT);
}

public void testUngroupableColumnSources() {
final Table table = testRefreshingTable(col("X", 1, 1, 2, 2, 3, 3, 4, 4), col("Int", 1, 2, 3, 4, 5, 6, 7, null),
col("Double", 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, null, 0.45),
Expand Down

0 comments on commit 0c87091

Please sign in to comment.