[CH][UT] Fix UT due to ClickHouse/ClickHouse#64427 (#6079)
The root cause is not yet known, but ClickHouse/ClickHouse#64427 changed the default of 'input_format_parquet_max_block_size' to DEFAULT_BLOCK_SIZE (65409), causing the MergeTree-related tests to fail. Set 'input_format_parquet_max_block_size' back to 8192 in the test suite, which reverts 7445f02.
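
A minimal sketch of the fix, assuming the key is set on the suite's SparkConf (the val name below is illustrative; the setting key is copied from the diff):

    import org.apache.spark.SparkConf

    // Pin the CH backend's Parquet read block size so MergeTree part
    // counts in the tests stay stable across ClickHouse versions.
    val conf = new SparkConf()
      .set(
        "spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size",
        "8192")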

Also, use assertResult instead of assert, so the actual value is reported when an assertion fails.
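
For illustration, a minimal ScalaTest sketch of the difference (the suite name and count are hypothetical):

    import org.scalatest.funsuite.AnyFunSuite

    class AssertionStyleExample extends AnyFunSuite {
      test("assertResult surfaces the actual value") {
        val rowCount = 600571L // hypothetical count returned by a query
        // On failure, assertResult reports a message along the lines of
        // "Expected 600572, but got 600571", so the log shows the actual
        // count rather than just a failed boolean expression.
        assertResult(600572L)(rowCount)
      }
    }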
baibaichen authored Jun 13, 2024
1 parent 00fee1d commit 142cf0f
Showing 4 changed files with 377 additions and 442 deletions.
@@ -59,6 +59,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size",
"8192")
}

override protected def createTPCHNotNullTables(): Unit = {
@@ -80,10 +83,10 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("optimize lineitem_mergetree_optimize")
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(
countFiles(new File(s"$basePath/lineitem_mergetree_optimize")) == 641
assertResult(462)(
countFiles(new File(s"$basePath/lineitem_mergetree_optimize"))
) // many merged parts
}
}
@@ -116,23 +119,23 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sparkContext.setJobGroup("test", "test")
spark.sql("optimize lineitem_mergetree_optimize_p")
val job_ids = spark.sparkContext.statusTracker.getJobIdsForGroup("test")
assert(job_ids.size == 1) // will not trigger actual merge job
assertResult(1)(job_ids.length) // will not trigger actual merge job
spark.sparkContext.clearJobGroup()

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22728)
assertResult(22728)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")))
spark.sql("VACUUM lineitem_mergetree_optimize_p RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22728)
assertResult(22728)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")))
} else {
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22730)
assertResult(22730)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")))
}

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect()
assert(ret2.apply(0).get(0) == 600572)
assertResult(600572)(ret2.apply(0).get(0))
}

test("test mergetree optimize partitioned by one low card column") {
@@ -152,33 +155,33 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("optimize lineitem_mergetree_optimize_p2")
val job_ids = spark.sparkContext.statusTracker.getJobIdsForGroup("test2")
if (sparkVersion.equals("3.2")) {
assert(job_ids.size == 7) // WILL trigger actual merge job
assertResult(7)(job_ids.length) // WILL trigger actual merge job
} else {
assert(job_ids.size == 8) // WILL trigger actual merge job
assertResult(8)(job_ids.length) // WILL trigger actual merge job
}

spark.sparkContext.clearJobGroup()

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 348)
assertResult(372)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")))
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 236)
assertResult(239)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 238)
assertResult(241)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")))
}
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
// the second VACUUM will remove some empty folders
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 220)
assertResult(220)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 226)
assertResult(226)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")))
}

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
assert(ret2.apply(0).get(0) == 600572)
assertResult(600572)(ret2.apply(0).get(0))
}

test("test mergetree optimize partitioned by two low card column") {
@@ -197,24 +200,24 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("optimize lineitem_mergetree_optimize_p3")
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 448)
assertResult(516)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")))
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 294)
assertResult(306)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 296)
assertResult(308)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")))
}
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 272)
assertResult(276)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 278)
assertResult(282)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")))
}

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect()
assert(ret2.apply(0).get(0) == 600572)
assertResult(600572)(ret2.apply(0).get(0))
}
}

@@ -234,29 +237,29 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("optimize lineitem_mergetree_optimize_p4")
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 448)
assertResult(516)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")))
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 294)
assertResult(306)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 296)
assertResult(308)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")))
}
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 272)
assertResult(276)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 278)
assertResult(282)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")))
}

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect()
assert(ret2.apply(0).get(0) == 600572)
assertResult(600572)(ret2.apply(0).get(0))
}
}

test("test mergetree optimize with optimize.minFileSize and optimize.maxFileSize") {
withSQLConf("spark.databricks.delta.optimize.minFileSize" -> "83800000") {
withSQLConf("spark.databricks.delta.optimize.minFileSize" -> "838000") {
// 3 from 37 parts are larger than this, so after optimize there should be 4 parts:
// 3 original parts and 1 merged part
spark.sql(s"""
@@ -275,20 +278,20 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 75)
assertResult(99)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
} else {
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
// this case will create a checkpoint
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 81)
assertResult(105)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
}

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

withSQLConf(
("spark.databricks.delta.optimize.maxFileSize" -> "10000000"),
("spark.databricks.delta.optimize.minFileSize" -> "838250")) {
"spark.databricks.delta.optimize.maxFileSize" -> "10000000",
"spark.databricks.delta.optimize.minFileSize" -> "838250") {
// of the remaining 3 original parts, 2 are less than 838250, 1 is larger (size 838255)
// the merged part is ~27MB, so after optimize there should be 3 parts:
// 1 merged part from 2 original parts, 1 merged part from 34 original parts
@@ -299,14 +302,14 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 75)
assertResult(93)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
} else {
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 85)
assertResult(104)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
}

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

// now merge all parts (testing merging from merged parts)
@@ -315,14 +318,14 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 75)
assertResult(77)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
} else {
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 90)
assertResult(93)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
}

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

test("test mergetree optimize table with partition and bucket") {
@@ -343,24 +346,22 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("optimize lineitem_mergetree_optimize_p6")

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
if (sparkVersion.equals("3.2")) 475 else 501
})
assertResult(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")))(
if (sparkVersion.equals("3.2")) 499 else 528)
spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
if (sparkVersion.equals("3.2")) 315 else 327
})
assertResult(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")))(
if (sparkVersion.equals("3.2")) 315 else 327)

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect()
assert(ret2.apply(0).get(0) == 600572)
assertResult(600572)(ret2.apply(0).get(0))
}

test("test skip index after optimize") {
withSQLConf(
"spark.databricks.delta.optimize.maxFileSize" -> "100000000",
"spark.databricks.delta.optimize.maxFileSize" -> "2000000",
"spark.sql.adaptive.enabled" -> "false") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_index;
@@ -385,12 +386,12 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assert(scanExec.size == 1)
val mergetreeScan = scanExec(0)
assertResult(1)(scanExec.size)
val mergetreeScan = scanExec.head
val ret = df.collect()
assert(ret.apply(0).get(0) == 2)
assertResult(2)(ret.apply(0).get(0))
val marks = mergetreeScan.metrics("selectedMarks").value
assert(marks == 1)
assertResult(1)(marks)

val directory = new File(s"$basePath/lineitem_mergetree_index")
val partDir = directory.listFiles().filter(f => f.getName.endsWith("merged")).head
@@ -403,7 +404,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
test("test mergetree optimize with the path based table") {
val dataPath = s"$basePath/lineitem_mergetree_optimize_path_based"
clearDataPath(dataPath)
withSQLConf("spark.databricks.delta.optimize.minFileSize" -> "83800000") {
withSQLConf("spark.databricks.delta.optimize.minFileSize" -> "838000") {
// 3 from 37 parts are larger than this, so after optimize there should be 4 parts:
// 3 original parts and 1 merged part

@@ -422,18 +423,18 @@ class GlutenClickHouseMergeTreeOptimizeSuite
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 75)
assertResult(99)(countFiles(new File(dataPath)))
} else {
assert(countFiles(new File(dataPath)) == 81)
assertResult(105)(countFiles(new File(dataPath)))
}

val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

withSQLConf(
("spark.databricks.delta.optimize.maxFileSize" -> "10000000"),
("spark.databricks.delta.optimize.minFileSize" -> "838250")) {
"spark.databricks.delta.optimize.maxFileSize" -> "10000000",
"spark.databricks.delta.optimize.minFileSize" -> "838250") {
// of the remaining 3 original parts, 2 are less than 838250, 1 is larger (size 838255)
// the merged part is ~27MB, so after optimize there should be 3 parts:
// 1 merged part from 2 original parts, 1 merged part from 34 original parts
@@ -445,13 +446,13 @@ class GlutenClickHouseMergeTreeOptimizeSuite
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 75)
assertResult(93)(countFiles(new File(dataPath)))
} else {
assert(countFiles(new File(dataPath)) == 85)
assertResult(104)(countFiles(new File(dataPath)))
}

val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

// now merge all parts (testing merging from merged parts)
@@ -461,19 +462,19 @@ class GlutenClickHouseMergeTreeOptimizeSuite
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 75)
assertResult(77)(countFiles(new File(dataPath)))
} else {
assert(countFiles(new File(dataPath)) == 90)
assertResult(93)(countFiles(new File(dataPath)))
}

val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

test("test mergetree insert with optimize basic") {
withSQLConf(
("spark.databricks.delta.optimize.minFileSize" -> "200000000"),
("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true")
"spark.databricks.delta.optimize.minFileSize" -> "200000000",
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true"
) {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_insert_optimize_basic;
@@ -487,10 +488,10 @@ class GlutenClickHouseMergeTreeOptimizeSuite
|""".stripMargin)

val ret = spark.sql("select count(*) from lineitem_mergetree_insert_optimize_basic").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
eventually(timeout(60.seconds), interval(3.seconds)) {
assert(
new File(s"$basePath/lineitem_mergetree_insert_optimize_basic").listFiles().length == 2
assertResult(2)(
new File(s"$basePath/lineitem_mergetree_insert_optimize_basic").listFiles().length
)
}
}
