Skip to content

Commit

Permalink
Add CHConf
Browse files — browse the repository at this point in the history
  • Loading branch information
baibaichen committed Sep 9, 2024
1 parent 2d0ef3e commit a1661ce
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,21 +117,21 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}
}

val GLUTEN_CLICKHOUSE_AFFINITY_MODE: String =
private val GLUTEN_CLICKHOUSE_AFFINITY_MODE: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + ".affinity.mode"
val SOFT: String = "soft"
val FORCE: String = "force"
private val GLUTEN_CLICKHOUSE_AFFINITY_MODE_DEFAULT = SOFT

val GLUTEN_MAX_BLOCK_SIZE: String =
private val GLUTEN_MAX_BLOCK_SIZE: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
".runtime_settings.max_block_size"
// Same as default value in clickhouse
val GLUTEN_MAX_BLOCK_SIZE_DEFAULT = 65409
val GLUTEN_MAX_SHUFFLE_READ_BYTES: String =
private val GLUTEN_MAX_BLOCK_SIZE_DEFAULT = 65409
private val GLUTEN_MAX_SHUFFLE_READ_BYTES: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
".runtime_config.max_source_concatenate_bytes"
val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = GLUTEN_MAX_BLOCK_SIZE_DEFAULT * 256
private val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = GLUTEN_MAX_BLOCK_SIZE_DEFAULT * 256

def affinityMode: String = {
SparkEnv.get.conf
Expand Down Expand Up @@ -364,7 +364,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
def enableReorderHashJoinTables(): Boolean = {
SparkEnv.get.conf.getBoolean(
"spark.gluten.sql.columnar.backend.ch.enable_reorder_hash_join_tables",
true
defaultValue = true
)
}
// The threshold to reorder hash join tables, if The result of dividing two tables' size is
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig

import org.apache.spark.SparkConf

/**
 * Helpers for building ClickHouse-backend configuration keys and setting them on a
 * [[SparkConf]].
 *
 * All keys share the prefix `spark.gluten.sql.columnar.backend.ch.`; "settings" keys are
 * forwarded to ClickHouse as runtime settings (`runtime_settings.`) while "config" keys
 * are forwarded as runtime config (`runtime_config.`).
 */
object CHConf {

  // Common prefix for every ClickHouse backend key, e.g. "spark.gluten.sql.columnar.backend.ch."
  private val CH = GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + "."
  // Keys passed through to ClickHouse as per-session runtime settings.
  private val CH_SETTINGS = CH + "runtime_settings."
  // Keys passed through to ClickHouse as engine runtime config.
  private val CH_CONFIG = CH + "runtime_config."

  /**
   * Extension syntax for [[SparkConf]].
   *
   * Declared as a value class (`private val` + `extends AnyVal`) so the implicit wrapper
   * is allocation-free at call sites; behavior is unchanged.
   */
  implicit class GlutenCHConf(private val conf: SparkConf) extends AnyVal {

    /**
     * Sets each `(key, value)` pair under the `runtime_settings.` prefix.
     *
     * Note: for a call with exactly two `String` arguments, Scala overload resolution
     * prefers the non-varargs `setCHSettings[T](k, v)` overload below; both behave the
     * same for strings.
     */
    def setCHSettings(settings: (String, String)*): SparkConf = {
      settings.foreach { case (k, v) => conf.set(CH_SETTINGS + k, v) }
      conf
    }

    /** Sets a single `runtime_settings.` key; the value is rendered via `toString`. */
    def setCHSettings[T](k: String, v: T): SparkConf = {
      conf.set(CH_SETTINGS + k, v.toString)
      conf
    }

    /** Sets each `(key, value)` pair under the `runtime_config.` prefix. */
    def setCHConfig(config: (String, String)*): SparkConf = {
      config.foreach { case (k, v) => conf.set(CH_CONFIG + k, v) }
      conf
    }

    /** Sets a single `runtime_config.` key; the value is rendered via `toString`. */
    def setCHConfig[T](k: String, v: T): SparkConf = {
      conf.set(CH_CONFIG + k, v.toString)
      conf
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,10 @@ class CHListenerApi extends ListenerApi with Logging {
JniLibLoader.loadFromPath(executorLibPath, true)
}
// Add configs
conf.set(
s"${CHBackendSettings.getBackendConfigPrefix}.runtime_config.timezone",
conf.get("spark.sql.session.timeZone", TimeZone.getDefault.getID))
conf.set(
s"${CHBackendSettings.getBackendConfigPrefix}.runtime_config" +
s".local_engine.settings.log_processors_profiles",
"true")
import org.apache.gluten.backendsapi.clickhouse.CHConf._
conf.setCHConfig(
"timezone" -> conf.get("spark.sql.session.timeZone", TimeZone.getDefault.getID),
"local_engine.settings.log_processors_profiles" -> "true")

// add memory limit for external sort
val externalSortKey = s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings" +
Expand Down
Loading

0 comments on commit a1661ce

Please sign in to comment.