Skip to content

Commit

Permalink
Add RollingFormula feature to UpdateBy (#5085)
Browse files Browse the repository at this point in the history
* Cleanup on base operators.

* Initial commit, working and passing tests.

* Optimized to re-use formula columns when possible.

* Shared formula column tested (and fixed).

* Proto file changes and java client updates.

* More spotless.

* Refactor accomplished, functional but ugly.

* Missing file, sigh.

* Python test fix.

* Cleanup and more documentation.

* Removed unneeded annotations from the Rolling op specs

* Addressing PR comments.

* Spotless.

* table.proto reversion

* Correct misnaming.

* PR comments addressed.

* Minor doc changes.

* Python client added, documentation updated.

* Python client documentation improvements.
  • Loading branch information
lbooker42 authored Feb 12, 2024
1 parent 78e7460 commit c0f35f7
Show file tree
Hide file tree
Showing 188 changed files with 12,901 additions and 5,673 deletions.
2,210 changes: 1,326 additions & 884 deletions cpp-client/deephaven/dhclient/proto/deephaven/proto/table.pb.cc

Large diffs are not rendered by default.

766 changes: 681 additions & 85 deletions cpp-client/deephaven/dhclient/proto/deephaven/proto/table.pb.h

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.deephaven.engine.table.impl.select.SourceColumn;
import io.deephaven.engine.table.impl.select.WhereFilter;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer;
import io.deephaven.engine.table.impl.updateby.UpdateBy;
import io.deephaven.engine.updategraph.NotificationQueue.Dependency;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.util.TableTools;
Expand Down Expand Up @@ -599,7 +600,9 @@ public PartitionedTable.Proxy aggBy(Collection<? extends Aggregation> aggregatio
/**
 * Applies the given update-by operations to every constituent table of the proxied partitioned table.
 *
 * @param control the update-by control passed through to each constituent's update-by
 * @param operations the update-by operations to apply
 * @param byColumns the grouping columns for the update-by
 * @return the proxy produced by {@code basicTransform} over the per-constituent update-by results
 */
@Override
public PartitionedTable.Proxy updateBy(UpdateByControl control, Collection<? extends UpdateByOperation> operations,
        Collection<? extends ColumnName> byColumns) {
    // Build the operator collection a single time from the shared constituent definition, instead of once per
    // constituent table.
    final UpdateBy.UpdateByOperatorCollection collection = UpdateBy.UpdateByOperatorCollection
            .from(target.constituentDefinition(), control, operations, byColumns);
    // Each constituent is handed its own copy of the collection rather than the shared instance.
    return basicTransform(ct -> UpdateBy.updateBy((QueryTable) ct, collection.copy(), control));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.deephaven.engine.table.impl.updateby;

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.api.ColumnName;
import io.deephaven.api.updateby.UpdateByControl;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.exceptions.CancellationException;
Expand All @@ -17,7 +16,10 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.*;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -64,7 +66,7 @@ class BucketedPartitionedUpdateByManager extends UpdateBy {
@NotNull final QueryTable source,
@NotNull final String[] preservedColumns,
@NotNull final Map<String, ? extends ColumnSource<?>> resultSources,
@NotNull final Collection<? extends ColumnName> byColumns,
@NotNull final String[] byColumnNames,
@Nullable final String timestampColumnName,
@Nullable final RowRedirection rowRedirection,
@NotNull final UpdateByControl control) {
Expand All @@ -73,8 +75,6 @@ class BucketedPartitionedUpdateByManager extends UpdateBy {
// this table will always have the rowset of the source
result = new QueryTable(source.getRowSet(), resultSources);

final String[] byColumnNames = byColumns.stream().map(ColumnName::name).toArray(String[]::new);

final Table transformedTable = LivenessScopeStack.computeEnclosed(() -> {
final PartitionedTable partitioned = source.partitionedAggBy(List.of(), true, null, byColumnNames);
final PartitionedTable transformed = partitioned.transform(t -> {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.MatchPair;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.util.RowRedirection;
Expand Down Expand Up @@ -35,13 +38,13 @@
public abstract class UpdateByOperator {
protected final MatchPair pair;
protected final String[] affectingColumns;
protected final RowRedirection rowRedirection;

protected final long reverseWindowScaleUnits;
protected final long forwardWindowScaleUnits;
protected final String timestampColumnName;
protected final boolean isWindowed;

final boolean isWindowed;
protected RowRedirection rowRedirection;

/**
* The input modifiedColumnSet for this operator
Expand Down Expand Up @@ -126,20 +129,32 @@ public abstract void accumulateRolling(RowSequence inputKeys,

/**
 * Records the supplied configuration in the corresponding fields of this operator. No validation or other work is
 * performed here.
 * <p>
 * NOTE(review): this listing appears to interleave removed and added diff lines; the {@code rowRedirection}
 * parameter and its assignment may be lines the commit deletes (row redirection is also accepted by
 * {@link #initializeSources(Table, RowRedirection)}) -- confirm against the committed file.
 *
 * @param pair stored as-is in {@code this.pair}
 * @param affectingColumns stored as-is (the array is not copied)
 * @param rowRedirection stored as-is; may be {@code null}
 * @param timestampColumnName stored as-is; may be {@code null}
 * @param reverseWindowScaleUnits stored as-is; presumably the backward extent of the window -- TODO confirm units
 * @param forwardWindowScaleUnits stored as-is; presumably the forward extent of the window -- TODO confirm units
 * @param isWindowed stored as-is; presumably distinguishes windowed (rolling) operators from cumulative ones --
 *        TODO confirm
 */
protected UpdateByOperator(@NotNull final MatchPair pair,
@NotNull final String[] affectingColumns,
@Nullable final RowRedirection rowRedirection,
@Nullable final String timestampColumnName,
final long reverseWindowScaleUnits,
final long forwardWindowScaleUnits,
final boolean isWindowed) {
this.pair = pair;
this.affectingColumns = affectingColumns;
this.rowRedirection = rowRedirection;
this.timestampColumnName = timestampColumnName;
this.reverseWindowScaleUnits = reverseWindowScaleUnits;
this.forwardWindowScaleUnits = forwardWindowScaleUnits;
this.isWindowed = isWindowed;
}

/**
 * Create an uninitialized copy of this operator. {@link #initializeSources(Table, RowRedirection)} must be called
 * on the copy before it can be used.
 *
 * @return a new, uninitialized copy of this operator
 */
public abstract UpdateByOperator copy();

/**
 * Initialize this operator with a specific source table (and row redirection if needed). This will be called
 * exactly once per operator.
 *
 * @param source the table this operator is initialized against
 * @param rowRedirection the row redirection to use, or {@code null} when none is needed
 */
public abstract void initializeSources(@NotNull Table source, @Nullable RowRedirection rowRedirection);

/**
* Initialize the bucket context for a cumulative operator
*/
Expand Down Expand Up @@ -240,6 +255,7 @@ protected String[] getOutputColumnNames() {
*
* @param context the context object
*/
@SuppressWarnings("unused")
protected void finishUpdate(@NotNull final Context context) {
    // Intentionally empty: this base implementation does nothing with the context.
}

/**
Expand Down
Loading

0 comments on commit c0f35f7

Please sign in to comment.