Skip to content

Commit

Permalink
CumCountWhere added and tests complete.
Browse files Browse the repository at this point in the history
  • Loading branch information
lbooker42 committed Jan 15, 2025
1 parent 521f97c commit c6c3a60
Show file tree
Hide file tree
Showing 11 changed files with 3,489 additions and 2,528 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.deephaven.engine.table.impl.select.WhereFilter;
import io.deephaven.engine.table.impl.sources.NullValueColumnSource;
import io.deephaven.engine.table.impl.updateby.countwhere.CountFilter;
import io.deephaven.engine.table.impl.updateby.countwhere.CountWhereOperator;
import io.deephaven.engine.table.impl.updateby.delta.*;
import io.deephaven.engine.table.impl.updateby.em.*;
import io.deephaven.engine.table.impl.updateby.emstd.*;
Expand All @@ -32,7 +33,6 @@
import io.deephaven.engine.table.impl.updateby.prod.*;
import io.deephaven.engine.table.impl.updateby.rollingavg.*;
import io.deephaven.engine.table.impl.updateby.rollingcount.*;
import io.deephaven.engine.table.impl.updateby.countwhere.RollingCountWhereOperator;
import io.deephaven.engine.table.impl.updateby.rollingformula.*;
import io.deephaven.engine.table.impl.updateby.rollingformulamulticolumn.RollingFormulaMultiColumnOperator;
import io.deephaven.engine.table.impl.updateby.rollinggroup.RollingGroupOperator;
Expand Down Expand Up @@ -423,6 +423,12 @@ public Void visit(CumProdSpec cps) {
return null;
}

@Override
public Void visit(CumCountWhereSpec spec) {
ops.add(makeCumCountWhereOperator(tableDef, spec));
return null;
}

@Override
public Void visit(@NotNull final DeltaSpec spec) {
Arrays.stream(pairs)
Expand Down Expand Up @@ -1255,6 +1261,79 @@ private UpdateByOperator makeRollingCountOperator(@NotNull final MatchPair pair,
}
}

private UpdateByOperator makeCumCountWhereOperator(
@NotNull final TableDefinition tableDef,
@NotNull final CumCountWhereSpec rs) {
final WhereFilter[] whereFilters = WhereFilter.fromInternal(rs.filter());

final List<String> inputColumnNameList = new ArrayList<>();
final Map<String, Integer> inputColumnMap = new HashMap<>();
final List<int[]> filterInputColumnIndicesList = new ArrayList<>();

// Verify all the columns in the where filters are present in the table def and valid for use.
for (final WhereFilter whereFilter : whereFilters) {
whereFilter.init(tableDef);
if (whereFilter.isRefreshing()) {
throw new UnsupportedOperationException("CumCountWhere does not support refreshing filters");
}

// Compute which input sources this filter will use.
final List<String> filterColumnName = whereFilter.getColumns();
final int inputColumnCount = whereFilter.getColumns().size();
final int[] inputColumnIndices = new int[inputColumnCount];
for (int ii = 0; ii < inputColumnCount; ++ii) {
final String inputColumnName = filterColumnName.get(ii);
final int inputColumnIndex = inputColumnMap.computeIfAbsent(inputColumnName, k -> {
inputColumnNameList.add(inputColumnName);
return inputColumnNameList.size() - 1;
});
inputColumnIndices[ii] = inputColumnIndex;
}
filterInputColumnIndicesList.add(inputColumnIndices);
}

// Gather the input column type info.
final String[] inputColumnNames = inputColumnNameList.toArray(String[]::new);
final ChunkType[] inputChunkTypes = new ChunkType[inputColumnNames.length];
final Class<?>[] inputColumnTypes = new Class[inputColumnNames.length];
final Class<?>[] inputComponentTypes = new Class[inputColumnNames.length];
for (int i = 0; i < inputColumnNames.length; i++) {
final ColumnDefinition<?> columnDef = tableDef.getColumn(inputColumnNames[i]);
inputColumnTypes[i] = columnDef.getDataType();
inputChunkTypes[i] = ChunkType.fromElementType(inputColumnTypes[i]);
inputComponentTypes[i] = columnDef.getComponentType();
}

// Create a dummy table we can use to initialize filters.
final Map<String, ColumnSource<?>> columnSourceMap = new LinkedHashMap<>();
for (int i = 0; i < inputColumnNames.length; i++) {
final ColumnDefinition<?> columnDef = tableDef.getColumn(inputColumnNames[i]);
final ColumnSource<?> source =
NullValueColumnSource.getInstance(columnDef.getDataType(), columnDef.getComponentType());
columnSourceMap.put(inputColumnNames[i], source);
}
final Table dummyTable = new QueryTable(RowSetFactory.empty().toTracking(), columnSourceMap);

final CountFilter[] countFilters =
CountFilter.createCountFilters(whereFilters, dummyTable, filterInputColumnIndicesList);

// If any filter is a standard WhereFilter, we need a chunk source table.
final boolean chunkSourceTableRequired =
Arrays.asList(countFilters).stream().anyMatch(filter -> filter.whereFilter() != null);

// Create a new column pair with the same name for the left and right columns
final MatchPair pair = new MatchPair(rs.column().name(), rs.column().name());

return new CountWhereOperator(
pair,
countFilters,
inputColumnNames,
inputChunkTypes,
inputColumnTypes,
inputComponentTypes,
chunkSourceTableRequired);
}

