Skip to content

Commit

Permalink
Support for tablets routing information
Browse files Browse the repository at this point in the history
Adds TabletInfo class to hold tablet support info on a per Host basis
Adds TabletMap to hold currently known mappings
Adds tablets-routing-v1 protocol extension negotiation
Adds logic for parsing tablets routing information and using it under the hood.
  • Loading branch information
Bouncheck committed Jan 27, 2024
1 parent 5d2fb2c commit 439f933
Show file tree
Hide file tree
Showing 12 changed files with 573 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,8 @@ public ListenableFuture<Void> apply(Message.Response response) throws Exception
if (lwt != null) {
getHost().setLwtInfo(lwt);
}
TabletInfo tabletInfo = TabletInfo.parseTabletInfo(msg.supported);
getHost().setTabletInfo(tabletInfo);
return MoreFutures.VOID_SUCCESS;
case ERROR:
Responses.Error error = (Responses.Error) response;
Expand Down Expand Up @@ -507,6 +509,12 @@ public ListenableFuture<Void> apply(Void input) throws Exception {
if (lwtInfo != null) {
lwtInfo.addOption(extraOptions);
}
TabletInfo tabletInfo = getHost().getTabletInfo();
if (tabletInfo != null
&& tabletInfo.isEnabled()
&& ProtocolFeature.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion)) {
tabletInfo.addOption(extraOptions);
}
Future startupResponseFuture =
write(
new Requests.Startup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,38 @@ public void onSet(
switch (response.type) {
case RESULT:
Responses.Result rm = (Responses.Result) response;

if (rm.getCustomPayload() != null
&& rm.getCustomPayload().containsKey("tablets-routing-v1")
&& (statement instanceof BoundStatement)) {
BoundStatement st = (BoundStatement) statement;
TupleType tupleType =
TupleType.of(
protocolVersion,
session.configuration().getCodecRegistry(),
DataType.bigint(),
DataType.bigint(),
DataType.list(
TupleType.of(
protocolVersion,
session.configuration().getCodecRegistry(),
DataType.uuid(),
DataType.cint())));
TypeCodec<TupleValue> typeCodec =
session.configuration().getCodecRegistry().codecFor(tupleType);
TupleValue tupleValue =
typeCodec.deserialize(
rm.getCustomPayload().get(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY),
protocolVersion);
String keyspace = statement.getKeyspace();
String table =
st.preparedStatement().getPreparedId().boundValuesMetadata.variables.getTable(0);
session
.getCluster()
.getMetadata()
.getTabletsMap()
.processCustomPayloadV1(keyspace, table, tupleValue);
}
switch (rm.kind) {
case SET_KEYSPACE:
// propagate the keyspace change to other connections
Expand Down
11 changes: 11 additions & 0 deletions driver-core/src/main/java/com/datastax/driver/core/Host.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class Host {
// Can be set concurrently but the value should always be the same.
private volatile LwtInfo lwtInfo = null;

// Whether host supports TABLETS_ROUTING_V1
private volatile TabletInfo tabletInfo = null;

enum State {
ADDED,
DOWN,
Expand Down Expand Up @@ -450,6 +453,14 @@ public void setLwtInfo(LwtInfo lwtInfo) {
this.lwtInfo = lwtInfo;
}

public TabletInfo getTabletInfo() {
return tabletInfo;
}

public void setTabletInfo(TabletInfo tabletInfo) {
this.tabletInfo = tabletInfo;
}

/**
* Returns whether the host is considered up by the driver.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,9 @@ ListenableFuture<Connection> borrowConnection(
TimeUnit unit,
int maxQueueSize,
Token.Factory partitioner,
ByteBuffer routingKey) {
ByteBuffer routingKey,
String keyspace,
String table) {
Phase phase = this.phase.get();
if (phase != Phase.READY)
return Futures.immediateFailedFuture(
Expand All @@ -515,7 +517,14 @@ ListenableFuture<Connection> borrowConnection(
if (routingKey != null) {
Metadata metadata = manager.cluster.getMetadata();
Token t = metadata.newToken(partitioner, routingKey);
shardId = host.getShardingInfo().shardId(t);
shardId = -1;
if (keyspace != null && table != null) {
assert t instanceof Token.TokenLong64;
shardId = metadata.getShardForTabletToken(keyspace, table, (Token.TokenLong64) t, host);
}
if (shardId == -1) { // means that tablet lookup failed
shardId = host.getShardingInfo().shardId(t);
}
} else {
shardId = RAND.nextInt(host.getShardingInfo().getShardsCount());
}
Expand Down
Loading

0 comments on commit 439f933

Please sign in to comment.