diff --git a/docs/velox-backend-limitations.md b/docs/velox-backend-limitations.md index 6d7051439f90..a17075e7d009 100644 --- a/docs/velox-backend-limitations.md +++ b/docs/velox-backend-limitations.md @@ -11,6 +11,34 @@ Gluten avoids to modify Spark's existing code and use Spark APIs if possible. Ho So you need to ensure preferentially load the Gluten jar to overwrite the jar of vanilla spark. Refer to [How to prioritize loading Gluten jars in Spark](https://github.com/oap-project/gluten/blob/main/docs/velox-backend-troubleshooting.md#incompatible-class-error-when-using-native-writer). + +### Runtime BloomFilter + +Velox BloomFilter's implementation is different from Spark's. So if `might_contain` falls back, but `bloom_filter_agg` is offloaded to velox, an exception will be thrown. + +#### example + +```sql +SELECT might_contain(null, null) both_null, + might_contain(null, 1L) null_bf, + might_contain((SELECT bloom_filter_agg(cast(id as long)) from range(1, 10000)), + null) null_value +``` + +The below exception will be thrown. + +``` +Unexpected Bloom filter version number (512) +java.io.IOException: Unexpected Bloom filter version number (512) + at org.apache.spark.util.sketch.BloomFilterImpl.readFrom0(BloomFilterImpl.java:256) + at org.apache.spark.util.sketch.BloomFilterImpl.readFrom(BloomFilterImpl.java:265) + at org.apache.spark.util.sketch.BloomFilter.readFrom(BloomFilter.java:178) +``` + +#### Solution + +Set the gluten config `spark.gluten.sql.native.bloomFilter=false` to fall back to vanilla bloom filter, you can also disable runtime filter by setting spark config `spark.sql.optimizer.runtime.bloomFilter.enabled=false`. + ### Fallbacks Except the unsupported operators, functions, file formats, data sources listed in , there are some known cases also fall back to Vanilla Spark. @@ -24,9 +52,6 @@ Gluten only supports spark default case-insensitive mode. If case-sensitive mode In velox, lookaround (lookahead/lookbehind) pattern is not supported in RE2-based implementations for Spark functions, such as `rlike`, `regexp_extract`, etc. -#### Runtime BloomFilter -Velox BloomFilter's serialization format is different from Spark's. BloomFilter binary generated by Velox can't be deserialized by vanilla spark. So if `might_contain` falls back, we fall back `bloom_filter_agg` to vanilla spark also. - #### FileSource format Currently, Gluten only fully supports parquet file format and partially support ORC. If other format is used, scan operator falls back to vanilla spark. diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala index 4ee22c0bc24c..0bd693aa1349 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala @@ -795,7 +795,6 @@ case class ColumnarOverrideRules(session: SparkSession) (spark: SparkSession) => PlanOneRowRelation(spark), (_: SparkSession) => FallbackEmptySchemaRelation(), (_: SparkSession) => AddTransformHintRule(), - (_: SparkSession) => FallbackBloomFilterAggIfNeeded(), (_: SparkSession) => TransformPreOverrides(isAdaptiveContext), (spark: SparkSession) => RewriteTransformer(spark), (_: SparkSession) => EnsureLocalSortRequirements diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala b/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala index c15c538b89d5..d7cbd1fc56d6 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala @@ -26,7 +26,7 @@ import io.glutenproject.utils.PhysicalPlanSelector import org.apache.spark.api.python.EvalPythonExecTransformer import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SortOrder} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.FullOuter import org.apache.spark.sql.catalyst.rules.Rule @@ -270,49 +270,6 @@ case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { } } -/** - * Velox BloomFilter's implementation is different from Spark's. So if might_contain falls back, we - * need fall back related bloom filter agg. - */ -case class FallbackBloomFilterAggIfNeeded() extends Rule[SparkPlan] { - override def apply(plan: SparkPlan): SparkPlan = - if (GlutenConfig.getConf.enableNativeBloomFilter) { - plan.transformDown { - case p if TransformHints.isAlreadyTagged(p) && TransformHints.isNotTransformable(p) => - handleBloomFilterFallback(p) - p - } - } else { - plan - } - - object SubPlanFromBloomFilterMightContain { - def unapply(expr: Expression): Option[SparkPlan] = - SparkShimLoader.getSparkShims.extractSubPlanFromMightContain(expr) - } - - private def handleBloomFilterFallback(plan: SparkPlan): Unit = { - def tagNotTransformableRecursive(p: SparkPlan): Unit = { - p match { - case agg: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec - if SparkShimLoader.getSparkShims.hasBloomFilterAggregate(agg) => - TransformHints.tagNotTransformable(agg, "related BloomFilterMightContain falls back") - tagNotTransformableRecursive(agg.child) - case a: org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec => - tagNotTransformableRecursive(a.executedPlan) - case _ => - p.children.map(tagNotTransformableRecursive) - } - } - - plan.transformAllExpressions { - case expr @ SubPlanFromBloomFilterMightContain(p: SparkPlan) => - tagNotTransformableRecursive(p) - expr - } - } -} - // This rule will try to convert a plan into plan transformer. // The doValidate function will be called to check if the conversion is supported. // If false is returned or any unsupported exception is thrown, a row guard will diff --git a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index 756707679822..f28a9c31a40a 100644 --- a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -51,6 +51,8 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("string split function with positive limit") .exclude("string split function with negative limit") enableSuite[GlutenBloomFilterAggregateQuerySuite] + // fallback might_contain, the input argument binary is not same with vanilla spark + .exclude("Test NULL inputs for might_contain") enableSuite[GlutenDataSourceV2DataFrameSessionCatalogSuite] enableSuite[GlutenDataSourceV2DataFrameSuite] enableSuite[GlutenDataSourceV2FunctionSuite] diff --git a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index 1010414ceef4..dc0762a79d20 100644 --- a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -51,6 +51,8 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("string split function with positive limit") .exclude("string split function with negative limit") enableSuite[GlutenBloomFilterAggregateQuerySuite] + // fallback might_contain, the input argument binary is not same with vanilla spark + .exclude("Test NULL inputs for might_contain") enableSuite[GlutenDataSourceV2DataFrameSessionCatalogSuite] enableSuite[GlutenDataSourceV2DataFrameSuite] enableSuite[GlutenDataSourceV2FunctionSuite] diff --git a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala index ed833dd02738..de99e7efb44c 100644 --- a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression} import org.apache.spark.sql.catalyst.plans.physical.Distribution import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.text.TextScan @@ -81,9 +81,4 @@ trait SparkShims { start: Long, length: Long, @transient locations: Array[String] = Array.empty): PartitionedFile - - def hasBloomFilterAggregate( - agg: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec): Boolean - - def extractSubPlanFromMightContain(expr: Expression): Option[SparkPlan] } diff --git a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala index 0e1a1fb09c5b..580cc93bdf75 100644 --- a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala +++ b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution} import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, SparkPlan} +import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil} import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex} import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null import org.apache.spark.sql.execution.datasources.v2.BatchScanExec @@ -101,9 +101,4 @@ class Spark32Shims extends SparkShims { length: Long, @transient locations: Array[String] = Array.empty): PartitionedFile = PartitionedFile(partitionValues, filePath, start, length, locations) - - override def hasBloomFilterAggregate( - agg: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec): Boolean = false - - override def extractSubPlanFromMightContain(expr: Expression): Option[SparkPlan] = None } diff --git a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala index a1e34aab08ee..50e536610e7b 100644 --- a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala +++ b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala @@ -127,24 +127,6 @@ class Spark33Shims extends SparkShims { @transient locations: Array[String] = Array.empty): PartitionedFile = PartitionedFile(partitionValues, filePath, start, length, locations) - override def hasBloomFilterAggregate( - agg: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec): Boolean = { - agg.aggregateExpressions.exists( - expr => expr.aggregateFunction.isInstanceOf[BloomFilterAggregate]) - } - - override def extractSubPlanFromMightContain(expr: Expression): Option[SparkPlan] = { - expr match { - case mc @ BloomFilterMightContain(sub: org.apache.spark.sql.execution.ScalarSubquery, _) => - Some(sub.plan) - case mc @ BloomFilterMightContain( - g @ GetStructField(sub: org.apache.spark.sql.execution.ScalarSubquery, _, _), - _) => - Some(sub.plan) - case _ => None - } - } - private def invalidBucketFile(path: String): Throwable = { new SparkException( errorClass = "INVALID_BUCKET_FILE", diff --git a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala index e23888e38a82..cdc42f3b43fd 100644 --- a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala @@ -130,24 +130,6 @@ class Spark34Shims extends SparkShims { @transient locations: Array[String] = Array.empty): PartitionedFile = PartitionedFile(partitionValues, SparkPath.fromPathString(filePath), start, length, locations) - override def hasBloomFilterAggregate( - agg: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec): Boolean = { - agg.aggregateExpressions.exists( - expr => expr.aggregateFunction.isInstanceOf[BloomFilterAggregate]) - } - - override def extractSubPlanFromMightContain(expr: Expression): Option[SparkPlan] = { - expr match { - case mc @ BloomFilterMightContain(sub: org.apache.spark.sql.execution.ScalarSubquery, _) => - Some(sub.plan) - case mc @ BloomFilterMightContain( - g @ GetStructField(sub: org.apache.spark.sql.execution.ScalarSubquery, _, _), - _) => - Some(sub.plan) - case _ => None - } - } - private def invalidBucketFile(path: String): Throwable = { new SparkException( errorClass = "INVALID_BUCKET_FILE",