diff --git a/Base/src/main/java/io/deephaven/base/AtomicUtil.java b/Base/src/main/java/io/deephaven/base/AtomicUtil.java index 43aa96ac37b..866d11f5ee6 100644 --- a/Base/src/main/java/io/deephaven/base/AtomicUtil.java +++ b/Base/src/main/java/io/deephaven/base/AtomicUtil.java @@ -5,6 +5,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; public abstract class AtomicUtil { @@ -267,4 +268,44 @@ public static int atomicAndNot(AtomicInteger i, int mask) { } while (!i.compareAndSet(expect, update)); return update; } + + /** + * Sets the field to the minimum of the current value and the passed in value + * + * @param o the object to update + * @param fu the field updater + * @param value the value that is a candidate for the minimum + * @return true if the minimum was set + * @param <T> the type of o + */ + public static <T> boolean setMin(final T o, final AtomicLongFieldUpdater<T> fu, final long value) { + long current = fu.get(o); + while (current > value) { + if (fu.compareAndSet(o, current, value)) { + return true; + } + current = fu.get(o); + } + return false; + } + + /** + * Sets the field to the maximum of the current value and the passed in value + * + * @param o the object to update + * @param fu the field updater + * @param value the value that is a candidate for the maximum + * @return true if the maximum was set + * @param <T> the type of o + */ + public static <T> boolean setMax(final T o, final AtomicLongFieldUpdater<T> fu, final long value) { + long current = fu.get(o); + while (value > current) { + if (fu.compareAndSet(o, current, value)) { + return true; + } + current = fu.get(o); + } + return false; + } } diff --git a/Base/src/main/java/io/deephaven/base/stats/Value.java b/Base/src/main/java/io/deephaven/base/stats/Value.java index 65ee41424a1..92f9bfb2266 100644 --- a/Base/src/main/java/io/deephaven/base/stats/Value.java +++ 
b/Base/src/main/java/io/deephaven/base/stats/Value.java @@ -3,6 +3,8 @@ // package io.deephaven.base.stats; +import io.deephaven.base.AtomicUtil; + import java.text.DecimalFormat; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -12,13 +14,17 @@ public abstract class Value { AtomicLongFieldUpdater.newUpdater(Value.class, "sum"); private static final AtomicLongFieldUpdater SUM2_UPDATER = AtomicLongFieldUpdater.newUpdater(Value.class, "sum2"); - - volatile protected long n = 0; - volatile protected long last = 0; - volatile protected long sum = 0; - volatile protected long sum2 = 0; - volatile protected long max = Long.MIN_VALUE; - volatile protected long min = Long.MAX_VALUE; + private static final AtomicLongFieldUpdater MAX_UPDATER = + AtomicLongFieldUpdater.newUpdater(Value.class, "max"); + private static final AtomicLongFieldUpdater MIN_UPDATER = + AtomicLongFieldUpdater.newUpdater(Value.class, "min"); + + protected volatile long n = 0; + protected volatile long last = 0; + protected volatile long sum = 0; + protected volatile long sum2 = 0; + protected volatile long max = Long.MIN_VALUE; + protected volatile long min = Long.MAX_VALUE; private boolean alwaysUpdated = false; @@ -65,10 +71,10 @@ public void sample(long x) { SUM2_UPDATER.addAndGet(this, x * x); last = x; if (x > max) { - max = x; + AtomicUtil.setMax(this, MAX_UPDATER, x); } if (x < min) { - min = x; + AtomicUtil.setMin(this, MIN_UPDATER, x); } } @@ -113,17 +119,14 @@ public String toString() { final DecimalFormat format = new DecimalFormat("#,###"); final DecimalFormat avgFormat = new DecimalFormat("#,###.###"); - final double variance = n > 1 ? (sum2 - ((double) sum * sum / (double) n)) / (n - 1) : Double.NaN; - - return "Value{" + - "n=" + format.format(n) + - (n > 0 ? ", sum=" + format.format(sum) + - ", max=" + format.format(max) + - ", min=" + format.format(min) + - ", avg=" + avgFormat.format((n > 0 ? 
(double) sum / n : Double.NaN)) + - ", std=" + avgFormat.format(Math.sqrt(variance)) - : "") - + - '}'; + + if (n > 0) { + final double std = Math.sqrt(n > 1 ? (sum2 - ((double) sum * sum / (double) n)) / (n - 1) : Double.NaN); + final double avg = (double) sum / n; + return String.format("Value{n=%,d, sum=%,d, max=%,d, min=%,d, avg=%,.3f, std=%,.3f}", n, sum, max, min, avg, + std); + } else { + return String.format("Value{n=%,d}", n); + } } } diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b1f17cddfcf..e82274c805a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -56,7 +56,7 @@ For more information, see: ## Labels -- Each pull request msut have a `ReleaseNotesNeeded` label if a user can perceive the changes. Build or testing-only changes should have a `NoReleaseNotesNeeded` label. +- Each pull request must have a `ReleaseNotesNeeded` label if a user can perceive the changes. Build or testing-only changes should have a `NoReleaseNotesNeeded` label. - Each pull request must have a `DocumentationNeeded` label if the user guide should be updated. Pull requests that do not require a user guide update should have a `NoDocumentationNeeded` label. ## Styleguide diff --git a/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java b/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java index f1f09f096be..08453259398 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java @@ -102,6 +102,8 @@ default ColumnSource rowSetColumn() { /** * Get the {@link RowSet} {@link ColumnSource} of the index {@link #table() table}. 
* + * @param options required for building the Index table this ColumnSource is retrieved from + * * @return The {@link RowSet} {@link ColumnSource} */ @FinalDefault diff --git a/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java b/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java index 3dbdc8eeca7..8bea446650c 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java @@ -52,6 +52,15 @@ default RowKeyLookup rowKeyLookup() { return rowKeyLookup(DataIndexOptions.DEFAULT); } + /** + * Build a {@link RowKeyLookup lookup function} of row keys for this index. If {@link #isRefreshing()} is + * {@code true}, this lookup function is only guaranteed to be accurate for the current cycle. Lookup keys should be + * in the order of the index's key columns. + * + * @param options required for building the table, if required by this RowKeyLookup + * + * @return A function that provides map-like lookup of index {@link #table()} row keys from an index lookup key + */ @NotNull RowKeyLookup rowKeyLookup(DataIndexOptions options); diff --git a/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java index becd0948b2c..3b786c0bdeb 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java @@ -10,16 +10,20 @@ /** * Options for controlling the function of a {@link DataIndex}. * - *

<p> - * Presently, this is used for the {@link Table#where(Filter)} operation to more efficiently handle data index matches, - * without necessarily reading all RowSet information from disk across partitions. - * </p>
*/ @Value.Immutable @BuildableStyle public interface DataIndexOptions { + /** + * Static default options, which uses a full table. + */ DataIndexOptions DEFAULT = DataIndexOptions.builder().build(); + /** + * Static options that uses a partial table instead of the full table. + */ + DataIndexOptions USE_PARTIAL_TABLE = DataIndexOptions.builder().operationUsesPartialTable(true).build(); + /** * Does this operation use only a subset of the DataIndex? * @@ -27,6 +31,11 @@ public interface DataIndexOptions { * The DataIndex implementation may use this hint to defer work for some row sets. *

</p> * + * <p> + * Presently, this is used for the {@link Table#where(Filter)} operation to hint that work for computing + * {@link io.deephaven.engine.rowset.RowSet RowSets} for non-matching keys should be deferred. + * </p>
+ * * @return if this operation is only going to use a subset of this data index */ @Value.Default diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java index eae1bf0cb5b..73660d2ed82 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java @@ -57,28 +57,28 @@ public abstract class AbstractColumnSource implements public static boolean USE_PARTIAL_TABLE_DATA_INDEX = Configuration.getInstance() .getBooleanWithDefault("AbstractColumnSource.usePartialDataIndex", true); /** - * After generating a DataIndex table and identifying which row keys are responsive to the filter, the result Index + * After generating a DataIndex table and identifying which row keys are responsive to the filter, the result RowSet * can be built in serial or in parallel. By default, the index is built in parallel which may take advantage of * using more threads for I/O of the index data structure. Parallel builds do require more setup and thread * synchronization, so they can be disabled by setting the Configuration property * "AbstractColumnSource.useParallelIndexBuild" to false. */ - public static boolean USE_PARALLEL_INDEX_BUILD = Configuration.getInstance() - .getBooleanWithDefault("AbstractColumnSource.useParallelIndexBuild", true); + public static boolean USE_PARALLEL_ROWSET_BUILD = Configuration.getInstance() + .getBooleanWithDefault("AbstractColumnSource.useParallelRowSetBuild", true); /** * Duration of match() calls using a DataIndex (also provides the count). 
*/ - public static final Value indexFilter = + public static final Value INDEX_FILTER_MILLIS = Stats.makeItem("AbstractColumnSource", "indexFilter", Counter.FACTORY, - "Duration of match() with a DataIndex in nanos") + "Duration of match() with a DataIndex in millis") .getValue(); /** * Duration of match() calls using a chunk filter (i.e. no DataIndex). */ - public static final Value chunkFilter = + public static final Value CHUNK_FILTER_MILLIS = Stats.makeItem("AbstractColumnSource", "chunkFilter", Counter.FACTORY, - "Duration of match() without a DataIndex in nanos") + "Duration of match() without a DataIndex in millis") .getValue(); /** @@ -89,13 +89,6 @@ public abstract class AbstractColumnSource implements private static final int CHUNK_SIZE = 1 << 11; - /** - * The match operation does not use the complete table, it just picks out the relevant indices so there is no reason - * to actually read it fully. - */ - private static final DataIndexOptions PARTIAL_TABLE_DATA_INDEX = - DataIndexOptions.builder().operationUsesPartialTable(true).build(); - protected final Class type; protected final Class componentType; @@ -173,7 +166,7 @@ public WritableRowSet match( return doDataIndexFilter(invertMatch, usePrev, caseInsensitive, dataIndex, rowsetToFilter, keys); } finally { final long t1 = System.nanoTime(); - indexFilter.sample(t1 - t0); + INDEX_FILTER_MILLIS.sample((t1 - t0) / 1_000_000); } } @@ -184,7 +177,7 @@ private WritableRowSet doDataIndexFilter(final boolean invertMatch, @NotNull final RowSet rowsetToFilter, final Object[] keys) { final DataIndexOptions partialOption = - USE_PARTIAL_TABLE_DATA_INDEX ? PARTIAL_TABLE_DATA_INDEX : DataIndexOptions.DEFAULT; + USE_PARTIAL_TABLE_DATA_INDEX ? DataIndexOptions.USE_PARTIAL_TABLE : DataIndexOptions.DEFAULT; final Table indexTable = dataIndex.table(partialOption); @@ -242,7 +235,7 @@ private WritableRowSet doDataIndexFilter(final boolean invertMatch, ? 
dataIndex.rowSetColumn(partialOption).getPrevSource() : dataIndex.rowSetColumn(partialOption); - if (USE_PARALLEL_INDEX_BUILD) { + if (USE_PARALLEL_ROWSET_BUILD) { final long[] rowKeyArray = new long[matchingIndexRows.intSize()]; matchingIndexRows.toRowKeyArray(rowKeyArray); Arrays.stream(rowKeyArray).parallel().forEach((final long rowKey) -> { @@ -290,7 +283,7 @@ private WritableRowSet doChunkFilter(final boolean invertMatch, ChunkMatchFilterFactory.getChunkFilter(type, caseInsensitive, invertMatch, keys)); } finally { final long t1 = System.nanoTime(); - chunkFilter.sample(t1 - t0); + CHUNK_FILTER_MILLIS.sample((t1 - t0) / 1_000_000); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java index a19923566c8..c974fc5f6d5 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java @@ -34,7 +34,6 @@ import java.util.*; import java.util.concurrent.atomic.AtomicReferenceArray; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -54,8 +53,8 @@ class MergedDataIndex extends AbstractDataIndex implements DataIndexer.Retainabl /** * The duration in nanos to build a DataIndex table. 
*/ - public static final Value buildIndexTable = Stats - .makeItem("DataIndex", "buildTable", Counter.FACTORY, "Duration in nanos of building an index").getValue(); + public static final Value BUILD_INDEX_TABLE_MILLIS = Stats + .makeItem("DataIndex", "buildTable", Counter.FACTORY, "Duration in millis of building an index").getValue(); /** * When merging row sets from multiple component DataIndex structures, reading each individual component can be @@ -85,8 +84,8 @@ class MergedDataIndex extends AbstractDataIndex implements DataIndexer.Retainabl private volatile Table indexTable; /** - * A lazy version of the table, note this value is never set if indexTable is set. A lazy table can be converted to - * an indexTable by selecting the rowset column. + * A lazy version of the table. This value is never set if indexTable is set. Can be converted to indexTable by + * selecting the RowSet column. */ private volatile Table lazyTable; @@ -145,8 +144,8 @@ public Table table(final DataIndexOptions options) { if ((localIndexTable = indexTable) != null) { return localIndexTable; } - final boolean lazyRowsetMerge = options.operationUsesPartialTable(); - if (lazyRowsetMerge) { + final boolean lazyRowSetMerge = options.operationUsesPartialTable(); + if (lazyRowSetMerge) { if ((localIndexTable = lazyTable) != null) { return localIndexTable; } @@ -154,7 +153,7 @@ public Table table(final DataIndexOptions options) { synchronized (this) { if ((localIndexTable = indexTable) != null) { return localIndexTable; - } else if (lazyRowsetMerge) { + } else if (lazyRowSetMerge) { if ((localIndexTable = lazyTable) != null) { return localIndexTable; } @@ -162,7 +161,7 @@ public Table table(final DataIndexOptions options) { try { return QueryPerformanceRecorder.withNugget( String.format("Merge Data Indexes [%s]", String.join(", ", keyColumnNames)), - ForkJoinPoolOperationInitializer.ensureParallelizable(() -> buildTable(lazyRowsetMerge))); + ForkJoinPoolOperationInitializer.ensureParallelizable(() 
-> buildTable(lazyRowSetMerge))); } catch (Throwable t) { isCorrupt = true; throw t; @@ -182,11 +181,11 @@ public Table table(final DataIndexOptions options) { * array to avoid duplicated work. *

*/ - private static class RowsetCacher { + private static class RowSetCacher { final ColumnSource> source; final AtomicReferenceArray results; - private RowsetCacher(final ColumnSource> source, final int capacity) { + private RowSetCacher(final ColumnSource> source, final int capacity) { this.source = source; this.results = new AtomicReferenceArray<>(capacity); } @@ -216,7 +215,7 @@ RowSet get(final long rowKey) { } // we need to create our own placeholder, and synchronize on it first - final ReentrantLock placeholder = new ReentrantLock(); + final Object placeholder = new Object(); // noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized (placeholder) { if (!results.compareAndSet(iRowKey, null, placeholder)) { @@ -226,22 +225,17 @@ RowSet get(final long rowKey) { // it is our responsibility to get the right answer final ObjectVector inputRowsets = source.get(rowKey); Assert.neqNull(inputRowsets, "inputRowsets"); - assert inputRowsets != null; // need to get the value and set it into our own value final RowSet computedResult; try { + // noinspection DataFlowIssue computedResult = mergeRowSets(rowKey, inputRowsets); } catch (Exception e) { - if (!results.compareAndSet(iRowKey, placeholder, e)) { - throw new IllegalStateException("another thread changed our cache placeholder!"); - } + results.set(iRowKey, e); throw e; } - if (!results.compareAndSet(iRowKey, placeholder, computedResult)) { - throw new IllegalStateException("another thread changed our cache placeholder!"); - } - + results.set(iRowKey, computedResult); return computedResult; } } while (true); @@ -252,11 +246,10 @@ private Table buildTable(final boolean lazyRowsetMerge) { if (lazyTable != null) { if (lazyRowsetMerge) { return lazyTable; - } else { - indexTable = lazyTable.select(); - lazyTable = null; - return indexTable; } + indexTable = lazyTable.select(); + lazyTable = null; + return indexTable; } final long t0 = System.nanoTime(); @@ -264,8 +257,9 @@ private Table buildTable(final 
boolean lazyRowsetMerge) { final Table locationTable = columnSourceManager.locationTable().coalesce(); // Perform a parallelizable update to produce coalesced location index tables with their row sets shifted by - // the appropriate region offset. The row sets are not forced into memory; but keys are to enable efficient - // grouping. The rowsets are read into memory as part of the {@link #mergeRowSets} call. + // the appropriate region offset. The row sets are not forced into memory, but keys are in order to enable + // efficient + // grouping. The rowsets are read into memory as part of the mergeRowSets call. final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new); final Table locationDataIndexes = locationTable .update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>( @@ -288,9 +282,7 @@ private Table buildTable(final boolean lazyRowsetMerge) { // we are using a regular array here; so we must ensure that we are flat; or we'll have a bad time Assert.assertion(groupedByKeyColumns.isFlat(), "groupedByKeyColumns.isFlat()"); - final RowsetCacher rowsetCacher = new RowsetCacher(vectorColumnSource, groupedByKeyColumns.intSize()); - // need to do something better with a magic holder that looks at the rowset column source and lazily - // merges them instead of this version that actually wants to have an input and doesn't cache anything + final RowSetCacher rowsetCacher = new RowSetCacher(vectorColumnSource, groupedByKeyColumns.intSize()); combined = groupedByKeyColumns .view(List.of(SelectColumn.ofStateless(new MultiSourceFunctionalColumn<>(List.of(), ROW_SET_COLUMN_NAME, RowSet.class, (k, v) -> rowsetCacher.get(k))))); @@ -317,7 +309,7 @@ private Table buildTable(final boolean lazyRowsetMerge) { return combined; } finally { final long t1 = System.nanoTime(); - buildIndexTable.sample(t1 - t0); + BUILD_INDEX_TABLE_MILLIS.sample((t1 - t0) / 1_000_000); } } @@ -335,7 +327,7 @@ private static Table loadIndexTableAndShiftRowSets( final Table 
coalesced = indexTable.coalesce(); // pull the key columns into memory while we are parallel; - final Table withInMemorykeyColumns = coalesced.update(keyColumnNames); + final Table withInMemoryKeyColumns = coalesced.update(keyColumnNames); final Selectable shiftFunction; if (shiftAmount == 0) { @@ -349,7 +341,7 @@ private static Table loadIndexTableAndShiftRowSets( } // the rowset column shift need not occur until we perform the rowset merge operation - which is either // lazy or part of an update [which itself can be parallel]. - return withInMemorykeyColumns.updateView(List.of(SelectColumn.ofStateless(shiftFunction))); + return withInMemoryKeyColumns.updateView(List.of(SelectColumn.ofStateless(shiftFunction))); } private static RowSet mergeRowSets(