Commit

address comments
zhli1142015 committed Dec 5, 2023
1 parent 119062f commit f04d6d1
Showing 7 changed files with 69 additions and 72 deletions.
25 changes: 1 addition & 24 deletions docs/velox-backend-limitations.md
@@ -14,30 +14,7 @@ So you need to ensure preferentially load the Gluten jar to overwrite the jar of

### Runtime BloomFilter

-Velox BloomFilter's implementation is different from Spark's. So if `might_contain` falls back, but `bloom_filter_agg` is offloaded to Velox, an exception will be thrown.
-
-#### example
-
-```sql
-SELECT might_contain(null, null) both_null,
-       might_contain(null, 1L) null_bf,
-       might_contain((SELECT bloom_filter_agg(cast(id as long)) from range(1, 10000)),
-                     null) null_value
-```
-
-The below exception will be thrown.
-
-```
-Unexpected Bloom filter version number (512)
-java.io.IOException: Unexpected Bloom filter version number (512)
-  at org.apache.spark.util.sketch.BloomFilterImpl.readFrom0(BloomFilterImpl.java:256)
-  at org.apache.spark.util.sketch.BloomFilterImpl.readFrom(BloomFilterImpl.java:265)
-  at org.apache.spark.util.sketch.BloomFilter.readFrom(BloomFilter.java:178)
-```
-
-#### Solution
-
-Set the gluten config `spark.gluten.sql.native.bloomFilter=false` to fall back to vanilla bloom filter, you can also disable runtime filter by setting spark config `spark.sql.optimizer.runtime.bloomFilter.enabled=false`.
+Velox BloomFilter's serialization format is different from Spark's, so a BloomFilter binary generated by Velox can't be deserialized by vanilla Spark. Therefore, if `might_contain` falls back, we also fall back `bloom_filter_agg` to vanilla Spark.

### Fallbacks
Except for the unsupported operators, functions, file formats, and data sources listed in , there are some known cases that also fall back to vanilla Spark.
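To make the Runtime BloomFilter limitation above concrete, here is a minimal Scala sketch (an editor's illustration, not part of this commit), adapted from the SQL example the diff removes. It assumes a running `SparkSession` named `spark` with Gluten enabled.

```scala
// bloom_filter_agg builds the filter in a scalar subquery; might_contain
// probes it. When might_contain is not offloaded, the new
// FallbackBloomFilterAggIfNeeded rule keeps bloom_filter_agg on vanilla
// Spark as well, so producer and consumer agree on the serialization format.
val df = spark.sql("""
  SELECT might_contain(
           (SELECT bloom_filter_agg(cast(id AS long)) FROM range(1, 10000)),
           cast(5000 AS long)) AS hit
""")
df.show()
```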
@@ -815,6 +815,7 @@ case class ColumnarOverrideRules(session: SparkSession)
(spark: SparkSession) => PlanOneRowRelation(spark),
(_: SparkSession) => FallbackEmptySchemaRelation(),
(_: SparkSession) => AddTransformHintRule(),
+(_: SparkSession) => FallbackBloomFilterAggIfNeeded(),
(_: SparkSession) => TransformPreOverrides(isAdaptiveContext),
(spark: SparkSession) => RewriteTransformer(spark),
(_: SparkSession) => EnsureLocalSortRequirements
@@ -26,7 +26,7 @@ import io.glutenproject.utils.PhysicalPlanSelector
import org.apache.spark.api.python.EvalPythonExecTransformer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, SortOrder}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.FullOuter
import org.apache.spark.sql.catalyst.rules.Rule
@@ -270,6 +270,44 @@ case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] {
}
}

+/**
+ * Velox BloomFilter's implementation is different from Spark's. So if might_contain falls back,
+ * we need to fall back the related bloom filter aggregate as well.
+ */
+case class FallbackBloomFilterAggIfNeeded() extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
+    case p if TransformHints.isAlreadyTagged(p) && TransformHints.isNotTransformable(p) =>
+      handleBloomFilterFallback(p)
+      p
+  }
+
+  object SubPlanFromBloomFilterMightContain {
+    def unapply(expr: Expression): Option[SparkPlan] =
+      SparkShimLoader.getSparkShims.extactPlanFromBloomFilterMightContain(expr)
+  }
+
+  private def handleBloomFilterFallback(plan: SparkPlan): Unit = {
+    def tagNotTransformableRecursive(p: SparkPlan): Unit = {
+      p match {
+        case agg: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
+            if SparkShimLoader.getSparkShims.hasBloomFilterAggregate(agg) =>
+          TransformHints.tagNotTransformable(agg, "related BloomFilterMightContain falls back")
+          tagNotTransformableRecursive(agg.child)
+        case a: org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec =>
+          tagNotTransformableRecursive(a.executedPlan)
+        case _ =>
+          p.children.map(tagNotTransformableRecursive)
+      }
+    }
+
+    plan.transformAllExpressions {
+      case expr @ SubPlanFromBloomFilterMightContain(p: SparkPlan) =>
+        tagNotTransformableRecursive(p)
+        expr
+    }
+  }
+}
+
// This rule will try to convert a plan into plan transformer.
// The doValidate function will be called to check if the conversion is supported.
// If false is returned or any unsupported exception is thrown, a row guard will
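A note on the `SubPlanFromBloomFilterMightContain` helper introduced above: wrapping the Option-returning shim call in an object with `unapply` lets it serve as a match pattern inside `transformAllExpressions`. Here is a standalone sketch of the same extractor idiom (illustrative names only, not Gluten code):

```scala
// Any object with an Option-returning unapply can be used on the left-hand
// side of a case pattern; Some(x) means "matched, binding x".
object IntString {
  def unapply(s: String): Option[Int] = scala.util.Try(s.toInt).toOption
}

def describe(s: String): String = s match {
  case IntString(n) => s"numeric: $n" // fires when unapply returns Some(n)
  case _ => "not a number"
}
// describe("42") == "numeric: 42"; describe("abc") == "not a number"
```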
@@ -714,13 +752,6 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
s"${e.getMessage}, original sparkplan is " +
s"${plan.getClass}(${plan.children.toList.map(_.getClass)})")
}

-    if (TransformHints.isAlreadyTagged(plan) && TransformHints.isNotTransformable(plan)) {
-      // Velox BloomFilter's implementation is different from Spark's.
-      // So if might_contain falls back, we need fall back related bloom filter agg.
-      SparkShimLoader.getSparkShims.handleBloomFilterFallback(plan)(
-        p => TransformHints.tagNotTransformable(p, "related BloomFilterMightContain is fallbacked"))
-    }
}

implicit class EncodeTransformableTagImplicits(validationResult: ValidationResult) {
@@ -82,5 +82,8 @@ trait SparkShims {
length: Long,
@transient locations: Array[String] = Array.empty): PartitionedFile

-  def handleBloomFilterFallback(plan: SparkPlan)(fun: SparkPlan => Unit): Unit
+  def hasBloomFilterAggregate(
+      agg: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec): Boolean
+
+  def extactPlanFromBloomFilterMightContain(expr: Expression): Option[SparkPlan]
}
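Taken together, the two shim methods split version-specific knowledge (which expressions are `might_contain`, which aggregates are bloom filters) from the version-agnostic tagging walk. A hedged sketch of how a caller might compose them (the names `tagRelatedBloomFilterAggs` and `tag` are hypothetical, and `SparkShimLoader` is assumed in scope as in the rule above):

```scala
// Hypothetical composition of the two shim methods: pull the bloom-filter
// subquery plan out of a might_contain expression, then tag any
// bloom_filter_agg found inside it so both stay on vanilla Spark.
def tagRelatedBloomFilterAggs(expr: Expression, tag: SparkPlan => Unit): Unit =
  SparkShimLoader.getSparkShims.extactPlanFromBloomFilterMightContain(expr).foreach {
    subPlan =>
      subPlan.foreach {
        case agg: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
            if SparkShimLoader.getSparkShims.hasBloomFilterAggregate(agg) =>
          tag(agg)
        case _ =>
      }
  }
```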
@@ -102,5 +102,8 @@ class Spark32Shims extends SparkShims {
@transient locations: Array[String] = Array.empty): PartitionedFile =
PartitionedFile(partitionValues, filePath, start, length, locations)

-  override def handleBloomFilterFallback(plan: SparkPlan)(fun: SparkPlan => Unit): Unit = {}
+  override def hasBloomFilterAggregate(
+      agg: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec): Boolean = false
+
+  override def extactPlanFromBloomFilterMightContain(expr: Expression): Option[SparkPlan] = None
}
@@ -127,30 +127,21 @@ class Spark33Shims extends SparkShims {
@transient locations: Array[String] = Array.empty): PartitionedFile =
PartitionedFile(partitionValues, filePath, start, length, locations)

-  override def handleBloomFilterFallback(plan: SparkPlan)(fun: SparkPlan => Unit): Unit = {
-    def tagNotTransformableRecursive(p: SparkPlan): Unit = {
-      p match {
-        case agg: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
-            if agg.aggregateExpressions.exists(
-              expr => expr.aggregateFunction.isInstanceOf[BloomFilterAggregate]) =>
-          fun(agg)
-          tagNotTransformableRecursive(agg.child)
-        case a: org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec =>
-          tagNotTransformableRecursive(a.executedPlan)
-        case _ =>
-          p.children.map(tagNotTransformableRecursive)
-      }
-    }
+  override def hasBloomFilterAggregate(
+      agg: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec): Boolean = {
+    agg.aggregateExpressions.exists(
+      expr => expr.aggregateFunction.isInstanceOf[BloomFilterAggregate])
+  }
 
-    plan.transformAllExpressions {
+  override def extactPlanFromBloomFilterMightContain(expr: Expression): Option[SparkPlan] = {
+    expr match {
       case mc @ BloomFilterMightContain(sub: org.apache.spark.sql.execution.ScalarSubquery, _) =>
-        tagNotTransformableRecursive(sub.plan)
-        mc
+        Some(sub.plan)
       case mc @ BloomFilterMightContain(
             g @ GetStructField(sub: org.apache.spark.sql.execution.ScalarSubquery, _, _),
             _) =>
-        tagNotTransformableRecursive(sub.plan)
-        mc
+        Some(sub.plan)
+      case _ => None
     }
   }

@@ -130,30 +130,21 @@ class Spark34Shims extends SparkShims {
@transient locations: Array[String] = Array.empty): PartitionedFile =
PartitionedFile(partitionValues, SparkPath.fromPathString(filePath), start, length, locations)

-  override def handleBloomFilterFallback(plan: SparkPlan)(fun: SparkPlan => Unit): Unit = {
-    def tagNotTransformableRecursive(p: SparkPlan): Unit = {
-      p match {
-        case agg: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
-            if agg.aggregateExpressions.exists(
-              expr => expr.aggregateFunction.isInstanceOf[BloomFilterAggregate]) =>
-          fun(agg)
-          tagNotTransformableRecursive(agg.child)
-        case a: org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec =>
-          tagNotTransformableRecursive(a.executedPlan)
-        case _ =>
-          p.children.map(tagNotTransformableRecursive)
-      }
-    }
+  override def hasBloomFilterAggregate(
+      agg: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec): Boolean = {
+    agg.aggregateExpressions.exists(
+      expr => expr.aggregateFunction.isInstanceOf[BloomFilterAggregate])
+  }
 
-    plan.transformAllExpressions {
+  override def extactPlanFromBloomFilterMightContain(expr: Expression): Option[SparkPlan] = {
+    expr match {
       case mc @ BloomFilterMightContain(sub: org.apache.spark.sql.execution.ScalarSubquery, _) =>
-        tagNotTransformableRecursive(sub.plan)
-        mc
+        Some(sub.plan)
       case mc @ BloomFilterMightContain(
             g @ GetStructField(sub: org.apache.spark.sql.execution.ScalarSubquery, _, _),
             _) =>
-        tagNotTransformableRecursive(sub.plan)
-        mc
+        Some(sub.plan)
+      case _ => None
     }
   }

