Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-implement ParallelWhere using JobScheduler semantics. #5032

Merged
merged 13 commits into from
Jan 24, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,20 @@ public void scheduleCompletion(
*/
abstract int getTargetSegments();

/**
* Should this operation be allowed to run parallelized?
*
* @return true if this operation permits parallel execution; when this is false,
* {@code shouldParallelizeFilter} never chooses to parallelize a filter
*/
abstract boolean permitParallelization();

/**
* Should a filter of the given size be parallelized or executed within this thread?
*/
abstract boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows);
/**
 * Decide whether {@code filter} should be evaluated in parallel for an input of {@code numberOfRows} rows.
 * <p>
 * Parallel execution is chosen only when all of the following hold, checked in this order: the operation permits
 * parallelization, there is at least one row, the input is either forced parallel
 * ({@code QueryTable.FORCE_PARALLEL_WHERE}) or half of it exceeds
 * {@code QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT}, and the filter itself permits parallelization.
 *
 * @param filter the filter that would be executed
 * @param numberOfRows the number of rows the filter would be applied to
 * @return true if the filter should be parallelized, false if it should run without parallelization
 */
boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows) {
    // The operation as a whole must opt in, and an empty input never needs splitting.
    if (!permitParallelization() || numberOfRows == 0) {
        return false;
    }
    // Unless parallelism is forced, the input must be large enough to be worth splitting.
    if (!QueryTable.FORCE_PARALLEL_WHERE
            && numberOfRows / 2 <= QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT) {
        return false;
    }
    // The filter has the final say.
    return filter.permitParallelization();
}

/**
* Should parallelization be allowed for this operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ JobScheduler jobScheduler() {
return jobScheduler;
}

/**
* Report whether this operation may run parallelized.
*
* @return the value of this instance's {@code permitParallelization} field
*/
@Override
boolean permitParallelization() {
return permitParallelization;
}

