From 614e4e4de7b302d59a02f7df201ea522c722712a Mon Sep 17 00:00:00 2001
From: Chang Chen
Date: Mon, 9 Sep 2024 14:23:19 +0800
Subject: [PATCH] Add CHConf

---
 .../io/delta/tables/ClickhouseTable.scala     |   2 +-
 .../backendsapi/clickhouse/CHBackend.scala    |  12 +-
 .../backendsapi/clickhouse/CHConf.scala       |  49 +++++
 .../clickhouse/CHListenerApi.scala            |  11 +-
 .../v2/clickhouse/ClickHouseConfig.scala      |  16 +-
 ...utenClickHouseDeltaParquetWriteSuite.scala | 203 +++++++++---------
 .../GlutenClickHouseMergeTreeWriteSuite.scala |  20 +-
 ...ClickHouseWholeStageTransformerSuite.scala | 101 +++------
 8 files changed, 205 insertions(+), 209 deletions(-)
 create mode 100644 backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConf.scala

diff --git a/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala b/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala
index 790b4c1f8a379..e747c87c6a67c 100644
--- a/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala
+++ b/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala
@@ -91,7 +91,7 @@ object ClickhouseTable {
     val badOptions = hadoopConf.filterKeys {
       k => !DeltaTableUtils.validDeltaTableHadoopPrefixes.exists(k.startsWith)
     }.toMap
-    if (!badOptions.isEmpty) {
+    if (badOptions.nonEmpty) {
       throw DeltaErrors.unsupportedDeltaTableForPathHadoopConf(badOptions)
     }
     val fileSystemOptions: Map[String, String] = hadoopConf.toMap
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 163f7568f7131..3884daf60db3d 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -117,21 +117,21 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
     }
   }
 
-  val GLUTEN_CLICKHOUSE_AFFINITY_MODE: String =
+  private val GLUTEN_CLICKHOUSE_AFFINITY_MODE: String =
     GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + ".affinity.mode"
   val SOFT: String = "soft"
   val FORCE: String = "force"
   private val GLUTEN_CLICKHOUSE_AFFINITY_MODE_DEFAULT = SOFT
 
-  val GLUTEN_MAX_BLOCK_SIZE: String =
+  private val GLUTEN_MAX_BLOCK_SIZE: String =
     GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
       ".runtime_settings.max_block_size"
   // Same as default value in clickhouse
-  val GLUTEN_MAX_BLOCK_SIZE_DEFAULT = 65409
-  val GLUTEN_MAX_SHUFFLE_READ_BYTES: String =
+  private val GLUTEN_MAX_BLOCK_SIZE_DEFAULT = 65409
+  private val GLUTEN_MAX_SHUFFLE_READ_BYTES: String =
     GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
       ".runtime_config.max_source_concatenate_bytes"
