Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNDB-12905: Allow usage of ReadQuery#execute rather than only executeInternal in CQL utests #1564

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
36 changes: 23 additions & 13 deletions src/java/org/apache/cassandra/cql3/QueryProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -516,19 +516,29 @@ public static UntypedResultSet execute(String query, ConsistencyLevel cl, Object
public static UntypedResultSet execute(String query, ConsistencyLevel cl, QueryState state, Object... values)
throws RequestExecutionException
{
try
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed the try-catch block because the only caller of this method is SystemDistributedKeyspace#viewStatus, who simply swallows the exception making this wrapping of RequestValidationException no-op.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

who simply swallows the exception making this wrapping of RequestValidationException no-op

Could this be an actual bug?
Also, it seems Sonar complains about test coverage in this area.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know, how do you think this could fail?

Copy link

@ekaterinadimitrova2 ekaterinadimitrova2 Feb 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems parseAndPrepare also throws that exception, and it is called by this method.
It seems that parseStatement (called by parseAndPrepare) is throwing SyntaxException that extends the RequestValidationException.
Interesting history - this execute method was added with the re-write of the storage engine. The method did not have any callers and the execute method implementations were throwing that exception. They stopped throwing it in CASSANDRA-13426 so I guess that was left over.

I haven't written particular tests to verify what is going on, just looking around the code. Shall we leave the cleaning for follow-up ticket?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the generic try-catch block in that variant of execute looks like a reminder of the original implementation. The method is only used by SystemDistributedKeyspace#viewStatus, which shallows the exception because it's only interested in collecting view statuses for a nodetool command. So it's a no-op. Now we want to re-use the method for QueryTester, but the useless exception replacement gets in the way and hides the original exception, which we want for our test checks.

Do you agree on simply removing the try-catch block, since it doesn't seem to use any purpose? Or I can add a variant of the QueryProcessor#execute method without the exception replacement, or even embed it in CQLTester#executeFormattedQuery, given how simple it is.

Copy link

@ekaterinadimitrova2 ekaterinadimitrova2 Feb 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let's remove it, convinced. Just trying not to break something and being extra cautious.

{
Prepared prepared = prepareInternal(query);
ResultMessage result = prepared.statement.execute(state, makeInternalOptions(prepared.statement, values, cl), System.nanoTime());
if (result instanceof ResultMessage.Rows)
return UntypedResultSet.create(((ResultMessage.Rows)result).result);
else
return null;
}
catch (RequestValidationException e)
{
throw new RuntimeException("Error validating " + query, e);
}
Prepared prepared = prepareInternal(query);
ResultMessage<?> result = prepared.statement.execute(state, makeInternalOptions(prepared.statement, values, cl), System.nanoTime());
if (result instanceof ResultMessage.Rows)
return UntypedResultSet.create(((ResultMessage.Rows)result).result);
else
return null;
}

/**
* Same than {@link #execute(String, ConsistencyLevel, Object...)}, but to use for queries we know are only executed
* once so that the created statement object is not cached.
*/
@VisibleForTesting
static UntypedResultSet executeOnce(String query, ConsistencyLevel cl, Object... values)
{
QueryState queryState = internalQueryState();
CQLStatement statement = parseStatement(query, queryState.getClientState());
statement.validate(queryState);
ResultMessage<?> result = statement.execute(queryState, makeInternalOptions(statement, values, cl), System.nanoTime());
if (result instanceof ResultMessage.Rows)
return UntypedResultSet.create(((ResultMessage.Rows)result).result);
else
return null;
}

public static UntypedResultSet executeInternalWithPaging(String query, PageSize pageSize, Object... values)
Expand Down
110 changes: 104 additions & 6 deletions test/unit/org/apache/cassandra/cql3/CQLTester.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -99,14 +98,14 @@
import org.apache.cassandra.auth.CassandraAuthorizer;
import org.apache.cassandra.auth.CassandraRoleManager;
import org.apache.cassandra.auth.PasswordAuthenticator;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.cql3.functions.FunctionName;
import org.apache.cassandra.cql3.functions.types.ParseUtils;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.ReadCommand;
Expand Down Expand Up @@ -204,6 +203,16 @@ public abstract class CQLTester
private static final int ASSERTION_TIMEOUT_SECONDS = 15;
private static final User SUPER_USER = new User("cassandra", "cassandra");

/**
* Whether to use coorfinator execution in {@link #execute(String, Object...)}, so queries get full validation and
* go through reconciliation. When enabled, calls to {@link #execute(String, Object...)} will behave as calls to
* {@link #executeWithCoordinator(String, Object...)}. Otherwise, they will behave as calls to
* {@link #executeInternal(String, Object...)}.
*
* @see #execute
*/
private static boolean coordinatorExecution = false;

private static org.apache.cassandra.transport.Server server;
private static JMXConnectorServer jmxServer;
protected static String jmxHost;
Expand Down Expand Up @@ -506,6 +515,9 @@ public static List<String> buildCassandraStressArgs(List<String> args)

protected static void requireNetworkWithoutDriver()
{
if (server != null)
return;

startServices();
startServer(server -> {});
}
Expand Down Expand Up @@ -1520,21 +1532,101 @@ protected ResultMessage.Prepared prepare(String query) throws Throwable
return QueryProcessor.instance.prepare(formatQuery(query), ClientState.forInternalCalls());
}

