[VL] Rewritten GlutenInsertSuite test cases with default values (#4737)
[VL] Rewritten GlutenInsertSuite test cases with default values.
Surbhi-Vijay authored Feb 23, 2024
1 parent f295887 commit b823592
Showing 2 changed files with 318 additions and 0 deletions.
@@ -910,6 +910,10 @@ class VeloxTestSettings extends BackendTestSettings {
// the native write staging dir is different from vanilla Spark for custom partition paths
.exclude("SPARK-35106: Throw exception when rename custom partition paths returns false")
.exclude("Stop task set if FileAlreadyExistsException was thrown")
// Rewrite: Spark 3.4 added support for default values during file scans:
// if a column is missing from a record at scan time, its default value is appended.
// Velox supports default values for newly inserted records, but it does not backfill
// existing records and returns null for them instead.
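// A minimal illustration of the difference (hypothetical sequence, not part of this suite),
// assuming a parquet table:
//   CREATE TABLE t(i INT) USING parquet;
//   INSERT INTO t VALUES (1);
//   ALTER TABLE t ADD COLUMN s STRING DEFAULT 'abc';
//   SELECT s FROM t;
// Vanilla Spark fills 'abc' for the pre-existing row at scan time, whereas the Velox
// backend returns null for it; only rows inserted after the ALTER pick up the default.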
.exclude("INSERT rows, ALTER TABLE ADD COLUMNS with DEFAULTs, then SELECT them")
.exclude("SPARK-39557 INSERT INTO statements with tables with array defaults")
.exclude("SPARK-39557 INSERT INTO statements with tables with struct defaults")
@@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, VeloxC
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.QueryExecutionListener

import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
@@ -259,6 +260,319 @@ class GlutenInsertSuite
checkAndGetWriteFiles(df)
}
}

testGluten("INSERT rows, ALTER TABLE ADD COLUMNS with DEFAULTs, then SELECT them") {
import testImplicits._
case class Config(sqlConf: Option[(String, String)], useDataFrames: Boolean = false)
def runTest(dataSource: String, config: Config): Unit = {
def insertIntoT(): Unit = {
sql("insert into t(a, i) values('xyz', 42)")
}
def withTableT(f: => Unit): Unit = {
sql(s"create table t(a string, i int) using $dataSource")
withTable("t")(f)
}
// Positive tests:
// On Velox, adding a column with a valid default value to a table that already contains
// data returns null for the existing rows; only rows inserted afterwards pick up the default.
withTableT {
sql("alter table t add column (s string default concat('abc', 'def'))")
insertIntoT()
checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef"))
checkAnswer(sql("select i, s from t"), Row(42, "abcdef"))
// Now alter the column to change the default value.
// This still returns the previous value, not the new value.
sql("alter table t alter column s set default concat('ghi', 'jkl')")
checkAnswer(sql("select i, s from t"), Row(42, "abcdef"))
}
// Adding a column with a default value and then inserting explicit NULL values works.
// Querying data back from the table differentiates between the explicit NULL values and
// default values.
withTableT {
sql("alter table t add column (s string default concat('abc', 'def'))")
insertIntoT()
if (config.useDataFrames) {
Seq((null, null, null)).toDF.write.insertInto("t")
} else {
sql("insert into t values(null, null, null)")
}

checkAnswer(spark.table("t"), Seq(Row("xyz", 42, "abcdef"), Row(null, null, null)))
checkAnswer(sql("select i, s from t"), Seq(Row(42, "abcdef"), Row(null, null)))
}
// Adding two columns where only the first has a valid default value works successfully.
// Querying data from the altered table returns the default value as well as NULL for the
// second column.
withTableT {
sql("alter table t add column (s string default concat('abc', 'def'))")
insertIntoT()
sql("alter table t add column (x string)")
checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef", null))
checkAnswer(sql("select i, s, x from t"), Row(42, "abcdef", null))
}
// Test other supported data types.
withTableT {
sql(
"alter table t add columns (" +
"s boolean default true, " +
"t byte default cast(null as byte), " +
"u short default cast(42 as short), " +
"v float default 0, " +
"w double default 0, " +
"x date default cast('2021-01-02' as date), " +
"y timestamp default cast('2021-01-02 01:01:01' as timestamp), " +
"z timestamp_ntz default cast('2021-01-02 01:01:01' as timestamp_ntz), " +
"a1 timestamp_ltz default cast('2021-01-02 01:01:01' as timestamp_ltz), " +
"a2 decimal(5, 2) default 123.45," +
"a3 bigint default 43," +
"a4 smallint default cast(5 as smallint)," +
"a5 tinyint default cast(6 as tinyint))")
insertIntoT()
// Manually inspect the result row values rather than using the 'checkAnswer' helper method
// in order to ensure the values' correctness while avoiding minor type incompatibilities.
val result: Array[Row] =
sql("select s, t, u, v, w, x, y, z, a1, a2, a3, a4, a5 from t").collect()
for (row <- result) {
assert(row.length == 13)
assert(row(0) == true)
assert(row(1) == null)
assert(row(2) == 42)
assert(row(3) == 0.0f)
assert(row(4) == 0.0d)
assert(row(5).toString == "2021-01-02")
assert(row(6).toString == "2021-01-02 01:01:01.0")
assert(row(7).toString.startsWith("2021-01-02"))
assert(row(8).toString == "2021-01-02 01:01:01.0")
assert(row(9).toString == "123.45")
assert(row(10) == 43L)
assert(row(11) == 5)
assert(row(12) == 6)
}
}
}

// This represents one test configuration over a data source.
case class TestCase(dataSource: String, configs: Seq[Config])
// Run the test several times using each configuration.
Seq(
TestCase(
dataSource = "csv",
Seq(Config(None), Config(Some(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false")))),
TestCase(
dataSource = "json",
Seq(Config(None), Config(Some(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS.key -> "false")))),
TestCase(
dataSource = "orc",
Seq(Config(None), Config(Some(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false")))),
TestCase(
dataSource = "parquet",
Seq(Config(None), Config(Some(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false"))))
).foreach {
testCase: TestCase =>
testCase.configs.foreach {
config: Config =>
// Run the test twice, once using SQL for the INSERT operations
// and again using DataFrames.
for (useDataFrames <- Seq(false, true)) {
config.sqlConf
.map {
kv: (String, String) =>
withSQLConf(kv) {
// Run the test with the pair of custom SQLConf values.
runTest(testCase.dataSource, config.copy(useDataFrames = useDataFrames))
}
}
.getOrElse {
// Run the test with default settings.
runTest(testCase.dataSource, config.copy(useDataFrames = useDataFrames))
}
}
}
}
}

testGluten("SPARK-39557 INSERT INTO statements with tables with array defaults") {
import testImplicits._
// Positive tests: array types are supported as default values.
case class Config(dataSource: String, useDataFrames: Boolean = false)
Seq(
Config("parquet"),
Config("parquet", useDataFrames = true),
Config("orc"),
Config("orc", useDataFrames = true)).foreach {
config =>
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
Seq((false)).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
sql("alter table t add column s array<int> default array(1, 2)")
checkAnswer(spark.table("t"), Row(false, null))
sql("insert into t(i) values (true)")
checkAnswer(spark.table("t"), Seq(Row(false, null), Row(true, Seq(1, 2))))
}
}
// Negative tests: provided array element types must match their corresponding DEFAULT
// declarations, if applicable.
val incompatibleDefault =
"Failed to execute ALTER TABLE ADD COLUMNS command because the destination table column s " +
"has a DEFAULT value with type"
Seq(Config("parquet"), Config("parquet", true)).foreach {
config =>
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
Seq((false)).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
assert(intercept[AnalysisException] {
sql("alter table t add column s array<int> default array('abc', 'def')")
}.getMessage.contains(incompatibleDefault))
}
}
}

testGluten("SPARK-39557 INSERT INTO statements with tables with struct defaults") {
import testImplicits._
// Positive tests: struct types are supported as default values.
case class Config(dataSource: String, useDataFrames: Boolean = false)
Seq(
Config("parquet"),
Config("parquet", useDataFrames = true),
Config("orc"),
Config("orc", useDataFrames = true)).foreach {
config =>
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
Seq((false)).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
sql("alter table t add column s struct<x boolean, y string> default struct(true, 'abc')")
checkAnswer(spark.table("t"), Row(false, null))
sql("insert into t(i) values (true)")
checkAnswer(spark.table("t"), Seq(Row(false, null), Row(true, Row(true, "abc"))))
}
}

// Negative tests: provided struct field types must match their corresponding DEFAULT
// declarations, if applicable.
val incompatibleDefault =
"Failed to execute ALTER TABLE ADD COLUMNS command because the destination table column s " +
"has a DEFAULT value with type"
Seq(Config("parquet"), Config("parquet", true)).foreach {
config =>
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
Seq((false)).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
assert(intercept[AnalysisException] {
sql("alter table t add column s struct<x boolean, y string> default struct(42, 56)")
}.getMessage.contains(incompatibleDefault))
}
}
}

testGluten("SPARK-39557 INSERT INTO statements with tables with map defaults") {
import testImplicits._
// Positive tests: map types are supported as default values.
case class Config(dataSource: String, useDataFrames: Boolean = false)
Seq(
Config("parquet"),
Config("parquet", useDataFrames = true),
Config("orc"),
Config("orc", useDataFrames = true)).foreach {
config =>
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
Seq((false)).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
sql("alter table t add column s map<boolean, string> default map(true, 'abc')")
checkAnswer(spark.table("t"), Row(false, null))
sql("insert into t(i) select true")
checkAnswer(spark.table("t"), Seq(Row(false, null), Row(true, Map(true -> "abc"))))
}
withTable("t") {
sql(s"""
create table t(
i int,
s struct<
x array<
struct<a int, b int>>,
y array<
map<boolean, string>>>
default struct(
array(
struct(1, 2)),
array(
map(false, 'def', true, 'jkl'))))
using ${config.dataSource}""")
sql("insert into t select 1, default")
sql("alter table t alter column s drop default")
if (config.useDataFrames) {
Seq((2, null)).toDF.write.insertInto("t")
} else {
sql("insert into t select 2, default")
}
sql("""
alter table t alter column s
set default struct(
array(
struct(3, 4)),
array(
map(false, 'mno', true, 'pqr')))""")
sql("insert into t select 3, default")
sql("""
alter table t
add column t array<
map<boolean, string>>
default array(
map(true, 'xyz'))""")
sql("insert into t(i, s) select 4, default")
checkAnswer(
spark.table("t"),
Seq(
Row(1, Row(Seq(Row(1, 2)), Seq(Map(false -> "def", true -> "jkl"))), null),
Row(2, null, null),
Row(3, Row(Seq(Row(3, 4)), Seq(Map(false -> "mno", true -> "pqr"))), null),
Row(
4,
Row(Seq(Row(3, 4)), Seq(Map(false -> "mno", true -> "pqr"))),
Seq(Map(true -> "xyz")))
)
)
}
}
// Negative tests: provided map element types must match their corresponding DEFAULT
// declarations, if applicable.
val incompatibleDefault =
"Failed to execute ALTER TABLE ADD COLUMNS command because the destination table column s " +
"has a DEFAULT value with type"
Seq(Config("parquet"), Config("parquet", true)).foreach {
config =>
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
Seq((false)).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
assert(intercept[AnalysisException] {
sql("alter table t add column s map<boolean, string> default map(42, 56)")
}.getMessage.contains(incompatibleDefault))
}
}
}
}

class GlutenRenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem extends RawLocalFileSystem {
