Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Blink-To-Append-Only Memoization and Apply to TableLoggers #4880

Merged
merged 5 commits into from
Nov 24, 2023
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -118,6 +118,10 @@ public static MemoizedOperationKey rollup(Collection<? extends Aggregation> aggr
includeConstituents);
}

public static MemoizedOperationKey blinkToAppendOnly(final long sizeLimit, @NotNull final Object key) {
return new BlinkToAppendOnly(sizeLimit, key);
}

private static boolean isMemoizable(SelectColumn[] selectColumn) {
return Arrays.stream(selectColumn)
.allMatch(sc -> sc instanceof SourceColumn || sc instanceof ReinterpretedColumn);
@@ -540,7 +544,7 @@ BaseTable.CopyAttributeOperation copyType() {
}
}

public static WouldMatch wouldMatch(WouldMatchPair... pairs) {
public static MemoizedOperationKey wouldMatch(WouldMatchPair... pairs) {
return new WouldMatch(pairs);
}

@@ -592,7 +596,7 @@ BaseTable.CopyAttributeOperation copyType() {
}
}

public static CrossJoin crossJoin(final Table rightTable, final MatchPair[] columnsToMatch,
public static MemoizedOperationKey crossJoin(final Table rightTable, final MatchPair[] columnsToMatch,
final MatchPair[] columnsToAdd, final int numRightBitsToReserve) {
return new CrossJoin(rightTable, columnsToMatch, columnsToAdd, numRightBitsToReserve);
}
@@ -650,7 +654,7 @@ BaseTable.CopyAttributeOperation copyType() {
}
}

