Skip to content

Commit

Permalink
[CORE] fix UnsupportedOperationException caused by gluten C2R (#4165)
Browse files Browse the repository at this point in the history
[VL] fix UnsupportedOperationException caused by gluten C2R
  • Loading branch information
zhli1142015 authored Jan 4, 2024
1 parent e99cd01 commit dd8e872
Show file tree
Hide file tree
Showing 18 changed files with 430 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package io.glutenproject.execution

import io.glutenproject.GlutenConfig
import io.glutenproject.extension.InMemoryTableScanHelper
import io.glutenproject.utils.PlanUtil

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
Expand Down Expand Up @@ -134,8 +134,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
checkAnswer(df, Row(60175))
assert(
find(df.queryExecution.executedPlan) {
case VeloxColumnarToRowExec(child: SparkPlan)
if InMemoryTableScanHelper.isGlutenTableCache(child) =>
case VeloxColumnarToRowExec(child: SparkPlan) if PlanUtil.isGlutenTableCache(child) =>
true
case _ => false
}.isEmpty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ package io.glutenproject.expression
import io.glutenproject.GlutenConfig
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.execution.{ColumnarToRowExecBase, WholeStageTransformer}
import io.glutenproject.extension.GlutenPlan
import io.glutenproject.test.TestStats
import io.glutenproject.utils.DecimalArithmeticUtil
import io.glutenproject.utils.{DecimalArithmeticUtil, PlanUtil}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
Expand Down Expand Up @@ -522,7 +521,7 @@ object ExpressionConverter extends SQLConfHelper with Logging {
// get WholeStageTransformer directly
case c2r: ColumnarToRowExecBase => c2r.child
// in fallback case
case plan: UnaryExecNode if !plan.isInstanceOf[GlutenPlan] =>
case plan: UnaryExecNode if !PlanUtil.isGlutenColumnarOp(plan) =>
plan.child match {
case _: ColumnarToRowExec =>
val wholeStageTransformer = exchange.find(_.isInstanceOf[WholeStageTransformer])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import io.glutenproject.execution._
import io.glutenproject.expression.ExpressionConverter
import io.glutenproject.extension.columnar._
import io.glutenproject.metrics.GlutenTimeMetric
import io.glutenproject.utils.{LogLevelUtil, PhysicalPlanSelector}
import io.glutenproject.utils.{LogLevelUtil, PhysicalPlanSelector, PlanUtil}

import org.apache.spark.api.python.EvalPythonExecTransformer
import org.apache.spark.internal.Logging
Expand All @@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.WriteFilesExec
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
import org.apache.spark.sql.execution.exchange._
Expand Down Expand Up @@ -114,7 +113,7 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean)
// If the child is transformable, transform aggregation as well.
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
transformHashAggregate()
case p: SparkPlan if InMemoryTableScanHelper.isGlutenTableCache(p) =>
case p: SparkPlan if PlanUtil.isGlutenTableCache(p) =>
transformHashAggregate()
case _ =>
// If the child is not transformable, transform the grandchildren only.
Expand Down Expand Up @@ -615,15 +614,21 @@ case class TransformPostOverrides(isAdaptiveContext: Boolean) extends Rule[Spark
def transformColumnarToRowExec(plan: ColumnarToRowExec): SparkPlan = {
if (columnarConf.enableNativeColumnarToRow) {
val child = replaceWithTransformerPlan(plan.child)
logDebug(s"ColumnarPostOverrides ColumnarToRowExecBase(${child.nodeName})")
val nativeConversion =
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(child)
val validationResult = nativeConversion.doValidate()
if (validationResult.isValid) {
nativeConversion
} else {
TransformHints.tagNotTransformable(plan, validationResult)

if (!PlanUtil.outputNativeColumnarData(child)) {
TransformHints.tagNotTransformable(plan, "child is not gluten plan")
plan.withNewChildren(plan.children.map(replaceWithTransformerPlan))
} else {
logDebug(s"ColumnarPostOverrides ColumnarToRowExecBase(${child.nodeName})")
val nativeConversion =
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(child)
val validationResult = nativeConversion.doValidate()
if (validationResult.isValid) {
nativeConversion
} else {
TransformHints.tagNotTransformable(plan, validationResult)
plan.withNewChildren(plan.children.map(replaceWithTransformerPlan))
}
}
} else {
plan.withNewChildren(plan.children.map(replaceWithTransformerPlan))
Expand All @@ -646,7 +651,7 @@ case class TransformPostOverrides(isAdaptiveContext: Boolean) extends Rule[Spark
case ColumnarToRowExec(child: BroadcastQueryStageExec) =>
replaceWithTransformerPlan(child)
// `InMemoryTableScanExec` internally supports ColumnarToRow
case ColumnarToRowExec(child: SparkPlan) if InMemoryTableScanHelper.isGlutenTableCache(child) =>
case ColumnarToRowExec(child: SparkPlan) if PlanUtil.isGlutenTableCache(child) =>
child
case plan: ColumnarToRowExec =>
transformColumnarToRowExec(plan)
Expand All @@ -657,7 +662,7 @@ case class TransformPostOverrides(isAdaptiveContext: Boolean) extends Rule[Spark
// ColumnarExchange maybe child as a Row SparkPlan
r.withNewChildren(r.children.map {
// `InMemoryTableScanExec` internally supports ColumnarToRow
case c: ColumnarToRowExec if !InMemoryTableScanHelper.isGlutenTableCache(c.child) =>
case c: ColumnarToRowExec if !PlanUtil.isGlutenTableCache(c.child) =>
transformColumnarToRowExec(c)
case other =>
replaceWithTransformerPlan(other)
Expand All @@ -674,62 +679,48 @@ case class TransformPostOverrides(isAdaptiveContext: Boolean) extends Rule[Spark
}
}

/** Helpers for recognising table-cache scans that are backed by Gluten's columnar cache. */
object InMemoryTableScanHelper {

  /**
   * Checks a single in-memory scan: its cache serializer must be the Gluten one and the scan
   * itself must report columnar support.
   */
  private def isGlutenTableCacheInternal(i: InMemoryTableScanExec): Boolean = {
    // `ColumnarCachedBatchSerializer` is defined in the velox module, so it is matched by its
    // simple class name instead of by type.
    val serializerName = i.relation.cacheBuilder.serializer.getClass.getSimpleName
    serializerName == "ColumnarCachedBatchSerializer" && i.supportsColumnar
  }

  /**
   * Returns true when `plan` is an in-memory table scan over a Gluten columnar cache, looking
   * through any enclosing query stage. Every other plan yields false.
   */
  def isGlutenTableCache(plan: SparkPlan): Boolean = plan match {
    case scan: InMemoryTableScanExec => isGlutenTableCacheInternal(scan)
    // Kept compatible with Spark 3.5 `TableCacheQueryStage`: inspect the plan inside the stage.
    case stage: QueryStageExec => isGlutenTableCache(stage.plan)
    case _ => false
  }
}

// This rule will try to add RowToColumnarExecBase and ColumnarToRowExec
// to support vanilla columnar scan.
// to support vanilla columnar operators.
case class VanillaColumnarPlanOverrides(session: SparkSession) extends Rule[SparkPlan] {
@transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]()

private def replaceWithVanillaColumnarToRow(plan: SparkPlan): SparkPlan = plan match {
case c2r: ColumnarToRowExecBase if isVanillaColumnarReader(c2r.child) =>
ColumnarToRowExec(c2r.child)
case c2r: ColumnarToRowExec if isVanillaColumnarReader(c2r.child) =>
c2r
case _ if isVanillaColumnarReader(plan) =>
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(ColumnarToRowExec(plan))
case _ if PlanUtil.isGlutenColumnarOp(plan) =>
plan.withNewChildren(plan.children.map {
c =>
val child = replaceWithVanillaColumnarToRow(c)
if (PlanUtil.isVanillaColumnarOp(child)) {
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(
ColumnarToRowExec(child))
} else {
child
}
})
case _ =>
plan.withNewChildren(plan.children.map(replaceWithVanillaColumnarToRow))
}

private def isVanillaColumnarReader(plan: SparkPlan): Boolean = plan match {
case _: BatchScanExec | _: DataSourceScanExec =>
!plan.isInstanceOf[GlutenPlan] && plan.supportsColumnar
case i: InMemoryTableScanExec =>
if (InMemoryTableScanHelper.isGlutenTableCache(i)) {
// `InMemoryTableScanExec` does not need an extra RowToColumnar or ColumnarToRow
false
} else {
!plan.isInstanceOf[GlutenPlan] && plan.supportsColumnar
}
case _ => false
private def replaceWithVanillaRowToColumnar(plan: SparkPlan): SparkPlan = plan match {
case _ if PlanUtil.isVanillaColumnarOp(plan) =>
plan.withNewChildren(plan.children.map {
c =>
val child = replaceWithVanillaRowToColumnar(c)
if (PlanUtil.isGlutenColumnarOp(child)) {
RowToColumnarExec(
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(child))
} else {
child
}
})
case _ =>
plan.withNewChildren(plan.children.map(replaceWithVanillaRowToColumnar))
}

def apply(plan: SparkPlan): SparkPlan =
if (GlutenConfig.getConf.enableVanillaVectorizedReaders) {
val newPlan = replaceWithVanillaColumnarToRow(plan)
planChangeLogger.logRule(ruleName, plan, newPlan)
newPlan
} else {
plan
}
def apply(plan: SparkPlan): SparkPlan = {
val newPlan = replaceWithVanillaRowToColumnar(replaceWithVanillaColumnarToRow(plan))
planChangeLogger.logRule(ruleName, plan, newPlan)
newPlan
}
}

object ColumnarOverrideRules {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import io.glutenproject.GlutenConfig
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.execution.BroadcastHashJoinExecTransformer
import io.glutenproject.extension.columnar.{TRANSFORM_UNSUPPORTED, TransformHints}
import io.glutenproject.utils.PlanUtil

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -80,7 +81,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP
case ColumnarToRowExec(s: Exchange) if isAdaptiveContext =>
countFallbackInternal(s)
case u: UnaryExecNode
if !u.isInstanceOf[GlutenPlan] && InMemoryTableScanHelper.isGlutenTableCache(u.child) =>
if !PlanUtil.isGlutenColumnarOp(u) && PlanUtil.isGlutenTableCache(u.child) =>
// Vanilla Spark plan will call `InMemoryTableScanExec.convertCachedBatchToInternalRow`
// which is a kind of `ColumnarToRowExec`.
transitionCost = transitionCost + 1
Expand All @@ -94,8 +95,8 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP
transitionCost = transitionCost + 1
}
countFallbackInternal(r.child)
case leafPlan: LeafExecNode if InMemoryTableScanHelper.isGlutenTableCache(leafPlan) =>
case leafPlan: LeafExecNode if !leafPlan.isInstanceOf[GlutenPlan] =>
case leafPlan: LeafExecNode if PlanUtil.isGlutenTableCache(leafPlan) =>
case leafPlan: LeafExecNode if !PlanUtil.isGlutenColumnarOp(leafPlan) =>
// Possible fallback for leaf node.
transitionCost = transitionCost + 1
case p => p.children.foreach(countFallbackInternal)
Expand Down Expand Up @@ -157,17 +158,17 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP
.foreach {
// For this case, table cache will internally execute ColumnarToRow if
// we make the stage fall back.
case tableCache if InMemoryTableScanHelper.isGlutenTableCache(tableCache) =>
case tableCache if PlanUtil.isGlutenTableCache(tableCache) =>
stageFallbackTransitionCost = stageFallbackTransitionCost + 1
case _ =>
}
leaves
.filter(_.isInstanceOf[QueryStageExec])
.foreach {
case stage: QueryStageExec
if stage.plan.isInstanceOf[GlutenPlan] ||
if PlanUtil.isGlutenColumnarOp(stage.plan) ||
// For TableCacheQueryStageExec since spark 3.5.
InMemoryTableScanHelper.isGlutenTableCache(stage) =>
PlanUtil.isGlutenTableCache(stage) =>
stageFallbackTransitionCost = stageFallbackTransitionCost + 1
case _ =>
}
Expand Down Expand Up @@ -243,8 +244,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP
case c2r @ ColumnarToRowExec(_: AQEShuffleReadExec) =>
transformPostOverrides.transformColumnarToRowExec(c2r)
// `InMemoryTableScanExec` itself supports columnar to row
case ColumnarToRowExec(child: SparkPlan)
if InMemoryTableScanHelper.isGlutenTableCache(child) =>
case ColumnarToRowExec(child: SparkPlan) if PlanUtil.isGlutenTableCache(child) =>
child
case plan: RowToColumnarExec =>
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan.child)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package io.glutenproject.utils

import io.glutenproject.extension.GlutenPlan

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
Expand Down Expand Up @@ -68,12 +66,12 @@ object FallbackUtil extends Logging with AdaptiveSparkPlanHelper {
var fallbackOperator: Seq[SparkPlan] = null
if (plan.isInstanceOf[AdaptiveSparkPlanExec]) {
fallbackOperator = collectWithSubqueries(plan) {
case plan if !plan.isInstanceOf[GlutenPlan] && !skip(plan) =>
case plan if !PlanUtil.isGlutenColumnarOp(plan) && !skip(plan) =>
plan
}
} else {
fallbackOperator = plan.collectWithSubqueries {
case plan if !plan.isInstanceOf[GlutenPlan] && !skip(plan) =>
case plan if !PlanUtil.isGlutenColumnarOp(plan) && !skip(plan) =>
plan
}
}
Expand Down
80 changes: 80 additions & 0 deletions gluten-core/src/main/scala/io/glutenproject/utils/PlanUtil.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.utils

import io.glutenproject.extension.GlutenPlan

import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.exchange._

/** Predicates that classify physical plan nodes as Gluten or vanilla Spark columnar operators. */
object PlanUtil {

  /**
   * Checks a single in-memory scan: its cache serializer must be the Gluten one and the scan
   * itself must report columnar support.
   */
  private def isGlutenTableCacheInternal(i: InMemoryTableScanExec): Boolean = {
    // `ColumnarCachedBatchSerializer` is defined in the velox module, so it is matched by its
    // simple class name instead of by type.
    val serializerName = i.relation.cacheBuilder.serializer.getClass.getSimpleName
    serializerName == "ColumnarCachedBatchSerializer" && i.supportsColumnar
  }

  /** A node counts as vanilla columnar when it is not a Gluten operator yet supports columnar. */
  private def isNonGlutenColumnar(plan: SparkPlan): Boolean =
    !isGlutenColumnarOp(plan) && plan.supportsColumnar

  /**
   * Returns true when `plan` is an in-memory table scan over a Gluten columnar cache, looking
   * through any enclosing query stage. Every other plan yields false.
   */
  def isGlutenTableCache(plan: SparkPlan): Boolean = plan match {
    case scan: InMemoryTableScanExec => isGlutenTableCacheInternal(scan)
    // Kept compatible with Spark 3.5 `TableCacheQueryStage`: inspect the plan inside the stage.
    case stage: QueryStageExec => isGlutenTableCache(stage.plan)
    case _ => false
  }

  /**
   * Returns true when the data produced by `plan` comes from a Gluten operator or a Gluten table
   * cache. Wrapper nodes (AQE shuffle read, query stage, reused exchange, input adapter,
   * whole-stage codegen, adaptive plan) are transparent: the answer is taken from the plan they
   * wrap.
   */
  def outputNativeColumnarData(plan: SparkPlan): Boolean = plan match {
    case shuffleRead: AQEShuffleReadExec => outputNativeColumnarData(shuffleRead.child)
    case stage: QueryStageExec => outputNativeColumnarData(stage.plan)
    case reused: ReusedExchangeExec => outputNativeColumnarData(reused.child)
    case adapter: InputAdapter => outputNativeColumnarData(adapter.child)
    case codegen: WholeStageCodegenExec => outputNativeColumnarData(codegen.child)
    case adaptive: AdaptiveSparkPlanExec => outputNativeColumnarData(adaptive.executedPlan)
    case scan: InMemoryTableScanExec => isGlutenTableCache(scan)
    case _: GlutenPlan => true
    case _ => false
  }

  /**
   * Returns true when `plan` is a columnar operator that does not belong to Gluten. AQE shuffle
   * reads, query stages and reused exchanges are judged by the plan they wrap, while
   * `RowToColumnarExec`, `InputAdapter` and `WholeStageCodegenExec` are never reported as
   * vanilla columnar operators.
   */
  def isVanillaColumnarOp(plan: SparkPlan): Boolean = plan match {
    // A Gluten table cache needs no extra RowToColumnar or ColumnarToRow, so it is excluded.
    case scan: InMemoryTableScanExec => !isGlutenTableCache(scan) && isNonGlutenColumnar(scan)
    case shuffleRead: AQEShuffleReadExec => isVanillaColumnarOp(shuffleRead.child)
    case stage: QueryStageExec => isVanillaColumnarOp(stage.plan)
    case _: RowToColumnarExec | _: InputAdapter | _: WholeStageCodegenExec => false
    case reused: ReusedExchangeExec => isVanillaColumnarOp(reused.child)
    case other => isNonGlutenColumnar(other)
  }

  /** Returns true when `plan` is itself a Gluten operator, i.e. an instance of `GlutenPlan`. */
  def isGlutenColumnarOp(plan: SparkPlan): Boolean = plan match {
    case _: GlutenPlan => true
    case _ => false
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package org.apache.spark.sql.execution

import io.glutenproject.execution.WholeStageTransformer
import io.glutenproject.extension.{GlutenPlan, InMemoryTableScanHelper}
import io.glutenproject.extension.GlutenPlan
import io.glutenproject.utils.PlanUtil

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
Expand Down Expand Up @@ -94,7 +95,7 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
numGlutenNodes += 1
p.innerChildren.foreach(collect)
case i: InMemoryTableScanExec =>
if (InMemoryTableScanHelper.isGlutenTableCache(i)) {
if (PlanUtil.isGlutenTableCache(i)) {
numGlutenNodes += 1
} else {
addFallbackNodeWithReason(i, "Columnar table cache is disabled", fallbackNodeToReason)
Expand Down
Loading

0 comments on commit dd8e872

Please sign in to comment.