Re-implement ParallelWhere using JobScheduler semantics. #5032

Merged · 13 commits · Jan 24, 2024
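This change drops the notification-satisfaction bookkeeping that InitialFilterExecution previously carried (pendingSatisfaction, runningChildren, explicit cancellation of child threads) in favor of JobScheduler semantics: whether to parallelize is decided once in the constructor, per-segment filter work is handed to the chosen scheduler, and completion or failure is delivered through callbacks. The sketch below illustrates that segment-and-callback pattern with plain JDK types; the names and the filtering model (a LongPredicate over row positions) are illustrative stand-ins, not the Deephaven JobScheduler API.

    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Consumer;
    import java.util.function.LongPredicate;

    // Illustrative sketch only: split a row range into segments, run each segment's
    // filter on an executor, and report the combined result (or the first error)
    // through callbacks once the last segment finishes.
    final class SegmentedFilterSketch {
        static void filterInParallel(
                final long rangeSize,
                final int segmentCount,
                final ExecutorService executor,
                final LongPredicate filter,
                final Consumer<long[]> onComplete,
                final Consumer<Exception> onError) {
            final long segmentSize = (rangeSize + segmentCount - 1) / segmentCount;
            final AtomicInteger outstanding = new AtomicInteger(segmentCount);
            final AtomicReference<Exception> error = new AtomicReference<>();
            final ConcurrentLinkedQueue<Long> matches = new ConcurrentLinkedQueue<>();
            for (int segment = 0; segment < segmentCount; segment++) {
                final long start = segment * segmentSize;
                final long end = Math.min(rangeSize, start + segmentSize);
                executor.submit(() -> {
                    try {
                        for (long row = start; row < end; row++) {
                            if (filter.test(row)) {
                                matches.add(row);
                            }
                        }
                    } catch (Exception e) {
                        error.compareAndSet(null, e);
                    } finally {
                        // The last segment to complete invokes exactly one of the callbacks.
                        if (outstanding.decrementAndGet() == 0) {
                            final Exception caught = error.get();
                            if (caught != null) {
                                onError.accept(caught);
                            } else {
                                onComplete.accept(
                                        matches.stream().mapToLong(Long::longValue).sorted().toArray());
                            }
                        }
                    }
                });
            }
        }
    }

The diff below replaces the old child-job queue with this kind of structure, with Deephaven's OperationInitializerJobScheduler or ImmediateJobScheduler playing the role of the executor.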


InitialFilterExecution.java
@@ -1,109 +1,44 @@
package io.deephaven.engine.table.impl;

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.impl.perf.BasePerformanceEntry;
import io.deephaven.engine.table.impl.select.WhereFilter;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedNode;
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedQueue;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import io.deephaven.engine.table.impl.util.ImmediateJobScheduler;
import io.deephaven.engine.table.impl.util.JobScheduler;
import io.deephaven.engine.table.impl.util.OperationInitializerJobScheduler;

/**
* A FilterExecution that is used for initial filters. When we split off sub filters as child jobs, they are enqueued in
* the {@link io.deephaven.engine.updategraph.OperationInitializer OperationInitializer}.
*/
class InitialFilterExecution extends AbstractFilterExecution {
private final QueryTable sourceTable;
private final boolean permitParallelization;
private final int segmentCount;
private final WhereFilter[] filters;

/**
* The pendingSatisfaction list is global to the root node of this InitialFilterExecution. The outstanding children
* allow us to count how many jobs exist. If we have no outstanding jobs but unsatisfied Notifications, then an
* error has occurred.
*/
private final IntrusiveDoublyLinkedQueue<NotificationQueue.Notification> pendingSatisfaction;
private final Map<Thread, Thread> runningChildren;
private final AtomicBoolean cancelled;

/**
* The SubEntry lets us track query performance for the split jobs.
*/
private BasePerformanceEntry basePerformanceEntry;
private final int segmentCount;
private final boolean permitParallelization;

/**
* The InitialFilterExecution that represents all the work we are doing for this table.
*/
private final InitialFilterExecution root;
private final JobScheduler jobScheduler;

InitialFilterExecution(
final QueryTable sourceTable,
final WhereFilter[] filters,
final RowSet addedInput,
final long addStart,
final long addEnd,
final InitialFilterExecution parent,
final int filterIndex,
final boolean usePrev) {
super(sourceTable, filters, addedInput, addStart, addEnd, null, 0, 0, parent, usePrev, false,
ModifiedColumnSet.ALL, filterIndex);
this.sourceTable = sourceTable;
permitParallelization = permitParallelization(filters);
this.filters = filters;
if (parent == null) {
pendingSatisfaction = new IntrusiveDoublyLinkedQueue<>(
IntrusiveDoublyLinkedNode.Adapter.<NotificationQueue.Notification>getInstance());
segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0
? ExecutionContext.getContext().getOperationInitializer().parallelismFactor()
: QueryTable.PARALLEL_WHERE_SEGMENTS;
runningChildren = Collections.synchronizedMap(new IdentityHashMap<>());
cancelled = new AtomicBoolean(false);
this.root = this;
super(sourceTable, filters, addedInput, null, usePrev, false, ModifiedColumnSet.ALL);
segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0
? ExecutionContext.getContext().getOperationInitializer().parallelismFactor()
: QueryTable.PARALLEL_WHERE_SEGMENTS;
permitParallelization = permitParallelization(filters)
&& !QueryTable.DISABLE_PARALLEL_WHERE
&& segmentCount > 1
&& ExecutionContext.getContext().getOperationInitializer().canParallelize();

// If any of the filters can be parallelized, we will use the OperationInitializerJobScheduler.
if (permitParallelization) {
jobScheduler = new OperationInitializerJobScheduler();
} else {
pendingSatisfaction = parent.pendingSatisfaction;
segmentCount = parent.segmentCount;
this.root = parent.root;
runningChildren = null;
cancelled = null;
}
}

@Override
void enqueueSubFilters(
List<AbstractFilterExecution> subFilters,
AbstractFilterExecution.CombinationNotification combinationNotification) {
synchronized (pendingSatisfaction) {
enqueueJobs(subFilters);
pendingSatisfaction.offer(combinationNotification);
}
}

private void enqueueJobs(Iterable<? extends NotificationQueue.Notification> subFilters) {
for (NotificationQueue.Notification notification : subFilters) {
ExecutionContext.getContext().getOperationInitializer().submit(() -> {
root.runningChildren.put(Thread.currentThread(), Thread.currentThread());
try {
if (!root.cancelled.get()) {
notification.run();
} else {
// we must ensure that we, the parent InitialFilterExecution, are notified of completion
onChildCompleted();
}
if (Thread.interrupted()) {
// we would like to throw a query cancellation exception
exceptionResult = new CancellationException("thread interrupted");
}
} finally {
root.runningChildren.remove(Thread.currentThread());
}
});
jobScheduler = ImmediateJobScheduler.INSTANCE;
}
}

@@ -113,70 +48,16 @@ int getTargetSegments() {
}

@Override
boolean doParallelization(long numberOfRows) {
return permitParallelization
&& ExecutionContext.getContext().getOperationInitializer().canParallelize()
&& doParallelizationBase(numberOfRows);
}

@Override
void handleUncaughtException(Exception throwable) {
throw new UnsupportedOperationException(throwable);
}

@Override
void accumulatePerformanceEntry(BasePerformanceEntry entry) {
synchronized (root) {
if (root.basePerformanceEntry != null) {
root.basePerformanceEntry.accumulate(entry);
} else {
root.basePerformanceEntry = entry;
}
}
}

/**
* Run any satisfied jobs in the pendingSatisfaction list.
*/
@Override
void onNoChildren() {
final IntrusiveDoublyLinkedQueue<NotificationQueue.Notification> satisfied = new IntrusiveDoublyLinkedQueue<>(
IntrusiveDoublyLinkedNode.Adapter.<NotificationQueue.Notification>getInstance());
synchronized (pendingSatisfaction) {
for (final Iterator<NotificationQueue.Notification> it = pendingSatisfaction.iterator(); it.hasNext();) {
final NotificationQueue.Notification notification = it.next();
if (notification.canExecute(0)) {
satisfied.offer(notification);
it.remove();
}
}
}
if (satisfied.isEmpty()) {
return;
}
satisfied.forEach(NotificationQueue.Notification::run);
JobScheduler jobScheduler() {
return jobScheduler;
}

@Override
InitialFilterExecution makeChild(
final RowSet addedInput,
final long addStart,
final long addEnd,
final RowSet modifyInput,
final long modifyStart,
final long modifyEnd,
final int filterIndex) {
Assert.eqNull(modifyInput, "modifyInput");
return new InitialFilterExecution(sourceTable, filters, addedInput, addStart, addEnd, this, filterIndex,
usePrev);
boolean permitParallelization() {
return permitParallelization;
}

BasePerformanceEntry getBasePerformanceEntry() {
return basePerformanceEntry;
}

void setCancelled() {
cancelled.set(true);
runningChildren.forEach((thread, ignored) -> thread.interrupt());
}
}
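After this rewrite, the constructor above evaluates the parallelization gate exactly once (the filters permit parallelization, parallel where is not globally disabled, the segment count is greater than one, and the operation initializer can parallelize) and then fixes the scheduler for the lifetime of the execution: an OperationInitializerJobScheduler when parallel, ImmediateJobScheduler.INSTANCE otherwise. A minimal sketch of that gate, using plain JDK stand-ins rather than the Deephaven scheduler types:

    import java.util.concurrent.ExecutorService;

    // Illustrative stand-ins: Scheduler plays the role of JobScheduler, and the
    // ExecutorService plays the role of the operation initializer's thread pool.
    final class SchedulerGateSketch {
        interface Scheduler {
            void submit(Runnable job);
        }

        static Scheduler chooseScheduler(
                final boolean filtersPermitParallelization,
                final boolean parallelWhereDisabled,
                final int segmentCount,
                final boolean initializerCanParallelize,
                final ExecutorService pool) {
            final boolean permitParallelization = filtersPermitParallelization
                    && !parallelWhereDisabled
                    && segmentCount > 1
                    && initializerCanParallelize;
            if (permitParallelization) {
                // Parallel path: jobs go to the shared pool.
                return pool::submit;
            }
            // Serial path: jobs run inline on the calling thread.
            return Runnable::run;
        }
    }

Because the decision is made once, downstream code only asks the execution for its jobScheduler() and permitParallelization(), rather than re-checking the environment at each split.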
QueryTable.java
@@ -28,6 +28,7 @@
import io.deephaven.configuration.Configuration;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.impl.util.*;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.liveness.LivenessScope;
@@ -48,17 +49,14 @@
import io.deephaven.engine.table.impl.select.MatchPairFactory;
import io.deephaven.engine.table.impl.select.SelectColumnFactory;
import io.deephaven.engine.table.impl.updateby.UpdateBy;
import io.deephaven.engine.table.impl.util.ImmediateJobScheduler;
import io.deephaven.engine.table.impl.util.JobScheduler;
import io.deephaven.engine.table.impl.util.OperationInitializerJobScheduler;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzerWrapper;
import io.deephaven.engine.table.impl.util.FieldUtils;
import io.deephaven.engine.table.impl.sources.ring.RingTableTools;
import io.deephaven.engine.table.iterators.*;
import io.deephaven.engine.updategraph.DynamicNode;
import io.deephaven.engine.util.*;
import io.deephaven.engine.util.systemicmarking.SystemicObject;
import io.deephaven.util.annotations.InternalUseOnly;
import io.deephaven.util.annotations.ReferentialIntegrity;
import io.deephaven.vector.Vector;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
@@ -982,6 +980,9 @@ public static class FilteredTable extends QueryTable implements WhereFilter.Reco
private boolean refilterUnmatchedRequested = false;
private MergedListener whereListener;

@ReferentialIntegrity
Runnable delayedErrorReference;

public FilteredTable(final TrackingRowSet currentMapping, final QueryTable source) {
super(source.getDefinition(), currentMapping, source.columns, null, null);
this.source = source;
@@ -1066,9 +1067,10 @@ void doRefilter(

if (refilterMatchedRequested && refilterUnmatchedRequested) {
final WhereListener.ListenerFilterExecution filterExecution =
listener.makeFilterExecution(source.getRowSet().copy());
listener.makeRefilterExecution(source.getRowSet().copy());
filterExecution.scheduleCompletion(
fe -> completeRefilterUpdate(listener, upstream, update, fe.addedResult));
(adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
exception -> errorRefilterUpdate(listener, exception, upstream));
refilterMatchedRequested = refilterUnmatchedRequested = false;
} else if (refilterUnmatchedRequested) {
// things that are added or removed are already reflected in source.getRowSet
@@ -1078,9 +1080,9 @@
unmatchedRows.insert(upstream.modified());
}
final RowSet unmatched = unmatchedRows.copy();
final WhereListener.ListenerFilterExecution filterExecution = listener.makeFilterExecution(unmatched);
filterExecution.scheduleCompletion(fe -> {
final WritableRowSet newMapping = fe.addedResult;
final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(unmatched);
filterExecution.scheduleCompletion((adds, mods) -> {
final WritableRowSet newMapping = adds.writableCast();
// add back what we previously matched, but for modifications and removals
try (final WritableRowSet previouslyMatched = getRowSet().copy()) {
if (upstream != null) {
@@ -1089,8 +1091,8 @@
}
newMapping.insert(previouslyMatched);
}
completeRefilterUpdate(listener, upstream, update, fe.addedResult);
});
completeRefilterUpdate(listener, upstream, update, adds);
}, exception -> errorRefilterUpdate(listener, exception, upstream));
refilterUnmatchedRequested = false;
} else if (refilterMatchedRequested) {
// we need to take removed rows out of our rowSet so we do not read them, and also examine added or
@@ -1103,9 +1105,10 @@
final RowSet matchedClone = matchedRows.copy();

final WhereListener.ListenerFilterExecution filterExecution =
listener.makeFilterExecution(matchedClone);
listener.makeRefilterExecution(matchedClone);
filterExecution.scheduleCompletion(
fe -> completeRefilterUpdate(listener, upstream, update, fe.addedResult));
(adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
exception -> errorRefilterUpdate(listener, exception, upstream));
refilterMatchedRequested = false;
} else {
throw new IllegalStateException("Refilter called when a refilter was not requested!");
@@ -1141,11 +1144,24 @@ private void completeRefilterUpdate(
update.shifted = upstream == null ? RowSetShiftData.EMPTY : upstream.shifted();

notifyListeners(update);
if (upstream != null) {
upstream.release();
}

listener.setFinalExecutionStep();
// Release the upstream update and set the final notification step.
listener.finalizeUpdate(upstream);
}

private void errorRefilterUpdate(final WhereListener listener, final Exception e, final TableUpdate upstream) {
// Notify listeners that we had an issue refreshing the table.
if (getLastNotificationStep() == updateGraph.clock().currentStep()) {
if (listener != null) {
listener.forceReferenceCountToZero();
}
delayedErrorReference = new DelayedErrorNotifier(e, listener == null ? null : listener.entry, this);
} else {
notifyListenersOnError(e, listener == null ? null : listener.entry);
forceReferenceCountToZero();
}
// Release the upstream update and set the final notification step.
listener.finalizeUpdate(upstream);
}

private void setWhereListener(MergedListener whereListener) {
@@ -1226,41 +1242,18 @@ private QueryTable whereInternal(final WhereFilter... filters) {

final CompletableFuture<TrackingWritableRowSet> currentMappingFuture =
new CompletableFuture<>();

final InitialFilterExecution initialFilterExecution = new InitialFilterExecution(
this, filters, rowSetToUse.copy(), 0, rowSetToUse.size(), null, 0,
usePrev) {
@Override
void handleUncaughtException(Exception throwable) {
currentMappingFuture.completeExceptionally(throwable);
}
};
final ExecutionContext executionContext = ExecutionContext.getContext();
initialFilterExecution.scheduleCompletion(x -> {
try (final SafeCloseable ignored = executionContext.open()) {
if (x.exceptionResult != null) {
currentMappingFuture.completeExceptionally(x.exceptionResult);
} else {
currentMappingFuture.complete(x.addedResult.toTracking());
}
}
});
this, filters, rowSetToUse.copy(), usePrev);
final TrackingWritableRowSet currentMapping;
initialFilterExecution.scheduleCompletion((adds, mods) -> {
currentMappingFuture.complete(adds.writableCast().toTracking());
}, currentMappingFuture::completeExceptionally);

boolean cancelled = false;
TrackingWritableRowSet currentMapping = null;
try {
boolean done = false;
while (!done) {
try {
currentMapping = currentMappingFuture.get();
done = true;
} catch (InterruptedException e) {
// cancel the job and wait for it to finish cancelling
cancelled = true;
initialFilterExecution.setCancelled();
}
}
} catch (ExecutionException e) {
if (cancelled) {
currentMapping = currentMappingFuture.get();
} catch (ExecutionException | InterruptedException e) {
if (e instanceof InterruptedException) {
throw new CancellationException("interrupted while filtering");
} else if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
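The rewritten whereInternal() also simplifies how the initial filter's result is consumed: scheduleCompletion() now takes separate success and error callbacks that complete a CompletableFuture, and the caller simply blocks on that future, mapping interruption to a cancellation and unwrapping execution failures. A condensed sketch of that wait-and-translate pattern with plain JDK types (hypothetical names, not the Deephaven code itself):

    import java.util.concurrent.CancellationException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    final class FilterResultWaitSketch {
        // Block until the asynchronously produced filter result is available,
        // rethrowing failures and converting interruption into cancellation.
        static <T> T awaitFilterResult(final CompletableFuture<T> resultFuture) {
            try {
                return resultFuture.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CancellationException("interrupted while filtering");
            } catch (ExecutionException e) {
                if (e.getCause() instanceof RuntimeException) {
                    throw (RuntimeException) e.getCause();
                }
                throw new RuntimeException(e.getCause());
            }
        }
    }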