From c6730c6022e2c786eeb9a4d13b7e57026d9bc3b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Fri, 15 Mar 2024 12:06:43 +0100 Subject: [PATCH] Tablets support Introduces tablets support for version 4.x of the driver. Metadata about tablets will be kept in TabletMap that gets continuously updated through the tablets-routing-v1 extension. Each time the PreparedStatement or BoundStatement targets the wrong node and shard combination the server supporting tablets should respond with tablet metadata inside the custom payload of its response. This metadata will be transparently processed and used for future queries. Tablets metadata will be enabled by default. Until now the driver was using TokenMaps to choose replicas and appropriate shards. Having a token was enough information to do that. Now the driver will first attempt a tablet-based lookup and only after failing to find the corresponding tablet will it defer to the TokenMap lookup. Since finding the correct tablet requires the keyspace and table names in addition to the token, many of the methods were extended to also accept those as parameters. RequestHandlerTestHarness was adjusted to also mock MetadataManager. Previously it mocked only the `session.getMetadata()` call, but the same metadata can be obtained through `context.getMetadataManager().getMetadata()`. Using the second method was causing test failures. 
--- .../api/core/cql/PreparedStatement.java | 5 + .../api/core/metadata/HostShardPair.java | 30 ++ .../core/metadata/KeyspaceTableNamePair.java | 47 +++ .../driver/api/core/metadata/Metadata.java | 8 + .../oss/driver/api/core/metadata/Tablet.java | 19 ++ .../driver/api/core/metadata/TabletMap.java | 40 +++ .../core/channel/ProtocolInitHandler.java | 6 + .../internal/core/cql/CqlRequestHandler.java | 136 ++++++++- .../core/cql/DefaultPreparedStatement.java | 10 + .../BasicLoadBalancingPolicy.java | 36 +++ .../core/metadata/AddTabletRefresh.java | 25 ++ .../core/metadata/DefaultMetadata.java | 28 +- .../core/metadata/DefaultTabletMap.java | 284 ++++++++++++++++++ .../core/metadata/MetadataManager.java | 37 +++ .../internal/core/pool/ChannelPool.java | 17 +- .../internal/core/protocol/TabletInfo.java | 33 ++ .../internal/core/session/DefaultSession.java | 13 +- .../core/cql/RequestHandlerTestHarness.java | 11 + .../driver/examples/basic/TabletsExample.java | 147 +++++++++ 19 files changed, 919 insertions(+), 13 deletions(-) create mode 100644 core/src/main/java/com/datastax/oss/driver/api/core/metadata/HostShardPair.java create mode 100644 core/src/main/java/com/datastax/oss/driver/api/core/metadata/KeyspaceTableNamePair.java create mode 100644 core/src/main/java/com/datastax/oss/driver/api/core/metadata/Tablet.java create mode 100644 core/src/main/java/com/datastax/oss/driver/api/core/metadata/TabletMap.java create mode 100644 core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddTabletRefresh.java create mode 100644 core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTabletMap.java create mode 100644 core/src/main/java/com/datastax/oss/driver/internal/core/protocol/TabletInfo.java create mode 100644 examples/src/main/java/com/datastax/oss/driver/examples/basic/TabletsExample.java diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/cql/PreparedStatement.java 
b/core/src/main/java/com/datastax/oss/driver/api/core/cql/PreparedStatement.java index 48e0d1567c2..8cffd5fdb03 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/cql/PreparedStatement.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/cql/PreparedStatement.java @@ -21,6 +21,7 @@ */ package com.datastax.oss.driver.api.core.cql; +import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.DefaultProtocolVersion; import com.datastax.oss.driver.api.core.metadata.token.Partitioner; @@ -58,6 +59,10 @@ public interface PreparedStatement { @NonNull ColumnDefinitions getVariableDefinitions(); + /** Table name inferred from {@link PreparedStatement#getVariableDefinitions()}. */ + @Nullable + CqlIdentifier getTable(); + /** * The partitioner to use for token-aware routing. If {@code null}, the cluster-wide partitioner * will be used. diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/HostShardPair.java b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/HostShardPair.java new file mode 100644 index 00000000000..949f349b427 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/HostShardPair.java @@ -0,0 +1,30 @@ +package com.datastax.oss.driver.api.core.metadata; + +import java.util.UUID; + +/** + * Simple class to hold UUID of a host and a shard number. Class itself makes no checks or + * guarantees about existence of a shard on corresponding host. 
+ */ +public class HostShardPair { + private final UUID host; + private final int shard; + + public HostShardPair(UUID host, int shard) { + this.host = host; + this.shard = shard; + } + + public UUID getHost() { + return host; + } + + public int getShard() { + return shard; + } + + @Override + public String toString() { + return "HostShardPair{" + "host=" + host + ", shard=" + shard + '}'; + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/KeyspaceTableNamePair.java b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/KeyspaceTableNamePair.java new file mode 100644 index 00000000000..02d21b2aa2c --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/KeyspaceTableNamePair.java @@ -0,0 +1,47 @@ +package com.datastax.oss.driver.api.core.metadata; + +import java.util.Objects; + +/** Simple keyspace name and table name pair. */ +public class KeyspaceTableNamePair { + private final String keyspace; + private final String tableName; + + public KeyspaceTableNamePair(String keyspace, String tableName) { + this.keyspace = keyspace; + this.tableName = tableName; + } + + public String getKeyspace() { + return keyspace; + } + + public String getTableName() { + return tableName; + } + + @Override + public String toString() { + return "KeyspaceTableNamePair{" + + "keyspace='" + + keyspace + + '\'' + + ", tableName='" + + tableName + + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || !(o instanceof KeyspaceTableNamePair)) return false; + KeyspaceTableNamePair that = (KeyspaceTableNamePair) o; + return keyspace.equals(that.keyspace) && tableName.equals(that.tableName); + } + + @Override + public int hashCode() { + return Objects.hash(keyspace, tableName); + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/Metadata.java b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/Metadata.java index 
287298c44fd..9945c3c2d35 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/Metadata.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/Metadata.java @@ -115,6 +115,14 @@ default Optional getKeyspace(@NonNull String keyspaceName) { @NonNull Optional getTokenMap(); + /** + * The tablet map for this cluster. + * + *

Starts as an empty map that will gradually receive updates on each query of a yet unknown + * tablet. + */ + TabletMap getTabletMap(); + /** * The cluster name to which this session is connected. The Optional returned should contain the * value from the server for system.local.cluster_name. diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/Tablet.java b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/Tablet.java new file mode 100644 index 00000000000..99dc9ef339b --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/Tablet.java @@ -0,0 +1,19 @@ +package com.datastax.oss.driver.api.core.metadata; + +import java.util.Set; +import java.util.UUID; + +/** Represents a tablet as described in tablets-routing-v1 protocol extension. */ +public interface Tablet extends Comparable { + public String getKeyspaceName(); + + public UUID getTableId(); + + public String getTableName(); + + public long getFirstToken(); + + public long getLastToken(); + + public Set getReplicas(); +} diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/TabletMap.java b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/TabletMap.java new file mode 100644 index 00000000000..f639b717d27 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/TabletMap.java @@ -0,0 +1,40 @@ +package com.datastax.oss.driver.api.core.metadata; + +import com.datastax.oss.driver.api.core.data.TupleValue; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.UUID; + +/** Holds all currently known tablet metadata. */ +public interface TabletMap { + /** + * Returns mapping from tables to the sets of their tablets. + * + * @return the Map keyed by (keyspace,table) pairs with Set of tablets as value type. + */ + public Map> getMapping(); + + /** + * Adds a singular tablet to the map. 
The provided tuple has to match the format defined for + * tablets-routing-v1 in Scylla protocol extensions. + * + * @param keyspace tablet's keyspace + * @param table tablet's table + * @param tupleValue tablet + */ + public void addTablet(String keyspace, String table, TupleValue tupleValue); + + /** + * Returns replicas holding tablets for given token. Because TabletMap holds only UUIDs of nodes, a + * current mapping from UUID to actual Node instances is needed. If a mapping for the specific + * UUID is missing then such Node won't be included in the returned set. + * + * @param keyspace tablet's keyspace + * @param table tablet's table + * @param token target token + * @param nodes current UUID to Node mapping + * @return Set of {@link Node} that holds target tablets. + */ + public Set getReplicas(String keyspace, String table, long token, Map nodes); +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java index 9570e960536..b5a677ff3c4 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java @@ -43,6 +43,7 @@ import com.datastax.oss.driver.internal.core.protocol.SegmentToFrameDecoder; import com.datastax.oss.driver.internal.core.protocol.ShardingInfo; import com.datastax.oss.driver.internal.core.protocol.ShardingInfo.ConnectionShardingInfo; +import com.datastax.oss.driver.internal.core.protocol.TabletInfo; import com.datastax.oss.driver.internal.core.util.ProtocolUtils; import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions; import com.datastax.oss.protocol.internal.Message; @@ -94,6 +95,7 @@ class ProtocolInitHandler extends ConnectInitHandler { private ChannelHandlerContext ctx; private final boolean querySupportedOptions; private LwtInfo lwtInfo; + 
private TabletInfo tabletInfo; /** * @param querySupportedOptions whether to send OPTIONS as the first message, to request which @@ -191,6 +193,9 @@ Message getRequest() { if (lwtInfo != null) { lwtInfo.addOption(startupOptions); } + if (tabletInfo != null && tabletInfo.isEnabled()) { + TabletInfo.addOption(startupOptions); + } return request = new Startup(startupOptions); case GET_CLUSTER_NAME: return request = CLUSTER_NAME_QUERY; @@ -230,6 +235,7 @@ void onResponse(Message response) { if (lwtInfo != null) { channel.attr(LWT_INFO_KEY).set(lwtInfo); } + tabletInfo = TabletInfo.parseTabletInfo(res.options); step = Step.STARTUP; send(); } else if (step == Step.STARTUP && response instanceof Ready) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index bb2f4ebcb84..709fb2660a7 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -31,9 +31,16 @@ import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.connection.FrameTooLongException; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.Statement; +import com.datastax.oss.driver.api.core.metadata.HostShardPair; +import com.datastax.oss.driver.api.core.metadata.KeyspaceTableNamePair; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.Tablet; +import com.datastax.oss.driver.api.core.metadata.TabletMap; import 
com.datastax.oss.driver.api.core.metadata.TokenMap; import com.datastax.oss.driver.api.core.metadata.token.Partitioner; import com.datastax.oss.driver.api.core.metadata.token.Token; @@ -58,9 +65,12 @@ import com.datastax.oss.driver.internal.core.channel.ResponseCallback; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.metadata.DefaultNode; +import com.datastax.oss.driver.internal.core.metadata.DefaultTabletMap; import com.datastax.oss.driver.internal.core.metadata.token.DefaultTokenMap; +import com.datastax.oss.driver.internal.core.metadata.token.TokenLong64; import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater; import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater; +import com.datastax.oss.driver.internal.core.protocol.TabletInfo; import com.datastax.oss.driver.internal.core.session.DefaultSession; import com.datastax.oss.driver.internal.core.session.RepreparePayload; import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker; @@ -90,8 +100,10 @@ import java.util.AbstractMap; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Queue; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -284,6 +296,103 @@ private Token getRoutingToken(Statement statement) { return tokenMap == null ? 
null : ((DefaultTokenMap) tokenMap).getTokenFactory().hash(key); } + private CqlIdentifier getTabletRoutingKeyspace(Statement statement) { + if (statement == null) { + if (initialStatement == null) { + return null; + } + statement = initialStatement; + } + ColumnDefinitions cdefs = null; + CqlIdentifier result = null; + if (statement instanceof BoundStatement) { + cdefs = ((BoundStatement) statement).getPreparedStatement().getVariableDefinitions(); + } else if (statement instanceof PreparedStatement) { + cdefs = ((PreparedStatement) statement).getVariableDefinitions(); + } + if (cdefs != null && cdefs.size() > 0) { + result = cdefs.get(0).getKeyspace(); + } + if (result == null) { + return keyspace; + } else { + return result; + } + } + + private CqlIdentifier getTabletRoutingTable(Statement statement) { + if (statement == null) { + if (initialStatement == null) { + return null; + } + statement = initialStatement; + } + if (statement instanceof BoundStatement) { + return ((BoundStatement) statement).getPreparedStatement().getTable(); + } else if (statement instanceof PreparedStatement) { + return ((PreparedStatement) statement).getTable(); + } else { + return null; + } + } + + public Integer getShardFromTabletMap(Statement statement, Node node, Token token) { + TabletMap tabletMap = context.getMetadataManager().getMetadata().getTabletMap(); + if (!(token instanceof TokenLong64)) { + LOG.trace( + "Token ({}) is not a TokenLong64. Not performing tablet shard lookup for statement {}.", + token, + statement); + return null; + } + if (tabletMap == null) { + LOG.trace( + "Could not determine shard for token {} on host {} because tablets metadata is currently null. 
" + + "Returning null.", + token, + node); + return null; + } + if ((getTabletRoutingKeyspace(statement) == null && keyspace == null) + || getTabletRoutingTable(statement) == null) { + return null; + } + UUID targetHostUuid = node.getHostId(); + long tokenValue = ((TokenLong64) token).getValue(); + String statementKeyspace = + getTabletRoutingKeyspace(statement) != null + ? getTabletRoutingKeyspace(statement).asInternal() + : keyspace.asInternal(); + String statementTable = getTabletRoutingTable(statement).asInternal(); + KeyspaceTableNamePair key = new KeyspaceTableNamePair(statementKeyspace, statementTable); + NavigableSet targetTablets = tabletMap.getMapping().get(key); + if (targetTablets == null) { + LOG.trace( + "Could not determine shard for token {} on host {} because table {}.{} is not present in tablets " + + "metadata. Returning null.", + token, + node, + statementKeyspace, + statementTable); + return null; + } + Tablet row = targetTablets.ceiling(DefaultTabletMap.DefaultTablet.malformedTablet(tokenValue)); + if (row != null && row.getFirstToken() < tokenValue) { + for (HostShardPair hostShardPair : row.getReplicas()) { + if (hostShardPair.getHost().equals(targetHostUuid)) { + return hostShardPair.getShard(); + } + } + } + LOG.trace( + "Could not find tablet corresponding to token {} on host {} for table {} in keyspace {}. Returning null.", + token, + node, + statementTable, + statementKeyspace); + return null; + } + /** * Sends the request to the next available node. 
* @@ -309,9 +418,20 @@ private void sendRequest( Node node = retriedNode; DriverChannel channel = null; if (node == null - || (channel = session.getChannel(node, logPrefix, getRoutingToken(statement))) == null) { + || (channel = + session.getChannel( + node, + logPrefix, + getRoutingToken(statement), + getShardFromTabletMap(statement, node, getRoutingToken(statement)))) + == null) { while (!result.isDone() && (node = queryPlan.poll()) != null) { - channel = session.getChannel(node, logPrefix, getRoutingToken(statement)); + channel = + session.getChannel( + node, + logPrefix, + getRoutingToken(statement), + getShardFromTabletMap(statement, node, getRoutingToken(statement))); if (channel != null) { break; } else { @@ -420,6 +540,18 @@ private void setFinalResult( totalLatencyNanos, TimeUnit.NANOSECONDS); } + if (resultSet.getColumnDefinitions().size() > 0 + && resultSet + .getExecutionInfo() + .getIncomingPayload() + .containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) { + context + .getMetadataManager() + .addTabletsFromPayload( + resultSet.getColumnDefinitions().get(0).getKeyspace(), + resultSet.getColumnDefinitions().get(0).getTable(), + resultSet.getExecutionInfo().getIncomingPayload()); + } } // log the warnings if they have NOT been disabled if (!executionInfo.getWarnings().isEmpty() diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java index bae23bfce22..81160375dbc 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java @@ -36,6 +36,7 @@ import com.datastax.oss.driver.internal.core.data.ValuesHelper; import com.datastax.oss.driver.internal.core.session.RepreparePayload; import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; 
import java.nio.ByteBuffer; import java.time.Duration; import java.util.List; @@ -140,6 +141,15 @@ public ColumnDefinitions getVariableDefinitions() { return variableDefinitions; } + @Nullable + @Override + public CqlIdentifier getTable() { + if (variableDefinitions.size() > 0) { + return variableDefinitions.get(0).getTable(); + } + return null; + } + @Override public Partitioner getPartitioner() { return partitioner; diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java index b3a71827edc..d41d5f2a894 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java @@ -26,12 +26,14 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance; import com.datastax.oss.driver.api.core.loadbalancing.NodeDistanceEvaluator; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.metadata.NodeState; +import com.datastax.oss.driver.api.core.metadata.TabletMap; import com.datastax.oss.driver.api.core.metadata.TokenMap; import com.datastax.oss.driver.api.core.metadata.token.Partitioner; import com.datastax.oss.driver.api.core.metadata.token.Token; @@ -45,6 +47,7 @@ import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.MultiDcNodeSet; import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.NodeSet; import 
com.datastax.oss.driver.internal.core.loadbalancing.nodeset.SingleDcNodeSet; +import com.datastax.oss.driver.internal.core.metadata.token.TokenLong64; import com.datastax.oss.driver.internal.core.util.ArrayUtils; import com.datastax.oss.driver.internal.core.util.collection.CompositeQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.LazyQueryPlan; @@ -288,14 +291,21 @@ protected Set getReplicas(@Nullable Request request, @Nullable Session ses if (!maybeTokenMap.isPresent()) { return Collections.emptySet(); } + TabletMap tabletMap = context.getMetadataManager().getMetadata().getTabletMap(); // Note: we're on the hot path and the getXxx methods are potentially more than simple getters, // so we only call each method when strictly necessary (which is why the code below looks a bit // weird). CqlIdentifier keyspace; + CqlIdentifier table = null; Token token; ByteBuffer key; Partitioner partitioner = null; + + if (request instanceof BoundStatement) { + table = ((BoundStatement) request).getPreparedStatement().getTable(); + } + try { keyspace = request.getKeyspace(); if (keyspace == null) { @@ -321,6 +331,32 @@ protected Set getReplicas(@Nullable Request request, @Nullable Session ses return Collections.emptySet(); } + if (table != null && tabletMap != null) { + Partitioner p = + (partitioner != null + ? partitioner + : context + .getTokenFactoryRegistry() + .tokenFactoryFor(maybeTokenMap.get().getPartitionerName())); + Token t = (token != null ? token : p.hash(key)); + if (t instanceof TokenLong64) { + Set replicas = + tabletMap.getReplicas( + keyspace.asInternal(), + table.asInternal(), + ((TokenLong64) t).getValue(), + context.getMetadataManager().getMetadata().getNodes()); + if (!replicas.isEmpty()) { + System.out.println("Outputting tablet replicas: "); + for (Node n : replicas) { + System.out.print(n); + System.out.print(", "); + } + return replicas; + } + } + } + TokenMap tokenMap = maybeTokenMap.get(); return token != null ? 
tokenMap.getReplicas(keyspace, token) diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddTabletRefresh.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddTabletRefresh.java new file mode 100644 index 00000000000..a6d7eb5269d --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddTabletRefresh.java @@ -0,0 +1,25 @@ +package com.datastax.oss.driver.internal.core.metadata; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.data.TupleValue; +import com.datastax.oss.driver.internal.core.context.InternalDriverContext; + +public class AddTabletRefresh implements MetadataRefresh { + + final CqlIdentifier keyspace; + final CqlIdentifier table; + final TupleValue tabletTuple; + + public AddTabletRefresh(CqlIdentifier keyspace, CqlIdentifier table, TupleValue tabletTuple) { + this.keyspace = keyspace; + this.table = table; + this.tabletTuple = tabletTuple; + } + + @Override + public Result compute( + DefaultMetadata oldMetadata, boolean tokenMapEnabled, InternalDriverContext context) { + oldMetadata.tabletMap.addTablet(keyspace.asInternal(), table.asInternal(), tabletTuple); + return new Result(oldMetadata); + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultMetadata.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultMetadata.java index c34486029fe..70a61e25881 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultMetadata.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultMetadata.java @@ -18,6 +18,7 @@ import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.metadata.Metadata; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.TabletMap; import com.datastax.oss.driver.api.core.metadata.TokenMap; import 
com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; @@ -46,22 +47,34 @@ public class DefaultMetadata implements Metadata { private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadata.class); public static DefaultMetadata EMPTY = - new DefaultMetadata(Collections.emptyMap(), Collections.emptyMap(), null, null); + new DefaultMetadata( + Collections.emptyMap(), Collections.emptyMap(), null, null, DefaultTabletMap.emptyMap()); protected final Map nodes; protected final Map keyspaces; protected final TokenMap tokenMap; protected final String clusterName; + protected final TabletMap tabletMap; protected DefaultMetadata( Map nodes, Map keyspaces, TokenMap tokenMap, String clusterName) { + this(nodes, keyspaces, tokenMap, clusterName, DefaultTabletMap.emptyMap()); + } + + protected DefaultMetadata( + Map nodes, + Map keyspaces, + TokenMap tokenMap, + String clusterName, + TabletMap tabletMap) { this.nodes = nodes; this.keyspaces = keyspaces; this.tokenMap = tokenMap; this.clusterName = clusterName; + this.tabletMap = tabletMap; } @NonNull @@ -82,6 +95,11 @@ public Optional getTokenMap() { return Optional.ofNullable(tokenMap); } + @Override + public TabletMap getTabletMap() { + return tabletMap; + } + @NonNull @Override public Optional getClusterName() { @@ -91,6 +109,8 @@ public Optional getClusterName() { /** * Refreshes the current metadata with the given list of nodes. * + *

Does not rebuild the tablet map. + * * @param tokenMapEnabled whether to rebuild the token map or not; if this is {@code false} the * current token map will be copied into the new metadata without being recomputed. * @param tokensChanged whether we observed a change of tokens for at least one node. This will @@ -114,7 +134,8 @@ public DefaultMetadata withNodes( this.keyspaces, rebuildTokenMap( newNodes, keyspaces, tokenMapEnabled, forceFullRebuild, tokenFactory, context), - context.getChannelFactory().getClusterName()); + context.getChannelFactory().getClusterName(), + this.tabletMap); } public DefaultMetadata withSchema( @@ -125,7 +146,8 @@ public DefaultMetadata withSchema( this.nodes, ImmutableMap.copyOf(newKeyspaces), rebuildTokenMap(nodes, newKeyspaces, tokenMapEnabled, false, null, context), - context.getChannelFactory().getClusterName()); + context.getChannelFactory().getClusterName(), + this.tabletMap); } @Nullable diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTabletMap.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTabletMap.java new file mode 100644 index 00000000000..f9122a16898 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTabletMap.java @@ -0,0 +1,284 @@ +package com.datastax.oss.driver.internal.core.metadata; + +import com.datastax.oss.driver.api.core.data.TupleValue; +import com.datastax.oss.driver.api.core.metadata.HostShardPair; +import com.datastax.oss.driver.api.core.metadata.KeyspaceTableNamePair; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.Tablet; +import com.datastax.oss.driver.api.core.metadata.TabletMap; +import com.datastax.oss.driver.shaded.guava.common.annotations.Beta; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Objects; 
+import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Holds currently known tablet mappings. Updated lazily through received custom payloads described + * in Scylla's CQL protocol extensions (tablets-routing-v1). + */ +@Beta +public class DefaultTabletMap implements TabletMap { + private static final Logger logger = LoggerFactory.getLogger(DefaultTabletMap.class); + + private final ConcurrentMap> mapping; + + public DefaultTabletMap(ConcurrentMap> mapping) { + this.mapping = mapping; + } + + public static DefaultTabletMap emptyMap() { + return new DefaultTabletMap(new ConcurrentHashMap<>()); + } + + /** + * Returns the mapping of tables to their tablets. + * + * @return the Map keyed by (keyspace,table) pairs with Set of tablets as value type. + */ + @Override + public Map> getMapping() { + return mapping; + } + + /** + * Finds nodes that have replicas for a given table and token combination. Considers only nodes + * present in passed map. + * + * @param keyspace the keyspace that table is in + * @param table the table name + * @param token the token to look for + * @param nodes UUID keyed map of currently recognized Nodes + * @return Set of Nodes that do own a tablet for given token for a given table. + */ + @Override + public Set getReplicas(String keyspace, String table, long token, Map nodes) { + KeyspaceTableNamePair key = new KeyspaceTableNamePair(keyspace, table); + + if (mapping == null) { + logger.trace("This tablets map is null. Returning empty set."); + return Collections.emptySet(); + } + + NavigableSet set = mapping.get(key); + if (set == null) { + logger.trace( + "There is no tablets for {}.{} in this mapping. 
Returning empty set.", keyspace, table); + return Collections.emptySet(); + } + Tablet row = mapping.get(key).ceiling(DefaultTablet.malformedTablet(token)); + if (row == null || row.getFirstToken() >= token) { + logger.trace( + "Could not find tablet for {}.{} that owns token {}. Returning empty set.", + keyspace, + table, + token); + return Collections.emptySet(); + } + + HashSet result = new HashSet<>(); + for (HostShardPair hostShardPair : row.getReplicas()) { + Node node = nodes.get(hostShardPair.getHost()); + if (node != null) result.add(node); + } + return result; + } + + /** + * Updates tablet map with parsed tablets-routing-v1 custom payload. Expects the tuple's + * underlying structure to correspond to {@code TupleType(LongType, LongType, + * ListType(TupleType(UUIDType, Int32Type)))}. Handles removing outdated tablets that intersect + * with the one about to be added. + * + * @param keyspace the keyspace of the table + * @param table the table name + * @param tupleValue the parsed payload + */ + @Override + public void addTablet(String keyspace, String table, TupleValue tupleValue) { + KeyspaceTableNamePair ktPair = new KeyspaceTableNamePair(keyspace, table); + + long firstToken = tupleValue.getLong(0); + long lastToken = tupleValue.getLong(1); + + logger.trace( + "Received tablets routing V1 payload: {}.{} range {}-{}", + keyspace, + table, + firstToken, + lastToken); + + Set replicas = new HashSet<>(); + List list = tupleValue.getList(2, TupleValue.class); + assert list != null; + for (TupleValue tuple : list) { + HostShardPair hostShardPair = new HostShardPair(tuple.getUuid(0), tuple.getInt(1)); + replicas.add(hostShardPair); + } + + // Working on a copy to avoid concurrent modification of the same set + NavigableSet existingTablets = + new TreeSet<>(mapping.computeIfAbsent(ktPair, k -> new TreeSet<>())); + + // Single tablet token range is represented by (firstToken, lastToken] interval + // We need to do two sweeps: remove overlapping tablets by 
looking at lastToken of existing + // tablets + // and then by looking at firstToken of existing tablets. Currently, the tablets are sorted + // according + // to their lastTokens. + + // First sweep: remove all tablets whose lastToken is inside this interval + Iterator it = + existingTablets + .headSet(DefaultTablet.malformedTablet(lastToken), true) + .descendingIterator(); + while (it.hasNext()) { + Tablet tablet = it.next(); + if (tablet.getLastToken() <= firstToken) { + break; + } + it.remove(); + } + + // Second sweep: remove all tablets whose firstToken is inside this tuple's (firstToken, + // lastToken] + // After the first sweep, this theoretically should remove at most 1 tablet + it = existingTablets.tailSet(DefaultTablet.malformedTablet(lastToken), true).iterator(); + while (it.hasNext()) { + Tablet tablet = it.next(); + if (tablet.getFirstToken() >= lastToken) { + break; + } + it.remove(); + } + + // Add new (now) non-overlapping tablet + existingTablets.add(new DefaultTablet(keyspace, null, table, firstToken, lastToken, replicas)); + + // Set the updated result in the main map + mapping.put(ktPair, existingTablets); + } + + /** + * Represents a single tablet created from tablets-routing-v1 custom payload. Its {@code + * compareTo} implementation intentionally relies solely on {@code lastToken} in order to allow + * for quick lookup on sorted Collections based just on the token value. 
+ */ + public static class DefaultTablet implements Tablet { + private final String keyspaceName; + private final UUID tableId; // currently null almost everywhere + private final String tableName; + private final long firstToken; + private final long lastToken; + private final Set replicas; + + private DefaultTablet( + String keyspaceName, + UUID tableId, + String tableName, + long firstToken, + long lastToken, + Set replicas) { + this.keyspaceName = keyspaceName; + this.tableId = tableId; + this.tableName = tableName; + this.firstToken = firstToken; + this.lastToken = lastToken; + this.replicas = replicas; + } + + /** + * Creates a {@link DefaultTablet} instance with given {@code lastToken}, identical {@code + * firstToken} and unspecified other fields. Used for lookup of sorted collections of proper + * {@link DefaultTablet}. + * + * @param lastToken + * @return New {@link DefaultTablet} object + */ + public static DefaultTablet malformedTablet(long lastToken) { + return new DefaultTablet(null, null, null, lastToken, lastToken, null); + } + + @Override + public String getKeyspaceName() { + return keyspaceName; + } + + @Override + public UUID getTableId() { + return tableId; + } + + @Override + public String getTableName() { + return tableName; + } + + @Override + public long getFirstToken() { + return firstToken; + } + + @Override + public long getLastToken() { + return lastToken; + } + + @Override + public Set getReplicas() { + return replicas; + } + + @Override + public String toString() { + return "DefaultTablet{" + + "keyspaceName='" + + keyspaceName + + '\'' + + ", tableId=" + + tableId + + ", tableName='" + + tableName + + '\'' + + ", firstToken=" + + firstToken + + ", lastToken=" + + lastToken + + ", replicas=" + + replicas + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || !(o instanceof DefaultTablet)) return false; + DefaultTablet that = (DefaultTablet) o; + return firstToken == 
that.firstToken + && lastToken == that.lastToken + && keyspaceName.equals(that.keyspaceName) + && Objects.equals(tableId, that.tableId) + && tableName.equals(that.tableName) + && Objects.equals(replicas, that.replicas); + } + + @Override + public int hashCode() { + return Objects.hash(keyspaceName, tableId, tableName, firstToken, lastToken, replicas); + } + + @Override + public int compareTo(Tablet tablet) { + return Long.compare(this.lastToken, tablet.getLastToken()); + } + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java index 0f28d354c46..c472b9240c9 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java @@ -16,11 +16,16 @@ package com.datastax.oss.driver.internal.core.metadata; import com.datastax.oss.driver.api.core.AsyncAutoCloseable; +import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.data.TupleValue; import com.datastax.oss.driver.api.core.metadata.EndPoint; import com.datastax.oss.driver.api.core.metadata.Metadata; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.type.DataTypes; +import com.datastax.oss.driver.api.core.type.TupleType; +import com.datastax.oss.driver.api.core.type.codec.TypeCodec; import com.datastax.oss.driver.internal.core.config.ConfigChangeEvent; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.control.ControlConnection; @@ -29,6 +34,7 @@ import com.datastax.oss.driver.internal.core.metadata.schema.queries.SchemaQueriesFactory; import 
com.datastax.oss.driver.internal.core.metadata.schema.queries.SchemaRows; import com.datastax.oss.driver.internal.core.metadata.schema.refresh.SchemaRefresh; +import com.datastax.oss.driver.internal.core.protocol.TabletInfo; import com.datastax.oss.driver.internal.core.util.Loggers; import com.datastax.oss.driver.internal.core.util.NanoTime; import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; @@ -39,8 +45,10 @@ import edu.umd.cs.findbugs.annotations.NonNull; import io.netty.util.concurrent.EventExecutor; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -71,6 +79,7 @@ public class MetadataManager implements AsyncAutoCloseable { private volatile boolean tokenMapEnabled; private volatile Set contactPoints; private volatile boolean wasImplicitContactPoint; + private volatile TypeCodec<TupleValue> tabletPayloadCodec = null; public MetadataManager(InternalDriverContext context) { this(context, DefaultMetadata.EMPTY);
@@ -520,4 +529,50 @@ Void apply(MetadataRefresh refresh) {
     }
     return null;
   }
+
+  /**
+   * Returns the codec for the tablets-routing-v1 payload, looking it up in the codec registry on
+   * first use. The payload type is a tuple of two bigints and a list of (uuid, int) tuples.
+   */
+  private TypeCodec<TupleValue> getTabletPayloadCodec() {
+    // Read the volatile field once so the null check and the returned value cannot disagree.
+    TypeCodec<TupleValue> codec = tabletPayloadCodec;
+    if (codec == null) {
+      TupleType payloadOuterTuple =
+          DataTypes.tupleOf(
+              DataTypes.BIGINT,
+              DataTypes.BIGINT,
+              DataTypes.listOf(DataTypes.tupleOf(DataTypes.UUID, DataTypes.INT)));
+      codec = context.getCodecRegistry().codecFor(payloadOuterTuple);
+      tabletPayloadCodec = codec;
+    }
+    return codec;
+  }
+
+  /**
+   * Decodes the tablets-routing-v1 entry of a custom payload, if there is one, and applies it to
+   * the metadata as an {@link AddTabletRefresh}.
+   *
+   * @param keyspace the keyspace passed on to the refresh
+   * @param table the table passed on to the refresh
+   * @param incomingPayload the custom payload to inspect, may be null
+   * @return true if a tablet entry was found and applied, false otherwise
+   */
+  public boolean addTabletsFromPayload(
+      CqlIdentifier keyspace, CqlIdentifier table, Map<String, ByteBuffer> incomingPayload) {
+    if (incomingPayload == null) {
+      return false;
+    }
+    // One lookup instead of containsKey + get; a key mapped to null is treated as absent rather
+    // than being handed to the codec.
+    ByteBuffer rawTablet =
+        incomingPayload.get(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY);
+    if (rawTablet == null) {
+      return false;
+    }
+    TupleValue tabletTuple =
+        getTabletPayloadCodec().decode(rawTablet, context.getProtocolVersion());
+    apply(new AddTabletRefresh(keyspace, table, tabletTuple));
+    return true;
+  }
 }
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java b/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java index 48818554f3f..cb3f9c4e483 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java @@ -157,7 +157,7 @@ public boolean isInvalidKeyspace() { * to the caller to fail fast and move to the next node. */ public DriverChannel next() { - return next(null); + return next(null, null); } /** @@ -167,7 +167,7 @@ public DriverChannel next() { * to the caller to fail fast and move to the next node. *

There is no need to return the channel. */ - public DriverChannel next(@Nullable Token routingKey) { + public DriverChannel next(@Nullable Token routingKey, @Nullable Integer shardSuggestion) { if (!singleThreaded.initialized) { return null; }
@@ -175,10 +175,17 @@ public DriverChannel next(@Nullable Token routingKey) {
       return channels[0].next();
     }
 
-    int shardId =
-        routingKey != null
-            ? singleThreaded.shardingInfo.shardId(routingKey)
-            : ThreadLocalRandom.current().nextInt(channels.length);
+    int shardId;
+    // Honor the suggestion only when it is a valid index into channels; a negative or
+    // too-large value falls back to token-based or random shard selection.
+    if (shardSuggestion != null && shardSuggestion >= 0 && shardSuggestion < channels.length) {
+      shardId = shardSuggestion;
+    } else {
+      shardId =
+          routingKey != null
+              ? singleThreaded.shardingInfo.shardId(routingKey)
+              : ThreadLocalRandom.current().nextInt(channels.length);
+    }
 
     if (channels[shardId].size() > 0) {
       return channels[shardId].next();
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/TabletInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/TabletInfo.java
new file mode 100644
index 00000000000..8c33e803fd5
--- /dev/null
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/TabletInfo.java
@@ -0,0 +1,46 @@
+package com.datastax.oss.driver.internal.core.protocol;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Describes whether the {@code TABLETS_ROUTING_V1} option was offered, and adds that option to an
+ * options map.
+ */
+public class TabletInfo {
+  private static final String SCYLLA_TABLETS_STARTUP_OPTION_KEY = "TABLETS_ROUTING_V1";
+  private static final String SCYLLA_TABLETS_STARTUP_OPTION_VALUE = "";
+  public static final String TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY = "tablets-routing-v1";
+
+  private final boolean enabled;
+
+  private TabletInfo(boolean enabled) {
+    this.enabled = enabled;
+  }
+
+  // Currently pertains only to TABLETS_ROUTING_V1
+  public boolean isEnabled() {
+    return enabled;
+  }
+
+  /**
+   * Builds a {@link TabletInfo} that is enabled only when {@code supported} maps {@code
+   * TABLETS_ROUTING_V1} to exactly one value and that value is the empty string.
+   *
+   * @param supported option names mapped to their lists of values
+   * @return the parsed {@link TabletInfo}
+   */
+  public static TabletInfo parseTabletInfo(Map<String, List<String>> supported) {
+    List<String> values = supported.get(SCYLLA_TABLETS_STARTUP_OPTION_KEY);
+    return new TabletInfo(
+        values != null
+            && values.size() == 1
+            // Constant first: a null element then yields "disabled" instead of an NPE.
+            && SCYLLA_TABLETS_STARTUP_OPTION_VALUE.equals(values.get(0)));
+  }
+
+  /** Adds the {@code TABLETS_ROUTING_V1} option, with an empty value, to {@code options}. */
+  public static void addOption(Map<String, String> options) {
+    options.put(SCYLLA_TABLETS_STARTUP_OPTION_KEY, SCYLLA_TABLETS_STARTUP_OPTION_VALUE);
+  }
+}
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java b/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java index 647b75892ad..5a80d5fb5d4 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java @@ -239,18 +239,27 @@ public ResultT execute( @Nullable public DriverChannel getChannel(@NonNull Node node, @NonNull String logPrefix) { - return getChannel(node, logPrefix, null); + return getChannel(node, logPrefix, null, null); } @Nullable public DriverChannel getChannel( @NonNull Node node, @NonNull String logPrefix, @Nullable Token routingKey) { + return getChannel(node, logPrefix, routingKey, null); + } + + @Nullable + public DriverChannel getChannel( + @NonNull Node node, + @NonNull String logPrefix, + @Nullable Token routingKey, + @Nullable Integer shardSuggestion) { ChannelPool pool = poolManager.getPools().get(node); if (pool == null) { LOG.trace("[{}] No pool to {}, skipping", logPrefix, node); return null; } else { - DriverChannel channel = pool.next(routingKey); + DriverChannel channel = pool.next(routingKey, shardSuggestion); if (channel == null) { LOG.trace("[{}] Pool returned no channel for {}, skipping", logPrefix, node); return null; diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java index 65fe4a405f2..815fb69f6b3 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java +++ 
b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java @@ -50,6 +50,7 @@ import com.datastax.oss.driver.internal.core.context.NettyOptions; import com.datastax.oss.driver.internal.core.metadata.DefaultMetadata; import com.datastax.oss.driver.internal.core.metadata.LoadBalancingPolicyWrapper; +import com.datastax.oss.driver.internal.core.metadata.MetadataManager; import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater; import com.datastax.oss.driver.internal.core.pool.ChannelPool; import com.datastax.oss.driver.internal.core.servererrors.DefaultWriteTypeRegistry; @@ -98,6 +99,7 @@ public static Builder builder() { @Mock protected TimestampGenerator timestampGenerator; @Mock protected ProtocolVersionRegistry protocolVersionRegistry; @Mock protected SessionMetricUpdater sessionMetricUpdater; + @Mock protected MetadataManager metadataManager; protected RequestHandlerTestHarness(Builder builder) { MockitoAnnotations.initMocks(this); @@ -139,8 +141,17 @@ protected RequestHandlerTestHarness(Builder builder) { when(timestampGenerator.next()).thenReturn(Statement.NO_DEFAULT_TIMESTAMP); when(context.getTimestampGenerator()).thenReturn(timestampGenerator); + when(context.getMetadataManager()).thenReturn(metadataManager); + when(metadataManager.getMetadata()).thenReturn(DefaultMetadata.EMPTY); pools = builder.buildMockPools(); + // Call variation introduced with Tablets (shardSuggestion field) + when(session.getChannel(any(Node.class), anyString(), any(), any())) + .thenAnswer( + invocation -> { + Node node = invocation.getArgument(0); + return pools.get(node).next(); + }); when(session.getChannel(any(Node.class), anyString(), any())) .thenAnswer( invocation -> {
diff --git a/examples/src/main/java/com/datastax/oss/driver/examples/basic/TabletsExample.java b/examples/src/main/java/com/datastax/oss/driver/examples/basic/TabletsExample.java
new file mode 100644
index 00000000000..6aee428898e
--- /dev/null
+++ b/examples/src/main/java/com/datastax/oss/driver/examples/basic/TabletsExample.java
@@ -0,0 +1,147 @@
+package com.datastax.oss.driver.examples.basic;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.servererrors.SyntaxError;
+import com.datastax.oss.driver.internal.core.protocol.TabletInfo;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Map;
+
+/**
+ * Prerequisites: set up a Scylla cluster that supports tablets generally and tablets-routing-v1.
+ * Replace the contact point IP with the IP of your node.
+ *
+ * <p>This example will attempt to create a keyspace and table that uses tablets. It tests the
+ * tablets-routing-v1 by querying enough to randomly hit every possible tablet. This means that
+ * driver should receive as many custom payloads as there are tablets. Every subsequent call for
+ * already queried data should reuse obtained tablet info from the payload and not receive any
+ * further payloads.
+ */
+public class TabletsExample {
+
+  private static final int INITIAL_TABLETS = 32;
+  private static final int QUERIES = 800;
+  private static final int REPLICATION_FACTOR = 2;
+  private static final String KEYSPACE_NAME = "tabletsTest";
+  private static final String TABLE_NAME = "tabletsTable";
+  private static final String CREATE_KEYSPACE_QUERY_V2 =
+      "CREATE KEYSPACE IF NOT EXISTS "
+          + KEYSPACE_NAME
+          + " WITH replication = {'class': "
+          + "'NetworkTopologyStrategy', "
+          + "'replication_factor': '"
+          + REPLICATION_FACTOR
+          + "'} AND durable_writes = true AND tablets = "
+          + "{'initial': "
+          + INITIAL_TABLETS
+          + "};";
+  private static final String CREATE_KEYSPACE_QUERY_V1 =
+      "CREATE KEYSPACE IF NOT EXISTS "
+          + KEYSPACE_NAME
+          + " WITH replication = {'class': "
+          + "'NetworkTopologyStrategy', "
+          + "'replication_factor': '"
+          + REPLICATION_FACTOR
+          + "', 'initial_tablets': '"
+          + INITIAL_TABLETS
+          + "'} AND durable_writes = true;";
+  private static final String CREATE_TABLE_QUERY =
+      "CREATE TABLE IF NOT EXISTS "
+          + KEYSPACE_NAME
+          + "."
+          + TABLE_NAME
+          + " (pk int, ck int, PRIMARY KEY(pk, ck));";
+
+  /**
+   * Creates the schema, inserts {@code QUERIES} rows and reads them back twice, checking the
+   * tablets-routing-v1 payloads attached to the responses.
+   */
+  public static void main(String[] args) throws InterruptedException {
+    ProgrammaticDriverConfigLoaderBuilder loaderBuilder = DriverConfigLoader.programmaticBuilder();
+    // try-with-resources closes the session even when a query or a check below throws.
+    try (CqlSession session =
+        CqlSession.builder()
+            .addContactPoint(new InetSocketAddress("172.18.0.2", 9042))
+            .withConfigLoader(
+                loaderBuilder
+                    .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(30))
+                    .build())
+            .build()) {
+      try {
+        session.execute(CREATE_KEYSPACE_QUERY_V2);
+      } catch (SyntaxError ex) {
+        // Fall back to the other keyspace syntax when the 'tablets' property is not recognized.
+        if (ex.getMessage().contains("Unknown property 'tablets'")) {
+          session.execute(CREATE_KEYSPACE_QUERY_V1);
+        } else {
+          throw ex;
+        }
+      }
+
+      session.execute(CREATE_TABLE_QUERY);
+
+      for (int i = 1; i <= QUERIES; i++) {
+        session.execute(
+            "INSERT INTO "
+                + KEYSPACE_NAME
+                + "."
+                + TABLE_NAME
+                + " (pk,ck) VALUES ("
+                + i
+                + ","
+                + i
+                + ");");
+      }
+
+      PreparedStatement preparedStatement =
+          session.prepare(
+              SimpleStatement.builder(
+                      "select pk,ck from "
+                          + KEYSPACE_NAME
+                          + "."
+                          + TABLE_NAME
+                          + " WHERE pk = ? AND ck = ?")
+                  .setTracing(true)
+                  .build());
+      int counter = 0;
+      for (int i = 1; i <= QUERIES; i++) {
+        ResultSet rs = session.execute(preparedStatement.bind(i, i).setTracing(true));
+        if (hasTabletPayload(rs)) { // We hit wrong tablet
+          counter++;
+        }
+      }
+
+      System.out.println("Ran first set of queries");
+
+      // With enough queries we should hit a wrong node for each tablet once.
+      assert counter == INITIAL_TABLETS;
+
+      // All tablet information should be available by now (unless for some reason cluster did sth
+      // on its own). We should not receive any tablet payloads now, since they are sent only on
+      // mismatch.
+      for (int i = 1; i <= QUERIES; i++) {
+        ResultSet rs = session.execute(preparedStatement.bind(i, i).setTracing(true));
+        if (hasTabletPayload(rs)) {
+          throw new RuntimeException(
+              "Received non empty payload with tablets routing information: "
+                  + rs.getExecutionInfo().getIncomingPayload());
+        }
+      }
+      System.out.println("Finished.");
+    }
+  }
+
+  /** Returns whether the incoming payload of {@code rs} contains the tablets-routing-v1 key. */
+  private static boolean hasTabletPayload(ResultSet rs) {
+    Map<String, ByteBuffer> payload = rs.getExecutionInfo().getIncomingPayload();
+    return payload != null
+        && payload.containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY);
+  }
+}