Improve merge fallback reason (#3981)
Co-authored-by: Kent Yao <[email protected]>
ulysses-you and yaooqinn authored Dec 13, 2023
1 parent bf42fd7 commit 2bdf16c
Showing 5 changed files with 144 additions and 28 deletions.
ExpandFallbackPolicy.scala:

@@ -18,7 +18,7 @@ package io.glutenproject.extension
 
 import io.glutenproject.GlutenConfig
 import io.glutenproject.execution.BroadcastHashJoinExecTransformer
-import io.glutenproject.extension.columnar.TransformHints
+import io.glutenproject.extension.columnar.{TRANSFORM_UNSUPPORTED, TransformHints}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -247,7 +247,9 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkPlan)
     val reason = fallback(plan)
     if (reason.isDefined) {
       val fallbackPlan = fallbackToRowBasedPlan()
-      TransformHints.tagAllNotTransformable(fallbackPlan, reason.get)
+      TransformHints.tagAllNotTransformable(
+        fallbackPlan,
+        TRANSFORM_UNSUPPORTED(reason, appendReasonIfExists = false))
       FallbackNode(fallbackPlan)
     } else {
       plan
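What the new flag buys: when the whole stage falls back, every node in the fallback plan is re-tagged with the same stage-level reason, which previously piled on top of each node's own validation reason. A minimal sketch of the intended merge, using the appendReasonIfExists flag defined in TransformHints.scala below (the reason strings are illustrative, not from this commit):

// A node already tagged during validation (appendReasonIfExists defaults to true):
val nodeHint = TRANSFORM_UNSUPPORTED(Some("Timestamp is not fully supported in Filter"))
// The whole-stage fallback re-tags the node with appending disabled:
val stageHint = TRANSFORM_UNSUPPORTED(
  Some("fallback whole stage"),
  appendReasonIfExists = false)
// TransformHints.tag(plan, stageHint) merges the two hints; because the
// incoming hint disables appending, the node keeps its more specific
// validation reason instead of accumulating the stage-level reason as well.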
TransformHints.scala:

@@ -54,7 +54,8 @@ trait TransformHint {
 }
 
 case class TRANSFORM_SUPPORTED() extends TransformHint
-case class TRANSFORM_UNSUPPORTED(reason: Option[String]) extends TransformHint
+case class TRANSFORM_UNSUPPORTED(reason: Option[String], appendReasonIfExists: Boolean = true)
+  extends TransformHint
 
 object TransformHints {
   val TAG: TreeNodeTag[TransformHint] =
@@ -77,11 +78,19 @@ object TransformHints {
   def tag(plan: SparkPlan, hint: TransformHint): Unit = {
     val mergedHint = getHintOption(plan)
       .map {
-        case originalHint @ TRANSFORM_UNSUPPORTED(Some(originalReason)) =>
+        case originalHint @ TRANSFORM_UNSUPPORTED(Some(originalReason), originAppend) =>
           hint match {
-            case TRANSFORM_UNSUPPORTED(Some(newReason)) =>
-              TRANSFORM_UNSUPPORTED(Some(originalReason + "; " + newReason))
-            case TRANSFORM_UNSUPPORTED(None) =>
+            case TRANSFORM_UNSUPPORTED(Some(newReason), append) =>
+              if (originAppend && append) {
+                TRANSFORM_UNSUPPORTED(Some(originalReason + "; " + newReason))
+              } else if (originAppend) {
+                TRANSFORM_UNSUPPORTED(Some(originalReason))
+              } else if (append) {
+                TRANSFORM_UNSUPPORTED(Some(newReason))
+              } else {
+                TRANSFORM_UNSUPPORTED(Some(originalReason), false)
+              }
+            case TRANSFORM_UNSUPPORTED(None, _) =>
               originalHint
             case _ =>
               throw new UnsupportedOperationException(
@@ -113,10 +122,10 @@ object TransformHints {
     tag(plan, TRANSFORM_UNSUPPORTED(Some(reason)))
   }
 
-  def tagAllNotTransformable(plan: SparkPlan, reason: String): Unit = {
+  def tagAllNotTransformable(plan: SparkPlan, hint: TRANSFORM_UNSUPPORTED): Unit = {
     plan.foreach {
       case _: GlutenPlan => // ignore
      case other => tag(other, hint)
     }
   }
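The four branches in tag above amount to a small merge table. Sketched here with placeholder reasons "a" and "b" (U abbreviates TRANSFORM_UNSUPPORTED; this is a summary, not code from the commit):

// existing hint           incoming hint            merged result
// U(Some("a"), true)   +  U(Some("b"), true)   =>  U(Some("a; b"))      // both appendable: concatenate
// U(Some("a"), true)   +  U(Some("b"), false)  =>  U(Some("a"))         // keep the existing reason
// U(Some("a"), false)  +  U(Some("b"), true)   =>  U(Some("b"))         // new reason replaces the old
// U(Some("a"), false)  +  U(Some("b"), false)  =>  U(Some("a"), false)  // keep existing, stay non-appendable
// U(Some("a"), _)      +  U(None, _)           =>  existing hint kept unchanged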
GlutenFallbackReporter.scala:

@@ -55,24 +55,41 @@ case class GlutenFallbackReporter(glutenConfig: GlutenConfig, spark: SparkSession)
 
   private def printFallbackReason(plan: SparkPlan): Unit = {
     val validationLogLevel = glutenConfig.validationLogLevel
-    plan.foreach {
+    plan.foreachUp {
       case _: GlutenPlan => // ignore
       case plan: SparkPlan =>
         plan.logicalLink.flatMap(_.getTagValue(FALLBACK_REASON_TAG)) match {
           case Some(_) => // We have logged it before, so skip it
           case _ =>
             // some SparkPlans do not have a hint, e.g., `ColumnarAQEShuffleRead`
-            TransformHints.getHintOption(plan) match {
-              case Some(TRANSFORM_UNSUPPORTED(reason)) =>
-                logFallbackReason(validationLogLevel, plan.nodeName, reason.getOrElse(""))
-                // With in next round stage in AQE, the physical plan would be a new instance that
-                // can not preserve the tag, so we need to set the fallback reason to logical plan.
-                // Then we can be aware of the fallback reason for the whole plan.
-                plan.logicalLink.foreach {
-                  logicalPlan => logicalPlan.setTagValue(FALLBACK_REASON_TAG, reason.getOrElse(""))
-                }
-              case _ =>
-            }
+            TransformHints.getHintOption(plan) match {
+              case Some(TRANSFORM_UNSUPPORTED(Some(reason), append)) =>
+                logFallbackReason(validationLogLevel, plan.nodeName, reason)
+                // Within the next round stage in AQE, the physical plan would be a new instance
+                // that cannot preserve the tag, so we need to set the fallback reason on the
+                // logical plan. Then we can be aware of the fallback reason for the whole plan.
+                // If a logical plan maps to several physical plans, we add all reasons to that
+                // logical plan to make sure we do not lose any fallback reason.
+                plan.logicalLink.foreach {
+                  logicalPlan =>
+                    val newReason = logicalPlan
+                      .getTagValue(FALLBACK_REASON_TAG)
+                      .map {
+                        lastReason =>
+                          if (!append) {
+                            lastReason
+                          } else if (lastReason.contains(reason)) {
+                            // use the last reason, as the new reason is redundant
+                            lastReason
+                          } else if (reason.contains(lastReason)) {
+                            // the new reason subsumes the last one, so overwrite it
+                            reason
+                          } else {
+                            // add the new reason
+                            lastReason + "; " + reason
+                          }
+                      }
+                      .getOrElse(reason)
+                    logicalPlan.setTagValue(FALLBACK_REASON_TAG, newReason)
+                }
+
+              case _ =>
+            }
         }
     }
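Stripped of the plan traversal, the reason-merging above is a pure function of the previously recorded reason. A sketch of the same logic extracted for readability (mergeReason is a hypothetical helper name, not part of this commit):

def mergeReason(last: Option[String], reason: String, append: Boolean): String =
  last match {
    case Some(lastReason) if !append => lastReason // the new hint asked not to append
    case Some(lastReason) if lastReason.contains(reason) => lastReason // new reason is redundant
    case Some(lastReason) if reason.contains(lastReason) => reason // new reason subsumes the old one
    case Some(lastReason) => lastReason + "; " + reason // genuinely new: accumulate
    case None => reason // first reason recorded on this logical node
  }

// e.g. mergeReason(Some("scan failed"), "scan failed; filter failed", append = true)
// returns "scan failed; filter failed" rather than duplicating "scan failed".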
GlutenFallbackSuite.scala:

@@ -17,13 +17,18 @@
 package org.apache.spark.sql.gluten
 
 import io.glutenproject.{GlutenConfig, VERSION}
+import io.glutenproject.events.GlutenPlanFallbackEvent
+import io.glutenproject.execution.FileSourceScanExecTransformer
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
-import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusStore, SparkListenerSQLExecutionStart}
 import org.apache.spark.status.ElementTrackingStore
 
+import scala.collection.mutable.ArrayBuffer
+
-class GlutenFallbackSuite extends GlutenSQLTestsTrait {
+class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelper {
 
   test("test fallback logging") {
     val testAppender = new LogAppender("fallback reason")
@@ -101,4 +106,43 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait {
         .contains("Gluten does not touch it or does not support it"))
     }
   }
+
+  test("Improve merge fallback reason") {
+    spark.sql("create table t using parquet as select 1 as c1, timestamp '2023-01-01' as c2")
+    withTable("t") {
+      val events = new ArrayBuffer[GlutenPlanFallbackEvent]
+      val listener = new SparkListener {
+        override def onOtherEvent(event: SparkListenerEvent): Unit = {
+          event match {
+            case e: GlutenPlanFallbackEvent => events.append(e)
+            case _ =>
+          }
+        }
+      }
+      spark.sparkContext.addSparkListener(listener)
+      withSQLConf(GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1") {
+        try {
+          val df =
+            spark.sql("select c1, count(*) from t where c2 > timestamp '2022-01-01' group by c1")
+          checkAnswer(df, Row(1, 1))
+          spark.sparkContext.listenerBus.waitUntilEmpty()
+
+          // avoid failing when we support transforming timestamp filters in the future
+          val isFallback = find(df.queryExecution.executedPlan) {
+            _.isInstanceOf[FileSourceScanExecTransformer]
+          }.isEmpty
+          if (isFallback) {
+            assert(events.exists(
+              _.fallbackNodeToReason.values.exists(
+                _.contains("Subfield filters creation not supported for input type 'TIMESTAMP'"))))
+            assert(events.exists(
+              _.fallbackNodeToReason.values.exists(
+                _.contains("Timestamp is not fully supported in Filter"))))
+          }
+        } finally {
+          spark.sparkContext.removeSparkListener(listener)
+        }
+      }
+    }
+  }
 }
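The listener-based check above also shows how the merged reasons surface to tooling: each GlutenPlanFallbackEvent carries a fallbackNodeToReason collection whose values are the merged reason strings. A sketch of consuming it, assuming fallbackNodeToReason is a node-to-reason map as the test's use of .values suggests (the sample contents are illustrative):

events.foreach { event =>
  event.fallbackNodeToReason.foreach { case (node, reason) =>
    // reason may now be a merged string such as
    // "Subfield filters creation not supported for input type 'TIMESTAMP'; ..."
    println(s"$node fell back: $reason")
  }
}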
GlutenFallbackSuite.scala (a second copy of the suite; note this one marks the fallback-logging test as ignore):

@@ -17,13 +17,18 @@
 package org.apache.spark.sql.gluten
 
 import io.glutenproject.{GlutenConfig, VERSION}
+import io.glutenproject.events.GlutenPlanFallbackEvent
+import io.glutenproject.execution.FileSourceScanExecTransformer
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
-import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusStore, SparkListenerSQLExecutionStart}
 import org.apache.spark.status.ElementTrackingStore
 
+import scala.collection.mutable.ArrayBuffer
+
-class GlutenFallbackSuite extends GlutenSQLTestsTrait {
+class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelper {
 
   ignore("test fallback logging") {
     val testAppender = new LogAppender("fallback reason")
@@ -101,4 +106,43 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait {
         .contains("Gluten does not touch it or does not support it"))
     }
   }
+
+  test("Improve merge fallback reason") {
+    spark.sql("create table t using parquet as select 1 as c1, timestamp '2023-01-01' as c2")
+    withTable("t") {
+      val events = new ArrayBuffer[GlutenPlanFallbackEvent]
+      val listener = new SparkListener {
+        override def onOtherEvent(event: SparkListenerEvent): Unit = {
+          event match {
+            case e: GlutenPlanFallbackEvent => events.append(e)
+            case _ =>
+          }
+        }
+      }
+      spark.sparkContext.addSparkListener(listener)
+      withSQLConf(GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1") {
+        try {
+          val df =
+            spark.sql("select c1, count(*) from t where c2 > timestamp '2022-01-01' group by c1")
+          checkAnswer(df, Row(1, 1))
+          spark.sparkContext.listenerBus.waitUntilEmpty()
+
+          // avoid failing when we support transforming timestamp filters in the future
+          val isFallback = find(df.queryExecution.executedPlan) {
+            _.isInstanceOf[FileSourceScanExecTransformer]
+          }.isEmpty
+          if (isFallback) {
+            assert(events.exists(
+              _.fallbackNodeToReason.values.exists(
+                _.contains("Subfield filters creation not supported for input type 'TIMESTAMP'"))))
+            assert(events.exists(
+              _.fallbackNodeToReason.values.exists(
+                _.contains("Timestamp is not fully supported in Filter"))))
+          }
+        } finally {
+          spark.sparkContext.removeSparkListener(listener)
+        }
+      }
+    }
+  }
 }
