[CORE] Add schema validation for broadcast exchange (#4608)
zhztheplayer authored Feb 2, 2024
1 parent 503afb1 commit b0d50c0
Showing 7 changed files with 21 additions and 14 deletions.
@@ -81,7 +81,7 @@ class ValidatorApiImpl extends ValidatorApi {
       case array: ArrayType =>
         doSchemaValidate(array.elementType)
       case _ =>
-        Some(s"do not support data type: $schema")
+        Some(s"Schema / data type not supported: $schema")
     }
   }

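For context, the validator above reports an unsupported schema by returning Some(reason) and walks nested types recursively (see the ArrayType case); None means the type is supported. Below is a minimal, self-contained sketch of that contract. The object name and the choice of CalendarIntervalType as an unsupported type are illustrative assumptions, not Gluten's actual implementation.

    import org.apache.spark.sql.types._

    // Sketch only: models the Option[String] validation contract used above.
    // Treating CalendarIntervalType as unsupported is an assumption for illustration.
    object SchemaValidatorSketch {
      // None = supported; Some(reason) = first unsupported type found.
      def doSchemaValidate(dataType: DataType): Option[String] = dataType match {
        case struct: StructType =>
          struct.fields.view.flatMap(f => doSchemaValidate(f.dataType)).headOption
        case array: ArrayType =>
          doSchemaValidate(array.elementType)
        case _: CalendarIntervalType =>
          Some(s"Schema / data type not supported: $dataType")
        case _ =>
          None
      }
    }

Calling doSchemaValidate on a struct that contains ArrayType(CalendarIntervalType) would surface the array's element type as the reason, while a schema of plain IntegerType fields yields None.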
@@ -16,7 +16,7 @@
  */
 package io.glutenproject.execution
 
-import io.glutenproject.backendsapi.velox.ValidatorApiImpl
+import io.glutenproject.backendsapi.BackendsApiManager
 import io.glutenproject.columnarbatch.ColumnarBatches
 import io.glutenproject.exec.Runtimes
 import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators
@@ -45,7 +45,7 @@ import scala.collection.mutable.ListBuffer
 case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBase(child = child) {
 
   override def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
-    new ValidatorApiImpl().doSchemaValidate(schema).foreach {
+    BackendsApiManager.getValidatorApiInstance.doSchemaValidate(schema).foreach {
       reason =>
         throw new UnsupportedOperationException(
           s"Input schema contains unsupported type when convert row to columnar for $schema " +
@@ -18,7 +18,6 @@ package org.apache.spark.sql.execution
 
 import io.glutenproject.GlutenConfig
 import io.glutenproject.backendsapi.BackendsApiManager
-import io.glutenproject.backendsapi.velox.ValidatorApiImpl
 import io.glutenproject.columnarbatch.ColumnarBatches
 import io.glutenproject.exec.Runtimes
 import io.glutenproject.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}
@@ -91,7 +90,7 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
   }
 
   private def validateSchema(schema: StructType): Boolean = {
-    val reason = new ValidatorApiImpl().doSchemaValidate(schema)
+    val reason = BackendsApiManager.getValidatorApiInstance.doSchemaValidate(schema)
     if (reason.isDefined) {
       logInfo(s"Columnar cache does not support schema $schema, due to ${reason.get}")
       false
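Both call sites above now obtain the validator through BackendsApiManager.getValidatorApiInstance instead of instantiating the Velox ValidatorApiImpl directly, keeping Spark-facing code decoupled from a specific backend. A rough sketch of that kind of indirection, with hypothetical names rather than Gluten's real wiring:

    import org.apache.spark.sql.types.DataType

    // Hypothetical sketch of a backend-managed validator lookup.
    trait ValidatorApi {
      def doSchemaValidate(schema: DataType): Option[String]
    }

    object BackendsApiManagerSketch {
      @volatile private var validator: ValidatorApi = _

      // A backend (e.g. Velox) would register its implementation once at startup.
      def initialize(v: ValidatorApi): Unit = validator = v

      def getValidatorApiInstance: ValidatorApi = {
        require(validator != null, "No backend has been registered")
        validator
      }
    }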
@@ -131,12 +131,23 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
     ColumnarBroadcastExchangeExec(mode.canonicalized, child.canonicalized)
   }
 
-  override protected def doValidateInternal(): ValidationResult = mode match {
-    case _: HashedRelationBroadcastMode =>
-      ValidationResult.ok
-    case _ =>
-      // TODO IdentityBroadcastMode not supported. Need to support BroadcastNestedLoopJoin first.
-      ValidationResult.notOk("Only support HashedRelationBroadcastMode for now.")
+  override protected def doValidateInternal(): ValidationResult = {
+    mode match {
+      case _: HashedRelationBroadcastMode =>
+      case _ =>
+        // TODO IdentityBroadcastMode not supported. Need to support BroadcastNestedLoopJoin first.
+        return ValidationResult.notOk("Only support HashedRelationBroadcastMode for now.")
+    }
+    BackendsApiManager.getValidatorApiInstance
+      .doSchemaValidate(schema)
+      .map {
+        reason =>
+          {
+            ValidationResult.notOk(
+              s"Unsupported schema in broadcast exchange: $schema, reason: $reason")
+          }
+      }
+      .getOrElse(ValidationResult.ok)
   }
 
   override def doPrepare(): Unit = {
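The rewritten doValidateInternal first short-circuits on the broadcast mode and only then folds the backend's optional schema reason into a ValidationResult, giving three possible outcomes. A self-contained sketch of that control flow; Ok and NotOk are hypothetical stand-ins for ValidationResult.ok and ValidationResult.notOk(reason):

    // Hypothetical stand-ins; not Gluten's actual ValidationResult type.
    object BroadcastValidationSketch {
      sealed trait Result
      case object Ok extends Result
      final case class NotOk(reason: String) extends Result

      def validateBroadcast(isHashedRelationMode: Boolean, schemaReason: Option[String]): Result = {
        if (!isHashedRelationMode) {
          // Mirrors the early return for unsupported broadcast modes.
          return NotOk("Only support HashedRelationBroadcastMode for now.")
        }
        // Mirrors the map/getOrElse fold over the schema-validation reason.
        schemaReason
          .map(reason => NotOk(s"Unsupported schema in broadcast exchange, reason: $reason"))
          .getOrElse(Ok)
      }
    }

    // BroadcastValidationSketch.validateBroadcast(false, None)            -> NotOk (unsupported mode)
    // BroadcastValidationSketch.validateBroadcast(true, Some("MapType"))  -> NotOk (unsupported schema)
    // BroadcastValidationSketch.validateBroadcast(true, None)             -> Ok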
@@ -293,7 +293,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenDataFrameImplicitsSuite]
   enableSuite[GlutenGeneratorFunctionSuite]
   enableSuite[GlutenDataFrameTimeWindowingSuite]
-    .exclude("time window joins") // FIXME hongze
   enableSuite[GlutenDataFrameSessionWindowingSuite]
   enableSuite[GlutenBroadcastExchangeSuite]
   enableSuite[GlutenDataFramePivotSuite]
@@ -998,7 +998,6 @@ class VeloxTestSettings extends BackendTestSettings {
     "SPARK-9083: sort with non-deterministic expressions"
   )
   enableSuite[GlutenDataFrameTimeWindowingSuite]
-    .exclude("time window joins") // FIXME hongze
   enableSuite[GlutenDataFrameTungstenSuite]
   enableSuite[GlutenDataFrameWindowFramesSuite]
   // Local window fixes are not added.
@@ -1001,7 +1001,6 @@ class VeloxTestSettings extends BackendTestSettings {
     // test for sort node not present but gluten uses shuffle hash join
     .exclude("SPARK-41048: Improve output partitioning and ordering with AQE cache")
   enableSuite[GlutenDataFrameTimeWindowingSuite]
-    .exclude("time window joins") // FIXME hongze
   enableSuite[GlutenDataFrameTungstenSuite]
   enableSuite[GlutenDataFrameWindowFramesSuite]
   // Local window fixes are not added.
