diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 8d98c111af68..d275f58ff893 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -44,14 +44,22 @@ import scala.util.control.Breaks.breakable class VeloxBackend extends Backend { override def name(): String = VeloxBackend.BACKEND_NAME + override def buildInfo(): BackendBuildInfo = BackendBuildInfo("Velox", VELOX_BRANCH, VELOX_REVISION, VELOX_REVISION_TIME) + override def iteratorApi(): IteratorApi = new VeloxIteratorApi + override def sparkPlanExecApi(): SparkPlanExecApi = new VeloxSparkPlanExecApi + override def transformerApi(): TransformerApi = new VeloxTransformerApi + override def validatorApi(): ValidatorApi = new VeloxValidatorApi + override def metricsApi(): MetricsApi = new VeloxMetricsApi + override def listenerApi(): ListenerApi = new VeloxListenerApi + override def settings(): BackendSettingsApi = VeloxBackendSettings } @@ -223,14 +231,22 @@ object VeloxBackendSettings extends BackendSettingsApi { def validateDataTypes(): Option[String] = { val unsupportedTypes = fields.flatMap { field => - field.dataType match { + def checkType(dataType: DataType): Option[String] = dataType match { case _: TimestampType => Some("TimestampType") - case _: StructType => Some("StructType") - case _: ArrayType => Some("ArrayType") - case _: MapType => Some("MapType") + case structType: StructType => + structType.fields + .flatMap(f => checkType(f.dataType)) + .headOption + .map(_ => "StructType") + case arrayType: ArrayType => + checkType(arrayType.elementType).map(_ => "ArrayType") + case mapType: MapType => + checkType(mapType.keyType).orElse(checkType(mapType.valueType)).map(_ => "MapType") case _: YearMonthIntervalType => 
Some("YearMonthIntervalType") case _ => None } + + checkType(field.dataType) } if (unsupportedTypes.nonEmpty) { Some(unsupportedTypes.mkString("Found unsupported type:", ",", "")) @@ -293,7 +309,7 @@ object VeloxBackendSettings extends BackendSettingsApi { fields.map { field => field.dataType match { - case _: TimestampType | _: StructType | _: ArrayType | _: MapType => return false + case _: TimestampType => return false case _ => } } @@ -357,6 +373,7 @@ object VeloxBackendSettings extends BackendSettingsApi { case _ => } } + doCheck(swf.upper) doCheck(swf.lower) } @@ -406,6 +423,7 @@ object VeloxBackendSettings extends BackendSettingsApi { } } } + override def supportHashBuildJoinTypeOnRight: JoinType => Boolean = { t => if (super.supportHashBuildJoinTypeOnRight(t)) { @@ -422,8 +440,9 @@ object VeloxBackendSettings extends BackendSettingsApi { /** * Check whether a plan needs to be offloaded even though they have empty input schema, e.g, * Sum(1), Count(1), rand(), etc. - * @param plan: - * The Spark plan to check. + * + * @param plan + * : The Spark plan to check. 
*/ private def mayNeedOffload(plan: SparkPlan): Boolean = { def checkExpr(expr: Expression): Boolean = { @@ -463,6 +482,7 @@ object VeloxBackendSettings extends BackendSettingsApi { override def fallbackAggregateWithEmptyOutputChild(): Boolean = true override def recreateJoinExecOnFallback(): Boolean = true + override def rescaleDecimalArithmetic(): Boolean = true /** Get the config prefix for each backend */ diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala index 07f9101375ce..12b5426b8ce3 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala @@ -1599,27 +1599,33 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla } test("test array literal") { - withTable("array_table") { - sql("create table array_table(a array<int>) using parquet") - sql("insert into table array_table select array(1)") - runQueryAndCompare("select size(coalesce(a, array())) from array_table") { - df => - { - assert(getExecutedPlan(df).count(_.isInstanceOf[ProjectExecTransformer]) == 1) - } + // Velox parquet write doesn't support nesting constant encoding. 
+ withSQLConf("spark.gluten.sql.native.writer.enabled" -> "false") { + withTable("array_table") { + sql("create table array_table(a array<int>) using parquet") + sql("insert into table array_table select array(1)") + runQueryAndCompare("select size(coalesce(a, array())) from array_table") { + df => + { + assert(getExecutedPlan(df).count(_.isInstanceOf[ProjectExecTransformer]) == 1) + } + } } } } test("test map literal") { - withTable("map_table") { - sql("create table map_table(a map<int, string>) using parquet") - sql("insert into table map_table select map(1, 'hello')") - runQueryAndCompare("select size(coalesce(a, map())) from map_table") { - df => - { - assert(getExecutedPlan(df).count(_.isInstanceOf[ProjectExecTransformer]) == 1) - } + // Velox parquet write doesn't support nesting constant encoding. + withSQLConf("spark.gluten.sql.native.writer.enabled" -> "false") { + withTable("map_table") { + sql("create table map_table(a map<int, string>) using parquet") + sql("insert into table map_table select map(1, 'hello')") + runQueryAndCompare("select size(coalesce(a, map())) from map_table") { + df => + { + assert(getExecutedPlan(df).count(_.isInstanceOf[ProjectExecTransformer]) == 1) + } + } } } } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala index 9793df2ab334..16fc089bf18b 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala @@ -485,9 +485,6 @@ class VeloxParquetDataTypeValidationSuite extends VeloxWholeStageTransformerSuit .format("parquet") .load(data_path) .drop("timestamp") - .drop("array") - .drop("struct") - .drop("map") df.write.mode("append").format("parquet").save(write_path) val parquetDf = spark.read .format("parquet") diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 57346f493945..1c8ad5d43202 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -659,6 +659,8 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same") .exclude("filter pushdown - StringPredicate") enableSuite[GlutenParquetV2FilterSuite] + // Velox parquet write doesn't support rebaseMode being CORRECT with complex type. + .exclude("Gluten - filter pushdown - date") // Rewrite. .exclude("Filter applied on merged Parquet schema with new column should work") .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 9716a7c14374..772fb78c9e3a 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -664,6 +664,8 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same") .exclude("filter pushdown - StringPredicate") enableSuite[GlutenParquetV2FilterSuite] + // Velox parquet write doesn't support rebaseMode being CORRECT with complex type. + .exclude("Gluten - filter pushdown - date") // Rewrite. .exclude("Filter applied on merged Parquet schema with new column should work") .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")