Revert "[VL] Support Spark assert_true function (#6329)"
This reverts commit 6f189c7.
yma11 committed Jul 15, 2024
1 parent 42e1f44 commit 1382f4c
Showing 25 changed files with 12 additions and 1,062 deletions.
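
For context, assert_true and raise_error are standard Spark SQL expressions: assert_true(cond) returns NULL on rows where cond holds and raises an error otherwise, while raise_error(msg) fails unconditionally. Below is a minimal sketch of the vanilla Spark behavior that the reverted Velox offload had to reproduce (plain Spark APIs only; the sample data and messages are illustrative):

import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{assert_true, col, lit, raise_error}

object AssertTrueDemo extends App {
  val spark = SparkSession.builder().master("local[1]").appName("assert-true-demo").getOrCreate()
  import spark.implicits._

  val df = Seq(1, 2, 3).toDF("x")

  // assert_true(cond) evaluates to NULL on every row where cond holds.
  df.select(assert_true(col("x") >= 1)).show()

  // When the condition fails, the task throws a RuntimeException that Spark
  // wraps in a SparkException; the cause carries the error message.
  try df.select(assert_true(col("x") > 2, lit("x must exceed 2"))).collect()
  catch {
    case e: SparkException => println(e.getCause.getMessage) // contains "x must exceed 2"
  }

  // raise_error(msg) throws unconditionally.
  try df.select(raise_error(lit("boom"))).collect()
  catch {
    case e: SparkException => println(e.getCause.getMessage) // contains "boom"
  }

  spark.stop()
}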
@@ -215,9 +215,6 @@ object CHExpressionUtil {
     UNIX_MICROS -> DefaultValidator(),
     TIMESTAMP_MILLIS -> DefaultValidator(),
     TIMESTAMP_MICROS -> DefaultValidator(),
-    STACK -> DefaultValidator(),
-    TRANSFORM_KEYS -> DefaultValidator(),
-    TRANSFORM_VALUES -> DefaultValidator(),
-    RAISE_ERROR -> DefaultValidator()
+    STACK -> DefaultValidator()
   )
 }
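
Note on the hunk above: in the ClickHouse backend, CHExpressionUtil pairs function names with validators, and an entry mapped to DefaultValidator is rejected during validation so the operator falls back to vanilla Spark. The revert drops RAISE_ERROR from that list because the expression mapping itself goes away (see ExpressionMappings below). A rough sketch of the pattern, with simplified names that are not the actual Gluten internals:

// Hypothetical, simplified validator denylist.
trait FunctionValidator {
  def doValidate(args: Seq[String]): Boolean
}

// Rejects unconditionally: listing a function with DefaultValidator marks it
// as unsupported on this backend, forcing a fallback to vanilla Spark.
case class DefaultValidator() extends FunctionValidator {
  override def doValidate(args: Seq[String]): Boolean = false
}

object DemoExpressionUtil {
  private val denylist: Map[String, FunctionValidator] = Map(
    "raise_error" -> DefaultValidator() // the kind of entry removed above
  )

  // A function may be offloaded unless a validator vetoes it.
  def canOffload(name: String, args: Seq[String]): Boolean =
    denylist.get(name).forall(_.doValidate(args))
}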
@@ -22,6 +22,7 @@ import org.apache.gluten.datasource.ArrowConvertorRule
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution._
 import org.apache.gluten.expression._
+import org.apache.gluten.expression.ExpressionNames.{TRANSFORM_KEYS, TRANSFORM_VALUES}
 import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet}
 import org.apache.gluten.extension._
 import org.apache.gluten.extension.columnar.FallbackTags
@@ -834,6 +835,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       Sig[VeloxCollectSet](ExpressionNames.COLLECT_SET),
       Sig[VeloxBloomFilterMightContain](ExpressionNames.MIGHT_CONTAIN),
       Sig[VeloxBloomFilterAggregate](ExpressionNames.BLOOM_FILTER_AGG),
+      Sig[TransformKeys](TRANSFORM_KEYS),
+      Sig[TransformValues](TRANSFORM_VALUES),
       // For test purpose.
       Sig[VeloxDummyExpression](VeloxDummyExpression.VELOX_DUMMY_EXPRESSION)
     )
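
Note on the two hunks above: a Sig binds a Catalyst expression class to the name the native engine uses, and registering it tells Gluten the expression may be offloaded. This revert moves the TransformKeys/TransformValues signatures out of the shared ExpressionMappings (removed further down) and back into the Velox-specific list. A hedged sketch of the registration idea; this Sig is a stand-in, not Gluten's actual class:

import scala.reflect.ClassTag

// Stand-in signature registry: each entry pairs a Catalyst expression class
// with the function name the native engine knows it by.
final case class Sig(expressionClass: Class[_], nativeName: String)

object Sig {
  def apply[T](nativeName: String)(implicit ct: ClassTag[T]): Sig =
    Sig(ct.runtimeClass, nativeName)
}

// Illustrative placeholders for the Catalyst classes
// org.apache.spark.sql.catalyst.expressions.{TransformKeys, TransformValues}.
final class TransformKeys
final class TransformValues

object Registry {
  // After the revert, these two signatures live only in the Velox backend's
  // extra list rather than in the shared ExpressionMappings.
  val veloxExtraSigs: Seq[Sig] = Seq(
    Sig[TransformKeys]("transform_keys"),
    Sig[TransformValues]("transform_values")
  )
}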
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.ProjectExec
 import org.apache.spark.sql.types._
 
@@ -664,19 +663,6 @@ class ScalarFunctionsValidateSuite extends FunctionsValidateTest {
     }
   }
 
-  test("Test raise_error, assert_true function") {
-    runQueryAndCompare("""SELECT assert_true(l_orderkey >= 1), l_orderkey
-                         | from lineitem limit 100""".stripMargin) {
-      checkGlutenOperatorMatch[ProjectExecTransformer]
-    }
-    val e = intercept[SparkException] {
-      sql("""SELECT assert_true(l_orderkey >= 100), l_orderkey from
-            | lineitem limit 100""".stripMargin).collect()
-    }
-    assert(e.getCause.isInstanceOf[RuntimeException])
-    assert(e.getMessage.contains("l_orderkey"))
-  }
-
   test("Test E function") {
     runQueryAndCompare("""SELECT E() from lineitem limit 100""".stripMargin) {
       checkGlutenOperatorMatch[ProjectExecTransformer]
@@ -261,8 +261,6 @@ object ExpressionMappings {
     Sig[MapEntries](MAP_ENTRIES),
     Sig[MapZipWith](MAP_ZIP_WITH),
     Sig[StringToMap](STR_TO_MAP),
-    Sig[TransformKeys](TRANSFORM_KEYS),
-    Sig[TransformValues](TRANSFORM_VALUES),
     // Struct functions
     Sig[GetStructField](GET_STRUCT_FIELD),
     Sig[CreateNamedStruct](NAMED_STRUCT),
@@ -286,7 +284,6 @@
     Sig[SparkPartitionID](SPARK_PARTITION_ID),
     Sig[WidthBucket](WIDTH_BUCKET),
     Sig[ReplicateRows](REPLICATE_ROWS),
-    Sig[RaiseError](RAISE_ERROR),
     // Decimal
     Sig[UnscaledValue](UNSCALED_VALUE),
     // Generator function

This file was deleted.

This file was deleted.

@@ -296,11 +296,6 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("sliding range between with aggregation")
     .exclude("store and retrieve column stats in different time zones")
   enableSuite[GlutenColumnExpressionSuite]
-    // Velox raise_error('errMsg') throws a velox_user_error exception with the message 'errMsg'.
-    // The final caught Spark exception's getCause().getMessage() contains 'errMsg' but does not
-    // equal 'errMsg' exactly. The following two tests will be skipped and overridden in Gluten.
-    .exclude("raise_error")
-    .exclude("assert_true")
   enableSuite[GlutenDataFrameImplicitsSuite]
   enableSuite[GlutenGeneratorFunctionSuite]
   enableSuite[GlutenDataFrameTimeWindowingSuite]
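
The two excludes removed above existed because, as the deleted comment notes, Velox reports raise_error('errMsg') through a velox_user_error whose message merely contains errMsg, so the Gluten overrides asserted containment instead of the exact equality the vanilla suite expects. A small self-contained sketch of that difference (the message shape is illustrative):

object ContainsVsEquals extends App {
  val userMessage = "errMsg"
  // Illustrative shape of a native error: the user's text is wrapped in
  // operator/stage context rather than surfaced verbatim.
  val nativeMessage = s"velox_user_error: $userMessage (operator ProjectExec, stage 3)"

  assert(nativeMessage != userMessage)        // an exact-match assertion would fail
  assert(nativeMessage.contains(userMessage)) // the relaxed containment check passes
}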
@@ -16,60 +16,11 @@
  */
 package org.apache.spark.sql
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.ProjectExec
-import org.apache.spark.sql.functions.{assert_true, expr, input_file_name, lit, raise_error}
+import org.apache.spark.sql.functions.{expr, input_file_name}
 
 class GlutenColumnExpressionSuite extends ColumnExpressionSuite with GlutenSQLTestsTrait {
   import testImplicits._
-  testGluten("raise_error") {
-    val strDf = Seq(("hello")).toDF("a")
-
-    val e1 = intercept[SparkException] {
-      strDf.select(raise_error(lit(null.asInstanceOf[String]))).collect()
-    }
-    assert(e1.getCause.isInstanceOf[RuntimeException])
-
-    val e2 = intercept[SparkException] {
-      strDf.select(raise_error($"a")).collect()
-    }
-    assert(e2.getCause.isInstanceOf[RuntimeException])
-    assert(e2.getCause.getMessage contains "hello")
-  }
-
-  testGluten("assert_true") {
-    // assert_true(condition, errMsgCol)
-    val booleanDf = Seq((true), (false)).toDF("cond")
-    checkAnswer(
-      booleanDf.filter("cond = true").select(assert_true($"cond")),
-      Row(null) :: Nil
-    )
-    val e1 = intercept[SparkException] {
-      booleanDf.select(assert_true($"cond", lit(null.asInstanceOf[String]))).collect()
-    }
-    assert(e1.getCause.isInstanceOf[RuntimeException])
-
-    val nullDf = Seq(("first row", None), ("second row", Some(true))).toDF("n", "cond")
-    checkAnswer(
-      nullDf.filter("cond = true").select(assert_true($"cond", $"cond")),
-      Row(null) :: Nil
-    )
-    val e2 = intercept[SparkException] {
-      nullDf.select(assert_true($"cond", $"n")).collect()
-    }
-    assert(e2.getCause.isInstanceOf[RuntimeException])
-    assert(e2.getCause.getMessage contains "first row")
-
-    // assert_true(condition)
-    val intDf = Seq((0, 1)).toDF("a", "b")
-    checkAnswer(intDf.select(assert_true($"a" < $"b")), Row(null) :: Nil)
-    val e3 = intercept[SparkException] {
-      intDf.select(assert_true($"a" > $"b")).collect()
-    }
-    assert(e3.getCause.isInstanceOf[RuntimeException])
-    assert(e3.getCause.getMessage contains "'('a > 'b)' is not true!")
-  }
-
   testGluten(
     "input_file_name, input_file_block_start and input_file_block_length " +
       "should fall back if scan falls back") {
@@ -17,10 +17,9 @@
 package org.apache.spark.sql
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.utils.{BackendTestSettings, BackendTestUtils, SystemParameters}
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -40,7 +39,6 @@ import java.util.Locale
 import scala.collection.mutable.ArrayBuffer
 import scala.sys.process.{Process, ProcessLogger}
 import scala.util.Try
-import scala.util.control.NonFatal
 
 /**
  * End-to-end test cases for SQL queries.
@@ -763,45 +761,4 @@ class GlutenSQLQueryTestSuite
       super.afterAll()
     }
   }
-
-  /**
-   * This method handles exceptions occurred during query execution as they may need special care
-   * to become comparable to the expected output.
-   *
-   * @param result
-   *   a function that returns a pair of schema and output
-   */
-  override protected def handleExceptions(
-      result: => (String, Seq[String])): (String, Seq[String]) = {
-    try {
-      result
-    } catch {
-      case a: AnalysisException =>
-        // Do not output the logical plan tree which contains expression IDs.
-        // Also implement a crude way of masking expression IDs in the error message
-        // with a generic pattern "###".
-        val msg = if (a.plan.nonEmpty) a.getSimpleMessage else a.getMessage
-        (emptySchema, Seq(a.getClass.getName, msg.replaceAll("#\\d+", "#x")))
-      case s: SparkException if s.getCause != null =>
-        // For a runtime exception, it is hard to match because its message contains
-        // information of stage, task ID, etc.
-        // To make result matching simpler, here we match the cause of the exception if it exists.
-        s.getCause match {
-          case e: GlutenException =>
-            val reasonPattern = "Reason: (.*)".r
-            val reason = reasonPattern.findFirstMatchIn(e.getMessage).map(_.group(1))
-
-            reason match {
-              case Some(r) =>
-                (emptySchema, Seq(e.getClass.getName, r))
-              case None => (emptySchema, Seq())
-            }
-          case cause =>
-            (emptySchema, Seq(cause.getClass.getName, cause.getMessage))
-        }
-      case NonFatal(e) =>
-        // If there is an exception, put the exception class followed by the message.
-        (emptySchema, Seq(e.getClass.getName, e.getMessage))
-    }
-  }
 }
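
The handleExceptions override removed above normalized errors for golden-file comparison: AnalysisException messages had expression IDs masked, and for a GlutenException cause only the text after "Reason: " was kept, since the full native message carries stage and task noise. A standalone sketch of that extraction using the same regex; the sample message is illustrative:

object ReasonExtraction extends App {
  // Same pattern as the removed override: capture everything after "Reason: ".
  val reasonPattern = "Reason: (.*)".r

  def normalize(message: String): Option[String] =
    reasonPattern.findFirstMatchIn(message).map(_.group(1))

  val msg = "Job aborted. Operator: ProjectExecTransformer. Reason: assert_true failed for l_orderkey >= 100"
  assert(normalize(msg).contains("assert_true failed for l_orderkey >= 100"))

  // Messages without a "Reason:" fragment yield None, mirroring the removed
  // code's empty output in that case.
  assert(normalize("no reason given").isEmpty)
}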

This file was deleted.
