Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CH][UT] Fix UT due to https://github.com/ClickHouse/ClickHouse/pull/64427 #6079

Merged
merged 1 commit into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size",
"8192")
}

override protected def createTPCHNotNullTables(): Unit = {
Expand All @@ -80,10 +83,10 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("optimize lineitem_mergetree_optimize")
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(
countFiles(new File(s"$basePath/lineitem_mergetree_optimize")) == 641
assertResult(462)(
countFiles(new File(s"$basePath/lineitem_mergetree_optimize"))
) // many merged parts
}
}
Expand Down Expand Up @@ -116,23 +119,23 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sparkContext.setJobGroup("test", "test")
spark.sql("optimize lineitem_mergetree_optimize_p")
val job_ids = spark.sparkContext.statusTracker.getJobIdsForGroup("test")
assert(job_ids.size == 1) // will not trigger actual merge job
assertResult(1)(job_ids.length) // will not trigger actual merge job
spark.sparkContext.clearJobGroup()

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22728)
assertResult(22728)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")))
spark.sql("VACUUM lineitem_mergetree_optimize_p RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22728)
assertResult(22728)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")))
} else {
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22730)
assertResult(22730)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")))
}

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect()
assert(ret2.apply(0).get(0) == 600572)
assertResult(600572)(ret2.apply(0).get(0))
}

test("test mergetree optimize partitioned by one low card column") {
Expand All @@ -152,33 +155,33 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("optimize lineitem_mergetree_optimize_p2")
val job_ids = spark.sparkContext.statusTracker.getJobIdsForGroup("test2")
if (sparkVersion.equals("3.2")) {
assert(job_ids.size == 7) // WILL trigger actual merge job
assertResult(7)(job_ids.length) // WILL trigger actual merge job
} else {
assert(job_ids.size == 8) // WILL trigger actual merge job
assertResult(8)(job_ids.length) // WILL trigger actual merge job
}

spark.sparkContext.clearJobGroup()

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 348)
assertResult(372)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")))
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 236)
assertResult(239)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 238)
assertResult(241)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")))
}
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
// the second VACUUM will remove some empty folders
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 220)
assertResult(220)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 226)
assertResult(226)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")))
}

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
assert(ret2.apply(0).get(0) == 600572)
assertResult(600572)(ret2.apply(0).get(0))
}

test("test mergetree optimize partitioned by two low card column") {
Expand All @@ -197,24 +200,24 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("optimize lineitem_mergetree_optimize_p3")
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 448)
assertResult(516)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")))
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 294)
assertResult(306)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 296)
assertResult(308)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")))
}
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 272)
assertResult(276)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 278)
assertResult(282)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")))
}

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect()
assert(ret2.apply(0).get(0) == 600572)
assertResult(600572)(ret2.apply(0).get(0))
}
}

Expand All @@ -234,29 +237,29 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("optimize lineitem_mergetree_optimize_p4")
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 448)
assertResult(516)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")))
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 294)
assertResult(306)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 296)
assertResult(308)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")))
}
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 272)
assertResult(276)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 278)
assertResult(282)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")))
}

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect()
assert(ret2.apply(0).get(0) == 600572)
assertResult(600572)(ret2.apply(0).get(0))
}
}