private UpdateByOperator makeRollingCountWhereOperator(
@NotNull final TableDefinition tableDef,
@NotNull final RollingCountWhereSpec rs) {
Expand Down Expand Up @@ -1328,7 +1407,7 @@ private UpdateByOperator makeRollingCountWhereOperator(
// Create a new column pair with the same name for the left and right columns
final MatchPair pair = new MatchPair(rs.column().name(), rs.column().name());

return new RollingCountWhereOperator(
return new CountWhereOperator(
pair,
affectingColumns,
rs.revWindowScale().timestampCol(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@

import static io.deephaven.util.QueryConstants.*;

// TODO: this operator can alomost certainly be re-used with minor changes for the cumulative version as well. Should
// probably be called CountWhereOperator, even though that sorta collides with AggBy operator name.

public class RollingCountWhereOperator extends BaseLongUpdateByOperator {
public class CountWhereOperator extends BaseLongUpdateByOperator {
private static final int BUFFER_INITIAL_CAPACITY = 512;

/**
Expand Down Expand Up @@ -192,10 +189,7 @@ public void setValueChunks(@NotNull Chunk<? extends Values>[] valueChunks) {
* Do the work of applying the filters against the input data and assigning true to the result chunk where all
* filters pass, false otherwise.
*/
private void applyFilters(
final Chunk<? extends Values>[] inputChunks,
final int chunkSize) {

private void applyFilters(final int chunkSize) {
// Use the filters to populate a boolean buffer with the filter results.
boolean initialized = false;
WritableRowSet remainingRows = null;
Expand Down Expand Up @@ -264,7 +258,7 @@ public void accumulateRolling(
setValueChunks(influencerValueChunkArr);
setPosChunks(affectedPosChunk, influencerPosChunk);

applyFilters(influencerValueChunkArr, influencerCount);
applyFilters(influencerCount);

int pushIndex = 0;

Expand Down Expand Up @@ -297,6 +291,26 @@ public void accumulateRolling(
writeToOutputColumn(inputKeys);
}

@Override
public void accumulateCumulative(
@NotNull final RowSequence inputKeys,
@NotNull final Chunk<? extends Values>[] valueChunkArr,
@Nullable final LongChunk<? extends Values> tsChunk,
final int len) {

setValueChunks(valueChunkArr);
applyFilters(len);

// chunk processing
for (int ii = 0; ii < len; ii++) {
push(ii, 1);
writeToOutputChunk(ii);
}

// chunk output to column
writeToOutputColumn(inputKeys);
}

@Override
protected void push(int pos, int count) {
// Push from the pre-computed results chunk into the buffer.
Expand Down Expand Up @@ -325,16 +339,6 @@ protected void pop(int count) {
}
}
}

@Override
public void accumulateCumulative(
@NotNull final RowSequence inputKeys,
@NotNull final Chunk<? extends Values>[] valueChunkArr,
@Nullable final LongChunk<? extends Values> tsChunk,
final int len) {
throw new UnsupportedOperationException(
"RollingCountWhereOperator is not supported in cumulative operations.");
}
}

/**
Expand All @@ -351,7 +355,7 @@ private static WritableRowSet buildFromBooleanChunk(final BooleanChunk<Values> v
}

/**
* Create a new RollingCountWhereOperator.
* Create a new CountWhereOperator for rolling / windowed operations.
*
* @param pair Contains the output column name as a MatchPair
* @param affectingColumns The names of the columns that when changed would affect this formula output
Expand All @@ -365,7 +369,7 @@ private static WritableRowSet buildFromBooleanChunk(final BooleanChunk<Values> v
* @param inputColumnTypes The data types for each input column
* @param inputComponentTypes The component types for each input column
*/
public RollingCountWhereOperator(
public CountWhereOperator(
@NotNull final MatchPair pair,
@NotNull final String[] affectingColumns,
@Nullable final String timestampColumnName,
Expand All @@ -386,9 +390,46 @@ public RollingCountWhereOperator(
this.chunkSourceTableRequired = chunkSourceTableRequired;
}

/**
* Create a new CountWhereOperator for cumulative operations.
*
* @param pair Contains the output column name as a MatchPair
* @param filters the filters to apply to the input columns
* @param inputColumnNames The names of the key columns to be used as inputs
* @param inputChunkTypes The chunk types for each input column
* @param inputColumnTypes The data types for each input column
* @param inputComponentTypes The component types for each input column
*/
public CountWhereOperator(
@NotNull final MatchPair pair,
final CountFilter[] filters,
final String[] inputColumnNames,
final ChunkType[] inputChunkTypes,
final Class<?>[] inputColumnTypes,
final Class<?>[] inputComponentTypes,
final boolean chunkSourceTableRequired) {
super(pair, inputColumnNames, null, 0, 0, false);
this.filters = filters;
this.inputColumnNames = inputColumnNames;
this.inputChunkTypes = inputChunkTypes;
this.inputColumnTypes = inputColumnTypes;
this.inputComponentTypes = inputComponentTypes;
this.chunkSourceTableRequired = chunkSourceTableRequired;
}

@Override
public UpdateByOperator copy() {
return new RollingCountWhereOperator(
if (!isWindowed) {
return new CountWhereOperator(
pair,
filters,
inputColumnNames,
inputChunkTypes,
inputColumnTypes,
inputComponentTypes,
chunkSourceTableRequired);
}
return new CountWhereOperator(
pair,
affectingColumns,
timestampColumnName,
Expand Down
Loading

0 comments on commit c6c3a60

Please sign in to comment.