/**
* Enables coordinator execution in {@link #execute(String, Object...)}, so queries get full validation and go
* through reconciliation. This makes calling {@link #execute(String, Object...)} equivalent to calling
* {@link #executeWithCoordinator(String, Object...)}.
*/
protected static void enableCoordinatorExecution()
{
requireNetworkWithoutDriver();
coordinatorExecution = true;
}

/**
* Disables coordinator execution in {@link #execute(String, Object...)}, so queries won't get full validation nor
* go through reconciliation.This makes calling {@link #execute(String, Object...)} equivalent to calling
* {@link #executeInternal(String, Object...)}.
*/
protected static void disableCoordinatorExecution()
{
coordinatorExecution = false;
}

/**
* Execute the specified query as either an internal query or a coordinator query depending on the value of
* {@link #coordinatorExecution}.
*
* @param query a CQL query
* @param values the values to bind to the query
* @return the query results
* @see #execute
* @see #executeInternal
*/
public UntypedResultSet execute(String query, Object... values)
{
return executeFormattedQuery(formatQuery(query), values);
return coordinatorExecution
? executeWithCoordinator(query, values)
: executeInternal(query, values);
}

/**
* Execute the specified query as an internal query only for the local node. This will skip reconciliation and some
* validation.
* </p>
* For the particular case of {@code SELECT} queries using secondary indexes, the skipping of reconciliation means
* that the query {@link org.apache.cassandra.db.filter.RowFilter} might not be fully applied to the index results.
*
* @param query a CQL query
* @param values the values to bind to the query
* @return the query results
* @see CQLStatement#executeLocally
*/
public UntypedResultSet executeInternal(String query, Object... values)
{
return executeFormattedQuery(formatQuery(query), false, values);
}

/**
* Execute the specified query as an coordinator-side query meant for all the relevant nodes in the cluster, even if
* {@link CQLTester} tests are single-node. This won't skip reconciliation and will do full validation.
* </p>
* For the particular case of {@code SELECT} queries using secondary indexes, applying reconciliation means that the
* query {@link org.apache.cassandra.db.filter.RowFilter} will be fully applied to the index results.
*
* @param query a CQL query
* @param values the values to bind to the query
* @return the query results
* @see CQLStatement#execute
*/
public UntypedResultSet executeWithCoordinator(String query, Object... values)
{
return executeFormattedQuery(formatQuery(query), true, values);
}

public UntypedResultSet executeFormattedQuery(String query, Object... values)
{
return executeFormattedQuery(query, coordinatorExecution, values);
}

