From 455e24cf94f33a9cbc012bb602796c51c3738465 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Fri, 8 Dec 2023 14:38:59 +0800
Subject: [PATCH] Improve merge fallback reason

---
 .../extension/ExpandFallbackPolicy.scala      |  6 ++-
 .../columnar/TransformHintRule.scala          | 23 ++++++---
 .../execution/GlutenFallbackReporter.scala    | 47 ++++++++++++------
 .../sql/gluten/GlutenFallbackSuite.scala      | 48 ++++++++++++++++++-
 .../sql/gluten/GlutenFallbackSuite.scala      | 48 ++++++++++++++++++-
 5 files changed, 144 insertions(+), 28 deletions(-)

diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ExpandFallbackPolicy.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ExpandFallbackPolicy.scala
index 4e3f7ce58a15..a7c3747de9d2 100644
--- a/gluten-core/src/main/scala/io/glutenproject/extension/ExpandFallbackPolicy.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/extension/ExpandFallbackPolicy.scala
@@ -18,7 +18,7 @@ package io.glutenproject.extension
 
 import io.glutenproject.GlutenConfig
 import io.glutenproject.execution.BroadcastHashJoinExecTransformer
-import io.glutenproject.extension.columnar.TransformHints
+import io.glutenproject.extension.columnar.{TRANSFORM_UNSUPPORTED, TransformHints}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -247,7 +247,9 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP
     val reason = fallback(plan)
     if (reason.isDefined) {
       val fallbackPlan = fallbackToRowBasedPlan()
-      TransformHints.tagAllNotTransformable(fallbackPlan, reason.get)
+      TransformHints.tagAllNotTransformable(
+        fallbackPlan,
+        TRANSFORM_UNSUPPORTED(reason, appendReasonIfExists = false))
       FallbackNode(fallbackPlan)
     } else {
       plan
diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala b/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala
index 45029c0ee503..d463c309a82c 100644
--- a/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala
@@ -54,7 +54,8 @@ trait TransformHint {
 }
 
 case class TRANSFORM_SUPPORTED() extends TransformHint
-case class TRANSFORM_UNSUPPORTED(reason: Option[String]) extends TransformHint
+case class TRANSFORM_UNSUPPORTED(reason: Option[String], appendReasonIfExists: Boolean = true)
+  extends TransformHint
 
 object TransformHints {
   val TAG: TreeNodeTag[TransformHint] =
@@ -77,11 +78,19 @@ object TransformHints {
   def tag(plan: SparkPlan, hint: TransformHint): Unit = {
     val mergedHint = getHintOption(plan)
       .map {
-        case originalHint @ TRANSFORM_UNSUPPORTED(Some(originalReason)) =>
+        case originalHint @ TRANSFORM_UNSUPPORTED(Some(originalReason), originAppend) =>
           hint match {
-            case TRANSFORM_UNSUPPORTED(Some(newReason)) =>
-              TRANSFORM_UNSUPPORTED(Some(originalReason + "; " + newReason))
-            case TRANSFORM_UNSUPPORTED(None) =>
+            case TRANSFORM_UNSUPPORTED(Some(newReason), append) =>
+              if (originAppend && append) {
+                TRANSFORM_UNSUPPORTED(Some(originalReason + "; " + newReason))
+              } else if (originAppend) {
+                TRANSFORM_UNSUPPORTED(Some(originalReason))
+              } else if (append) {
+                TRANSFORM_UNSUPPORTED(Some(newReason))
+              } else {
+                TRANSFORM_UNSUPPORTED(Some(originalReason), false)
+              }
+            case TRANSFORM_UNSUPPORTED(None, _) =>
               originalHint
             case _ =>
               throw new UnsupportedOperationException(
@@ -113,10 +122,10 @@ object TransformHints {
     tag(plan, TRANSFORM_UNSUPPORTED(Some(reason)))
   }
 
-  def tagAllNotTransformable(plan: SparkPlan, reason: String): Unit = {
+  def tagAllNotTransformable(plan: SparkPlan, hint: TRANSFORM_UNSUPPORTED): Unit = {
     plan.foreach {
       case _: GlutenPlan => // ignore
-      case other => tagNotTransformable(other, reason)
+      case other => tag(other, hint)
     }
   }
 
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
index c50b99a2d01c..3f81fa7e5486 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
@@ -55,24 +55,41 @@ case class GlutenFallbackReporter(glutenConfig: GlutenConfig, spark: SparkSessio
 
   private def printFallbackReason(plan: SparkPlan): Unit = {
     val validationLogLevel = glutenConfig.validationLogLevel
-    plan.foreach {
+    plan.foreachUp {
       case _: GlutenPlan => // ignore
       case plan: SparkPlan =>
-        plan.logicalLink.flatMap(_.getTagValue(FALLBACK_REASON_TAG)) match {
-          case Some(_) => // We have logged it before, so skip it
-          case _ =>
-            // some SparkPlan do not have hint, e.g., `ColumnarAQEShuffleRead`
-            TransformHints.getHintOption(plan) match {
-              case Some(TRANSFORM_UNSUPPORTED(reason)) =>
-                logFallbackReason(validationLogLevel, plan.nodeName, reason.getOrElse(""))
-                // With in next round stage in AQE, the physical plan would be a new instance that
-                // can not preserve the tag, so we need to set the fallback reason to logical plan.
-                // Then we can be aware of the fallback reason for the whole plan.
-                plan.logicalLink.foreach {
-                  logicalPlan => logicalPlan.setTagValue(FALLBACK_REASON_TAG, reason.getOrElse(""))
-                }
-              case _ =>
+        TransformHints.getHintOption(plan) match {
+          case Some(TRANSFORM_UNSUPPORTED(Some(reason), append)) =>
+            logFallbackReason(validationLogLevel, plan.nodeName, reason)
+            // In the next AQE round the physical plan is a new instance, so it cannot
+            // preserve the tag; we therefore set the fallback reason on the logical plan.
+            // Then we can be aware of the fallback reason for the whole plan.
+            // If a logical plan maps to several physical plans, we add all their reasons to
+            // that logical plan to make sure we do not lose any fallback reason.
+            plan.logicalLink.foreach {
+              logicalPlan =>
+                val newReason = logicalPlan
+                  .getTagValue(FALLBACK_REASON_TAG)
+                  .map {
+                    lastReason =>
+                      if (!append) {
+                        lastReason
+                      } else if (lastReason.contains(reason)) {
+                        // keep the last reason, as the new reason is redundant
+                        lastReason
+                      } else if (reason.contains(lastReason)) {
+                        // the new reason subsumes the last one, overwrite it
+                        reason
+                      } else {
+                        // a genuinely new reason, append it
+                        lastReason + "; " + reason
+                      }
+                  }
+                  .getOrElse(reason)
+                logicalPlan.setTagValue(FALLBACK_REASON_TAG, newReason)
             }
+
+          case _ =>
         }
     }
   }
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 3799e494ab4e..55678ce5397f 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -17,13 +17,18 @@
 package org.apache.spark.sql.gluten
 
 import io.glutenproject.{GlutenConfig, VERSION}
+import io.glutenproject.events.GlutenPlanFallbackEvent
+import io.glutenproject.execution.FileSourceScanExecTransformer
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
-import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusStore, SparkListenerSQLExecutionStart}
 import org.apache.spark.status.ElementTrackingStore
 
-class GlutenFallbackSuite extends GlutenSQLTestsTrait {
+import scala.collection.mutable.ArrayBuffer
+
+class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelper {
 
   test("test fallback logging") {
     val testAppender = new LogAppender("fallback reason")
@@ -101,4 +106,43 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait {
         .contains("Gluten does not touch it or does not support it"))
     }
   }
+
+  test("Improve merge fallback reason") {
+    spark.sql("create table t using parquet as select 1 as c1, timestamp '2023-01-01' as c2")
+    withTable("t") {
+      val events = new ArrayBuffer[GlutenPlanFallbackEvent]
+      val listener = new SparkListener {
+        override def onOtherEvent(event: SparkListenerEvent): Unit = {
+          event match {
+            case e: GlutenPlanFallbackEvent => events.append(e)
+            case _ =>
+          }
+        }
+      }
+      spark.sparkContext.addSparkListener(listener)
+      withSQLConf(GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1") {
+        try {
+          val df =
+            spark.sql("select c1, count(*) from t where c2 > timestamp '2022-01-01' group by c1")
+          checkAnswer(df, Row(1, 1))
+          spark.sparkContext.listenerBus.waitUntilEmpty()
+
+          // avoid failing once we support transforming the timestamp filter in the future
+          val isFallback = find(df.queryExecution.executedPlan) {
+            _.isInstanceOf[FileSourceScanExecTransformer]
+          }.isEmpty
+          if (isFallback) {
+            assert(events.exists(
+              _.fallbackNodeToReason.values.exists(
+                _.contains("Subfield filters creation not supported for input type 'TIMESTAMP'"))))
+            assert(events.exists(
+              _.fallbackNodeToReason.values.exists(
+                _.contains("Timestamp is not fully supported in Filter"))))
+          }
+        } finally {
+          spark.sparkContext.removeSparkListener(listener)
+        }
+      }
+    }
+  }
 }
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 62fbedd04771..3321775fe364 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -17,13 +17,18 @@
 package org.apache.spark.sql.gluten
 
 import io.glutenproject.{GlutenConfig, VERSION}
+import io.glutenproject.events.GlutenPlanFallbackEvent
+import io.glutenproject.execution.FileSourceScanExecTransformer
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
-import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusStore, SparkListenerSQLExecutionStart}
 import org.apache.spark.status.ElementTrackingStore
 
-class GlutenFallbackSuite extends GlutenSQLTestsTrait {
+import scala.collection.mutable.ArrayBuffer
+
+class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelper {
 
   ignore("test fallback logging") {
     val testAppender = new LogAppender("fallback reason")
@@ -101,4 +106,43 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait {
         .contains("Gluten does not touch it or does not support it"))
     }
   }
+
+  test("Improve merge fallback reason") {
+    spark.sql("create table t using parquet as select 1 as c1, timestamp '2023-01-01' as c2")
+    withTable("t") {
+      val events = new ArrayBuffer[GlutenPlanFallbackEvent]
+      val listener = new SparkListener {
+        override def onOtherEvent(event: SparkListenerEvent): Unit = {
+          event match {
+            case e: GlutenPlanFallbackEvent => events.append(e)
+            case _ =>
+          }
+        }
+      }
+      spark.sparkContext.addSparkListener(listener)
+      withSQLConf(GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1") {
+        try {
+          val df =
+            spark.sql("select c1, count(*) from t where c2 > timestamp '2022-01-01' group by c1")
+          checkAnswer(df, Row(1, 1))
+          spark.sparkContext.listenerBus.waitUntilEmpty()
+
+          // avoid failing once we support transforming the timestamp filter in the future
+          val isFallback = find(df.queryExecution.executedPlan) {
+            _.isInstanceOf[FileSourceScanExecTransformer]
+          }.isEmpty
+          if (isFallback) {
+            assert(events.exists(
+              _.fallbackNodeToReason.values.exists(
+                _.contains("Subfield filters creation not supported for input type 'TIMESTAMP'"))))
+            assert(events.exists(
+              _.fallbackNodeToReason.values.exists(
+                _.contains("Timestamp is not fully supported in Filter"))))
+          }
+        } finally {
+          spark.sparkContext.removeSparkListener(listener)
+        }
+      }
+    }
+  }
 }
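
Reviewer note: the precedence that appendReasonIfExists introduces in TransformHints.tag is easiest to see as a small pure function. The Scala sketch below is a minimal standalone model of the four new branches, not Gluten API; the object HintMergeSketch and the merge helper are illustrative names. It shows why ExpandFallbackPolicy now tags the whole-stage fallback reason with appendReasonIfExists = false: a node-specific validation reason already on the plan is kept instead of being overwritten or diluted by the generic reason.

object HintMergeSketch {
  // Same shape as the patched hint; the flag controls the merge below.
  case class TRANSFORM_UNSUPPORTED(
      reason: Option[String],
      appendReasonIfExists: Boolean = true)

  // Mirrors the four new branches in TransformHints.tag when both the
  // existing and the incoming hint carry a reason.
  def merge(
      originalReason: String,
      originAppend: Boolean,
      newReason: String,
      append: Boolean): TRANSFORM_UNSUPPORTED = {
    if (originAppend && append) {
      // both appendable: accumulate, separated by "; "
      TRANSFORM_UNSUPPORTED(Some(originalReason + "; " + newReason))
    } else if (originAppend) {
      // incoming hint is non-appendable: keep the node-specific reason
      TRANSFORM_UNSUPPORTED(Some(originalReason))
    } else if (append) {
      // existing hint is non-appendable: the incoming reason replaces it
      TRANSFORM_UNSUPPORTED(Some(newReason))
    } else {
      // neither is appendable: keep the original, still non-appendable
      TRANSFORM_UNSUPPORTED(Some(originalReason), false)
    }
  }

  def main(args: Array[String]): Unit = {
    // Two appendable validation reasons accumulate:
    println(merge("scan: unsupported type", true, "filter: unsupported expression", true).reason)
    // -> Some(scan: unsupported type; filter: unsupported expression)

    // The generic whole-stage reason arrives with appendReasonIfExists = false,
    // so a node that already has a specific reason keeps it:
    println(merge("scan: unsupported type", true, "fall back the whole stage", false).reason)
    // -> Some(scan: unsupported type)
  }
}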
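
The reporter side adds a second merge: since several physical plans can share one logical plan, printFallbackReason now folds each physical plan's reason into the logical plan's FALLBACK_REASON_TAG. A standalone sketch of that fold, under the same caveat (ReasonMergeSketch and mergeReason are illustrative names, not Gluten API):

object ReasonMergeSketch {
  // Mirrors the tag-update logic: dedup by substring containment so a reason
  // repeated by several physical plans is recorded once, not concatenated.
  def mergeReason(lastReason: Option[String], reason: String, append: Boolean): String =
    lastReason match {
      case None => reason                                // first reason for this logical plan
      case Some(last) if !append => last                 // non-appendable: keep what we have
      case Some(last) if last.contains(reason) => last   // new reason is redundant, drop it
      case Some(last) if reason.contains(last) => reason // new reason subsumes the last one
      case Some(last) => last + "; " + reason            // genuinely new, append it
    }

  def main(args: Array[String]): Unit = {
    println(mergeReason(None, "scan: unsupported type", append = true))
    // -> scan: unsupported type
    println(mergeReason(Some("scan: unsupported type"), "scan: unsupported type", append = true))
    // -> scan: unsupported type   (deduplicated rather than repeated)
    println(mergeReason(Some("scan: unsupported type"), "filter: unsupported expr", append = true))
    // -> scan: unsupported type; filter: unsupported expr
  }
}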