Split config to multiple modules
yikf committed Jan 21, 2025
1 parent c8284e5 commit 49d9e49
Showing 28 changed files with 632 additions and 689 deletions.
@@ -16,16 +16,14 @@
*/
package org.apache.spark.shuffle

import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
import org.apache.gluten.backendsapi.clickhouse.{CHBackendSettings, CHConf}
import org.apache.gluten.execution.ColumnarNativeIterator
import org.apache.gluten.memory.CHThreadGroup
import org.apache.gluten.vectorized._

import org.apache.spark._
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.protocol.ShuffleMode
@@ -80,10 +78,10 @@ class CHCelebornColumnarShuffleWriter[K, V](
nativeBufferSize,
capitalizedCompressionCodec,
compressionLevel,
GlutenConfig.get.chColumnarShuffleSpillThreshold,
CHConf.get.chColumnarShuffleSpillThreshold,
CHBackendSettings.shuffleHashAlgorithm,
celebornPartitionPusher,
GlutenConfig.get.chColumnarForceMemorySortShuffle
CHConf.get.chColumnarForceMemorySortShuffle
|| ShuffleMode.SORT.name.equalsIgnoreCase(shuffleWriterType)
)

@@ -19,6 +19,7 @@ package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.internal.SQLConf

object CHConf {
@@ -58,16 +59,66 @@ object CHConf {

def get: CHConf = new CHConf(SQLConf.get)

import SQLConf._
import GlutenConfig._

val ENABLE_ONEPIPELINE_MERGETREE_WRITE =
buildConf(prefixOf("mergetree.write.pipeline"))
.doc("Using one pipeline to write data to MergeTree table in Spark 3.5")
.booleanConf
.createWithDefault(false)

val COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD =
buildConf("spark.gluten.sql.columnar.backend.ch.spillThreshold")
.internal()
.doc("Shuffle spill threshold on ch backend")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("0MB")

val COLUMNAR_CH_MAX_SORT_BUFFER_SIZE =
buildConf("spark.gluten.sql.columnar.backend.ch.maxSortBufferSize")
.internal()
.doc("The maximum size of sort shuffle buffer in CH backend.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("0")

val COLUMNAR_CH_FORCE_MEMORY_SORT_SHUFFLE =
buildConf("spark.gluten.sql.columnar.backend.ch.forceMemorySortShuffle")
.internal()
.doc("Whether to force to use memory sort shuffle in CH backend. ")
.booleanConf
.createWithDefault(false)

val ENABLE_CH_REWRITE_DATE_CONVERSION =
buildConf("spark.gluten.sql.columnar.backend.ch.rewrite.dateConversion")
.internal()
.doc(
"Rewrite the conversion between date and string."
+ "For example `to_date(from_unixtime(unix_timestamp(stringType, 'yyyyMMdd')))`"
+ " will be rewritten to `to_date(stringType)`")
.booleanConf
.createWithDefault(true)
}

class CHConf(conf: SQLConf) extends GlutenConfig(conf) {
import CHConf._

def enableOnePipelineMergeTreeWrite: Boolean =
conf.getConf(CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE)
getConf(CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE)

def chColumnarShuffleSpillThreshold: Long = {
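// A configured value of 0 (the default) means "unset"; fall back to 90% of the task off-heap memory.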
val threshold = getConf(COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD)
if (threshold == 0) {
(taskOffHeapMemorySize * 0.9).toLong
} else {
threshold
}
}

def chColumnarMaxSortBufferSize: Long = getConf(COLUMNAR_CH_MAX_SORT_BUFFER_SIZE)

def chColumnarForceMemorySortShuffle: Boolean =
getConf(COLUMNAR_CH_FORCE_MEMORY_SORT_SHUFFLE)

def enableCHRewriteDateConversion: Boolean =
getConf(ENABLE_CH_REWRITE_DATE_CONVERSION)
}
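
A minimal usage sketch for the entries above (illustrative only; it assumes a SparkSession named `spark` with the Gluten ClickHouse backend on the classpath, and uses only the keys and accessors defined in this file):

import org.apache.gluten.backendsapi.clickhouse.CHConf

// Illustrative sketch, not part of this commit: set CH-specific options on the session conf...
spark.conf.set(CHConf.COLUMNAR_CH_FORCE_MEMORY_SORT_SHUFFLE.key, "true")
spark.conf.set(CHConf.COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD.key, "256MB")

// ...and read them back through CHConf.get rather than GlutenConfig.get.
val chConf = CHConf.get
val forceMemorySort = chConf.chColumnarForceMemorySortShuffle // true
val spillThreshold = chConf.chColumnarShuffleSpillThreshold // "256MB" parsed to bytes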
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.extension

import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.internal.Logging
@@ -43,7 +44,7 @@ class RewriteToDateExpresstionRule(spark: SparkSession) extends Rule[LogicalPlan
if (
plan.resolved &&
GlutenConfig.get.enableGluten &&
GlutenConfig.get.enableCHRewriteDateConversion
CHConf.get.enableCHRewriteDateConversion
) {
visitPlan(plan)
} else {
@@ -16,7 +16,7 @@
*/
package org.apache.spark.shuffle

import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
import org.apache.gluten.backendsapi.clickhouse.{CHBackendSettings, CHConf}
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.ColumnarNativeIterator
import org.apache.gluten.memory.CHThreadGroup
@@ -59,9 +59,9 @@ class CHColumnarShuffleWriter[K, V](
conf,
compressionCodec,
GlutenConfig.get.columnarShuffleCodecBackend.orNull)
private val maxSortBufferSize = GlutenConfig.get.chColumnarMaxSortBufferSize
private val forceMemorySortShuffle = GlutenConfig.get.chColumnarForceMemorySortShuffle
private val spillThreshold = GlutenConfig.get.chColumnarShuffleSpillThreshold
private val maxSortBufferSize = CHConf.get.chColumnarMaxSortBufferSize
private val forceMemorySortShuffle = CHConf.get.chColumnarForceMemorySortShuffle
private val spillThreshold = CHConf.get.chColumnarShuffleSpillThreshold
private val jniWrapper = new CHShuffleSplitterJniWrapper
// Are we in the process of stopping? Because map tasks can call stop() with success = true
// and then call stop() with success = false if they get an exception, we want to make sure
@@ -70,8 +70,6 @@ class GlutenClickHouseSyntheticDataSuite
override protected def afterAll(): Unit = {
DeltaLog.clearCache()
super.afterAll()
// init GlutenConfig in the next beforeAll
GlutenConfig.ins = null
}

test("test all data types all agg") {
@@ -165,8 +165,6 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
}

FileUtils.forceDelete(new File(basePath))
// init GlutenConfig in the next beforeAll
GlutenConfig.ins = null
}

protected def runTPCDSQuery(
@@ -591,8 +591,6 @@ abstract class GlutenClickHouseTPCHAbstractSuite
ClickhouseSnapshot.clearAllFileStatusCache()
DeltaLog.clearCache()
super.afterAll()
// init GlutenConfig in the next beforeAll
GlutenConfig.ins = null
}

override protected def runTPCHQuery(
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.SparkConf
@@ -233,7 +234,7 @@ class GlutenClickHouseTPCHNullableSuite extends GlutenClickHouseTPCHAbstractSuit

Seq(("true", false), ("false", true)).foreach(
conf => {
withSQLConf((GlutenConfig.ENABLE_CH_REWRITE_DATE_CONVERSION.key, conf._1)) {
withSQLConf((CHConf.ENABLE_CH_REWRITE_DATE_CONVERSION.key, conf._1)) {
runSql(sqlStr)(
df => {
val project = df.queryExecution.executedPlan.collect {
@@ -51,7 +51,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
.setCHConfig("enable_streaming_aggregating", true)
.set(GlutenConfig.COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD.key, (1024 * 1024).toString)
.set(CHConf.COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD.key, (1024 * 1024).toString)
}

override protected def createTPCHNotNullTables(): Unit = {
@@ -26,7 +26,5 @@ trait GlutenSQLTestUtils extends SparkFunSuite with SharedSparkSession {
override protected def afterAll(): Unit = {
DeltaLog.clearCache()
super.afterAll()
// init GlutenConfig in the next beforeAll
GlutenConfig.ins = null
}
}
@@ -20,7 +20,7 @@ import org.apache.gluten.GlutenBuildInfo._
import org.apache.gluten.backendsapi._
import org.apache.gluten.columnarbatch.VeloxBatch
import org.apache.gluten.component.Component.BuildInfo
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.WindowFunctionsBuilder
@@ -166,8 +166,8 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
case DwrfReadFormat => None
case OrcReadFormat =>
if (!GlutenConfig.get.veloxOrcScanEnabled) {
Some(s"Velox ORC scan is turned off, ${GlutenConfig.VELOX_ORC_SCAN_ENABLED.key}")
if (!VeloxConfig.get.veloxOrcScanEnabled) {
Some(s"Velox ORC scan is turned off, ${VeloxConfig.VELOX_ORC_SCAN_ENABLED.key}")
} else {
val typeValidator: PartialFunction[StructField, String] = {
case StructField(_, arrayType: ArrayType, _, _)
@@ -542,7 +542,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def alwaysFailOnMapExpression(): Boolean = true

override def requiredChildOrderingForWindow(): Boolean = {
GlutenConfig.get.veloxColumnarWindowType.equals("streaming")
VeloxConfig.get.veloxColumnarWindowType.equals("streaming")
}

override def requiredChildOrderingForWindowGroupLimit(): Boolean = false
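
The Velox side follows the same pattern: backend-specific options move from GlutenConfig to VeloxConfig and are read through VeloxConfig.get. A minimal sketch, illustrative only and limited to the accessors referenced in this hunk:

import org.apache.gluten.config.VeloxConfig

// Illustrative sketch, not part of this commit.
val orcScanEnabled = VeloxConfig.get.veloxOrcScanEnabled
// requiredChildOrderingForWindow() above is true only when this equals "streaming".
val windowType = VeloxConfig.get.veloxColumnarWindowType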
@@ -20,6 +20,7 @@ import org.apache.gluten.backendsapi.ListenerApi
import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch, ArrowNativeBatch}
import org.apache.gluten.columnarbatch.VeloxBatch
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.config.VeloxConfig._
import org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.expression.UDFMappings
import org.apache.gluten.extension.columnar.transition.Convention
@@ -51,6 +52,27 @@ class VeloxListenerApi extends ListenerApi with Logging {
override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
val conf = pc.conf()

// When the Velox cache is enabled, the Velox file handle cache should also be enabled.
// Otherwise, a 'reference id not found' error may occur.
if (
conf.getBoolean(COLUMNAR_VELOX_CACHE_ENABLED.key, false) &&
!conf.getBoolean(COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key, false)
) {
throw new IllegalArgumentException(
s"${COLUMNAR_VELOX_CACHE_ENABLED.key} and " +
s"${COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key} should be enabled together.")
}

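// The Velox SSD cache currently supports a load quantum of at most 8MB; reject larger configured values early.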
if (
conf.getBoolean(COLUMNAR_VELOX_CACHE_ENABLED.key, false) &&
conf.getSizeAsBytes(LOAD_QUANTUM.key, LOAD_QUANTUM.defaultValueString) > 8 * 1024 * 1024
) {
throw new IllegalArgumentException(
s"Velox currently only support up to 8MB load quantum size " +
s"on SSD cache enabled by ${COLUMNAR_VELOX_CACHE_ENABLED.key}, " +
s"User can set ${LOAD_QUANTUM.key} <= 8MB skip this error.")
}

// Generate HDFS client configurations.
HdfsConfGenerator.addHdfsClientToSparkWorkDirectory(sc)

@@ -180,7 +202,7 @@

// Workaround for https://github.com/apache/incubator-gluten/issues/7837
if (isDriver && !inLocalMode(conf)) {
parsed += (GlutenConfig.COLUMNAR_VELOX_CACHE_ENABLED.key -> "false")
parsed += (COLUMNAR_VELOX_CACHE_ENABLED.key -> "false")
}
NativeBackendInitializer.forBackend(VeloxBackend.BACKEND_NAME).initialize(parsed)

@@ -17,8 +17,7 @@
package org.apache.gluten.backendsapi.velox

import org.apache.gluten.backendsapi.SparkPlanExecApi
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.config.ReservedKeys
import org.apache.gluten.config.{GlutenConfig, ReservedKeys, VeloxConfig}
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.expression._
@@ -348,8 +347,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
plan match {
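// When sort-based shuffle is not in use and veloxResizeBatchesShuffleInput is enabled, wrap the shuffle input in VeloxResizeBatchesExec so batches are resized into the configured size range before partitioning.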
case shuffle: ColumnarShuffleExchangeExec
if !shuffle.useSortBasedShuffle &&
GlutenConfig.get.veloxResizeBatchesShuffleInput =>
val range = GlutenConfig.get.veloxResizeBatchesShuffleInputRange
VeloxConfig.get.veloxResizeBatchesShuffleInput =>
val range = VeloxConfig.get.veloxResizeBatchesShuffleInputRange
val appendBatches =
VeloxResizeBatchesExec(shuffle.child, range.min, range.max)
shuffle.withNewChildren(Seq(appendBatches))
@@ -625,7 +624,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
numOutputRows: SQLMetric,
dataSize: SQLMetric): BuildSideRelation = {
val useOffheapBroadcastBuildRelation =
GlutenConfig.get.enableBroadcastBuildRelationInOffheap
VeloxConfig.get.enableBroadcastBuildRelationInOffheap
val serialized: Array[ColumnarBatchSerializeResult] = child
.executeColumnar()
.mapPartitions(itr => Iterator(BroadcastUtils.serializeStream(itr)))
@@ -739,7 +738,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
// ISOControl characters, refer java.lang.Character.isISOControl(int)
val isoControlStr = (('\u0000' to '\u001F') ++ ('\u007F' to '\u009F')).toList.mkString
// scalastyle:on nonascii
if (GlutenConfig.get.castFromVarcharAddTrimNode && c.child.dataType == StringType) {
if (VeloxConfig.get.castFromVarcharAddTrimNode && c.child.dataType == StringType) {
val trimStr = c.dataType match {
case BinaryType | _: ArrayType | _: MapType | _: StructType | _: UserDefinedType[_] =>
None
@@ -781,7 +780,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
}

override def rewriteSpillPath(path: String): String = {
val fs = GlutenConfig.get.veloxSpillFileSystem
val fs = VeloxConfig.get.veloxSpillFileSystem
fs match {
case "local" =>
path