Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNDB-12425: A few reproduction tests and a preliminary patch, WIP #1529

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,11 @@ public boolean hasIndxBasedOrdering()
return nonPrimaryKeyRestrictions.restrictions().stream().anyMatch(SingleRestriction::isIndexBasedOrdering);
}

public boolean hasIndxBasedBoundedAnn()
{
return nonPrimaryKeyRestrictions.restrictions().stream().anyMatch(SingleRestriction::isBoundedAnn);
}

public void throwRequiresAllowFilteringError(TableMetadata table)
{
if (hasIndxBasedOrdering())
Expand Down Expand Up @@ -988,7 +993,7 @@ private boolean hasUnrestrictedClusteringColumns()
return table.clusteringColumns().size() != clusteringColumnsRestrictions.size();
}

public RowFilter getRowFilter(IndexRegistry indexManager, QueryOptions options, QueryState queryState, SelectOptions selectOptions)
public RowFilter getRowFilter(IndexRegistry indexManager, QueryOptions options, QueryState queryState, SelectOptions selectOptions, boolean allowsFiltering)
{
boolean hasAnnOptions = selectOptions.hasANNOptions();

Expand All @@ -1001,7 +1006,11 @@ public RowFilter getRowFilter(IndexRegistry indexManager, QueryOptions options,
}

ANNOptions annOptions = selectOptions.parseANNOptions();
RowFilter rowFilter = RowFilter.builder(indexManager)

if (allowsFiltering && (hasIndxBasedOrdering() || hasIndxBasedBoundedAnn()))
allowsFiltering = false;

RowFilter rowFilter = RowFilter.builder(indexManager, allowsFiltering)
.buildFromRestrictions(this, table, options, queryState, annOptions);

if (hasAnnOptions && !rowFilter.hasANN())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,7 @@ private NavigableSet<Clustering<?>> getRequestedRows(QueryOptions options, Query
public RowFilter getRowFilter(QueryOptions options, QueryState state) throws InvalidRequestException
{
IndexRegistry indexRegistry = IndexRegistry.obtain(table);
return restrictions.getRowFilter(indexRegistry, options, state, selectOptions);
return restrictions.getRowFilter(indexRegistry, options, state, selectOptions, parameters.allowFiltering);
}

private ResultSet process(PartitionIterator partitions,
Expand Down
80 changes: 57 additions & 23 deletions src/java/org/apache/cassandra/db/ReadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,7 @@
package org.apache.cassandra.db;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down Expand Up @@ -65,6 +60,7 @@
import org.apache.cassandra.guardrails.Threshold;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.index.SecondaryIndexManager;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputPlus;
Expand All @@ -83,19 +79,22 @@
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.read.TrackingRowIterator;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;

import static com.google.common.collect.Iterables.any;
import static com.google.common.collect.Iterables.filter;
import static java.lang.String.format;
import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP;
import static org.apache.cassandra.index.SecondaryIndexManager.getIndexStatus;
import static org.apache.cassandra.utils.MonotonicClock.approxTime;

/**
* General interface for storage-engine read commands (common to both range and
* single partition commands).
* <p>
* This contains all the informations needed to do a local read.
* This contains all the information needed to do a local read.
*/
public abstract class ReadCommand extends AbstractReadQuery
{
Expand Down Expand Up @@ -215,7 +214,7 @@ public int digestVersion()
* this allows us to use the command as a carrier of the digest version even if we only call
* setIsDigestQuery on some copy of it.
*
* @param digestVersion the version for the digest is this command is used for digest query..
* @param digestVersion the version for the digest is this command is used for digest query.
* @return this read command.
*/
public ReadCommand setDigestVersion(int digestVersion)
Expand Down Expand Up @@ -408,10 +407,12 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
Index.Searcher searcher = null;
if (indexQueryPlan != null)
{
cfs.indexManager.checkQueryability(indexQueryPlan);
searcher = indexSearcher();
Index index = indexQueryPlan.getFirst();
Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name);
if (cfs.indexManager.isQueryableThroughIndex(indexQueryPlan, rowFilter().allowFiltering))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collapsed the two ifs, no need of nesting here

{
searcher = indexSearcher();
Index index = indexQueryPlan.getFirst();
Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name);
}
}

Context context = Context.from(this);
Expand Down Expand Up @@ -531,9 +532,9 @@ public ReadExecutionController executionController(boolean trackRepairedStatus)
}

/**
* Allow to post-process the result of the query after it has been reconciled on the coordinator
* Allow to post-process the result of the query after it has been reconciled on the coordinator,
* but before it is passed to the CQL layer to return the ResultSet.
*
* <p>
* See CASSANDRA-8717 for why this exists.
*/
public PartitionIterator postReconciliationProcessing(PartitionIterator result)
Expand Down Expand Up @@ -741,7 +742,7 @@ public Message<ReadCommand> createMessage(boolean trackRepairedData)

// Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
// can save us some bandwith, and avoid making us throw a TombstoneOverwhelmingException for purgeable tombstones (which
// are to some extend an artefact of compaction lagging behind and hence counting them is somewhat unintuitive).
// are to some extent an artefact of compaction lagging behind and hence counting them is somewhat unintuitive).
protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator,
ColumnFamilyStore cfs,
ReadExecutionController controller)
Expand Down Expand Up @@ -769,7 +770,7 @@ protected LongPredicate getPurgeEvaluator()
* Note that in general the returned string will not be exactly the original user string, first
* because there isn't always a single syntax for a given query, but also because we don't have
* all the information needed (we know the non-PK columns queried but not the PK ones as internally
* we query them all). So this shouldn't be relied too strongly, but this should be good enough for
* we query them all). So this shouldn't be relied on too strongly, but this should be good enough for
* debugging purpose which is what this is for.
*/
public String toCQLString()
Expand Down Expand Up @@ -833,7 +834,7 @@ InputCollector<UnfilteredPartitionIterator> iteratorsForRange(ColumnFamilyStore.
* input for a query. Separates them according to repaired status and of repaired
* status is being tracked, handles the merge and wrapping in a digest generator of
* the repaired iterators.
*
* <p>
* Intentionally not AutoCloseable so we don't mistakenly use this in ARM blocks
* as this prematurely closes the underlying iterators
*/
Expand Down Expand Up @@ -1041,7 +1042,7 @@ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
int flags = in.readByte();
boolean isDigest = isDigest(flags);
boolean acceptsTransient = acceptsTransient(flags);
// Shouldn't happen or it's a user error (see comment above) but
// Shouldn't happen, or it's a user error (see comment above) but
// better complain loudly than doing the wrong thing.
if (isForThrift(flags))
throw new IllegalStateException("Received a command with the thrift flag set. "
Expand All @@ -1060,15 +1061,48 @@ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
if (hasIndex)
{
IndexMetadata index = deserializeIndexMetadata(in, version, metadata);
if (index != null)
indexQueryPlan = buildIndexQueryPlan(index, metadata, rowFilter);
}

return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, indexQueryPlan);
}

