diff --git a/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala b/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala index 72145f1b5f5c..23ee2543eceb 100644 --- a/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala +++ b/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala @@ -59,14 +59,14 @@ private class CHCelebornColumnarBatchSerializerInstance( with Logging { private lazy val conf = SparkEnv.get.conf - private lazy val gluten_conf = GlutenConfig.getConf + private lazy val gluten_conf = GlutenConfig.get private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf) private lazy val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT) private lazy val compressionLevel = GlutenShuffleUtils.getCompressionLevel( conf, compressionCodec, - GlutenConfig.getConf.columnarShuffleCodecBackend.orNull) + GlutenConfig.get.columnarShuffleCodecBackend.orNull) override def deserializeStream(in: InputStream): DeserializationStream = { new DeserializationStream { diff --git a/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala b/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala index 2263cda7eafb..c7d7286c0cc1 100644 --- a/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala +++ b/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala @@ -80,10 +80,10 @@ class CHCelebornColumnarShuffleWriter[K, V]( nativeBufferSize, capitalizedCompressionCodec, compressionLevel, - GlutenConfig.getConf.chColumnarShuffleSpillThreshold, + GlutenConfig.get.chColumnarShuffleSpillThreshold, CHBackendSettings.shuffleHashAlgorithm, celebornPartitionPusher, - GlutenConfig.getConf.chColumnarForceMemorySortShuffle + GlutenConfig.get.chColumnarForceMemorySortShuffle || ShuffleMode.SORT.name.equalsIgnoreCase(shuffleWriterType) ) diff --git a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala index a4411e83f220..f35fc021c7d4 100644 --- a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala +++ b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala @@ -68,7 +68,7 @@ class ClickhouseOptimisticTransaction( writeOptions: Option[DeltaOptions], isOptimize: Boolean, additionalConstraints: Seq[Constraint]): Seq[FileAction] = { - val nativeWrite = GlutenConfig.getConf.enableNativeWriter.getOrElse(false) + val nativeWrite = GlutenConfig.get.enableNativeWriter.getOrElse(false) if (writingMergeTree) { // TODO: update FallbackByBackendSettings for mergetree always return true val onePipeline = nativeWrite && CHConf.get.enableOnePipelineMergeTreeWrite diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 97d862e13c06..bc710f15f212 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -282,11 +282,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging { } override def supportSortExec(): Boolean = { - GlutenConfig.getConf.enableColumnarSort + GlutenConfig.get.enableColumnarSort } override def supportSortMergeJoinExec(): Boolean = { - GlutenConfig.getConf.enableColumnarSortMergeJoin + GlutenConfig.get.enableColumnarSortMergeJoin } override def supportWindowExec(windowFunctions: Seq[NamedExpression]): Boolean = { @@ -391,7 +391,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { } override def enableNativeWriteFiles(): Boolean = { - GlutenConfig.getConf.enableNativeWriter.getOrElse(false) + GlutenConfig.get.enableNativeWriter.getOrElse(false) } override def supportCartesianProductExec(): Boolean = true diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 635228881566..fc04040f144f 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -452,12 +452,12 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { val readBatchNumRows = metrics("avgReadBatchNumRows") val numOutputRows = metrics("numOutputRows") val dataSize = metrics("dataSize") - if (GlutenConfig.getConf.isUseCelebornShuffleManager) { + if (GlutenConfig.get.isUseCelebornShuffleManager) { val clazz = ClassUtils.getClass("org.apache.spark.shuffle.CHCelebornColumnarBatchSerializer") val constructor = clazz.getConstructor(classOf[SQLMetric], classOf[SQLMetric], classOf[SQLMetric]) constructor.newInstance(readBatchNumRows, numOutputRows, dataSize).asInstanceOf[Serializer] - } else if (GlutenConfig.getConf.isUseUniffleShuffleManager) { + } else if (GlutenConfig.get.isUseUniffleShuffleManager) { throw new UnsupportedOperationException("temporarily uniffle not support ch ") } else { new CHColumnarBatchSerializer(readBatchNumRows, numOutputRows, dataSize) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala index 29d26410b9b0..0d31859c0796 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala @@ -95,7 +95,7 @@ class CHValidatorApi extends ValidatorApi with AdaptiveSparkPlanHelper with Logg } case rangePartitoning: RangePartitioning => if ( - GlutenConfig.getConf.enableColumnarSort && + GlutenConfig.get.enableColumnarSort && RangePartitionerBoundsGenerator.supportedOrderings(rangePartitoning, child) ) { None diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala index 28d19fa29ac9..25042e81e15e 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala @@ -39,8 +39,8 @@ class CommonSubexpressionEliminateRule(spark: 
SparkSession) extends Rule[Logical override def apply(plan: LogicalPlan): LogicalPlan = { val newPlan = if ( - plan.resolved && GlutenConfig.getConf.enableGluten - && GlutenConfig.getConf.enableCommonSubexpressionEliminate && !plan.fastEquals(lastPlan) + plan.resolved && GlutenConfig.get.enableGluten + && GlutenConfig.get.enableCommonSubexpressionEliminate && !plan.fastEquals(lastPlan) ) { lastPlan = plan visitPlan(plan) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala index 82051baeebc7..a79f474b2921 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala @@ -32,9 +32,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.AGGREGATE_EXPRESSION */ object CountDistinctWithoutExpand extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = { - if ( - GlutenConfig.getConf.enableGluten && GlutenConfig.getConf.enableCountDistinctWithoutExpand - ) { + if (GlutenConfig.get.enableGluten && GlutenConfig.get.enableCountDistinctWithoutExpand) { plan.transformAllExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) { case ae: AggregateExpression if ae.isDistinct && ae.aggregateFunction.isInstanceOf[Count] && diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala index f2a0a549bc7e..821c1b8b2638 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala @@ -36,7 +36,7 @@ object ExtendedGeneratorNestedColumnAliasing { def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match { case pj @ Project(projectList, f @ Filter(condition, g: Generate)) if canPruneGenerator(g.generator) && - GlutenConfig.getConf.enableExtendedColumnPruning && + GlutenConfig.get.enableExtendedColumnPruning && (SQLConf.get.nestedPruningOnExpressions || SQLConf.get.nestedSchemaPruningEnabled) => val attrToExtractValues = getAttributeToExtractValues(projectList ++ g.generator.children :+ condition, Seq.empty) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala index 6a788617a6c4..25346c0a7ffa 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala @@ -37,7 +37,7 @@ import scala.util.control.Breaks.{break, breakable} // queryStagePrepRules. 
case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = { - val glutenConf: GlutenConfig = GlutenConfig.getConf + val glutenConf: GlutenConfig = GlutenConfig.get plan.foreach { case bhj: BroadcastHashJoinExec => val buildSidePlan = bhj.buildSide match { @@ -106,8 +106,8 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend case Some(exchange @ BroadcastExchangeExec(mode, child)) => val isTransformable = if ( - !GlutenConfig.getConf.enableColumnarBroadcastExchange || - !GlutenConfig.getConf.enableColumnarBroadcastJoin + !GlutenConfig.get.enableColumnarBroadcastExchange || + !GlutenConfig.get.enableColumnarBroadcastJoin ) { ValidationResult.failed( "columnar broadcast exchange is disabled or " + @@ -146,12 +146,12 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPlan] { private val enableColumnarBroadcastJoin: Boolean = - GlutenConfig.getConf.enableColumnarBroadcastJoin && - GlutenConfig.getConf.enableColumnarBroadcastExchange + GlutenConfig.get.enableColumnarBroadcastJoin && + GlutenConfig.get.enableColumnarBroadcastExchange private val enableColumnarBroadcastNestedLoopJoin: Boolean = - GlutenConfig.getConf.broadcastNestedLoopJoinTransformerTransformerEnabled && - GlutenConfig.getConf.enableColumnarBroadcastExchange + GlutenConfig.get.broadcastNestedLoopJoinTransformerTransformerEnabled && + GlutenConfig.get.enableColumnarBroadcastExchange override def apply(plan: SparkPlan): SparkPlan = { plan.foreachUp { diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala index fa8a37ffa2be..d495919dded1 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala @@ -56,8 +56,8 @@ class RewriteDateTimestampComparisonRule(spark: SparkSession) override def apply(plan: LogicalPlan): LogicalPlan = { if ( plan.resolved && - GlutenConfig.getConf.enableGluten && - GlutenConfig.getConf.enableRewriteDateTimestampComparison + GlutenConfig.get.enableGluten && + GlutenConfig.get.enableRewriteDateTimestampComparison ) { visitPlan(plan) } else { diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala index 6e8486330465..43c0206b58b9 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala @@ -42,8 +42,8 @@ class RewriteToDateExpresstionRule(spark: SparkSession) extends Rule[LogicalPlan override def apply(plan: LogicalPlan): LogicalPlan = { if ( plan.resolved && - GlutenConfig.getConf.enableGluten && - GlutenConfig.getConf.enableCHRewriteDateConversion + GlutenConfig.get.enableGluten && + GlutenConfig.get.enableCHRewriteDateConversion ) { visitPlan(plan) } else { diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/WriteFilesWithBucketValue.scala 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/WriteFilesWithBucketValue.scala index 8ab78dcff9bc..bc5e77f51d2c 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/WriteFilesWithBucketValue.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/WriteFilesWithBucketValue.scala @@ -34,8 +34,8 @@ object WriteFilesWithBucketValue extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = { if ( - GlutenConfig.getConf.enableGluten - && GlutenConfig.getConf.enableNativeWriter.getOrElse(false) + GlutenConfig.get.enableGluten + && GlutenConfig.get.enableNativeWriter.getOrElse(false) ) { plan.transformDown { case writeFiles: WriteFilesExec if writeFiles.bucketSpec.isDefined => diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala index 370d93d7e7fb..6795818f6c67 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala @@ -61,9 +61,9 @@ private class CHColumnarBatchSerializerInstance( GlutenShuffleUtils.getCompressionLevel( conf, compressionCodec, - GlutenConfig.getConf.columnarShuffleCodecBackend.orNull) + GlutenConfig.get.columnarShuffleCodecBackend.orNull) - private val useColumnarShuffle: Boolean = GlutenConfig.getConf.isUseColumnarShuffleManager + private val useColumnarShuffle: Boolean = GlutenConfig.get.isUseColumnarShuffleManager override def deserializeStream(in: InputStream): DeserializationStream = { // Don't use GlutenConfig in this method. It will execute in non task Thread. diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala index b0595fd05dd7..acbc41ceed71 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala @@ -51,17 +51,17 @@ class CHColumnarShuffleWriter[K, V]( .map(_.getAbsolutePath) .mkString(",") private val subDirsPerLocalDir = blockManager.diskBlockManager.subDirsPerLocalDir - private val splitSize = GlutenConfig.getConf.maxBatchSize + private val splitSize = GlutenConfig.get.maxBatchSize private val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf) private val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT) private val compressionLevel = GlutenShuffleUtils.getCompressionLevel( conf, compressionCodec, - GlutenConfig.getConf.columnarShuffleCodecBackend.orNull) - private val maxSortBufferSize = GlutenConfig.getConf.chColumnarMaxSortBufferSize - private val forceMemorySortShuffle = GlutenConfig.getConf.chColumnarForceMemorySortShuffle - private val spillThreshold = GlutenConfig.getConf.chColumnarShuffleSpillThreshold + GlutenConfig.get.columnarShuffleCodecBackend.orNull) + private val maxSortBufferSize = GlutenConfig.get.chColumnarMaxSortBufferSize + private val forceMemorySortShuffle = GlutenConfig.get.chColumnarForceMemorySortShuffle + private val spillThreshold = GlutenConfig.get.chColumnarShuffleSpillThreshold private val jniWrapper = new CHShuffleSplitterJniWrapper // Are we in the process of stopping? 
Because map tasks can call stop() with success = true // and then call stop() with success = false if they get an exception, we want to make sure diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/catalyst/CHAggregateFunctionRewriteRule.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/catalyst/CHAggregateFunctionRewriteRule.scala index b11e1e2bb306..a18aad0393bf 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/catalyst/CHAggregateFunctionRewriteRule.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/catalyst/CHAggregateFunctionRewriteRule.scala @@ -35,8 +35,8 @@ case class CHAggregateFunctionRewriteRule(spark: SparkSession) extends Rule[Logi case a: Aggregate => a.transformExpressions { case avgExpr @ AggregateExpression(avg: Average, _, _, _, _) - if GlutenConfig.getConf.enableCastAvgAggregateFunction && - GlutenConfig.getConf.enableColumnarHashAgg && + if GlutenConfig.get.enableCastAvgAggregateFunction && + GlutenConfig.get.enableColumnarHashAgg && !avgExpr.isDistinct && isDataTypeNeedConvert(avg.child.dataType) => AggregateExpression( avg.copy(child = Cast(avg.child, DoubleType)), diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala index 1e4d0b8841c9..164eaaf8ca8b 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala @@ -34,11 +34,11 @@ class CHParquetWriterInjects extends CHFormatWriterInjects { sparkOptions.put(SQLConf.PARQUET_COMPRESSION.key, compressionCodec) val blockSize = options.getOrElse( GlutenConfig.PARQUET_BLOCK_SIZE, - GlutenConfig.getConf.columnarParquetWriteBlockSize.toString) + GlutenConfig.get.columnarParquetWriteBlockSize.toString) sparkOptions.put(GlutenConfig.PARQUET_BLOCK_SIZE, blockSize) val blockRows = options.getOrElse( GlutenConfig.PARQUET_BLOCK_ROWS, - GlutenConfig.getConf.columnarParquetWriteBlockRows.toString) + GlutenConfig.get.columnarParquetWriteBlockRows.toString) sparkOptions.put(GlutenConfig.PARQUET_BLOCK_ROWS, blockRows) sparkOptions } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala index 21d113248821..6a0e5d0c6d25 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala @@ -239,7 +239,7 @@ object CHExecUtil extends Logging { private def buildPartitioningOptions(nativePartitioning: NativePartitioning): IteratorOptions = { val options = new IteratorOptions - options.setBufferSize(GlutenConfig.getConf.maxBatchSize) + options.setBufferSize(GlutenConfig.get.maxBatchSize) options.setName(nativePartitioning.getShortName) options.setPartitionNum(nativePartitioning.getNumPartitions) options.setExpr(new String(nativePartitioning.getExprList)) @@ -345,8 +345,8 @@ object CHExecUtil extends Logging { val rddWithPartitionKey: RDD[Product2[Int, ColumnarBatch]] = if ( - GlutenConfig.getConf.isUseColumnarShuffleManager - || GlutenConfig.getConf.isUseCelebornShuffleManager + GlutenConfig.get.isUseColumnarShuffleManager + || 
GlutenConfig.get.isUseCelebornShuffleManager ) { newPartitioning match { case _ => diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala index a4a97d43de08..a519fdab5f36 100644 --- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala +++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala @@ -85,12 +85,12 @@ private class CelebornColumnarBatchSerializerInstance( null // uncompressed } val compressionCodecBackend = - GlutenConfig.getConf.columnarShuffleCodecBackend.orNull - val shuffleWriterType = GlutenConfig.getConf.celebornShuffleWriterType + GlutenConfig.get.columnarShuffleCodecBackend.orNull + val shuffleWriterType = GlutenConfig.get.celebornShuffleWriterType .replace(GLUTEN_SORT_SHUFFLE_WRITER, GLUTEN_RSS_SORT_SHUFFLE_WRITER) val jniWrapper = ShuffleReaderJniWrapper.create(runtime) - val batchSize = GlutenConfig.getConf.maxBatchSize - val bufferSize = GlutenConfig.getConf.columnarShuffleReaderBufferSize + val batchSize = GlutenConfig.get.maxBatchSize + val bufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize val handle = jniWrapper .make( cSchema.memoryAddress(), diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala index 165d68785d4b..389d32573025 100644 --- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala +++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala @@ -127,7 +127,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( compressionLevel, compressionBufferSize, bufferCompressThreshold, - GlutenConfig.getConf.columnarShuffleCompressionMode, + GlutenConfig.get.columnarShuffleCompressionMode, conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt, conf.get(SHUFFLE_SORT_USE_RADIXSORT), clientPushBufferMaxSize, @@ -138,7 +138,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, context.partitionId), "celeborn", shuffleWriterType, - GlutenConfig.getConf.columnarShuffleReallocThreshold + GlutenConfig.get.columnarShuffleReallocThreshold ) runtime .memoryManager() diff --git a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index 7cbf452f76f6..663fd3792e27 100644 --- a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -64,8 +64,8 @@ public class VeloxUniffleColumnarShuffleWriter extends RssShuffleWriter extends RssShuffleWriter null)); + GlutenConfig.get().columnarShuffleCodecBackend().getOrElse(() -> null)); compressionBufferSize = GlutenShuffleUtils.getSortEvictBufferSize(sparkConf, compressionCodec); } @@ -158,7 +158,7 @@ protected void writeImpl(Iterator> records) { compressionLevel, compressionBufferSize, compressThreshold, - GlutenConfig.getConf().columnarShuffleCompressionMode(), + 
GlutenConfig.get().columnarShuffleCompressionMode(), (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()), (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT()), bufferSize, diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index dba41b1f8d0c..a752e5a0f527 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -109,7 +109,7 @@ object VeloxBackendSettings extends BackendSettingsApi { val filteredRootPaths = distinctRootPaths(rootPaths) if (filteredRootPaths.nonEmpty) { val resolvedPaths = - if (GlutenConfig.getConf.enableHdfsViewfs) { + if (GlutenConfig.get.enableHdfsViewfs) { ViewFileSystemUtils.convertViewfsToHdfs( filteredRootPaths, mutable.Map.empty[String, String], @@ -145,7 +145,7 @@ object VeloxBackendSettings extends BackendSettingsApi { def isCharType(stringType: StringType, metadata: Metadata): Boolean = { val charTypePattern = "char\\((\\d+)\\)".r - GlutenConfig.getConf.forceOrcCharTypeScanFallbackEnabled && charTypePattern + GlutenConfig.get.forceOrcCharTypeScanFallbackEnabled && charTypePattern .findFirstIn( CharVarcharUtils .getRawTypeString(metadata) @@ -158,7 +158,7 @@ object VeloxBackendSettings extends BackendSettingsApi { val typeValidator: PartialFunction[StructField, String] = { // Parquet timestamp is not fully supported yet case StructField(_, TimestampType, _, _) - if GlutenConfig.getConf.forceParquetTimestampTypeScanFallbackEnabled => + if GlutenConfig.get.forceParquetTimestampTypeScanFallbackEnabled => "TimestampType(force fallback)" } val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get) @@ -170,7 +170,7 @@ object VeloxBackendSettings extends BackendSettingsApi { } case DwrfReadFormat => None case OrcReadFormat => - if (!GlutenConfig.getConf.veloxOrcScanEnabled) { + if (!GlutenConfig.get.veloxOrcScanEnabled) { Some(s"Velox ORC scan is turned off, ${GlutenConfig.VELOX_ORC_SCAN_ENABLED.key}") } else { val typeValidator: PartialFunction[StructField, String] = { @@ -315,7 +315,7 @@ object VeloxBackendSettings extends BackendSettingsApi { def validateFileFormat(): Option[String] = { format match { case _: ParquetFileFormat => None // Parquet is directly supported - case h: HiveFileFormat if GlutenConfig.getConf.enableHiveFileFormatWriter => + case h: HiveFileFormat if GlutenConfig.get.enableHiveFileFormatWriter => validateHiveFileFormat(h) // Parquet via Hive SerDe case _ => Some( @@ -375,7 +375,7 @@ object VeloxBackendSettings extends BackendSettingsApi { override def supportSortExec(): Boolean = true override def supportSortMergeJoinExec(): Boolean = { - GlutenConfig.getConf.enableColumnarSortMergeJoin + GlutenConfig.get.enableColumnarSortMergeJoin } override def supportWindowGroupLimitExec(rankLikeFunction: Expression): Boolean = { @@ -456,7 +456,7 @@ object VeloxBackendSettings extends BackendSettingsApi { } override def supportColumnarShuffleExec(): Boolean = { - val conf = GlutenConfig.getConf + val conf = GlutenConfig.get conf.enableColumnarShuffle && (conf.isUseGlutenShuffleManager || conf.isUseColumnarShuffleManager || conf.isUseCelebornShuffleManager @@ -513,7 +513,7 @@ object VeloxBackendSettings extends BackendSettingsApi { override def alwaysFailOnMapExpression(): Boolean = true override def 
requiredChildOrderingForWindow(): Boolean = { - GlutenConfig.getConf.veloxColumnarWindowType.equals("streaming") + GlutenConfig.get.veloxColumnarWindowType.equals("streaming") } override def requiredChildOrderingForWindowGroupLimit(): Boolean = false @@ -523,13 +523,13 @@ object VeloxBackendSettings extends BackendSettingsApi { override def allowDecimalArithmetic: Boolean = true override def enableNativeWriteFiles(): Boolean = { - GlutenConfig.getConf.enableNativeWriter.getOrElse( + GlutenConfig.get.enableNativeWriter.getOrElse( SparkShimLoader.getSparkShims.enableNativeWriteFilesByDefault() ) } override def enableNativeArrowReadFiles(): Boolean = { - GlutenConfig.getConf.enableNativeArrowReader + GlutenConfig.get.enableNativeArrowReader } override def shouldRewriteCount(): Boolean = { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 5975a20a267d..f8a141d00005 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -345,8 +345,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { plan match { case shuffle: ColumnarShuffleExchangeExec if !shuffle.useSortBasedShuffle && - GlutenConfig.getConf.veloxResizeBatchesShuffleInput => - val range = GlutenConfig.getConf.veloxResizeBatchesShuffleInputRange + GlutenConfig.get.veloxResizeBatchesShuffleInput => + val range = GlutenConfig.get.veloxResizeBatchesShuffleInputRange val appendBatches = VeloxResizeBatchesExec(shuffle.child, range.min, range.max) shuffle.withNewChildren(Seq(appendBatches)) @@ -541,12 +541,12 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { /** Determine whether to use sort-based shuffle based on shuffle partitioning and output. 
*/ override def useSortBasedShuffle(partitioning: Partitioning, output: Seq[Attribute]): Boolean = { - val conf = GlutenConfig.getConf + val conf = GlutenConfig.get lazy val isCelebornSortBasedShuffle = conf.isUseCelebornShuffleManager && conf.celebornShuffleWriterType == GlutenConfig.GLUTEN_SORT_SHUFFLE_WRITER partitioning != SinglePartition && - (partitioning.numPartitions >= GlutenConfig.getConf.columnarShuffleSortPartitionsThreshold || - output.size >= GlutenConfig.getConf.columnarShuffleSortColumnsThreshold) || + (partitioning.numPartitions >= GlutenConfig.get.columnarShuffleSortPartitionsThreshold || + output.size >= GlutenConfig.get.columnarShuffleSortColumnsThreshold) || isCelebornSortBasedShuffle } @@ -599,7 +599,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { val deserializeTime = metrics("deserializeTime") val readBatchNumRows = metrics("avgReadBatchNumRows") val decompressTime = metrics("decompressTime") - if (GlutenConfig.getConf.isUseCelebornShuffleManager) { + if (GlutenConfig.get.isUseCelebornShuffleManager) { val clazz = ClassUtils.getClass("org.apache.spark.shuffle.CelebornColumnarBatchSerializer") val constructor = clazz.getConstructor(classOf[StructType], classOf[SQLMetric], classOf[SQLMetric]) @@ -728,7 +728,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { // ISOControl characters, refer java.lang.Character.isISOControl(int) val isoControlStr = (('\u0000' to '\u001F') ++ ('\u007F' to '\u009F')).toList.mkString // scalastyle:on nonascii - if (GlutenConfig.getConf.castFromVarcharAddTrimNode && c.child.dataType == StringType) { + if (GlutenConfig.get.castFromVarcharAddTrimNode && c.child.dataType == StringType) { val trimStr = c.dataType match { case BinaryType | _: ArrayType | _: MapType | _: StructType | _: UserDefinedType[_] => None @@ -770,7 +770,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { } override def rewriteSpillPath(path: String): String = { - val fs = GlutenConfig.getConf.veloxSpillFileSystem + val fs = GlutenConfig.get.veloxSpillFileSystem fs match { case "local" => path diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala index 576f3a2cb205..256983d70c55 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala @@ -138,7 +138,7 @@ case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)( } override protected def doValidateInternal(): ValidationResult = { - if (!GlutenConfig.getConf.enableColumnarPartialProject) { + if (!GlutenConfig.get.enableColumnarPartialProject) { return ValidationResult.failed("Config disable this feature") } if (UDFAttrNotExists) { @@ -164,9 +164,7 @@ case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)( return ValidationResult.failed("Contains expression not supported") } if ( - ExpressionUtils.hasComplexExpressions( - original, - GlutenConfig.getConf.fallbackExpressionsThreshold) + ExpressionUtils.hasComplexExpressions(original, GlutenConfig.get.fallbackExpressionsThreshold) ) { return ValidationResult.failed("Fallback by complex expression") } diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala index 9cdcf854db8b..5030de65bf24 100644 
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala @@ -48,7 +48,7 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas val numInputRows = longMetric("numInputRows") val numOutputBatches = longMetric("numOutputBatches") val convertTime = longMetric("convertTime") - val numRows = GlutenConfig.getConf.maxBatchSize + val numRows = GlutenConfig.get.maxBatchSize // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire // plan (this) in the closure. val localSchema = schema @@ -68,7 +68,7 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas val numInputRows = longMetric("numInputRows") val numOutputBatches = longMetric("numOutputBatches") val convertTime = longMetric("convertTime") - val numRows = GlutenConfig.getConf.maxBatchSize + val numRows = GlutenConfig.get.maxBatchSize val mode = BroadcastUtils.getBroadcastMode(outputPartitioning) val relation = child.executeBroadcast() BroadcastUtils.sparkToVeloxUnsafe( diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala index 56a3d86a9038..360b2b9c5e28 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.SparkPlan case class BloomFilterMightContainJointRewriteRule(spark: SparkSession) extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = { - if (!GlutenConfig.getConf.enableNativeBloomFilter) { + if (!GlutenConfig.get.enableNativeBloomFilter) { return plan } val out = plan.transformWithSubqueries { diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala index 2e5390697795..f051aa878dfb 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike case class FlushableHashAggregateRule(session: SparkSession) extends Rule[SparkPlan] { import FlushableHashAggregateRule._ override def apply(plan: SparkPlan): SparkPlan = { - if (!GlutenConfig.getConf.enableVeloxFlushablePartialAggregation) { + if (!GlutenConfig.get.enableVeloxFlushablePartialAggregation) { return plan } plan.transformUpWithPruning(_.containsPattern(EXCHANGE)) { diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala index 8ceee3d573b9..59f633d4a520 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala @@ -33,8 +33,8 @@ case class HLLRewriteRule(spark: SparkSession) extends Rule[LogicalPlan] { case a: Aggregate => a.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) { case aggExpr @ AggregateExpression(hll: HyperLogLogPlusPlus, _, _, 
_, _) - if GlutenConfig.getConf.enableNativeHyperLogLogAggregateFunction && - GlutenConfig.getConf.enableColumnarHashAgg && + if GlutenConfig.get.enableNativeHyperLogLogAggregateFunction && + GlutenConfig.get.enableColumnarHashAgg && isSupportedDataType(hll.child.dataType) => val hllAdapter = HLLAdapter( hll.child, diff --git a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala index cd035e3202d2..3d2b568e7b88 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala @@ -97,9 +97,9 @@ private class ColumnarBatchSerializerInstance( null // uncompressed } val compressionCodecBackend = - GlutenConfig.getConf.columnarShuffleCodecBackend.orNull - val batchSize = GlutenConfig.getConf.maxBatchSize - val bufferSize = GlutenConfig.getConf.columnarShuffleReaderBufferSize + GlutenConfig.get.columnarShuffleCodecBackend.orNull + val batchSize = GlutenConfig.get.maxBatchSize + val bufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize val runtime = Runtimes.contextInstance(BackendsApiManager.getBackendName, "ShuffleReader") val jniWrapper = ShuffleReaderJniWrapper.create(runtime) val shuffleReaderHandle = jniWrapper.make( diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index bb84e3066b5c..56967d7d12c9 100644 --- a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -63,8 +63,8 @@ class ColumnarShuffleWriter[K, V]( .mkString(",") private lazy val nativeBufferSize = { - val bufferSize = GlutenConfig.getConf.shuffleWriterBufferSize - val maxBatchSize = GlutenConfig.getConf.maxBatchSize + val bufferSize = GlutenConfig.get.shuffleWriterBufferSize + val maxBatchSize = GlutenConfig.get.maxBatchSize if (bufferSize > maxBatchSize) { logInfo( s"${GlutenConfig.SHUFFLE_WRITER_BUFFER_SIZE.key} ($bufferSize) exceeds max " + @@ -75,9 +75,9 @@ class ColumnarShuffleWriter[K, V]( } } - private val nativeMergeBufferSize = GlutenConfig.getConf.maxBatchSize + private val nativeMergeBufferSize = GlutenConfig.get.maxBatchSize - private val nativeMergeThreshold = GlutenConfig.getConf.columnarShuffleMergeThreshold + private val nativeMergeThreshold = GlutenConfig.get.columnarShuffleMergeThreshold private val compressionCodec = if (conf.getBoolean(SHUFFLE_COMPRESS.key, SHUFFLE_COMPRESS.defaultValue.get)) { @@ -87,7 +87,7 @@ class ColumnarShuffleWriter[K, V]( } private val compressionCodecBackend = - GlutenConfig.getConf.columnarShuffleCodecBackend.orNull + GlutenConfig.get.columnarShuffleCodecBackend.orNull private val compressionLevel = GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec, compressionCodecBackend) @@ -96,9 +96,9 @@ class ColumnarShuffleWriter[K, V]( GlutenShuffleUtils.getSortEvictBufferSize(conf, compressionCodec) private val bufferCompressThreshold = - GlutenConfig.getConf.columnarShuffleCompressionThreshold + GlutenConfig.get.columnarShuffleCompressionThreshold - private val reallocThreshold = GlutenConfig.getConf.columnarShuffleReallocThreshold + private val reallocThreshold = GlutenConfig.get.columnarShuffleReallocThreshold private val runtime = 
Runtimes.contextInstance(BackendsApiManager.getBackendName, "ShuffleWriter") @@ -149,7 +149,7 @@ class ColumnarShuffleWriter[K, V]( compressionLevel, sortEvictBufferSize, bufferCompressThreshold, - GlutenConfig.getConf.columnarShuffleCompressionMode, + GlutenConfig.get.columnarShuffleCompressionMode, conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt, conf.get(SHUFFLE_SORT_USE_RADIXSORT), dataTmp.getAbsolutePath, diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala index b6d4f6557b82..f00469f86dbf 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala @@ -33,11 +33,11 @@ class VeloxParquetWriterInjects extends VeloxFormatWriterInjects { sparkOptions.put(SQLConf.PARQUET_COMPRESSION.key, compressionCodec) val blockSize = options.getOrElse( GlutenConfig.PARQUET_BLOCK_SIZE, - GlutenConfig.getConf.columnarParquetWriteBlockSize.toString) + GlutenConfig.get.columnarParquetWriteBlockSize.toString) sparkOptions.put(GlutenConfig.PARQUET_BLOCK_SIZE, blockSize) val blockRows = options.getOrElse( GlutenConfig.PARQUET_BLOCK_ROWS, - GlutenConfig.getConf.columnarParquetWriteBlockRows.toString) + GlutenConfig.get.columnarParquetWriteBlockRows.toString) sparkOptions.put(GlutenConfig.PARQUET_BLOCK_ROWS, blockRows) options .get(GlutenConfig.PARQUET_GZIP_WINDOW_SIZE) diff --git a/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ManagedAllocationListener.java b/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ManagedAllocationListener.java index a76c0aabee3b..8176b2817fd7 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ManagedAllocationListener.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ManagedAllocationListener.java @@ -29,7 +29,7 @@ public class ManagedAllocationListener implements AllocationListener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ManagedAllocationListener.class); - public static long BLOCK_SIZE = GlutenConfig.getConf().memoryReservationBlockSize(); + public static long BLOCK_SIZE = GlutenConfig.get().memoryReservationBlockSize(); private final MemoryTarget target; private final SimpleMemoryUsageRecorder sharedUsage; diff --git a/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java b/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java index 9d63a8601b4d..4e0be4e6929c 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java @@ -44,8 +44,8 @@ public static ReservationListener create( private static ReservationListener create0( String name, Spiller spiller, Map mutableStats) { // Memory target. 
- final double overAcquiredRatio = GlutenConfig.getConf().memoryOverAcquiredRatio(); - final long reservationBlockSize = GlutenConfig.getConf().memoryReservationBlockSize(); + final double overAcquiredRatio = GlutenConfig.get().memoryOverAcquiredRatio(); + final long reservationBlockSize = GlutenConfig.get().memoryReservationBlockSize(); final TaskMemoryManager tmm = TaskResources.getLocalTaskContext().taskMemoryManager(); final TreeMemoryTarget consumer = MemoryTargets.newConsumer( diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala b/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala index f9bb7478e7d0..e21159ae494a 100644 --- a/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala +++ b/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala @@ -27,23 +27,23 @@ object DebugUtil { // if specify stageId and partitionId, then only do that partition for that stage def saveInputToFile(): Boolean = { def taskIdMatches = - GlutenConfig.getConf.benchmarkTaskId.nonEmpty && - GlutenConfig.getConf.benchmarkTaskId + GlutenConfig.get.benchmarkTaskId.nonEmpty && + GlutenConfig.get.benchmarkTaskId .split(",") .map(_.toLong) .contains(TaskContext.get().taskAttemptId()) def partitionIdMatches = - TaskContext.get().stageId() == GlutenConfig.getConf.benchmarkStageId && - (GlutenConfig.getConf.benchmarkPartitionId.isEmpty || - GlutenConfig.getConf.benchmarkPartitionId + TaskContext.get().stageId() == GlutenConfig.get.benchmarkStageId && + (GlutenConfig.get.benchmarkPartitionId.isEmpty || + GlutenConfig.get.benchmarkPartitionId .split(",") .map(_.toInt) .contains(TaskContext.get().partitionId())) val saveInput = taskIdMatches || partitionIdMatches if (saveInput) { - if (GlutenConfig.getConf.benchmarkSaveDir.isEmpty) { + if (GlutenConfig.get.benchmarkSaveDir.isEmpty) { throw new IllegalArgumentException(GlutenConfig.BENCHMARK_SAVE_DIR.key + " is not set.") } } diff --git a/gluten-celeborn/src-celeborn/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/src-celeborn/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java index 5e14531d6f19..a4d4c4f5c538 100644 --- a/gluten-celeborn/src-celeborn/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java +++ b/gluten-celeborn/src-celeborn/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java @@ -216,7 +216,7 @@ public ShuffleHandle registerShuffle( if (dependency instanceof ColumnarShuffleDependency) { if (fallbackPolicyRunner.applyAllFallbackPolicy( lifecycleManager, dependency.partitioner().numPartitions())) { - if (GlutenConfig.getConf().enableCelebornFallback()) { + if (GlutenConfig.get().enableCelebornFallback()) { logger.warn("Fallback to ColumnarShuffleManager!"); columnarShuffleIds.add(shuffleId); return columnarShuffleManager().registerShuffle(shuffleId, dependency); diff --git a/gluten-celeborn/src-celeborn/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala b/gluten-celeborn/src-celeborn/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala index 42e939e4420d..1da1554178a5 100644 --- a/gluten-celeborn/src-celeborn/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala +++ b/gluten-celeborn/src-celeborn/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala @@ -52,8 +52,8 @@ abstract class CelebornColumnarShuffleWriter[K, V]( protected val mapId: Int = context.partitionId() protected lazy val 
nativeBufferSize: Int = { - val bufferSize = GlutenConfig.getConf.shuffleWriterBufferSize - val maxBatchSize = GlutenConfig.getConf.maxBatchSize + val bufferSize = GlutenConfig.get.shuffleWriterBufferSize + val maxBatchSize = GlutenConfig.get.maxBatchSize if (bufferSize > maxBatchSize) { logInfo( s"${GlutenConfig.SHUFFLE_WRITER_BUFFER_SIZE.key} ($bufferSize) exceeds max " + @@ -95,13 +95,13 @@ abstract class CelebornColumnarShuffleWriter[K, V]( GlutenShuffleUtils.getCompressionLevel( conf, customizedCompressionCodec, - GlutenConfig.getConf.columnarShuffleCodecBackend.orNull) + GlutenConfig.get.columnarShuffleCodecBackend.orNull) protected val compressionBufferSize: Int = GlutenShuffleUtils.getSortEvictBufferSize(conf, customizedCompressionCodec) protected val bufferCompressThreshold: Int = - GlutenConfig.getConf.columnarShuffleCompressionThreshold + GlutenConfig.get.columnarShuffleCompressionThreshold // Are we in the process of stopping? Because map tasks can call stop() with success = true // and then call stop() with success = false if they get an exception, we want to make sure diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java index ea18a05d0c60..cdcdb282f4f2 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java @@ -30,7 +30,7 @@ public class DynamicOffHeapSizingMemoryTarget implements MemoryTarget { private final MemoryTarget delegated; // When dynamic off-heap sizing is enabled, the off-heap should be sized for the total usable // memory, so we can use it as the max memory we will use. 
- private static final long MAX_MEMORY_IN_BYTES = GlutenConfig.getConf().offHeapMemorySize(); + private static final long MAX_MEMORY_IN_BYTES = GlutenConfig.get().offHeapMemorySize(); private static final AtomicLong USED_OFFHEAP_BYTES = new AtomicLong(); public DynamicOffHeapSizingMemoryTarget(MemoryTarget delegated) { diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java index c6f5b59de8c2..d3df35bd4073 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java @@ -50,7 +50,7 @@ public static MemoryTarget overAcquire( @Experimental public static MemoryTarget dynamicOffHeapSizingIfEnabled(MemoryTarget memoryTarget) { - if (GlutenConfig.getConf().dynamicOffHeapSizingEnabled()) { + if (GlutenConfig.get().dynamicOffHeapSizingEnabled()) { return new DynamicOffHeapSizingMemoryTarget(memoryTarget); } @@ -63,7 +63,7 @@ public static TreeMemoryTarget newConsumer( Spiller spiller, Map virtualChildren) { final TreeMemoryConsumers.Factory factory = TreeMemoryConsumers.factory(tmm); - if (GlutenConfig.getConf().memoryIsolation()) { + if (GlutenConfig.get().memoryIsolation()) { return TreeMemoryTargets.newChild(factory.isolatedRoot(), name, spiller, virtualChildren); } final TreeMemoryTarget root = factory.legacyRoot(); diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java index a11a4a3e4a19..9dc1422c520c 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java @@ -77,7 +77,7 @@ public TreeMemoryTarget legacyRoot() { *

See GLUTEN-3030 */ public TreeMemoryTarget isolatedRoot() { - return ofCapacity(GlutenConfig.getConf().conservativeTaskOffHeapMemorySize()); + return ofCapacity(GlutenConfig.get().conservativeTaskOffHeapMemorySize()); } } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala index eea698e078bd..d56dbbe5d9f2 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala @@ -57,7 +57,7 @@ object ColumnarRuleExecutor { override def apply(plan: SparkPlan): SparkPlan = { val (out, millisTime) = GlutenTimeMetric.recordMillisTime(delegate.apply(plan)) - logOnLevel(GlutenConfig.getConf.transformPlanLogLevel, message(plan, out, millisTime)) + logOnLevel(GlutenConfig.get.transformPlanLogLevel, message(plan, out, millisTime)) out } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala index 754e42b81c4d..66a38d5ac730 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala @@ -116,7 +116,7 @@ abstract class AffinityManager extends LogLevelUtil with Logging { totalRegisteredExecutors.addAndGet(1) } logOnLevel( - GlutenConfig.getConf.softAffinityLogLevel, + GlutenConfig.get.softAffinityLogLevel, s"After adding executor ${execHostId._1} on host ${execHostId._2}, " + s"idForExecutors is ${idForExecutors.mkString(",")}, " + s"sortedIdForExecutors is ${sortedIdForExecutors.mkString(",")}, " + @@ -149,7 +149,7 @@ abstract class AffinityManager extends LogLevelUtil with Logging { totalRegisteredExecutors.addAndGet(-1) } logOnLevel( - GlutenConfig.getConf.softAffinityLogLevel, + GlutenConfig.get.softAffinityLogLevel, s"After removing executor $execId, " + s"idForExecutors is ${idForExecutors.mkString(",")}, " + s"sortedIdForExecutors is ${sortedIdForExecutors.mkString(",")}, " + @@ -196,7 +196,7 @@ abstract class AffinityManager extends LogLevelUtil with Logging { (originalValues ++ value) } logOnLevel( - GlutenConfig.getConf.softAffinityLogLevel, + GlutenConfig.get.softAffinityLogLevel, s"update host for $key: ${values.mkString(",")}") duplicateReadingInfos.put(key, values) } @@ -279,7 +279,7 @@ abstract class AffinityManager extends LogLevelUtil with Logging { if (!hosts.isEmpty) { rand.shuffle(hosts) logOnLevel( - GlutenConfig.getConf.softAffinityLogLevel, + GlutenConfig.get.softAffinityLogLevel, s"get host for $f: ${hosts.distinct.mkString(",")}") } hosts.distinct.toSeq diff --git a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala index 66e4456d46d6..558aec962125 100644 --- a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala +++ b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala @@ -66,7 +66,7 @@ abstract class Affinity(val manager: AffinityManager) extends LogLevelUtil with val locations = manager.askExecutors(filePath) if (locations.nonEmpty) { logOnLevel( - GlutenConfig.getConf.softAffinityLogLevel, + GlutenConfig.get.softAffinityLogLevel, s"SAMetrics=File $filePath - the expected executors are 
${locations.mkString("_")} ") locations.map { case (executor, host) => toTaskLocation(host, executor) } } else { diff --git a/gluten-iceberg/src-iceberg/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src-iceberg/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java index 398bdbdb5f03..bb2fc6fc6805 100644 --- a/gluten-iceberg/src-iceberg/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java +++ b/gluten-iceberg/src-iceberg/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java @@ -64,7 +64,7 @@ protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBui ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions parquetReadOptions = ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder() .setEnableRowGroupMaxminIndex( - GlutenConfig.getConf().enableParquetRowGroupMaxMinIndex()) + GlutenConfig.get().enableParquetRowGroupMaxMinIndex()) .build(); icebergBuilder.setParquet(parquetReadOptions); break; @@ -104,7 +104,7 @@ protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBui ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions parquetReadOptions = ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder() .setEnableRowGroupMaxminIndex( - GlutenConfig.getConf().enableParquetRowGroupMaxMinIndex()) + GlutenConfig.get().enableParquetRowGroupMaxMinIndex()) .build(); deleteFileBuilder.setParquet(parquetReadOptions); break; diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java index 9513f497602a..69e81126c80c 100644 --- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java @@ -198,7 +198,7 @@ public ReadRel.LocalFiles toProtobuf() { ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions parquetReadOptions = ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder() .setEnableRowGroupMaxminIndex( - GlutenConfig.getConf().enableParquetRowGroupMaxMinIndex()) + GlutenConfig.get().enableParquetRowGroupMaxMinIndex()) .build(); fileBuilder.setParquet(parquetReadOptions); break; @@ -225,15 +225,15 @@ public ReadRel.LocalFiles toProtobuf() { .setHeader(Long.parseLong(header)) .setEscape(escape) .setNullValue(nullValue) - .setMaxBlockSize(GlutenConfig.getConf().textInputMaxBlockSize()) - .setEmptyAsDefault(GlutenConfig.getConf().textIputEmptyAsDefault()) + .setMaxBlockSize(GlutenConfig.get().textInputMaxBlockSize()) + .setEmptyAsDefault(GlutenConfig.get().textIputEmptyAsDefault()) .build(); fileBuilder.setText(textReadOptions); break; case JsonReadFormat: ReadRel.LocalFiles.FileOrFiles.JsonReadOptions jsonReadOptions = ReadRel.LocalFiles.FileOrFiles.JsonReadOptions.newBuilder() - .setMaxBlockSize(GlutenConfig.getConf().textInputMaxBlockSize()) + .setMaxBlockSize(GlutenConfig.get().textInputMaxBlockSize()) .build(); fileBuilder.setJson(jsonReadOptions); break; diff --git a/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java b/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java index 37d9e09bb5d8..03f8c7a1b221 100644 --- a/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java +++ b/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java @@ -37,7 +37,7 @@ public class TestStats { public static int offloadGlutenTestNumber = 0; private static boolean enabled() { - return 
diff --git a/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java b/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java
index 37d9e09bb5d8..03f8c7a1b221 100644
--- a/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java
+++ b/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java
@@ -37,7 +37,7 @@ public class TestStats {
   public static int offloadGlutenTestNumber = 0;

   private static boolean enabled() {
-    return GlutenConfig.getConf().collectUtStats();
+    return GlutenConfig.get().collectUtStats();
   }

   public static void reset() {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 052348aee4b6..887c761fc5f2 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -68,7 +68,7 @@ trait BackendSettingsApi {
     false
   }
   def supportColumnarShuffleExec(): Boolean = {
-    GlutenConfig.getConf.enableColumnarShuffle
+    GlutenConfig.get.enableColumnarShuffle
   }
   def enableJoinKeysRewrite(): Boolean = true
   def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 0d1296999a73..8626b40de6b6 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -52,7 +52,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
   val fileFormat: ReadFileFormat

   def getRootFilePaths: Seq[String] = {
-    if (GlutenConfig.getConf.scanFileSchemeValidationEnabled) {
+    if (GlutenConfig.get.scanFileSchemeValidationEnabled) {
       getRootPathsInternal
     } else {
       Seq.empty
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
index 1e0c3d5a7e70..8da7bdd50348 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
@@ -185,7 +185,7 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
   }

   override protected def doValidateInternal(): ValidationResult = {
-    if (!GlutenConfig.getConf.broadcastNestedLoopJoinTransformerTransformerEnabled) {
+    if (!GlutenConfig.get.broadcastNestedLoopJoinTransformerTransformerEnabled) {
       return ValidationResult.failed(
         s"Config ${GlutenConfig.BROADCAST_NESTED_LOOP_JOIN_TRANSFORMER_ENABLED.key} not enabled")
     }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
index de5fc85f356d..cdd1fe652b45 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
@@ -59,7 +59,7 @@ class GlutenWholeStageColumnarRDD(
     updateInputMetrics: InputMetricsWrapper => Unit,
     updateNativeMetrics: IMetrics => Unit)
   extends RDD[ColumnarBatch](sc, rdds.getDependencies) {
-  private val numaBindingInfo = GlutenConfig.getConf.numaBindingInfo
+  private val numaBindingInfo = GlutenConfig.get.numaBindingInfo

   override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 003efb5f9af8..6ab0e22ce20b 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -62,7 +62,7 @@ case class WholeStageTransformContext(root: PlanNode, substraitContext: Substrai
  * Since https://github.com/apache/incubator-gluten/pull/2185.
  */
 trait ValidatablePlan extends GlutenPlan with LogLevelUtil {
-  protected def glutenConf: GlutenConfig = GlutenConfig.getConf
+  protected def glutenConf: GlutenConfig = GlutenConfig.get

   protected lazy val enableNativeValidation = glutenConf.enableNativeValidation
@@ -214,7 +214,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
   val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration(
     sparkContext.hadoopConfiguration)

-  val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.getConf.numaBindingInfo
+  val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.get.numaBindingInfo

   @transient private var wholeStageTransformerContext: Option[WholeStageTransformContext] = None
@@ -360,7 +360,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
     }(
       t =>
         logOnLevel(
-          GlutenConfig.getConf.substraitPlanLogLevel,
+          GlutenConfig.get.substraitPlanLogLevel,
           s"$nodeName generating the substrait plan took: $t ms."))
     val inputRDDs = new ColumnarInputRDDsWrapper(columnarInputRDDs)
     // Check if BatchScan exists.
@@ -376,7 +376,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
     val allScanPartitions = basicScanExecTransformers.map(_.getPartitions.toIndexedSeq)
     val allScanSplitInfos =
       getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions)
-    if (GlutenConfig.getConf.enableHdfsViewfs) {
+    if (GlutenConfig.get.enableHdfsViewfs) {
       val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty
       allScanSplitInfos.foreach {
         splitInfos =>
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
index 28d780992492..48d71fc3ef78 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
@@ -88,7 +88,7 @@ case class WindowExecTransformer(
     val windowParametersStr = new StringBuffer("WindowParameters:")
     // isStreaming: 1 for streaming, 0 for sort
     val isStreaming: Int =
-      if (GlutenConfig.getConf.veloxColumnarWindowType.equals("streaming")) 1 else 0
+      if (GlutenConfig.get.veloxColumnarWindowType.equals("streaming")) 1 else 0

     windowParametersStr
       .append("isStreaming=")
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
index f6cc01f8ac83..a3a6135cdb70 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
@@ -339,7 +339,7 @@ object ExpressionMappings {
   )

   def expressionsMap: Map[Class[_], String] = {
-    val blacklist = GlutenConfig.getConf.expressionBlacklist
+    val blacklist = GlutenConfig.get.expressionBlacklist
     val filtered = (defaultExpressionsMap ++ toMap(
       BackendsApiManager.getSparkPlanExecApiInstance.extraExpressionMappings)).filterNot(
       kv => blacklist.contains(kv._2))
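// ---- Editor's note: illustrative sketch, not part of the patch. It distills the
// blacklist filtering in ExpressionMappings.expressionsMap above: every mapping
// whose Substrait name appears in the user's blacklist is dropped. The sample
// class keys and names are placeholders, not Gluten's real mapping table.
object ExpressionBlacklistSketch {
  def filterMappings(
      mappings: Map[Class[_], String],
      blacklist: Set[String]): Map[Class[_], String] =
    mappings.filterNot { case (_, substraitName) => blacklist.contains(substraitName) }

  def main(args: Array[String]): Unit = {
    val sample: Map[Class[_], String] =
      Map(classOf[String] -> "concat", classOf[Integer] -> "abs")
    println(filterMappings(sample, Set("abs"))) // keeps only the concat mapping
  }
}
// ---- End of editor's note.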
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala
index bfb926706cf7..bbb6fa1b8f1a 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.SparkPlan
 object CollapseProjectExecTransformer extends Rule[SparkPlan] {

   override def apply(plan: SparkPlan): SparkPlan = {
-    if (!GlutenConfig.getConf.enableColumnarProjectCollapse) {
+    if (!GlutenConfig.get.enableColumnarProjectCollapse) {
       return plan
     }
     plan.transformUp {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
index 432ecd1584d8..76f6b2e680c6 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.joins._

 case class FallbackOnANSIMode(session: SparkSession) extends Rule[SparkPlan] {
   override def apply(plan: SparkPlan): SparkPlan = {
-    if (GlutenConfig.getConf.enableAnsiMode) {
+    if (GlutenConfig.get.enableAnsiMode) {
       plan.foreach(FallbackTags.add(_, "does not support ansi mode"))
     }
     plan
@@ -35,7 +35,7 @@ case class FallbackOnANSIMode(session: SparkSession) extends Rule[SparkPlan] {
 }

 case class FallbackMultiCodegens(session: SparkSession) extends Rule[SparkPlan] {
-  lazy val glutenConf: GlutenConfig = GlutenConfig.getConf
+  lazy val glutenConf: GlutenConfig = GlutenConfig.get
   lazy val physicalJoinOptimize = glutenConf.enablePhysicalJoinOptimize
   lazy val optimizeLevel: Integer = glutenConf.physicalJoinOptimizationThrottle
@@ -47,7 +47,7 @@ case class FallbackMultiCodegens(session: SparkSession) extends Rule[SparkPlan]
       case plan: ShuffledHashJoinExec =>
         if ((count + 1) >= optimizeLevel) return true
         plan.children.exists(existsMultiCodegens(_, count + 1))
-      case plan: SortMergeJoinExec if GlutenConfig.getConf.forceShuffledHashJoin =>
+      case plan: SortMergeJoinExec if GlutenConfig.get.forceShuffledHashJoin =>
        if ((count + 1) >= optimizeLevel) return true
         plan.children.exists(existsMultiCodegens(_, count + 1))
       case _ => false
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashBaseAggregate.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashBaseAggregate.scala
index a034a3229a88..b5ef2b847199 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashBaseAggregate.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashBaseAggregate.scala
@@ -38,11 +38,11 @@ case class MergeTwoPhasesHashBaseAggregate(session: SparkSession)
   extends Rule[SparkPlan]
   with Logging {

-  val glutenConf: GlutenConfig = GlutenConfig.getConf
+  val glutenConf: GlutenConfig = GlutenConfig.get
   val scanOnly: Boolean = glutenConf.enableScanOnly
   val enableColumnarHashAgg: Boolean = !scanOnly && glutenConf.enableColumnarHashAgg
-  val replaceSortAggWithHashAgg: Boolean = GlutenConfig.getConf.forceToUseHashAgg
-  val mergeTwoPhasesAggEnabled: Boolean = GlutenConfig.getConf.mergeTwoPhasesAggEnabled
+  val replaceSortAggWithHashAgg: Boolean = GlutenConfig.get.forceToUseHashAgg
+  val mergeTwoPhasesAggEnabled: Boolean = GlutenConfig.get.mergeTwoPhasesAggEnabled

   private def isPartialAgg(partialAgg: BaseAggregateExec, finalAgg: BaseAggregateExec): Boolean = {
     // TODO: now it can not support to merge agg which there are the filters in the aggregate exprs.
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/UnionTransformerRule.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/UnionTransformerRule.scala
index f0eea08018dd..40fea93eaa54 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/UnionTransformerRule.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/UnionTransformerRule.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.SparkPlan
  */
 case class UnionTransformerRule() extends Rule[SparkPlan] {
   override def apply(plan: SparkPlan): SparkPlan = {
-    if (!GlutenConfig.getConf.enableNativeUnion) {
+    if (!GlutenConfig.get.enableNativeUnion) {
       return plan
     }
     plan.transformUp {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster2.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster2.scala
index 832c524d7b55..cc1c0f584ebd 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster2.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster2.scala
@@ -45,11 +45,11 @@ object RoughCoster2 extends LongCoster {
       case ColumnarToRowLike(_) => 1L
       case RowToColumnarLike(_) =>
         // If sizeBytes is less than the threshold, the cost of RowToColumnarLike is ignored.
-        if (sizeFactor == 0) 1L else GlutenConfig.getConf.rasRough2R2cCost
+        if (sizeFactor == 0) 1L else GlutenConfig.get.rasRough2R2cCost
       case p if PlanUtil.isGlutenColumnarOp(p) => 1L
-      case p if PlanUtil.isVanillaColumnarOp(p) => GlutenConfig.getConf.rasRough2VanillaCost
+      case p if PlanUtil.isVanillaColumnarOp(p) => GlutenConfig.get.rasRough2VanillaCost
       // Other row ops. Usually a vanilla row op.
-      case _ => GlutenConfig.getConf.rasRough2VanillaCost
+      case _ => GlutenConfig.get.rasRough2VanillaCost
     }
     opCost * Math.max(1, sizeFactor)
   }
@@ -61,7 +61,7 @@ object RoughCoster2 extends LongCoster {
       case _: LeafExecNode => 0L
       case p => p.children.map(getStatSizeBytes).sum
     }
-    sizeBytes / GlutenConfig.getConf.rasRough2SizeBytesThreshold
+    sizeBytes / GlutenConfig.get.rasRough2SizeBytesThreshold
   }

   private def getStatSizeBytes(plan: SparkPlan): Long = {
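// ---- Editor's note: illustrative sketch, not part of the patch. It restates the
// cost arithmetic visible in RoughCoster2 above: the per-node cost is the op cost
// scaled by how far the plan's stat size exceeds a byte threshold, never dropping
// below the base op cost. The constants stand in for rasRough2R2cCost /
// rasRough2VanillaCost / rasRough2SizeBytesThreshold, which really come from
// GlutenConfig.get.
object RoughCostSketch {
  val r2cCost: Long = 100L // assumed stand-in for rasRough2R2cCost
  val vanillaCost: Long = 10L // assumed stand-in for rasRough2VanillaCost
  val sizeBytesThreshold: Long = 64L * 1024 * 1024 // assumed threshold

  def selfCost(opCost: Long, statSizeBytes: Long): Long = {
    // Integer division: plans below the threshold get sizeFactor 0, which the
    // max(1, _) clamp turns back into the plain op cost.
    val sizeFactor = statSizeBytes / sizeBytesThreshold
    opCost * math.max(1, sizeFactor)
  }
}
// ---- End of editor's note.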
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
index 3418d3dddc99..2d5ac6852033 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
@@ -74,7 +74,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP
   import ExpandFallbackPolicy._

   private def countTransitionCost(plan: SparkPlan): Int = {
-    val ignoreRowToColumnar = GlutenConfig.getConf.fallbackIgnoreRowToColumnar
+    val ignoreRowToColumnar = GlutenConfig.get.fallbackIgnoreRowToColumnar
     var transitionCost = 0
     def countFallbackInternal(plan: SparkPlan): Unit = {
       plan match {
@@ -183,7 +183,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP
   private def fallback(plan: SparkPlan): FallbackInfo = {
     val fallbackThreshold = if (isAdaptiveContext) {
-      GlutenConfig.getConf.wholeStageFallbackThreshold
+      GlutenConfig.get.wholeStageFallbackThreshold
     } else if (plan.find(_.isInstanceOf[AdaptiveSparkPlanExec]).isDefined) {
       // if we are here, that means we are now at `QueryExecution.preparations` and
       // AQE is actually not applied. We do nothing for this case, and later in
@@ -191,7 +191,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP
       return FallbackInfo.DO_NOT_FALLBACK()
     } else {
       // AQE is not applied, so we use the whole query threshold to check if should fallback
-      GlutenConfig.getConf.queryFallbackThreshold
+      GlutenConfig.get.queryFallbackThreshold
     }
     if (fallbackThreshold < 0) {
       return FallbackInfo.DO_NOT_FALLBACK()
@@ -255,7 +255,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP
     val vanillaSparkPlan = fallbackToRowBasedPlan(plan, outputsColumnar)
     val vanillaSparkTransitionCost = countTransitionCostForVanillaSparkPlan(vanillaSparkPlan)
     if (
-      GlutenConfig.getConf.fallbackPreferColumnar &&
+      GlutenConfig.get.fallbackPreferColumnar &&
       fallbackInfo.netTransitionCost <= vanillaSparkTransitionCost
     ) {
       plan
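// ---- Editor's note: illustrative sketch, not part of the patch. It condenses the
// threshold selection in ExpandFallbackPolicy.fallback above: adaptive stages use
// wholeStageFallbackThreshold, whole queries use queryFallbackThreshold, and a
// negative threshold disables whole-plan fallback entirely. The comparison below
// is a simplification of the real counting logic, under that stated assumption.
object FallbackThresholdSketch {
  def shouldFallback(
      isAdaptiveContext: Boolean,
      wholeStageThreshold: Int,
      queryThreshold: Int,
      fallbackNodeCount: Int): Boolean = {
    val threshold = if (isAdaptiveContext) wholeStageThreshold else queryThreshold
    // threshold < 0 means "never fall back the whole plan".
    threshold >= 0 && fallbackNodeCount >= threshold
  }
}
// ---- End of editor's note.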
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 7f258d10b2fc..5fcc5675f8fc 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -43,7 +43,7 @@ case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil {
     case p if FallbackTags.nonEmpty(p) => p
     case s: ShuffleExchangeExec
-        if (s.child.supportsColumnar || GlutenConfig.getConf.enablePreferColumnar) &&
+        if (s.child.supportsColumnar || GlutenConfig.get.enablePreferColumnar) &&
           BackendsApiManager.getSettings.supportColumnarShuffleExec() =>
       logDebug(s"Columnar Processing for ${s.getClass} is currently supported.")
       BackendsApiManager.getSparkPlanExecApiInstance.genColumnarShuffleExchange(s)
@@ -140,7 +140,7 @@ object OffloadJoin {
     }

     // Both left and right are buildable. Find out the better one.
-    if (!GlutenConfig.getConf.shuffledHashJoinOptimizeBuildSide) {
+    if (!GlutenConfig.get.shuffledHashJoinOptimizeBuildSide) {
       // User disabled build side re-optimization. Return original build side from vanilla Spark.
       return shj.buildSide
     }
@@ -314,7 +314,7 @@ object OffloadOthers {
           // Velox backend uses ColumnarArrowEvalPythonExec.
           if (
             !BackendsApiManager.getSettings.supportColumnarArrowUdf() ||
-            !GlutenConfig.getConf.enableColumnarArrowUDF
+            !GlutenConfig.get.enableColumnarArrowUDF
           ) {
             EvalPythonExecTransformer(plan.udfs, plan.resultAttrs, child)
           } else {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala
index 2d844a616c5b..fb6a8cae3c5a 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala
@@ -56,7 +56,7 @@ object RewriteJoin extends RewriteSingleNode with JoinSelectionHelper {
   }

   override def rewrite(plan: SparkPlan): SparkPlan = plan match {
-    case smj: SortMergeJoinExec if GlutenConfig.getConf.forceShuffledHashJoin =>
+    case smj: SortMergeJoinExec if GlutenConfig.get.forceShuffledHashJoin =>
       getSmjBuildSide(smj) match {
         case Some(buildSide) =>
           ShuffledHashJoinExec(
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 93c444fbd01b..171235a7c319 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 object Validators {
   implicit class ValidatorBuilderImplicits(builder: Validator.Builder) {
-    private val conf = GlutenConfig.getConf
+    private val conf = GlutenConfig.get
     private val settings = BackendsApiManager.getSettings

     /** Fails validation if a plan node was already tagged with TRANSFORM_UNSUPPORTED. */
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
index e3e374256ff4..74cae15ee1ed 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
@@ -51,7 +51,7 @@ object GlutenShuffleUtils {
           s"${validValues.mkString(", ")}, but was $codec")
       }
     }
-    val glutenConfig = GlutenConfig.getConf
+    val glutenConfig = GlutenConfig.get
     glutenConfig.columnarShuffleCodec match {
       case Some(codec) =>
         val glutenCodecKey = GlutenConfig.COLUMNAR_SHUFFLE_CODEC.key
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanNestedColumnPruning.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanNestedColumnPruning.scala
index 70b08a2ed8bb..a3480bbfca91 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanNestedColumnPruning.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanNestedColumnPruning.scala
@@ -32,7 +32,7 @@ object HiveTableScanNestedColumnPruning extends Logging {
   import org.apache.spark.sql.catalyst.expressions.SchemaPruning._

   def supportNestedColumnPruning(project: ProjectExecTransformer): Boolean = {
-    if (GlutenConfig.getConf.enableColumnarHiveTableScanNestedColumnPruning) {
+    if (GlutenConfig.get.enableColumnarHiveTableScanNestedColumnPruning) {
       project.child match {
         case HiveTableScanExecTransformer(_, relation, _, _) =>
           relation.tableMeta.storage.inputFormat match {
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 754c29241864..f22eb4b09630 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -172,7 +172,7 @@ private object FallbackStrategiesSuite {
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())),
       List(
         c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()),
-        _ => ColumnarCollapseTransformStages(GlutenConfig.getConf)
+        _ => ColumnarCollapseTransformStages(GlutenConfig.get)
       ),
       List(_ => RemoveFallbackTagRule())
     )
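// ---- Editor's note: illustrative sketch, not part of the patch. It mirrors the
// codec validation pattern in GlutenShuffleUtils above: reject a codec unless it
// is in the set of supported values, echoing the valid list in the error message.
// The value set here is illustrative, not Gluten's actual list.
object CodecValidationSketch {
  private val validValues = Set("lz4", "zstd", "snappy")

  def checkCodec(codec: String): String = {
    val normalized = codec.toLowerCase
    require(
      validValues.contains(normalized),
      s"Unsupported shuffle codec: expected one of ${validValues.mkString(", ")}, but was $codec")
    normalized
  }
}
// ---- End of editor's note.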
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
index ddd4cf1d4c31..e57caf8a3774 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
@@ -37,7 +37,7 @@ class GlutenBloomFilterAggregateQuerySuite
       GlutenConfig.COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key -> "4194304"
     ) {
       val numEstimatedItems = 5000000L
-      val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+      val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
       val sqlString = s"""
                          |SELECT every(might_contain(
                          |            (SELECT bloom_filter_agg(col,
@@ -72,7 +72,7 @@ class GlutenBloomFilterAggregateQuerySuite
   testGluten("Test bloom_filter_agg filter fallback") {
     val table = "bloom_filter_test"
     val numEstimatedItems = 5000000L
-    val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+    val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
     val sqlString = s"""
                        |SELECT col positive_membership_test
                        |FROM $table
@@ -118,7 +118,7 @@ class GlutenBloomFilterAggregateQuerySuite
   testGluten("Test bloom_filter_agg agg fallback") {
     val table = "bloom_filter_test"
     val numEstimatedItems = 5000000L
-    val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+    val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
     val sqlString = s"""
                        |SELECT col positive_membership_test
                        |FROM $table
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 88e0ecf65a9a..c021b210355a 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -182,7 +182,7 @@ private object FallbackStrategiesSuite {
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())),
       List(
         c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()),
-        _ => ColumnarCollapseTransformStages(GlutenConfig.getConf)
+        _ => ColumnarCollapseTransformStages(GlutenConfig.get)
       ),
       List(_ => RemoveFallbackTagRule())
     )
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
index 0c75db830830..5972194c1a29 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
@@ -36,7 +36,7 @@ class GlutenBloomFilterAggregateQuerySuite
       GlutenConfig.COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key -> "4194304"
     ) {
       val numEstimatedItems = 5000000L
-      val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+      val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
       val sqlString = s"""
                          |SELECT every(might_contain(
                          |            (SELECT bloom_filter_agg(col,
@@ -71,7 +71,7 @@ class GlutenBloomFilterAggregateQuerySuite
   testGluten("Test bloom_filter_agg filter fallback") {
     val table = "bloom_filter_test"
     val numEstimatedItems = 5000000L
-    val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+    val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
     val sqlString = s"""
                        |SELECT col positive_membership_test
                        |FROM $table
@@ -117,7 +117,7 @@ class GlutenBloomFilterAggregateQuerySuite
   testGluten("Test bloom_filter_agg agg fallback") {
     val table = "bloom_filter_test"
     val numEstimatedItems = 5000000L
-    val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+    val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
     val sqlString = s"""
                        |SELECT col positive_membership_test
                        |FROM $table
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 88e0ecf65a9a..c021b210355a 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -182,7 +182,7 @@ private object FallbackStrategiesSuite {
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())),
       List(
         c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()),
-        _ => ColumnarCollapseTransformStages(GlutenConfig.getConf)
+        _ => ColumnarCollapseTransformStages(GlutenConfig.get)
       ),
       List(_ => RemoveFallbackTagRule())
     )
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
index ddd4cf1d4c31..e57caf8a3774 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
@@ -37,7 +37,7 @@ class GlutenBloomFilterAggregateQuerySuite
       GlutenConfig.COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key -> "4194304"
     ) {
       val numEstimatedItems = 5000000L
-      val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+      val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
       val sqlString = s"""
                          |SELECT every(might_contain(
                          |            (SELECT bloom_filter_agg(col,
@@ -72,7 +72,7 @@ class GlutenBloomFilterAggregateQuerySuite
   testGluten("Test bloom_filter_agg filter fallback") {
     val table = "bloom_filter_test"
     val numEstimatedItems = 5000000L
-    val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+    val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
     val sqlString = s"""
                        |SELECT col positive_membership_test
                        |FROM $table
@@ -118,7 +118,7 @@ class GlutenBloomFilterAggregateQuerySuite
   testGluten("Test bloom_filter_agg agg fallback") {
     val table = "bloom_filter_test"
     val numEstimatedItems = 5000000L
-    val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+    val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
     val sqlString = s"""
                        |SELECT col positive_membership_test
                        |FROM $table
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 8908047a3321..625e499df252 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -183,7 +183,7 @@ private object FallbackStrategiesSuite {
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())),
       List(
         c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()),
-        _ => ColumnarCollapseTransformStages(GlutenConfig.getConf)
+        _ => ColumnarCollapseTransformStages(GlutenConfig.get)
      ),
       List(_ => RemoveFallbackTagRule())
     )
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index f5d69bdbd3a9..f1a7bf90d140 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -677,7 +677,7 @@ object GlutenConfig {

   var ins: GlutenConfig = _

-  def getConf: GlutenConfig = {
+  def get: GlutenConfig = {
     new GlutenConfig(SQLConf.get)
   }
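// ---- Editor's note: illustrative sketch, not part of the patch. The hunk above is
// the heart of this rename: GlutenConfig.getConf becomes GlutenConfig.get, and the
// accessor still builds a fresh GlutenConfig over SQLConf.get on every call. A
// minimal sketch of that accessor shape; FakeConf and ConfigView are hypothetical
// stand-ins for SQLConf and GlutenConfig.
final case class FakeConf(settings: Map[String, String])

class ConfigView(conf: FakeConf) {
  def enableGluten: Boolean =
    conf.settings.getOrElse("spark.gluten.enabled", "true").toBoolean
}

object ConfigView {
  // Like GlutenConfig.get: a new view per call, always bound to the current
  // session conf. Callers reading many keys should cache the result in a val,
  // as several hunks in this patch already do (e.g. val glutenConf = ...).
  def get: ConfigView = new ConfigView(currentConf)

  private def currentConf: FakeConf = FakeConf(Map.empty)
}
// ---- End of editor's note.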
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
index a6f066462a1a..ec05abb5d6cd 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.execution.SparkPlan

 /** This [[CostEvaluator]] is to force use the new physical plan when cost is equal. */
 case class GlutenCostEvaluator() extends CostEvaluator {
   override def evaluateCost(plan: SparkPlan): Cost = {
-    if (GlutenConfig.getConf.enableGluten) {
+    if (GlutenConfig.get.enableGluten) {
       new GlutenCost(SimpleCostEvaluator, plan)
     } else {
       SimpleCostEvaluator.evaluateCost(plan)
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
index 8fcfa735f463..89b5693baf0c 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.internal.SQLConf
 case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
   override def evaluateCost(plan: SparkPlan): Cost = {
     val forceOptimizeSkewedJoin = conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
-    if (GlutenConfig.getConf.enableGluten) {
+    if (GlutenConfig.get.enableGluten) {
       new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
     } else {
       SimpleCostEvaluator(forceOptimizeSkewedJoin).evaluateCost(plan)
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
index 8fcfa735f463..89b5693baf0c 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.internal.SQLConf
 case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
   override def evaluateCost(plan: SparkPlan): Cost = {
     val forceOptimizeSkewedJoin = conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
-    if (GlutenConfig.getConf.enableGluten) {
+    if (GlutenConfig.get.enableGluten) {
       new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
     } else {
       SimpleCostEvaluator(forceOptimizeSkewedJoin).evaluateCost(plan)
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
index 8fcfa735f463..89b5693baf0c 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.internal.SQLConf
 case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
   override def evaluateCost(plan: SparkPlan): Cost = {
     val forceOptimizeSkewedJoin = conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
-    if (GlutenConfig.getConf.enableGluten) {
+    if (GlutenConfig.get.enableGluten) {
       new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
     } else {
       SimpleCostEvaluator(forceOptimizeSkewedJoin).evaluateCost(plan)