Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConstructSnapshot + BarrageMessageProducer: Use Static ConstructSnapshot Fast Path #4876

Merged
merged 3 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,18 @@ public static long callDataSnapshotFunction(
* Invokes the snapshot function in a loop until it succeeds with provably consistent results, or until
* {@code MAX_CONCURRENT_ATTEMPTS} or {@code MAX_CONCURRENT_ATTEMPT_DURATION_MILLIS} are exceeded. Falls back to
* acquiring a shared update graph lock for a final attempt.
* <p>
* The supplied {@link SnapshotControl}'s {@link SnapshotControl#usePreviousValues usePreviousValues} will be
* invoked at the start of any snapshot attempt, and its {@link SnapshotControl#snapshotCompletedConsistently
* snapshotCompletedConsistently} will be invoked at the end of any snapshot attempt that is not provably
* inconsistent.
* <p>
* If the supplied {@link SnapshotControl} provides a null {@link SnapshotControl#getUpdateGraph UpdateGraph}, then
* this method will perform a static snapshot without locks or retrying. In this case, the {@link SnapshotControl}'s
* {@link SnapshotControl#usePreviousValues usePreviousValues} must return {@code false},
* {@link SnapshotControl#snapshotCompletedConsistently snapshotCompletedConsistently} must return {@code true}, and
* the {@link LogicalClock#NULL_CLOCK_VALUE NULL_CLOCK_VALUE} will be supplied to {@code usePreviousValues} and
* {@code snapshotCompletedConsistently}.
*
* @param logPrefix A prefix for our log messages
* @param control A {@link SnapshotControl} to define the parameters and consistency for this snapshot
Expand All @@ -1190,13 +1202,27 @@ public static long callDataSnapshotFunction(

if (updateGraph == null) {
// This is a snapshot of static data. Just call the function with no frippery.
final boolean controlUsePrev = control.usePreviousValues(LogicalClock.NULL_CLOCK_VALUE);
if (controlUsePrev) {
throw new SnapshotUnsuccessfulException("Static snapshot requested previous values");
}

final boolean functionSuccessful = function.call(false, LogicalClock.NULL_CLOCK_VALUE);
Assert.assertion(functionSuccessful, "functionSuccessful");
if (!functionSuccessful) {
throw new SnapshotUnsuccessfulException("Static snapshot failed to execute snapshot function");
}
if (log.isDebugEnabled()) {
final long duration = System.currentTimeMillis() - overallStart;
log.debug().append(logPrefix)
.append(" Static snapshot function elapsed time ").append(duration).append(" ms").endl();
}

// notify control of successful snapshot
final boolean controlSuccessful =
control.snapshotCompletedConsistently(LogicalClock.NULL_CLOCK_VALUE, false);
if (!controlSuccessful) {
throw new SnapshotUnsuccessfulException("Static snapshot function succeeded but control failed");
}
return LogicalClock.NULL_CLOCK_VALUE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,18 +501,19 @@ public void addSubscription(final StreamObserver<MessageView> listener,
final Subscription subscription =
new Subscription(listener, options, cols, initialViewport, reverseViewport);

log.debug().append(logPrefix)
.append(subscription.logPrefix)
.append("subbing to columns ")
.append(FormatBitSet.formatBitSet(cols))
.endl();
if (log.isDebugEnabled()) {
log.debug().append(logPrefix)
.append(subscription.logPrefix)
.append("subbing to columns ")
.append(FormatBitSet.formatBitSet(cols))
.append(" and scheduling update immediately, for initial snapshot.")
.endl();
}

subscription.hasPendingUpdate = true;
pendingSubscriptions.add(subscription);

// we'd like to send the initial snapshot as soon as possible
log.debug().append(logPrefix).append(subscription.logPrefix)
.append("scheduling update immediately, for initial snapshot.").endl();
updatePropagationJob.scheduleImmediately();
}
}
Expand All @@ -528,6 +529,10 @@ private boolean findAndUpdateSubscription(final StreamObserver<MessageView> list
pendingSubscriptions.add(sub);
}

if (log.isDebugEnabled()) {
log.debug().append(logPrefix).append("Find and update subscription scheduling immediately.")
.endl();
}
updatePropagationJob.scheduleImmediately();
return true;
}
Expand Down Expand Up @@ -570,24 +575,28 @@ public boolean updateSubscription(final StreamObserver<MessageView> listener, @N
}

sub.pendingColumns = cols;
log.debug().append(logPrefix).append(sub.logPrefix)
.append("scheduling update immediately, for viewport and column updates.").endl();
if (log.isDebugEnabled()) {
log.debug().append(logPrefix).append(sub.logPrefix)
.append("scheduling update immediately, for viewport and column updates.").endl();
}
});
}

public void removeSubscription(final StreamObserver<MessageView> listener) {
findAndUpdateSubscription(listener, sub -> {
sub.pendingDelete = true;
log.debug().append(logPrefix).append(sub.logPrefix)
.append("scheduling update immediately, for removed subscription.").endl();
if (log.isDebugEnabled()) {
log.debug().append(logPrefix).append(sub.logPrefix)
.append("scheduling update immediately, for removed subscription.").endl();
}
});
}

//////////////////////////////////////////////////
// Update Processing and Data Recording Methods //
//////////////////////////////////////////////////

public DeltaListener constructListener() {
public InstrumentedTableUpdateListener constructListener() {
return parentIsRefreshing ? new DeltaListener() : null;
}

Expand Down Expand Up @@ -1340,7 +1349,7 @@ private void updateSubscriptionsSnapshotAndPropagate() {
long elapsed = System.nanoTime() - start;
recordMetric(stats -> stats.snapshot, elapsed);

if (SUBSCRIPTION_GROWTH_ENABLED && snapshot.rowsIncluded.size() > 0) {
if (SUBSCRIPTION_GROWTH_ENABLED && !snapshot.rowsIncluded.isEmpty()) {
// very simplistic logic to take the last snapshot and extrapolate max number of rows that will
// not exceed the target UGP processing time percentage
PeriodicUpdateGraph updateGraph = parent.getUpdateGraph().cast();
Expand All @@ -1364,7 +1373,7 @@ private void updateSubscriptionsSnapshotAndPropagate() {
}

synchronized (this) {
if (growingSubscriptions.size() == 0 && pendingDeltas.isEmpty() && pendingError == null) {
if (growingSubscriptions.isEmpty() && pendingDeltas.isEmpty() && pendingError == null) {
return;
}

Expand Down Expand Up @@ -1450,6 +1459,10 @@ private void updateSubscriptionsSnapshotAndPropagate() {
if (snapshot != null) {
try (final BarrageStreamGenerator<MessageView> snapshotGenerator =
streamGeneratorFactory.newGenerator(snapshot, this::recordWriteMetrics)) {
if (log.isDebugEnabled()) {
log.debug().append(logPrefix).append("Sending snapshot to ").append(activeSubscriptions.size())
.append(" subscriber(s).").endl();
}
for (final Subscription subscription : growingSubscriptions) {
if (subscription.pendingDelete) {
continue;
Expand Down Expand Up @@ -1488,6 +1501,10 @@ private void updateSubscriptionsSnapshotAndPropagate() {
}

if (numGrowingSubscriptions > 0) {
if (log.isDebugEnabled()) {
log.info().append(logPrefix).append("Have ").append(numGrowingSubscriptions)
.append(" growing subscriptions; scheduling next snapshot immediately.").endl();
}
updatePropagationJob.scheduleImmediately();
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down Expand Up @@ -1882,7 +1899,7 @@ final class ColumnInfo {
delta.update.shifted().unapply(modifiedRemaining);
}

if (unfilledAdds.size() > 0) {
if (!unfilledAdds.isEmpty()) {
Assert.assertion(false, "Error: added:" + coalescer.added + " unfilled:" + unfilledAdds
+ " missing:" + coalescer.added.subSetForPositions(unfilledAdds));
}
Expand Down Expand Up @@ -2024,6 +2041,13 @@ private void finalizeSnapshotForSubscriptions(final List<Subscription> subscript
|| subscription.growingRemainingViewport.firstRowKey() >= parentTableSize
|| isBlinkTable;

if (log.isDebugEnabled()) {
log.debug().append(logPrefix)
.append(subscription.logPrefix)
.append("finalizing snapshot isComplete=").append(isComplete)
.endl();
}

if (isComplete) {
// this subscription is complete, remove it from the growing list
subscription.isGrowingViewport = false;
Expand Down Expand Up @@ -2195,14 +2219,15 @@ public boolean snapshotCompletedConsistently(final long afterClockValue, final b
}
if (log.isDebugEnabled()) {
log.debug().append(logPrefix)
.append("success=").append(success).append(", validStep=").append(resultValidStep).endl();
.append("success=").append(success).append(", validStep=").append(resultValidStep)
.append(", numSnapshotSubscriptions=").append(snapshotSubscriptions.size()).endl();
}
return success;
}

@Override
public UpdateGraph getUpdateGraph() {
return parent.getUpdateGraph();
return parent.isRefreshing() ? parent.getUpdateGraph() : null;
}
}

Expand Down
Loading