private Index.QueryPlan buildIndexQueryPlan(IndexMetadata index, TableMetadata metadata, RowFilter rowFilter)
{
if (index == null)
return null;

Index.Group indexGroup = Keyspace.openAndGetStore(metadata).indexManager.getIndexGroup(index);
if (indexGroup == null)
return null;

Index.QueryPlan queryPlan = indexGroup.queryPlanFor(rowFilter);
if (queryPlan == null)
return null;

Set<Index> allIndexes = queryPlan.getIndexes();
Set<Index> availableIndexes = new HashSet<>();
for (Index plannedIndex : allIndexes)
{
String indexName = plannedIndex.getIndexMetadata().name;
Index.Status status = getIndexStatus(FBUtilities.getBroadcastAddressAndPort(),
metadata.keyspace,
indexName);
if (status != Index.Status.FULL_REBUILD_STARTED)
availableIndexes.add(plannedIndex);
else
{
Index.Group indexGroup = Keyspace.openAndGetStore(metadata).indexManager.getIndexGroup(index);
if (indexGroup != null)
indexQueryPlan = indexGroup.queryPlanFor(rowFilter);
ClientWarn.instance.warn(format(SecondaryIndexManager.FELL_BACK_TO_ALLOW_FILTERING, indexName));
logger.warn(format(SecondaryIndexManager.FELL_BACK_TO_ALLOW_FILTERING, indexName));
}
}

return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, indexQueryPlan);
if (availableIndexes.isEmpty())
return null;

