
Commit

simple changes
cpwright committed Jan 22, 2025
1 parent a15b998 commit ef721a7
Showing 8 changed files with 125 additions and 76 deletions.
41 changes: 41 additions & 0 deletions Base/src/main/java/io/deephaven/base/AtomicUtil.java
@@ -5,6 +5,7 @@

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

public abstract class AtomicUtil {

@@ -267,4 +268,44 @@ public static int atomicAndNot(AtomicInteger i, int mask) {
} while (!i.compareAndSet(expect, update));
return update;
}

/**
* Sets the field to the minimum of the current value and the passed in value
*
* @param o the object to update
* @param fu the field updater
* @param value the value that is a candidate for the minimum
* @return true if the minimum was set
* @param <T> the type of o
*/
public static <T> boolean setMin(final T o, final AtomicLongFieldUpdater<T> fu, final long value) {
long current = fu.get(o);
while (current > value) {
if (fu.compareAndSet(o, current, value)) {
return true;
}
current = fu.get(o);
}
return false;
}

/**
* Sets the field to the maximum of the current value and the passed in value
*
* @param o the object to update
* @param fu the field updater
* @param value the value that is a candidate for the maximum
* @return true if the maximum was set
* @param <T> the type of o
*/
public static <T> boolean setMax(final T o, final AtomicLongFieldUpdater<T> fu, final long value) {
long current = fu.get(o);
while (value > current) {
if (fu.compareAndSet(o, current, value)) {
return true;
}
current = fu.get(o);
}
return false;
}
}
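For context, a minimal usage sketch of the new helpers (the HighWaterMark class, its field, and record() are hypothetical; only the AtomicUtil.setMax signature comes from the change above):

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import io.deephaven.base.AtomicUtil;

class HighWaterMark {
    // AtomicLongFieldUpdater requires a volatile, non-static long field in this class.
    volatile long high = Long.MIN_VALUE;

    private static final AtomicLongFieldUpdater<HighWaterMark> HIGH_UPDATER =
            AtomicLongFieldUpdater.newUpdater(HighWaterMark.class, "high");

    void record(final long observed) {
        // Retries compareAndSet until the maximum is raised; returns false (ignored here)
        // when a concurrent update has already installed a value >= observed.
        AtomicUtil.setMax(this, HIGH_UPDATER, observed);
    }
}
```

Unlike the plain check-then-act pattern replaced in Value.sample below, the compare-and-set loop cannot lose a concurrent update.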
45 changes: 24 additions & 21 deletions Base/src/main/java/io/deephaven/base/stats/Value.java
@@ -3,6 +3,8 @@
//
package io.deephaven.base.stats;

import io.deephaven.base.AtomicUtil;

import java.text.DecimalFormat;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

@@ -12,13 +14,17 @@ public abstract class Value {
AtomicLongFieldUpdater.newUpdater(Value.class, "sum");
private static final AtomicLongFieldUpdater<Value> SUM2_UPDATER =
AtomicLongFieldUpdater.newUpdater(Value.class, "sum2");

volatile protected long n = 0;
volatile protected long last = 0;
volatile protected long sum = 0;
volatile protected long sum2 = 0;
volatile protected long max = Long.MIN_VALUE;
volatile protected long min = Long.MAX_VALUE;
private static final AtomicLongFieldUpdater<Value> MAX_UPDATER =
AtomicLongFieldUpdater.newUpdater(Value.class, "max");
private static final AtomicLongFieldUpdater<Value> MIN_UPDATER =
AtomicLongFieldUpdater.newUpdater(Value.class, "min");

protected volatile long n = 0;
protected volatile long last = 0;
protected volatile long sum = 0;
protected volatile long sum2 = 0;
protected volatile long max = Long.MIN_VALUE;
protected volatile long min = Long.MAX_VALUE;

private boolean alwaysUpdated = false;

@@ -65,10 +71,10 @@ public void sample(long x) {
SUM2_UPDATER.addAndGet(this, x * x);
last = x;
if (x > max) {
max = x;
AtomicUtil.setMax(this, MAX_UPDATER, x);
}
if (x < min) {
min = x;
AtomicUtil.setMin(this, MIN_UPDATER, x);
}
}

@@ -113,17 +119,14 @@ public String toString() {
final DecimalFormat format = new DecimalFormat("#,###");
final DecimalFormat avgFormat = new DecimalFormat("#,###.###");

final double variance = n > 1 ? (sum2 - ((double) sum * sum / (double) n)) / (n - 1) : Double.NaN;

return "Value{" +
"n=" + format.format(n) +
(n > 0 ? ", sum=" + format.format(sum) +
", max=" + format.format(max) +
", min=" + format.format(min) +
", avg=" + avgFormat.format((n > 0 ? (double) sum / n : Double.NaN)) +
", std=" + avgFormat.format(Math.sqrt(variance))
: "")
+
'}';

if (n > 0) {
final double std = Math.sqrt(n > 1 ? (sum2 - ((double) sum * sum / (double) n)) / (n - 1) : Double.NaN);
final double avg = (double) sum / n;
return String.format("Value{n=%,d, sum=%,d, max=%,d, min=%,d, avg=%,.3f, std=%,.3f}", n, sum, max, min, avg,
std);
} else {
return String.format("Value{n=%,d}", n);
}
}
}
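As a side note, the rewritten toString leans on java.util.Formatter grouping; a standalone sketch with illustrative values (not taken from any real Value instance):

```java
public class ValueFormatDemo {
    public static void main(String[] args) {
        final long n = 1234567;
        final double avg = 1234.5678;
        // %,d adds locale-dependent digit grouping; %,.3f groups and rounds to three decimals.
        System.out.println(String.format("Value{n=%,d, avg=%,.3f}", n, avg));
        // Typical output under the default US locale: Value{n=1,234,567, avg=1,234.568}
    }
}
```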
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -56,7 +56,7 @@ For more information, see:

## Labels

- Each pull request msut have a `ReleaseNotesNeeded` label if a user can perceive the changes. Build or testing-only changes should have a `NoReleaseNotesNeeded` label.
- Each pull request must have a `ReleaseNotesNeeded` label if a user can perceive the changes. Build or testing-only changes should have a `NoReleaseNotesNeeded` label.
- Each pull request must have a `DocumentationNeeded` label if the user guide should be updated. Pull requests that do not require a user guide update should have a `NoDocumentationNeeded` label.

## Styleguide

@@ -102,6 +102,8 @@ default ColumnSource<RowSet> rowSetColumn() {
/**
* Get the {@link RowSet} {@link ColumnSource} of the index {@link #table() table}.
*
* @param options required for building the Index table this ColumnSource is retrieved from
*
* @return The {@link RowSet} {@link ColumnSource}
*/
@FinalDefault

@@ -52,6 +52,15 @@ default RowKeyLookup rowKeyLookup() {
return rowKeyLookup(DataIndexOptions.DEFAULT);
}

/**
* Build a {@link RowKeyLookup lookup function} of row keys for this index. If {@link #isRefreshing()} is
* {@code true}, this lookup function is only guaranteed to be accurate for the current cycle. Lookup keys should be
* in the order of the index's key columns.
*
* @param options required for building the table, if required by this RowKeyLookup
*
* @return A function that provides map-like lookup of index {@link #table()} row keys from an index lookup key
*/
@NotNull
RowKeyLookup rowKeyLookup(DataIndexOptions options);
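For illustration, a hedged fragment showing the new overload next to the existing no-argument form (dataIndex is hypothetical; per the default above, the two calls differ only in the options passed):

```java
// Default behavior, unchanged: equivalent to rowKeyLookup(DataIndexOptions.DEFAULT).
final RowKeyLookup fullLookup = dataIndex.rowKeyLookup();

// New overload: pass options when the caller only needs part of the index table built.
final RowKeyLookup partialLookup = dataIndex.rowKeyLookup(DataIndexOptions.USE_PARTIAL_TABLE);
```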

@@ -10,23 +10,32 @@
/**
* Options for controlling the function of a {@link DataIndex}.
*
* <p>
* Presently, this is used for the {@link Table#where(Filter)} operation to more efficiently handle data index matches,
* without necessarily reading all RowSet information from disk across partitions.
* </p>
*/
@Value.Immutable
@BuildableStyle
public interface DataIndexOptions {
/**
* Static default options, which use the full table.
*/
DataIndexOptions DEFAULT = DataIndexOptions.builder().build();

/**
* Static options that use a partial table instead of the full table.
*/
DataIndexOptions USE_PARTIAL_TABLE = DataIndexOptions.builder().operationUsesPartialTable(true).build();

/**
* Does this operation use only a subset of the DataIndex?
*
* <p>
* The DataIndex implementation may use this hint to defer work for some row sets.
* </p>
*
* <p>
* Presently, this is used for the {@link Table#where(Filter)} operation to hint that work for computing
* {@link io.deephaven.engine.rowset.RowSet RowSets} for non-matching keys should be deferred.
* </p>
*
* @return if this operation is only going to use a subset of this data index
*/
@Value.Default
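For context, a hedged sketch of how these options reach the DataIndex accessors touched elsewhere in this commit; it is a fragment rather than a full program, and assumes a DataIndex instance plus the relevant io.deephaven.engine imports are already in scope:

```java
// Equivalent to the canned DataIndexOptions.USE_PARTIAL_TABLE constant declared above.
final DataIndexOptions options = DataIndexOptions.builder()
        .operationUsesPartialTable(true)
        .build();

// Hint that only a subset of keys will be consulted, so the index may defer RowSet work
// for non-matching keys (this is how AbstractColumnSource.match() uses it below).
final Table indexTable = dataIndex.table(options);
final ColumnSource<RowSet> rowSets = dataIndex.rowSetColumn(options);
```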
@@ -57,28 +57,28 @@ public abstract class AbstractColumnSource<T> implements
public static boolean USE_PARTIAL_TABLE_DATA_INDEX = Configuration.getInstance()
.getBooleanWithDefault("AbstractColumnSource.usePartialDataIndex", true);
/**
* After generating a DataIndex table and identifying which row keys are responsive to the filter, the result Index
* After generating a DataIndex table and identifying which row keys are responsive to the filter, the result RowSet
* can be built in serial or in parallel. By default, the index is built in parallel which may take advantage of
* using more threads for I/O of the index data structure. Parallel builds do require more setup and thread
* synchronization, so they can be disabled by setting the Configuration property
* "AbstractColumnSource.useParallelIndexBuild" to false.
*/
public static boolean USE_PARALLEL_INDEX_BUILD = Configuration.getInstance()
.getBooleanWithDefault("AbstractColumnSource.useParallelIndexBuild", true);
public static boolean USE_PARALLEL_ROWSET_BUILD = Configuration.getInstance()
.getBooleanWithDefault("AbstractColumnSource.useParallelRowSetBuild", true);

/**
* Duration of match() calls using a DataIndex (also provides the count).
*/
public static final Value indexFilter =
public static final Value INDEX_FILTER_MILLIS =
Stats.makeItem("AbstractColumnSource", "indexFilter", Counter.FACTORY,
"Duration of match() with a DataIndex in nanos")
"Duration of match() with a DataIndex in millis")
.getValue();
/**
* Duration of match() calls using a chunk filter (i.e. no DataIndex).
*/
public static final Value chunkFilter =
public static final Value CHUNK_FILTER_MILLIS =
Stats.makeItem("AbstractColumnSource", "chunkFilter", Counter.FACTORY,
"Duration of match() without a DataIndex in nanos")
"Duration of match() without a DataIndex in millis")
.getValue();

/**
@@ -89,13 +89,6 @@ public abstract class AbstractColumnSource<T> implements

private static final int CHUNK_SIZE = 1 << 11;

/**
* The match operation does not use the complete table, it just picks out the relevant indices so there is no reason
* to actually read it fully.
*/
private static final DataIndexOptions PARTIAL_TABLE_DATA_INDEX =
DataIndexOptions.builder().operationUsesPartialTable(true).build();

protected final Class<T> type;
protected final Class<?> componentType;

@@ -173,7 +166,7 @@ public WritableRowSet match(
return doDataIndexFilter(invertMatch, usePrev, caseInsensitive, dataIndex, rowsetToFilter, keys);
} finally {
final long t1 = System.nanoTime();
indexFilter.sample(t1 - t0);
INDEX_FILTER_MILLIS.sample((t1 - t0) / 1_000_000);
}
}

@@ -184,7 +177,7 @@ private WritableRowSet doDataIndexFilter(final boolean invertMatch,
@NotNull final RowSet rowsetToFilter,
final Object[] keys) {
final DataIndexOptions partialOption =
USE_PARTIAL_TABLE_DATA_INDEX ? PARTIAL_TABLE_DATA_INDEX : DataIndexOptions.DEFAULT;
USE_PARTIAL_TABLE_DATA_INDEX ? DataIndexOptions.USE_PARTIAL_TABLE : DataIndexOptions.DEFAULT;

final Table indexTable = dataIndex.table(partialOption);

@@ -242,7 +235,7 @@ private WritableRowSet doDataIndexFilter(final boolean invertMatch,
? dataIndex.rowSetColumn(partialOption).getPrevSource()
: dataIndex.rowSetColumn(partialOption);

if (USE_PARALLEL_INDEX_BUILD) {
if (USE_PARALLEL_ROWSET_BUILD) {
final long[] rowKeyArray = new long[matchingIndexRows.intSize()];
matchingIndexRows.toRowKeyArray(rowKeyArray);
Arrays.stream(rowKeyArray).parallel().forEach((final long rowKey) -> {
@@ -290,7 +283,7 @@ private WritableRowSet doChunkFilter(final boolean invertMatch,
ChunkMatchFilterFactory.getChunkFilter(type, caseInsensitive, invertMatch, keys));
} finally {
final long t1 = System.nanoTime();
chunkFilter.sample(t1 - t0);
CHUNK_FILTER_MILLIS.sample((t1 - t0) / 1_000_000);
}
}
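As a usage note, the renamed stats follow a measure-in-nanos, record-in-millis pattern; a minimal sketch (timedFilterWork() is a placeholder for the match() path being measured, and CHUNK_FILTER_MILLIS is the field declared above):

```java
final long t0 = System.nanoTime();
try {
    timedFilterWork();  // placeholder for the filtering work being timed
} finally {
    final long t1 = System.nanoTime();
    // Value.sample() records one observation; integer division converts elapsed nanos to whole millis.
    CHUNK_FILTER_MILLIS.sample((t1 - t0) / 1_000_000);
}
```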
