Skip to content

Commit

Permalink
[VL] Allow users to set bloom filter configurations (apache#3610)
Browse files Browse the repository at this point in the history
[VL] Allow users to set bloom filter configurations.
  • Loading branch information
zhli1142015 authored Nov 8, 2023
1 parent 0e99bb6 commit 4a72871
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 5 deletions.
9 changes: 9 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ const std::string kAbandonPartialAggregationMinPct =
"spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct";
const std::string kAbandonPartialAggregationMinRows =
"spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows";
const std::string kBloomFilterExpectedNumItems = "spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems";
const std::string kBloomFilterNumBits = "spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits";
const std::string kBloomFilterMaxNumBits = "spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits";

// metrics
const std::string kDynamicFiltersProduced = "dynamicFiltersProduced";
Expand Down Expand Up @@ -361,6 +364,12 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
configs[velox::core::QueryConfig::kSpillableReservationGrowthPct] =
getConfigValue(confMap_, kSpillableReservationGrowthPct, "25");
configs[velox::core::QueryConfig::kSpillCompressionKind] = getConfigValue(confMap_, kSpillCompressionKind, "lz4");
configs[velox::core::QueryConfig::kSparkBloomFilterExpectedNumItems] =
getConfigValue(confMap_, kBloomFilterExpectedNumItems, "1000000");
configs[velox::core::QueryConfig::kSparkBloomFilterNumBits] =
getConfigValue(confMap_, kBloomFilterNumBits, "8388608");
configs[velox::core::QueryConfig::kSparkBloomFilterMaxNumBits] =
getConfigValue(confMap_, kBloomFilterMaxNumBits, "4194304");
} catch (const std::invalid_argument& err) {
std::string errDetails = err.what();
throw std::runtime_error("Invalid conf arg: " + errDetails);
Expand Down
3 changes: 3 additions & 0 deletions docs/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ You can add these configurations into spark-defaults.conf to enable or disable t
| spark.gluten.loadLibFromJar | Controls whether to load dynamic link library from a packed jar for gluten/cpp. Not applicable to static build and clickhouse backend. | false |
| spark.gluten.sql.columnar.force.hashagg | Force to use hash agg to replace sort agg. | true |
| spark.gluten.sql.columnar.vanillaReaders | Enable vanilla spark's vectorized reader. Please note it may bring perf. overhead due to extra data transition. We recommend to disable it if most queries can be fully offloaded to gluten. | false |
| spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems | The default number of expected items for the velox bloomfilter. | 1000000L |
| spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits | The default number of bits to use for the velox bloom filter. | 8388608L |
| spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits | The max number of bits to use for the velox bloom filter. | 4194304L |

Below is an example for spark-default.conf, if you are using conda to install OAP project.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.sql

import io.glutenproject.GlutenConfig

import org.apache.spark.sql.internal.SQLConf

class GlutenBloomFilterAggregateQuerySuite
Expand All @@ -25,9 +27,12 @@ class GlutenBloomFilterAggregateQuerySuite

test("Test bloom_filter_agg with big RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS") {
val table = "bloom_filter_test"
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.key -> "5000000") {
withSQLConf(
SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.key -> "5000000",
GlutenConfig.COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key -> "4194304"
) {
val numEstimatedItems = 5000000L
val numBits = SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS)
val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
val sqlString = s"""
|SELECT every(might_contain(
| (SELECT bloom_filter_agg(col,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.sql

import io.glutenproject.GlutenConfig

import org.apache.spark.sql.internal.SQLConf

class GlutenBloomFilterAggregateQuerySuite
Expand All @@ -25,9 +27,12 @@ class GlutenBloomFilterAggregateQuerySuite

test("Test bloom_filter_agg with big RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS") {
val table = "bloom_filter_test"
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.key -> "5000000") {
withSQLConf(
SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.key -> "5000000",
GlutenConfig.COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key -> "4194304"
) {
val numEstimatedItems = 5000000L
val numBits = SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS)
val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
val sqlString = s"""
|SELECT every(might_contain(
| (SELECT bloom_filter_agg(col,
Expand Down
36 changes: 35 additions & 1 deletion shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,13 @@ class GlutenConfig(conf: SQLConf) extends Logging {

def veloxSpillFileSystem: String = conf.getConf(COLUMNAR_VELOX_SPILL_FILE_SYSTEM)

def veloxBloomFilterExpectedNumItems: Long =
conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS)

def veloxBloomFilterNumBits: Long = conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_NUM_BITS)

def veloxBloomFilterMaxNumBits: Long = conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS)

def chColumnarShufflePreferSpill: Boolean = conf.getConf(COLUMNAR_CH_SHUFFLE_PREFER_SPILL_ENABLED)

def chColumnarShuffleSpillThreshold: Long = conf.getConf(COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD)
Expand Down Expand Up @@ -424,7 +431,10 @@ object GlutenConfig {
SQLConf.SESSION_LOCAL_TIMEZONE.key,
GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY,
SQLConf.LEGACY_SIZE_OF_NULL.key,
"spark.io.compression.codec"
"spark.io.compression.codec",
COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS.key,
COLUMNAR_VELOX_BLOOM_FILTER_NUM_BITS.key,
COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key
)
keys.forEach(
k => {
Expand Down Expand Up @@ -1277,4 +1287,28 @@ object GlutenConfig {
+ "For example `fron_unixtime(ts) > date` will be rewritten to `ts > to_unixtime(date)`")
.booleanConf
.createWithDefault(true)

val COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS =
buildConf("spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems")
.internal()
.doc("The default number of expected items for the velox bloomfilter: " +
"'spark.bloom_filter.expected_num_items'")
.longConf
.createWithDefault(1000000L)

val COLUMNAR_VELOX_BLOOM_FILTER_NUM_BITS =
buildConf("spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits")
.internal()
.doc("The default number of bits to use for the velox bloom filter: " +
"'spark.bloom_filter.num_bits'")
.longConf
.createWithDefault(8388608L)

val COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS =
buildConf("spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits")
.internal()
.doc("The max number of bits to use for the velox bloom filter: " +
"'spark.bloom_filter.max_num_bits'")
.longConf
.createWithDefault(4194304L)
}

0 comments on commit 4a72871

Please sign in to comment.