diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 2ea5569c45e9..12d7c97cd365 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -516,19 +516,29 @@ public static UntypedResultSet execute(String query, ConsistencyLevel cl, Object
public static UntypedResultSet execute(String query, ConsistencyLevel cl, QueryState state, Object... values)
throws RequestExecutionException
{
- try
- {
- Prepared prepared = prepareInternal(query);
- ResultMessage result = prepared.statement.execute(state, makeInternalOptions(prepared.statement, values, cl), System.nanoTime());
- if (result instanceof ResultMessage.Rows)
- return UntypedResultSet.create(((ResultMessage.Rows)result).result);
- else
- return null;
- }
- catch (RequestValidationException e)
- {
- throw new RuntimeException("Error validating " + query, e);
- }
+ Prepared prepared = prepareInternal(query);
+ ResultMessage> result = prepared.statement.execute(state, makeInternalOptions(prepared.statement, values, cl), System.nanoTime());
+ if (result instanceof ResultMessage.Rows)
+ return UntypedResultSet.create(((ResultMessage.Rows)result).result);
+ else
+ return null;
+ }
+
+ /**
+ * Same than {@link #execute(String, ConsistencyLevel, Object...)}, but to use for queries we know are only executed
+ * once so that the created statement object is not cached.
+ */
+ @VisibleForTesting
+ static UntypedResultSet executeOnce(String query, ConsistencyLevel cl, Object... values)
+ {
+ QueryState queryState = internalQueryState();
+ CQLStatement statement = parseStatement(query, queryState.getClientState());
+ statement.validate(queryState);
+ ResultMessage> result = statement.execute(queryState, makeInternalOptions(statement, values, cl), System.nanoTime());
+ if (result instanceof ResultMessage.Rows)
+ return UntypedResultSet.create(((ResultMessage.Rows)result).result);
+ else
+ return null;
}
public static UntypedResultSet executeInternalWithPaging(String query, PageSize pageSize, Object... values)
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 38b937041caf..3daf8e3b6a05 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -43,7 +43,6 @@
import java.util.Random;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
@@ -99,7 +98,6 @@
import org.apache.cassandra.auth.CassandraAuthorizer;
import org.apache.cassandra.auth.CassandraRoleManager;
import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
@@ -107,6 +105,7 @@
import org.apache.cassandra.cql3.functions.types.ParseUtils;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.ReadCommand;
@@ -204,6 +203,16 @@ public abstract class CQLTester
private static final int ASSERTION_TIMEOUT_SECONDS = 15;
private static final User SUPER_USER = new User("cassandra", "cassandra");
+ /**
+ * Whether to use coorfinator execution in {@link #execute(String, Object...)}, so queries get full validation and
+ * go through reconciliation. When enabled, calls to {@link #execute(String, Object...)} will behave as calls to
+ * {@link #executeWithCoordinator(String, Object...)}. Otherwise, they will behave as calls to
+ * {@link #executeInternal(String, Object...)}.
+ *
+ * @see #execute
+ */
+ private static boolean coordinatorExecution = false;
+
private static org.apache.cassandra.transport.Server server;
private static JMXConnectorServer jmxServer;
protected static String jmxHost;
@@ -506,6 +515,9 @@ public static List buildCassandraStressArgs(List args)
protected static void requireNetworkWithoutDriver()
{
+ if (server != null)
+ return;
+
startServices();
startServer(server -> {});
}
@@ -1520,21 +1532,101 @@ protected ResultMessage.Prepared prepare(String query) throws Throwable
return QueryProcessor.instance.prepare(formatQuery(query), ClientState.forInternalCalls());
}
+ /**
+ * Enables coordinator execution in {@link #execute(String, Object...)}, so queries get full validation and go
+ * through reconciliation. This makes calling {@link #execute(String, Object...)} equivalent to calling
+ * {@link #executeWithCoordinator(String, Object...)}.
+ */
+ protected static void enableCoordinatorExecution()
+ {
+ requireNetworkWithoutDriver();
+ coordinatorExecution = true;
+ }
+
+ /**
+ * Disables coordinator execution in {@link #execute(String, Object...)}, so queries won't get full validation nor
+ * go through reconciliation.This makes calling {@link #execute(String, Object...)} equivalent to calling
+ * {@link #executeInternal(String, Object...)}.
+ */
+ protected static void disableCoordinatorExecution()
+ {
+ coordinatorExecution = false;
+ }
+
+ /**
+ * Execute the specified query as either an internal query or a coordinator query depending on the value of
+ * {@link #coordinatorExecution}.
+ *
+ * @param query a CQL query
+ * @param values the values to bind to the query
+ * @return the query results
+ * @see #execute
+ * @see #executeInternal
+ */
public UntypedResultSet execute(String query, Object... values)
{
- return executeFormattedQuery(formatQuery(query), values);
+ return coordinatorExecution
+ ? executeWithCoordinator(query, values)
+ : executeInternal(query, values);
+ }
+
+ /**
+ * Execute the specified query as an internal query only for the local node. This will skip reconciliation and some
+ * validation.
+ *
+ * For the particular case of {@code SELECT} queries using secondary indexes, the skipping of reconciliation means
+ * that the query {@link org.apache.cassandra.db.filter.RowFilter} might not be fully applied to the index results.
+ *
+ * @param query a CQL query
+ * @param values the values to bind to the query
+ * @return the query results
+ * @see CQLStatement#executeLocally
+ */
+ public UntypedResultSet executeInternal(String query, Object... values)
+ {
+ return executeFormattedQuery(formatQuery(query), false, values);
+ }
+
+ /**
+ * Execute the specified query as an coordinator-side query meant for all the relevant nodes in the cluster, even if
+ * {@link CQLTester} tests are single-node. This won't skip reconciliation and will do full validation.
+ *
+ * For the particular case of {@code SELECT} queries using secondary indexes, applying reconciliation means that the
+ * query {@link org.apache.cassandra.db.filter.RowFilter} will be fully applied to the index results.
+ *
+ * @param query a CQL query
+ * @param values the values to bind to the query
+ * @return the query results
+ * @see CQLStatement#execute
+ */
+ public UntypedResultSet executeWithCoordinator(String query, Object... values)
+ {
+ return executeFormattedQuery(formatQuery(query), true, values);
}
public UntypedResultSet executeFormattedQuery(String query, Object... values)
{
+ return executeFormattedQuery(query, coordinatorExecution, values);
+ }
+
+ private UntypedResultSet executeFormattedQuery(String query, boolean useCoordinator, Object... values)
+ {
+ if (useCoordinator)
+ requireNetworkWithoutDriver();
+
UntypedResultSet rs;
if (usePrepared)
{
if (logger.isTraceEnabled())
logger.trace("Executing: {} with values {}", query, formatAllValues(values));
+
+ Object[] transformedValues = transformValues(values);
+
if (reusePrepared)
{
- rs = QueryProcessor.executeInternal(query, transformValues(values));
+ rs = useCoordinator
+ ? QueryProcessor.execute(query, ConsistencyLevel.ONE, transformedValues)
+ : QueryProcessor.executeInternal(query, transformedValues);
// If a test uses a "USE ...", then presumably its statements use relative table. In that case, a USE
// change the meaning of the current keyspace, so we don't want a following statement to reuse a previously
@@ -1545,15 +1637,21 @@ public UntypedResultSet executeFormattedQuery(String query, Object... values)
}
else
{
- rs = QueryProcessor.executeOnceInternal(query, transformValues(values));
+ rs = useCoordinator
+ ? QueryProcessor.executeOnce(query, ConsistencyLevel.ONE, transformedValues)
+ : QueryProcessor.executeOnceInternal(query, transformedValues);
}
}
else
{
query = replaceValues(query, values);
+
if (logger.isTraceEnabled())
logger.trace("Executing: {}", query);
- rs = QueryProcessor.executeOnceInternal(query);
+
+ rs = useCoordinator
+ ? QueryProcessor.executeOnce(query, ConsistencyLevel.ONE)
+ : QueryProcessor.executeOnceInternal(query);
}
if (rs != null)
{
diff --git a/test/unit/org/apache/cassandra/index/sai/SAITester.java b/test/unit/org/apache/cassandra/index/sai/SAITester.java
index 6f20ecababcd..0b9e96262969 100644
--- a/test/unit/org/apache/cassandra/index/sai/SAITester.java
+++ b/test/unit/org/apache/cassandra/index/sai/SAITester.java
@@ -22,8 +22,6 @@
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
@@ -48,6 +46,7 @@
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestRule;
@@ -72,8 +71,6 @@
import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
import org.apache.cassandra.index.sai.disk.format.Version;
-import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
-import org.apache.cassandra.index.sai.utils.NamedMemoryLimiter;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.index.sai.utils.ResourceLeakDetector;
import org.apache.cassandra.inject.ActionBuilder;
@@ -91,10 +88,8 @@
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ReflectionUtils;
import org.apache.cassandra.utils.Throwables;
import org.apache.lucene.codecs.CodecUtil;
-import org.awaitility.Awaitility;
import static org.apache.cassandra.inject.ActionBuilder.newActionBuilder;
import static org.apache.cassandra.inject.Expression.expr;
@@ -224,6 +219,17 @@ public void removeAllInjections()
Injections.deleteAll();
}
+ /**
+ * Enable external execution of all queries because we want to use reconciliation in SELECT queries so that we can
+ * simulate the application of the entire row filter in the coordinator node, even if unit tests are not multinode.
+ */
+ @BeforeClass
+ public static void setUpClass()
+ {
+ CQLTester.setUpClass();
+ CQLTester.enableCoordinatorExecution();
+ }
+
public static IndexContext createIndexContext(String name, AbstractType> validator, ColumnFamilyStore cfs)
{
return new IndexContext(cfs.getKeyspaceName(),
diff --git a/test/unit/org/apache/cassandra/index/sai/cql/DropIndexWhileQueryingTest.java b/test/unit/org/apache/cassandra/index/sai/cql/DropIndexWhileQueryingTest.java
index 302369290cf0..6d48e411974b 100644
--- a/test/unit/org/apache/cassandra/index/sai/cql/DropIndexWhileQueryingTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/cql/DropIndexWhileQueryingTest.java
@@ -28,6 +28,7 @@
import org.junit.Test;
import static org.apache.cassandra.inject.InvokePointBuilder.newInvokePoint;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertTrue;
public class DropIndexWhileQueryingTest extends SAITester
@@ -47,8 +48,8 @@ public void testDropIndexWhileQuerying() throws Throwable
execute("INSERT INTO %s (k, x, y, z) VALUES (?, ?, ?, ?)", "car", 0, "y0", "z0");
String query = "SELECT * FROM %s WHERE x IN (0, 1) OR (y IN ('Y0', 'Y1' ) OR z IN ('z1', 'z2'))";
- assertInvalidMessage(QueryController.INDEX_MAY_HAVE_BEEN_DROPPED, query);
- assertInvalidMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE, query);
+ assertThatThrownBy(() -> executeInternal(query)).hasMessage(QueryController.INDEX_MAY_HAVE_BEEN_DROPPED);
+ assertThatThrownBy(() -> executeInternal(query)).hasMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE);
}
@Test
@@ -84,8 +85,8 @@ public void testDropVectorIndexWhileQuerying() throws Throwable
execute("INSERT INTO %s (pk, str_val, val) VALUES (0, 'A', [1.0, 2.0, 3.0])");
String query = "SELECT pk FROM %s ORDER BY val ann of [0.5, 1.5, 2.5] LIMIT 2";
- assertInvalidMessage(TopKProcessor.INDEX_MAY_HAVE_BEEN_DROPPED, query);
- assertInvalidMessage(String.format(StatementRestrictions.NON_CLUSTER_ORDERING_REQUIRES_INDEX_MESSAGE, "val"), query);
+ assertThatThrownBy(() -> executeInternal(query)).hasMessage(TopKProcessor.INDEX_MAY_HAVE_BEEN_DROPPED);
+ assertThatThrownBy(() -> executeInternal(query)).hasMessage(String.format(StatementRestrictions.NON_CLUSTER_ORDERING_REQUIRES_INDEX_MESSAGE, "val"));
}
private static void injectIndexDrop(String injectionName, String indexName, String methodName, boolean atEntry) throws Throwable
diff --git a/test/unit/org/apache/cassandra/index/sai/cql/InetAddressTypeEquivalencyTest.java b/test/unit/org/apache/cassandra/index/sai/cql/InetAddressTypeEquivalencyTest.java
index 1fbef7ebc899..9ebbd90f674b 100644
--- a/test/unit/org/apache/cassandra/index/sai/cql/InetAddressTypeEquivalencyTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/cql/InetAddressTypeEquivalencyTest.java
@@ -19,8 +19,10 @@
import java.net.InetAddress;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.index.sai.SAITester;
import org.apache.cassandra.index.sai.cql.types.InetTest;
@@ -31,6 +33,15 @@
*/
public class InetAddressTypeEquivalencyTest extends SAITester
{
+ // TODO: Disables coordinator execution because we know SAI indexing for inet works differently than RowFilter,
+ // which can wrongly discard rows in the coordinator. This is reported in CNDB-12978, and we should enable
+ // distributed execution again once we have a fix.
+ @BeforeClass
+ public static void disableCoordinatorExecution()
+ {
+ CQLTester.disableCoordinatorExecution();
+ }
+
@Before
public void createTableAndIndex()
{
diff --git a/test/unit/org/apache/cassandra/index/sai/cql/LuceneAnalyzerTest.java b/test/unit/org/apache/cassandra/index/sai/cql/LuceneAnalyzerTest.java
index 7ade35d11c02..04e6693b4902 100644
--- a/test/unit/org/apache/cassandra/index/sai/cql/LuceneAnalyzerTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/cql/LuceneAnalyzerTest.java
@@ -64,6 +64,32 @@ public void testQueryAnalyzer()
assertEquals(0, execute("SELECT * FROM %s WHERE val = 'query'").size());
}
+ /**
+ * See CNDB-12739 for more details.
+ */
+ @Test
+ public void testQueryAnalyzerWithExtraData() throws Throwable
+ {
+ createTable("CREATE TABLE %s (c1 int PRIMARY KEY , c2 text)");
+
+ createIndex("CREATE CUSTOM INDEX ON %s(c2) USING 'StorageAttachedIndex' WITH OPTIONS = {" +
+ "'index_analyzer': '{" +
+ " \"tokenizer\" : { \"name\" : \"whitespace\", \"args\" : {} }," +
+ " \"filters\" : [ { \"name\" : \"lowercase\", \"args\": {} }, " +
+ " { \"name\" : \"edgengram\", \"args\": { \"minGramSize\":\"1\", \"maxGramSize\":\"30\" } }]," +
+ " \"charFilters\" : []}', " +
+ "'query_analyzer': '{" +
+ " \"tokenizer\" : { \"name\" : \"whitespace\", \"args\" : {} }," +
+ " \"filters\" : [ {\"name\" : \"lowercase\",\"args\": {}} ]}'}");
+
+ execute("INSERT INTO %s(c1,c2) VALUES (1, 'astra quick fox')");
+ execute("INSERT INTO %s(c1,c2) VALUES (2, 'astra quick foxes')");
+ execute("INSERT INTO %s(c1,c2) VALUES (3, 'astra1')");
+ execute("INSERT INTO %s(c1,c2) VALUES (4, 'astra4 -1@a#')");
+
+ beforeAndAfterFlush(() -> assertEquals(4, execute("SELECT * FROM %s WHERE c2 :'ast' ").size()));
+ }
+
@Test
public void testStandardQueryAnalyzer()
{
diff --git a/test/unit/org/apache/cassandra/index/sai/cql/VectorTracingTest.java b/test/unit/org/apache/cassandra/index/sai/cql/VectorTracingTest.java
index b620b8762129..cff0305770dc 100644
--- a/test/unit/org/apache/cassandra/index/sai/cql/VectorTracingTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/cql/VectorTracingTest.java
@@ -30,9 +30,10 @@
public class VectorTracingTest extends VectorTester.VersionedWithChecksums
{
@BeforeClass
- public static void setupClass()
+ public static void setUpClass()
{
System.setProperty("cassandra.custom_tracing_class", "org.apache.cassandra.tracing.TracingTestImpl");
+ VectorTester.setUpClass();
}
@Test
diff --git a/test/unit/org/apache/cassandra/index/sai/cql/VectorTypeTest.java b/test/unit/org/apache/cassandra/index/sai/cql/VectorTypeTest.java
index 71a054ad1548..8eb435ae751e 100644
--- a/test/unit/org/apache/cassandra/index/sai/cql/VectorTypeTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/cql/VectorTypeTest.java
@@ -79,9 +79,10 @@ public static Collection