private UntypedResultSet executeFormattedQuery(String query, boolean useCoordinator, Object... values)
{
if (useCoordinator)
requireNetworkWithoutDriver();

UntypedResultSet rs;
if (usePrepared)
{
if (logger.isTraceEnabled())
logger.trace("Executing: {} with values {}", query, formatAllValues(values));

Object[] transformedValues = transformValues(values);

if (reusePrepared)
{
rs = QueryProcessor.executeInternal(query, transformValues(values));
rs = useCoordinator
? QueryProcessor.execute(query, ConsistencyLevel.ONE, transformedValues)
: QueryProcessor.executeInternal(query, transformedValues);

// If a test uses a "USE ...", then presumably its statements use relative table. In that case, a USE
// change the meaning of the current keyspace, so we don't want a following statement to reuse a previously
Expand All @@ -1545,15 +1637,21 @@ public UntypedResultSet executeFormattedQuery(String query, Object... values)
}
else
{
rs = QueryProcessor.executeOnceInternal(query, transformValues(values));
rs = useCoordinator
? QueryProcessor.executeOnce(query, ConsistencyLevel.ONE, transformedValues)
: QueryProcessor.executeOnceInternal(query, transformedValues);
}
}
else
{
query = replaceValues(query, values);

if (logger.isTraceEnabled())
logger.trace("Executing: {}", query);
rs = QueryProcessor.executeOnceInternal(query);

rs = useCoordinator
? QueryProcessor.executeOnce(query, ConsistencyLevel.ONE)
: QueryProcessor.executeOnceInternal(query);
}
if (rs != null)
{
Expand Down
18 changes: 12 additions & 6 deletions test/unit/org/apache/cassandra/index/sai/SAITester.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@

import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
Expand All @@ -48,6 +46,7 @@
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestRule;

Expand All @@ -72,8 +71,6 @@
import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
import org.apache.cassandra.index.sai.utils.NamedMemoryLimiter;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.index.sai.utils.ResourceLeakDetector;
import org.apache.cassandra.inject.ActionBuilder;
Expand All @@ -91,10 +88,8 @@
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ReflectionUtils;
import org.apache.cassandra.utils.Throwables;
import org.apache.lucene.codecs.CodecUtil;
import org.awaitility.Awaitility;

import static org.apache.cassandra.inject.ActionBuilder.newActionBuilder;
import static org.apache.cassandra.inject.Expression.expr;
Expand Down Expand Up @@ -224,6 +219,17 @@ public void removeAllInjections()
Injections.deleteAll();
}

/**
* Enable external execution of all queries because we want to use reconciliation in SELECT queries so that we can
* simulate the application of the entire row filter in the coordinator node, even if unit tests are not multinode.
*/
@BeforeClass
public static void setUpClass()
{
CQLTester.setUpClass();
CQLTester.enableCoordinatorExecution();
}

public static IndexContext createIndexContext(String name, AbstractType<?> validator, ColumnFamilyStore cfs)
{
return new IndexContext(cfs.getKeyspaceName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.junit.Test;

import static org.apache.cassandra.inject.InvokePointBuilder.newInvokePoint;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertTrue;

public class DropIndexWhileQueryingTest extends SAITester
Expand All @@ -47,8 +48,8 @@ public void testDropIndexWhileQuerying() throws Throwable

execute("INSERT INTO %s (k, x, y, z) VALUES (?, ?, ?, ?)", "car", 0, "y0", "z0");
String query = "SELECT * FROM %s WHERE x IN (0, 1) OR (y IN ('Y0', 'Y1' ) OR z IN ('z1', 'z2'))";
assertInvalidMessage(QueryController.INDEX_MAY_HAVE_BEEN_DROPPED, query);
assertInvalidMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE, query);
assertThatThrownBy(() -> executeInternal(query)).hasMessage(QueryController.INDEX_MAY_HAVE_BEEN_DROPPED);
assertThatThrownBy(() -> executeInternal(query)).hasMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE);
}

@Test
Expand Down Expand Up @@ -84,8 +85,8 @@ public void testDropVectorIndexWhileQuerying() throws Throwable
execute("INSERT INTO %s (pk, str_val, val) VALUES (0, 'A', [1.0, 2.0, 3.0])");

String query = "SELECT pk FROM %s ORDER BY val ann of [0.5, 1.5, 2.5] LIMIT 2";
assertInvalidMessage(TopKProcessor.INDEX_MAY_HAVE_BEEN_DROPPED, query);
assertInvalidMessage(String.format(StatementRestrictions.NON_CLUSTER_ORDERING_REQUIRES_INDEX_MESSAGE, "val"), query);
assertThatThrownBy(() -> executeInternal(query)).hasMessage(TopKProcessor.INDEX_MAY_HAVE_BEEN_DROPPED);
assertThatThrownBy(() -> executeInternal(query)).hasMessage(String.format(StatementRestrictions.NON_CLUSTER_ORDERING_REQUIRES_INDEX_MESSAGE, "val"));
}