public static RangeJoin rangeJoin(
public static MemoizedOperationKey rangeJoin(
@NotNull final Table rightTable,
@NotNull final Collection<? extends JoinMatch> exactMatches,
@NotNull final RangeJoinMatch rangeMatch,
@@ -672,4 +676,38 @@ protected static boolean equalWeakRefsByReferentIdentity(final WeakReference<?>
}
return t1 == t2;
}

private static class BlinkToAppendOnly extends AttributeAgnosticMemoizedOperationKey {
private final long sizeLimit;
private final Object key;

private BlinkToAppendOnly(final long sizeLimit, @NotNull final Object key) {
this.sizeLimit = sizeLimit;
this.key = Objects.requireNonNull(key);
}

@Override
public boolean equals(final Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}

final BlinkToAppendOnly blinkToAppendOnly = (BlinkToAppendOnly) other;

return sizeLimit == blinkToAppendOnly.sizeLimit && key.equals(blinkToAppendOnly.key);
}

@Override
public int hashCode() {
return 31 * key.hashCode() + Long.hashCode(sizeLimit);
}

@Override
BaseTable.CopyAttributeOperation copyType() {
return BaseTable.CopyAttributeOperation.None;
}
}
}
Original file line number Diff line number Diff line change
@@ -105,9 +105,6 @@ public synchronized boolean snapshotCompletedConsistently(
final boolean usedPreviousValues) {
final boolean snapshotConsistent;
if (isInInitialNotificationWindow()) {
if (eventualListener == null) {
throw new IllegalStateException("Listener has not been set on end!");
}
if (eventualResult == null) {
throw new IllegalStateException("Result has not been set on end!");
}
@@ -132,7 +129,7 @@ public synchronized boolean snapshotCompletedConsistently(

// Be sure to record initial last notification step before subscribing
eventualResult.setLastNotificationStep(lastNotificationStep);
return subscribeForUpdates(eventualListener);
return eventualListener == null || subscribeForUpdates(eventualListener);
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
}

/**
@@ -160,7 +157,7 @@ boolean subscribeForUpdates(@NotNull final TableUpdateListener listener) {
* @param resultTable The table that will result from this operation
*/
public synchronized void setListenerAndResult(
@NotNull final TableUpdateListener listener,
final TableUpdateListener listener,
@NotNull final NotificationStepReceiver resultTable) {
eventualListener = listener;
eventualResult = resultTable;
Original file line number Diff line number Diff line change
@@ -100,7 +100,7 @@ public synchronized boolean snapshotCompletedConsistently(long afterClockValue,
}

@Override
public synchronized void setListenerAndResult(@NotNull final TableUpdateListener listener,
public synchronized void setListenerAndResult(final TableUpdateListener listener,
@NotNull final NotificationStepReceiver resultTable) {
super.setListenerAndResult(listener, resultTable);
if (DEBUG) {
Original file line number Diff line number Diff line change
@@ -116,15 +116,19 @@ default boolean snapshotNeeded() {
*/
class Result<T extends DynamicNode & NotificationStepReceiver> {
public final T resultNode;
public final TableUpdateListener resultListener; // may be null if parent is non-ticking
/**
* The listener that should be attached to the parent. The listener may be null if the table does not need
* to respond to ticks from other sources (e.g. the parent is non-refreshing).
*/
public final TableUpdateListener resultListener;

public Result(@NotNull final T resultNode) {
this(resultNode, null);
}

/**
* Construct the result of an operation. The listener may be null if the parent is non-ticking and the table
* does not need to respond to ticks from other sources.
* Construct the result of an operation. The listener may be null if the table does not need to respond to
* ticks from other sources (e.g. the parent is non-refreshing).
*
* @param resultNode the result of the operation
* @param resultListener the listener that should be attached to the parent (or null)
@@ -3537,8 +3541,7 @@ private <T extends DynamicNode & NotificationStepReceiver> T getResultNoMemo(fin

resultTable.setValue(result.resultNode);
if (snapshotControl != null) {
snapshotControl.setListenerAndResult(Require.neqNull(result.resultListener, "resultListener"),
result.resultNode);
snapshotControl.setListenerAndResult(result.resultListener, result.resultNode);
}

return true;
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ public class EngineMetrics {
private static final Logger log = LoggerFactory.getLogger(EngineMetrics.class);
private static final boolean STATS_LOGGING_ENABLED = Configuration.getInstance().getBooleanWithDefault(
"statsLoggingEnabled", true);

private static volatile ProcessInfo PROCESS_INFO;
private static volatile EngineMetrics ENGINE_METRICS;

@@ -105,7 +106,9 @@ public QueryTable getProcessInfoQueryTable() {
}

public QueryTable getProcessMetricsQueryTable() {
return statsImpl == null ? null : (QueryTable) BlinkTableTools.blinkToAppendOnly(statsImpl.blinkTable());
return statsImpl == null
? null
: (QueryTable) BlinkTableTools.blinkToAppendOnly(statsImpl.blinkTable());
}

private StatsIntradayLogger getStatsLogger() {
Original file line number Diff line number Diff line change
@@ -934,13 +934,28 @@ public void markSourcesRefreshedForUnitTests() {
*/
@TestUseOnly
public void completeCycleForUnitTests() {
completeCycleForUnitTests(false);
}

/**
* Do the second half of the update cycle, including flushing notifications, and completing the
* {@link LogicalClockImpl#completeUpdateCycle() LogicalClock} update cycle. Note that this happens on a simulated
* UpdateGraph run thread, rather than this thread.
*
* @param errorCaughtAndInFinallyBlock Whether an error was caught, and we are in a {@code finally} block
*/
private void completeCycleForUnitTests(boolean errorCaughtAndInFinallyBlock) {
Assert.assertion(unitTestMode, "unitTestMode");
Assert.eq(sourcesLastSatisfiedStep, "sourcesLastSatisfiedStep", logicalClock.currentStep(),
"logicalClock.currentStep()");
if (!errorCaughtAndInFinallyBlock) {
Assert.eq(sourcesLastSatisfiedStep, "sourcesLastSatisfiedStep", logicalClock.currentStep(),
"logicalClock.currentStep()");
}
try {
unitTestRefreshThreadPool.submit(this::completeCycleForUnitTestsInternal).get();
} catch (InterruptedException | ExecutionException e) {
throw new UncheckedDeephavenException(e);
if (!errorCaughtAndInFinallyBlock) {
throw new UncheckedDeephavenException(e);
}
}
}

@@ -986,10 +1001,14 @@ public <T extends Exception> void runWithinUnitTestCycle(
final boolean sourcesSatisfied)
throws T {
startCycleForUnitTests(sourcesSatisfied);
boolean errorCaught = false;
try {
runnable.run();
} catch (final Throwable err) {
errorCaught = true;
throw err;
} finally {
completeCycleForUnitTests();
completeCycleForUnitTests(errorCaught);
}
}

Loading