From 1c043e9c9b9778a32492cef54f5c65f7ce1a160b Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Tue, 21 Nov 2023 16:04:22 -0700 Subject: [PATCH 1/3] ConstructSnapshot + BarrageMessageProducer: Also Finalize Subscriptions on Static Tables --- .../table/impl/remote/ConstructSnapshot.java | 19 ++++++ .../barrage/BarrageMessageProducer.java | 58 +++++++++++++------ 2 files changed, 60 insertions(+), 17 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java index 1f72098d501..eb67309d72f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java @@ -1173,6 +1173,17 @@ public static long callDataSnapshotFunction( * Invokes the snapshot function in a loop until it succeeds with provably consistent results, or until * {@code MAX_CONCURRENT_ATTEMPTS} or {@code MAX_CONCURRENT_ATTEMPT_DURATION_MILLIS} are exceeded. Falls back to * acquiring a shared update graph lock for a final attempt. + *

+ * The supplied {@link SnapshotControl}'s {@link SnapshotControl#usePreviousValues usePreviousValues} will be + * invoked at the start of any snapshot attempt, and its {@link SnapshotControl#snapshotCompletedConsistently + * snapshotCompletedConsistently} will be invoked at the end of any snapshot attempt that is not provably + * inconsistent. + *

+ * If the supplied {@link SnapshotControl} provides a null {@link SnapshotControl#getUpdateGraph UpdateGraph}, then + * this method will perform a static snapshot without locks or retrying. In this case, the {@link SnapshotControl}'s + * {@link SnapshotControl#usePreviousValues usePreviousValues} must return {@code false}, and + * {@link SnapshotControl#snapshotCompletedConsistently snapshotCompletedConsistently} must return {@code true}. + * * * @param logPrefix A prefix for our log messages * @param control A {@link SnapshotControl} to define the parameters and consistency for this snapshot @@ -1190,6 +1201,9 @@ public static long callDataSnapshotFunction( if (updateGraph == null) { // This is a snapshot of static data. Just call the function with no frippery. + final boolean controlUsePrev = control.usePreviousValues(LogicalClock.NULL_CLOCK_VALUE); + Assert.eqFalse(controlUsePrev, "controlUsePrev"); + final boolean functionSuccessful = function.call(false, LogicalClock.NULL_CLOCK_VALUE); Assert.assertion(functionSuccessful, "functionSuccessful"); if (log.isDebugEnabled()) { @@ -1197,6 +1211,11 @@ public static long callDataSnapshotFunction( log.debug().append(logPrefix) .append(" Static snapshot function elapsed time ").append(duration).append(" ms").endl(); } + + // notify control of successful snapshot + final boolean controlSuccessful = + control.snapshotCompletedConsistently(LogicalClock.NULL_CLOCK_VALUE, false); + Assert.assertion(controlSuccessful, "controlSuccessful"); return LogicalClock.NULL_CLOCK_VALUE; } diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java index c8ad8192fed..509298cddc5 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java @@ -501,18 +501,19 @@ public void addSubscription(final StreamObserver listener, final Subscription subscription = new Subscription(listener, options, cols, initialViewport, reverseViewport); - log.debug().append(logPrefix) - .append(subscription.logPrefix) - .append("subbing to columns ") - .append(FormatBitSet.formatBitSet(cols)) - .endl(); + if (log.isDebugEnabled()) { + log.debug().append(logPrefix) + .append(subscription.logPrefix) + .append("subbing to columns ") + .append(FormatBitSet.formatBitSet(cols)) + .append(" and scheduling update immediately, for initial snapshot.") + .endl(); + } subscription.hasPendingUpdate = true; pendingSubscriptions.add(subscription); // we'd like to send the initial snapshot as soon as possible - log.debug().append(logPrefix).append(subscription.logPrefix) - .append("scheduling update immediately, for initial snapshot.").endl(); updatePropagationJob.scheduleImmediately(); } } @@ -528,6 +529,10 @@ private boolean findAndUpdateSubscription(final StreamObserver list pendingSubscriptions.add(sub); } + if (log.isDebugEnabled()) { + log.debug().append(logPrefix).append("Find and update subscription scheduling immediately.") + .endl(); + } updatePropagationJob.scheduleImmediately(); return true; } @@ -570,16 +575,20 @@ public boolean updateSubscription(final StreamObserver listener, @N } sub.pendingColumns = cols; - log.debug().append(logPrefix).append(sub.logPrefix) - .append("scheduling update immediately, for viewport and column updates.").endl(); + if (log.isDebugEnabled()) { + log.debug().append(logPrefix).append(sub.logPrefix) + .append("scheduling update immediately, for viewport and column updates.").endl(); + } }); } public void removeSubscription(final StreamObserver listener) { findAndUpdateSubscription(listener, sub -> { sub.pendingDelete = true; - log.debug().append(logPrefix).append(sub.logPrefix) - .append("scheduling update immediately, for removed subscription.").endl(); + if (log.isDebugEnabled()) { + log.debug().append(logPrefix).append(sub.logPrefix) + .append("scheduling update immediately, for removed subscription.").endl(); + } }); } @@ -587,7 +596,7 @@ public void removeSubscription(final StreamObserver listener) { // Update Processing and Data Recording Methods // ////////////////////////////////////////////////// - public DeltaListener constructListener() { + public InstrumentedTableUpdateListener constructListener() { return parentIsRefreshing ? new DeltaListener() : null; } @@ -1340,7 +1349,7 @@ private void updateSubscriptionsSnapshotAndPropagate() { long elapsed = System.nanoTime() - start; recordMetric(stats -> stats.snapshot, elapsed); - if (SUBSCRIPTION_GROWTH_ENABLED && snapshot.rowsIncluded.size() > 0) { + if (SUBSCRIPTION_GROWTH_ENABLED && !snapshot.rowsIncluded.isEmpty()) { // very simplistic logic to take the last snapshot and extrapolate max number of rows that will // not exceed the target UGP processing time percentage PeriodicUpdateGraph updateGraph = parent.getUpdateGraph().cast(); @@ -1364,7 +1373,7 @@ private void updateSubscriptionsSnapshotAndPropagate() { } synchronized (this) { - if (growingSubscriptions.size() == 0 && pendingDeltas.isEmpty() && pendingError == null) { + if (growingSubscriptions.isEmpty() && pendingDeltas.isEmpty() && pendingError == null) { return; } @@ -1450,6 +1459,10 @@ private void updateSubscriptionsSnapshotAndPropagate() { if (snapshot != null) { try (final BarrageStreamGenerator snapshotGenerator = streamGeneratorFactory.newGenerator(snapshot, this::recordWriteMetrics)) { + if (log.isDebugEnabled()) { + log.debug().append(logPrefix).append("Sending snapshot to ").append(activeSubscriptions.size()) + .append(" subscriber(s).").endl(); + } for (final Subscription subscription : growingSubscriptions) { if (subscription.pendingDelete) { continue; @@ -1488,6 +1501,11 @@ private void updateSubscriptionsSnapshotAndPropagate() { } if (numGrowingSubscriptions > 0) { + if (log.isDebugEnabled()) { + log.info().append(logPrefix).append("Have ").append(numGrowingSubscriptions) + .append(" growing subscriptions; scheduling next snapshot immediately.").endl(); + } + updatePropagationJob.scheduleImmediately(); updatePropagationJob.scheduleImmediately(); } @@ -1882,7 +1900,7 @@ final class ColumnInfo { delta.update.shifted().unapply(modifiedRemaining); } - if (unfilledAdds.size() > 0) { + if (!unfilledAdds.isEmpty()) { Assert.assertion(false, "Error: added:" + coalescer.added + " unfilled:" + unfilledAdds + " missing:" + coalescer.added.subSetForPositions(unfilledAdds)); } @@ -2024,6 +2042,11 @@ private void finalizeSnapshotForSubscriptions(final List subscript || subscription.growingRemainingViewport.firstRowKey() >= parentTableSize || isBlinkTable; + if (log.isDebugEnabled()) { + log.debug().append(logPrefix).append("finalizing snapshot for subscription: ") + .append(subscription.toString()).append(", isComplete=").append(isComplete).endl(); + } + if (isComplete) { // this subscription is complete, remove it from the growing list subscription.isGrowingViewport = false; @@ -2195,14 +2218,15 @@ public boolean snapshotCompletedConsistently(final long afterClockValue, final b } if (log.isDebugEnabled()) { log.debug().append(logPrefix) - .append("success=").append(success).append(", validStep=").append(resultValidStep).endl(); + .append("success=").append(success).append(", validStep=").append(resultValidStep) + .append(", numSnapshotSubscriptions=").append(snapshotSubscriptions.size()).endl(); } return success; } @Override public UpdateGraph getUpdateGraph() { - return parent.getUpdateGraph(); + return parent.isRefreshing() ? parent.getUpdateGraph() : null; } } From 378d40691a789545d44dfd2a834dd61727563a91 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Tue, 21 Nov 2023 17:14:52 -0700 Subject: [PATCH 2/3] Ryan's feedback --- .../table/impl/remote/ConstructSnapshot.java | 17 ++++++++++++----- .../server/barrage/BarrageMessageProducer.java | 5 ++--- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java index eb67309d72f..c397ffaa89f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java @@ -1181,8 +1181,9 @@ public static long callDataSnapshotFunction( *

* If the supplied {@link SnapshotControl} provides a null {@link SnapshotControl#getUpdateGraph UpdateGraph}, then * this method will perform a static snapshot without locks or retrying. In this case, the {@link SnapshotControl}'s - * {@link SnapshotControl#usePreviousValues usePreviousValues} must return {@code false}, and - * {@link SnapshotControl#snapshotCompletedConsistently snapshotCompletedConsistently} must return {@code true}. + * {@link SnapshotControl#usePreviousValues usePreviousValues} must return {@code false}, + * {@link SnapshotControl#snapshotCompletedConsistently snapshotCompletedConsistently} must return {@code true}, and + * the clock step returned by this method will be the {@link LogicalClock#NULL_CLOCK_VALUE NULL_CLOCK_VALUE}. * * * @param logPrefix A prefix for our log messages @@ -1202,10 +1203,14 @@ public static long callDataSnapshotFunction( if (updateGraph == null) { // This is a snapshot of static data. Just call the function with no frippery. final boolean controlUsePrev = control.usePreviousValues(LogicalClock.NULL_CLOCK_VALUE); - Assert.eqFalse(controlUsePrev, "controlUsePrev"); + if (controlUsePrev) { + throw new SnapshotUnsuccessfulException("Static snapshot requested previous values"); + } final boolean functionSuccessful = function.call(false, LogicalClock.NULL_CLOCK_VALUE); - Assert.assertion(functionSuccessful, "functionSuccessful"); + if (!functionSuccessful) { + throw new SnapshotUnsuccessfulException("Static snapshot failed to execute snapshot function"); + } if (log.isDebugEnabled()) { final long duration = System.currentTimeMillis() - overallStart; log.debug().append(logPrefix) @@ -1215,7 +1220,9 @@ public static long callDataSnapshotFunction( // notify control of successful snapshot final boolean controlSuccessful = control.snapshotCompletedConsistently(LogicalClock.NULL_CLOCK_VALUE, false); - Assert.assertion(controlSuccessful, "controlSuccessful"); + if (!controlSuccessful) { + throw new SnapshotUnsuccessfulException("Static snapshot function succeeded but control failed"); + } return LogicalClock.NULL_CLOCK_VALUE; } diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java index 509298cddc5..47770a115f7 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java @@ -1506,7 +1506,6 @@ private void updateSubscriptionsSnapshotAndPropagate() { .append(" growing subscriptions; scheduling next snapshot immediately.").endl(); } updatePropagationJob.scheduleImmediately(); - updatePropagationJob.scheduleImmediately(); } lastUpdateTime = scheduler.currentTimeMillis(); @@ -2043,8 +2042,8 @@ private void finalizeSnapshotForSubscriptions(final List subscript || isBlinkTable; if (log.isDebugEnabled()) { - log.debug().append(logPrefix).append("finalizing snapshot for subscription: ") - .append(subscription.toString()).append(", isComplete=").append(isComplete).endl(); + log.debug().append(logPrefix).append("finalizing snapshot for subscription=") + .append(subscription.logPrefix).append(", isComplete=").append(isComplete).endl(); } if (isComplete) { From cb09c5f8d374c038a44144fe47e205ab30a67463 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Tue, 21 Nov 2023 17:42:47 -0700 Subject: [PATCH 3/3] Ryan's feedback rnd2 --- .../engine/table/impl/remote/ConstructSnapshot.java | 4 ++-- .../io/deephaven/server/barrage/BarrageMessageProducer.java | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java index c397ffaa89f..fdbb01bedac 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java @@ -1183,8 +1183,8 @@ public static long callDataSnapshotFunction( * this method will perform a static snapshot without locks or retrying. In this case, the {@link SnapshotControl}'s * {@link SnapshotControl#usePreviousValues usePreviousValues} must return {@code false}, * {@link SnapshotControl#snapshotCompletedConsistently snapshotCompletedConsistently} must return {@code true}, and - * the clock step returned by this method will be the {@link LogicalClock#NULL_CLOCK_VALUE NULL_CLOCK_VALUE}. - * + * the {@link LogicalClock#NULL_CLOCK_VALUE NULL_CLOCK_VALUE} will be supplied to {@code usePreviousValues} and + * {@code snapshotCompletedConsistently}. * * @param logPrefix A prefix for our log messages * @param control A {@link SnapshotControl} to define the parameters and consistency for this snapshot diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java index 47770a115f7..33b48e654dc 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java @@ -2042,8 +2042,10 @@ private void finalizeSnapshotForSubscriptions(final List subscript || isBlinkTable; if (log.isDebugEnabled()) { - log.debug().append(logPrefix).append("finalizing snapshot for subscription=") - .append(subscription.logPrefix).append(", isComplete=").append(isComplete).endl(); + log.debug().append(logPrefix) + .append(subscription.logPrefix) + .append("finalizing snapshot isComplete=").append(isComplete) + .endl(); } if (isComplete) {