diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index f653ad246341..f62564cd2e82 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -62,6 +62,9 @@ const std::string kAbandonPartialAggregationMinPct = "spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct"; const std::string kAbandonPartialAggregationMinRows = "spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows"; +const std::string kBloomFilterExpectedNumItems = "spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems"; +const std::string kBloomFilterNumBits = "spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits"; +const std::string kBloomFilterMaxNumBits = "spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits"; // metrics const std::string kDynamicFiltersProduced = "dynamicFiltersProduced"; @@ -361,6 +364,12 @@ std::unordered_map WholeStageResultIterator::getQueryC configs[velox::core::QueryConfig::kSpillableReservationGrowthPct] = getConfigValue(confMap_, kSpillableReservationGrowthPct, "25"); configs[velox::core::QueryConfig::kSpillCompressionKind] = getConfigValue(confMap_, kSpillCompressionKind, "lz4"); + configs[velox::core::QueryConfig::kSparkBloomFilterExpectedNumItems] = + getConfigValue(confMap_, kBloomFilterExpectedNumItems, "1000000"); + configs[velox::core::QueryConfig::kSparkBloomFilterNumBits] = + getConfigValue(confMap_, kBloomFilterNumBits, "8388608"); + configs[velox::core::QueryConfig::kSparkBloomFilterMaxNumBits] = + getConfigValue(confMap_, kBloomFilterMaxNumBits, "4194304"); } catch (const std::invalid_argument& err) { std::string errDetails = err.what(); throw std::runtime_error("Invalid conf arg: " + errDetails); diff --git a/docs/Configuration.md b/docs/Configuration.md index 94776dd6c804..e66c5e6034e7 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -51,6 +51,9 @@ You can add these configurations into spark-defaults.conf to enable or disable t | spark.gluten.loadLibFromJar | Controls whether to load dynamic link library from a packed jar for gluten/cpp. Not applicable to static build and clickhouse backend. | false | | spark.gluten.sql.columnar.force.hashagg | Force to use hash agg to replace sort agg. | true | | spark.gluten.sql.columnar.vanillaReaders | Enable vanilla spark's vectorized reader. Please note it may bring perf. overhead due to extra data transition. We recommend to disable it if most queries can be fully offloaded to gluten. | false | +| spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems | The default number of expected items for the velox bloomfilter. | 1000000L | +| spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits | The default number of bits to use for the velox bloom filter. | 8388608L | +| spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits | The max number of bits to use for the velox bloom filter. | 4194304L | Below is an example for spark-default.conf, if you are using conda to install OAP project. diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala index 07ab780c4659..744271e53a78 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql +import io.glutenproject.GlutenConfig + import org.apache.spark.sql.internal.SQLConf class GlutenBloomFilterAggregateQuerySuite @@ -25,9 +27,12 @@ class GlutenBloomFilterAggregateQuerySuite test("Test bloom_filter_agg with big RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS") { val table = "bloom_filter_test" - withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.key -> "5000000") { + withSQLConf( + SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.key -> "5000000", + GlutenConfig.COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key -> "4194304" + ) { val numEstimatedItems = 5000000L - val numBits = SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS) + val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits val sqlString = s""" |SELECT every(might_contain( | (SELECT bloom_filter_agg(col, diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala index 07ab780c4659..744271e53a78 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql +import io.glutenproject.GlutenConfig + import org.apache.spark.sql.internal.SQLConf class GlutenBloomFilterAggregateQuerySuite @@ -25,9 +27,12 @@ class GlutenBloomFilterAggregateQuerySuite test("Test bloom_filter_agg with big RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS") { val table = "bloom_filter_test" - withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.key -> "5000000") { + withSQLConf( + SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.key -> "5000000", + GlutenConfig.COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key -> "4194304" + ) { val numEstimatedItems = 5000000L - val numBits = SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS) + val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits val sqlString = s""" |SELECT every(might_contain( | (SELECT bloom_filter_agg(col, diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala index 0299637dc531..287ae5807651 100644 --- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala +++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala @@ -228,6 +228,13 @@ class GlutenConfig(conf: SQLConf) extends Logging { def veloxSpillFileSystem: String = conf.getConf(COLUMNAR_VELOX_SPILL_FILE_SYSTEM) + def veloxBloomFilterExpectedNumItems: Long = + conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS) + + def veloxBloomFilterNumBits: Long = conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_NUM_BITS) + + def veloxBloomFilterMaxNumBits: Long = conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS) + def chColumnarShufflePreferSpill: Boolean = conf.getConf(COLUMNAR_CH_SHUFFLE_PREFER_SPILL_ENABLED) def chColumnarShuffleSpillThreshold: Long = conf.getConf(COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD) @@ -424,7 +431,10 @@ object GlutenConfig { SQLConf.SESSION_LOCAL_TIMEZONE.key, GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY, SQLConf.LEGACY_SIZE_OF_NULL.key, - "spark.io.compression.codec" + "spark.io.compression.codec", + COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS.key, + COLUMNAR_VELOX_BLOOM_FILTER_NUM_BITS.key, + COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key ) keys.forEach( k => { @@ -1277,4 +1287,28 @@ object GlutenConfig { + "For example `fron_unixtime(ts) > date` will be rewritten to `ts > to_unixtime(date)`") .booleanConf .createWithDefault(true) + + val COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS = + buildConf("spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems") + .internal() + .doc("The default number of expected items for the velox bloomfilter: " + + "'spark.bloom_filter.expected_num_items'") + .longConf + .createWithDefault(1000000L) + + val COLUMNAR_VELOX_BLOOM_FILTER_NUM_BITS = + buildConf("spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits") + .internal() + .doc("The default number of bits to use for the velox bloom filter: " + + "'spark.bloom_filter.num_bits'") + .longConf + .createWithDefault(8388608L) + + val COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS = + buildConf("spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits") + .internal() + .doc("The max number of bits to use for the velox bloom filter: " + + "'spark.bloom_filter.max_num_bits'") + .longConf + .createWithDefault(4194304L) }