From e03373ac9108f516cd4496b993dae2dade8026c1 Mon Sep 17 00:00:00 2001 From: Ekaterina Dimitrova Date: Wed, 15 Jan 2025 13:36:03 -0500 Subject: [PATCH 1/3] CNDB-12425: A preliminary patch to facilitate discussions. It does not include: - feature flag - checks we are on the new messaging version added for ANNOptions - we fall back to allow filtering only on Index Creation. Currently we also fall back to ALLOW FILTERING if we use nodetool to rebuild indexes --- .../restrictions/StatementRestrictions.java | 13 +- .../cql3/statements/SelectStatement.java | 2 +- .../org/apache/cassandra/db/ReadCommand.java | 80 +++-- .../cassandra/db/filter/ANNOptions.java | 5 +- .../apache/cassandra/db/filter/RowFilter.java | 36 ++- .../org/apache/cassandra/index/Index.java | 11 + .../apache/cassandra/index/IndexRegistry.java | 7 + .../index/SecondaryIndexManager.java | 40 ++- .../cassandra/index/SingletonIndexGroup.java | 21 +- .../index/sai/StorageAttachedIndexGroup.java | 16 + .../cassandra/locator/ReplicaPlans.java | 12 +- .../service/reads/AbstractReadExecutor.java | 2 +- .../service/reads/range/RangeCommands.java | 3 +- .../reads/range/ReplicaPlanIterator.java | 10 +- .../test/sai/ANNOptionsDistributedTest.java | 5 +- .../test/sai/IndexAvailabilityTest.java | 226 +++++++++++++- .../entities/SecondaryIndexTest.java | 2 + .../cassandra/db/filter/ANNOptionsTest.java | 6 + .../cassandra/index/CustomIndexTest.java | 6 + .../index/sai/cql/AllowFilteringTest.java | 278 +++++++++++++++++- .../AssureSufficientLiveNodesTest.java | 10 +- .../cassandra/locator/ReplicaPlansTest.java | 4 +- ...pointGroupingRangeCommandIteratorTest.java | 3 +- .../reads/range/RangeCommandIteratorTest.java | 4 +- .../reads/range/RangeCommandsTest.java | 7 +- .../reads/range/ReplicaPlanIteratorTest.java | 2 +- .../reads/range/ReplicaPlanMergerTest.java | 2 +- 27 files changed, 708 insertions(+), 105 deletions(-) diff --git a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java index b20d658867e8..228347fda5ed 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java @@ -772,6 +772,11 @@ public boolean hasIndxBasedOrdering() return nonPrimaryKeyRestrictions.restrictions().stream().anyMatch(SingleRestriction::isIndexBasedOrdering); } + public boolean hasIndxBasedBoundedAnn() + { + return nonPrimaryKeyRestrictions.restrictions().stream().anyMatch(SingleRestriction::isBoundedAnn); + } + public void throwRequiresAllowFilteringError(TableMetadata table) { if (hasIndxBasedOrdering()) @@ -988,7 +993,7 @@ private boolean hasUnrestrictedClusteringColumns() return table.clusteringColumns().size() != clusteringColumnsRestrictions.size(); } - public RowFilter getRowFilter(IndexRegistry indexManager, QueryOptions options, QueryState queryState, SelectOptions selectOptions) + public RowFilter getRowFilter(IndexRegistry indexManager, QueryOptions options, QueryState queryState, SelectOptions selectOptions, boolean allowsFiltering) { boolean hasAnnOptions = selectOptions.hasANNOptions(); @@ -1001,7 +1006,11 @@ public RowFilter getRowFilter(IndexRegistry indexManager, QueryOptions options, } ANNOptions annOptions = selectOptions.parseANNOptions(); - RowFilter rowFilter = RowFilter.builder(indexManager) + + if (allowsFiltering && (hasIndxBasedOrdering() || hasIndxBasedBoundedAnn())) + allowsFiltering = false; + + RowFilter rowFilter = RowFilter.builder(indexManager, allowsFiltering) .buildFromRestrictions(this, table, options, queryState, annOptions); if (hasAnnOptions && !rowFilter.hasANN()) diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 58b75aacab5a..b613a57361aa 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -1001,7 +1001,7 @@ private NavigableSet> getRequestedRows(QueryOptions options, Query public RowFilter getRowFilter(QueryOptions options, QueryState state) throws InvalidRequestException { IndexRegistry indexRegistry = IndexRegistry.obtain(table); - return restrictions.getRowFilter(indexRegistry, options, state, selectOptions); + return restrictions.getRowFilter(indexRegistry, options, state, selectOptions, parameters.allowFiltering); } private ResultSet process(PartitionIterator partitions, diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index b110a3ada2de..a2752fdf2a3f 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -18,12 +18,7 @@ package org.apache.cassandra.db; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Function; @@ -65,6 +60,7 @@ import org.apache.cassandra.guardrails.Threshold; import org.apache.cassandra.index.Index; import org.apache.cassandra.index.IndexRegistry; +import org.apache.cassandra.index.SecondaryIndexManager; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputPlus; @@ -83,19 +79,22 @@ import org.apache.cassandra.sensors.Context; import org.apache.cassandra.sensors.read.TrackingRowIterator; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import static com.google.common.collect.Iterables.any; import static com.google.common.collect.Iterables.filter; +import static java.lang.String.format; import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP; +import static org.apache.cassandra.index.SecondaryIndexManager.getIndexStatus; import static org.apache.cassandra.utils.MonotonicClock.approxTime; /** * General interface for storage-engine read commands (common to both range and * single partition commands). *

- * This contains all the informations needed to do a local read. + * This contains all the information needed to do a local read. */ public abstract class ReadCommand extends AbstractReadQuery { @@ -215,7 +214,7 @@ public int digestVersion() * this allows us to use the command as a carrier of the digest version even if we only call * setIsDigestQuery on some copy of it. * - * @param digestVersion the version for the digest is this command is used for digest query.. + * @param digestVersion the version for the digest is this command is used for digest query. * @return this read command. */ public ReadCommand setDigestVersion(int digestVersion) @@ -408,10 +407,12 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut Index.Searcher searcher = null; if (indexQueryPlan != null) { - cfs.indexManager.checkQueryability(indexQueryPlan); - searcher = indexSearcher(); - Index index = indexQueryPlan.getFirst(); - Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name); + if (cfs.indexManager.isQueryableThroughIndex(indexQueryPlan, rowFilter().allowFiltering)) + { + searcher = indexSearcher(); + Index index = indexQueryPlan.getFirst(); + Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name); + } } Context context = Context.from(this); @@ -531,9 +532,9 @@ public ReadExecutionController executionController(boolean trackRepairedStatus) } /** - * Allow to post-process the result of the query after it has been reconciled on the coordinator + * Allow to post-process the result of the query after it has been reconciled on the coordinator, * but before it is passed to the CQL layer to return the ResultSet. - * + *

* See CASSANDRA-8717 for why this exists. */ public PartitionIterator postReconciliationProcessing(PartitionIterator result) @@ -741,7 +742,7 @@ public Message createMessage(boolean trackRepairedData) // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it // can save us some bandwith, and avoid making us throw a TombstoneOverwhelmingException for purgeable tombstones (which - // are to some extend an artefact of compaction lagging behind and hence counting them is somewhat unintuitive). + // are to some extent an artefact of compaction lagging behind and hence counting them is somewhat unintuitive). protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs, ReadExecutionController controller) @@ -769,7 +770,7 @@ protected LongPredicate getPurgeEvaluator() * Note that in general the returned string will not be exactly the original user string, first * because there isn't always a single syntax for a given query, but also because we don't have * all the information needed (we know the non-PK columns queried but not the PK ones as internally - * we query them all). So this shouldn't be relied too strongly, but this should be good enough for + * we query them all). So this shouldn't be relied on too strongly, but this should be good enough for * debugging purpose which is what this is for. */ public String toCQLString() @@ -833,7 +834,7 @@ InputCollector iteratorsForRange(ColumnFamilyStore. * input for a query. Separates them according to repaired status and of repaired * status is being tracked, handles the merge and wrapping in a digest generator of * the repaired iterators. - * + *

* Intentionally not AutoCloseable so we don't mistakenly use this in ARM blocks * as this prematurely closes the underlying iterators */ @@ -1041,7 +1042,7 @@ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException int flags = in.readByte(); boolean isDigest = isDigest(flags); boolean acceptsTransient = acceptsTransient(flags); - // Shouldn't happen or it's a user error (see comment above) but + // Shouldn't happen, or it's a user error (see comment above) but // better complain loudly than doing the wrong thing. if (isForThrift(flags)) throw new IllegalStateException("Received a command with the thrift flag set. " @@ -1060,15 +1061,48 @@ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException if (hasIndex) { IndexMetadata index = deserializeIndexMetadata(in, version, metadata); - if (index != null) + indexQueryPlan = buildIndexQueryPlan(index, metadata, rowFilter); + } + + return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, indexQueryPlan); + } + + private Index.QueryPlan buildIndexQueryPlan(IndexMetadata index, TableMetadata metadata, RowFilter rowFilter) + { + if (index == null) + return null; + + Index.Group indexGroup = Keyspace.openAndGetStore(metadata).indexManager.getIndexGroup(index); + if (indexGroup == null) + return null; + + Index.QueryPlan queryPlan = indexGroup.queryPlanFor(rowFilter); + if (queryPlan == null) + return null; + + Set allIndexes = queryPlan.getIndexes(); + Set availableIndexes = new HashSet<>(); + for (Index plannedIndex : allIndexes) + { + String indexName = plannedIndex.getIndexMetadata().name; + Index.Status status = getIndexStatus(FBUtilities.getBroadcastAddressAndPort(), + metadata.keyspace, + indexName); + if (status != Index.Status.FULL_REBUILD_STARTED) + availableIndexes.add(plannedIndex); + else { - Index.Group indexGroup = Keyspace.openAndGetStore(metadata).indexManager.getIndexGroup(index); - if (indexGroup != null) - indexQueryPlan = indexGroup.queryPlanFor(rowFilter); + ClientWarn.instance.warn(format(SecondaryIndexManager.FELL_BACK_TO_ALLOW_FILTERING, indexName)); + logger.warn(format(SecondaryIndexManager.FELL_BACK_TO_ALLOW_FILTERING, indexName)); } } - return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, indexQueryPlan); + if (availableIndexes.isEmpty()) + return null; + + return availableIndexes.size() < queryPlan.getIndexes().size() + ? indexGroup.queryPlanForIndices(rowFilter, availableIndexes) + : queryPlan; } private @Nullable IndexMetadata deserializeIndexMetadata(DataInputPlus in, int version, TableMetadata metadata) throws IOException diff --git a/src/java/org/apache/cassandra/db/filter/ANNOptions.java b/src/java/org/apache/cassandra/db/filter/ANNOptions.java index ea74ff6f1136..72ffb53318c4 100644 --- a/src/java/org/apache/cassandra/db/filter/ANNOptions.java +++ b/src/java/org/apache/cassandra/db/filter/ANNOptions.java @@ -35,6 +35,9 @@ */ public class ANNOptions { + public static final String REQUIRES_HIGHER_MESSAGING_VERSION = + "ANN options are not supported in clusters below DS 11."; + public static final String RERANK_K_OPTION_NAME = "rerank_k"; public static final ANNOptions NONE = new ANNOptions(null); @@ -79,7 +82,7 @@ public static ANNOptions fromMap(Map map) if (MessagingService.current_version < MessagingService.VERSION_DS_11) badNodes.add(FBUtilities.getBroadcastAddressAndPort()); if (!badNodes.isEmpty()) - throw new InvalidRequestException("ANN options are not supported in clusters below DS 11."); + throw new InvalidRequestException(REQUIRES_HIGHER_MESSAGING_VERSION); Integer rerankK = null; diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index dad3bb212d8a..58da52082609 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -79,13 +79,16 @@ public class RowFilter private static final Logger logger = LoggerFactory.getLogger(RowFilter.class); public static final Serializer serializer = new Serializer(); - public static final RowFilter NONE = new RowFilter(FilterElement.NONE); + public static final RowFilter NONE = new RowFilter(FilterElement.NONE, false); private final FilterElement root; - protected RowFilter(FilterElement root) + public final boolean allowFiltering; + + protected RowFilter(FilterElement root, boolean allowFitlering) { this.root = root; + this.allowFiltering = allowFitlering; } public FilterElement root() @@ -294,7 +297,7 @@ public RowFilter without(Expression expression) if (root.size() == 1) return RowFilter.NONE; - return new RowFilter(root.filter(e -> !e.equals(expression))); + return new RowFilter(root.filter(e -> !e.equals(expression)), allowFiltering); } public RowFilter withoutExpressions() @@ -307,12 +310,12 @@ public RowFilter withoutExpressions() */ public RowFilter withoutDisjunctions() { - return new RowFilter(root.withoutDisjunctions()); + return new RowFilter(root.withoutDisjunctions(), allowFiltering); } public RowFilter restrict(Predicate filter) { - return new RowFilter(root.filter(filter)); + return new RowFilter(root.filter(filter), allowFiltering); } public boolean isEmpty() @@ -328,12 +331,12 @@ public String toString() public static Builder builder() { - return new Builder(null); + return new Builder(null, false); } - public static Builder builder(IndexRegistry indexRegistry) + public static Builder builder(IndexRegistry indexRegistry, boolean allowFitlering) { - return new Builder(indexRegistry); + return new Builder(indexRegistry, allowFitlering); } public static class Builder @@ -341,15 +344,17 @@ public static class Builder private FilterElement.Builder current = new FilterElement.Builder(false); private final IndexRegistry indexRegistry; + private final Boolean allowFiltering; - public Builder(IndexRegistry indexRegistry) + public Builder(IndexRegistry indexRegistry, boolean allowFiltering) { this.indexRegistry = indexRegistry; + this.allowFiltering = allowFiltering; } public RowFilter build() { - return new RowFilter(current.build()); + return new RowFilter(current.build(), allowFiltering); } public RowFilter buildFromRestrictions(StatementRestrictions restrictions, @@ -363,7 +368,7 @@ public RowFilter buildFromRestrictions(StatementRestrictions restrictions, if (Guardrails.queryFilters.enabled(queryState)) Guardrails.queryFilters.guard(root.numFilteredValues(), "Select query", false, queryState); - return new RowFilter(root); + return new RowFilter(root, allowFiltering); } private FilterElement doBuild(StatementRestrictions restrictions, @@ -420,7 +425,7 @@ public void addAllAsConjunction(Consumer addToRowFilterDelegate) { // If we're in disjunction mode, we must not pass the current builder to addToRowFilter. // We create a new conjunction sub-builder instead and add all expressions there. - var builder = new Builder(indexRegistry); + var builder = new Builder(indexRegistry, allowFiltering); addToRowFilterDelegate.accept(builder); if (builder.current.expressions.size() == 1 && builder.current.children.isEmpty()) @@ -1836,19 +1841,22 @@ public void serialize(RowFilter filter, DataOutputPlus out, int version) throws { out.writeBoolean(false); // Old "is for thrift" boolean FilterElement.serializer.serialize(filter.root, out, version); + out.writeBoolean(filter.allowFiltering); } public RowFilter deserialize(DataInputPlus in, int version, TableMetadata metadata) throws IOException { in.readBoolean(); // Unused FilterElement operation = FilterElement.serializer.deserialize(in, version, metadata); - return new RowFilter(operation); + boolean allowFiltering = in.readBoolean(); + return new RowFilter(operation, allowFiltering); } public long serializedSize(RowFilter filter, int version) { return 1 // unused boolean - + FilterElement.serializer.serializedSize(filter.root, version); + + FilterElement.serializer.serializedSize(filter.root, version) + + 1; // for allowFiltering } } } diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java index beb8280e674a..bd2d01051f02 100644 --- a/src/java/org/apache/cassandra/index/Index.java +++ b/src/java/org/apache/cassandra/index/Index.java @@ -807,6 +807,17 @@ Indexer indexerFor(Predicate indexSelector, @Nullable QueryPlan queryPlanFor(RowFilter rowFilter); + /** + * Returns a new {@link QueryPlan} for the specified {@link RowFilter} and set of {@link Index}, or {@code null} + * if none of the indexes in this group supports the expression in the row filter. + * + * @param rowFilter a row filter + * @param indexes a set of indexes + * @return a new query plan for the specified {@link RowFilter} and {@link Index}, {@code null} otherwise + */ + @Nullable + QueryPlan queryPlanForIndices(RowFilter rowFilter, Set indexes); + /** * Get flush observer to observe partition/cell events generated by flushing SSTable (memtable flush or compaction). * diff --git a/src/java/org/apache/cassandra/index/IndexRegistry.java b/src/java/org/apache/cassandra/index/IndexRegistry.java index 639d9aec350b..aab44179afd7 100644 --- a/src/java/org/apache/cassandra/index/IndexRegistry.java +++ b/src/java/org/apache/cassandra/index/IndexRegistry.java @@ -244,6 +244,13 @@ public Index.QueryPlan queryPlanFor(RowFilter rowFilter) return null; } + @Nullable + @Override + public Index.QueryPlan queryPlanForIndices(RowFilter rowFilter, Set indexes) + { + return null; + } + @Nullable @Override public SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNewTracker tracker, TableMetadata tableMetadata, long keyCount) diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index b1930e741804..638b75d762d1 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -125,6 +125,7 @@ import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.Indexes; +import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.pager.SinglePartitionPager; import org.apache.cassandra.tracing.Tracing; @@ -134,6 +135,7 @@ import org.apache.cassandra.utils.concurrent.Refs; import org.json.simple.JSONValue; +import static java.lang.String.format; import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination; import static org.apache.cassandra.utils.ExecutorUtils.shutdown; @@ -172,7 +174,7 @@ *

* Finally, this class provides a clear and safe lifecycle to manage index builds, either full rebuilds via * {@link this#rebuildIndexesBlocking(Set)} or builds of new sstables - * added via {@link org.apache.cassandra.notifications.SSTableAddedNotification}s, guaranteeing + * added via {@link SSTableAddedNotification}s, guaranteeing * the following: *