return availableIndexes.size() < queryPlan.getIndexes().size()
? indexGroup.queryPlanForIndices(rowFilter, availableIndexes)
: queryPlan;
}

private @Nullable IndexMetadata deserializeIndexMetadata(DataInputPlus in, int version, TableMetadata metadata) throws IOException
Expand Down
5 changes: 4 additions & 1 deletion src/java/org/apache/cassandra/db/filter/ANNOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
*/
public class ANNOptions
{
public static final String REQUIRES_HIGHER_MESSAGING_VERSION =

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

"ANN options are not supported in clusters below DS 11.";

public static final String RERANK_K_OPTION_NAME = "rerank_k";

public static final ANNOptions NONE = new ANNOptions(null);
Expand Down Expand Up @@ -79,7 +82,7 @@ public static ANNOptions fromMap(Map<String, String> map)
if (MessagingService.current_version < MessagingService.VERSION_DS_11)
badNodes.add(FBUtilities.getBroadcastAddressAndPort());
if (!badNodes.isEmpty())
throw new InvalidRequestException("ANN options are not supported in clusters below DS 11.");
throw new InvalidRequestException(REQUIRES_HIGHER_MESSAGING_VERSION);

Integer rerankK = null;

Expand Down
36 changes: 22 additions & 14 deletions src/java/org/apache/cassandra/db/filter/RowFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,16 @@ public class RowFilter
private static final Logger logger = LoggerFactory.getLogger(RowFilter.class);

public static final Serializer serializer = new Serializer();
public static final RowFilter NONE = new RowFilter(FilterElement.NONE);
public static final RowFilter NONE = new RowFilter(FilterElement.NONE, false);

private final FilterElement root;

protected RowFilter(FilterElement root)
public final boolean allowFiltering;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a mixed use of allowFiltering and allowsFiltering around. I would homogenize it, so it's easier to search.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ALLOW FILTERING has always been kind of a transversal concept while it was a CQL thing. Putting it in the RowFilter somehow feels like it finally has a clear home. I think it would be super nice to add some brief JavaDoc here saying what ALLOW FILTERING is, for the sake of newcomers.


protected RowFilter(FilterElement root, boolean allowFitlering)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: s/allowFitlering/allowFiltering

{
this.root = root;
this.allowFiltering = allowFitlering;
}

public FilterElement root()
Expand Down Expand Up @@ -294,7 +297,7 @@ public RowFilter without(Expression expression)
if (root.size() == 1)
return RowFilter.NONE;

return new RowFilter(root.filter(e -> !e.equals(expression)));
return new RowFilter(root.filter(e -> !e.equals(expression)), allowFiltering);
}

public RowFilter withoutExpressions()
Expand All @@ -307,12 +310,12 @@ public RowFilter withoutExpressions()
*/
public RowFilter withoutDisjunctions()
{
return new RowFilter(root.withoutDisjunctions());
return new RowFilter(root.withoutDisjunctions(), allowFiltering);
}

public RowFilter restrict(Predicate<Expression> filter)
{
return new RowFilter(root.filter(filter));
return new RowFilter(root.filter(filter), allowFiltering);
}

public boolean isEmpty()
Expand All @@ -328,28 +331,30 @@ public String toString()

public static Builder builder()
{
return new Builder(null);
return new Builder(null, false);
}

public static Builder builder(IndexRegistry indexRegistry)
public static Builder builder(IndexRegistry indexRegistry, boolean allowFitlering)
{
return new Builder(indexRegistry);
return new Builder(indexRegistry, allowFitlering);
}