/**
* Get the performance entry held by this instance.
*
* @return the value of the {@code basePerformanceEntry} field
*/
BasePerformanceEntry getBasePerformanceEntry() {
return basePerformanceEntry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ public static class FilteredTable extends QueryTable implements WhereFilter.Reco
private MergedListener whereListener;

@ReferentialIntegrity
private Runnable delayedErrorReference;
Runnable delayedErrorReference;

public FilteredTable(final TrackingRowSet currentMapping, final QueryTable source) {
super(source.getDefinition(), currentMapping, source.columns, null, null);
Expand Down Expand Up @@ -1067,10 +1067,10 @@ void doRefilter(

if (refilterMatchedRequested && refilterUnmatchedRequested) {
final WhereListener.ListenerFilterExecution filterExecution =
listener.makeFilterExecution(source.getRowSet().copy());
listener.makeRefilterExecution(source.getRowSet().copy());
filterExecution.scheduleCompletion(
(adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
exception -> errorRefilterUpdate(listener, exception));
exception -> errorRefilterUpdate(listener, exception, upstream));
refilterMatchedRequested = refilterUnmatchedRequested = false;
} else if (refilterUnmatchedRequested) {
// things that are added or removed are already reflected in source.getRowSet
Expand All @@ -1080,7 +1080,7 @@ void doRefilter(
unmatchedRows.insert(upstream.modified());
}
final RowSet unmatched = unmatchedRows.copy();
final WhereListener.ListenerFilterExecution filterExecution = listener.makeFilterExecution(unmatched);
final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(unmatched);
filterExecution.scheduleCompletion((adds, mods) -> {
final WritableRowSet newMapping = adds.writableCast();
// add back what we previously matched, but for modifications and removals
Expand All @@ -1092,7 +1092,7 @@ void doRefilter(
newMapping.insert(previouslyMatched);
}
completeRefilterUpdate(listener, upstream, update, adds);
}, exception -> errorRefilterUpdate(listener, exception));
}, exception -> errorRefilterUpdate(listener, exception, upstream));
refilterUnmatchedRequested = false;
} else if (refilterMatchedRequested) {
// we need to take removed rows out of our rowSet so we do not read them, and also examine added or
Expand All @@ -1105,10 +1105,10 @@ void doRefilter(
final RowSet matchedClone = matchedRows.copy();

final WhereListener.ListenerFilterExecution filterExecution =
listener.makeFilterExecution(matchedClone);
listener.makeRefilterExecution(matchedClone);
filterExecution.scheduleCompletion(
(adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
exception -> errorRefilterUpdate(listener, exception));
exception -> errorRefilterUpdate(listener, exception, upstream));
refilterMatchedRequested = false;
} else {
throw new IllegalStateException("Refilter called when a refilter was not requested!");
Expand Down Expand Up @@ -1147,11 +1147,11 @@ private void completeRefilterUpdate(
if (upstream != null) {
upstream.release();
}

listener.setFinalExecutionStep();
// Release the upstream update and set the final notification step.
listener.finalizeUpdate(upstream);
}

private void errorRefilterUpdate(final WhereListener listener, final Exception e) {
private void errorRefilterUpdate(final WhereListener listener, final Exception e, final TableUpdate upstream) {
// Notify listeners that we had an issue refreshing the table.
if (getLastNotificationStep() == updateGraph.clock().currentStep()) {
if (listener != null) {
Expand All @@ -1162,6 +1162,8 @@ private void errorRefilterUpdate(final WhereListener listener, final Exception e
notifyListenersOnError(e, listener == null ? null : listener.entry);
forceReferenceCountToZero();
}
// Release the upstream update and set the final notification step.
listener.finalizeUpdate(upstream);
}

private void setWhereListener(MergedListener whereListener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import io.deephaven.io.logger.Logger;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.*;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -48,8 +50,8 @@ class WhereListener extends MergedListener {
private volatile long initialNotificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;
private volatile long finalNotificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;

@ReferentialIntegrity
private Runnable delayedErrorReference;
private static final AtomicLongFieldUpdater<WhereListener> FINAL_NOTIFICATION_STEP_UPDATER =
AtomicLongFieldUpdater.newUpdater(WhereListener.class, "finalNotificationStep");

WhereListener(
final Logger log,
Expand Down Expand Up @@ -115,12 +117,12 @@ public void process() {
// we should not get here if the recorder is null and we did not request a refilter
Assert.neqNull(recorder, "recorder");

final ListenerFilterExecution result = makeFilterExecution();
final ListenerFilterExecution result = makeRefilterExecution();
final TableUpdate upstream = recorder.getUpdate().acquire();
result.scheduleCompletion(
(adds, mods) -> completeUpdate(upstream, result.sourceModColumns, result.runModifiedFilters, adds,
mods),
this::errorUpdate);
exception -> errorUpdate(exception, upstream));
}

private ModifiedColumnSet getSourceModifiedColumnSet() {
Expand Down Expand Up @@ -186,20 +188,32 @@ private void completeUpdate(

result.notifyListeners(update);

upstream.release();

setFinalExecutionStep();
// Release the upstream update and set the final notification step.
finalizeUpdate(upstream);
}

private void errorUpdate(final Exception e) {
private void errorUpdate(final Exception e, final TableUpdate upstream) {
// Notify listeners that we had an issue refreshing the table.
if (result.getLastNotificationStep() == result.updateGraph.clock().currentStep()) {
forceReferenceCountToZero();
delayedErrorReference = new DelayedErrorNotifier(e, entry, result);
result.delayedErrorReference = new DelayedErrorNotifier(e, entry, result);
} else {
result.notifyListenersOnError(e, entry);
forceReferenceCountToZero();
}

// Release the upstream update and set the final notification step.
finalizeUpdate(upstream);
}

/**
 * Record the update graph's current clock step as this listener's final notification step and release the
 * upstream update.
 * <p>
 * The step is advanced with a compare-and-set, and only when the current clock step is strictly greater than the
 * previously recorded one. {@code upstream} is released only by the call that performs that advance; a call that
 * finds the step already recorded, or that loses the compare-and-set, leaves {@code upstream} untouched.
 * NOTE(review): presumably this guards against releasing the same update twice when both a completion and an
 * error path finalize the same cycle - confirm against callers.
 *
 * @param upstream the upstream update to release, or null if there is nothing to release
 */
void finalizeUpdate(@Nullable final TableUpdate upstream) {
    final long recordedStep = FINAL_NOTIFICATION_STEP_UPDATER.get(this);
    final long currentStep = getUpdateGraph().clock().currentStep();

    // Nothing to do if this step (or a later one) has already been recorded.
    if (recordedStep >= currentStep) {
        return;
    }
    // Another caller advanced the step first; do not release on its behalf.
    if (!FINAL_NOTIFICATION_STEP_UPDATER.compareAndSet(this, recordedStep, currentStep)) {
        return;
    }
    if (upstream != null) {
        upstream.release();
    }
}

/**
Expand All @@ -220,15 +234,11 @@ public boolean satisfied(long step) {
return false;
}

ListenerFilterExecution makeFilterExecution(final RowSet refilter) {
/**
* Create a filter execution over an explicitly supplied set of rows.
* <p>
* The execution is constructed with {@code refilter} as its first argument, {@code null} and {@code false} for
* the second and third, and {@link ModifiedColumnSet#ALL} for the fourth.
* NOTE(review): judging by the other constructor call in this class, these look like (added rows, modified
* rows, run-modified-filters flag, source modified columns) - confirm against the constructor.
*
* @param refilter the rows to run the filters against
* @return a new {@code ListenerFilterExecution} for {@code refilter}
*/
ListenerFilterExecution makeRefilterExecution(final RowSet refilter) {
return new ListenerFilterExecution(refilter, null, false, ModifiedColumnSet.ALL);
}

void setFinalExecutionStep() {
finalNotificationStep = getUpdateGraph().clock().currentStep();
}

ListenerFilterExecution makeFilterExecution() {
ListenerFilterExecution makeRefilterExecution() {
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
final ModifiedColumnSet sourceModColumns = getSourceModifiedColumnSet();
final boolean runModifiedFilters = filterColumns == null || sourceModColumns.containsAny(filterColumns);
return new ListenerFilterExecution(recorder.getAdded(), recorder.getModified(),
Expand Down Expand Up @@ -259,6 +269,12 @@ JobScheduler jobScheduler() {
return jobScheduler;
}


/**
* Report whether this filter execution may run parallelized.
*
* @return the value of the {@code permitParallelization} field in scope
*/
@Override
boolean permitParallelization() {
return permitParallelization;
}

@Override
int getTargetSegments() {
return segmentCount;
Expand Down
Loading