From 9956aeeabc64b88286696edd84a9948238c7dacd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Fri, 14 Jul 2023 17:48:25 +0200 Subject: [PATCH] Support for tablets routing information Adds TabletInfo class to hold tablet support info on a per Host basis Adds TabletMap to hold currently known mappings Adds tablets-routing-v1 protocol extension negotiation Adds logic for parsing tablets routing information and using it under the hood. --- .../com/datastax/driver/core/Connection.java | 6 + .../driver/core/DefaultResultSetFuture.java | 23 ++ .../java/com/datastax/driver/core/Host.java | 11 + .../driver/core/HostConnectionPool.java | 13 +- .../datastax/driver/core/LazyTabletMap.java | 302 ++++++++++++++++++ .../com/datastax/driver/core/Metadata.java | 105 +++++- .../datastax/driver/core/RequestHandler.java | 30 +- .../datastax/driver/core/SessionManager.java | 9 +- .../com/datastax/driver/core/TabletInfo.java | 30 ++ .../core/policies/TokenAwarePolicy.java | 16 +- .../driver/core/HostConnectionPoolTest.java | 3 +- .../core/policies/TokenAwarePolicyTest.java | 2 +- 12 files changed, 538 insertions(+), 12 deletions(-) create mode 100644 driver-core/src/main/java/com/datastax/driver/core/LazyTabletMap.java create mode 100644 driver-core/src/main/java/com/datastax/driver/core/TabletInfo.java diff --git a/driver-core/src/main/java/com/datastax/driver/core/Connection.java b/driver-core/src/main/java/com/datastax/driver/core/Connection.java index 5fee90f504a..04f60ac2ca2 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Connection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Connection.java @@ -474,6 +474,8 @@ public ListenableFuture apply(Message.Response response) throws Exception if (lwt != null) { getHost().setLwtInfo(lwt); } + TabletInfo tabletInfo = TabletInfo.parseTabletInfo(msg.supported); + getHost().setTabletInfo(tabletInfo); return MoreFutures.VOID_SUCCESS; case ERROR: Responses.Error error = (Responses.Error) response; @@ -507,6 +509,10 @@ public ListenableFuture apply(Void input) throws Exception { if (lwtInfo != null) { lwtInfo.addOption(extraOptions); } + TabletInfo tabletInfo = getHost().getTabletInfo(); + if (tabletInfo != null && tabletInfo.isEnabled() && ProtocolFeature.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion)) { + tabletInfo.addOption(extraOptions); + } Future startupResponseFuture = write( new Requests.Startup( diff --git a/driver-core/src/main/java/com/datastax/driver/core/DefaultResultSetFuture.java b/driver-core/src/main/java/com/datastax/driver/core/DefaultResultSetFuture.java index 13ac90cecac..8d1e7150c53 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/DefaultResultSetFuture.java +++ b/driver-core/src/main/java/com/datastax/driver/core/DefaultResultSetFuture.java @@ -25,6 +25,8 @@ import com.datastax.driver.core.exceptions.QueryValidationException; import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.Uninterruptibles; + +import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -70,6 +72,27 @@ public void onSet( switch (response.type) { case RESULT: Responses.Result rm = (Responses.Result) response; + + if (rm.getCustomPayload() != null && rm.getCustomPayload().containsKey("tablets-routing-v1") && (statement instanceof BoundStatement)) { + BoundStatement st = (BoundStatement) statement; + TupleType tupleType = + TupleType.of(protocolVersion + , session.configuration().getCodecRegistry() + , DataType.bigint() + , DataType.bigint() + , DataType.list(TupleType.of(protocolVersion + , session.configuration().getCodecRegistry() + , DataType.uuid() + , DataType.cint() + ) + ) + ); + TypeCodec typeCodec = session.configuration().getCodecRegistry().codecFor(tupleType); + TupleValue tupleValue = typeCodec.deserialize(rm.getCustomPayload().get(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY), protocolVersion); + String keyspace = statement.getKeyspace(); + String table = st.preparedStatement().getPreparedId().boundValuesMetadata.variables.getTable(0); + session.getCluster().getMetadata().getTabletsMap().processCustomPayloadV1(keyspace, table, tupleValue); + } switch (rm.kind) { case SET_KEYSPACE: // propagate the keyspace change to other connections diff --git a/driver-core/src/main/java/com/datastax/driver/core/Host.java b/driver-core/src/main/java/com/datastax/driver/core/Host.java index d18b41d1e20..603a5dc1010 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Host.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Host.java @@ -70,6 +70,9 @@ public class Host { // Can be set concurrently but the value should always be the same. private volatile LwtInfo lwtInfo = null; + // Whether host supports TABLETS_ROUTING_V1 + private volatile TabletInfo tabletInfo = null; + enum State { ADDED, DOWN, @@ -450,6 +453,14 @@ public void setLwtInfo(LwtInfo lwtInfo) { this.lwtInfo = lwtInfo; } + public TabletInfo getTabletInfo() { + return tabletInfo; + } + + public void setTabletInfo(TabletInfo tabletInfo) { + this.tabletInfo = tabletInfo; + } + /** * Returns whether the host is considered up by the driver. * diff --git a/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java b/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java index d967bc937d2..e6690e16411 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java +++ b/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java @@ -504,7 +504,9 @@ ListenableFuture borrowConnection( TimeUnit unit, int maxQueueSize, Token.Factory partitioner, - ByteBuffer routingKey) { + ByteBuffer routingKey, + String keyspace, + String table) { Phase phase = this.phase.get(); if (phase != Phase.READY) return Futures.immediateFailedFuture( @@ -515,7 +517,14 @@ ListenableFuture borrowConnection( if (routingKey != null) { Metadata metadata = manager.cluster.getMetadata(); Token t = metadata.newToken(partitioner, routingKey); - shardId = host.getShardingInfo().shardId(t); + shardId = -1; + if (keyspace != null && table != null) { + assert t instanceof Token.TokenLong64; + shardId = metadata.getShardForTabletToken(keyspace, table, (Token.TokenLong64) t, host); + } + if (shardId == -1) { // means that tablet lookup failed + shardId = host.getShardingInfo().shardId(t); + } } else { shardId = RAND.nextInt(host.getShardingInfo().getShardsCount()); } diff --git a/driver-core/src/main/java/com/datastax/driver/core/LazyTabletMap.java b/driver-core/src/main/java/com/datastax/driver/core/LazyTabletMap.java new file mode 100644 index 00000000000..3eb10711c84 --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/LazyTabletMap.java @@ -0,0 +1,302 @@ +package com.datastax.driver.core; + +import com.google.common.annotations.Beta; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Holds currently known tablet mappings. Updated lazily through received custom payloads + * described in Scylla's CQL protocol extensions (tablets-routing-v1). + */ +@Beta +public class LazyTabletMap { + private static final Logger logger = LoggerFactory.getLogger(LazyTabletMap.class); + + private final Map> mapping; + + public LazyTabletMap(Map> mapping) { + this.mapping = mapping; + } + + public static LazyTabletMap emptyMap() { + return new LazyTabletMap(new HashMap<>()); + } + + /** + * Returns the mapping of tables to their tablets. + * @return the Map keyed by (keyspace,table) pairs with Set of tablets as value type. + */ + public Map> getMapping() { + return mapping; + } + + /** + * Finds hosts that have replicas for a given table and token combination + * @param keyspace the keyspace that table is in + * @param table the table name + * @param token the token to look for + * @return Set of host UUIDS that do have a tablet for given token for a given table. + */ + public Set getReplicas(String keyspace, String table, long token) { + LazyTabletMap.KeyspaceTableNamePair key = new LazyTabletMap.KeyspaceTableNamePair(keyspace, table); + + if (mapping == null) { + logger.trace("This tablets map is null. Returning empty set."); + return Collections.emptySet(); + } + + NavigableSet set = mapping.get(key); + if (set == null) { + logger.trace( + "There is no tablets for {}.{} in this mapping. Returning empty set.", keyspace, table); + return Collections.emptySet(); + } + LazyTablet row = mapping.get(key).ceiling(LazyTablet.malformedTablet(token)); + if (row == null || row.firstToken >= token) { + logger.trace( + "Could not find tablet for {}.{} that owns token {}. Returning empty set.", + keyspace, + table, + token); + return Collections.emptySet(); + } + + HashSet uuidSet = new HashSet<>(); + for (HostShardPair hostShardPair : row.replicas) { + uuidSet.add(hostShardPair.getHost()); + } + return uuidSet; + } + + /** + * Processes already parsed tablets-routing-v1 custom payload. Expects the tuple + * structure to correspond to {@code TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)))}. + * Handles removing outdated tables that intersect with the one about to be added. + * @param keyspace the keyspace of the table + * @param table the table name + * @param tupleValue the parsed tuple holding the tablet information + */ + void processCustomPayloadV1(String keyspace, String table, TupleValue tupleValue) { + KeyspaceTableNamePair ktPair = new KeyspaceTableNamePair(keyspace, table); + + long firstToken = tupleValue.getLong(0); + long lastToken = tupleValue.getLong(1); + + Set replicas = new HashSet<>(); + List list = tupleValue.getList(2, TupleValue.class); + for (TupleValue tuple : list) { + HostShardPair hostShardPair = new HostShardPair(tuple.getUUID(0), tuple.getInt(1)); + replicas.add(hostShardPair); + } + + NavigableSet existingTablets = mapping.computeIfAbsent(ktPair, k -> new TreeSet<>()); + + // Single tablet token range is represented by (firstToken, lastToken] interval + // We need to do two sweeps: remove overlapping tablets by looking at lastToken of existing tablets + // and then by looking at firstToken of existing tablets. Currently, the tablets are sorted according + // to their lastTokens. + + // First sweep: remove all tablets whose lastToken is inside this interval + Iterator it = existingTablets.headSet(LazyTablet.malformedTablet(lastToken), true).descendingIterator(); + while (it.hasNext()) { + LazyTablet tablet = it.next(); + if (tablet.lastToken <= firstToken ) { + break; + } + it.remove(); + } + + //Second sweep: remove all tablets whose firstToken is inside this tuple's (firstToken, lastToken] + //After the first sweep, this theoretically should remove at most 1 tablet + it = existingTablets.tailSet(LazyTablet.malformedTablet(lastToken), true).iterator(); + while (it.hasNext()) { + LazyTablet tablet = it.next(); + if (tablet.firstToken >= lastToken) { + break; + } + it.remove(); + } + + // Add new (now) non-overlapping tablet + existingTablets.add(new LazyTablet(keyspace, null, table, firstToken, lastToken, replicas)); + } + + /** + * Simple class to hold UUID of a host and a shard number. + * Class itself makes no checks or guarantees about existence of a shard on corresponding host. + */ + public static class HostShardPair { + private final UUID host; + private final int shard; + + public HostShardPair(UUID host, int shard) { + this.host = host; + this.shard = shard; + } + + public UUID getHost() { + return host; + } + + public int getShard() { + return shard; + } + + @Override + public String toString() { + return "HostShardPair{" + "host=" + host + ", shard=" + shard + '}'; + } + } + + /** + * Simple keyspace name and table name pair. + */ + public static class KeyspaceTableNamePair { + private final String keyspace; + private final String tableName; + + public KeyspaceTableNamePair(String keyspace, String tableName) { + this.keyspace = keyspace; + this.tableName = tableName; + } + + public String getKeyspace() { + return keyspace; + } + + public String getTableName() { + return tableName; + } + + @Override + public String toString() { + return "KeyspaceTableNamePair{" + + "keyspace='" + + keyspace + + '\'' + + ", tableName='" + + tableName + + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + KeyspaceTableNamePair that = (KeyspaceTableNamePair) o; + return keyspace.equals(that.keyspace) && tableName.equals(that.tableName); + } + + @Override + public int hashCode() { + return Objects.hash(keyspace, tableName); + } + } + + /** + * Represents a single tablet created from tablets-routing-v1 custom payload. Its {@code + * compareTo} implementation intentionally relies solely on {@code lastToken} in order to allow + * for quick lookup on sorted Collections based just on the token value. + */ + public static class LazyTablet implements Comparable { + private final String keyspaceName; + private final UUID tableId; // currently null almost everywhere + private final String tableName; + private final long firstToken; + private final long lastToken; + private final Set replicas; + + private LazyTablet( + String keyspaceName, + UUID tableId, + String tableName, + long firstToken, + long lastToken, + Set replicas) { + this.keyspaceName = keyspaceName; + this.tableId = tableId; + this.tableName = tableName; + this.firstToken = firstToken; + this.lastToken = lastToken; + this.replicas = replicas; + } + + /** + * Creates a {@link LazyTablet} instance with given {@code lastToken}, identical {@code firstToken} + * and unspecified other fields. Used for lookup of sorted collections of proper {@link LazyTablet}. + * + * @param lastToken + * @return New {@link LazyTablet} object + */ + public static LazyTablet malformedTablet(long lastToken) { + return new LazyTablet(null, null, null, lastToken, lastToken, null); + } + + public String getKeyspaceName() { + return keyspaceName; + } + + public UUID getTableId() { + return tableId; + } + + public String getTableName() { + return tableName; + } + + public long getFirstToken() { + return firstToken; + } + + public long getLastToken() { + return lastToken; + } + + public Set getReplicas() { + return replicas; + } + + @Override + public String toString() { + return "LazyTablet{" + + "keyspaceName='" + keyspaceName + '\'' + + ", tableId=" + tableId + + ", tableName='" + tableName + '\'' + + ", firstToken=" + firstToken + + ", lastToken=" + lastToken + + ", replicas=" + replicas + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + LazyTablet that = (LazyTablet) o; + return firstToken == that.firstToken && lastToken == that.lastToken && keyspaceName.equals(that.keyspaceName) && Objects.equals(tableId, that.tableId) && tableName.equals(that.tableName) && Objects.equals(replicas, that.replicas); + } + + @Override + public int hashCode() { + return Objects.hash(keyspaceName, tableId, tableName, firstToken, lastToken, replicas); + } + + @Override + public int compareTo(LazyTablet lazyTablet) { + return Long.compare(this.lastToken, lazyTablet.lastToken); + } + } +} diff --git a/driver-core/src/main/java/com/datastax/driver/core/Metadata.java b/driver-core/src/main/java/com/datastax/driver/core/Metadata.java index f24d5b588ae..f0898368376 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Metadata.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Metadata.java @@ -21,6 +21,7 @@ */ package com.datastax.driver.core; +import com.google.common.annotations.Beta; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; @@ -35,6 +36,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Set; import java.util.TreeSet; import java.util.UUID; @@ -42,6 +44,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,8 +63,8 @@ public class Metadata { final ConcurrentMap keyspaces = new ConcurrentHashMap(); private volatile TokenMap tokenMap; - final ReentrantLock lock = new ReentrantLock(); + private final LazyTabletMap lazyTabletMap = LazyTabletMap.emptyMap(); // See https://github.com/apache/cassandra/blob/trunk/doc/cql3/CQL.textile#appendixA private static final IntObjectHashMap> RESERVED_KEYWORDS = @@ -176,6 +179,7 @@ void rebuildTokenMap(Token.Factory factory, Map> allTokens) { } } + Host newHost(EndPoint endPoint) { return new Host(endPoint, cluster.convictionPolicyFactory, cluster); } @@ -514,23 +518,45 @@ public Set getTokenRanges(String keyspace, Host host) { } /** - * Returns the set of hosts that are replica for a given partition key. Partitioner can be {@code - * null} and then a cluster-wide partitioner will be invoked. + * Extension of legacy method {@link Metadata#getReplicas(String, Token.Factory, ByteBuffer)}. + * Tablets model requires knowledge of the table name to determine the replicas. This method will + * first try to lookup replicas through known tablets metadata. It will default to TokenMap lookup + * if either {@code null} was passed as table name or the tablet lookup is unsuccessful for any other reason. + * + *

Returns the set of hosts that are replica for a given partition key. Partitioner can be + * {@code null} and then a cluster-wide partitioner will be invoked. * *

Note that this information is refreshed asynchronously by the control connection, when * schema or ring topology changes. It might occasionally be stale (or even empty). * * @param keyspace the name of the keyspace to get replicas for. + * @param table the name of the table to get replicas for. Necessary for distinction for tablets. + * Unnecessary for regular TokenMap * @param partitioner the partitioner to use or @{code null} for cluster-wide partitioner. * @param partitionKey the partition key for which to find the set of replica. * @return the (immutable) set of replicas for {@code partitionKey} as known by the driver. Note * that the result might be stale or empty if metadata was explicitly disabled with {@link * QueryOptions#setMetadataEnabled(boolean)}. */ + @Beta public Set getReplicas( - String keyspace, Token.Factory partitioner, ByteBuffer partitionKey) { + String keyspace, String table, Token.Factory partitioner, ByteBuffer partitionKey) { keyspace = handleId(keyspace); TokenMap current = tokenMap; + + if (keyspace != null + && table != null) { + if (partitioner == null) { + partitioner = current.factory; + } + Token token = partitioner.hash(partitionKey); + assert (token instanceof Token.TokenLong64); + Set hostUuids = lazyTabletMap.getReplicas(keyspace, table, (long) token.getValue()); + if (!hostUuids.isEmpty()) { + return hostUuids.stream().map(this::getHost).collect(Collectors.toSet()); + } + } + if (current == null) { return Collections.emptySet(); } else { @@ -542,6 +568,25 @@ public Set getReplicas( } } + /** + * Returns the set of hosts that are replica for a given partition key. Partitioner can be {@code + * null} and then a cluster-wide partitioner will be invoked. + * + *

Note that this information is refreshed asynchronously by the control connection, when + * schema or ring topology changes. It might occasionally be stale (or even empty). + * + * @param keyspace the name of the keyspace to get replicas for. + * @param partitioner the partitioner to use or @{code null} for cluster-wide partitioner. + * @param partitionKey the partition key for which to find the set of replica. + * @return the (immutable) set of replicas for {@code partitionKey} as known by the driver. Note + * that the result might be stale or empty if metadata was explicitly disabled with {@link + * QueryOptions#setMetadataEnabled(boolean)}. + */ + public Set getReplicas( + String keyspace, Token.Factory partitioner, ByteBuffer partitionKey) { + return getReplicas(keyspace, null, partitioner, partitionKey); + } + /** * Returns the set of hosts that are replica for a given token range. * @@ -860,6 +905,58 @@ void triggerOnMaterializedViewRemoved(MaterializedViewMetadata view) { } } + @Beta + public int getShardForTabletToken( + String keyspace, String table, Token.TokenLong64 token, Host host) { + if (lazyTabletMap == null) { + logger.trace( + "Could not determine shard for token {} on host {} because tablets metadata is currently null. " + + "Returning -1.", + token, + host); + return -1; + } + UUID targetHostUuid = host.getHostId(); + long tokenValue = (long) token.getValue(); + LazyTabletMap.KeyspaceTableNamePair key = new LazyTabletMap.KeyspaceTableNamePair(keyspace, table); + NavigableSet targetTablets = lazyTabletMap.getMapping().get(key); + if (targetTablets == null) { + logger.trace( + "Could not determine shard for token {} on host {} because table {}.{} is not present in tablets " + + "metadata. Returning -1.", + token, + host, + keyspace, + table); + return -1; + } + LazyTabletMap.LazyTablet row = targetTablets.ceiling(LazyTabletMap.LazyTablet.malformedTablet(tokenValue)); + if (row != null && row.getFirstToken() < tokenValue) { + for (LazyTabletMap.HostShardPair hostShardPair : row.getReplicas()) { + if (hostShardPair.getHost().equals(targetHostUuid)) { + return hostShardPair.getShard(); + } + } + } + logger.trace( + "Could not find tablet corresponding to token {} on host {} for table {} in keyspace {}. Returning -1.", + token, + host, + table, + keyspace); + return -1; + } + + /** + * Getter for last parsed {@link LazyTabletMap}. + * + * @return last parsed {@link LazyTabletMap} + */ + @Beta + public LazyTabletMap getTabletsMap() { + return lazyTabletMap; + } + private static class TokenMap { private final Token.Factory factory; diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java index a8a05f093f5..220507ec7c8 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java +++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java @@ -110,11 +110,22 @@ private Iterator getReplicas( } Token.Factory partitioner = statement.getPartitioner(); + String tableName = null; + ColumnDefinitions defs = null; + if (statement instanceof BoundStatement) { + defs = ((BoundStatement) statement).preparedStatement().getVariables(); + } else if (statement instanceof PreparedStatement) { + defs = ((PreparedStatement) statement).getVariables(); + } + if (defs != null && defs.size() != 0) { + tableName = defs.getTable(0); + } + final Set replicas = manager .cluster .getMetadata() - .getReplicas(Metadata.quote(keyspace), partitioner, partitionKey); + .getReplicas(Metadata.quote(keyspace), tableName, partitioner, partitionKey); // replicas are stored in the right order starting with the primary replica return replicas.iterator(); @@ -437,13 +448,28 @@ private boolean query(final Host host) { ByteBuffer routingKey = statement.getRoutingKey(protocolVersion, codecRegistry); PoolingOptions poolingOptions = manager.configuration().getPoolingOptions(); + String statementKeyspace = statement.getKeyspace(); + String statementTable = null; + ColumnDefinitions defs = null; + if (statement instanceof PreparedStatement) { + defs = ((PreparedStatement) statement).getVariables(); + } + if (statement instanceof BoundStatement) { + defs = ((BoundStatement) statement).statement.getVariables(); + } + if (defs != null && defs.size() > 0) { + statementTable = defs.getTable(0); + } + ListenableFuture connectionFuture = pool.borrowConnection( poolingOptions.getPoolTimeoutMillis(), TimeUnit.MILLISECONDS, poolingOptions.getMaxQueueSize(), statement.getPartitioner(), - routingKey); + routingKey, + statementKeyspace, + statementTable); GuavaCompatibility.INSTANCE.addCallback( connectionFuture, new FutureCallback() { diff --git a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java index ada9cc1699c..4ec2fcade54 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java +++ b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java @@ -739,7 +739,14 @@ private ListenableFuture prepare( ListenableFuture connectionFuture = entry .getValue() - .borrowConnection(0, TimeUnit.MILLISECONDS, 0, null, statement.getRoutingKey()); + .borrowConnection( + 0, + TimeUnit.MILLISECONDS, + 0, + null, + statement.getRoutingKey(), + statement.getQueryKeyspace(), + statement.getVariables().getTable(0)); ListenableFuture prepareFuture = GuavaCompatibility.INSTANCE.transformAsync( connectionFuture, diff --git a/driver-core/src/main/java/com/datastax/driver/core/TabletInfo.java b/driver-core/src/main/java/com/datastax/driver/core/TabletInfo.java new file mode 100644 index 00000000000..c7daf71d25c --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/TabletInfo.java @@ -0,0 +1,30 @@ +package com.datastax.driver.core; + +import java.util.List; +import java.util.Map; + +public class TabletInfo { + private static final String SCYLLA_TABLETS_STARTUP_OPTION_KEY = "TABLETS_ROUTING_V1"; + private static final String SCYLLA_TABLETS_STARTUP_OPTION_VALUE = ""; + public static final String TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY = "tablets-routing-v1"; + + private boolean enabled = false; + + private TabletInfo(boolean enabled){ + this.enabled = enabled; + } + + // Currently pertains only to TABLETS_ROUTING_V1 + public boolean isEnabled() { + return enabled; + } + + public static TabletInfo parseTabletInfo(Map> supported) { + List values = supported.get(SCYLLA_TABLETS_STARTUP_OPTION_KEY); + return new TabletInfo(values != null && values.size() == 1 && values.get(0).equals(SCYLLA_TABLETS_STARTUP_OPTION_VALUE)); + } + + public static void addOption(Map options) { + options.put(SCYLLA_TABLETS_STARTUP_OPTION_KEY, SCYLLA_TABLETS_STARTUP_OPTION_VALUE); + } +} diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java index 047af665b48..6f4c8d6c545 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java @@ -21,11 +21,14 @@ */ package com.datastax.driver.core.policies; +import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.CodecRegistry; +import com.datastax.driver.core.ColumnDefinitions; import com.datastax.driver.core.Host; import com.datastax.driver.core.HostDistance; import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ProtocolVersion; import com.datastax.driver.core.Statement; import com.google.common.collect.AbstractIterator; @@ -185,9 +188,20 @@ public Iterator newQueryPlan(final String loggedKeyspace, final Statement if (partitionKey == null || keyspace == null) return childPolicy.newQueryPlan(keyspace, statement); + String tableName = null; + ColumnDefinitions defs = null; + if (statement instanceof BoundStatement) { + defs = ((BoundStatement) statement).preparedStatement().getVariables(); + } else if (statement instanceof PreparedStatement) { + defs = ((PreparedStatement) statement).getVariables(); + } + if (defs != null && defs.size() > 0) { + tableName = defs.getTable(0); + } + final Set replicas = clusterMetadata.getReplicas( - Metadata.quote(keyspace), statement.getPartitioner(), partitionKey); + Metadata.quote(keyspace), tableName, statement.getPartitioner(), partitionKey); if (replicas.isEmpty()) return childPolicy.newQueryPlan(loggedKeyspace, statement); if (replicaOrdering == ReplicaOrdering.NEUTRAL) { diff --git a/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java b/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java index f44e64dca0c..34d382dcb8f 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java @@ -1595,7 +1595,8 @@ private MockRequest(HostConnectionPool pool, int timeoutMillis, int maxQueueSize ByteBuffer routingKey = ByteBuffer.allocate(4); routingKey.putInt(0, 0); this.connectionFuture = - pool.borrowConnection(timeoutMillis, MILLISECONDS, maxQueueSize, null, routingKey); + pool.borrowConnection( + timeoutMillis, MILLISECONDS, maxQueueSize, null, routingKey, null, null); requestInitialized = GuavaCompatibility.INSTANCE.transform( this.connectionFuture, diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java b/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java index efddbc4a5a6..929aacc4029 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java @@ -89,7 +89,7 @@ public void initMocks() { when(configuration.getProtocolOptions()).thenReturn(protocolOptions); when(protocolOptions.getProtocolVersion()).thenReturn(ProtocolVersion.DEFAULT); when(cluster.getMetadata()).thenReturn(metadata); - when(metadata.getReplicas(Metadata.quote("keyspace"), null, routingKey)) + when(metadata.getReplicas(Metadata.quote("keyspace"), null, null, routingKey)) .thenReturn(Sets.newLinkedHashSet(host1, host2)); when(childPolicy.newQueryPlan("keyspace", statement)) .thenReturn(Sets.newLinkedHashSet(host4, host3, host2, host1).iterator());