test("test mergetree optimize with optimize.minFileSize and optimize.maxFileSize") {
withSQLConf("spark.databricks.delta.optimize.minFileSize" -> "83800000") {
withSQLConf("spark.databricks.delta.optimize.minFileSize" -> "838000") {
// 3 from 37 parts are larger than this, so after optimize there should be 4 parts:
// 3 original parts and 1 merged part
spark.sql(s"""
Expand All @@ -275,20 +278,20 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 75)
assertResult(99)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
} else {
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
// this case will create a checkpoint
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 81)
assertResult(105)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
}

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

withSQLConf(
("spark.databricks.delta.optimize.maxFileSize" -> "10000000"),
("spark.databricks.delta.optimize.minFileSize" -> "838250")) {
"spark.databricks.delta.optimize.maxFileSize" -> "10000000",
"spark.databricks.delta.optimize.minFileSize" -> "838250") {
// of the remaing 3 original parts, 2 are less than 838250, 1 is larger (size 838255)
// the merged part is ~27MB, so after optimize there should be 3 parts:
// 1 merged part from 2 original parts, 1 merged part from 34 original parts
Expand All @@ -299,14 +302,14 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 75)
assertResult(93)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
} else {
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 85)
assertResult(104)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
}

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

// now merge all parts (testing merging from merged parts)
Expand All @@ -315,14 +318,14 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 75)
assertResult(77)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
} else {
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 90)
assertResult(93)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
}

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

test("test mergetree optimize table with partition and bucket") {
Expand All @@ -343,24 +346,22 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("optimize lineitem_mergetree_optimize_p6")

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
if (sparkVersion.equals("3.2")) 475 else 501
})
assertResult(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")))(
if (sparkVersion.equals("3.2")) 499 else 528)
spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
if (sparkVersion.equals("3.2")) 315 else 327
})
assertResult(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")))(
if (sparkVersion.equals("3.2")) 315 else 327)

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect()
assert(ret2.apply(0).get(0) == 600572)
assertResult(600572)(ret2.apply(0).get(0))
}

test("test skip index after optimize") {
withSQLConf(
"spark.databricks.delta.optimize.maxFileSize" -> "100000000",
"spark.databricks.delta.optimize.maxFileSize" -> "2000000",
"spark.sql.adaptive.enabled" -> "false") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_index;
Expand All @@ -385,12 +386,12 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assert(scanExec.size == 1)
val mergetreeScan = scanExec(0)
assertResult(1)(scanExec.size)
val mergetreeScan = scanExec.head
val ret = df.collect()
assert(ret.apply(0).get(0) == 2)
assertResult(2)(ret.apply(0).get(0))
val marks = mergetreeScan.metrics("selectedMarks").value
assert(marks == 1)
assertResult(1)(marks)

val directory = new File(s"$basePath/lineitem_mergetree_index")
val partDir = directory.listFiles().filter(f => f.getName.endsWith("merged")).head
Expand All @@ -403,7 +404,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
test("test mergetree optimize with the path based table") {
val dataPath = s"$basePath/lineitem_mergetree_optimize_path_based"
clearDataPath(dataPath)
withSQLConf("spark.databricks.delta.optimize.minFileSize" -> "83800000") {
withSQLConf("spark.databricks.delta.optimize.minFileSize" -> "838000") {
// 3 from 37 parts are larger than this, so after optimize there should be 4 parts:
// 3 original parts and 1 merged part

Expand All @@ -422,18 +423,18 @@ class GlutenClickHouseMergeTreeOptimizeSuite
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 75)
assertResult(99)(countFiles(new File(dataPath)))
} else {
assert(countFiles(new File(dataPath)) == 81)
assertResult(105)(countFiles(new File(dataPath)))
}

val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

withSQLConf(
("spark.databricks.delta.optimize.maxFileSize" -> "10000000"),
("spark.databricks.delta.optimize.minFileSize" -> "838250")) {
"spark.databricks.delta.optimize.maxFileSize" -> "10000000",
"spark.databricks.delta.optimize.minFileSize" -> "838250") {
// of the remaing 3 original parts, 2 are less than 838250, 1 is larger (size 838255)
// the merged part is ~27MB, so after optimize there should be 3 parts:
// 1 merged part from 2 original parts, 1 merged part from 34 original parts
Expand All @@ -445,13 +446,13 @@ class GlutenClickHouseMergeTreeOptimizeSuite
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 75)
assertResult(93)(countFiles(new File(dataPath)))
} else {
assert(countFiles(new File(dataPath)) == 85)
assertResult(104)(countFiles(new File(dataPath)))
}

val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

// now merge all parts (testing merging from merged parts)
Expand All @@ -461,19 +462,19 @@ class GlutenClickHouseMergeTreeOptimizeSuite
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 75)
assertResult(77)(countFiles(new File(dataPath)))
} else {
assert(countFiles(new File(dataPath)) == 90)
assertResult(93)(countFiles(new File(dataPath)))
}

val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

test("test mergetree insert with optimize basic") {
withSQLConf(
("spark.databricks.delta.optimize.minFileSize" -> "200000000"),
("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true")
"spark.databricks.delta.optimize.minFileSize" -> "200000000",
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true"
) {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_insert_optimize_basic;
Expand All @@ -487,10 +488,10 @@ class GlutenClickHouseMergeTreeOptimizeSuite
|""".stripMargin)

val ret = spark.sql("select count(*) from lineitem_mergetree_insert_optimize_basic").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
eventually(timeout(60.seconds), interval(3.seconds)) {
assert(
new File(s"$basePath/lineitem_mergetree_insert_optimize_basic").listFiles().length == 2
assertResult(2)(
new File(s"$basePath/lineitem_mergetree_insert_optimize_basic").listFiles().length
)
}
}
Expand Down
Loading
Loading