Skip to content

Commit

Permalink
Added support for parallel snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Jun 3, 2024
1 parent 33238dc commit 7c7fa46
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;

import io.deephaven.chunk.attributes.Values;
Expand Down Expand Up @@ -1452,6 +1456,7 @@ private static boolean serializeAllTable(
return false;
}

// TODO Should I also parallelize this method?
final ColumnSource<?> columnSource = table.getColumnSource(columnSources[ii]);
snapshot.dataColumns[ii] = getSnapshotData(columnSource, sharedContext, snapshot.rowsIncluded, usePrev);
}
Expand Down Expand Up @@ -1509,41 +1514,46 @@ private static boolean serializeAllTable(

final String[] columnSources = table.getDefinition().getColumnNamesArray();

try (final SharedContext sharedContext =
(columnSources.length > 1) ? SharedContext.makeSharedContext() : null) {
for (int ii = 0; ii < columnSources.length; ++ii) {
if (concurrentAttemptInconsistent()) {
if (log.isDebugEnabled()) {
final LogEntry logEntry = log.debug().append(System.identityHashCode(logIdentityObject))
.append(" Bad snapshot before column ").append(ii);
appendConcurrentAttemptClockInfo(logEntry);
logEntry.endl();
}
return false;
}

final ColumnSource<?> columnSource = table.getColumnSource(columnSources[ii]);
final ExecutionContext executionContext = ExecutionContext.getContext();
final JobScheduler jobScheduler;
// TODO Check with Ryan if I should keep this check
// if (executionContext.getOperationInitializer() instanceof PoisonedOperationInitializer) {
// jobScheduler = new ImmediateJobScheduler();
// }
if (!executionContext.getOperationInitializer().canParallelize()) {
jobScheduler = new ImmediateJobScheduler();
} else {
jobScheduler = new OperationInitializerJobScheduler();
}

final BarrageMessage.AddColumnData acd = new BarrageMessage.AddColumnData();
snapshot.addColumnData[ii] = acd;
final boolean columnIsEmpty = columnsToSerialize != null && !columnsToSerialize.get(ii);
final RowSet rows = columnIsEmpty ? RowSetFactory.empty() : snapshot.rowsIncluded;
// Note: cannot use shared context across several calls of differing lengths and no sharing necessary
// when empty
final ColumnSource<?> sourceToUse = ReinterpretUtils.maybeConvertToPrimitive(columnSource);
acd.data = getSnapshotDataAsChunkList(sourceToUse, columnIsEmpty ? null : sharedContext, rows, usePrev);
acd.type = columnSource.getType();
acd.componentType = columnSource.getComponentType();
acd.chunkType = sourceToUse.getChunkType();

final BarrageMessage.ModColumnData mcd = new BarrageMessage.ModColumnData();
snapshot.modColumnData[ii] = mcd;
mcd.rowsModified = RowSetFactory.empty();
mcd.data = getSnapshotDataAsChunkList(sourceToUse, null, RowSetFactory.empty(), usePrev);
mcd.type = acd.type;
mcd.componentType = acd.componentType;
mcd.chunkType = sourceToUse.getChunkType();
final CompletableFuture<Boolean> waitForResult = new CompletableFuture<>();
final AtomicReference<Exception> exception = new AtomicReference<>();
final Consumer<Exception> onError = err -> {
exception.set(err);
waitForResult.complete(Boolean.FALSE);
};
jobScheduler.iterateParallel(
executionContext,
logOutput -> logOutput.append("serializeAllTable"),
JobScheduler.DEFAULT_CONTEXT_FACTORY,
0, columnSources.length,
(context, colIdx, nestedErrorConsumer) -> serializeColumn(columnSources, colIdx, usePrev, snapshot,
table, logIdentityObject, columnsToSerialize),
() -> waitForResult.complete(Boolean.TRUE),
onError);
try {
final boolean ret = waitForResult.get();
if (!ret) {
final Exception err = exception.get();
if (!(err instanceof SnapshotUnsuccessfulException)) {
throw new SnapshotUnsuccessfulException("Snapshot failed", err);
}
return false;
}
} catch (final InterruptedException e) {
throw new java.util.concurrent.CancellationException("Interrupted while serializing table");
} catch (final ExecutionException e) {
throw new UncheckedDeephavenException("Execution exception while serializing table", e);
}

if (log.isDebugEnabled()) {
Expand All @@ -1563,6 +1573,54 @@ private static boolean serializeAllTable(
return true;
}

/**
 * Serialize a single column of a table into the provided {@link BarrageMessage}, filling
 * {@code snapshot.addColumnData[colIdx]} and {@code snapshot.modColumnData[colIdx]}.
 * <p>
 * This method is intended to be called from a thread pool executor; each invocation writes only to its own
 * {@code colIdx} slot, so concurrent invocations for distinct columns do not interfere.
 *
 * @param columnSources the column names of {@code table}, indexed consistently with the snapshot arrays
 * @param colIdx the index of the column to serialize
 * @param usePrev whether to read previous-cycle values instead of current values
 * @param snapshot the message whose add/mod column slots are populated for {@code colIdx}
 * @param table the table to read the column from
 * @param logIdentityObject identity used to tag debug log lines for this snapshot attempt
 * @param columnsToSerialize if non-null, only columns whose bit is set are serialized; others get empty data
 * @throws SnapshotUnsuccessfulException if the concurrent snapshot attempt is detected to be inconsistent
 */
private static void serializeColumn(
        final String[] columnSources,
        final int colIdx,
        final boolean usePrev,
        @NotNull final BarrageMessage snapshot,
        @NotNull final Table table,
        @NotNull final Object logIdentityObject,
        @Nullable final BitSet columnsToSerialize) {
    if (concurrentAttemptInconsistent()) {
        if (log.isDebugEnabled()) {
            final LogEntry logEntry = log.debug().append(System.identityHashCode(logIdentityObject))
                    .append(" Bad snapshot before column ").append(columnSources[colIdx])
                    .append(" at idx ").append(colIdx);
            appendConcurrentAttemptClockInfo(logEntry);
            logEntry.endl();
        }
        // Propagate as an exception so the job scheduler's error consumer fails the whole snapshot.
        throw new SnapshotUnsuccessfulException("Failed to serialize column " + columnSources[colIdx]
                + " at idx " + colIdx + " due to inconsistent state");
    }

    final ColumnSource<?> columnSource = table.getColumnSource(columnSources[colIdx]);

    final BarrageMessage.AddColumnData acd = new BarrageMessage.AddColumnData();
    snapshot.addColumnData[colIdx] = acd;
    final boolean columnIsEmpty = columnsToSerialize != null && !columnsToSerialize.get(colIdx);
    final RowSet rows = columnIsEmpty ? RowSetFactory.empty() : snapshot.rowsIncluded;
    // Note: cannot use shared context across several calls of differing lengths and no sharing necessary
    // when empty
    final ColumnSource<?> sourceToUse = ReinterpretUtils.maybeConvertToPrimitive(columnSource);
    acd.data = getSnapshotDataAsChunkList(sourceToUse, null, rows, usePrev);
    acd.type = columnSource.getType();
    acd.componentType = columnSource.getComponentType();
    acd.chunkType = sourceToUse.getChunkType();

    final BarrageMessage.ModColumnData mcd = new BarrageMessage.ModColumnData();
    snapshot.modColumnData[colIdx] = mcd;
    // A snapshot carries no modifications; emit an empty modified row set and empty data chunks.
    mcd.rowsModified = RowSetFactory.empty();
    mcd.data = getSnapshotDataAsChunkList(sourceToUse, null, RowSetFactory.empty(), usePrev);
    mcd.type = acd.type;
    mcd.componentType = acd.componentType;
    mcd.chunkType = sourceToUse.getChunkType();
}

private static boolean serializeAllTables(
final boolean usePrev,
@NotNull final List<InitialSnapshot> snapshots,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.ChunkPoolConstants;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessArtifact;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.rowset.*;
Expand Down Expand Up @@ -996,6 +997,11 @@ private void schedulePropagation() {
private class UpdatePropagationJob implements Runnable {
private final ReentrantLock runLock = new ReentrantLock();
private final AtomicBoolean needsRun = new AtomicBoolean();
private final ExecutionContext executionContext;

UpdatePropagationJob() {
    // Capture a systemic execution context at construction time so that propagation work
    // runs under a consistent, systemic context regardless of which thread invokes run().
    this.executionContext = ExecutionContext.newBuilder().markSystemic().build();
}

@Override
public void run() {
Expand All @@ -1006,7 +1012,7 @@ public void run() {
return;
}

try {
try (final SafeCloseable ignored = executionContext.open()) {
if (needsRun.compareAndSet(true, false)) {
final long startTm = System.nanoTime();
updateSubscriptionsSnapshotAndPropagate();
Expand Down

0 comments on commit 7c7fa46

Please sign in to comment.