[GLUTEN-7213][CORE] Avoid fallback caused by CheckOverflowInTableInsert #7214

Status: Closed · 4 commits
@@ -481,6 +481,12 @@ object ExpressionConverter extends SQLConfHelper with Logging {
           substraitExprName,
           replaceWithExpressionTransformer0(c.child, attributeSeq, expressionsMap),
           c)
+      case c if c.getClass.getSimpleName.equals("CheckOverflowInTableInsert") =>
+        ChildTransformer(
+          substraitExprName,
+          replaceWithExpressionTransformer0(expr.children.head, attributeSeq, expressionsMap),
+          expr
+        )
       case b: BinaryArithmetic if DecimalArithmeticUtil.isDecimalArithmetic(b) =>
         DecimalArithmeticUtil.checkAllowDecimalArithmetic()
         if (!BackendsApiManager.getSettings.transformCheckOverflow) {
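For context (not part of the diff): the new case matches the expression by its simple class name rather than a typed pattern, because `CheckOverflowInTableInsert` only exists in Spark 3.4+ and this file compiles against multiple Spark versions. A minimal, self-contained sketch of that dispatch technique, using stand-in types instead of the real Catalyst/Gluten classes:

```scala
// Sketch only: stand-in types; the real code pattern-matches Catalyst
// expressions inside ExpressionConverter.
object NameBasedDispatchSketch {
  trait Expression { def children: Seq[Expression] }
  case class Literal(value: Any) extends Expression {
    val children: Seq[Expression] = Nil
  }
  // Stand-in for Spark 3.4's CheckOverflowInTableInsert(child, columnName).
  case class CheckOverflowInTableInsert(child: Expression, columnName: String)
    extends Expression {
    val children: Seq[Expression] = Seq(child)
  }

  // Matching on getClass.getSimpleName instead of a typed pattern
  // (`case c: CheckOverflowInTableInsert`) keeps the match compiling on
  // Spark versions where the class does not exist. The wrapper is dropped
  // and only its child is transformed, mirroring the ChildTransformer
  // pass-through in the hunk above.
  def transform(expr: Expression): Expression = expr match {
    case e if e.getClass.getSimpleName.equals("CheckOverflowInTableInsert") =>
      transform(e.children.head)
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val wrapped = CheckOverflowInTableInsert(Literal(1.1f), "b")
    println(transform(wrapped)) // prints Literal(1.1): the wrapper is unwrapped
  }
}
```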
@@ -24,7 +24,7 @@ import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, GlutenImplicits, QueryExecution}
+import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, GlutenImplicits, ProjectExec, QueryExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -623,6 +623,21 @@ class GlutenInsertSuite
       }
     }
   }
+
+  testGluten("GLUTEN-7213: Check no fallback with CheckOverflowInTableInsert") {
+    withTable("t1", "t2") {
+      sql("create table t1 (a float) using parquet")
+      sql("insert into t1 values(1.1)")
+      sql("create table t2 (b decimal(10,4)) using parquet")
+
+      val df = sql("insert overwrite t2 select * from t1")
+      val executedPlan = df.queryExecution.executedPlan
+        .asInstanceOf[CommandResultExec]
+        .commandPhysicalPlan
+      assert(find(executedPlan)(_.isInstanceOf[ProjectExecTransformer]).isDefined)
+      assert(find(executedPlan)(_.isInstanceOf[ProjectExec]).isEmpty)
+    }
+  }
 }
 
 class GlutenRenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem extends RawLocalFileSystem {
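Background for the test above (illustrative, not part of the PR): on Spark 3.4+, inserting a float column into a decimal(10,4) column makes the analyzer wrap the implicit cast in `CheckOverflowInTableInsert`, and per the PR title that previously unsupported wrapper forced the surrounding Project to fall back to vanilla Spark. A standalone sketch that reproduces the plan shape the test asserts against (a local SparkSession is assumed):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: print the analyzed plan for the insert the test runs.
object CheckOverflowInTableInsertRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("gluten-7213-repro")
      .getOrCreate()
    spark.sql("create table t1 (a float) using parquet")
    spark.sql("insert into t1 values (1.1)")
    spark.sql("create table t2 (b decimal(10,4)) using parquet")
    // On Spark 3.4+ the analyzed plan wraps the float -> decimal cast in
    // CheckOverflowInTableInsert (the exact rendering varies by version).
    println(spark.sql("insert overwrite t2 select * from t1").queryExecution.analyzed)
    spark.stop()
  }
}
```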
@@ -17,15 +17,15 @@
 package org.apache.spark.sql.sources
 
 import org.apache.gluten.GlutenColumnarWriteTestSupport
-import org.apache.gluten.execution.SortExecTransformer
+import org.apache.gluten.execution.{ProjectExecTransformer, SortExecTransformer}
 import org.apache.gluten.extension.GlutenPlan
 
 import org.apache.spark.SparkConf
 import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.{CommandResultExec, GlutenImplicits, QueryExecution, SparkPlan}
+import org.apache.spark.sql.execution.{CommandResultExec, GlutenImplicits, ProjectExec, QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -595,6 +595,19 @@ class GlutenInsertSuite
       }
     }
   }
+
+  testGluten("GLUTEN-7213: Check no fallback with CheckOverflowInTableInsert") {
+    withTable("t1", "t2") {
+      sql("create table t1 (a float) using parquet")
+      sql("insert into t1 values(1.1)")
+      sql("create table t2 (b decimal(10,4)) using parquet")
+
+      val df = sql("insert overwrite t2 select * from t1")
+      val (_, child) = checkWriteFilesAndGetChild(df)
+      assert(find(child)(_.isInstanceOf[ProjectExecTransformer]).isDefined)
+      assert(find(child)(_.isInstanceOf[ProjectExec]).isEmpty)
+    }
+  }
 }
 
 class GlutenRenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem extends RawLocalFileSystem {
@@ -314,6 +314,7 @@ object ExpressionNames {
   final val INLINE = "inline"
   final val POSEXPLODE = "posexplode"
   final val CHECK_OVERFLOW = "check_overflow"
+  final val CHECK_OVERFLOW_IN_TABLE_INSERT = "check_overflow_in_table_insert"
   final val MAKE_DECIMAL = "make_decimal"
   final val PROMOTE_PRECISION = "promote_precision"
   final val SPARK_PARTITION_ID = "spark_partition_id"
@@ -82,7 +82,8 @@ class Spark34Shims extends SparkShims {
       Sig[RoundFloor](ExpressionNames.FLOOR),
       Sig[RoundCeil](ExpressionNames.CEIL),
       Sig[Mask](ExpressionNames.MASK),
-      Sig[ArrayInsert](ExpressionNames.ARRAY_INSERT)
+      Sig[ArrayInsert](ExpressionNames.ARRAY_INSERT),
+      Sig[CheckOverflowInTableInsert](ExpressionNames.CHECK_OVERFLOW_IN_TABLE_INSERT)
     )
   }
 
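The `Sig` entries above register the expression class under its substrait name so Gluten's validation treats the expression as transformable rather than a fallback trigger. A rough sketch of how such a class-to-name registry can work (assumed shape only; the real `Sig` in Gluten may carry more than what is shown here):

```scala
import scala.reflect.ClassTag

// Sketch of a Sig-style registration table (assumed shape, not Gluten's actual Sig).
final case class Sig(expClass: Class[_], name: String)
object Sig {
  def apply[T](name: String)(implicit ct: ClassTag[T]): Sig =
    new Sig(ct.runtimeClass, name)
}

object ShimSketch {
  val CHECK_OVERFLOW_IN_TABLE_INSERT = "check_overflow_in_table_insert"

  // Hypothetical stand-in for the Spark 3.4+ expression class.
  class CheckOverflowInTableInsert

  val expressionSigs: Seq[Sig] = Seq(
    Sig[CheckOverflowInTableInsert](CHECK_OVERFLOW_IN_TABLE_INSERT)
  )

  // Conceptual lookup: a class with a registered Sig resolves to a
  // substrait name and is considered supported; anything else falls back.
  val supported: Map[Class[_], String] =
    expressionSigs.map(s => s.expClass -> s.name).toMap

  def main(args: Array[String]): Unit =
    println(supported(classOf[CheckOverflowInTableInsert]))
    // prints: check_overflow_in_table_insert
}
```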
@@ -82,7 +82,8 @@ class Spark35Shims extends SparkShims {
       Sig[TimestampAdd](ExpressionNames.TIMESTAMP_ADD),
       Sig[RoundFloor](ExpressionNames.FLOOR),
       Sig[RoundCeil](ExpressionNames.CEIL),
-      Sig[ArrayInsert](ExpressionNames.ARRAY_INSERT)
+      Sig[ArrayInsert](ExpressionNames.ARRAY_INSERT),
+      Sig[CheckOverflowInTableInsert](ExpressionNames.CHECK_OVERFLOW_IN_TABLE_INSERT)
     )
   }
 