Skip to content

Commit

Permalink
Support for tablets routing information
Browse files Browse the repository at this point in the history
Adds TabletInfo class to hold tablet support info on a per Host basis
Adds TabletMap to hold currently known mappings
Adds tablets-routing-v1 protocol extension negotiation
Adds logic for parsing tablets routing information and using it under the hood.
Adds TabletsIT
  • Loading branch information
Bouncheck committed Feb 6, 2024
1 parent 5d2fb2c commit 8807dab
Show file tree
Hide file tree
Showing 13 changed files with 686 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,8 @@ public ListenableFuture<Void> apply(Message.Response response) throws Exception
if (lwt != null) {
getHost().setLwtInfo(lwt);
}
TabletInfo tabletInfo = TabletInfo.parseTabletInfo(msg.supported);
getHost().setTabletInfo(tabletInfo);
return MoreFutures.VOID_SUCCESS;
case ERROR:
Responses.Error error = (Responses.Error) response;
Expand Down Expand Up @@ -507,6 +509,12 @@ public ListenableFuture<Void> apply(Void input) throws Exception {
if (lwtInfo != null) {
lwtInfo.addOption(extraOptions);
}
TabletInfo tabletInfo = getHost().getTabletInfo();
if (tabletInfo != null
&& tabletInfo.isEnabled()
&& ProtocolFeature.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion)) {
tabletInfo.addOption(extraOptions);
}
Future startupResponseFuture =
write(
new Requests.Startup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,38 @@ public void onSet(
switch (response.type) {
case RESULT:
Responses.Result rm = (Responses.Result) response;

if (rm.getCustomPayload() != null
&& rm.getCustomPayload().containsKey("tablets-routing-v1")
&& (statement instanceof BoundStatement)) {
BoundStatement st = (BoundStatement) statement;
TupleType tupleType =
TupleType.of(
protocolVersion,
session.configuration().getCodecRegistry(),
DataType.bigint(),
DataType.bigint(),
DataType.list(
TupleType.of(
protocolVersion,
session.configuration().getCodecRegistry(),
DataType.uuid(),
DataType.cint())));
TypeCodec<TupleValue> typeCodec =
session.configuration().getCodecRegistry().codecFor(tupleType);
TupleValue tupleValue =
typeCodec.deserialize(
rm.getCustomPayload().get(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY),
protocolVersion);
String keyspace = statement.getKeyspace();
String table =
st.preparedStatement().getPreparedId().boundValuesMetadata.variables.getTable(0);
session
.getCluster()
.getMetadata()
.getTabletMap()
.processCustomPayloadV1(keyspace, table, tupleValue);
}
switch (rm.kind) {
case SET_KEYSPACE:
// propagate the keyspace change to other connections
Expand Down
11 changes: 11 additions & 0 deletions driver-core/src/main/java/com/datastax/driver/core/Host.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class Host {
// Can be set concurrently but the value should always be the same.
private volatile LwtInfo lwtInfo = null;

// Whether host supports TABLETS_ROUTING_V1
private volatile TabletInfo tabletInfo = null;

enum State {
ADDED,
DOWN,
Expand Down Expand Up @@ -450,6 +453,14 @@ public void setLwtInfo(LwtInfo lwtInfo) {
this.lwtInfo = lwtInfo;
}

public TabletInfo getTabletInfo() {
return tabletInfo;
}

public void setTabletInfo(TabletInfo tabletInfo) {
this.tabletInfo = tabletInfo;
}

/**
* Returns whether the host is considered up by the driver.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,9 @@ ListenableFuture<Connection> borrowConnection(
TimeUnit unit,
int maxQueueSize,
Token.Factory partitioner,
ByteBuffer routingKey) {
ByteBuffer routingKey,
String keyspace,
String table) {
Phase phase = this.phase.get();
if (phase != Phase.READY)
return Futures.immediateFailedFuture(
Expand All @@ -515,7 +517,14 @@ ListenableFuture<Connection> borrowConnection(
if (routingKey != null) {
Metadata metadata = manager.cluster.getMetadata();
Token t = metadata.newToken(partitioner, routingKey);
shardId = host.getShardingInfo().shardId(t);
shardId = -1;
if (keyspace != null && table != null) {
assert t instanceof Token.TokenLong64;
shardId = metadata.getShardForTabletToken(keyspace, table, (Token.TokenLong64) t, host);
}
if (shardId == -1) { // means that tablet lookup failed
shardId = host.getShardingInfo().shardId(t);
}
} else {
shardId = RAND.nextInt(host.getShardingInfo().getShardsCount());
}
Expand Down
105 changes: 101 additions & 4 deletions driver-core/src/main/java/com/datastax/driver/core/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/
package com.datastax.driver.core;

import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
Expand All @@ -35,13 +36,15 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -60,8 +63,8 @@ public class Metadata {
final ConcurrentMap<String, KeyspaceMetadata> keyspaces =
new ConcurrentHashMap<String, KeyspaceMetadata>();
private volatile TokenMap tokenMap;

final ReentrantLock lock = new ReentrantLock();
private final TabletMap tabletMap = TabletMap.emptyMap();

// See https://github.com/apache/cassandra/blob/trunk/doc/cql3/CQL.textile#appendixA
private static final IntObjectHashMap<List<char[]>> RESERVED_KEYWORDS =
Expand Down Expand Up @@ -514,23 +517,45 @@ public Set<TokenRange> getTokenRanges(String keyspace, Host host) {
}

/**
* Returns the set of hosts that are replica for a given partition key. Partitioner can be {@code
* null} and then a cluster-wide partitioner will be invoked.
* Extension of legacy method {@link Metadata#getReplicas(String, Token.Factory, ByteBuffer)}.
* Tablets model requires knowledge of the table name to determine the replicas. This method will
* first try to lookup replicas through known tablets metadata. It will default to TokenMap lookup
* if either {@code null} was passed as table name or the tablet lookup is unsuccessful for any
* other reason.
*
* <p>Returns the set of hosts that are replica for a given partition key. Partitioner can be
* {@code null} and then a cluster-wide partitioner will be invoked.
*
* <p>Note that this information is refreshed asynchronously by the control connection, when
* schema or ring topology changes. It might occasionally be stale (or even empty).
*
* @param keyspace the name of the keyspace to get replicas for.
* @param table the name of the table to get replicas for. Necessary for distinction for tablets.
* Unnecessary for regular TokenMap
* @param partitioner the partitioner to use or @{code null} for cluster-wide partitioner.
* @param partitionKey the partition key for which to find the set of replica.
* @return the (immutable) set of replicas for {@code partitionKey} as known by the driver. Note
* that the result might be stale or empty if metadata was explicitly disabled with {@link
* QueryOptions#setMetadataEnabled(boolean)}.
*/
@Beta
public Set<Host> getReplicas(
String keyspace, Token.Factory partitioner, ByteBuffer partitionKey) {
String keyspace, String table, Token.Factory partitioner, ByteBuffer partitionKey) {
keyspace = handleId(keyspace);
TokenMap current = tokenMap;

if (keyspace != null && table != null) {
if (partitioner == null) {
partitioner = current.factory;
}
Token token = partitioner.hash(partitionKey);
assert (token instanceof Token.TokenLong64);
Set<UUID> hostUuids = tabletMap.getReplicas(keyspace, table, (long) token.getValue());
if (!hostUuids.isEmpty()) {
return hostUuids.stream().map(this::getHost).collect(Collectors.toSet());
}
}

if (current == null) {
return Collections.emptySet();
} else {
Expand All @@ -542,6 +567,25 @@ public Set<Host> getReplicas(
}
}

/**
* Returns the set of hosts that are replica for a given partition key. Partitioner can be {@code
* null} and then a cluster-wide partitioner will be invoked.
*
* <p>Note that this information is refreshed asynchronously by the control connection, when
* schema or ring topology changes. It might occasionally be stale (or even empty).
*
* @param keyspace the name of the keyspace to get replicas for.
* @param partitioner the partitioner to use or @{code null} for cluster-wide partitioner.
* @param partitionKey the partition key for which to find the set of replica.
* @return the (immutable) set of replicas for {@code partitionKey} as known by the driver. Note
* that the result might be stale or empty if metadata was explicitly disabled with {@link
* QueryOptions#setMetadataEnabled(boolean)}.
*/
public Set<Host> getReplicas(
String keyspace, Token.Factory partitioner, ByteBuffer partitionKey) {
return getReplicas(keyspace, null, partitioner, partitionKey);
}

/**
* Returns the set of hosts that are replica for a given token range.
*
Expand Down Expand Up @@ -860,6 +904,59 @@ void triggerOnMaterializedViewRemoved(MaterializedViewMetadata view) {
}
}

@Beta
public int getShardForTabletToken(
String keyspace, String table, Token.TokenLong64 token, Host host) {
if (tabletMap == null) {
logger.trace(
"Could not determine shard for token {} on host {} because tablets metadata is currently null. "
+ "Returning -1.",
token,
host);
return -1;
}
UUID targetHostUuid = host.getHostId();
long tokenValue = (long) token.getValue();
TabletMap.KeyspaceTableNamePair key = new TabletMap.KeyspaceTableNamePair(keyspace, table);
NavigableSet<TabletMap.LazyTablet> targetTablets = tabletMap.getMapping().get(key);
if (targetTablets == null) {
logger.trace(
"Could not determine shard for token {} on host {} because table {}.{} is not present in tablets "
+ "metadata. Returning -1.",
token,
host,
keyspace,
table);
return -1;
}
TabletMap.LazyTablet row =
targetTablets.ceiling(TabletMap.LazyTablet.malformedTablet(tokenValue));
if (row != null && row.getFirstToken() < tokenValue) {
for (TabletMap.HostShardPair hostShardPair : row.getReplicas()) {
if (hostShardPair.getHost().equals(targetHostUuid)) {
return hostShardPair.getShard();
}
}
}
logger.trace(
"Could not find tablet corresponding to token {} on host {} for table {} in keyspace {}. Returning -1.",
token,
host,
table,
keyspace);
return -1;
}

/**
* Getter for current {@link TabletMap}.
*
* @return current {@link TabletMap}
*/
@Beta
public TabletMap getTabletMap() {
return tabletMap;
}

private static class TokenMap {

private final Token.Factory factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,22 @@ private Iterator<Host> getReplicas(
}

Token.Factory partitioner = statement.getPartitioner();
String tableName = null;
ColumnDefinitions defs = null;
if (statement instanceof BoundStatement) {
defs = ((BoundStatement) statement).preparedStatement().getVariables();
} else if (statement instanceof PreparedStatement) {
defs = ((PreparedStatement) statement).getVariables();
}
if (defs != null && defs.size() > 0) {
tableName = defs.getTable(0);
}

final Set<Host> replicas =
manager
.cluster
.getMetadata()
.getReplicas(Metadata.quote(keyspace), partitioner, partitionKey);
.getReplicas(Metadata.quote(keyspace), tableName, partitioner, partitionKey);

// replicas are stored in the right order starting with the primary replica
return replicas.iterator();
Expand Down Expand Up @@ -437,13 +448,28 @@ private boolean query(final Host host) {
ByteBuffer routingKey = statement.getRoutingKey(protocolVersion, codecRegistry);

PoolingOptions poolingOptions = manager.configuration().getPoolingOptions();
String statementKeyspace = statement.getKeyspace();
String statementTable = null;
ColumnDefinitions defs = null;
if (statement instanceof PreparedStatement) {
defs = ((PreparedStatement) statement).getVariables();
}
if (statement instanceof BoundStatement) {
defs = ((BoundStatement) statement).statement.getVariables();
}
if (defs != null && defs.size() > 0) {
statementTable = defs.getTable(0);
}

ListenableFuture<Connection> connectionFuture =
pool.borrowConnection(
poolingOptions.getPoolTimeoutMillis(),
TimeUnit.MILLISECONDS,
poolingOptions.getMaxQueueSize(),
statement.getPartitioner(),
routingKey);
routingKey,
statementKeyspace,
statementTable);
GuavaCompatibility.INSTANCE.addCallback(
connectionFuture,
new FutureCallback<Connection>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,13 +733,22 @@ private ListenableFuture<PreparedStatement> prepare(
if (entry.getKey().getEndPoint().equals(toExclude)) continue;

try {
ColumnDefinitions defs = statement.getVariables();
String statementTable = (defs != null && defs.size() > 0 ? defs.getTable(0) : null);
// Preparing is not critical: if it fails, it will fix itself later when the user tries to
// execute
// the prepared query. So don't wait if no connection is available, simply abort.
ListenableFuture<Connection> connectionFuture =
entry
.getValue()
.borrowConnection(0, TimeUnit.MILLISECONDS, 0, null, statement.getRoutingKey());
.borrowConnection(
0,
TimeUnit.MILLISECONDS,
0,
null,
statement.getRoutingKey(),
statement.getQueryKeyspace(),
statementTable);
ListenableFuture<Response> prepareFuture =
GuavaCompatibility.INSTANCE.transformAsync(
connectionFuture,
Expand Down
33 changes: 33 additions & 0 deletions driver-core/src/main/java/com/datastax/driver/core/TabletInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.datastax.driver.core;

import java.util.List;
import java.util.Map;

public class TabletInfo {
private static final String SCYLLA_TABLETS_STARTUP_OPTION_KEY = "TABLETS_ROUTING_V1";
private static final String SCYLLA_TABLETS_STARTUP_OPTION_VALUE = "";
public static final String TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY = "tablets-routing-v1";

private boolean enabled = false;

private TabletInfo(boolean enabled) {
this.enabled = enabled;
}

// Currently pertains only to TABLETS_ROUTING_V1
public boolean isEnabled() {
return enabled;
}

public static TabletInfo parseTabletInfo(Map<String, List<String>> supported) {
List<String> values = supported.get(SCYLLA_TABLETS_STARTUP_OPTION_KEY);
return new TabletInfo(
values != null
&& values.size() == 1
&& values.get(0).equals(SCYLLA_TABLETS_STARTUP_OPTION_VALUE));
}

public static void addOption(Map<String, String> options) {
options.put(SCYLLA_TABLETS_STARTUP_OPTION_KEY, SCYLLA_TABLETS_STARTUP_OPTION_VALUE);
}
}
Loading

0 comments on commit 8807dab

Please sign in to comment.