address comments
zhli1142015 committed Jul 11, 2024
1 parent bca741c commit 6937570
Showing 4 changed files with 9 additions and 36 deletions.

@@ -311,15 +311,6 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan with Gl
   }
 
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = columnarInputRDD
-
-  override protected def doValidateInternal(): ValidationResult = {
-    BackendsApiManager.getValidatorApiInstance
-      .doSchemaValidate(schema)
-      .map {
-        reason => ValidationResult.notOk(s"Found schema check failure for $schema, due to: $reason")
-      }
-      .getOrElse(ValidationResult.ok)
-  }
 }
 
 /**

@@ -16,8 +16,7 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.GlutenPlan
 
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD

@@ -33,16 +32,6 @@ case class ColumnarCoalesceExec(numPartitions: Int, child: SparkPlan)
 
   override def supportsColumnar: Boolean = true
 
-  override protected def doValidateInternal(): ValidationResult = {
-    BackendsApiManager.getValidatorApiInstance
-      .doSchemaValidate(child.schema)
-      .map {
-        reason =>
-          ValidationResult.notOk(s"Found schema check failure for ${child.schema}, due to: $reason")
-      }
-      .getOrElse(ValidationResult.ok)
-  }
-
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = {

@@ -65,7 +65,13 @@ trait GlutenPlan extends SparkPlan with Convention.KnownBatchType with LogLevelU
   final def doValidate(): ValidationResult = {
     try {
       TransformerState.enterValidation
-      val res = doValidateInternal()
+      val res = BackendsApiManager.getValidatorApiInstance
+        .doSchemaValidate(schema)
+        .map {
+          reason =>
+            ValidationResult.notOk(s"Found schema check failure for $schema, due to: $reason")
+        }
+        .getOrElse(doValidateInternal())
       if (!res.isValid) {
         TestStats.addFallBackClassName(this.getClass.toString)
       }
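
The hunk above is the heart of the commit: the backend schema check now runs once inside GlutenPlan.doValidate, and doValidateInternal is consulted only when the schema passes, so the per-operator overrides removed in the other three files no longer need to repeat it. Below is a minimal, self-contained Scala sketch of that control flow; every name in it (SchemaValidationSketch, GlutenPlanLike, Ok, NotOk, the toy doSchemaValidate) is an illustrative stand-in, not the real Gluten API.

// Sketch only: simplified stand-ins for ValidationResult and
// BackendsApiManager.getValidatorApiInstance.doSchemaValidate.
object SchemaValidationSketch {
  sealed trait ValidationResult { def isValid: Boolean }
  case object Ok extends ValidationResult { val isValid = true }
  final case class NotOk(reason: String) extends ValidationResult { val isValid = false }

  // Toy backend validator: Some(reason) when the schema is rejected, None otherwise.
  def doSchemaValidate(schema: String): Option[String] =
    if (schema.contains("map<")) Some("map type not supported in this sketch") else None

  trait GlutenPlanLike {
    def schema: String

    // Operator-specific checks only; the schema check is no longer duplicated here.
    protected def doValidateInternal(): ValidationResult = Ok

    // Schema validation happens once; doValidateInternal runs only if the schema passes.
    final def doValidate(): ValidationResult =
      doSchemaValidate(schema)
        .map(reason => NotOk(s"Found schema check failure for $schema, due to: $reason"))
        .getOrElse(doValidateInternal())
  }

  def main(args: Array[String]): Unit = {
    val good = new GlutenPlanLike { val schema = "struct<a:int>" }
    val bad = new GlutenPlanLike { val schema = "struct<m:map<string,int>>" }
    println(good.doValidate().isValid) // true: schema passes, falls through to doValidateInternal
    println(bad.doValidate().isValid) // false: rejected by the schema check alone
  }
}

One consequence of this shape, visible in the diff: a schema rejection short-circuits before any operator-specific validation runs, and the failure message now comes from a single place (the broadcast-exchange-specific wording removed below is dropped in favor of the shared one).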

@@ -17,7 +17,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.metrics.GlutenTimeMetric
 import org.apache.gluten.sql.shims.SparkShimLoader
 

@@ -133,19 +133,6 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
     ColumnarBroadcastExchangeExec(canonicalized, child.canonicalized)
   }
 
-  override protected def doValidateInternal(): ValidationResult = {
-    BackendsApiManager.getValidatorApiInstance
-      .doSchemaValidate(schema)
-      .map {
-        reason =>
-          {
-            ValidationResult.notOk(
-              s"Unsupported schema in broadcast exchange: $schema, reason: $reason")
-          }
-      }
-      .getOrElse(ValidationResult.ok)
-  }
-
   override def doPrepare(): Unit = {
     // Materialize the future.
     relationFuture