-  val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = GLUTEN_MAX_BLOCK_SIZE_DEFAULT * 256
+  private val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = GLUTEN_MAX_BLOCK_SIZE_DEFAULT * 256
 
   def affinityMode: String = {
     SparkEnv.get.conf
@@ -364,7 +364,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
   def enableReorderHashJoinTables(): Boolean = {
     SparkEnv.get.conf.getBoolean(
       "spark.gluten.sql.columnar.backend.ch.enable_reorder_hash_join_tables",
-      true
+      defaultValue = true
     )
   }
   // The threshold to reorder hash join tables, if The result of dividing two tables' size is
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConf.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConf.scala
new file mode 100644
index 0000000000000..26a5a1b1fd95b
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConf.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.clickhouse
+
+import org.apache.gluten.GlutenConfig
+
+import org.apache.spark.SparkConf
+
+object CHConf {
+
+  private val CH = GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + "."
+  private val CH_SETTINGS = CH + "runtime_settings."
+  private val CH_CONFIG = CH + "runtime_config."
+  implicit class GlutenCHConf(conf: SparkConf) {
+    def setCHSettings(settings: (String, String)*): SparkConf = {
+      settings.foreach { case (k, v) => conf.set(CH_SETTINGS + k, v) }
+      conf
+    }
+
+    def setCHSettings[T](k: String, v: T): SparkConf = {
+      conf.set(CH_SETTINGS + k, v.toString)
+      conf
+    }
+
+    def setCHConfig(config: (String, String)*): SparkConf = {
+      config.foreach { case (k, v) => conf.set(CH_CONFIG + k, v) }
+      conf
+    }
+
+    def setCHConfig[T](k: String, v: T): SparkConf = {
+      conf.set(CH_CONFIG + k, v.toString)
+      conf
+    }
+  }
+}
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 3e1443f3a59a4..8be75a4899bc3 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -81,13 +81,10 @@ class CHListenerApi extends ListenerApi with Logging {
       JniLibLoader.loadFromPath(executorLibPath, true)
     }
     // Add configs
-    conf.set(
-      s"${CHBackendSettings.getBackendConfigPrefix}.runtime_config.timezone",
-      conf.get("spark.sql.session.timeZone", TimeZone.getDefault.getID))
-    conf.set(
-      s"${CHBackendSettings.getBackendConfigPrefix}.runtime_config" +
-        s".local_engine.settings.log_processors_profiles",
-      "true")
+    import org.apache.gluten.backendsapi.clickhouse.CHConf._
+    conf.setCHConfig(
+      "timezone" -> conf.get("spark.sql.session.timeZone", TimeZone.getDefault.getID),
+      "local_engine.settings.log_processors_profiles" -> "true")
 
     // add memory limit for external sort
     val externalSortKey = s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings" +
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala
index 232e9ec10c5be..69fc3f3bcd6e1 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala @@ -28,9 +28,9 @@ object ClickHouseConfig { val NAME = "clickhouse" val ALT_NAME = "clickhouse" val METADATA_DIR = "_delta_log" - val FORMAT_ENGINE = "engine" - val DEFAULT_ENGINE = "mergetree" - val OPT_NAME_PREFIX = "clickhouse." + private val FORMAT_ENGINE = "engine" + private val DEFAULT_ENGINE = "mergetree" + private val OPT_NAME_PREFIX = "clickhouse." @deprecated // Whether to use MergeTree DataSource V2 API, default is false, fall back to V1. @@ -53,8 +53,11 @@ object ClickHouseConfig { if (!configurations.contains(FORMAT_ENGINE)) { configurations += (FORMAT_ENGINE -> DEFAULT_ENGINE) } else { - val engineValue = configurations.get(FORMAT_ENGINE) - if (!engineValue.equals(DEFAULT_ENGINE) && !engineValue.equals("parquet")) { + if ( + !configurations + .get(FORMAT_ENGINE) + .exists(s => s.equals(DEFAULT_ENGINE) || s.equals("parquet")) + ) { configurations += (FORMAT_ENGINE -> DEFAULT_ENGINE) } } @@ -80,8 +83,7 @@ object ClickHouseConfig { } def isMergeTreeFormatEngine(configuration: Map[String, String]): Boolean = { - configuration.contains(FORMAT_ENGINE) && - configuration.get(FORMAT_ENGINE).get.equals(DEFAULT_ENGINE) + configuration.get(FORMAT_ENGINE).exists(_.equals(DEFAULT_ENGINE)) } /** Get the related clickhouse option when using DataFrameWriter / DataFrameReader */ diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala index d6f9a0162216f..ed9d104798b08 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala @@ -18,7 +18,6 @@ package org.apache.gluten.execution import org.apache.spark.SparkConf import org.apache.spark.sql.SaveMode -import org.apache.spark.sql.delta.actions.AddFile import org.apache.spark.sql.delta.files.TahoeFileIndex import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -39,6 +38,8 @@ class GlutenClickHouseDeltaParquetWriteSuite override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch" override protected val queriesResults: String = rootPath + "mergetree-queries-output" + import org.apache.gluten.backendsapi.clickhouse.CHConf._ + /** Run Gluten + ClickHouse Backend with SortShuffleManager */ override protected def sparkConf: SparkConf = { super.sparkConf @@ -50,13 +51,8 @@ class GlutenClickHouseDeltaParquetWriteSuite .set("spark.sql.files.maxPartitionBytes", "20000000") .set("spark.gluten.sql.native.writer.enabled", "true") .set("spark.sql.storeAssignmentPolicy", "legacy") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert", - "false") - .set( - "spark.databricks.delta.retentionDurationCheck.enabled", - "false" - ) + .setCHSettings("mergetree.merge_after_insert", false) + .set("spark.databricks.delta.retentionDurationCheck.enabled", "false") } override protected def createTPCHNotNullTables(): Unit = { @@ -128,14 +124,14 @@ class GlutenClickHouseDeltaParquetWriteSuite case f: FileSourceScanExecTransformer => f case w: WholeStageTransformer => w } - assert(plans.size == 4) + assert(plans.size === 
4) val parquetScan = plans(3).asInstanceOf[FileSourceScanExecTransformer] assert(parquetScan.nodeName.startsWith("Scan parquet ")) val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex] - val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile]) - assert(addFiles.size == 5) + val addFiles = fileIndex.matchingFiles(Nil, Nil) + assert(addFiles.size === 5) } } @@ -183,7 +179,7 @@ class GlutenClickHouseDeltaParquetWriteSuite |""".stripMargin assert( // total rows should remain unchanged - spark.sql(sql2).collect().apply(0).get(0) == 300001 + spark.sql(sql2).collect().apply(0).get(0) === 300001 ) } @@ -235,7 +231,7 @@ class GlutenClickHouseDeltaParquetWriteSuite |""".stripMargin assert( // total rows should remain unchanged - spark.sql(sql2).collect().apply(0).get(0) == 2418 + spark.sql(sql2).collect().apply(0).get(0) === 2418 ) } @@ -288,7 +284,7 @@ class GlutenClickHouseDeltaParquetWriteSuite |""".stripMargin assert( // total rows should remain unchanged - spark.sql(sql2).collect().apply(0).get(0) == 21875 + spark.sql(sql2).collect().apply(0).get(0) === 21875 ) } } @@ -343,19 +339,19 @@ class GlutenClickHouseDeltaParquetWriteSuite val result = df.collect() assert( // in test data, there are only 1 row with l_orderkey = 12647 - result.apply(0).get(0) == 1 + result.apply(0).get(0) === 1 ) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assert(scanExec.size === 1) val parquetScan = scanExec.head assert(parquetScan.nodeName.startsWith("Scan parquet")) val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex] - val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile]) - assert(addFiles.size == 4) + val addFiles = fileIndex.matchingFiles(Nil, Nil) + assert(addFiles.size === 4) } val sql2 = @@ -365,7 +361,7 @@ class GlutenClickHouseDeltaParquetWriteSuite |""".stripMargin assert( // total rows should remain unchanged - spark.sql(sql2).collect().apply(0).get(0) == 600572 + spark.sql(sql2).collect().apply(0).get(0) === 600572 ) } @@ -412,15 +408,15 @@ class GlutenClickHouseDeltaParquetWriteSuite |""".stripMargin) val result = df.collect() assert( - result.apply(0).get(0) == 1802445 + result.apply(0).get(0) === 1802445 ) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } val parquetScan = scanExec.head val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex] - val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile]) - assert(addFiles.size == 4) + val addFiles = fileIndex.matchingFiles(Nil, Nil) + assert(addFiles.size === 4) } { @@ -431,7 +427,7 @@ class GlutenClickHouseDeltaParquetWriteSuite | select sum(l_linenumber) from lineitem_delta_parquet_delete |""".stripMargin) assert( - df3.collect().apply(0).get(0) == 1200671 + df3.collect().apply(0).get(0) === 1200671 ) } } @@ -475,7 +471,7 @@ class GlutenClickHouseDeltaParquetWriteSuite | select sum(l_linenumber) from lineitem_delta_parquet_upsert |""".stripMargin) assert( - df0.collect().apply(0).get(0) == 1802446 + df0.collect().apply(0).get(0) === 1802446 ) } @@ -514,7 +510,7 @@ class GlutenClickHouseDeltaParquetWriteSuite | select count(*) from $tableName |""".stripMargin) assert( - df1.collect().apply(0).get(0) == 600572 + 3506 + df1.collect().apply(0).get(0) === 600572 + 3506 ) } { @@ -523,7 +519,7 @@ class GlutenClickHouseDeltaParquetWriteSuite | select count(*) from $tableName where 
l_returnflag = 'Z' |""".stripMargin) assert( - df2.collect().apply(0).get(0) == 3506 + df2.collect().apply(0).get(0) === 3506 ) } @@ -533,7 +529,7 @@ class GlutenClickHouseDeltaParquetWriteSuite | select count(*) from $tableName where l_orderkey > 10000000 |""".stripMargin) assert( - df3.collect().apply(0).get(0) == 3506 + df3.collect().apply(0).get(0) === 3506 ) } } @@ -666,34 +662,31 @@ class GlutenClickHouseDeltaParquetWriteSuite runTPCHQueryBySQL(1, sqlStr, compareResult = false) { df => val result = df.collect() - assert(result.size == 2) + assert(result.length === 2) assert(result(0).getString(0).equals("A")) assert(result(0).getString(1).equals("F")) - assert(result(0).getDouble(2) == 368009.0) + assert(result(0).getDouble(2) === 368009.0) assert(result(1).getString(0).equals("R")) assert(result(1).getString(1).equals("F")) - assert(result(1).getDouble(2) == 312371.0) + assert(result(1).getDouble(2) === 312371.0) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assert(scanExec.size === 1) - val parquetScan = scanExec(0) + val parquetScan = scanExec.head assert(parquetScan.nodeName.startsWith("Scan parquet")) - assert(parquetScan.metrics("numFiles").value == 201) + assert(parquetScan.metrics("numFiles").value === 201) val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex] - val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile]) - - assert(addFiles.size == 201) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1993-03-31")).size == 2) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1993-01-01")).size == 4) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1993-02-21")).size == 3) + val addFiles = fileIndex.matchingFiles(Nil, Nil) + + assert(addFiles.size === 201) + assert(addFiles.count(_.partitionValues("l_shipdate").equals("1993-03-31")) === 2) + assert(addFiles.count(_.partitionValues("l_shipdate").equals("1993-01-01")) === 4) + assert(addFiles.count(_.partitionValues("l_shipdate").equals("1993-02-21")) === 3) } } @@ -739,14 +732,14 @@ class GlutenClickHouseDeltaParquetWriteSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assert(scanExec.size === 1) - val parquetScan = scanExec(0) + val parquetScan = scanExec.head assert(parquetScan.nodeName.startsWith("Scan parquet")) val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex] - val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile]) - assert(addFiles.size == 1) + val addFiles = fileIndex.matchingFiles(Nil, Nil) + assert(addFiles.size === 1) } } @@ -866,14 +859,14 @@ class GlutenClickHouseDeltaParquetWriteSuite case f: FileSourceScanExecTransformer => f case w: WholeStageTransformer => w } - assert(plans.size == 4) + assert(plans.size === 4) val parquetScan = plans(3).asInstanceOf[FileSourceScanExecTransformer] assert(parquetScan.nodeName.startsWith("Scan parquet")) val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex] - val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile]) - assert(addFiles.size == 1) + val addFiles = fileIndex.matchingFiles(Nil, Nil) + assert(addFiles.size === 1) } val result = spark.read @@ -881,7 +874,7 @@ class GlutenClickHouseDeltaParquetWriteSuite .load(dataPath) .where("l_shipdate = date'1998-09-02'") .count() - 
assert(result == 183) + assert(result === 183) } test( @@ -914,7 +907,7 @@ class GlutenClickHouseDeltaParquetWriteSuite .format("delta") .load(dataPath) .count() - assert(result == 2418) + assert(result === 2418) } test( @@ -948,7 +941,7 @@ class GlutenClickHouseDeltaParquetWriteSuite .format("delta") .load(dataPath) .count() - assert(result == 21875) + assert(result === 21875) } } @@ -974,18 +967,18 @@ class GlutenClickHouseDeltaParquetWriteSuite .format("delta") .load(dataPath) .where("l_returnflag = 'Z'") - assert(df.count() == 1) + assert(df.count() === 1) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assert(scanExec.size === 1) val parquetScan = scanExec.head assert(parquetScan.nodeName.startsWith("Scan parquet")) val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex] - val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile]) - assert(addFiles.size == 4) + val addFiles = fileIndex.matchingFiles(Nil, Nil) + assert(addFiles.size === 4) } val clickhouseTable = DeltaTable.forPath(spark, dataPath) @@ -996,24 +989,24 @@ class GlutenClickHouseDeltaParquetWriteSuite .format("delta") .load(dataPath) .where("l_returnflag = 'X'") - assert(df.count() == 1) + assert(df.count() === 1) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assert(scanExec.size === 1) val parquetScan = scanExec.head assert(parquetScan.nodeName.startsWith("Scan parquet")) val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex] - val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile]) - assert(addFiles.size == 3) + val addFiles = fileIndex.matchingFiles(Nil, Nil) + assert(addFiles.size === 3) } val df = spark.read .format("delta") .load(dataPath) - assert(df.count() == 600572) + assert(df.count() === 600572) } test("test path based parquet delete with the delta") { @@ -1035,21 +1028,21 @@ class GlutenClickHouseDeltaParquetWriteSuite val df = spark.read .format("delta") .load(dataPath) - assert(df.count() == 600571) + assert(df.count() === 600571) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } val parquetScan = scanExec.head val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex] - val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile]) - assert(addFiles.size == 4) + val addFiles = fileIndex.matchingFiles(Nil, Nil) + assert(addFiles.size === 4) val clickhouseTable = DeltaTable.forPath(spark, dataPath) clickhouseTable.delete("mod(l_orderkey, 3) = 2") val df1 = spark.read .format("delta") .load(dataPath) - assert(df1.count() == 400089) + assert(df1.count() === 400089) } test("test path based parquet upsert with the delta") { @@ -1069,7 +1062,7 @@ class GlutenClickHouseDeltaParquetWriteSuite | select count(*) from delta.`$dataPath` |""".stripMargin) assert( - df0.collect().apply(0).get(0) == 600572 + df0.collect().apply(0).get(0) === 600572 ) upsertPathBasedSourceTableAndCheck(dataPath) } @@ -1106,7 +1099,7 @@ class GlutenClickHouseDeltaParquetWriteSuite | select count(*) from delta.`$dataPath` |""".stripMargin) assert( - df1.collect().apply(0).get(0) == 600572 + 3506 + df1.collect().apply(0).get(0) === 600572 + 3506 ) } { @@ -1115,7 +1108,7 @@ class GlutenClickHouseDeltaParquetWriteSuite | select count(*) from delta.`$dataPath` where l_returnflag = 'Z' |""".stripMargin) 
assert( - df2.collect().apply(0).get(0) == 3506 + df2.collect().apply(0).get(0) === 3506 ) } @@ -1125,7 +1118,7 @@ class GlutenClickHouseDeltaParquetWriteSuite | select count(*) from delta.`$dataPath` where l_orderkey > 10000000 |""".stripMargin) assert( - df3.collect().apply(0).get(0) == 3506 + df3.collect().apply(0).get(0) === 3506 ) } } @@ -1183,32 +1176,30 @@ class GlutenClickHouseDeltaParquetWriteSuite runTPCHQueryBySQL(1, sqlStr, compareResult = false) { df => val result = df.collect() - assert(result.size == 2) + assert(result.length === 2) assert(result(0).getString(0).equals("A")) assert(result(0).getString(1).equals("F")) - assert(result(0).getDouble(2) == 306633.0) + assert(result(0).getDouble(2) === 306633.0) assert(result(1).getString(0).equals("R")) assert(result(1).getString(1).equals("F")) - assert(result(1).getDouble(2) == 312371.0) + assert(result(1).getDouble(2) === 312371.0) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assert(scanExec.size === 1) - val parquetScan = scanExec(0) + val parquetScan = scanExec.head assert(parquetScan.nodeName.startsWith("Scan parquet")) - assert(parquetScan.metrics("numFiles").value == 200) + assert(parquetScan.metrics("numFiles").value === 200) val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex] - val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile]) + val addFiles = fileIndex.matchingFiles(Nil, Nil) - assert(addFiles.size == 200) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1993-03-31")).size == 2) - assert( - addFiles.filter(_.partitionValues.get("l_shipdate").get.equals("1993-01-01")).size == 4) + assert(addFiles.size === 200) + assert(addFiles.count(_.partitionValues("l_shipdate").equals("1993-03-31")) === 2) + assert(addFiles.count(_.partitionValues("l_shipdate").equals("1993-01-01")) === 4) } } @@ -1266,10 +1257,10 @@ class GlutenClickHouseDeltaParquetWriteSuite spark.sql("optimize lineitem_delta_parquet_optimize") val ret = spark.sql("select count(*) from lineitem_delta_parquet_optimize").collect() - assert(ret.apply(0).get(0) == 600572) + assert(ret.apply(0).get(0) === 600572) assert( - countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize")) == 24 + countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize")) === 24 ) } } @@ -1303,26 +1294,26 @@ class GlutenClickHouseDeltaParquetWriteSuite spark.sql("optimize lineitem_delta_parquet_optimize_p2") val job_ids = spark.sparkContext.statusTracker.getJobIdsForGroup("test3") if (sparkVersion.equals("3.2")) { - assert(job_ids.size == 7) // WILL trigger actual merge job + assert(job_ids.length === 7) // WILL trigger actual merge job } else { - assert(job_ids.size == 8) // WILL trigger actual merge job + assert(job_ids.length === 8) // WILL trigger actual merge job } spark.sparkContext.clearJobGroup() val ret = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p2").collect() - assert(ret.apply(0).get(0) == 600572) + assert(ret.apply(0).get(0) === 600572) - assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 23) + assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) === 23) spark.sql("VACUUM lineitem_delta_parquet_optimize_p2 RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 5) + assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) === 5) 
} else { - assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 7) + assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) === 7) } val ret2 = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p2").collect() - assert(ret2.apply(0).get(0) == 600572) + assert(ret2.apply(0).get(0) === 600572) } testSparkVersionLE33("test parquet optimize parallel delete") { @@ -1341,18 +1332,18 @@ class GlutenClickHouseDeltaParquetWriteSuite spark.sql("optimize lineitem_delta_parquet_optimize_p4") val ret = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p4").collect() - assert(ret.apply(0).get(0) == 600572) + assert(ret.apply(0).get(0) === 600572) - assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 149) + assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) === 149) spark.sql("VACUUM lineitem_delta_parquet_optimize_p4 RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 23) + assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) === 23) } else { - assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 25) + assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) === 25) } val ret2 = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p4").collect() - assert(ret2.apply(0).get(0) == 600572) + assert(ret2.apply(0).get(0) === 600572) } } @@ -1360,8 +1351,8 @@ class GlutenClickHouseDeltaParquetWriteSuite val dataPath = s"$basePath/lineitem_delta_parquet_optimize_path_based" clearDataPath(dataPath) withSQLConf( - ("spark.databricks.delta.optimize.maxFileSize" -> "1000000"), - ("spark.databricks.delta.optimize.minFileSize" -> "838000")) { + "spark.databricks.delta.optimize.maxFileSize" -> "1000000", + "spark.databricks.delta.optimize.minFileSize" -> "838000") { val sourceDF = spark.sql(s""" |select /*+ REPARTITION(50) */ * from lineitem @@ -1385,25 +1376,25 @@ class GlutenClickHouseDeltaParquetWriteSuite } val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect() - assert(ret.apply(0).get(0) == 600572) + assert(ret.apply(0).get(0) === 600572) } withSQLConf( - ("spark.databricks.delta.optimize.maxFileSize" -> "10000000"), - ("spark.databricks.delta.optimize.minFileSize" -> "1000000")) { + "spark.databricks.delta.optimize.maxFileSize" -> "10000000", + "spark.databricks.delta.optimize.minFileSize" -> "1000000") { val clickhouseTable = DeltaTable.forPath(spark, dataPath) clickhouseTable.optimize().executeCompaction() clickhouseTable.vacuum(0.0) if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(dataPath)) == 6) + assert(countFiles(new File(dataPath)) === 6) } else { - assert(countFiles(new File(dataPath)) == 12) + assert(countFiles(new File(dataPath)) === 12) } val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect() - assert(ret.apply(0).get(0) == 600572) + assert(ret.apply(0).get(0) === 600572) } // now merge all parts (testing merging from merged parts) @@ -1412,13 +1403,13 @@ class GlutenClickHouseDeltaParquetWriteSuite clickhouseTable.vacuum(0.0) if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(dataPath)) == 5) + assert(countFiles(new File(dataPath)) === 5) } else { - assert(countFiles(new File(dataPath)) == 13) + assert(countFiles(new File(dataPath)) === 13) } val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect() - 
assert(ret.apply(0).get(0) == 600572) + assert(ret.apply(0).get(0) === 600572) } } // scalastyle:off line.size.limit diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala index 3b7606daac6b5..7b1a79586eee0 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala @@ -16,6 +16,8 @@ */ package org.apache.gluten.execution +import org.apache.gluten.utils.Arm + import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SaveMode} import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 @@ -43,6 +45,8 @@ class GlutenClickHouseMergeTreeWriteSuite override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch" override protected val queriesResults: String = rootPath + "mergetree-queries-output" + import org.apache.gluten.backendsapi.clickhouse.CHConf._ + /** Run Gluten + ClickHouse Backend with SortShuffleManager */ override protected def sparkConf: SparkConf = { super.sparkConf @@ -52,15 +56,9 @@ class GlutenClickHouseMergeTreeWriteSuite .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set("spark.sql.adaptive.enabled", "true") .set("spark.sql.files.maxPartitionBytes", "20000000") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows", - "100000") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert", - "false") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size", - "8192") + .setCHSettings("min_insert_block_size_rows", 100000) + .setCHSettings("mergetree.merge_after_insert", false) + .setCHSettings("input_format_parquet_max_block_size", 8192) } override protected def createTPCHNotNullTables(): Unit = { @@ -152,7 +150,7 @@ class GlutenClickHouseMergeTreeWriteSuite val planNodeJson = wholeStageTransformer.substraitPlanJson assert( !planNodeJson - .replaceAll("\\\n", "") + .replaceAll("\n", "") .replaceAll(" ", "") .contains("\"input\":{\"filter\":{")) } @@ -1201,7 +1199,7 @@ class GlutenClickHouseMergeTreeWriteSuite // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 val partDir = directory.listFiles().filter(f => f.getName.length > 20).head val columnsFile = new File(partDir, "columns.txt") - val columns = Source.fromFile(columnsFile).getLines().mkString + val columns = Arm.withResource(Source.fromFile(columnsFile))(_.getLines().mkString) assert(columns.contains("`l_returnflag` LowCardinality(Nullable(String))")) assert(columns.contains("`l_linestatus` LowCardinality(Nullable(String))")) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala index dfc5fbd3b37e0..5e0c1f5e6da69 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala @@ -53,7 +53,7 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$sparkVersion/" val 
HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$sparkVersion/" - val HDFS_URL_ENDPOINT = s"hdfs://127.0.0.1:8020" + val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020" val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion" val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4" @@ -74,16 +74,13 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu } override protected def sparkConf: SparkConf = { + import org.apache.gluten.backendsapi.clickhouse.CHConf._ val conf = super.sparkConf .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath) - .set( - "spark.gluten.sql.columnar.backend.ch.use.v2", - ClickHouseConfig.DEFAULT_USE_DATASOURCE_V2) + .set(ClickHouseConfig.USE_DATASOURCE_V2, ClickHouseConfig.DEFAULT_USE_DATASOURCE_V2) .set("spark.gluten.sql.enable.native.validation", "false") .set("spark.sql.warehouse.dir", warehouse) - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.user_defined_path", - "/tmp/user_defined") + .setCHConfig("user_defined_path", "/tmp/user_defined") if (UTSystemParameters.testMergeTreeOnObjectStorage) { conf .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) @@ -92,70 +89,32 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT) .set("spark.hadoop.fs.s3a.path.style.access", "true") .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.type", - "s3_gluten") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.endpoint", - WHOLE_PATH) - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.access_key_id", - S3_ACCESS_KEY) - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.secret_access_key", - S3_SECRET_KEY) - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.metadata_path", - S3_METADATA_PATH) - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.type", - "cache") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.disk", - "s3") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.path", - S3_CACHE_PATH) - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.max_size", - "10Gi") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__s3_main.volumes", - "main") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__s3_main.volumes.main.disk", - "s3_cache") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.type", - "hdfs_gluten") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.endpoint", - HDFS_URL_ENDPOINT + "/") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.metadata_path", - HDFS_METADATA_PATH) - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.type", - "cache") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.disk", - "hdfs") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.path", - HDFS_CACHE_PATH) - .set( - 
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.max_size", - "10Gi") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes", - "main") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes.main.disk", - "hdfs_cache") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.dfs_client_read_shortcircuit", - "false") - .set("spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.dfs_default_replica", "1") + .setCHConfig( + "storage_configuration.disks.s3.type" -> "s3_gluten", + "storage_configuration.disks.s3.endpoint" -> WHOLE_PATH, + "storage_configuration.disks.s3.access_key_id" -> S3_ACCESS_KEY, + "storage_configuration.disks.s3.secret_access_key" -> S3_SECRET_KEY, + "storage_configuration.disks.s3.metadata_path" -> S3_METADATA_PATH, + "storage_configuration.disks.s3_cache.type" -> "cache", + "storage_configuration.disks.s3_cache.disk" -> "s3", + "storage_configuration.disks.s3_cache.path" -> S3_CACHE_PATH, + "storage_configuration.disks.s3_cache.max_size" -> "10Gi", + "storage_configuration.policies.__s3_main.volumes" -> "main", + "storage_configuration.policies.__s3_main.volumes.main.disk" -> "s3_cache" + ) + .setCHConfig( + "storage_configuration.disks.hdfs.type" -> "hdfs_gluten", + "storage_configuration.disks.hdfs.endpoint" -> s"$HDFS_URL_ENDPOINT/", + "storage_configuration.disks.hdfs.metadata_path" -> HDFS_METADATA_PATH, + "storage_configuration.disks.hdfs_cache.type" -> "cache", + "storage_configuration.disks.hdfs_cache.disk" -> "hdfs", + "storage_configuration.disks.hdfs_cache.path" -> HDFS_CACHE_PATH, + "storage_configuration.disks.hdfs_cache.max_size" -> "10Gi", + "storage_configuration.policies.__hdfs_main.volumes" -> "main", + "storage_configuration.policies.__hdfs_main.volumes.main.disk" -> "hdfs_cache", + "hdfs.dfs_client_read_shortcircuit" -> "false", + "hdfs.dfs_default_replica" -> "1" + ) } else { conf }