public static class Builder
{
private FilterElement.Builder current = new FilterElement.Builder(false);

private final IndexRegistry indexRegistry;
private final Boolean allowFiltering;

public Builder(IndexRegistry indexRegistry)
public Builder(IndexRegistry indexRegistry, boolean allowFiltering)
{
this.indexRegistry = indexRegistry;
this.allowFiltering = allowFiltering;
}

public RowFilter build()
{
return new RowFilter(current.build());
return new RowFilter(current.build(), allowFiltering);
}

public RowFilter buildFromRestrictions(StatementRestrictions restrictions,
Expand All @@ -363,7 +368,7 @@ public RowFilter buildFromRestrictions(StatementRestrictions restrictions,
if (Guardrails.queryFilters.enabled(queryState))
Guardrails.queryFilters.guard(root.numFilteredValues(), "Select query", false, queryState);

return new RowFilter(root);
return new RowFilter(root, allowFiltering);
}

private FilterElement doBuild(StatementRestrictions restrictions,
Expand Down Expand Up @@ -420,7 +425,7 @@ public void addAllAsConjunction(Consumer<Builder> addToRowFilterDelegate)
{
// If we're in disjunction mode, we must not pass the current builder to addToRowFilter.
// We create a new conjunction sub-builder instead and add all expressions there.
var builder = new Builder(indexRegistry);
var builder = new Builder(indexRegistry, allowFiltering);
addToRowFilterDelegate.accept(builder);

if (builder.current.expressions.size() == 1 && builder.current.children.isEmpty())
Expand Down Expand Up @@ -1836,19 +1841,22 @@ public void serialize(RowFilter filter, DataOutputPlus out, int version) throws
{
out.writeBoolean(false); // Old "is for thrift" boolean
FilterElement.serializer.serialize(filter.root, out, version);
out.writeBoolean(filter.allowFiltering);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should depend on the messaging version

}

public RowFilter deserialize(DataInputPlus in, int version, TableMetadata metadata) throws IOException
{
in.readBoolean(); // Unused
FilterElement operation = FilterElement.serializer.deserialize(in, version, metadata);
return new RowFilter(operation);
boolean allowFiltering = in.readBoolean();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should depend on the messaging version

return new RowFilter(operation, allowFiltering);
}

public long serializedSize(RowFilter filter, int version)
{
return 1 // unused boolean
+ FilterElement.serializer.serializedSize(filter.root, version);
+ FilterElement.serializer.serializedSize(filter.root, version)
+ 1; // for allowFiltering

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should depend on the messaging version. Also, it's probably better to use TypeSizes.BOOL_SIZE

}
}
}
11 changes: 11 additions & 0 deletions src/java/org/apache/cassandra/index/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,17 @@ Indexer indexerFor(Predicate<Index> indexSelector,
@Nullable
QueryPlan queryPlanFor(RowFilter rowFilter);

/**
* Returns a new {@link QueryPlan} for the specified {@link RowFilter} and set of {@link Index}, or {@code null}
* if none of the indexes in this group supports the expression in the row filter.
*
* @param rowFilter a row filter
* @param indexes a set of indexes
* @return a new query plan for the specified {@link RowFilter} and {@link Index}, {@code null} otherwise
*/
@Nullable
QueryPlan queryPlanForIndices(RowFilter rowFilter, Set<Index> indexes);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still thinking about how could make this the only method and get rid of queryPlanFor(RowFilter)


/**
* Get flush observer to observe partition/cell events generated by flushing SSTable (memtable flush or compaction).
*
Expand Down
7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/index/IndexRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,13 @@ public Index.QueryPlan queryPlanFor(RowFilter rowFilter)
return null;
}

@Nullable
@Override
public Index.QueryPlan queryPlanForIndices(RowFilter rowFilter, Set<Index> indexes)
{
return null;
}

@Nullable
@Override
public SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNewTracker tracker, TableMetadata tableMetadata, long keyCount)
Expand Down
Loading