diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala index 6f8b4d93beb6..e76d3ca55d68 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala @@ -59,6 +59,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite .set( "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert", "false") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size", + "8192") } override protected def createTPCHNotNullTables(): Unit = { @@ -80,10 +83,10 @@ class GlutenClickHouseMergeTreeOptimizeSuite spark.sql("optimize lineitem_mergetree_optimize") val ret = spark.sql("select count(*) from lineitem_mergetree_optimize").collect() - assert(ret.apply(0).get(0) == 600572) + assertResult(600572)(ret.apply(0).get(0)) - assert( - countFiles(new File(s"$basePath/lineitem_mergetree_optimize")) == 641 + assertResult(462)( + countFiles(new File(s"$basePath/lineitem_mergetree_optimize")) ) // many merged parts } } @@ -116,23 +119,23 @@ class GlutenClickHouseMergeTreeOptimizeSuite spark.sparkContext.setJobGroup("test", "test") spark.sql("optimize lineitem_mergetree_optimize_p") val job_ids = spark.sparkContext.statusTracker.getJobIdsForGroup("test") - assert(job_ids.size == 1) // will not trigger actual merge job + assertResult(1)(job_ids.length) // will not trigger actual merge job spark.sparkContext.clearJobGroup() val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect() - assert(ret.apply(0).get(0) == 600572) + assertResult(600572)(ret.apply(0).get(0)) - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22728) + assertResult(22728)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p"))) spark.sql("VACUUM lineitem_mergetree_optimize_p RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22728) + assertResult(22728)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p"))) } else { // For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir. 
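// The recurring mechanical change in this patch swaps assert(actual == expected)
// for ScalaTest's assertResult(expected)(actual). A minimal stand-alone sketch of
// the idiom, using a hypothetical value rather than a real query result:
import org.scalatest.funsuite.AnyFunSuite

class AssertionStyleSketch extends AnyFunSuite {
  test("assertResult makes the expected value explicit") {
    val rowCount = 600572L // hypothetical stand-in for collect().apply(0).get(0)

    // Boolean style used before the patch.
    assert(rowCount == 600572L)

    // Style used after the patch: expected value first, actual expression second;
    // a mismatch fails with "Expected 600572, but got ...".
    assertResult(600572L)(rowCount)
  }
}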
- assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22730) + assertResult(22730)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p"))) } val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect() - assert(ret2.apply(0).get(0) == 600572) + assertResult(600572)(ret2.apply(0).get(0)) } test("test mergetree optimize partitioned by one low card column") { @@ -152,33 +155,33 @@ class GlutenClickHouseMergeTreeOptimizeSuite spark.sql("optimize lineitem_mergetree_optimize_p2") val job_ids = spark.sparkContext.statusTracker.getJobIdsForGroup("test2") if (sparkVersion.equals("3.2")) { - assert(job_ids.size == 7) // WILL trigger actual merge job + assertResult(7)(job_ids.length) // WILL trigger actual merge job } else { - assert(job_ids.size == 8) // WILL trigger actual merge job + assertResult(8)(job_ids.length) // WILL trigger actual merge job } spark.sparkContext.clearJobGroup() val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect() - assert(ret.apply(0).get(0) == 600572) + assertResult(600572)(ret.apply(0).get(0)) - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 348) + assertResult(372)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2"))) spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 236) + assertResult(239)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2"))) } else { - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 238) + assertResult(241)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2"))) } spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS") // the second VACUUM will remove some empty folders if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 220) + assertResult(220)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2"))) } else { - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 226) + assertResult(226)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2"))) } val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect() - assert(ret2.apply(0).get(0) == 600572) + assertResult(600572)(ret2.apply(0).get(0)) } test("test mergetree optimize partitioned by two low card column") { @@ -197,24 +200,24 @@ class GlutenClickHouseMergeTreeOptimizeSuite spark.sql("optimize lineitem_mergetree_optimize_p3") val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect() - assert(ret.apply(0).get(0) == 600572) + assertResult(600572)(ret.apply(0).get(0)) - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 448) + assertResult(516)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3"))) spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 294) + assertResult(306)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3"))) } else { - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 296) + assertResult(308)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3"))) } spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { - assert(countFiles(new 
File(s"$basePath/lineitem_mergetree_optimize_p3")) == 272) + assertResult(276)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3"))) } else { - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 278) + assertResult(282)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3"))) } val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect() - assert(ret2.apply(0).get(0) == 600572) + assertResult(600572)(ret2.apply(0).get(0)) } } @@ -234,29 +237,29 @@ class GlutenClickHouseMergeTreeOptimizeSuite spark.sql("optimize lineitem_mergetree_optimize_p4") val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect() - assert(ret.apply(0).get(0) == 600572) + assertResult(600572)(ret.apply(0).get(0)) - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 448) + assertResult(516)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4"))) spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 294) + assertResult(306)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4"))) } else { - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 296) + assertResult(308)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4"))) } spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 272) + assertResult(276)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4"))) } else { - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 278) + assertResult(282)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4"))) } val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect() - assert(ret2.apply(0).get(0) == 600572) + assertResult(600572)(ret2.apply(0).get(0)) } } test("test mergetree optimize with optimize.minFileSize and optimize.maxFileSize") { - withSQLConf("spark.databricks.delta.optimize.minFileSize" -> "83800000") { + withSQLConf("spark.databricks.delta.optimize.minFileSize" -> "838000") { // 3 from 37 parts are larger than this, so after optimize there should be 4 parts: // 3 original parts and 1 merged part spark.sql(s""" @@ -275,20 +278,20 @@ class GlutenClickHouseMergeTreeOptimizeSuite spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 75) + assertResult(99)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5"))) } else { // For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir. 
// this case will create a checkpoint - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 81) + assertResult(105)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5"))) } val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect() - assert(ret.apply(0).get(0) == 600572) + assertResult(600572)(ret.apply(0).get(0)) } withSQLConf( - ("spark.databricks.delta.optimize.maxFileSize" -> "10000000"), - ("spark.databricks.delta.optimize.minFileSize" -> "838250")) { + "spark.databricks.delta.optimize.maxFileSize" -> "10000000", + "spark.databricks.delta.optimize.minFileSize" -> "838250") { // of the remaing 3 original parts, 2 are less than 838250, 1 is larger (size 838255) // the merged part is ~27MB, so after optimize there should be 3 parts: // 1 merged part from 2 original parts, 1 merged part from 34 original parts @@ -299,14 +302,14 @@ class GlutenClickHouseMergeTreeOptimizeSuite spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 75) + assertResult(93)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5"))) } else { // For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir. - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 85) + assertResult(104)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5"))) } val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect() - assert(ret.apply(0).get(0) == 600572) + assertResult(600572)(ret.apply(0).get(0)) } // now merge all parts (testing merging from merged parts) @@ -315,14 +318,14 @@ class GlutenClickHouseMergeTreeOptimizeSuite spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 75) + assertResult(77)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5"))) } else { // For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir. 
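// The minFileSize/maxFileSize blocks in this suite scope Delta conf changes with
// withSQLConf, which the suite inherits from Spark's SQL test utilities. A
// simplified stand-in (not the real helper) showing the set-run-restore shape:
import org.apache.spark.sql.SparkSession

def withSqlConfSketch(spark: SparkSession)(pairs: (String, String)*)(body: => Unit): Unit = {
  val conf = spark.conf
  // Remember the previous values so they can be restored afterwards.
  val previous = pairs.map { case (key, _) => key -> conf.getOption(key) }
  pairs.foreach { case (key, value) => conf.set(key, value) }
  try body
  finally previous.foreach {
    case (key, Some(value)) => conf.set(key, value)
    case (key, None) => conf.unset(key)
  }
}

// Usage mirroring the test above: lower minFileSize so OPTIMIZE merges most parts.
// withSqlConfSketch(spark)("spark.databricks.delta.optimize.minFileSize" -> "838000") {
//   spark.sql("optimize lineitem_mergetree_optimize_p5")
// }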
- assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 90) + assertResult(93)(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5"))) } val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect() - assert(ret.apply(0).get(0) == 600572) + assertResult(600572)(ret.apply(0).get(0)) } test("test mergetree optimize table with partition and bucket") { @@ -343,24 +346,22 @@ class GlutenClickHouseMergeTreeOptimizeSuite spark.sql("optimize lineitem_mergetree_optimize_p6") val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect() - assert(ret.apply(0).get(0) == 600572) + assertResult(600572)(ret.apply(0).get(0)) - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == { - if (sparkVersion.equals("3.2")) 475 else 501 - }) + assertResult(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")))( + if (sparkVersion.equals("3.2")) 499 else 528) spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS") spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS") - assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == { - if (sparkVersion.equals("3.2")) 315 else 327 - }) + assertResult(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")))( + if (sparkVersion.equals("3.2")) 315 else 327) val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect() - assert(ret2.apply(0).get(0) == 600572) + assertResult(600572)(ret2.apply(0).get(0)) } test("test skip index after optimize") { withSQLConf( - "spark.databricks.delta.optimize.maxFileSize" -> "100000000", + "spark.databricks.delta.optimize.maxFileSize" -> "2000000", "spark.sql.adaptive.enabled" -> "false") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree_index; @@ -385,12 +386,12 @@ class GlutenClickHouseMergeTreeOptimizeSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) - val mergetreeScan = scanExec(0) + assertResult(1)(scanExec.size) + val mergetreeScan = scanExec.head val ret = df.collect() - assert(ret.apply(0).get(0) == 2) + assertResult(2)(ret.apply(0).get(0)) val marks = mergetreeScan.metrics("selectedMarks").value - assert(marks == 1) + assertResult(1)(marks) val directory = new File(s"$basePath/lineitem_mergetree_index") val partDir = directory.listFiles().filter(f => f.getName.endsWith("merged")).head @@ -403,7 +404,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite test("test mergetree optimize with the path based table") { val dataPath = s"$basePath/lineitem_mergetree_optimize_path_based" clearDataPath(dataPath) - withSQLConf("spark.databricks.delta.optimize.minFileSize" -> "83800000") { + withSQLConf("spark.databricks.delta.optimize.minFileSize" -> "838000") { // 3 from 37 parts are larger than this, so after optimize there should be 4 parts: // 3 original parts and 1 merged part @@ -422,18 +423,18 @@ class GlutenClickHouseMergeTreeOptimizeSuite clickhouseTable.vacuum(0.0) clickhouseTable.vacuum(0.0) if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(dataPath)) == 75) + assertResult(99)(countFiles(new File(dataPath))) } else { - assert(countFiles(new File(dataPath)) == 81) + assertResult(105)(countFiles(new File(dataPath))) } val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect() - assert(ret.apply(0).get(0) == 600572) + assertResult(600572)(ret.apply(0).get(0)) } withSQLConf( - ("spark.databricks.delta.optimize.maxFileSize" -> 
"10000000"), - ("spark.databricks.delta.optimize.minFileSize" -> "838250")) { + "spark.databricks.delta.optimize.maxFileSize" -> "10000000", + "spark.databricks.delta.optimize.minFileSize" -> "838250") { // of the remaing 3 original parts, 2 are less than 838250, 1 is larger (size 838255) // the merged part is ~27MB, so after optimize there should be 3 parts: // 1 merged part from 2 original parts, 1 merged part from 34 original parts @@ -445,13 +446,13 @@ class GlutenClickHouseMergeTreeOptimizeSuite clickhouseTable.vacuum(0.0) clickhouseTable.vacuum(0.0) if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(dataPath)) == 75) + assertResult(93)(countFiles(new File(dataPath))) } else { - assert(countFiles(new File(dataPath)) == 85) + assertResult(104)(countFiles(new File(dataPath))) } val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect() - assert(ret.apply(0).get(0) == 600572) + assertResult(600572)(ret.apply(0).get(0)) } // now merge all parts (testing merging from merged parts) @@ -461,19 +462,19 @@ class GlutenClickHouseMergeTreeOptimizeSuite clickhouseTable.vacuum(0.0) clickhouseTable.vacuum(0.0) if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(dataPath)) == 75) + assertResult(77)(countFiles(new File(dataPath))) } else { - assert(countFiles(new File(dataPath)) == 90) + assertResult(93)(countFiles(new File(dataPath))) } val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect() - assert(ret.apply(0).get(0) == 600572) + assertResult(600572)(ret.apply(0).get(0)) } test("test mergetree insert with optimize basic") { withSQLConf( - ("spark.databricks.delta.optimize.minFileSize" -> "200000000"), - ("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true") + "spark.databricks.delta.optimize.minFileSize" -> "200000000", + "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true" ) { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree_insert_optimize_basic; @@ -487,10 +488,10 @@ class GlutenClickHouseMergeTreeOptimizeSuite |""".stripMargin) val ret = spark.sql("select count(*) from lineitem_mergetree_insert_optimize_basic").collect() - assert(ret.apply(0).get(0) == 600572) + assertResult(600572)(ret.apply(0).get(0)) eventually(timeout(60.seconds), interval(3.seconds)) { - assert( - new File(s"$basePath/lineitem_mergetree_insert_optimize_basic").listFiles().length == 2 + assertResult(2)( + new File(s"$basePath/lineitem_mergetree_insert_optimize_basic").listFiles().length ) } } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala index c8c6307aba06..791239fabf48 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala @@ -60,6 +60,9 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite .set( "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert", "false") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size", + "8192") } override protected def createTPCHNotNullTables(): Unit = { @@ -143,7 +146,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite case f: FileSourceScanExecTransformer => f case w: 
WholeStageTransformer => w } - assert(plans.size == 4) + assertResult(4)(plans.size) val mergetreeScan = plans(3).asInstanceOf[FileSourceScanExecTransformer] assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) @@ -154,10 +157,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.nonEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 5) - assert( - addFiles.map(_.rows).sum - == 600572) + assertResult(6)(addFiles.size) + assertResult(600572)(addFiles.map(_.rows).sum) // GLUTEN-5060: check the unnecessary FilterExec val wholeStageTransformer = plans(2).asInstanceOf[WholeStageTransformer] @@ -174,7 +175,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite .load(dataPath) .where("l_shipdate = date'1998-09-02'") .count() - assert(result == 183) + assertResult(183)(result) } test("test mergetree path based write with dataframe api") { @@ -236,40 +237,35 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite case f: FileSourceScanExecTransformer => f case w: WholeStageTransformer => w } - assert(plans.size == 4) + assertResult(4)(plans.size) val mergetreeScan = plans(3).asInstanceOf[FileSourceScanExecTransformer] assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) - assert( + assertResult("l_shipdate,l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - .equals("l_shipdate,l_orderkey")) - assert( + .mkString(",")) + assertResult("l_shipdate")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .primaryKeyOption .get - .mkString(",") - .equals("l_shipdate")) - assert( + .mkString(",")) + assertResult("l_returnflag,l_linestatus")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .lowCardKeyOption .get - .mkString(",") - .equals("l_returnflag,l_linestatus")) + .mkString(",")) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 5) - assert( - addFiles.map(_.rows).sum - == 600572) + assertResult(6)(addFiles.size) + assertResult(600572)(addFiles.map(_.rows).sum) // GLUTEN-5060: check the unnecessary FilterExec val wholeStageTransformer = plans(2).asInstanceOf[WholeStageTransformer] @@ -286,7 +282,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite .load(dataPath) .where("l_shipdate = date'1998-09-02'") .collect() - assert(result.length == 183) + assertResult(110501)(result.apply(0).get(0)) } test("test mergetree path based insert overwrite partitioned table with small table, static") { @@ -320,7 +316,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite .format("clickhouse") .load(dataPath) .count() - assert(result == 2418) + assertResult(2418)(result) } test("test mergetree path based insert overwrite partitioned table with small table, dynamic") { @@ -355,7 +351,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite .format("clickhouse") .load(dataPath) .count() - assert(result == 600572) + assertResult(600572)(result) } } @@ -381,11 +377,11 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite .format("clickhouse") .load(dataPath) .where("l_returnflag = 'Z'") - assert(df.count() == 1) + 
assertResult(1)(df.count()) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) @@ -397,16 +393,13 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert( - addFiles.map(_.rows).sum - == 600572) - + assertResult(600572)(addFiles.map(_.rows).sum) // 4 parts belong to the first batch // 2 parts belong to the second batch (1 actual updated part, 1 passively updated). - assert(addFiles.size == 6) + assertResult(6)(addFiles.size) val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_"))) - assert(filePaths.size == 2) - assert(Array(3, 3).sameElements(filePaths.values.map(paths => paths.size).toArray.sorted)) + assertResult(2)(filePaths.size) + assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted) } val clickhouseTable = ClickhouseTable.forPath(spark, dataPath) @@ -417,33 +410,31 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite .format("clickhouse") .load(dataPath) .where("l_returnflag = 'X'") - assert(df.count() == 1) + assertResult(1)(df.count()) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert( - addFiles.map(_.rows).sum - == 600572) + assertResult(600572)(addFiles.map(_.rows).sum) // 4 parts belong to the first batch // 2 parts belong to the second batch (1 actual updated part, 1 passively updated). - assert(addFiles.size == 6) + assertResult(6)(addFiles.size) val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_"))) - assert(filePaths.size == 3) - assert(Array(1, 2, 3).sameElements(filePaths.values.map(paths => paths.size).toArray.sorted)) + assertResult(2)(filePaths.size) + assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted) } val df = spark.read .format("clickhouse") .load(dataPath) - assert(df.count() == 600572) + assertResult(600572)(df.count()) } test("test mergetree path based table delete") { @@ -465,7 +456,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val df = spark.read .format("clickhouse") .load(dataPath) - assert(df.count() == 600571) + assertResult(600571)(df.count()) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } @@ -474,17 +465,17 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) // 4 parts belong to the first batch // 2 parts belong to the second batch (1 actual updated part, 1 passively updated). 
- assert(addFiles.size == 6) + assertResult(6)(addFiles.size) val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_"))) - assert(filePaths.size == 2) - assert(Array(3, 3).sameElements(filePaths.values.map(paths => paths.size).toArray.sorted)) + assertResult(2)(filePaths.size) + assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted) val clickhouseTable = ClickhouseTable.forPath(spark, dataPath) clickhouseTable.delete("mod(l_orderkey, 3) = 2") val df1 = spark.read .format("clickhouse") .load(dataPath) - assert(df1.count() == 400089) + assertResult(400089)(df1.count()) } test("test mergetree path based table upsert") { @@ -503,8 +494,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val df0 = spark.sql(s""" | select count(*) from clickhouse.`$dataPath` |""".stripMargin) - assert( - df0.collect().apply(0).get(0) == 600572 + assertResult(600572)( + df0.collect().apply(0).get(0) ) upsertSourceTableAndCheck(dataPath) } @@ -540,8 +531,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val df1 = spark.sql(s""" | select count(*) from clickhouse.`$dataPath` |""".stripMargin) - assert( - df1.collect().apply(0).get(0) == 600572 + 3506 + assertResult(600572 + 3506)( + df1.collect().apply(0).get(0) ) } { @@ -549,8 +540,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite spark.sql(s""" | select count(*) from clickhouse.`$dataPath` where l_returnflag = 'Z' |""".stripMargin) - assert( - df2.collect().apply(0).get(0) == 3506 + assertResult(3506)( + df2.collect().apply(0).get(0) ) } @@ -559,8 +550,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite spark.sql(s""" | select count(*) from clickhouse.`$dataPath` where l_orderkey > 10000000 |""".stripMargin) - assert( - df3.collect().apply(0).get(0) == 3506 + assertResult(3506)( + df3.collect().apply(0).get(0) ) } } @@ -610,33 +601,31 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) - assert( + assertResult("l_shipdate,l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - .equals("l_shipdate,l_orderkey")) - assert( + .mkString(",")) + assertResult("l_shipdate")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .primaryKeyOption .get - .mkString(",") - .equals("l_shipdate")) + .mkString(",")) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 5) - assert(addFiles.map(_.rows).sum == 600572) + assertResult(6)(addFiles.size) + assertResult(600572)(addFiles.map(_.rows).sum) } val df = spark.read @@ -650,7 +639,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite |""".stripMargin) .agg(sum("l_linenumber").alias("res")) val result = df.collect() - assert(result(0).getLong(0) == 34842) + assertResult(34842)(result(0).getLong(0)) } test("test mergetree path based write with partition") { @@ -707,62 +696,56 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite 
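// Several partition checks in this patch collapse filter(...).size into count(...)
// over the AddMergeTreeParts list. A self-contained sketch of that idiom using an
// illustrative case class, not the real Delta AddFile metadata:
final case class PartMetaSketch(partitionValues: Map[String, String], rows: Long)

object PartCountSketch extends App {
  val addFiles = Seq(
    PartMetaSketch(Map("l_shipdate" -> "1992-06-01"), rows = 10),
    PartMetaSketch(Map("l_shipdate" -> "1992-06-01"), rows = 12),
    PartMetaSketch(Map("l_shipdate" -> "1993-01-01"), rows = 7)
  )

  // Style before the patch: build an intermediate collection, then take its size.
  val viaFilter = addFiles.filter(_.partitionValues("l_shipdate").equals("1992-06-01")).size
  // Style after the patch: count with a predicate, no intermediate collection.
  val viaCount = addFiles.count(_.partitionValues("l_shipdate").equals("1992-06-01"))

  assert(viaFilter == 2 && viaCount == 2)
  println(s"parts for 1992-06-01: $viaCount, total rows: ${addFiles.map(_.rows).sum}")
}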
runTPCHQueryBySQL(1, sqlStr, compareResult = false) { df => val result = df.collect() - assert(result.size == 4) - assert(result(0).getString(0).equals("A")) - assert(result(0).getString(1).equals("F")) - assert(result(0).getDouble(2) == 3803858.0) + assertResult(4)(result.length) + assertResult("A")(result(0).getString(0)) + assertResult("F")(result(0).getString(1)) + assertResult(3803858.0)(result(0).getDouble(2)) - assert(result(2).getString(0).equals("N")) - assert(result(2).getString(1).equals("O")) - assert(result(2).getDouble(2) == 7454519.0) + assertResult("N")(result(2).getString(0)) + assertResult("O")(result(2).getString(1)) + assertResult(7454519.0)(result(2).getDouble(2)) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) - assert(mergetreeScan.metrics("numFiles").value == 3744) + assertResult(3744)(mergetreeScan.metrics("numFiles").value) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) - assert( + assertResult("l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - .equals("l_orderkey")) - assert( + .mkString(",")) + assertResult("l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .primaryKeyOption .get - .mkString(",") - .equals("l_orderkey")) - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 2) - assert( + .mkString(",")) + assertResult(2)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size) + assertResult("l_shipdate")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) - .partitionColumns(0) - .equals("l_shipdate")) - assert( + .partitionColumns + .head) + assertResult("l_returnflag")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) - .partitionColumns(1) - .equals("l_returnflag")) + .partitionColumns(1)) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 3835) - assert(addFiles.map(_.rows).sum == 602945) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1992-06-01")).size == 2) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1993-01-01")).size == 4) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1995-01-21")).size == 2) + assertResult(3835)(addFiles.size) + assertResult(602945)(addFiles.map(_.rows).sum) + assertResult(2)(addFiles.count(_.partitionValues("l_shipdate").equals("1992-06-01"))) + assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1993-01-01"))) + assertResult(2)(addFiles.count(_.partitionValues("l_shipdate").equals("1995-01-21"))) } } @@ -814,61 +797,49 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) val 
buckets = ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption - assert(!buckets.isEmpty) - assert(buckets.get.numBuckets == 4) - assert( + assert(buckets.isDefined) + assertResult(4)(buckets.get.numBuckets) + assertResult("l_partkey,l_returnflag")( buckets.get.sortColumnNames - .mkString(",") - .equals("l_partkey,l_returnflag")) - assert( + .mkString(",")) + assertResult("l_orderkey")( buckets.get.bucketColumnNames - .mkString(",") - .equals("l_orderkey")) - assert( + .mkString(",")) + assertResult("l_partkey,l_returnflag")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - .equals("l_partkey,l_returnflag")) - assert( + .mkString(",")) + assertResult("l_partkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .primaryKeyOption .get - .mkString(",") - .equals("l_partkey")) - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) - assert( + .mkString(",")) + assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size) + assertResult("l_shipdate")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) - .partitionColumns(0) - .equals("l_shipdate")) + .partitionColumns + .head) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 10089) - assert(addFiles.map(_.rows).sum == 600572) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1992-06-01")).size == 4) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1993-01-01")).size == 4) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1995-01-21")).size == 4) - assert( - addFiles - .filter( - f => - f.partitionValues.get("l_shipdate").get.equals("1995-01-21") && f.bucketNum.equals( - "00000")) - .size == 1) + assertResult(10089)(addFiles.size) + assertResult(600572)(addFiles.map(_.rows).sum) + assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1992-06-01"))) + assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1993-01-01"))) + assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1995-01-21"))) + assertResult(1)(addFiles.count( + f => f.partitionValues("l_shipdate").equals("1995-01-21") && f.bucketNum.equals("00000"))) } // check part pruning effect of filter on bucket column val df = spark.sql(s""" @@ -883,7 +854,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite .flatMap(partition => partition.asInstanceOf[GlutenMergeTreePartition].partList) .map(_.name) .distinct - assert(touchedParts.size == 1) + assertResult(1)(touchedParts.size) // test upsert on partitioned & bucketed table upsertSourceTableAndCheck(dataPath) @@ -929,9 +900,9 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] @@ -941,10 +912,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 5) - assert( - addFiles.map(_.rows).sum - == 600572) + 
assertResult(6)(addFiles.size) + assertResult(600572)(addFiles.map(_.rows).sum) } } @@ -1052,9 +1021,9 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite | |""".stripMargin - assert( + assertResult("R")( // total rows should remain unchanged - spark.sql(sqlStr2).collect().apply(0).get(0) == "R" + spark.sql(sqlStr2).collect().apply(0).get(0) ) // test select * @@ -1101,40 +1070,38 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) - assert( + assertResult("l_shipdate,l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - .equals("l_shipdate,l_orderkey")) - assert( + .mkString(",")) + assertResult("l_shipdate")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .primaryKeyOption .get - .mkString(",") - .equals("l_shipdate")) + .mkString(",")) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 5) - assert(addFiles.map(_.rows).sum == 600572) + assertResult(6)(addFiles.size) + assertResult(600572)(addFiles.map(_.rows).sum) val plans = collect(df.queryExecution.executedPlan) { case scanExec: BasicScanExecTransformer => scanExec } - assert(plans.size == 1) - assert(plans(0).metrics("selectedMarksPk").value === 15) - assert(plans(0).metrics("totalMarksPk").value === 74) + assertResult(1)(plans.size) + assertResult(17)(plans.head.metrics("selectedMarksPk").value) + assertResult(74)(plans.head.metrics("totalMarksPk").value) } } @@ -1161,12 +1128,12 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) - val mergetreeScan = scanExec(0) + assertResult(1)(scanExec.size) + val mergetreeScan = scanExec.head - assert(ret.apply(0).get(0) == 1) + assertResult(1)(ret.apply(0).get(0)) val marks = mergetreeScan.metrics("selectedMarks").value - assert(marks == 1) + assertResult(1)(marks) val directory = new File(dataPath) // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 @@ -1197,11 +1164,11 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) - val mergetreeScan = scanExec(0) - assert(ret.apply(0).get(0) == 2) + assertResult(1)(scanExec.size) + val mergetreeScan = scanExec.head + assertResult(2)(ret.apply(0).get(0)) val marks = mergetreeScan.metrics("selectedMarks").value - assert(marks == 2) + assertResult(1)(marks) val directory = new File(dataPath) // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 @@ -1233,11 +1200,11 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) - val mergetreeScan = scanExec(0) - assert(ret.apply(0).get(0) == 2) + 
assertResult(1)(scanExec.size) + val mergetreeScan = scanExec.head + assertResult(2)(ret.apply(0).get(0)) val marks = mergetreeScan.metrics("selectedMarks").value - assert(marks == 1) + assertResult(1)(marks) val directory = new File(dataPath) // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 @@ -1277,18 +1244,16 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 5) - assert( - addFiles.map(_.rows).sum - == 600572) + assertResult(6)(addFiles.size) + assertResult(600572)(addFiles.map(_.rows).sum) } } @@ -1320,7 +1285,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val fileFilter = new WildcardFileFilter("*_0_*") var dataFileList = dataPathFile.list(fileFilter) - assert(dataFileList.size == 5) + assertResult(6)(dataFileList.length) // re-create the same table val dataPath2 = s"$basePath/lineitem_mergetree_5219_s" @@ -1339,7 +1304,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite assert(dataPathFile.isDirectory && dataPathFile.isDirectory) dataFileList = dataPathFile.list(fileFilter) - assert(dataFileList.size == 5) + assertResult(6)(dataFileList.length) } } // scalastyle:off line.size.limit diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala index 679cea37ba67..70c6553416e2 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala @@ -57,6 +57,9 @@ class GlutenClickHouseMergeTreeWriteSuite .set( "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert", "false") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size", + "8192") } override protected def createTPCHNotNullTables(): Unit = { @@ -128,7 +131,7 @@ class GlutenClickHouseMergeTreeWriteSuite case f: FileSourceScanExecTransformer => f case w: WholeStageTransformer => w } - assert(plans.size == 4) + assertResult(4)(plans.size) val mergetreeScan = plans(3).asInstanceOf[FileSourceScanExecTransformer] assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) @@ -140,10 +143,8 @@ class GlutenClickHouseMergeTreeWriteSuite assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 5) - assert( - addFiles.map(_.rows).sum - == 600572) + assertResult(6)(addFiles.size) + assertResult(600572)(addFiles.map(_.rows).sum) // GLUTEN-5060: check the unnecessary FilterExec val wholeStageTransformer = plans(2).asInstanceOf[WholeStageTransformer] @@ -200,9 +201,9 @@ class GlutenClickHouseMergeTreeWriteSuite | select count(*) from lineitem_mergetree_insertoverwrite | |""".stripMargin - assert( + 
assertResult(300001)( // total rows should remain unchanged - spark.sql(sql2).collect().apply(0).get(0) == 300001 + spark.sql(sql2).collect().apply(0).get(0) ) } @@ -251,9 +252,9 @@ class GlutenClickHouseMergeTreeWriteSuite | select count(*) from lineitem_mergetree_insertoverwrite2 | |""".stripMargin - assert( + assertResult(2418)( // total rows should remain unchanged - spark.sql(sql2).collect().apply(0).get(0) == 2418 + spark.sql(sql2).collect().apply(0).get(0) ) } @@ -303,9 +304,9 @@ class GlutenClickHouseMergeTreeWriteSuite | select count(*) from lineitem_mergetree_insertoverwrite3 | |""".stripMargin - assert( + assertResult(600572)( // total rows should remain unchanged - spark.sql(sql2).collect().apply(0).get(0) == 600572 + spark.sql(sql2).collect().apply(0).get(0) ) } } @@ -357,14 +358,14 @@ class GlutenClickHouseMergeTreeWriteSuite val df = spark.sql(sql1) val result = df.collect() - assert( + assertResult(1)( // in test data, there are only 1 row with l_orderkey = 12647 - result.apply(0).get(0) == 1 + result.apply(0).get(0) ) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) @@ -376,16 +377,14 @@ class GlutenClickHouseMergeTreeWriteSuite assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert( - addFiles.map(_.rows).sum - == 600572) + assertResult(600572)(addFiles.map(_.rows).sum) // 4 parts belong to the first batch // 2 parts belong to the second batch (1 actual updated part, 1 passively updated). - assert(addFiles.size == 6) + assertResult(6)(addFiles.size) val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_"))) - assert(filePaths.size == 2) - assert(Array(3, 3).sameElements(filePaths.values.map(paths => paths.size).toArray.sorted)) + assertResult(2)(filePaths.size) + assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted) } val sql2 = @@ -393,9 +392,9 @@ class GlutenClickHouseMergeTreeWriteSuite | select count(*) from lineitem_mergetree_update | |""".stripMargin - assert( + assertResult(600572)( // total rows should remain unchanged - spark.sql(sql2).collect().apply(0).get(0) == 600572 + spark.sql(sql2).collect().apply(0).get(0) ) } @@ -444,8 +443,8 @@ class GlutenClickHouseMergeTreeWriteSuite | select count(*) from lineitem_mergetree_delete |""".stripMargin) val result = df.collect() - assert( - result.apply(0).get(0) == 600571 + assertResult(600571)( + result.apply(0).get(0) ) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f @@ -455,10 +454,10 @@ class GlutenClickHouseMergeTreeWriteSuite val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) // 4 parts belong to the first batch // 2 parts belong to the second batch (1 actual updated part, 1 passively updated). 
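// The update/delete checks group part paths by the prefix before the last '_' to
// count how many parts each write batch left behind (sizes Array(2, 4) after the
// patch). A small sketch over made-up directory names, not real MergeTree parts:
object PartGroupingSketch extends App {
  val partNames = Seq("ab12_0_1", "ab12_0_2", "ab12_0_3", "ab12_0_4", "cd34_1_1", "cd34_1_2")

  // Drop the trailing per-part sequence so one group remains per batch prefix.
  val byBatch = partNames.groupBy(name => name.substring(0, name.lastIndexOf("_")))

  assert(byBatch.size == 2)
  assert(byBatch.values.map(_.size).toArray.sorted.sameElements(Array(2, 4)))
}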
- assert(addFiles.size == 6) + assertResult(6)(addFiles.size) val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_"))) - assert(filePaths.size == 2) - assert(Array(3, 3).sameElements(filePaths.values.map(paths => paths.size).toArray.sorted)) + assertResult(2)(filePaths.size) + assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted) } { @@ -468,9 +467,7 @@ class GlutenClickHouseMergeTreeWriteSuite val df3 = spark.sql(s""" | select count(*) from lineitem_mergetree_delete |""".stripMargin) - assert( - df3.collect().apply(0).get(0) == 400089 - ) + assertResult(400089)(df3.collect().apply(0).get(0)) } } @@ -512,9 +509,7 @@ class GlutenClickHouseMergeTreeWriteSuite val df0 = spark.sql(s""" | select count(*) from lineitem_mergetree_upsert |""".stripMargin) - assert( - df0.collect().apply(0).get(0) == 600572 - ) + assertResult(600572)(df0.collect().apply(0).get(0)) } upsertSourceTableAndCheck("lineitem_mergetree_upsert") @@ -551,18 +546,14 @@ class GlutenClickHouseMergeTreeWriteSuite val df1 = spark.sql(s""" | select count(*) from $tableName |""".stripMargin) - assert( - df1.collect().apply(0).get(0) == 600572 + 3506 - ) + assertResult(600572 + 3506)(df1.collect().apply(0).get(0)) } { val df2 = spark.sql(s""" | select count(*) from $tableName where l_returnflag = 'Z' |""".stripMargin) - assert( - df2.collect().apply(0).get(0) == 3506 - ) + assertResult(3506)(df2.collect().apply(0).get(0)) } { @@ -570,9 +561,7 @@ class GlutenClickHouseMergeTreeWriteSuite spark.sql(s""" | select count(*) from $tableName where l_orderkey > 10000000 |""".stripMargin) - assert( - df3.collect().apply(0).get(0) == 3506 - ) + assertResult(3506)(df3.collect().apply(0).get(0)) } } @@ -642,33 +631,31 @@ class GlutenClickHouseMergeTreeWriteSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) - assert( + assertResult("l_shipdate,l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - .equals("l_shipdate,l_orderkey")) - assert( + .mkString(",")) + assertResult("l_shipdate")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .primaryKeyOption .get - .mkString(",") - .equals("l_shipdate")) + .mkString(",")) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 5) - assert(addFiles.map(_.rows).sum == 600572) + assertResult(6)(addFiles.size) + assertResult(600572)(addFiles.map(_.rows).sum) } } @@ -800,62 +787,56 @@ class GlutenClickHouseMergeTreeWriteSuite runTPCHQueryBySQL(1, sqlStr, compareResult = false) { df => val result = df.collect() - assert(result.size == 4) - assert(result(0).getString(0).equals("A")) - assert(result(0).getString(1).equals("F")) - assert(result(0).getDouble(2) == 3865234.0) + assertResult(4)(result.length) + assertResult("A")(result(0).getString(0)) + assertResult("F")(result(0).getString(1)) + assertResult(3865234.0)(result(0).getDouble(2)) - 
assert(result(2).getString(0).equals("N")) - assert(result(2).getString(1).equals("O")) - assert(result(2).getDouble(2) == 7454519.0) + assertResult("N")(result(2).getString(0)) + assertResult("O")(result(2).getString(1)) + assertResult(7454519.0)(result(2).getDouble(2)) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) - assert(mergetreeScan.metrics("numFiles").value == 3745) + assertResult(3745)(mergetreeScan.metrics("numFiles").value) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) - assert( + assertResult("l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - .equals("l_orderkey")) - assert( + .mkString(",")) + assertResult("l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .primaryKeyOption .get - .mkString(",") - .equals("l_orderkey")) - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 2) - assert( + .mkString(",")) + assertResult(2)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size) + assertResult("l_shipdate")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) - .partitionColumns(0) - .equals("l_shipdate")) - assert( + .partitionColumns + .head) + assertResult("l_returnflag")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) - .partitionColumns(1) - .equals("l_returnflag")) + .partitionColumns(1)) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 3836) - assert(addFiles.map(_.rows).sum == 605363) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1992-06-01")).size == 2) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1993-01-01")).size == 4) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1995-01-21")).size == 3) + assertResult(3836)(addFiles.size) + assertResult(605363)(addFiles.map(_.rows).sum) + assertResult(2)(addFiles.count(_.partitionValues("l_shipdate").equals("1992-06-01"))) + assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1993-01-01"))) + assertResult(3)(addFiles.count(_.partitionValues("l_shipdate").equals("1995-01-21"))) } } @@ -927,49 +908,40 @@ class GlutenClickHouseMergeTreeWriteSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) - assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined) if (sparkVersion.equals("3.2")) { assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty) } else { - assert( + assertResult("l_partkey,l_returnflag")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - 
.equals("l_partkey,l_returnflag")) + .mkString(",")) } assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) - assert( + assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size) + assertResult("l_shipdate")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) - .partitionColumns(0) - .equals("l_shipdate")) + .partitionColumns + .head) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 10089) - assert(addFiles.map(_.rows).sum == 600572) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1992-06-01")).size == 4) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1993-01-01")).size == 4) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1995-01-21")).size == 4) - assert( - addFiles - .filter( - f => - f.partitionValues.get("l_shipdate").get.equals("1995-01-21") && f.bucketNum.equals( - "00000")) - .size == 1) + assertResult(10089)(addFiles.size) + assertResult(600572)(addFiles.map(_.rows).sum) + assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1992-06-01"))) + assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1993-01-01"))) + assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1995-01-21"))) + assertResult(1)(addFiles.count( + f => f.partitionValues("l_shipdate").equals("1995-01-21") && f.bucketNum.equals("00000"))) } // check part pruning effect of filter on bucket column val df = spark.sql(s""" @@ -984,7 +956,7 @@ class GlutenClickHouseMergeTreeWriteSuite .flatMap(partition => partition.asInstanceOf[GlutenMergeTreePartition].partList) .map(_.name) .distinct - assert(touchedParts.size == 1) + assertResult(1)(touchedParts.size) // test upsert on partitioned & bucketed table upsertSourceTableAndCheck("lineitem_mergetree_bucket") @@ -996,9 +968,7 @@ class GlutenClickHouseMergeTreeWriteSuite val df0 = spark.sql(s""" | select count(*) from lineitem_mergetree_bucket |""".stripMargin) - assert( - df0.collect().apply(0).get(0) == 3 - ) + assertResult(3)(df0.collect().apply(0).get(0)) } @@ -1065,40 +1035,40 @@ class GlutenClickHouseMergeTreeWriteSuite warehouse + "/" + tableName } val deletedPath = new File(deletedPathStr) - assert(deletedPath.exists() == exceptedExists) + assertResult(exceptedExists)(deletedPath.exists()) } // test non external table var tableName = "lineitem_mergetree_drop" var tableLocation = "" createAndDropTable(tableName, tableLocation) - checkTableExists(tableName, tableLocation, false) + checkTableExists(tableName, tableLocation, exceptedExists = false) // test external table tableName = "lineitem_mergetree_external_drop" - createAndDropTable(tableName, tableLocation, true) - checkTableExists(tableName, tableLocation, false) + createAndDropTable(tableName, tableLocation, isExternal = true) + checkTableExists(tableName, tableLocation, exceptedExists = false) // test table with the specified location tableName = "lineitem_mergetree_location_drop" tableLocation = basePath + "/" + tableName createAndDropTable(tableName, tableLocation) - checkTableExists(tableName, tableLocation, true) + checkTableExists(tableName, tableLocation, exceptedExists = true) tableName = "lineitem_mergetree_external_location_drop" tableLocation = basePath + "/" + tableName - createAndDropTable(tableName, tableLocation, true) - checkTableExists(tableName, tableLocation, 
true)
+    createAndDropTable(tableName, tableLocation, isExternal = true)
+    checkTableExists(tableName, tableLocation, exceptedExists = true)
 
     tableName = "lineitem_mergetree_location_purge"
     tableLocation = basePath + "/" + tableName
     createAndDropTable(tableName, tableLocation, purgeTable = true)
-    checkTableExists(tableName, tableLocation, false)
+    checkTableExists(tableName, tableLocation, exceptedExists = false)
 
     tableName = "lineitem_mergetree_external_location_purge"
     tableLocation = basePath + "/" + tableName
-    createAndDropTable(tableName, tableLocation, true, true)
-    checkTableExists(tableName, tableLocation, false)
+    createAndDropTable(tableName, tableLocation, isExternal = true, purgeTable = true)
+    checkTableExists(tableName, tableLocation, exceptedExists = false)
   }
 
   test("test mergetree CTAS simple") {
@@ -1143,9 +1113,9 @@ class GlutenClickHouseMergeTreeWriteSuite
         val scanExec = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
         }
-        assert(scanExec.size == 1)
+        assertResult(1)(scanExec.size)
 
-        val mergetreeScan = scanExec(0)
+        val mergetreeScan = scanExec.head
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
         val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
@@ -1155,10 +1125,8 @@ class GlutenClickHouseMergeTreeWriteSuite
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
-        assert(addFiles.size == 5)
-        assert(
-          addFiles.map(_.rows).sum
-            == 600572)
+        assertResult(6)(addFiles.size)
+        assertResult(600572)(addFiles.map(_.rows).sum)
     }
   }
@@ -1289,9 +1257,9 @@ class GlutenClickHouseMergeTreeWriteSuite
         |
         |""".stripMargin
-    assert(
+    assertResult("R")(
       // total rows should remain unchanged
-      spark.sql(sqlStr2).collect().apply(0).get(0) == "R"
+      spark.sql(sqlStr2).collect().apply(0).get(0)
     )
 
     // test select *
@@ -1359,40 +1327,38 @@ class GlutenClickHouseMergeTreeWriteSuite
         val scanExec = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
         }
-        assert(scanExec.size == 1)
+        assertResult(1)(scanExec.size)
 
-        val mergetreeScan = scanExec(0)
+        val mergetreeScan = scanExec.head
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
         val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
-        assert(
+        assertResult("l_shipdate,l_orderkey")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
             .orderByKeyOption
             .get
-            .mkString(",")
-            .equals("l_shipdate,l_orderkey"))
-        assert(
+            .mkString(","))
+        assertResult("l_shipdate")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
             .primaryKeyOption
             .get
-            .mkString(",")
-            .equals("l_shipdate"))
+            .mkString(","))
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
-        assert(addFiles.size == 5)
-        assert(addFiles.map(_.rows).sum == 600572)
+        assertResult(6)(addFiles.size)
+        assertResult(600572)(addFiles.map(_.rows).sum)
 
         val plans = collect(df.queryExecution.executedPlan) {
           case scanExec: BasicScanExecTransformer => scanExec
         }
-        assert(plans.size == 1)
-        assert(plans(0).metrics("selectedMarksPk").value === 15)
-        assert(plans(0).metrics("totalMarksPk").value === 74)
+        assertResult(1)(plans.size)
+        assertResult(17)(plans.head.metrics("selectedMarksPk").value)
+        assertResult(74)(plans.head.metrics("totalMarksPk").value)
     }
   }
@@ -1447,21 +1413,20 @@ class GlutenClickHouseMergeTreeWriteSuite
         val scanExec = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
         }
-        assert(scanExec.size == 1)
+        assertResult(1)(scanExec.size)
 
-        val mergetreeScan = scanExec(0)
+        val mergetreeScan = scanExec.head
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
         val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
-        assert(
+        assertResult("l_shipdate")(
           ClickHouseTableV2
            .getTable(fileIndex.deltaLog)
            .orderByKeyOption
            .get
-            .mkString(",")
-            .equals("l_shipdate"))
+            .mkString(","))
         assert(
           ClickHouseTableV2
            .getTable(fileIndex.deltaLog)
            .primaryKeyOption
@@ -1470,15 +1435,15 @@ class GlutenClickHouseMergeTreeWriteSuite
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
-        assert(addFiles.size == 5)
-        assert(addFiles.map(_.rows).sum == 600572)
+        assertResult(6)(addFiles.size)
+        assertResult(600572)(addFiles.map(_.rows).sum)
 
         val plans = collect(df.queryExecution.executedPlan) {
           case scanExec: BasicScanExecTransformer => scanExec
         }
-        assert(plans.size == 1)
-        assert(plans(0).metrics("selectedMarksPk").value === 15)
-        assert(plans(0).metrics("totalMarksPk").value === 74)
+        assertResult(1)(plans.size)
+        assertResult(17)(plans.head.metrics("selectedMarksPk").value)
+        assertResult(74)(plans.head.metrics("totalMarksPk").value)
     }
   }
@@ -1527,21 +1492,21 @@ class GlutenClickHouseMergeTreeWriteSuite
     runSql(sqlStr)(
       df => {
         val result = df.collect()
-        assert(result.size == 1)
-        assert(result(0).getLong(0) == 10)
+        assertResult(1)(result.length)
+        assertResult(10)(result(0).getLong(0))
 
         val scanExec = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
         }
-        assert(scanExec.size == 1)
+        assertResult(1)(scanExec.size)
 
-        val mergetreeScan = scanExec(0)
+        val mergetreeScan = scanExec.head
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
         val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
-        assert(addFiles.size == 1)
-        assert(addFiles(0).rows == 10)
+        assertResult(1)(addFiles.size)
+        assertResult(10)(addFiles.head.rows)
       })
   }
@@ -1585,16 +1550,16 @@ class GlutenClickHouseMergeTreeWriteSuite
       val scanExec = collect(df.queryExecution.executedPlan) {
         case f: FileSourceScanExecTransformer => f
       }
-      assert(scanExec.size == 1)
+      assertResult(1)(scanExec.size)
 
-      val mergetreeScan = scanExec(0)
+      val mergetreeScan = scanExec.head
       assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
      val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
      val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
-      assert(
-        (addFiles.map(_.marks).sum - addFiles.size) == mergetreeScan.metrics("totalMarksPk").value)
-      assert(mergetreeScan.metrics("selectedMarksPk").value == exceptedCnt)
+      assertResult(mergetreeScan.metrics("totalMarksPk").value)(
+        addFiles.map(_.marks).sum - addFiles.size)
+      assertResult(exceptedCnt)(mergetreeScan.metrics("selectedMarksPk").value)
     }
 
     val sqlStr1 =
@@ -1609,8 +1574,8 @@ class GlutenClickHouseMergeTreeWriteSuite
     runSql(sqlStr1)(
       df => {
         val result = df.collect()
-        assert(result.size == 1)
-        assert(result(0).getDouble(0).toString.substring(0, 6).equals("2.6480"))
+        assertResult(1)(result.length)
+        assertResult("2.6480")(result(0).getDouble(0).toString.substring(0, 6))
 
         checkSelectedMarksCnt(df, 34)
       })
@@ -1627,10 +1592,10 @@ class GlutenClickHouseMergeTreeWriteSuite
     runSql(sqlStr2)(
       df => {
         val result = df.collect()
-        assert(result.size == 1)
-        assert(result(0).getDouble(0).toString.substring(0, 6).equals("5.3379"))
+        assertResult(1)(result.length)
+        assertResult("5.3379")(result(0).getDouble(0).toString.substring(0, 6))
 
-        checkSelectedMarksCnt(df, 24)
+        checkSelectedMarksCnt(df, 29)
       })
   }
@@ -1666,18 +1631,16 @@ class GlutenClickHouseMergeTreeWriteSuite
         val scanExec = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
         }
-        assert(scanExec.size == 1)
+        assertResult(1)(scanExec.size)
 
-        val mergetreeScan = scanExec(0)
+        val mergetreeScan = scanExec.head
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
         val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
-        assert(addFiles.size == 5)
-        assert(
-          addFiles.map(_.rows).sum
-            == 600572)
+        assertResult(6)(addFiles.size)
+        assertResult(600572)(addFiles.map(_.rows).sum)
     }
   }
@@ -1715,7 +1678,7 @@ class GlutenClickHouseMergeTreeWriteSuite
 
     val fileFilter = new WildcardFileFilter("*_0_*")
     var dataFileList = dataPath.list(fileFilter)
-    assert(dataFileList.size == 5)
+    assertResult(6)(dataFileList.length)
 
     // test with the normal table
     spark.sql(s"""
@@ -1796,7 +1759,7 @@ class GlutenClickHouseMergeTreeWriteSuite
     assert(dataPath.isDirectory && dataPath.isDirectory)
 
     dataFileList = dataPath.list(fileFilter)
-    assert(dataFileList.size == 5)
+    assertResult(6)(dataFileList.length)
 
     // re-create the same table
     for (i <- 0 until 10) {
@@ -1818,7 +1781,7 @@ class GlutenClickHouseMergeTreeWriteSuite
     assert(dataPath.isDirectory && dataPath.isDirectory)
 
     dataFileList = dataPath.list(fileFilter)
-    assert(dataFileList.size == 5)
+    assertResult(6)(dataFileList.length)
   }
 
   test("test mergetree with primary keys filter pruning by driver") {
@@ -1872,22 +1835,22 @@ class GlutenClickHouseMergeTreeWriteSuite
     Seq(("true", 2), ("false", 3)).foreach(
       conf => {
         withSQLConf(
-          ("spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index" -> conf._1)) {
+          "spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index" -> conf._1) {
           runTPCHQueryBySQL(6, sqlStr) {
             df =>
               val scanExec = collect(df.queryExecution.executedPlan) {
                 case f: FileSourceScanExecTransformer => f
              }
-              assert(scanExec.size == 1)
+              assertResult(1)(scanExec.size)
 
-              val mergetreeScan = scanExec(0)
+              val mergetreeScan = scanExec.head
              assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
              val plans = collect(df.queryExecution.executedPlan) {
                case scanExec: BasicScanExecTransformer => scanExec
              }
-              assert(plans.size == 1)
-              assert(plans(0).getSplitInfos.size == conf._2)
+              assertResult(1)(plans.size)
+              assertResult(conf._2)(plans.head.getSplitInfos.size)
          }
        }
      })
@@ -1990,14 +1953,14 @@ class GlutenClickHouseMergeTreeWriteSuite
     Seq(("true", 2), ("false", 2)).foreach(
       conf => {
         withSQLConf(
-          ("spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index" -> conf._1)) {
+          "spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index" -> conf._1) {
          runTPCHQueryBySQL(12, sqlStr) {
            df =>
              val scanExec = collect(df.queryExecution.executedPlan) {
                case f: BasicScanExecTransformer => f
              }
-              assert(scanExec.size == 2)
-              assert(scanExec(1).getSplitInfos.size == conf._2)
+              assertResult(2)(scanExec.size)
+              assertResult(conf._2)(scanExec(1).getSplitInfos.size)
          }
        }
      })
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
index ab11c1e0c201..f9e831cb4aa7 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
@@ -61,6 +61,9 @@ class GlutenClickHouseTableAfterRestart
       .set(
         "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
         "false")
+      .set(
+        "spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size",
+        "8192")
   }
 
   override protected def createTPCHNotNullTables(): Unit = {
@@ -180,9 +183,9 @@ class GlutenClickHouseTableAfterRestart
       // for this run, missing count should not increase
       runTPCHQueryBySQL(1, sqlStr)(_ => {})
       val stats1 = ClickhouseSnapshot.deltaScanCache.stats()
-      assert(stats1.missCount() - oldMissingCount1 == 0)
+      assertResult(oldMissingCount1)(stats1.missCount())
       val stats2 = ClickhouseSnapshot.addFileToAddMTPCache.stats()
-      assert(stats2.missCount() - oldMissingCount2 == 0)
+      assertResult(oldMissingCount2)(stats2.missCount())
     }
 
     val oldMissingCount1 = ClickhouseSnapshot.deltaScanCache.stats().missCount()
@@ -194,10 +197,9 @@ class GlutenClickHouseTableAfterRestart
 
     // after restart, additionally check stats of delta scan cache
     val stats1 = ClickhouseSnapshot.deltaScanCache.stats()
-    assert(stats1.missCount() - oldMissingCount1 == 1)
+    assertResult(oldMissingCount1 + 1)(stats1.missCount())
     val stats2 = ClickhouseSnapshot.addFileToAddMTPCache.stats()
-    assert(stats2.missCount() - oldMissingCount2 == 5)
-
+    assertResult(oldMissingCount2 + 6)(stats2.missCount())
   }
 
   test("test optimize after restart") {
@@ -222,7 +224,8 @@ class GlutenClickHouseTableAfterRestart
     restartSpark()
 
     spark.sql("optimize table_restart_optimize")
-    assert(spark.sql("select count(*) from table_restart_optimize").collect().apply(0).get(0) == 4)
+    assertResult(4)(
+      spark.sql("select count(*) from table_restart_optimize").collect().apply(0).get(0))
   }
 
   test("test vacuum after restart") {
@@ -250,7 +253,8 @@ class GlutenClickHouseTableAfterRestart
 
     spark.sql("vacuum table_restart_vacuum")
 
-    assert(spark.sql("select count(*) from table_restart_vacuum").collect().apply(0).get(0) == 4)
+    assertResult(4)(
+      spark.sql("select count(*) from table_restart_vacuum").collect().apply(0).get(0))
   }
 
   test("test update after restart") {
@@ -276,7 +280,8 @@ class GlutenClickHouseTableAfterRestart
 
     spark.sql("update table_restart_update set name = 'tom' where id = 1")
 
-    assert(spark.sql("select count(*) from table_restart_update").collect().apply(0).get(0) == 4)
+    assertResult(4)(
+      spark.sql("select count(*) from table_restart_update").collect().apply(0).get(0))
   }
 
   test("test delete after restart") {
@@ -302,7 +307,8 @@ class GlutenClickHouseTableAfterRestart
 
     spark.sql("delete from table_restart_delete where where id = 1")
 
-    assert(spark.sql("select count(*) from table_restart_delete").collect().apply(0).get(0) == 2)
+    assertResult(2)(
+      spark.sql("select count(*) from table_restart_delete").collect().apply(0).get(0))
   }
 
   test("test drop after restart") {