
Commit

Using assertResult instead of assert, so we can see the actual result when an assertion fails.
baibaichen committed Jun 13, 2024
1 parent e85c53f commit ad4e33e
Showing 4 changed files with 361 additions and 438 deletions.
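For context, a minimal ScalaTest sketch of the difference (illustrative only, not part of this commit; the suite name and values are made up): a plain assert failure centers on the boolean condition, while assertResult(expected)(actual) always labels which value was expected and which was actually produced, so a failed count check immediately shows the real row or file count.

import org.scalatest.funsuite.AnyFunSuite

// Hypothetical, self-contained example; not taken from the Gluten test suite.
class AssertStyleExampleSuite extends AnyFunSuite {

  test("assert vs assertResult failure messages") {
    // Stand-in for something like ret.apply(0).get(0) in the real tests.
    val actualCount: Any = 600572L

    // Plain assert: on failure the message is built from the boolean
    // expression (how informative it is depends on how the expression
    // is written).
    assert(actualCount == 600572L)

    // assertResult(expected)(actual): on failure ScalaTest reports
    // roughly "Expected 600572, but got <actual>", which is the
    // motivation given in this commit.
    assertResult(600572L)(actualCount)
  }
}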
@@ -83,10 +83,10 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("optimize lineitem_mergetree_optimize")
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(
countFiles(new File(s"$basePath/lineitem_mergetree_optimize")) == 462
assertResult(462)(
countFiles(new File(s"$basePath/lineitem_mergetree_optimize"))
) // many merged parts
}
}
@@ -119,23 +119,23 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sparkContext.setJobGroup("test", "test")
spark.sql("optimize lineitem_mergetree_optimize_p")
val job_ids = spark.sparkContext.statusTracker.getJobIdsForGroup("test")
assert(job_ids.size == 1) // will not trigger actual merge job
assertResult(1)(job_ids.length) // will not trigger actual merge job
spark.sparkContext.clearJobGroup()

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22728)
assertResult(22728)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")))
spark.sql("VACUUM lineitem_mergetree_optimize_p RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22728)
assertResult(22728)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")))
} else {
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22730)
assertResult(22730)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")))
}

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect()
assert(ret2.apply(0).get(0) == 600572)
assertResult(600572)(ret2.apply(0).get(0))
}

test("test mergetree optimize partitioned by one low card column") {
@@ -155,33 +155,33 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("optimize lineitem_mergetree_optimize_p2")
val job_ids = spark.sparkContext.statusTracker.getJobIdsForGroup("test2")
if (sparkVersion.equals("3.2")) {
assert(job_ids.size == 7) // WILL trigger actual merge job
assertResult(7)(job_ids.length) // WILL trigger actual merge job
} else {
assert(job_ids.size == 8) // WILL trigger actual merge job
assertResult(8)(job_ids.length) // WILL trigger actual merge job
}

spark.sparkContext.clearJobGroup()

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 372)
assertResult(372)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")))
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 239)
assertResult(239)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 241)
assertResult(241)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")))
}
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
// the second VACUUM will remove some empty folders
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 220)
assertResult(220)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 226)
assertResult(226)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")))
}

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
assert(ret2.apply(0).get(0) == 600572)
assertResult(600572)(ret2.apply(0).get(0))
}

test("test mergetree optimize partitioned by two low card column") {
@@ -200,24 +200,24 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("optimize lineitem_mergetree_optimize_p3")
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 516)
assertResult(516)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")))
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 306)
assertResult(306)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 308)
assertResult(308)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")))
}
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 276)
assertResult(276)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 282)
assertResult(282)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")))
}

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect()
assert(ret2.apply(0).get(0) == 600572)
assertResult(600572)(ret2.apply(0).get(0))
}
}

@@ -237,24 +237,24 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("optimize lineitem_mergetree_optimize_p4")
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 516)
assertResult(516)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")))
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 306)
assertResult(306)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 308)
assertResult(308)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")))
}
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 276)
assertResult(276)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")))
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 282)
assertResult(282)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")))
}

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect()
assert(ret2.apply(0).get(0) == 600572)
assertResult(600572)(ret2.apply(0).get(0))
}
}

@@ -278,20 +278,20 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 99)
assertResult(99)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
} else {
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
// this case will create a checkpoint
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 105)
assertResult(105)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
}

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

withSQLConf(
("spark.databricks.delta.optimize.maxFileSize" -> "10000000"),
("spark.databricks.delta.optimize.minFileSize" -> "838250")) {
"spark.databricks.delta.optimize.maxFileSize" -> "10000000",
"spark.databricks.delta.optimize.minFileSize" -> "838250") {
// of the remaining 3 original parts, 2 are less than 838250, 1 is larger (size 838255)
// the merged part is ~27MB, so after optimize there should be 3 parts:
// 1 merged part from 2 original parts, 1 merged part from 34 original parts
@@ -302,14 +302,14 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 93)
assertResult(93)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
} else {
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 104)
assertResult(104)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
}

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

// now merge all parts (testing merging from merged parts)
@@ -318,14 +318,14 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 77)
assertResult(77)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
} else {
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 93)
assertResult(93)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")))
}

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

test("test mergetree optimize table with partition and bucket") {
@@ -346,19 +346,17 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("optimize lineitem_mergetree_optimize_p6")

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))

assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
if (sparkVersion.equals("3.2")) 499 else 528
})
assertResult(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")))(
if (sparkVersion.equals("3.2")) 499 else 528)
spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
if (sparkVersion.equals("3.2")) 315 else 327
})
assertResult(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")))(
if (sparkVersion.equals("3.2")) 315 else 327)

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect()
assert(ret2.apply(0).get(0) == 600572)
assertResult(600572)(ret2.apply(0).get(0))
}

test("test skip index after optimize") {
@@ -388,12 +386,12 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assert(scanExec.size == 1)
val mergetreeScan = scanExec(0)
assertResult(1)(scanExec.size)
val mergetreeScan = scanExec.head
val ret = df.collect()
assert(ret.apply(0).get(0) == 2)
assertResult(2)(ret.apply(0).get(0))
val marks = mergetreeScan.metrics("selectedMarks").value
assert(marks == 1)
assertResult(1)(marks)

val directory = new File(s"$basePath/lineitem_mergetree_index")
val partDir = directory.listFiles().filter(f => f.getName.endsWith("merged")).head
@@ -425,18 +423,18 @@ class GlutenClickHouseMergeTreeOptimizeSuite
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 99)
assertResult(99)(countFiles(new File(dataPath)))
} else {
assert(countFiles(new File(dataPath)) == 105)
assertResult(105)(countFiles(new File(dataPath)))
}

val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

withSQLConf(
("spark.databricks.delta.optimize.maxFileSize" -> "10000000"),
("spark.databricks.delta.optimize.minFileSize" -> "838250")) {
"spark.databricks.delta.optimize.maxFileSize" -> "10000000",
"spark.databricks.delta.optimize.minFileSize" -> "838250") {
// of the remaining 3 original parts, 2 are less than 838250, 1 is larger (size 838255)
// the merged part is ~27MB, so after optimize there should be 3 parts:
// 1 merged part from 2 original parts, 1 merged part from 34 original parts
@@ -448,13 +446,13 @@ class GlutenClickHouseMergeTreeOptimizeSuite
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 93)
assertResult(93)(countFiles(new File(dataPath)))
} else {
assert(countFiles(new File(dataPath)) == 104)
assertResult(104)(countFiles(new File(dataPath)))
}

val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

// now merge all parts (testing merging from merged parts)
@@ -464,19 +462,19 @@ class GlutenClickHouseMergeTreeOptimizeSuite
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 77)
assertResult(77)(countFiles(new File(dataPath)))
} else {
assert(countFiles(new File(dataPath)) == 93)
assertResult(93)(countFiles(new File(dataPath)))
}

val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
}

test("test mergetree insert with optimize basic") {
withSQLConf(
("spark.databricks.delta.optimize.minFileSize" -> "200000000"),
("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true")
"spark.databricks.delta.optimize.minFileSize" -> "200000000",
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true"
) {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_insert_optimize_basic;
@@ -490,10 +488,10 @@ class GlutenClickHouseMergeTreeOptimizeSuite
|""".stripMargin)

val ret = spark.sql("select count(*) from lineitem_mergetree_insert_optimize_basic").collect()
assert(ret.apply(0).get(0) == 600572)
assertResult(600572)(ret.apply(0).get(0))
eventually(timeout(60.seconds), interval(3.seconds)) {
assert(
new File(s"$basePath/lineitem_mergetree_insert_optimize_basic").listFiles().length == 2
assertResult(2)(
new File(s"$basePath/lineitem_mergetree_insert_optimize_basic").listFiles().length
)
}
}