Commit: More CH Conf
baibaichen committed Sep 19, 2024
1 parent dab6d7f commit 4f1adec
Showing 14 changed files with 81 additions and 93 deletions.
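
This commit continues replacing hand-written ClickHouse backend configuration keys in the test suites with helpers from CHConf. Concretely, calls such as `.set("spark.gluten.sql.columnar.backend.ch.runtime_config.<key>", ...)` and `.set("spark.gluten.sql.columnar.backend.ch.runtime_settings.<key>", ...)` become `.setCHConfig(<key>, ...)` and `.setCHSettings(<key>, ...)` after `import org.apache.gluten.backendsapi.clickhouse.CHConf._`, and the per-suite `CH_CONFIG_PREFIX` / `CH_SETTING_PREFIX` constants are dropped. The sketch below shows roughly what those helpers would have to look like for the rewritten call sites to behave the same; it is inferred from the call sites in this diff, not copied from the real CHConf, so the implicit class name, the private vals, and the exact overloads are assumptions.

package org.apache.gluten.backendsapi.clickhouse

import org.apache.spark.SparkConf

object CHConf {
  // The prefixes the suites used to spell out (and duplicate as CH_CONFIG_PREFIX / CH_SETTING_PREFIX).
  private val CONFIG_PREFIX = "spark.gluten.sql.columnar.backend.ch.runtime_config"
  private val SETTINGS_PREFIX = "spark.gluten.sql.columnar.backend.ch.runtime_settings"

  /** Full runtime_settings key, e.g. settingsKey("mergetree.merge_after_insert"). */
  def settingsKey(key: String): String = s"$SETTINGS_PREFIX.$key"

  /** Extension methods on SparkConf, available after `import CHConf._`. */
  implicit class SparkConfOps(conf: SparkConf) { // class name is hypothetical
    // Single runtime_config entry; the value is stringified, so booleans and ints work too.
    def setCHConfig(key: String, value: Any): SparkConf =
      conf.set(s"$CONFIG_PREFIX.$key", value.toString)

    // Several runtime_config entries at once, passed as key -> value pairs.
    def setCHConfig(entries: (String, String)*): SparkConf = {
      entries.foreach { case (k, v) => conf.set(s"$CONFIG_PREFIX.$k", v) }
      conf
    }

    // Single runtime_settings entry.
    def setCHSettings(key: String, value: Any): SparkConf =
      conf.set(s"$SETTINGS_PREFIX.$key", value.toString)
  }
}

With that shape, `.setCHConfig("path", "/data")` would set `spark.gluten.sql.columnar.backend.ch.runtime_config.path`, and `.setCHSettings("mergetree.merge_after_insert", false)` would set the corresponding `runtime_settings` key, matching the long-form calls that the hunks below remove.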

@@ -36,6 +36,8 @@ class GlutenClickHouseColumnarMemorySortShuffleSuite
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.forceMemorySortShuffle", "true")

// TODO: forceMemorySortShuffle
}

test("TPCH Q1") {

@@ -31,6 +31,8 @@ class GlutenClickHouseS3SourceSuite extends GlutenClickHouseTPCHAbstractSuite {
override protected val queriesResults: String = rootPath + "queries-output"

override protected def sparkConf: SparkConf = {
import org.apache.gluten.backendsapi.clickhouse.CHConf._

super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.io.compression.codec", "LZ4")
@@ -44,10 +46,8 @@ class GlutenClickHouseS3SourceSuite extends GlutenClickHouseTPCHAbstractSuite {
.set("spark.hadoop.fs.s3a.path.style.access", "true")
.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.s3.local_cache.enabled", "true")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.s3.local_cache.cache_path",
"/data/gluten-ch-cache-dir")
.setCHConfig("s3.local_cache.enabled", true)
.setCHConfig("s3.local_cache.cache_path", "/data/gluten-ch-cache-dir")
}

override protected val createNullableTables = true

@@ -41,9 +41,6 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
version(0) + "." + version(1)
}

val CH_CONFIG_PREFIX: String = "spark.gluten.sql.columnar.backend.ch.runtime_config"
val CH_SETTING_PREFIX: String = "spark.gluten.sql.columnar.backend.ch.runtime_settings"

val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/"
val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/"
val S3_ENDPOINT = "s3://127.0.0.1:9000/"
@@ -79,6 +76,7 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu

override protected def sparkConf: SparkConf = {
import org.apache.gluten.backendsapi.clickhouse.CHConf._

val conf = super.sparkConf
.set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath)
.set(ClickHouseConfig.USE_DATASOURCE_V2, ClickHouseConfig.DEFAULT_USE_DATASOURCE_V2)
@@ -115,40 +113,24 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
"storage_configuration.disks.hdfs_cache.path" -> HDFS_CACHE_PATH,
"storage_configuration.disks.hdfs_cache.max_size" -> "10Gi",
"storage_configuration.policies.__hdfs_main.volumes" -> "main",
"storage_configuration.policies.__hdfs_main.volumes.main.disk" -> "hdfs_cache",
"storage_configuration.policies.__hdfs_main.volumes.main.disk" -> "hdfs_cache"
)
.setCHConfig(
"storage_configuration.disks.hdfs2.type" -> "hdfs_gluten",
"storage_configuration.disks.hdfs2.endpoint" -> s"$HDFS_URL_ENDPOINT/",
"storage_configuration.disks.hdfs2.metadata_path" -> HDFS_METADATA_PATH,
"storage_configuration.disks.hdfs2.metadata_type" -> "rocksdb",
"storage_configuration.disks.hdfs_cache2.type" -> "cache",
"storage_configuration.disks.hdfs_cache2.disk" -> "hdfs2",
"storage_configuration.disks.hdfs_cache2.path" -> HDFS_CACHE_PATH,
"storage_configuration.disks.hdfs_cache2.max_size" -> "10Gi",
"storage_configuration.policies.__hdfs_main_rocksdb.volumes" -> "main",
"storage_configuration.policies.__hdfs_main_rocksdb.volumes.main.disk" -> "hdfs_cache2"
)
.setCHConfig(
"hdfs.dfs_client_read_shortcircuit" -> "false",
"hdfs.dfs_default_replica" -> "1"
)
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs2.type",
"hdfs_gluten")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs2.endpoint",
HDFS_URL_ENDPOINT + "/")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs2.metadata_path",
HDFS_METADATA_PATH)
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs2.metadata_type",
"rocksdb")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache2.type",
"cache")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache2.disk",
"hdfs2")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache2.path",
HDFS_CACHE_PATH)
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache2.max_size",
"10Gi")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main_rocksdb.volumes",
"main")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main_rocksdb.volumes.main.disk",
"hdfs_cache2")
} else {
conf
}

@@ -40,6 +40,8 @@ class GlutenClickHouseHiveTableSuite
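
To make the mapping concrete, each rewritten entry above is expected to resolve to exactly the key the old three-line `.set(...)` call spelled out, assuming `setCHConfig` only prepends the removed `CH_CONFIG_PREFIX`. For one of the hdfs_cache2 entries (hypothetical expansion, not code from this commit):

// the two calls are assumed to be equivalent after the rewrite
conf.setCHConfig("storage_configuration.disks.hdfs_cache2.path" -> HDFS_CACHE_PATH)
conf.set(
  "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache2.path",
  HDFS_CACHE_PATH)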
with AdaptiveSparkPlanHelper {

override protected def sparkConf: SparkConf = {
import org.apache.gluten.backendsapi.clickhouse.CHConf._

new SparkConf()
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set("spark.memory.offHeap.enabled", "true")
@@ -60,10 +62,10 @@ class GlutenClickHouseHiveTableSuite
.set("spark.gluten.sql.parquet.maxmin.index", "true")
.set(
"spark.sql.warehouse.dir",
getClass.getResource("/").getPath + "tests-working-home/spark-warehouse")
this.getClass.getResource("/").getPath + "tests-working-home/spark-warehouse")
.set("spark.hive.exec.dynamic.partition.mode", "nonstrict")
.set("spark.gluten.supported.hive.udfs", "my_add")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true")
.setCHConfig("use_local_format", true)
.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.set(
"spark.sql.catalog.spark_catalog",

@@ -46,18 +46,18 @@ class GlutenClickHouseMergeTreeCacheDataSuite
}

override protected def sparkConf: SparkConf = {
import org.apache.gluten.backendsapi.clickhouse.CHConf._

super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.io.compression.codec", "LZ4")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error")
.setCHConfig("logger.level", "error")
.set("spark.gluten.soft-affinity.enabled", "true")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.path", "/data")
.setCHSettings("mergetree.merge_after_insert", false)
.setCHConfig("path", "/data")
}

override protected def beforeEach(): Unit = {

@@ -40,27 +40,22 @@ class GlutenClickHouseMergeTreeOptimizeSuite

/** Run Gluten + ClickHouse Backend with SortShuffleManager */
override protected def sparkConf: SparkConf = {
import org.apache.gluten.backendsapi.clickhouse.CHConf._

super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.io.compression.codec", "LZ4")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows",
"10000"
)
.setCHConfig("logger.level", "error")
.setCHSettings("min_insert_block_size_rows", 10000)
.set(
"spark.databricks.delta.retentionDurationCheck.enabled",
"false"
) // otherwise, RETAIN 0 HOURS will fail
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size",
"8192")
.setCHSettings("mergetree.merge_after_insert", false)
.setCHSettings("input_format_parquet_max_block_size", 8192)
}

override protected def createTPCHNotNullTables(): Unit = {

@@ -46,6 +46,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite

/** Run Gluten + ClickHouse Backend with SortShuffleManager */
override protected def sparkConf: SparkConf = {
import org.apache.gluten.backendsapi.clickhouse.CHConf._

super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.io.compression.codec", "LZ4")
@@ -54,15 +56,10 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
.set("spark.sql.adaptive.enabled", "true")
.set("spark.sql.files.maxPartitionBytes", "20000000")
.set("spark.ui.enabled", "true")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows",
"100000")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size",
"8192")
.setCHSettings("min_insert_block_size_rows", 100000)
.setCHSettings("mergetree.merge_after_insert", false)
.setCHSettings("input_format_parquet_max_block_size", 8192)

}

override protected def createTPCHNotNullTables(): Unit = {

@@ -49,17 +49,17 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
}

override protected def sparkConf: SparkConf = {
import org.apache.gluten.backendsapi.clickhouse.CHConf._

super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.io.compression.codec", "LZ4")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.path", "/data") // for local test
.setCHConfig("logger.level", "error")
.setCHSettings("mergetree.merge_after_insert", false)
.setCHConfig("path", "/data")
}

override protected def beforeEach(): Unit = {

@@ -49,17 +49,17 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite
}

override protected def sparkConf: SparkConf = {
import org.apache.gluten.backendsapi.clickhouse.CHConf._

super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.io.compression.codec", "LZ4")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.path", "/data")
.setCHConfig("logger.level", "error")
.setCHSettings("mergetree.merge_after_insert", false)
.setCHConfig("path", "/data")
}

override protected def beforeEach(): Unit = {

@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution.mergetree

import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.backendsapi.clickhouse.CHConf._
import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite}

import org.apache.spark.SparkConf
@@ -62,8 +62,8 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.path", "/data")
.setCHConfig("logger.level", "error")
.setCHConfig("path", "/data")
}

override protected def beforeEach(): Unit = {
@@ -684,8 +684,8 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite

withSQLConf(
"spark.databricks.delta.optimize.minFileSize" -> "200000000",
CHConf.settingsKey("mergetree.insert_without_local_storage") -> "true",
CHConf.settingsKey("mergetree.merge_after_insert") -> "true"
settingsKey("mergetree.insert_without_local_storage") -> "true",
settingsKey("mergetree.merge_after_insert") -> "true"
) {
spark.sql(s"""
|DROP TABLE IF EXISTS $tableName;
@@ -757,7 +757,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
| AND l_quantity < 24
|""".stripMargin

withSQLConf(CHConf.settingsKey("enabled_driver_filter_mergetree_index") -> "true") {
withSQLConf(settingsKey("enabled_driver_filter_mergetree_index") -> "true") {
runTPCHQueryBySQL(6, sqlStr) {
df =>
val scanExec = collect(df.queryExecution.executedPlan) {

@@ -96,8 +96,11 @@ class GlutenParquetColumnIndexSuite
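
Note that the import at the top of this file changes from `CHConf` to the wildcard `CHConf._`, so `settingsKey` is now called unqualified inside `withSQLConf`. Assuming it only prepends the removed `CH_SETTING_PREFIX`, the old and new spellings set the same session property (hypothetical expansion):

// settingsKey("mergetree.merge_after_insert") is assumed to expand to
// "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert"
withSQLConf(settingsKey("mergetree.merge_after_insert") -> "true") {
  // queries here run with the setting applied
}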
val chFileScan = chScanPlan.head
assertResult(scanOutput)(chFileScan.longMetric("numOutputRows").value)
}
override protected def sparkConf: SparkConf =
override protected def sparkConf: SparkConf = {
import org.apache.gluten.backendsapi.clickhouse.CHConf._

super.sparkConf
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true")
.setCHConfig("use_local_format", true)
}
}

@@ -26,14 +26,16 @@ class GlutenClickHouseTPCDSParquetGraceHashJoinSuite extends GlutenClickHouseTPC

/** Run Gluten + ClickHouse Backend with SortShuffleManager */
override protected def sparkConf: SparkConf = {
import org.apache.gluten.backendsapi.clickhouse.CHConf._

super.sparkConf
.set("spark.shuffle.manager", "sort")
.set("spark.io.compression.codec", "snappy")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.memory.offHeap.size", "6g")
.set("spark.gluten.sql.columnar.backend.ch.runtime_settings.join_algorithm", "grace_hash")
.set("spark.gluten.sql.columnar.backend.ch.runtime_settings.max_bytes_in_join", "314572800")
.setCHSettings("join_algorithm", "grace_hash")
.setCHSettings("max_bytes_in_join", 314572800)
.setMaster("local[2]")
}


@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution.tpch

import org.apache.gluten.backendsapi.clickhouse.CHConf._
import org.apache.gluten.execution.{CHNativeCacheManager, FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite}

import org.apache.spark.SparkConf
@@ -43,14 +44,16 @@ class GlutenClickHouseHDFSSuite
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set(s"$CH_CONFIG_PREFIX.use_local_format", "true")
.setCHConfig("use_local_format", true)
.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
.set(s"$CH_CONFIG_PREFIX.gluten_cache.local.enabled", "true")
.set(s"$CH_CONFIG_PREFIX.gluten_cache.local.name", cache_name)
.set(s"$CH_CONFIG_PREFIX.gluten_cache.local.path", hdfsCachePath)
.set(s"$CH_CONFIG_PREFIX.gluten_cache.local.max_size", "10Gi")
.set(s"$CH_CONFIG_PREFIX.reuse_disk_cache", "false")
.setCHConfig("gluten_cache.local.enabled", "true")
.setCHConfig("gluten_cache.local.name", cache_name)
.setCHConfig("gluten_cache.local.path", hdfsCachePath)
.setCHConfig("gluten_cache.local.max_size", "10Gi")
.setCHConfig("reuse_disk_cache", "false")
.set("spark.sql.adaptive.enabled", "false")

// TODO: spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm
}

override protected def createTPCHNotNullTables(): Unit = {
@@ -123,7 +126,7 @@ class GlutenClickHouseHDFSSuite

ignore("test no cache by query") {
withSQLConf(
s"$CH_SETTING_PREFIX.read_from_filesystem_cache_if_exists_otherwise_bypass_cache" -> "true") {
settingsKey("read_from_filesystem_cache_if_exists_otherwise_bypass_cache") -> "true") {
runWithoutCache()
}


@@ -39,13 +39,15 @@ class GlutenClickHouseTPCHParquetAQESuite

/** Run Gluten + ClickHouse Backend with SortShuffleManager */
override protected def sparkConf: SparkConf = {
import org.apache.gluten.backendsapi.clickhouse.CHConf._

super.sparkConf
.set("spark.shuffle.manager", "sort")
.set("spark.io.compression.codec", "snappy")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true")
.setCHConfig("use_local_format", true)
.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
}
