From f0997d3be5482c2d48597dde16bb28192e3b251d Mon Sep 17 00:00:00 2001 From: abicky Date: Wed, 10 Oct 2018 23:45:10 +0900 Subject: [PATCH] Introduce cassandra.skip-parition-check NativeCassandraSession#getPartitions is very slow if there are many partitions used in WHERE clause. In our use, the partitions usually exist, so introduce cassandra.skip-parition-check property to reduce planning time. --- .../cassandra/CassandraClientConfig.java | 13 ++ .../cassandra/CassandraClientModule.java | 3 +- .../presto/cassandra/CassandraType.java | 16 ++ .../cassandra/NativeCassandraSession.java | 94 +++++++++- .../presto/cassandra/EmbeddedCassandra.java | 11 +- .../cassandra/TestCassandraClientConfig.java | 7 +- .../cassandra/TestNativeCassandraSession.java | 164 ++++++++++++++++++ 7 files changed, 303 insertions(+), 5 deletions(-) create mode 100644 presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestNativeCassandraSession.java diff --git a/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientConfig.java b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientConfig.java index 59b095bf212ab..a45b5108d3229 100644 --- a/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientConfig.java +++ b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientConfig.java @@ -76,6 +76,7 @@ public class CassandraClientConfig private String truststorePassword; private File keystorePath; private String keystorePassword; + private boolean skipPartitionCheck; @NotNull @Size(min = 1) @@ -469,6 +470,11 @@ public Optional getTruststorePassword() return Optional.ofNullable(truststorePassword); } + public boolean isSkipPartitionCheck() + { + return skipPartitionCheck; + } + @Config("cassandra.tls.truststore-password") @ConfigSecuritySensitive public CassandraClientConfig setTruststorePassword(String truststorePassword) @@ -476,4 +482,11 @@ public CassandraClientConfig setTruststorePassword(String truststorePassword) this.truststorePassword = truststorePassword; return this; } + + @Config("cassandra.skip-partition-check") + public CassandraClientConfig setSkipPartitionCheck(boolean skipPartitionCheck) + { + this.skipPartitionCheck = skipPartitionCheck; + return this; + } } diff --git a/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientModule.java b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientModule.java index 364779296f020..ccd18cc2471a2 100644 --- a/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientModule.java +++ b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientModule.java @@ -156,6 +156,7 @@ public static CassandraSession createCassandraSession( contactPoints.forEach(clusterBuilder::addContactPoint); return clusterBuilder.build(); }), - config.getNoHostAvailableRetryTimeout()); + config.getNoHostAvailableRetryTimeout(), + config.isSkipPartitionCheck()); } } diff --git a/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraType.java b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraType.java index 6b68da673e8ab..548a2b1a8a4e1 100644 --- a/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraType.java +++ b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraType.java @@ -355,6 +355,22 @@ public static String getColumnValueForCql(Row row, int position, CassandraType c } } + public static String getColumnValueForCql(Object object, CassandraType cassandraType) + { + switch (cassandraType) { + case ASCII: + case TEXT: + case VARCHAR: + return CassandraCqlUtils.quoteStringLiteral(((Slice) object).toStringUtf8()); + case INT: + case BIGINT: + return object.toString(); + default: + throw new IllegalStateException("Handling of type " + cassandraType + + " is not implemented"); + } + } + private static String objectToString(Object object, CassandraType elemType) { switch (elemType) { diff --git a/presto-cassandra/src/main/java/com/facebook/presto/cassandra/NativeCassandraSession.java b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/NativeCassandraSession.java index 06ec9e47ef564..08cacc683a3f8 100644 --- a/presto-cassandra/src/main/java/com/facebook/presto/cassandra/NativeCassandraSession.java +++ b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/NativeCassandraSession.java @@ -56,6 +56,9 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.util.concurrent.UncheckedExecutionException; +import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.log.Logger; +import io.airlift.slice.Slice; import io.airlift.units.Duration; import java.nio.Buffer; @@ -82,6 +85,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.transform; +import static java.lang.Math.toIntExact; import static java.lang.String.format; import static java.util.Comparator.comparing; import static java.util.Locale.ENGLISH; @@ -116,13 +120,15 @@ public KeyspaceMetadata load(String key) private final Cluster cluster; private final Supplier session; private final Duration noHostAvailableRetryTimeout; + private final boolean skipPartitionCheck; - public NativeCassandraSession(String connectorId, JsonCodec> extraColumnMetadataCodec, Cluster cluster, Duration noHostAvailableRetryTimeout) + public NativeCassandraSession(String connectorId, JsonCodec> extraColumnMetadataCodec, Cluster cluster, Duration noHostAvailableRetryTimeout, boolean skipPartitionCheck) { this.connectorId = requireNonNull(connectorId, "connectorId is null"); this.extraColumnMetadataCodec = requireNonNull(extraColumnMetadataCodec, "extraColumnMetadataCodec is null"); this.cluster = requireNonNull(cluster, "cluster is null"); this.noHostAvailableRetryTimeout = requireNonNull(noHostAvailableRetryTimeout, "noHostAvailableRetryTimeout is null"); + this.skipPartitionCheck = skipPartitionCheck; this.session = memoize(cluster::connect); } @@ -391,6 +397,10 @@ private CassandraColumnHandle buildColumnHandle(AbstractTableMetadata tableMetad @Override public List getPartitions(CassandraTable table, List> filterPrefixes) { + if (skipPartitionCheck) { + return buildPartitionsFromFilterPrefixes(table, filterPrefixes); + } + List partitionKeyColumns = table.getPartitionKeyColumns(); if (filterPrefixes.size() != partitionKeyColumns.size()) { @@ -458,6 +468,88 @@ public List getPartitions(CassandraTable table, List buildPartitionsFromFilterPrefixes(CassandraTable table, List> filterPrefixes) + { + List partitionKeyColumns = table.getPartitionKeyColumns(); + + if (filterPrefixes.size() != partitionKeyColumns.size()) { + return ImmutableList.of(CassandraPartition.UNPARTITIONED); + } + + ByteBuffer buffer = ByteBuffer.allocate(1000); + HashMap map = new HashMap<>(); + Set uniquePartitionIds = new HashSet<>(); + StringBuilder stringBuilder = new StringBuilder(); + + boolean isComposite = partitionKeyColumns.size() > 1; + + ImmutableList.Builder partitions = ImmutableList.builder(); + for (List values : Sets.cartesianProduct(filterPrefixes)) { + buffer.clear(); + map.clear(); + stringBuilder.setLength(0); + for (int i = 0; i < partitionKeyColumns.size(); i++) { + Object value = values.get(i); + CassandraColumnHandle columnHandle = partitionKeyColumns.get(i); + CassandraType cassandraType = columnHandle.getCassandraType(); + + switch (cassandraType) { + case TEXT: + Slice slice = (Slice) value; + if (isComposite) { + buffer.putShort((short) slice.length()); + buffer.put(slice.getBytes()); + buffer.put((byte) 0); + } + else { + buffer.put(slice.getBytes()); + } + break; + case INT: + int intValue = toIntExact((long) value); + if (isComposite) { + buffer.putShort((short) Integer.BYTES); + buffer.putInt(intValue); + buffer.put((byte) 0); + } + else { + buffer.putInt(intValue); + } + break; + case BIGINT: + if (isComposite) { + buffer.putShort((short) Long.BYTES); + buffer.putLong((long) value); + buffer.put((byte) 0); + } + else { + buffer.putLong((long) value); + } + break; + default: + throw new IllegalStateException("Handling of type " + cassandraType + " is not implemented"); + } + + map.put(columnHandle, NullableValue.of(cassandraType.getNativeType(), value)); + if (i > 0) { + stringBuilder.append(" AND "); + } + stringBuilder.append(CassandraCqlUtils.validColumnName(columnHandle.getName())); + stringBuilder.append(" = "); + stringBuilder.append(CassandraType.getColumnValueForCql(value, cassandraType)); + } + buffer.flip(); + byte[] key = new byte[buffer.limit()]; + buffer.get(key); + TupleDomain tupleDomain = TupleDomain.fromFixedValues(map); + String partitionId = stringBuilder.toString(); + if (uniquePartitionIds.add(partitionId)) { + partitions.add(new CassandraPartition(key, partitionId, tupleDomain, false)); + } + } + return partitions.build(); + } + @Override public ResultSet execute(String cql, Object... values) { diff --git a/presto-cassandra/src/test/java/com/facebook/presto/cassandra/EmbeddedCassandra.java b/presto-cassandra/src/test/java/com/facebook/presto/cassandra/EmbeddedCassandra.java index 9c8f90c271d1e..9c1c27336674f 100644 --- a/presto-cassandra/src/test/java/com/facebook/presto/cassandra/EmbeddedCassandra.java +++ b/presto-cassandra/src/test/java/com/facebook/presto/cassandra/EmbeddedCassandra.java @@ -57,6 +57,7 @@ public final class EmbeddedCassandra private static final Duration REFRESH_SIZE_ESTIMATES_TIMEOUT = new Duration(1, MINUTES); private static CassandraSession session; + private static ReopeningCluster cluster; private static boolean initialized; private EmbeddedCassandra() {} @@ -89,7 +90,8 @@ public static synchronized void start() "EmbeddedCassandra", JsonCodec.listJsonCodec(ExtraColumnMetadata.class), cluster, - new Duration(1, MINUTES)); + new Duration(1, MINUTES), + false); try { checkConnectivity(session); @@ -101,6 +103,7 @@ public static synchronized void start() } EmbeddedCassandra.session = session; + EmbeddedCassandra.cluster = cluster; initialized = true; } @@ -141,6 +144,12 @@ public static synchronized int getPort() return PORT; } + public static synchronized ReopeningCluster getCluster() + { + checkIsInitialized(); + return cluster; + } + private static void checkIsInitialized() { checkState(initialized, "EmbeddedCassandra must be started with #start() method before retrieving the cluster retrieval"); diff --git a/presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestCassandraClientConfig.java b/presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestCassandraClientConfig.java index 7b6ec6e476abb..15cb13340e549 100644 --- a/presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestCassandraClientConfig.java +++ b/presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestCassandraClientConfig.java @@ -65,7 +65,8 @@ public void testDefaults() .setKeystorePassword(null) .setTruststorePath(null) .setTruststorePassword(null) - .setTlsEnabled(false)); + .setTlsEnabled(false) + .setSkipPartitionCheck(false)); } @Test @@ -103,6 +104,7 @@ public void testExplicitPropertyMappings() .put("cassandra.tls.keystore-password", "keystore-password") .put("cassandra.tls.truststore-path", "/tmp/truststore") .put("cassandra.tls.truststore-password", "truststore-password") + .put("cassandra.skip-partition-check", "true") .build(); CassandraClientConfig expected = new CassandraClientConfig() @@ -136,7 +138,8 @@ public void testExplicitPropertyMappings() .setKeystorePath(new File("/tmp/keystore")) .setKeystorePassword("keystore-password") .setTruststorePath(new File("/tmp/truststore")) - .setTruststorePassword("truststore-password"); + .setTruststorePassword("truststore-password") + .setSkipPartitionCheck(true); ConfigAssertions.assertFullMapping(properties, expected); } diff --git a/presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestNativeCassandraSession.java b/presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestNativeCassandraSession.java new file mode 100644 index 0000000000000..172dba07cd882 --- /dev/null +++ b/presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestNativeCassandraSession.java @@ -0,0 +1,164 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.cassandra; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.facebook.airlift.json.JsonCodec; +import io.airlift.slice.Slices; +import io.airlift.units.Duration; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Set; + +import static com.facebook.presto.cassandra.CassandraTestingUtils.createKeyspace; +import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.testng.Assert.assertEquals; + +@Test(singleThreaded = true) +public class TestNativeCassandraSession +{ + private static final String KEYSPACE = "test_native_cassandra_session_keyspace"; + private static final int FILTER_PARTITION_COUNT = 5; + private static final int EXISTING_PARTITION_COUNT = 4; + private static final int CLUSTERING_KEY_COUNT = 3; + + private CassandraSession session; + + @BeforeClass + public void setUp() throws Exception + { + EmbeddedCassandra.start(); + session = EmbeddedCassandra.getSession(); + createKeyspace(session, KEYSPACE); + } + + @Test + public void testGetPartitionsFromSingleParitionKeyTable() + { + NativeCassandraSession nativeSession = buildNativeSession(false); + String tableName = "single_part_key_table"; + CassandraTable table = createSinglePartitionKeyTable(tableName); + + ImmutableList> partitionKeysList = buildSinglePartitionKeysList(); + List partitions = nativeSession.getPartitions(table, partitionKeysList); + + assertEquals(partitions.size(), EXISTING_PARTITION_COUNT); + session.execute(format("DROP TABLE %s.%s", KEYSPACE, tableName)); + } + + @Test + public void testGetPartitionsFromSinglePartitionKeyTableWithSkipPartitionCheck() + { + NativeCassandraSession nativeSession = buildNativeSession(true); + String tableName = "single_part_key_with_skip_partition_check_table"; + CassandraTable table = createSinglePartitionKeyTable(tableName); + + ImmutableList> partitionKeysList = buildSinglePartitionKeysList(); + List partitions = nativeSession.getPartitions(table, partitionKeysList); + + assertEquals(partitions.size(), FILTER_PARTITION_COUNT); + session.execute(format("DROP TABLE %s.%s", KEYSPACE, tableName)); + } + + @Test + public void testGetPartitionsFromMultipleParitionKeyTable() + { + NativeCassandraSession nativeSession = buildNativeSession(false); + String tableName = "multi_part_key_table"; + CassandraTable table = createMultiplePartitionKeyTable(tableName); + + ImmutableList> partitionKeysList = buildMultiplePartitionKeysList(); + List partitions = nativeSession.getPartitions(table, partitionKeysList); + + assertEquals(partitions.size(), EXISTING_PARTITION_COUNT); + session.execute(format("DROP TABLE %s.%s", KEYSPACE, tableName)); + } + + @Test + public void testGetPartitionsFromMultiplePartitionKeyTableWithSkipPartitionCheck() + { + NativeCassandraSession nativeSession = buildNativeSession(true); + String tableName = "multi_part_key_with_skip_partition_check_table"; + CassandraTable table = createMultiplePartitionKeyTable(tableName); + + ImmutableList> partitionKeysList = buildMultiplePartitionKeysList(); + List partitions = nativeSession.getPartitions(table, partitionKeysList); + + assertEquals(partitions.size(), FILTER_PARTITION_COUNT * FILTER_PARTITION_COUNT); + session.execute(format("DROP TABLE %s.%s", KEYSPACE, tableName)); + } + + private NativeCassandraSession buildNativeSession(boolean skipPartitionCheck) + { + return new NativeCassandraSession( + "TestNativeCassandraSession", + JsonCodec.listJsonCodec(ExtraColumnMetadata.class), + EmbeddedCassandra.getCluster(), + new Duration(1, MINUTES), + skipPartitionCheck); + } + + private ImmutableList> buildSinglePartitionKeysList() + { + ImmutableSet.Builder partitionColumnValues = ImmutableSet.builder(); + for (int i = 0; i < FILTER_PARTITION_COUNT; i++) { + partitionColumnValues.add((long) i); + } + return ImmutableList.of(partitionColumnValues.build()); + } + + private CassandraTable createSinglePartitionKeyTable(String tableName) + { + session.execute(format("CREATE TABLE %s.%s (partition_key1 bigint, clustering_key1 bigint, PRIMARY KEY (partition_key1, clustering_key1))", KEYSPACE, tableName)); + for (int i = 0; i < EXISTING_PARTITION_COUNT; i++) { + for (int j = 0; j < CLUSTERING_KEY_COUNT; j++) { + session.execute(format("INSERT INTO %s.%s (partition_key1, clustering_key1) VALUES (%d, %d)", KEYSPACE, tableName, i, j)); + } + } + + CassandraColumnHandle col1 = new CassandraColumnHandle("cassandra", "partition_key1", 1, CassandraType.BIGINT, null, true, false, false, false); + CassandraColumnHandle col2 = new CassandraColumnHandle("cassandra", "clustering_key1", 2, CassandraType.BIGINT, null, false, true, false, false); + return new CassandraTable(new CassandraTableHandle("cassandra", KEYSPACE, tableName), ImmutableList.of(col1, col2)); + } + + private ImmutableList> buildMultiplePartitionKeysList() + { + ImmutableSet.Builder col1Values = ImmutableSet.builder(); + ImmutableSet.Builder col2Values = ImmutableSet.builder(); + for (int i = 0; i < FILTER_PARTITION_COUNT; i++) { + col1Values.add((long) i); + col2Values.add(Slices.utf8Slice(Integer.toString(i))); + } + return ImmutableList.of(col1Values.build(), col2Values.build()); + } + + private CassandraTable createMultiplePartitionKeyTable(String tableName) + { + session.execute(format("CREATE TABLE %s.%s (partition_key1 bigint, partition_key2 text, clustering_key1 bigint, PRIMARY KEY ((partition_key1, partition_key2), clustering_key1))", KEYSPACE, tableName)); + for (int i = 0; i < EXISTING_PARTITION_COUNT; i++) { + for (int j = 0; j < CLUSTERING_KEY_COUNT; j++) { + session.execute(format("INSERT INTO %s.%s (partition_key1, partition_key2, clustering_key1) VALUES (%d, '%s', %d)", KEYSPACE, tableName, i, Integer.toString(i), j)); + } + } + + CassandraColumnHandle col1 = new CassandraColumnHandle("cassandra", "partition_key1", 1, CassandraType.BIGINT, null, true, false, false, false); + CassandraColumnHandle col2 = new CassandraColumnHandle("cassandra", "partition_key2", 2, CassandraType.TEXT, null, true, false, false, false); + CassandraColumnHandle col3 = new CassandraColumnHandle("cassandra", "clustering_key1", 3, CassandraType.BIGINT, null, false, true, false, false); + return new CassandraTable(new CassandraTableHandle("cassandra", KEYSPACE, tableName), ImmutableList.of(col1, col2, col3)); + } +}