private static void injectIndexDrop(String injectionName, String indexName, String methodName, boolean atEntry) throws Throwable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import java.net.InetAddress;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.index.sai.SAITester;
import org.apache.cassandra.index.sai.cql.types.InetTest;

Expand All @@ -31,6 +33,15 @@
*/
public class InetAddressTypeEquivalencyTest extends SAITester
{
// TODO: Disables coordinator execution because we know SAI indexing for inet works differently than RowFilter,
// which can wrongly discard rows in the coordinator. This is reported in CNDB-12978, and we should enable
// distributed execution again once we have a fix.
@BeforeClass
public static void disableCoordinatorExecution()
{
CQLTester.disableCoordinatorExecution();
}

@Before
public void createTableAndIndex()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,32 @@ public void testQueryAnalyzer()
assertEquals(0, execute("SELECT * FROM %s WHERE val = 'query'").size());
}

/**
* See CNDB-12739 for more details.
*/
@Test
public void testQueryAnalyzerWithExtraData() throws Throwable
{
createTable("CREATE TABLE %s (c1 int PRIMARY KEY , c2 text)");

createIndex("CREATE CUSTOM INDEX ON %s(c2) USING 'StorageAttachedIndex' WITH OPTIONS = {" +
"'index_analyzer': '{" +
" \"tokenizer\" : { \"name\" : \"whitespace\", \"args\" : {} }," +
" \"filters\" : [ { \"name\" : \"lowercase\", \"args\": {} }, " +
" { \"name\" : \"edgengram\", \"args\": { \"minGramSize\":\"1\", \"maxGramSize\":\"30\" } }]," +
" \"charFilters\" : []}', " +
"'query_analyzer': '{" +
" \"tokenizer\" : { \"name\" : \"whitespace\", \"args\" : {} }," +
" \"filters\" : [ {\"name\" : \"lowercase\",\"args\": {}} ]}'}");

execute("INSERT INTO %s(c1,c2) VALUES (1, 'astra quick fox')");
execute("INSERT INTO %s(c1,c2) VALUES (2, 'astra quick foxes')");
execute("INSERT INTO %s(c1,c2) VALUES (3, 'astra1')");
execute("INSERT INTO %s(c1,c2) VALUES (4, 'astra4 -1@a#')");

beforeAndAfterFlush(() -> assertEquals(4, execute("SELECT * FROM %s WHERE c2 :'ast' ").size()));
}

@Test
public void testStandardQueryAnalyzer()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
public class VectorTracingTest extends VectorTester.VersionedWithChecksums
{
@BeforeClass
public static void setupClass()
public static void setUpClass()
{
System.setProperty("cassandra.custom_tracing_class", "org.apache.cassandra.tracing.TracingTestImpl");
VectorTester.setUpClass();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ public static Collection<Object[]> data()
private static final IPartitioner partitioner = Murmur3Partitioner.instance;

@BeforeClass
public static void setupClass()
public static void setUpClass()
{
System.setProperty("cassandra.custom_tracing_class", "org.apache.cassandra.tracing.TracingTestImpl");
VectorTester.setUpClass();
}

@Before
Expand Down Expand Up @@ -1012,7 +1013,7 @@ public void testEnsureIndexQueryableAfterTransientFailure() throws Throwable
// Ensure that we fail, as expected, and that a subsequent call to search is successful.
beforeAndAfterFlush(() -> {
injection.enable();
assertThatThrownBy(() -> execute("SELECT pk FROM %s ORDER BY vec ANN OF [1,1] LIMIT 2")).hasMessageContaining("Injected failure!");
assertThatThrownBy(() -> executeInternal("SELECT pk FROM %s ORDER BY vec ANN OF [1,1] LIMIT 2")).hasMessageContaining("Injected failure!");
injection.disable();
assertRows(execute("SELECT pk FROM %s ORDER BY vec ANN OF [1,1] LIMIT 2"), row(1));
});
Expand Down
Loading