[GLUTEN-4560][SUB-2] Refactor Gluten Spark33 Unit Test (#4648)
zwangsheng authored Feb 5, 2024
1 parent 43ff351 commit 248cbf8
Showing 53 changed files with 353 additions and 409 deletions.

Large diffs are not rendered by default.
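The diffs below all follow one pattern: test suites move from ad-hoc prefixed names such as test(GLUTEN_TEST + "name"), test("Gluten: name"), and test("GLUTEN - name") to a testGluten("name") helper, and VeloxTestSettings moves from .exclude("Gluten - name") and .includeByPrefix("gluten", ...) to .excludeGlutenTest("name") and .includeAllGlutenTests(). The sketch below shows roughly what helpers of that shape could look like; it is an illustration under stated assumptions (the helper names match the call sites in the diff, but the bodies, prefix handling, and class layout are guesses), not the actual Gluten source.

import org.scalatest.funsuite.AnyFunSuite
import scala.collection.mutable

// Hypothetical sketch, not the actual Gluten implementation.
trait GlutenTestHelperSketch extends AnyFunSuite {
  protected val GLUTEN_TEST: String = "Gluten - "

  // Stands in for the old test(GLUTEN_TEST + "name") / test("Gluten: name") call sites.
  protected def testGluten(testName: String)(testFun: => Any): Unit =
    test(GLUTEN_TEST + testName)(testFun)
}

// Settings-side sketch: the new methods are assumed to be thin wrappers over the
// existing exclude / includeByPrefix DSL of BackendTestSettings.
class SuiteSettingsSketch {
  private val glutenPrefix = "Gluten - "
  private val excludedTests = mutable.Set.empty[String]
  private val includedPrefixes = mutable.Set.empty[String]

  def exclude(testNames: String*): this.type = { excludedTests ++= testNames; this }
  def includeByPrefix(prefixes: String*): this.type = { includedPrefixes ++= prefixes; this }

  // .excludeGlutenTest("x") replaces the old .exclude("Gluten - x").
  def excludeGlutenTest(testNames: String*): this.type =
    exclude(testNames.map(glutenPrefix + _): _*)

  // .includeAllGlutenTests() replaces the old .includeByPrefix("gluten", ...).
  def includeAllGlutenTests(): this.type = includeByPrefix(glutenPrefix)
}

Helpers of this shape keep the prefix convention in one place and let call sites carry plain test names, which is what makes the per-suite changes below mechanical.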

@@ -159,8 +159,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenSortOrderExpressionsSuite]
enableSuite[GlutenStringExpressionsSuite]
enableSuite[VeloxAdaptiveQueryExecSuite]
.includeAllGlutenTests()
.includeByPrefix(
"gluten",
"SPARK-29906",
"SPARK-30291",
"SPARK-30403",
@@ -315,9 +315,9 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("appending insert")
.exclude("overwrite insert")
.exclude("SPARK-34897: Support reconcile schemas based on index after nested column pruning")
.exclude("Gluten - SPARK-31238: compatibility with Spark 2.4 in reading dates")
.exclude("Gluten - SPARK-31238, SPARK-31423: rebasing dates in write")
.exclude("Gluten - SPARK-34862: Support ORC vectorized reader for nested column")
.excludeGlutenTest("SPARK-31238: compatibility with Spark 2.4 in reading dates")
.excludeGlutenTest("SPARK-31238, SPARK-31423: rebasing dates in write")
.excludeGlutenTest("SPARK-34862: Support ORC vectorized reader for nested column")
// exclude as struct not supported
.exclude("SPARK-36663: OrcUtils.toCatalystSchema should correctly handle a column name which consists of only numbers")
.exclude("SPARK-37812: Reuse result row when deserializing a struct")
@@ -815,8 +815,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("change column type from short to int/long")
.exclude("change column type from int to long")
.exclude("change column type from float to double")
.exclude("Gluten - read byte, int, short, long together")
.exclude("Gluten - read float and double together")
.excludeGlutenTest("read byte, int, short, long together")
.excludeGlutenTest("read float and double together")
enableSuite[GlutenMergedOrcReadSchemaSuite]
.exclude("append column into middle")
.exclude("add a nested column at the end of the leaf struct column")
@@ -988,15 +988,15 @@ class VeloxTestSettings extends BackendTestSettings {
"NaN is greater than all other non-NaN numeric values",
// Rewrite this test because the describe functions creates unmatched plan.
"describe",
// The describe issue is just fixed by https://github.com/apache/spark/pull/40914.
// We can enable the below test for spark 3.4 and higher versions.
"Gluten - describe",
// decimal failed ut.
"SPARK-22271: mean overflows and returns null for some decimal variables",
// Result depends on the implementation for nondeterministic expression rand.
// Not really an issue.
"SPARK-9083: sort with non-deterministic expressions"
)
// The describe issue is just fixed by https://github.com/apache/spark/pull/40914.
// We can enable the below test for spark 3.4 and higher versions.
.excludeGlutenTest("describe")
enableSuite[GlutenDataFrameTimeWindowingSuite]
enableSuite[GlutenDataFrameTungstenSuite]
enableSuite[GlutenDataFrameWindowFramesSuite]
@@ -29,7 +29,7 @@ class GlutenBloomFilterAggregateQuerySuite
with AdaptiveSparkPlanHelper {
import testImplicits._

test("Test bloom_filter_agg with big RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS") {
testGluten("Test bloom_filter_agg with big RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS") {
val table = "bloom_filter_test"
withSQLConf(
SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.key -> "5000000",
@@ -55,7 +55,7 @@ class GlutenBloomFilterAggregateQuerySuite
}
}

test("Test that might_contain on bloom_filter_agg with empty input") {
testGluten("Test that might_contain on bloom_filter_agg with empty input") {
checkAnswer(
spark.sql("""SELECT might_contain((select bloom_filter_agg(cast(id as long))
| from range(1, 1)), cast(123 as long))""".stripMargin),
@@ -68,7 +68,7 @@ class GlutenBloomFilterAggregateQuerySuite
Row(null))
}

test("Test bloom_filter_agg fallback") {
testGluten("Test bloom_filter_agg fallback") {
val table = "bloom_filter_test"
val numEstimatedItems = 5000000L
val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
@@ -33,7 +33,7 @@ class GlutenCachedTableSuite
super.sparkConf.set(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, "true")
}

test("GLUTEN - InMemoryRelation statistics") {
testGluten("InMemoryRelation statistics") {
sql("CACHE TABLE testData")
spark.table("testData").queryExecution.withCachedData.collect {
case cached: InMemoryRelation =>
@@ -47,9 +47,7 @@ class GlutenComplexTypesSuite extends ComplexTypesSuite with GlutenSQLTestsTrait
}
}

test(
GlutenTestConstants.GLUTEN_TEST +
"types bool/byte/short/float/double/decimal/binary/map/array/struct") {
testGluten("types bool/byte/short/float/double/decimal/binary/map/array/struct") {
val df = spark
.table("tab_types")
.selectExpr(
@@ -35,7 +35,7 @@ class GlutenDataFrameAggregateSuite extends DataFrameAggregateSuite with GlutenS

// blackTestNameList is defined in ClickHouseNotSupport

test(GlutenTestConstants.GLUTEN_TEST + "count") {
testGluten("count") {
// agg with no input col
assert(testData2.count() === testData2.rdd.map(_ => 1).count())

@@ -44,7 +44,7 @@ class GlutenDataFrameAggregateSuite extends DataFrameAggregateSuite with GlutenS
Row(6, 6.0))
}

test(GlutenTestConstants.GLUTEN_TEST + "null count") {
testGluten("null count") {
checkAnswer(testData3.groupBy($"a").agg(count($"b")), Seq(Row(1, 0), Row(2, 1)))

checkAnswer(testData3.groupBy($"a").agg(count($"a" + $"b")), Seq(Row(1, 0), Row(2, 1)))
@@ -61,7 +61,7 @@ class GlutenDataFrameAggregateSuite extends DataFrameAggregateSuite with GlutenS
// )
}

test(GlutenTestConstants.GLUTEN_TEST + "groupBy") {
testGluten("groupBy") {
checkAnswer(testData2.groupBy("a").agg(sum($"b")), Seq(Row(1, 3), Row(2, 3), Row(3, 3)))
checkAnswer(testData2.groupBy("a").agg(sum($"b").as("totB")).agg(sum($"totB")), Row(9))
checkAnswer(testData2.groupBy("a").agg(count("*")), Row(1, 2) :: Row(2, 2) :: Row(3, 2) :: Nil)
@@ -103,7 +103,7 @@ class GlutenDataFrameAggregateSuite extends DataFrameAggregateSuite with GlutenS
// )
}

test(GlutenTestConstants.GLUTEN_TEST + "average") {
testGluten("average") {

checkAnswer(testData2.agg(avg($"a"), mean($"a")), Row(2.0, 2.0))

@@ -130,7 +130,7 @@ class GlutenDataFrameAggregateSuite extends DataFrameAggregateSuite with GlutenS
// Row(new java.math.BigDecimal(2), new java.math.BigDecimal(6)) :: Nil)
}

ignore("gluten SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") {
ignore(GLUTEN_TEST + "SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") {
withTempView("view") {
Seq(
("mithunr", Float.NaN),
@@ -144,14 +144,14 @@ class GlutenDataFrameAggregateSuite extends DataFrameAggregateSuite with GlutenS
}
}

test(GlutenTestConstants.GLUTEN_TEST + "variance") {
testGluten("variance") {
checkAnswer(
testData2.agg(var_samp($"a"), var_pop($"a"), variance($"a")),
Row(0.8, 2.0 / 3.0, 0.8))
checkAnswer(testData2.agg(var_samp("a"), var_pop("a"), variance("a")), Row(0.8, 2.0 / 3.0, 0.8))
}

test("aggregation with filter") {
testGluten("aggregation with filter") {
Seq(
("mithunr", 12.3f, 5.0f, true, 9.4f),
("mithunr", 15.5f, 4.0f, false, 19.9f),
@@ -175,7 +175,7 @@ class GlutenDataFrameAggregateSuite extends DataFrameAggregateSuite with GlutenS
checkAnswer(df, Row(2) :: Nil)
}

test(GlutenTestConstants.GLUTEN_TEST + "extend with cast expression") {
testGluten("extend with cast expression") {
checkAnswer(
decimalData.agg(
sum($"a".cast("double")),
@@ -186,9 +186,7 @@ class GlutenDataFrameAggregateSuite extends DataFrameAggregateSuite with GlutenS
}

// This test is applicable to velox backend. For CH backend, the replacement is disabled.
test(
GlutenTestConstants.GLUTEN_TEST
+ "use gluten hash agg to replace vanilla spark sort agg") {
testGluten("use gluten hash agg to replace vanilla spark sort agg") {

withSQLConf(("spark.gluten.sql.columnar.force.hashagg", "false")) {
Seq("A", "B", "C", "D").toDF("col1").createOrReplaceTempView("t1")
@@ -209,7 +207,7 @@ class GlutenDataFrameAggregateSuite extends DataFrameAggregateSuite with GlutenS
}
}

test("mixed supported and unsupported aggregate functions") {
testGluten("mixed supported and unsupported aggregate functions") {
withUserDefinedFunction(("udaf_sum", true)) {
spark.udf.register(
"udaf_sum",
@@ -275,8 +273,8 @@ class GlutenDataFrameAggregateSuite extends DataFrameAggregateSuite with GlutenS
}
}

test(
GLUTEN_TEST + "SPARK-19471: AggregationIterator does not initialize the generated" +
testGluten(
"SPARK-19471: AggregationIterator does not initialize the generated" +
" result projection before using it") {
Seq(
monotonically_increasing_id(),
@@ -23,7 +23,7 @@ class GlutenDataFramePivotSuite extends DataFramePivotSuite with GlutenSQLTestsT
// This test is ported from vanilla spark with pos value (1-based) changed from 0 to 1 for
// substring. In vanilla spark, pos=0 has same effectiveness as pos=1. But in velox, pos=0
// will return an empty string as substring result.
test("pivot with column definition in groupby - using pos=1") {
testGluten("pivot with column definition in groupby - using pos=1") {
val df = courseSales
.groupBy(substring(col("course"), 1, 1).as("foo"))
.pivot("year", Seq(2012, 2013))
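An aside on the comment above: the pos=0 versus pos=1 behavior it describes can be checked with a short snippet. This is an illustration only, assuming an active SparkSession named spark; it is not part of the commit.

import org.apache.spark.sql.functions.{col, substring}
import spark.implicits._ // assumes spark is an active SparkSession

val courses = Seq("dotNET", "Java").toDF("course")

// On vanilla Spark, substring treats pos = 0 like pos = 1, so both columns return
// the first character. Per the comment above, the Velox backend instead returns an
// empty string for pos = 0, which is why the rewritten pivot test pins pos to 1.
courses.select(
  substring(col("course"), 0, 1).as("pos0"),
  substring(col("course"), 1, 1).as("pos1")
).show()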
@@ -19,7 +19,6 @@ package org.apache.spark.sql
import io.glutenproject.execution.{ProjectExecTransformer, WholeStageTransformer}

import org.apache.spark.SparkException
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression}
import org.apache.spark.sql.execution.ColumnarShuffleExchangeExec
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
@@ -35,7 +34,7 @@ import scala.util.Random

class GlutenDataFrameSuite extends DataFrameSuite with GlutenSQLTestsTrait {

test(GlutenTestConstants.GLUTEN_TEST + "repartitionByRange") {
testGluten("repartitionByRange") {
val partitionNum = 10
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
@@ -94,7 +93,7 @@ class GlutenDataFrameSuite extends DataFrameSuite with GlutenSQLTestsTrait {
}
}

test(GlutenTestConstants.GLUTEN_TEST + "distributeBy and localSort") {
testGluten("distributeBy and localSort") {
import testImplicits._
val data = spark.sparkContext.parallelize((1 to 100).map(i => TestData2(i % 10, i))).toDF()

@@ -210,7 +209,7 @@ class GlutenDataFrameSuite extends DataFrameSuite with GlutenSQLTestsTrait {
}
}

test(GLUTEN_TEST + "reuse exchange") {
testGluten("reuse exchange") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2") {
val df = spark.range(100).toDF()
val join = df.join(df, "id")
@@ -237,7 +236,7 @@ class GlutenDataFrameSuite extends DataFrameSuite with GlutenSQLTestsTrait {
}

/** Failed to check WholeStageCodegenExec, so we rewrite the UT. */
test(GLUTEN_TEST + "SPARK-22520: support code generation for large CaseWhen") {
testGluten("SPARK-22520: support code generation for large CaseWhen") {
import org.apache.spark.sql.catalyst.dsl.expressions.StringToAttributeConversionHelper
val N = 30
var expr1 = when(equalizer($"id", lit(0)), 0)
@@ -261,7 +260,7 @@ class GlutenDataFrameSuite extends DataFrameSuite with GlutenSQLTestsTrait {
("David", 60, 192),
("Amy", 24, 180)).toDF("name", "age", "height")

test(GLUTEN_TEST + "describe") {
testGluten("describe") {
val describeResult = Seq(
Row("count", "4", "4", "4"),
Row("mean", null, "33.0", "178.0"),
@@ -323,9 +322,7 @@ class GlutenDataFrameSuite extends DataFrameSuite with GlutenSQLTestsTrait {
}
}

test(
GLUTEN_TEST +
"Allow leading/trailing whitespace in string before casting") {
testGluten("Allow leading/trailing whitespace in string before casting") {
def checkResult(df: DataFrame, expectedResult: Seq[Row]): Unit = {
checkAnswer(df, expectedResult)
assert(find(df.queryExecution.executedPlan)(_.isInstanceOf[ProjectExecTransformer]).isDefined)
@@ -20,7 +20,7 @@ import org.apache.spark.sql.types._

class GlutenDataFrameTungstenSuite extends DataFrameTungstenSuite with GlutenSQLTestsTrait {

test("Map type with struct type as key") {
testGluten("Map type with struct type as key") {
val kv = Map(Row(1, 2L) -> Seq("v"))
val data = sparkContext.parallelize(Seq(Row(1, kv)))
val schema = new StructType()
@@ -16,7 +16,6 @@
*/
package org.apache.spark.sql

import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -27,9 +26,7 @@ class GlutenDataFrameWindowFunctionsSuite

import testImplicits._

test(
GLUTEN_TEST +
"covar_samp, var_samp (variance), stddev_samp (stddev) functions in specific window") {
testGluten("covar_samp, var_samp (variance), stddev_samp (stddev) functions in specific window") {
withSQLConf(SQLConf.LEGACY_STATISTICAL_AGGREGATE.key -> "true") {
val df = Seq(
("a", "p1", 10.0, 20.0),
@@ -84,7 +81,7 @@ class GlutenDataFrameWindowFunctionsSuite
}
}

test(GLUTEN_TEST + "corr, covar_pop, stddev_pop functions in specific window") {
testGluten("corr, covar_pop, stddev_pop functions in specific window") {
withSQLConf(SQLConf.LEGACY_STATISTICAL_AGGREGATE.key -> "true") {
val df = Seq(
("a", "p1", 10.0, 20.0),
@@ -21,7 +21,7 @@ import org.apache.spark.sql.execution.ColumnarShuffleExchangeExec
class GlutenDatasetSuite extends DatasetSuite with GlutenSQLTestsTrait {
import testImplicits._

test("Gluten: dropDuplicates: columns with same column name") {
testGluten("dropDuplicates: columns with same column name") {
val ds1 = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
val ds2 = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
// The dataset joined has two columns of the same name "_2".
Expand All @@ -30,7 +30,7 @@ class GlutenDatasetSuite extends DatasetSuite with GlutenSQLTestsTrait {
checkDatasetUnorderly(joined.dropDuplicates(), (1, 2), (1, 1), (2, 1), (2, 2))
}

test("Gluten: groupBy.as") {
testGluten("groupBy.as") {
val df1 = Seq(DoubleData(1, "one"), DoubleData(2, "two"), DoubleData(3, "three"))
.toDS()
.repartition($"id")
@@ -16,7 +16,6 @@
*/
package org.apache.spark.sql

import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -30,7 +29,7 @@ class GlutenDateFunctionsSuite extends DateFunctionsSuite with GlutenSQLTestsTra

private def secs(millis: Long): Long = TimeUnit.MILLISECONDS.toSeconds(millis)

test(GLUTEN_TEST + "unix_timestamp") {
testGluten("unix_timestamp") {
Seq("corrected", "legacy").foreach {
legacyParserPolicy =>
withSQLConf(
@@ -143,7 +142,7 @@ class GlutenDateFunctionsSuite extends DateFunctionsSuite with GlutenSQLTestsTra
}
}

test(GLUTEN_TEST + "to_unix_timestamp") {
testGluten("to_unix_timestamp") {
Seq("corrected", "legacy").foreach {
legacyParserPolicy =>
withSQLConf(
@@ -207,7 +206,7 @@ class GlutenDateFunctionsSuite extends DateFunctionsSuite with GlutenSQLTestsTra
}

// Ported from spark with a test case for legacy mode removed.
test(GLUTEN_TEST + "to_timestamp") {
testGluten("to_timestamp") {
Seq("legacy", "corrected").foreach {
legacyParserPolicy =>
withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {