Commit

Add CHConf
baibaichen committed Sep 10, 2024
1 parent 4ce5162 commit 614e4e4
Showing 8 changed files with 205 additions and 209 deletions.
@@ -91,7 +91,7 @@ object ClickhouseTable {
     val badOptions = hadoopConf.filterKeys {
       k => !DeltaTableUtils.validDeltaTableHadoopPrefixes.exists(k.startsWith)
     }.toMap
-    if (!badOptions.isEmpty) {
+    if (badOptions.nonEmpty) {
       throw DeltaErrors.unsupportedDeltaTableForPathHadoopConf(badOptions)
     }
     val fileSystemOptions: Map[String, String] = hadoopConf.toMap
@@ -117,21 +117,21 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
       }
     }

-  val GLUTEN_CLICKHOUSE_AFFINITY_MODE: String =
+  private val GLUTEN_CLICKHOUSE_AFFINITY_MODE: String =
     GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + ".affinity.mode"
   val SOFT: String = "soft"
   val FORCE: String = "force"
   private val GLUTEN_CLICKHOUSE_AFFINITY_MODE_DEFAULT = SOFT

-  val GLUTEN_MAX_BLOCK_SIZE: String =
+  private val GLUTEN_MAX_BLOCK_SIZE: String =
     GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
       ".runtime_settings.max_block_size"
   // Same as default value in clickhouse
-  val GLUTEN_MAX_BLOCK_SIZE_DEFAULT = 65409
-  val GLUTEN_MAX_SHUFFLE_READ_BYTES: String =
+  private val GLUTEN_MAX_BLOCK_SIZE_DEFAULT = 65409
+  private val GLUTEN_MAX_SHUFFLE_READ_BYTES: String =
     GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
       ".runtime_config.max_source_concatenate_bytes"
-  val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = GLUTEN_MAX_BLOCK_SIZE_DEFAULT * 256
+  private val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = GLUTEN_MAX_BLOCK_SIZE_DEFAULT * 256

   def affinityMode: String = {
     SparkEnv.get.conf
@@ -364,7 +364,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
   def enableReorderHashJoinTables(): Boolean = {
     SparkEnv.get.conf.getBoolean(
       "spark.gluten.sql.columnar.backend.ch.enable_reorder_hash_join_tables",
-      true
+      defaultValue = true
     )
   }
   // The threshold to reorder hash join tables, if The result of dividing two tables' size is
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig

import org.apache.spark.SparkConf

object CHConf {

  private val CH = GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + "."
  private val CH_SETTINGS = CH + "runtime_settings."
  private val CH_CONFIG = CH + "runtime_config."
  implicit class GlutenCHConf(conf: SparkConf) {
    def setCHSettings(settings: (String, String)*): SparkConf = {
      settings.foreach { case (k, v) => conf.set(CH_SETTINGS + k, v) }
      conf
    }

    def setCHSettings[T](k: String, v: T): SparkConf = {
      conf.set(CH_SETTINGS + k, v.toString)
      conf
    }

    def setCHConfig(config: (String, String)*): SparkConf = {
      config.foreach { case (k, v) => conf.set(CH_CONFIG + k, v) }
      conf
    }

    def setCHConfig[T](k: String, v: T): SparkConf = {
      conf.set(CH_CONFIG + k, v.toString)
      conf
    }
  }
}
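
A minimal usage sketch of the new implicit class may help (not part of this commit): assuming GlutenConfig.GLUTEN_CONFIG_PREFIX resolves to "spark.gluten.sql.columnar.backend." and CHBackend.BACKEND_NAME to "ch", as the fully spelled-out key in the CHBackendSettings hunk above suggests, the helpers expand short keys into the runtime_settings / runtime_config namespaces:

// Hypothetical usage sketch; key prefix and values are assumptions, not from the commit.
import org.apache.spark.SparkConf
import org.apache.gluten.backendsapi.clickhouse.CHConf._

val conf = new SparkConf()
  // sets spark.gluten.sql.columnar.backend.ch.runtime_settings.max_block_size = 65409
  .setCHSettings("max_block_size", 65409)
  // sets spark.gluten.sql.columnar.backend.ch.runtime_config.timezone = UTC
  .setCHConfig("timezone" -> "UTC")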
@@ -81,13 +81,10 @@ class CHListenerApi extends ListenerApi with Logging {
       JniLibLoader.loadFromPath(executorLibPath, true)
     }
     // Add configs
-    conf.set(
-      s"${CHBackendSettings.getBackendConfigPrefix}.runtime_config.timezone",
-      conf.get("spark.sql.session.timeZone", TimeZone.getDefault.getID))
-    conf.set(
-      s"${CHBackendSettings.getBackendConfigPrefix}.runtime_config" +
-        s".local_engine.settings.log_processors_profiles",
-      "true")
+    import org.apache.gluten.backendsapi.clickhouse.CHConf._
+    conf.setCHConfig(
+      "timezone" -> conf.get("spark.sql.session.timeZone", TimeZone.getDefault.getID),
+      "local_engine.settings.log_processors_profiles" -> "true")

     // add memory limit for external sort
     val externalSortKey = s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings" +
@@ -28,9 +28,9 @@ object ClickHouseConfig {
   val NAME = "clickhouse"
   val ALT_NAME = "clickhouse"
   val METADATA_DIR = "_delta_log"
-  val FORMAT_ENGINE = "engine"
-  val DEFAULT_ENGINE = "mergetree"
-  val OPT_NAME_PREFIX = "clickhouse."
+  private val FORMAT_ENGINE = "engine"
+  private val DEFAULT_ENGINE = "mergetree"
+  private val OPT_NAME_PREFIX = "clickhouse."

   @deprecated
   // Whether to use MergeTree DataSource V2 API, default is false, fall back to V1.
@@ -53,8 +53,11 @@
     if (!configurations.contains(FORMAT_ENGINE)) {
       configurations += (FORMAT_ENGINE -> DEFAULT_ENGINE)
     } else {
-      val engineValue = configurations.get(FORMAT_ENGINE)
-      if (!engineValue.equals(DEFAULT_ENGINE) && !engineValue.equals("parquet")) {
+      if (
+        !configurations
+          .get(FORMAT_ENGINE)
+          .exists(s => s.equals(DEFAULT_ENGINE) || s.equals("parquet"))
+      ) {
         configurations += (FORMAT_ENGINE -> DEFAULT_ENGINE)
       }
     }
@@ -80,8 +83,7 @@
   }

   def isMergeTreeFormatEngine(configuration: Map[String, String]): Boolean = {
-    configuration.contains(FORMAT_ENGINE) &&
-      configuration.get(FORMAT_ENGINE).get.equals(DEFAULT_ENGINE)
+    configuration.get(FORMAT_ENGINE).exists(_.equals(DEFAULT_ENGINE))
   }

   /** Get the related clickhouse option when using DataFrameWriter / DataFrameReader */
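
As a side note, both rewrites above lean on the standard Scala Option idiom: Map.get returns an Option[String], so comparing it directly against a String (or unwrapping it with .get on a possibly absent key) is fragile, while .exists tests the wrapped value safely. A tiny standalone sketch with hypothetical values:

// Illustration only; the map contents are hypothetical.
val configuration = Map("engine" -> "mergetree")
configuration.get("engine").equals("mergetree")             // false: an Option[String] never equals a String
configuration.get("engine").exists(_.equals("mergetree"))   // true: compares the wrapped value
configuration.get("missing").exists(_.equals("mergetree"))  // false: absent keys are handled without .get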