Skip to content

Commit

Permalink
Port DH-11168: Indexing enhancements to Community. (deephaven#3851)
Browse files Browse the repository at this point in the history
Data Indexing

* Unifies APIs for working with data indexes

* Expands persistent indexing support to include non-clustered and multi-column indexes in addition to the existing clustered, single-column index support (Parquet specific)

* Replaces "grouping" support, which is deprecated and adapted to "indexing" support in the persistence layers to ensure backwards-compatibility (Parquet specific)

* Adds new, user-facing APIs for accessing data indexes available from any Deephaven Table

* Adds new, user-facing APIs for adding in-memory data indexes to any Deephaven Table

* Adds support for refreshing data indexes on refreshing ("live") Deephaven Tables

* Expands Deephaven Table operation support for multi-column data indexes to include naturalJoin, aj, sort, and the aggBy family of operations; where already supports this for independent, single-column "match" filters, and whereIn/whereNotIn have full support

* Adds Deephaven Table operation support for refreshing data indexes to where, whereIn/whereNotIn, naturalJoin, aj, sort, and the aggBy family of operations

* Enables many future optimizations
  • Loading branch information
lbooker42 authored Mar 19, 2024
1 parent 38fc912 commit 24facb1
Show file tree
Hide file tree
Showing 215 changed files with 10,061 additions and 5,648 deletions.
37 changes: 37 additions & 0 deletions Util/src/main/java/io/deephaven/util/SafeCloseableList.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

/**
* {@link SafeCloseable} that will close an internal list of other {@link SafeCloseable}s.
Expand Down Expand Up @@ -57,4 +63,35 @@ public final void close() {
}
list.clear();
}

public static final Collector<SafeCloseable, SafeCloseableList, SafeCloseableList> COLLECTOR = new Collector<>() {

@Override
public Supplier<SafeCloseableList> supplier() {
return SafeCloseableList::new;
}

@Override
public BiConsumer<SafeCloseableList, SafeCloseable> accumulator() {
return SafeCloseableList::add;
}

@Override
public BinaryOperator<SafeCloseableList> combiner() {
return (left, right) -> {
left.addAll(right.list);
return left;
};
}

@Override
public Function<SafeCloseableList, SafeCloseableList> finisher() {
return a -> a;
}

@Override
public Set<Characteristics> characteristics() {
return Set.of(Characteristics.IDENTITY_FINISH);
}
};
}
1 change: 1 addition & 0 deletions docker/registry/selenium/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
io.deephaven.project.ProjectType=DOCKER_REGISTRY
deephaven.registry.imageName=selenium/standalone-firefox:4.16.1-20231219
deephaven.registry.imageId=selenium/standalone-firefox@sha256:a405fe92b3ce5d7eb31a07e1f99be3d628fdc0e5bdc81febd8dc11786edef024
deephaven.registry.platform=linux/amd64
118 changes: 118 additions & 0 deletions engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.table;

import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.util.annotations.FinalDefault;
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
* Implementations of BasicDataIndex provide a data index for a {@link Table}. The index is itself a {@link Table} with
* columns corresponding to the indexed column(s) ("key" columns) and a column of {@link RowSet RowSets} that contain
* the row keys for each unique combination of key values (that is, the "group" or "bucket"). The index itself is a
* Table containing the key column(s) and the RowSets associated with each unique combination of values. Implementations
* may be loaded from persistent storage or created at runtime, e.g. via aggregations.
*/
public interface BasicDataIndex extends LivenessReferent {

/**
* Get a map from indexed {@link ColumnSource ColumnSources} to key column names for the index {@link #table()
* table}. This map must be ordered in the same order presented by {@link #keyColumnNames()} and used for lookup
* keys.
*
* @return A map designating the key column names for each indexed {@link ColumnSource}
*/
@NotNull
Map<ColumnSource<?>, String> keyColumnNamesByIndexedColumn();

/**
* Get a list of the key column names for the index {@link #table() table}.
*
* @return The key column names
*/
@NotNull
List<String> keyColumnNames();

/**
* Get the {@link RowSet} column name for the index {@link #table() table}.
*
* @return The {@link RowSet} column name
*/
@NotNull
String rowSetColumnName();

/**
* Get the key {@link ColumnSource ColumnSources} of the index {@link #table() table}.
*
* @return An array of the key {@link ColumnSource ColumnSources}, to be owned by the caller
*/
@FinalDefault
@NotNull
default ColumnSource<?>[] keyColumns() {
final Table indexTable = table();
return keyColumnNames().stream()
.map(indexTable::getColumnSource)
.toArray(ColumnSource[]::new);
}

/**
* Get the key {@link ColumnSource ColumnSources} of the index {@link #table() table} in the relative order of
* {@code indexedColumnSources}.
*
* @param indexedColumnSources The indexed {@link ColumnSource ColumnSources} in the desired order; must match the
* keys of {@link #keyColumnNamesByIndexedColumn()}
* @return An array of the key {@link ColumnSource ColumnSources} in the specified order, to be owned by the caller
*/
@FinalDefault
@NotNull
default ColumnSource<?>[] keyColumns(@NotNull final ColumnSource<?>[] indexedColumnSources) {
final Table indexTable = table();
final Map<ColumnSource<?>, String> keyColumnNamesByIndexedColumn = keyColumnNamesByIndexedColumn();
// Verify that the provided columns match the indexed columns.
if (keyColumnNamesByIndexedColumn.size() != indexedColumnSources.length
|| !keyColumnNamesByIndexedColumn.keySet().containsAll(Arrays.asList(indexedColumnSources))) {
throw new IllegalArgumentException(String.format(
"The provided columns %s do not match the index's indexed columns %s",
Arrays.toString(indexedColumnSources),
keyColumnNamesByIndexedColumn.keySet()));
}
return Arrays.stream(indexedColumnSources)
.map(keyColumnNamesByIndexedColumn::get)
.map(indexTable::getColumnSource)
.toArray(ColumnSource[]::new);
}

/**
* Get the {@link RowSet} {@link ColumnSource} of the index {@link #table() table}.
*
* @return The {@link RowSet} {@link ColumnSource}
*/
@FinalDefault
@NotNull
default ColumnSource<RowSet> rowSetColumn() {
return table().getColumnSource(rowSetColumnName(), RowSet.class);
}

/**
* Get the {@link Table} backing this data index.
*
* @return The {@link Table}
*/
@NotNull
Table table();

/**
* Whether the index {@link #table()} {@link Table#isRefreshing() is refreshing}. Some transformations will force
* the index to become static even when the source table is refreshing.
*
* @return {@code true} if the index {@link #table()} {@link Table#isRefreshing() is refreshing}, {@code false}
* otherwise
*/
boolean isRefreshing();
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ public enum ColumnType {
*/
Normal,

/**
* A column that has "grouping" metadata associated with it, possibly allowing for indexed filters, joins, and
* aggregations.
*/
Grouping,

/**
* A column that helps define underlying partitions in the storage of the data, which consequently may also be
* used for very efficient filtering.
Expand Down Expand Up @@ -393,10 +387,6 @@ public ColumnDefinition<TYPE> withPartitioning() {
return isPartitioning() ? this : new ColumnDefinition<>(name, dataType, componentType, ColumnType.Partitioning);
}

public ColumnDefinition<TYPE> withGrouping() {
return isGrouping() ? this : new ColumnDefinition<>(name, dataType, componentType, ColumnType.Grouping);
}

public ColumnDefinition<TYPE> withNormal() {
return columnType == ColumnType.Normal
? this
Expand All @@ -414,16 +404,12 @@ public ColumnDefinition<?> withName(@NotNull final String newName) {
return newName.equals(name) ? this : new ColumnDefinition<>(newName, dataType, componentType, columnType);
}

public boolean isGrouping() {
return (columnType == ColumnType.Grouping);
}

public boolean isPartitioning() {
return (columnType == ColumnType.Partitioning);
}

public boolean isDirect() {
return (columnType == ColumnType.Normal || columnType == ColumnType.Grouping);
return (columnType == ColumnType.Normal);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
package io.deephaven.engine.table;

import io.deephaven.base.verify.Require;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.ChunkType;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.util.annotations.FinalDefault;
import org.jetbrains.annotations.NotNull;

import javax.annotation.Nullable;
import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.util.Map;

/**
* A "source" for column data - allows cell values to be looked up by (long) keys.
Expand All @@ -40,10 +39,26 @@ default ChunkType getChunkType() {
return ChunkType.fromElementType(dataType);
}

/**
* Return a {@link RowSet row set} where the values in the column source match the given keys.
*
* @param invertMatch Whether to invert the match, i.e. return the rows where the values do not match the given keys
* @param usePrev Whether to use the previous values for the ColumnSource
* @param caseInsensitive Whether to perform a case insensitive match
* @param dataIndex An optional data index that can be used to accelerate the match (the index table must be
* included in snapshot controls or otherwise guaranteed to be current)
* @param mapper Restrict results to this row set
* @param keys The keys to match in the column
*
* @return The rows that match the given keys
*/
WritableRowSet match(
boolean invertMatch, boolean usePrev, boolean caseInsensitive, @NotNull RowSet mapper, Object... keys);

Map<T, RowSet> getValuesMapping(RowSet subRange);
boolean invertMatch,
boolean usePrev,
boolean caseInsensitive,
@Nullable final DataIndex dataIndex,
@NotNull RowSet mapper,
Object... keys);

/**
* ColumnSource implementations that track previous values have the option to not actually start tracking previous
Expand All @@ -59,21 +74,6 @@ default void startTrackingPrevValues() {
}
}

/**
* Compute grouping information for all keys present in this column source.
*
* @return A map from distinct data values to a RowSet that contains those values
*/
Map<T, RowSet> getGroupToRange();

/**
* Compute grouping information for (at least) all keys present in rowSet.
*
* @param rowSet The RowSet to consider
* @return A map from distinct data values to a RowSet that contains those values
*/
Map<T, RowSet> getGroupToRange(RowSet rowSet);

/**
* Determine if this column source is immutable, meaning that the values at a given row key never change.
*
Expand Down
Loading

0 comments on commit 24facb1

Please sign in to comment.