[GLUTEN-3779][CH] Fix core dump when executing sql with runtime filter (#3781)

When there is more than one input row in the final stage of bloom_filter_agg, the CH backend core dumps.

The root cause: when merging values in the final stage, an input state may be a non-initialized AggregateFunctionGroupBloomFilterData, so the wrong filter size and filter_hashes values are used to initialize the first AggregateFunctionGroupBloomFilterData, which in turn sets the wrong filter size when merging values.
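As a minimal sketch of the failure mode and the guard (hypothetical, simplified names -- the real state is AggregateFunctionGroupBloomFilterData and the real filter lives in CH's bloom filter class):

#include <cstddef>
#include <vector>

// Simplified stand-in for AggregateFunctionGroupBloomFilterData.
struct BloomFilterState
{
    bool initted = false;
    size_t filter_size = 0;   // size of the backing filter in bytes
    size_t filter_hashes = 0; // number of hash functions
    std::vector<char> filter; // the bloom filter bits

    void merge(const BloomFilterState & rhs)
    {
        // The fix: an uninitialized rhs carries filter_size == 0 and
        // filter_hashes == 0; adopting those values would create a
        // zero-sized filter and corrupt every later merge. Skip it.
        if (!rhs.initted)
            return;

        if (!initted)
        {
            // First initialized input: adopt its parameters wholesale.
            filter_size = rhs.filter_size;
            filter_hashes = rhs.filter_hashes;
            filter = rhs.filter;
            initted = true;
            return;
        }

        // Both sides initialized: OR the filter bits together.
        for (size_t i = 0; i < filter.size() && i < rhs.filter.size(); ++i)
            filter[i] = static_cast<char>(filter[i] | rhs.filter[i]);
    }
};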

Close #3779.
zzcclp authored Nov 20, 2023
1 parent 6983898 commit 60fc2a0
Showing 2 changed files with 52 additions and 1 deletion.
@@ -16,13 +16,59 @@
*/
package io.glutenproject.execution

-import org.apache.spark.SparkConf
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}

class GlutenClickHouseTPCHParquetRFSuite extends GlutenClickHouseTPCHParquetSuite {

  protected lazy val sparkVersion: String = {
    val version = SPARK_VERSION_SHORT.split("\\.")
    version(0) + "." + version(1)
  }

  override protected def sparkConf: SparkConf = {
    super.sparkConf
      // radically small threshold to force runtime bloom filter
      .set("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold", "1KB")
      .set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
  }

  test("GLUTEN-3779: Fix core dump when executing sql with runtime filter") {
    withSQLConf(
      ("spark.sql.autoBroadcastJoinThreshold", "-1"),
      ("spark.sql.files.maxPartitionBytes", "204800"),
      ("spark.sql.files.openCostInBytes", "102400")
    ) {
      compareResultsAgainstVanillaSpark(
        """
          |SELECT
          |  sum(l_extendedprice) / 7.0 AS avg_yearly
          |FROM
          |  lineitem,
          |  part
          |WHERE
          |  p_partkey = l_partkey
          |  AND p_size > 5
          |  AND l_quantity < (
          |    SELECT
          |      0.2 * avg(l_quantity)
          |    FROM
          |      lineitem
          |    WHERE
          |      l_partkey = p_partkey);
          |
          |""".stripMargin,
        compareResult = true,
        df => {
          if (sparkVersion.equals("3.3")) {
            val filterExecs = df.queryExecution.executedPlan.collect {
              case filter: FilterExecTransformerBase => filter
            }
            assert(filterExecs.size == 4)
            assert(
              filterExecs(0).asInstanceOf[FilterExecTransformer].toString.contains("might_contain"))
          }
        }
      )
    }
  }
}
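For context: with Spark 3.3's runtime row-level filtering, the injected bloom-filter probe surfaces as a might_contain(...) predicate on the application side of the join, which is what the assertion on the first FilterExecTransformer checks for.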
@@ -126,6 +126,11 @@ class AggregateFunctionGroupBloomFilter final : public IAggregateFunctionDataHelper

    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
    {
        // Skip uninitialized states: they carry no filter parameters, and
        // initializing from them would set a wrong (zero) filter size.
        if (!this->data(rhs).initted)
        {
            return;
        }
        const auto & bloom_other = this->data(rhs).bloom_filter;
        const auto & filter_other = bloom_other.getFilter();
        if (!this->data(place).initted)
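Skipping an uninitialized rhs is safe: such a state has consumed no input rows, so it contributes nothing to the union of bloom filters, and the first initialized input is then the one that supplies the filter size and hash count when place itself is still uninitialized.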
