Skip to content

Commit

Permalink
Tablets support
Browse files Browse the repository at this point in the history
Introduces tablets support for version 4.x of the driver.
Metadata about tablets will be kept in TabletMap that gets continuously updated
through the tablets-routing-v1 extension. Each time the PreparedStatement
or BoundStatement targets the wrong node and shard combination the server
supporting tablets should respond with tablet metadata inside custom payload
of its response. This metadata will be transparently processed and used for
future queries.

Tablets metadata will by enabled by default. Until now driver was using
TokenMaps to choose replicas and appropriate shards. Having a token was enough
information to do that. Now driver will first attempt tablet-based lookup
and only after failing to find corresponding tablet it will defer to TokenMap
lookup. Since to find a correct tablet besides the token we need the keyspace
and table names, many of the methods were extended to also accept those
as parameters.

RequestHandlerTestHarness was adjusted to mock also MetadataManager.
Before it used to mock only `session.getMetadata()` call but the same can
be obtained by `context.getMetadataManager().getMetadata()`. Using the
second method was causing test failures.
  • Loading branch information
Bouncheck committed Mar 15, 2024
1 parent e561a2d commit c6730c6
Show file tree
Hide file tree
Showing 19 changed files with 919 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/
package com.datastax.oss.driver.api.core.cql;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
import com.datastax.oss.driver.api.core.metadata.token.Partitioner;
Expand Down Expand Up @@ -58,6 +59,10 @@ public interface PreparedStatement {
@NonNull
ColumnDefinitions getVariableDefinitions();

/** Table name inferred from {@link PreparedStatement#getVariableDefinitions()}. */
@Nullable
CqlIdentifier getTable();

/**
* The partitioner to use for token-aware routing. If {@code null}, the cluster-wide partitioner
* will be used.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.datastax.oss.driver.api.core.metadata;

import java.util.UUID;

/**
* Simple class to hold UUID of a host and a shard number. Class itself makes no checks or
* guarantees about existence of a shard on corresponding host.
*/
public class HostShardPair {
private final UUID host;
private final int shard;

public HostShardPair(UUID host, int shard) {
this.host = host;
this.shard = shard;
}

public UUID getHost() {
return host;
}

public int getShard() {
return shard;
}

@Override
public String toString() {
return "HostShardPair{" + "host=" + host + ", shard=" + shard + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.datastax.oss.driver.api.core.metadata;

import java.util.Objects;

/** Simple keyspace name and table name pair. */
public class KeyspaceTableNamePair {
private final String keyspace;
private final String tableName;

public KeyspaceTableNamePair(String keyspace, String tableName) {
this.keyspace = keyspace;
this.tableName = tableName;
}

public String getKeyspace() {
return keyspace;
}

public String getTableName() {
return tableName;
}

@Override
public String toString() {
return "KeyspaceTableNamePair{"
+ "keyspace='"
+ keyspace
+ '\''
+ ", tableName='"
+ tableName
+ '\''
+ '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || !(o instanceof KeyspaceTableNamePair)) return false;
KeyspaceTableNamePair that = (KeyspaceTableNamePair) o;
return keyspace.equals(that.keyspace) && tableName.equals(that.tableName);
}

@Override
public int hashCode() {
return Objects.hash(keyspace, tableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ default Optional<KeyspaceMetadata> getKeyspace(@NonNull String keyspaceName) {
@NonNull
Optional<TokenMap> getTokenMap();

/**
* The tablet map for this cluster.
*
* <p>Starts as an empty map that will gradually receive updates on each query of a yet unknown
* tablet.
*/
TabletMap getTabletMap();

/**
* The cluster name to which this session is connected. The Optional returned should contain the
* value from the server for <b>system.local.cluster_name</b>.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.datastax.oss.driver.api.core.metadata;

import java.util.Set;
import java.util.UUID;

/** Represents a tablet as described in tablets-routing-v1 protocol extension. */
public interface Tablet extends Comparable<Tablet> {
public String getKeyspaceName();

public UUID getTableId();

public String getTableName();

public long getFirstToken();

public long getLastToken();

public Set<HostShardPair> getReplicas();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.datastax.oss.driver.api.core.metadata;

import com.datastax.oss.driver.api.core.data.TupleValue;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.UUID;

/** Holds all currently known tablet metadata. */
public interface TabletMap {
/**
* Returns mapping from tables to the sets of their tablets.
*
* @return the Map keyed by (keyspace,table) pairs with Set of tablets as value type.
*/
public Map<KeyspaceTableNamePair, NavigableSet<Tablet>> getMapping();

/**
* Adds a singular tablet to the map. Provided tuple has to match format described for
* tablets-routing-v1 described in Scylla protocol extensions.
*
* @param keyspace tablet's keyspace
* @param table tablet's table
* @param tupleValue tablet
*/
public void addTablet(String keyspace, String table, TupleValue tupleValue);

/**
* Returns replicas holding tablets for given token. Because TabletMap holds only UUIDs of nodes a
* current mapping from UUID to actual Node instances is needed. If a mapping for the specific
* UUID is missing then such Node won't be included in the returned set.
*
* @param keyspace tablet's keyspace
* @param table tablet's table
* @param token target token
* @param nodes current UUID to Node mapping
* @return Set of {@link Node} that holds target tablets.
*/
public Set<Node> getReplicas(String keyspace, String table, long token, Map<UUID, Node> nodes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.datastax.oss.driver.internal.core.protocol.SegmentToFrameDecoder;
import com.datastax.oss.driver.internal.core.protocol.ShardingInfo;
import com.datastax.oss.driver.internal.core.protocol.ShardingInfo.ConnectionShardingInfo;
import com.datastax.oss.driver.internal.core.protocol.TabletInfo;
import com.datastax.oss.driver.internal.core.util.ProtocolUtils;
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
import com.datastax.oss.protocol.internal.Message;
Expand Down Expand Up @@ -94,6 +95,7 @@ class ProtocolInitHandler extends ConnectInitHandler {
private ChannelHandlerContext ctx;
private final boolean querySupportedOptions;
private LwtInfo lwtInfo;
private TabletInfo tabletInfo;

/**
* @param querySupportedOptions whether to send OPTIONS as the first message, to request which
Expand Down Expand Up @@ -191,6 +193,9 @@ Message getRequest() {
if (lwtInfo != null) {
lwtInfo.addOption(startupOptions);
}
if (tabletInfo != null && tabletInfo.isEnabled()) {
TabletInfo.addOption(startupOptions);
}
return request = new Startup(startupOptions);
case GET_CLUSTER_NAME:
return request = CLUSTER_NAME_QUERY;
Expand Down Expand Up @@ -230,6 +235,7 @@ void onResponse(Message response) {
if (lwtInfo != null) {
channel.attr(LWT_INFO_KEY).set(lwtInfo);
}
tabletInfo = TabletInfo.parseTabletInfo(res.options);
step = Step.STARTUP;
send();
} else if (step == Step.STARTUP && response instanceof Ready) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,16 @@
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.connection.FrameTooLongException;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.HostShardPair;
import com.datastax.oss.driver.api.core.metadata.KeyspaceTableNamePair;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.Tablet;
import com.datastax.oss.driver.api.core.metadata.TabletMap;
import com.datastax.oss.driver.api.core.metadata.TokenMap;
import com.datastax.oss.driver.api.core.metadata.token.Partitioner;
import com.datastax.oss.driver.api.core.metadata.token.Token;
Expand All @@ -58,9 +65,12 @@
import com.datastax.oss.driver.internal.core.channel.ResponseCallback;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
import com.datastax.oss.driver.internal.core.metadata.DefaultTabletMap;
import com.datastax.oss.driver.internal.core.metadata.token.DefaultTokenMap;
import com.datastax.oss.driver.internal.core.metadata.token.TokenLong64;
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
import com.datastax.oss.driver.internal.core.protocol.TabletInfo;
import com.datastax.oss.driver.internal.core.session.DefaultSession;
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker;
Expand Down Expand Up @@ -90,8 +100,10 @@
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -284,6 +296,103 @@ private Token getRoutingToken(Statement statement) {
return tokenMap == null ? null : ((DefaultTokenMap) tokenMap).getTokenFactory().hash(key);
}

private CqlIdentifier getTabletRoutingKeyspace(Statement statement) {
if (statement == null) {
if (initialStatement == null) {
return null;
}
statement = initialStatement;
}
ColumnDefinitions cdefs = null;
CqlIdentifier result = null;
if (statement instanceof BoundStatement) {
cdefs = ((BoundStatement) statement).getPreparedStatement().getVariableDefinitions();
} else if (statement instanceof PreparedStatement) {
cdefs = ((PreparedStatement) statement).getVariableDefinitions();
}
if (cdefs != null && cdefs.size() > 0) {
result = cdefs.get(0).getKeyspace();
}
if (result == null) {
return keyspace;
} else {
return result;
}
}

private CqlIdentifier getTabletRoutingTable(Statement statement) {
if (statement == null) {
if (initialStatement == null) {
return null;
}
statement = initialStatement;
}
if (statement instanceof BoundStatement) {
return ((BoundStatement) statement).getPreparedStatement().getTable();
} else if (statement instanceof PreparedStatement) {
return ((PreparedStatement) statement).getTable();
} else {
return null;
}
}

public Integer getShardFromTabletMap(Statement statement, Node node, Token token) {
TabletMap tabletMap = context.getMetadataManager().getMetadata().getTabletMap();
if (!(token instanceof TokenLong64)) {
LOG.trace(
"Token ({}) is not a TokenLong64. Not performing tablet shard lookup for statement {}.",
token,
statement);
return null;
}
if (tabletMap == null) {
LOG.trace(
"Could not determine shard for token {} on host {} because tablets metadata is currently null. "
+ "Returning null.",
token,
node);
return null;
}
if ((getTabletRoutingKeyspace(statement) == null && keyspace == null)
|| getTabletRoutingTable(statement) == null) {
return null;
}
UUID targetHostUuid = node.getHostId();
long tokenValue = ((TokenLong64) token).getValue();
String statementKeyspace =
getTabletRoutingKeyspace(statement) != null
? getTabletRoutingKeyspace(statement).asInternal()
: keyspace.asInternal();
String statementTable = getTabletRoutingTable(statement).asInternal();
KeyspaceTableNamePair key = new KeyspaceTableNamePair(statementKeyspace, statementTable);
NavigableSet<Tablet> targetTablets = tabletMap.getMapping().get(key);
if (targetTablets == null) {
LOG.trace(
"Could not determine shard for token {} on host {} because table {}.{} is not present in tablets "
+ "metadata. Returning null.",
token,
node,
statementKeyspace,
statementTable);
return null;
}
Tablet row = targetTablets.ceiling(DefaultTabletMap.DefaultTablet.malformedTablet(tokenValue));
if (row != null && row.getFirstToken() < tokenValue) {
for (HostShardPair hostShardPair : row.getReplicas()) {
if (hostShardPair.getHost().equals(targetHostUuid)) {
return hostShardPair.getShard();
}
}
}
LOG.trace(
"Could not find tablet corresponding to token {} on host {} for table {} in keyspace {}. Returning null.",
token,
node,
statementTable,
statementKeyspace);
return null;
}

/**
* Sends the request to the next available node.
*
Expand All @@ -309,9 +418,20 @@ private void sendRequest(
Node node = retriedNode;
DriverChannel channel = null;
if (node == null
|| (channel = session.getChannel(node, logPrefix, getRoutingToken(statement))) == null) {
|| (channel =
session.getChannel(
node,
logPrefix,
getRoutingToken(statement),
getShardFromTabletMap(statement, node, getRoutingToken(statement))))
== null) {
while (!result.isDone() && (node = queryPlan.poll()) != null) {
channel = session.getChannel(node, logPrefix, getRoutingToken(statement));
channel =
session.getChannel(
node,
logPrefix,
getRoutingToken(statement),
getShardFromTabletMap(statement, node, getRoutingToken(statement)));
if (channel != null) {
break;
} else {
Expand Down Expand Up @@ -420,6 +540,18 @@ private void setFinalResult(
totalLatencyNanos,
TimeUnit.NANOSECONDS);
}
if (resultSet.getColumnDefinitions().size() > 0
&& resultSet
.getExecutionInfo()
.getIncomingPayload()
.containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) {
context
.getMetadataManager()
.addTabletsFromPayload(
resultSet.getColumnDefinitions().get(0).getKeyspace(),
resultSet.getColumnDefinitions().get(0).getTable(),
resultSet.getExecutionInfo().getIncomingPayload());
}
}
// log the warnings if they have NOT been disabled
if (!executionInfo.getWarnings().isEmpty()
Expand Down
Loading

0 comments on commit c6730c6

Please sign in to comment.