From 834d30d5bb2207a7287af21d1bad23f717fe4069 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 5 Sep 2024 13:18:17 +0800 Subject: [PATCH 1/4] [GLUTEN-7213][CORE] Avoid fallback caused by CheckOverflowInTableInsert --- .../gluten/expression/ExpressionConverter.scala | 6 ++++++ .../GlutenExpressionMappingSuite.scala | 15 +++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index 606cbd96e026..6e59d4da856e 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -481,6 +481,12 @@ object ExpressionConverter extends SQLConfHelper with Logging { substraitExprName, replaceWithExpressionTransformer0(c.child, attributeSeq, expressionsMap), c) + case c if c.getClass.getSimpleName.equals("CheckOverflowInTableInsert") => + ChildTransformer( + substraitExprName, + replaceWithExpressionTransformer0(expr.children.head, attributeSeq, expressionsMap), + expr + ) case b: BinaryArithmetic if DecimalArithmeticUtil.isDecimalArithmetic(b) => DecimalArithmeticUtil.checkAllowDecimalArithmetic() if (!BackendsApiManager.getSettings.transformCheckOverflow) { diff --git a/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala b/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala index 18f2baf6aa04..611aed3b8e20 100644 --- a/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala +++ b/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala @@ -94,4 +94,19 @@ class GlutenExpressionMappingSuite } } } + + test("GLUTEN-7213: Check no fallback even if there is CheckOverflowInTableInsert") { + withTable("t1","t2") { + sql("create table t1 (a float) stored as parquet") + sql("insert into t1 values(1.1)") + sql("create table t2 (b decimal(10,4)) stored as parquet") + + withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "true") { + val df = sql("insert overwrite t2 select * from t1") + assert(find(df.queryExecution.executedPlan)(_.isInstanceOf[ProjectExecTransformer]).isDefined) + assert(find(df.queryExecution.executedPlan)(_.isInstanceOf[ProjectExec]).isEmpty) + } + } + + } } From 5ccd24c0afdc72e6122bacf4426adde2bf0ba6c7 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 12 Sep 2024 21:02:04 +0800 Subject: [PATCH 2/4] fix style --- .../gluten/expressions/GlutenExpressionMappingSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala b/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala index 611aed3b8e20..75ed517525f7 100644 --- a/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala +++ b/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala @@ -96,14 +96,15 @@ class GlutenExpressionMappingSuite } test("GLUTEN-7213: Check no fallback even if there is CheckOverflowInTableInsert") { - withTable("t1","t2") { + withTable("t1", "t2") { sql("create table t1 (a float) stored as parquet") sql("insert into t1 values(1.1)") sql("create table t2 (b decimal(10,4)) stored as parquet") withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "true") { val df = sql("insert overwrite t2 select * from t1") - assert(find(df.queryExecution.executedPlan)(_.isInstanceOf[ProjectExecTransformer]).isDefined) + assert( + find(df.queryExecution.executedPlan)(_.isInstanceOf[ProjectExecTransformer]).isDefined) assert(find(df.queryExecution.executedPlan)(_.isInstanceOf[ProjectExec]).isEmpty) } } From 114e2a14814a3e579a838381f9d4b831cc6f79aa Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 12 Sep 2024 21:43:59 +0800 Subject: [PATCH 3/4] fix --- .../gluten/expressions/GlutenExpressionMappingSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala b/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala index 75ed517525f7..2649cde630b1 100644 --- a/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala +++ b/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala @@ -97,9 +97,9 @@ class GlutenExpressionMappingSuite test("GLUTEN-7213: Check no fallback even if there is CheckOverflowInTableInsert") { withTable("t1", "t2") { - sql("create table t1 (a float) stored as parquet") + sql("create table t1 (a float) using parquet") sql("insert into t1 values(1.1)") - sql("create table t2 (b decimal(10,4)) stored as parquet") + sql("create table t2 (b decimal(10,4)) using parquet") withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "true") { val df = sql("insert overwrite t2 select * from t1") From f840007d610af323690f25f3b2c8e46d6343307f Mon Sep 17 00:00:00 2001 From: Zhen Wang <643348094@qq.com> Date: Fri, 13 Sep 2024 14:31:47 +0800 Subject: [PATCH 4/4] fix --- .../spark/sql/sources/GlutenInsertSuite.scala | 17 ++++++++++++++++- .../spark/sql/sources/GlutenInsertSuite.scala | 17 +++++++++++++++-- .../GlutenExpressionMappingSuite.scala | 16 ---------------- .../gluten/expression/ExpressionNames.scala | 1 + .../gluten/sql/shims/spark34/Spark34Shims.scala | 3 ++- .../gluten/sql/shims/spark35/Spark35Shims.scala | 3 ++- 6 files changed, 36 insertions(+), 21 deletions(-) diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala index 74c4df197759..efbabb0e1778 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.executor.OutputMetrics import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, GlutenImplicits, QueryExecution} +import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, GlutenImplicits, ProjectExec, QueryExecution} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.metric.SQLMetric @@ -623,6 +623,21 @@ class GlutenInsertSuite } } } + + testGluten("GLUTEN-7213: Check no fallback with CheckOverflowInTableInsert") { + withTable("t1", "t2") { + sql("create table t1 (a float) using parquet") + sql("insert into t1 values(1.1)") + sql("create table t2 (b decimal(10,4)) using parquet") + + val df = sql("insert overwrite t2 select * from t1") + val executedPlan = df.queryExecution.executedPlan + .asInstanceOf[CommandResultExec] + .commandPhysicalPlan + assert(find(executedPlan)(_.isInstanceOf[ProjectExecTransformer]).isDefined) + assert(find(executedPlan)(_.isInstanceOf[ProjectExec]).isEmpty) + } + } } class GlutenRenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem extends RawLocalFileSystem { diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala index 1cb905e10abf..aeb6df49d3c0 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.sources import org.apache.gluten.GlutenColumnarWriteTestSupport -import org.apache.gluten.execution.SortExecTransformer +import org.apache.gluten.execution.{ProjectExecTransformer, SortExecTransformer} import org.apache.gluten.extension.GlutenPlan import org.apache.spark.SparkConf @@ -25,7 +25,7 @@ import org.apache.spark.executor.OutputMetrics import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.{CommandResultExec, GlutenImplicits, QueryExecution, SparkPlan} +import org.apache.spark.sql.execution.{CommandResultExec, GlutenImplicits, ProjectExec, QueryExecution, SparkPlan} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.metric.SQLMetric @@ -595,6 +595,19 @@ class GlutenInsertSuite } } } + + testGluten("GLUTEN-7213: Check no fallback with CheckOverflowInTableInsert") { + withTable("t1", "t2") { + sql("create table t1 (a float) using parquet") + sql("insert into t1 values(1.1)") + sql("create table t2 (b decimal(10,4)) using parquet") + + val df = sql("insert overwrite t2 select * from t1") + val (_, child) = checkWriteFilesAndGetChild(df) + assert(find(child)(_.isInstanceOf[ProjectExecTransformer]).isDefined) + assert(find(child)(_.isInstanceOf[ProjectExec]).isEmpty) + } + } } class GlutenRenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem extends RawLocalFileSystem { diff --git a/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala b/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala index 2649cde630b1..18f2baf6aa04 100644 --- a/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala +++ b/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala @@ -94,20 +94,4 @@ class GlutenExpressionMappingSuite } } } - - test("GLUTEN-7213: Check no fallback even if there is CheckOverflowInTableInsert") { - withTable("t1", "t2") { - sql("create table t1 (a float) using parquet") - sql("insert into t1 values(1.1)") - sql("create table t2 (b decimal(10,4)) using parquet") - - withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "true") { - val df = sql("insert overwrite t2 select * from t1") - assert( - find(df.queryExecution.executedPlan)(_.isInstanceOf[ProjectExecTransformer]).isDefined) - assert(find(df.queryExecution.executedPlan)(_.isInstanceOf[ProjectExec]).isEmpty) - } - } - - } } diff --git a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala index c45f0b2d4e9c..0e08e013cb17 100644 --- a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala +++ b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala @@ -314,6 +314,7 @@ object ExpressionNames { final val INLINE = "inline" final val POSEXPLODE = "posexplode" final val CHECK_OVERFLOW = "check_overflow" + final val CHECK_OVERFLOW_IN_TABLE_INSERT = "check_overflow_in_table_insert" final val MAKE_DECIMAL = "make_decimal" final val PROMOTE_PRECISION = "promote_precision" final val SPARK_PARTITION_ID = "spark_partition_id" diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala index 5e42f66ba3c1..558d7f60d5eb 100644 --- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala @@ -82,7 +82,8 @@ class Spark34Shims extends SparkShims { Sig[RoundFloor](ExpressionNames.FLOOR), Sig[RoundCeil](ExpressionNames.CEIL), Sig[Mask](ExpressionNames.MASK), - Sig[ArrayInsert](ExpressionNames.ARRAY_INSERT) + Sig[ArrayInsert](ExpressionNames.ARRAY_INSERT), + Sig[CheckOverflowInTableInsert](ExpressionNames.CHECK_OVERFLOW_IN_TABLE_INSERT) ) } diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala index ddb023b5a4e9..4a6590161c4a 100644 --- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala @@ -82,7 +82,8 @@ class Spark35Shims extends SparkShims { Sig[TimestampAdd](ExpressionNames.TIMESTAMP_ADD), Sig[RoundFloor](ExpressionNames.FLOOR), Sig[RoundCeil](ExpressionNames.CEIL), - Sig[ArrayInsert](ExpressionNames.ARRAY_INSERT) + Sig[ArrayInsert](ExpressionNames.ARRAY_INSERT), + Sig[CheckOverflowInTableInsert](ExpressionNames.CHECK_OVERFLOW_IN_TABLE_INSERT) ) }