[GLUTEN-7028][CH][Part-2] Refactor: Move MergeTree related UT to mergetree module (apache#7279)

* Add CHConf

* Move MergeTree related UT to mergetree module

* fix scala style

* spark32 spark33 spark35

* More CH Conf

* update per apache#7265

 - Use CHConf
 - use CHConf.prefixOf() instead of "spark.gluten.sql.columnar.backend.ch." (see the key-name sketch after this list)
 - settingsKey => runtimeSettings
 - configKey => runtimeConfig
 - CH => CONF_PREFIX

* fix due to apache#7263
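
A minimal sketch of the key names these helpers produce, assuming GlutenConfig.prefixOf("ch") still resolves to the old "spark.gluten.sql.columnar.backend.ch" prefix (illustrative only, not part of the commit):

import org.apache.gluten.backendsapi.clickhouse.CHConf

// "spark.gluten.sql.columnar.backend.ch.files.per.partition.threshold"
val filesPerPartition = CHConf.prefixOf("files.per.partition.threshold")

// "spark.gluten.sql.columnar.backend.ch.runtime_settings.max_block_size"
val maxBlockSize = CHConf.runtimeSettings("max_block_size")

// "spark.gluten.sql.columnar.backend.ch.runtime_config.timezone"
val timezone = CHConf.runtimeConfig("timezone")

// true for any key under the runtime_settings namespace
val isSetting = CHConf.startWithSettings(maxBlockSize)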
baibaichen authored Sep 19, 2024
1 parent 87657f6 commit 4e30ed1
Showing 50 changed files with 711 additions and 761 deletions.
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.delta

import org.apache.gluten.backendsapi.clickhouse.CHBackend
import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.execution.ColumnarToRowExecBase

import org.apache.spark.SparkException
@@ -128,7 +128,7 @@ class ClickhouseOptimisticTransaction(
spark.conf.getAll.foreach(
entry => {
if (
entry._1.startsWith(s"${CHBackend.CONF_PREFIX}.runtime_settings")
CHConf.startWithSettings(entry._1)
|| entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
) {
options += (entry._1 -> entry._2)
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.delta

import org.apache.gluten.backendsapi.clickhouse.CHBackend
import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.execution.ColumnarToRowExecBase

import org.apache.spark.SparkException
@@ -128,7 +128,7 @@ class ClickhouseOptimisticTransaction(
spark.conf.getAll.foreach(
entry => {
if (
entry._1.startsWith(s"${CHBackend.CONF_PREFIX}.runtime_settings")
CHConf.startWithSettings(entry._1)
|| entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
) {
options += (entry._1 -> entry._2)
@@ -91,7 +91,7 @@ object ClickhouseTable {
val badOptions = hadoopConf.filterKeys {
k => !DeltaTableUtils.validDeltaTableHadoopPrefixes.exists(k.startsWith)
}.toMap
if (!badOptions.isEmpty) {
if (badOptions.nonEmpty) {
throw DeltaErrors.unsupportedDeltaTableForPathHadoopConf(badOptions)
}
val fileSystemOptions: Map[String, String] = hadoopConf.toMap
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.delta

import org.apache.gluten.backendsapi.clickhouse.CHBackend
import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.execution.ColumnarToRowExecBase

import org.apache.spark.SparkException
@@ -140,7 +140,7 @@ class ClickhouseOptimisticTransaction(
spark.conf.getAll.foreach(
entry => {
if (
entry._1.startsWith(s"${CHBackend.CONF_PREFIX}.runtime_settings")
CHConf.startWithSettings(entry._1)
|| entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
) {
options += (entry._1 -> entry._2)
@@ -62,8 +62,7 @@ class CHBackend extends SubstraitBackend {
}

object CHBackend {
val BACKEND_NAME = "ch"
val CONF_PREFIX: String = GlutenConfig.prefixOf(BACKEND_NAME)
val BACKEND_NAME: String = CHConf.BACKEND_NAME
}

object CHBackendSettings extends BackendSettingsApi with Logging {
@@ -74,40 +73,35 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
// experimental: when the file count per partition exceeds this threshold,
// the files will be put into one partition.
val GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
".files.per.partition.threshold"
CHConf.prefixOf("files.per.partition.threshold")
val GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD_DEFAULT = "-1"

private val GLUTEN_CLICKHOUSE_CUSTOMIZED_SHUFFLE_CODEC_ENABLE: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
".customized.shuffle.codec.enable"
CHConf.prefixOf("customized.shuffle.codec.enable")
private val GLUTEN_CLICKHOUSE_CUSTOMIZED_SHUFFLE_CODEC_ENABLE_DEFAULT = false
lazy val useCustomizedShuffleCodec: Boolean = SparkEnv.get.conf.getBoolean(
CHBackendSettings.GLUTEN_CLICKHOUSE_CUSTOMIZED_SHUFFLE_CODEC_ENABLE,
CHBackendSettings.GLUTEN_CLICKHOUSE_CUSTOMIZED_SHUFFLE_CODEC_ENABLE_DEFAULT
)

private val GLUTEN_CLICKHOUSE_CUSTOMIZED_BUFFER_SIZE: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
".customized.buffer.size"
CHConf.prefixOf("customized.buffer.size")
private val GLUTEN_CLICKHOUSE_CUSTOMIZED_BUFFER_SIZE_DEFAULT = 4096
lazy val customizeBufferSize: Int = SparkEnv.get.conf.getInt(
CHBackendSettings.GLUTEN_CLICKHOUSE_CUSTOMIZED_BUFFER_SIZE,
CHBackendSettings.GLUTEN_CLICKHOUSE_CUSTOMIZED_BUFFER_SIZE_DEFAULT
)

val GLUTEN_CLICKHOUSE_BROADCAST_CACHE_EXPIRED_TIME: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
".broadcast.cache.expired.time"
CHConf.prefixOf("broadcast.cache.expired.time")
// unit: SECONDS, default 1 day
val GLUTEN_CLICKHOUSE_BROADCAST_CACHE_EXPIRED_TIME_DEFAULT: Int = 86400

private val GLUTEN_CLICKHOUSE_SHUFFLE_SUPPORTED_CODEC: Set[String] = Set("lz4", "zstd", "snappy")

// The algorithm for hash partition of the shuffle
private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
".shuffle.hash.algorithm"
CHConf.prefixOf("shuffle.hash.algorithm")
// valid values are: cityHash64 or sparkMurmurHash3_32
private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT = "sparkMurmurHash3_32"
def shuffleHashAlgorithm: String = {
@@ -122,25 +116,19 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}
}

val GLUTEN_CLICKHOUSE_AFFINITY_MODE: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + ".affinity.mode"
private val GLUTEN_CLICKHOUSE_AFFINITY_MODE: String = CHConf.prefixOf("affinity.mode")
val SOFT: String = "soft"
val FORCE: String = "force"
private val GLUTEN_CLICKHOUSE_AFFINITY_MODE_DEFAULT = SOFT

val GLUTEN_MAX_BLOCK_SIZE: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
".runtime_settings.max_block_size"
private val GLUTEN_MAX_BLOCK_SIZE: String = CHConf.runtimeSettings("max_block_size")
// Same as default value in clickhouse
val GLUTEN_MAX_BLOCK_SIZE_DEFAULT = 65409
val GLUTEN_MAX_SHUFFLE_READ_BYTES: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
".runtime_config.max_source_concatenate_bytes"
val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = GLUTEN_MAX_BLOCK_SIZE_DEFAULT * 256
private val GLUTEN_MAX_BLOCK_SIZE_DEFAULT = 65409
private val GLUTEN_MAX_SHUFFLE_READ_BYTES: String =
CHConf.runtimeConfig("max_source_concatenate_bytes")
private val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = GLUTEN_MAX_BLOCK_SIZE_DEFAULT * 256

val GLUTEN_AQE_PROPAGATEEMPTY: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
".aqe.propagate.empty.relation"
val GLUTEN_AQE_PROPAGATEEMPTY: String = CHConf.prefixOf("aqe.propagate.empty.relation")

def affinityMode: String = {
SparkEnv.get.conf
@@ -368,15 +356,15 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
// Need to enable AQE
def enableReorderHashJoinTables(): Boolean = {
SparkEnv.get.conf.getBoolean(
"spark.gluten.sql.columnar.backend.ch.enable_reorder_hash_join_tables",
true
CHConf.prefixOf("enable_reorder_hash_join_tables"),
defaultValue = true
)
}
// The threshold to reorder hash join tables: if the ratio of the two tables' sizes is
// larger than this threshold, reorder the tables, e.g. a/b > threshold or b/a > threshold
def reorderHashJoinTablesThreshold(): Int = {
SparkEnv.get.conf.getInt(
"spark.gluten.sql.columnar.backend.ch.reorder_hash_join_tables_thresdhold",
CHConf.prefixOf("reorder_hash_join_tables_thresdhold"),
10
)
}
@@ -385,8 +373,8 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
// for example, select a, b, sum(c+d) from t group by a, b with cube
def enablePushdownPreProjectionAheadExpand(): Boolean = {
SparkEnv.get.conf.getBoolean(
"spark.gluten.sql.columnar.backend.ch.enable_pushdown_preprojection_ahead_expand",
true
CHConf.prefixOf("enable_pushdown_preprojection_ahead_expand"),
defaultValue = true
)
}

@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig

import org.apache.spark.SparkConf

object CHConf {
private[clickhouse] val BACKEND_NAME: String = "ch"
private[clickhouse] val CONF_PREFIX: String = GlutenConfig.prefixOf(BACKEND_NAME)
private val RUNTIME_SETTINGS: String = s"$CONF_PREFIX.runtime_settings"
private val RUNTIME_CONFIG = s"$CONF_PREFIX.runtime_config"
implicit class GlutenCHConf(conf: SparkConf) {
def setCHSettings(settings: (String, String)*): SparkConf = {
settings.foreach { case (k, v) => conf.set(runtimeSettings(k), v) }
conf
}

def setCHSettings[T](k: String, v: T): SparkConf = {
conf.set(runtimeSettings(k), v.toString)
conf
}

def setCHConfig(config: (String, String)*): SparkConf = {
config.foreach { case (k, v) => conf.set(runtimeConfig(k), v) }
conf
}

def setCHConfig[T](k: String, v: T): SparkConf = {
conf.set(runtimeConfig(k), v.toString)
conf
}
}

def prefixOf(key: String): String = s"$CONF_PREFIX.$key"
def runtimeConfig(key: String): String = s"$RUNTIME_CONFIG.$key"
def runtimeSettings(key: String): String = s"$RUNTIME_SETTINGS.$key"

def startWithSettings(key: String): Boolean = key.startsWith(RUNTIME_SETTINGS)
}
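
A short usage sketch of the implicit SparkConf helpers defined above (hypothetical values; the expanded key names assume the same "spark.gluten.sql.columnar.backend.ch" prefix as before):

import org.apache.spark.SparkConf
import org.apache.gluten.backendsapi.clickhouse.CHConf._

val conf = new SparkConf()
  // becomes spark.gluten.sql.columnar.backend.ch.runtime_settings.max_block_size = 65409
  .setCHSettings("max_block_size", 65409)
  // becomes spark.gluten.sql.columnar.backend.ch.runtime_config.<key> = <value>
  .setCHConfig(
    "timezone" -> "UTC",
    "local_engine.settings.log_processors_profiles" -> "true")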
@@ -81,17 +81,13 @@ class CHListenerApi extends ListenerApi with Logging {
JniLibLoader.loadFromPath(executorLibPath, true)
}
// Add configs
conf.set(
s"${CHBackend.CONF_PREFIX}.runtime_config.timezone",
conf.get("spark.sql.session.timeZone", TimeZone.getDefault.getID))
conf.set(
s"${CHBackend.CONF_PREFIX}.runtime_config" +
s".local_engine.settings.log_processors_profiles",
"true")
import org.apache.gluten.backendsapi.clickhouse.CHConf._
conf.setCHConfig(
"timezone" -> conf.get("spark.sql.session.timeZone", TimeZone.getDefault.getID),
"local_engine.settings.log_processors_profiles" -> "true")

// add memory limit for external sort
val externalSortKey = s"${CHBackend.CONF_PREFIX}.runtime_settings" +
s".max_bytes_before_external_sort"
val externalSortKey = CHConf.runtimeSettings("max_bytes_before_external_sort")
if (conf.getLong(externalSortKey, -1) < 0) {
if (conf.getBoolean("spark.memory.offHeap.enabled", defaultValue = false)) {
val memSize = JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size"))
@@ -87,32 +87,33 @@ class CHTransformerApi extends TransformerApi with Logging {
override def postProcessNativeConfig(
nativeConfMap: util.Map[String, String],
backendPrefix: String): Unit = {
val settingPrefix = backendPrefix + ".runtime_settings."

require(backendPrefix == CHConf.CONF_PREFIX)
if (nativeConfMap.getOrDefault("spark.memory.offHeap.enabled", "false").toBoolean) {
val offHeapSize =
nativeConfMap.getOrDefault("spark.gluten.memory.offHeap.size.in.bytes", "0").toLong
if (offHeapSize > 0) {

// Only set default max_bytes_before_external_group_by for CH when it is not set explicitly.
val groupBySpillKey = settingPrefix + "max_bytes_before_external_group_by";
val groupBySpillKey = CHConf.runtimeSettings("max_bytes_before_external_group_by")
if (!nativeConfMap.containsKey(groupBySpillKey)) {
val groupBySpillValue = offHeapSize * 0.5
nativeConfMap.put(groupBySpillKey, groupBySpillValue.toLong.toString)
}

val maxMemoryUsageKey = settingPrefix + "max_memory_usage";
val maxMemoryUsageKey = CHConf.runtimeSettings("max_memory_usage")
if (!nativeConfMap.containsKey(maxMemoryUsageKey)) {
val maxMemoryUsageValue = offHeapSize
nativeConfMap.put(maxMemoryUsageKey, maxMemoryUsageValue.toLong.toString)
nativeConfMap.put(maxMemoryUsageKey, maxMemoryUsageValue.toString)
}

// Only set default max_bytes_before_external_join for CH when join_algorithm is grace_hash
val joinAlgorithmKey = settingPrefix + "join_algorithm";
val joinAlgorithmKey = CHConf.runtimeSettings("join_algorithm")
if (
nativeConfMap.containsKey(joinAlgorithmKey) &&
nativeConfMap.get(joinAlgorithmKey) == "grace_hash"
) {
val joinSpillKey = settingPrefix + "max_bytes_in_join";
val joinSpillKey = CHConf.runtimeSettings("max_bytes_in_join")
if (!nativeConfMap.containsKey(joinSpillKey)) {
val joinSpillValue = offHeapSize * 0.7
nativeConfMap.put(joinSpillKey, joinSpillValue.toLong.toString)
@@ -127,24 +128,24 @@ class CHTransformerApi extends TransformerApi with Logging {
}
}

val hdfsConfigPrefix = backendPrefix + ".runtime_config.hdfs."
injectConfig("spark.hadoop.input.connect.timeout", hdfsConfigPrefix + "input_connect_timeout")
injectConfig("spark.hadoop.input.read.timeout", hdfsConfigPrefix + "input_read_timeout")
injectConfig("spark.hadoop.input.write.timeout", hdfsConfigPrefix + "input_write_timeout")
val hdfsConfigPrefix = CHConf.runtimeConfig("hdfs")
injectConfig("spark.hadoop.input.connect.timeout", s"$hdfsConfigPrefix.input_connect_timeout")
injectConfig("spark.hadoop.input.read.timeout", s"$hdfsConfigPrefix.input_read_timeout")
injectConfig("spark.hadoop.input.write.timeout", s"$hdfsConfigPrefix.input_write_timeout")
injectConfig(
"spark.hadoop.dfs.client.log.severity",
hdfsConfigPrefix + "dfs_client_log_severity")
s"$hdfsConfigPrefix.dfs_client_log_severity")

// TODO: set default to true when metrics could be collected
// while ch query plan optimization is enabled.
val planOptKey = settingPrefix + "query_plan_enable_optimizations"
val planOptKey = CHConf.runtimeSettings("query_plan_enable_optimizations")
if (!nativeConfMap.containsKey(planOptKey)) {
nativeConfMap.put(planOptKey, "false")
}

// Respect spark config spark.sql.orc.compression.codec for CH backend
// TODO: consider compression or orc.compression in table options.
val orcCompressionKey = settingPrefix + "output_format_orc_compression_method"
val orcCompressionKey = CHConf.runtimeSettings("output_format_orc_compression_method")
if (!nativeConfMap.containsKey(orcCompressionKey)) {
if (nativeConfMap.containsKey("spark.sql.orc.compression.codec")) {
val compression = nativeConfMap.get("spark.sql.orc.compression.codec").toLowerCase()
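
For intuition, a rough sketch of the defaults postProcessNativeConfig derives when off-heap memory is enabled; the 8 GiB figure and the map below are illustrative, not part of the commit:

import java.util.{HashMap => JHashMap}

val nativeConf = new JHashMap[String, String]()
nativeConf.put("spark.memory.offHeap.enabled", "true")
nativeConf.put("spark.gluten.memory.offHeap.size.in.bytes", (8L << 30).toString) // 8 GiB

// After postProcessNativeConfig, unless the user set them explicitly, roughly:
//   runtime_settings.max_bytes_before_external_group_by -> 4 GiB   (offHeapSize * 0.5)
//   runtime_settings.max_memory_usage                   -> 8 GiB   (offHeapSize)
//   runtime_settings.max_bytes_in_join                  -> 5.6 GiB (offHeapSize * 0.7,
//       only when runtime_settings.join_algorithm is already set to grace_hash)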
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.expression

import org.apache.gluten.backendsapi.clickhouse.CHBackend
import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.expression.ConverterUtils.FunctionConfig
import org.apache.gluten.substrait.expression._
@@ -70,7 +70,7 @@ case class CHTruncTimestampTransformer(
if (
timeZoneIgnore && timeZoneId.nonEmpty &&
!timeZoneId.get.equalsIgnoreCase(
SQLConf.get.getConfString(s"${CHBackend.CONF_PREFIX}.runtime_config.timezone")
SQLConf.get.getConfString(s"${CHConf.runtimeConfig("timezone")}")
)
) {
throw new GlutenNotSupportException(
@@ -157,23 +157,23 @@ case class CHPosExplodeTransformer(
// Output (pos, col) when input is array type
val structType = StructType(
Array(
StructField("pos", IntegerType, false),
StructField("pos", IntegerType, nullable = false),
StructField("col", a.elementType, a.containsNull)))
ExpressionBuilder.makeScalarFunction(
funcId,
Lists.newArrayList(childNode),
ConverterUtils.getTypeNode(structType, false))
ConverterUtils.getTypeNode(structType, nullable = false))
case m: MapType =>
// Output (pos, key, value) when input is map type
val structType = StructType(
Array(
StructField("pos", IntegerType, false),
StructField("key", m.keyType, false),
StructField("pos", IntegerType, nullable = false),
StructField("key", m.keyType, nullable = false),
StructField("value", m.valueType, m.valueContainsNull)))
ExpressionBuilder.makeScalarFunction(
funcId,
Lists.newArrayList(childNode),
ConverterUtils.getTypeNode(structType, false))
ConverterUtils.getTypeNode(structType, nullable = false))
case _ =>
throw new GlutenNotSupportException(s"posexplode($childType) not supported yet.")
}
@@ -225,7 +225,7 @@ case class GetArrayItemTransformer(
Seq(IntegerType, getArrayItem.right.dataType),
FunctionConfig.OPT)
val addFunctionId = ExpressionBuilder.newScalarFunction(functionMap, addFunctionName)
val literalNode = ExpressionBuilder.makeLiteral(1.toInt, IntegerType, false)
val literalNode = ExpressionBuilder.makeLiteral(1, IntegerType, false)
rightNode = ExpressionBuilder.makeScalarFunction(
addFunctionId,
Lists.newArrayList(literalNode, rightNode),