Skip to content

Commit

Permalink
Revert "[VL] Add schema validation for all operators (apache#6406)"
Browse files Browse the repository at this point in the history
This reverts commit 1cc4ef4.
  • Loading branch information
yma11 committed Jul 15, 2024
1 parent 64dfda6 commit a8ffea2
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.exec.Runtimes
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.utils.ArrowAbiUtil
Expand Down Expand Up @@ -45,6 +47,13 @@ import scala.collection.mutable.ListBuffer
case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBase(child = child) {

override def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
BackendsApiManager.getValidatorApiInstance.doSchemaValidate(schema).foreach {
reason =>
throw new GlutenException(
s"Input schema contains unsupported type when convert row to columnar for $schema " +
s"due to $reason")
}

val numInputRows = longMetric("numInputRows")
val numOutputBatches = longMetric("numOutputBatches")
val convertTime = longMetric("convertTime")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,16 +258,6 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
checkLengthAndPlan(df, 5)
}

testWithSpecifiedSparkVersion("coalesce validation", Some("3.4")) {
withTempPath {
path =>
val data = "2019-09-09 01:02:03.456789"
val df = Seq(data).toDF("strTs").selectExpr(s"CAST(strTs AS TIMESTAMP_NTZ) AS ts")
df.coalesce(1).write.format("parquet").save(path.getCanonicalPath)
spark.read.parquet(path.getCanonicalPath).collect
}
}

test("groupby") {
val df = runQueryAndCompare(
"select l_orderkey, sum(l_partkey) as sum from lineitem " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,15 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan with Gl
}

override protected def doExecuteColumnar(): RDD[ColumnarBatch] = columnarInputRDD

override protected def doValidateInternal(): ValidationResult = {
BackendsApiManager.getValidatorApiInstance
.doSchemaValidate(schema)
.map {
reason => ValidationResult.notOk(s"Found schema check failure for $schema, due to: $reason")
}
.getOrElse(ValidationResult.ok)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,6 @@ trait GlutenPlan extends SparkPlan with Convention.KnownBatchType with LogLevelU
* Validate whether this SparkPlan supports to be transformed into substrait node in Native Code.
*/
final def doValidate(): ValidationResult = {
val schemaVaidationResult = BackendsApiManager.getValidatorApiInstance
.doSchemaValidate(schema)
.map {
reason => ValidationResult.notOk(s"Found schema check failure for $schema, due to: $reason")
}
.getOrElse(ValidationResult.ok)
if (!schemaVaidationResult.isValid) {
TestStats.addFallBackClassName(this.getClass.toString)
return schemaVaidationResult
}
try {
TransformerState.enterValidation
val res = doValidateInternal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.spark.sql.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
import org.apache.gluten.metrics.GlutenTimeMetric
import org.apache.gluten.sql.shims.SparkShimLoader

Expand Down Expand Up @@ -133,6 +133,19 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
ColumnarBroadcastExchangeExec(canonicalized, child.canonicalized)
}

override protected def doValidateInternal(): ValidationResult = {
BackendsApiManager.getValidatorApiInstance
.doSchemaValidate(schema)
.map {
reason =>
{
ValidationResult.notOk(
s"Unsupported schema in broadcast exchange: $schema, reason: $reason")
}
}
.getOrElse(ValidationResult.ok)
}

override def doPrepare(): Unit = {
// Materialize the future.
relationFuture
Expand Down

0 comments on commit a8ffea2

Please sign in to comment.