diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala index 2f3e88f9af9cb..29478fe9dbd79 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala @@ -17,7 +17,9 @@ package org.apache.gluten.execution import org.apache.gluten.GlutenConfig +import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.columnarbatch.ColumnarBatches +import org.apache.gluten.exception.GlutenException import org.apache.gluten.exec.Runtimes import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.utils.ArrowAbiUtil @@ -45,6 +47,13 @@ import scala.collection.mutable.ListBuffer case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBase(child = child) { override def doExecuteColumnarInternal(): RDD[ColumnarBatch] = { + BackendsApiManager.getValidatorApiInstance.doSchemaValidate(schema).foreach { + reason => + throw new GlutenException( + s"Input schema contains unsupported type when convert row to columnar for $schema " + + s"due to $reason") + } + val numInputRows = longMetric("numInputRows") val numOutputBatches = longMetric("numOutputBatches") val convertTime = longMetric("convertTime") diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala index 07f9101375ce2..230fc565d9eb3 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala @@ -258,16 +258,6 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla checkLengthAndPlan(df, 5) } - testWithSpecifiedSparkVersion("coalesce validation", Some("3.4")) { - withTempPath { - path => - val data = "2019-09-09 01:02:03.456789" - val df = Seq(data).toDF("strTs").selectExpr(s"CAST(strTs AS TIMESTAMP_NTZ) AS ts") - df.coalesce(1).write.format("parquet").save(path.getCanonicalPath) - spark.read.parquet(path.getCanonicalPath).collect - } - } - test("groupby") { val df = runQueryAndCompare( "select l_orderkey, sum(l_partkey) as sum from lineitem " + diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala index 97b4c3a3f807a..0b792d52e0561 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala @@ -311,6 +311,15 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan with Gl } override protected def doExecuteColumnar(): RDD[ColumnarBatch] = columnarInputRDD + + override protected def doValidateInternal(): ValidationResult = { + BackendsApiManager.getValidatorApiInstance + .doSchemaValidate(schema) + .map { + reason => ValidationResult.notOk(s"Found schema check failure for $schema, due to: $reason") + } + .getOrElse(ValidationResult.ok) + } } /** diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala index 71a76ff63dd1f..8f1004be4aaae 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala @@ -63,16 +63,6 @@ trait GlutenPlan extends SparkPlan with Convention.KnownBatchType with LogLevelU * Validate whether this SparkPlan supports to be transformed into substrait node in Native Code. */ final def doValidate(): ValidationResult = { - val schemaVaidationResult = BackendsApiManager.getValidatorApiInstance - .doSchemaValidate(schema) - .map { - reason => ValidationResult.notOk(s"Found schema check failure for $schema, due to: $reason") - } - .getOrElse(ValidationResult.ok) - if (!schemaVaidationResult.isValid) { - TestStats.addFallBackClassName(this.getClass.toString) - return schemaVaidationResult - } try { TransformerState.enterValidation val res = doValidateInternal() diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala index d55733fe4e971..4da7a2f6f11ae 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.extension.GlutenPlan +import org.apache.gluten.extension.{GlutenPlan, ValidationResult} import org.apache.gluten.metrics.GlutenTimeMetric import org.apache.gluten.sql.shims.SparkShimLoader @@ -133,6 +133,19 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) ColumnarBroadcastExchangeExec(canonicalized, child.canonicalized) } + override protected def doValidateInternal(): ValidationResult = { + BackendsApiManager.getValidatorApiInstance + .doSchemaValidate(schema) + .map { + reason => + { + ValidationResult.notOk( + s"Unsupported schema in broadcast exchange: $schema, reason: $reason") + } + } + .getOrElse(ValidationResult.ok) + } + override def doPrepare(): Unit = { // Materialize the future. relationFuture