From a39326964cb7649007b078b15977d44ebbfa6151 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 6 Sep 2024 10:01:26 +0800 Subject: [PATCH] [VL] New option to follow vanilla Spark's build side in shuffled hash join (#7133) * [VL] New option to follow vanilla Spark's build side in shuffled hash join * fixup --- docs/Configuration.md | 121 +++++++++--------- .../columnar/OffloadSingleNode.scala | 40 +++--- .../org/apache/gluten/GlutenConfig.scala | 10 ++ 3 files changed, 93 insertions(+), 78 deletions(-) diff --git a/docs/Configuration.md b/docs/Configuration.md index 4e47564a67119..4983f92e26b8a 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -11,66 +11,67 @@ You can add these configurations into spark-defaults.conf to enable or disable t ## Spark parameters -| Parameters | Description | Recommend Setting | -|-------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------| -| spark.driver.extraClassPath | To add Gluten Plugin jar file in Spark Driver | /path/to/jar_file | -| spark.executor.extraClassPath | To add Gluten Plugin jar file in Spark Executor | /path/to/jar_file | -| spark.executor.memory | To set up how much memory to be used for Spark Executor. | | -| spark.memory.offHeap.size | To set up how much memory to be used for Java OffHeap.
Please notice Gluten Plugin will leverage this setting to allocate memory space for native usage even offHeap is disabled.
The value is based on your system and it is recommended to set it larger if you are facing Out of Memory issue in Gluten Plugin | 30G | -| spark.sql.sources.useV1SourceList | Choose to use V1 source | avro | -| spark.sql.join.preferSortMergeJoin | To turn off preferSortMergeJoin in Spark | false | -| spark.plugins | To load Gluten's components by Spark's plug-in loader | org.apache.gluten.GlutenPlugin | -| spark.shuffle.manager | To turn on Gluten Columnar Shuffle Plugin | org.apache.spark.shuffle.sort.ColumnarShuffleManager | -| spark.gluten.enabled | Enable Gluten, default is true. Just an experimental property. Recommend to enable/disable Gluten through the setting for `spark.plugins`. | true | -| spark.gluten.memory.isolation | (Experimental) Enable isolated memory mode. If true, Gluten controls the maximum off-heap memory can be used by each task to X, X = executor memory / max task slots. It's recommended to set true if Gluten serves concurrent queries within a single session, since not all memory Gluten allocated is guaranteed to be spillable. In the case, the feature should be enabled to avoid OOM. Note when true, setting spark.memory.storageFraction to a lower value is suggested since storage memory is considered non-usable by Gluten. | false | -| spark.gluten.ras.enabled | Experimental: Enables RAS (relation algebra selector) during physical planning to generate more efficient query plan. Note, this feature is still in development and may not bring performance profits. | false | -| spark.gluten.sql.columnar.maxBatchSize | Number of rows to be processed in each batch. Default value is 4096. | 4096 | -| spark.gluten.sql.columnar.scanOnly | When enabled, this config will overwrite all other operators' enabling, and only Scan and Filter pushdown will be offloaded to native. | false | -| spark.gluten.sql.columnar.batchscan | Enable or Disable Columnar BatchScan, default is true | true | -| spark.gluten.sql.columnar.hashagg | Enable or Disable Columnar Hash Aggregate, default is true | true | -| spark.gluten.sql.columnar.project | Enable or Disable Columnar Project, default is true | true | -| spark.gluten.sql.columnar.filter | Enable or Disable Columnar Filter, default is true | true | -| spark.gluten.sql.columnar.sort | Enable or Disable Columnar Sort, default is true | true | -| spark.gluten.sql.columnar.window | Enable or Disable Columnar Window, default is true | true | -| spark.gluten.sql.columnar.shuffledHashJoin | Enable or Disable ShuffledHashJoin, default is true | true | -| spark.gluten.sql.columnar.forceShuffledHashJoin | Force to use ShuffledHashJoin over SortMergeJoin, default is true. For queries that can benefit from storaged patitioned join, please set it to false. 
| true | -| spark.gluten.sql.columnar.sortMergeJoin | Enable or Disable Columnar Sort Merge Join, default is true | true | -| spark.gluten.sql.columnar.union | Enable or Disable Columnar Union, default is true | true | -| spark.gluten.sql.columnar.expand | Enable or Disable Columnar Expand, default is true | true | -| spark.gluten.sql.columnar.generate | Enable or Disable Columnar Generate, default is true | true | -| spark.gluten.sql.columnar.limit | Enable or Disable Columnar Limit, default is true | true | -| spark.gluten.sql.columnar.tableCache | Enable or Disable Columnar Table Cache, default is false | true | -| spark.gluten.sql.columnar.broadcastExchange | Enable or Disable Columnar Broadcast Exchange, default is true | true | -| spark.gluten.sql.columnar.broadcastJoin | Enable or Disable Columnar BroadcastHashJoin, default is true | true | -| spark.gluten.sql.columnar.shuffle.sort.partitions.threshold | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of partitions is greater than this threshold. | 100000 | -| spark.gluten.sql.columnar.shuffle.sort.columns.threshold | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of columns is greater than this threshold. | 100000 | -| spark.gluten.sql.columnar.shuffle.codec | Set up the codec to be used for Columnar Shuffle. If this configuration is not set, will check the value of spark.io.compression.codec. By default, Gluten use software compression. Valid options for software compression are lz4, zstd. Valid options for QAT and IAA is gzip. | lz4 | -| spark.gluten.sql.columnar.shuffle.codecBackend | Enable using hardware accelerators for shuffle de/compression. Valid options are QAT and IAA. | | -| spark.gluten.sql.columnar.shuffle.compressionMode | Setting different compression mode in shuffle, Valid options are buffer and rowvector, buffer option compress each buffer of RowVector individually into one pre-allocated large buffer, rowvector option first copies each buffer of RowVector to a large buffer and then compress the entire buffer in one go. | buffer | -| spark.gluten.sql.columnar.shuffle.compression.threshold | If number of rows in a batch falls below this threshold, will copy all buffers into one buffer to compress. | 100 | -| spark.gluten.sql.columnar.shuffle.realloc.threshold | Set the threshold to dynamically adjust the size of shuffle split buffers. The size of each split buffer is recalculated for each incoming batch of data. If the new size deviates from the current partition buffer size by a factor outside the range of [1 - threshold, 1 + threshold], the split buffer will be re-allocated using the newly calculated size | 0.25 | -| spark.gluten.sql.columnar.shuffle.merge.threshold | Set the threshold control the minimum merged size. When a partition buffer is full, and the number of rows is below (`threshold * spark.gluten.sql.columnar.maxBatchSize`), it will be saved for merging. | 0.25 | -| spark.gluten.sql.columnar.numaBinding | Set up NUMABinding, default is false | true | -| spark.gluten.sql.columnar.coreRange | Set up the core range for NUMABinding, only works when numaBinding set to true.
The setting is based on the number of cores in your system. Use 72 cores as an example. | 0-17,36-53 |18-35,54-71 | -| spark.gluten.sql.columnar.wholeStage.fallback.threshold | Configure the threshold for whether whole stage will fall back in AQE supported case by counting the number of ColumnarToRow & vanilla leaf node | \>= 1 | -| spark.gluten.sql.columnar.query.fallback.threshold | Configure the threshold for whether query will fall back by counting the number of ColumnarToRow & vanilla leaf node | \>= 1 | -| spark.gluten.sql.columnar.fallback.ignoreRowToColumnar | When true, the fallback policy ignores the RowToColumnar when counting fallback number. | true | -| spark.gluten.sql.columnar.fallback.preferColumnar | When true, the fallback policy prefers to use Gluten plan rather than vanilla Spark plan if the both of them contains ColumnarToRow and the vanilla Spark plan ColumnarToRow number is not smaller than Gluten plan. | true | -| spark.gluten.sql.columnar.force.hashagg | Force to use hash agg to replace sort agg. | true | -| spark.gluten.sql.columnar.vanillaReaders | Enable vanilla spark's vectorized reader. Please note it may bring perf. overhead due to extra data transition. We recommend to disable it if most queries can be fully offloaded to gluten. | false | -| spark.gluten.sql.native.bloomFilter | Enable or Disable native runtime bloom filter. | true | -| spark.gluten.sql.native.arrow.reader.enabled | Enable or Disable native arrow read CSV file format | false | -| spark.gluten.shuffleWriter.bufferSize | Set the number of buffer rows for the shuffle writer | value of spark.gluten.sql.columnar.maxBatchSize | -| spark.gluten.loadLibFromJar | Controls whether to load dynamic link library from a packed jar for gluten/cpp. Not applicable to static build and clickhouse backend. | false | -| spark.gluten.loadLibOS | When `spark.gluten.loadLibFromJar` is true. Manually specify the system os to load library, e.g., CentOS | | -| spark.gluten.loadLibOSVersion | Manually specify the system os version to load library, e.g., if `spark.gluten.loadLibOS` is CentOS, this config can be 7 | | -| spark.gluten.expression.blacklist | A black list of expression to skip transform, multiple values separated by commas. | | -| spark.gluten.sql.columnar.fallback.expressions.threshold | Fall back filter/project if the height of expression tree reaches this threshold, considering Spark codegen can bring better performance for such case. | 50 | -| spark.gluten.sql.cartesianProductTransformerEnabled | Config to enable CartesianProductExecTransformer. | true | -| spark.gluten.sql.broadcastNestedLoopJoinTransformerEnabled | Config to enable BroadcastNestedLoopJoinExecTransformer. | true | -| spark.gluten.sql.cacheWholeStageTransformerContext | When true, `WholeStageTransformer` will cache the `WholeStageTransformerContext` when executing. It is used to get substrait plan node and native plan string. | false | -| spark.gluten.sql.injectNativePlanStringToExplain | When true, Gluten will inject native plan tree to explain string inside `WholeStageTransformerContext`. | false | -| spark.gluten.sql.fallbackRegexpExpressions | When true, Gluten will fall back all regexp expressions to avoid any incompatibility risk. 
| false |
+| Parameters | Description | Recommended Setting |
+|---|---|---|
+| spark.driver.extraClassPath | Add the Gluten Plugin jar file to the Spark Driver classpath | /path/to/jar_file |
+| spark.executor.extraClassPath | Add the Gluten Plugin jar file to the Spark Executor classpath | /path/to/jar_file |
+| spark.executor.memory | Amount of memory to use for the Spark Executor. | |
+| spark.memory.offHeap.size | Amount of memory to use for Java off-heap allocation.<br/>Please note that Gluten Plugin leverages this setting to allocate memory for native usage even when off-heap is disabled.<br/>The value depends on your system; it is recommended to set it larger if you are facing Out of Memory issues in Gluten Plugin | 30G |
+| spark.sql.sources.useV1SourceList | Choose the data sources that use the V1 source implementation | avro |
+| spark.sql.join.preferSortMergeJoin | Turn off Spark's preference for SortMergeJoin | false |
+| spark.plugins | Load Gluten's components via Spark's plug-in loader | org.apache.gluten.GlutenPlugin |
+| spark.shuffle.manager | Turn on the Gluten Columnar Shuffle Plugin | org.apache.spark.shuffle.sort.ColumnarShuffleManager |
+| spark.gluten.enabled | Enable Gluten, default is true. This is an experimental property; it is recommended to enable/disable Gluten through the setting for `spark.plugins` instead. | true |
+| spark.gluten.memory.isolation | (Experimental) Enable isolated memory mode. If true, Gluten caps the off-heap memory that each task can use to X, where X = executor memory / max task slots. Setting this to true is recommended if Gluten serves concurrent queries within a single session, since not all memory Gluten allocates is guaranteed to be spillable; in that case the feature should be enabled to avoid OOM. Note that when true, setting spark.memory.storageFraction to a lower value is suggested since storage memory is considered non-usable by Gluten. | false |
+| spark.gluten.ras.enabled | Experimental: Enables RAS (relation algebra selector) during physical planning to generate more efficient query plans. Note that this feature is still in development and may not bring performance benefits. | false |
+| spark.gluten.sql.columnar.maxBatchSize | Number of rows to be processed in each batch. Default value is 4096. | 4096 |
+| spark.gluten.sql.columnar.scanOnly | When enabled, this config overrides all other operator settings, and only Scan and Filter pushdown are offloaded to native. | false |
+| spark.gluten.sql.columnar.batchscan | Enable or Disable Columnar BatchScan, default is true | true |
+| spark.gluten.sql.columnar.hashagg | Enable or Disable Columnar Hash Aggregate, default is true | true |
+| spark.gluten.sql.columnar.project | Enable or Disable Columnar Project, default is true | true |
+| spark.gluten.sql.columnar.filter | Enable or Disable Columnar Filter, default is true | true |
+| spark.gluten.sql.columnar.sort | Enable or Disable Columnar Sort, default is true | true |
+| spark.gluten.sql.columnar.window | Enable or Disable Columnar Window, default is true | true |
+| spark.gluten.sql.columnar.shuffledHashJoin | Enable or Disable ShuffledHashJoin, default is true | true |
+| spark.gluten.sql.columnar.shuffledHashJoin.optimizeBuildSide | Whether to allow Gluten to choose an optimal build side for shuffled hash join based on plan statistics. When false, Gluten follows vanilla Spark's build side unless that side cannot be built natively. | true |
+| spark.gluten.sql.columnar.forceShuffledHashJoin | Force the use of ShuffledHashJoin over SortMergeJoin, default is true. For queries that can benefit from storage partitioned join, please set it to false. | true |
+| spark.gluten.sql.columnar.sortMergeJoin | Enable or Disable Columnar Sort Merge Join, default is true | true |
+| spark.gluten.sql.columnar.union | Enable or Disable Columnar Union, default is true | true |
+| spark.gluten.sql.columnar.expand | Enable or Disable Columnar Expand, default is true | true |
+| spark.gluten.sql.columnar.generate | Enable or Disable Columnar Generate, default is true | true |
+| spark.gluten.sql.columnar.limit | Enable or Disable Columnar Limit, default is true | true |
+| spark.gluten.sql.columnar.tableCache | Enable or Disable Columnar Table Cache, default is false | true |
+| spark.gluten.sql.columnar.broadcastExchange | Enable or Disable Columnar Broadcast Exchange, default is true | true |
+| spark.gluten.sql.columnar.broadcastJoin | Enable or Disable Columnar BroadcastHashJoin, default is true | true |
+| spark.gluten.sql.columnar.shuffle.sort.partitions.threshold | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of partitions is greater than this threshold. | 100000 |
+| spark.gluten.sql.columnar.shuffle.sort.columns.threshold | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of columns is greater than this threshold. | 100000 |
+| spark.gluten.sql.columnar.shuffle.codec | Set up the codec to be used for Columnar Shuffle. If this configuration is not set, the value of spark.io.compression.codec is checked instead. By default, Gluten uses software compression. Valid options for software compression are lz4 and zstd. The valid option for QAT and IAA is gzip. | lz4 |
+| spark.gluten.sql.columnar.shuffle.codecBackend | Enable using hardware accelerators for shuffle de/compression. Valid options are QAT and IAA. | |
+| spark.gluten.sql.columnar.shuffle.compressionMode | Compression mode used in shuffle. Valid options are buffer and rowvector: the buffer option compresses each buffer of a RowVector individually into one pre-allocated large buffer; the rowvector option first copies each buffer of a RowVector to a large buffer and then compresses the entire buffer in one go. | buffer |
+| spark.gluten.sql.columnar.shuffle.compression.threshold | If the number of rows in a batch falls below this threshold, all buffers are copied into one buffer before compression. | 100 |
+| spark.gluten.sql.columnar.shuffle.realloc.threshold | Set the threshold to dynamically adjust the size of shuffle split buffers. The size of each split buffer is recalculated for each incoming batch of data. If the new size deviates from the current partition buffer size by a factor outside the range of [1 - threshold, 1 + threshold], the split buffer will be re-allocated using the newly calculated size. | 0.25 |
+| spark.gluten.sql.columnar.shuffle.merge.threshold | Set the threshold that controls the minimum merged size. When a partition buffer is full, and the number of rows is below (`threshold * spark.gluten.sql.columnar.maxBatchSize`), it will be saved for merging. | 0.25 |
+| spark.gluten.sql.columnar.numaBinding | Set up NUMABinding, default is false | true |
+| spark.gluten.sql.columnar.coreRange | Set up the core range for NUMABinding; only works when numaBinding is set to true.<br/>The setting is based on the number of cores in your system; the example on the right assumes 72 cores. | 0-17,36-53 \|18-35,54-71 |
+| spark.gluten.sql.columnar.wholeStage.fallback.threshold | Configure the threshold for whether a whole stage will fall back in AQE-supported cases, by counting the number of ColumnarToRow & vanilla leaf nodes | \>= 1 |
+| spark.gluten.sql.columnar.query.fallback.threshold | Configure the threshold for whether a query will fall back, by counting the number of ColumnarToRow & vanilla leaf nodes | \>= 1 |
+| spark.gluten.sql.columnar.fallback.ignoreRowToColumnar | When true, the fallback policy ignores RowToColumnar nodes when counting the fallback number. | true |
+| spark.gluten.sql.columnar.fallback.preferColumnar | When true, the fallback policy prefers the Gluten plan over the vanilla Spark plan if both of them contain ColumnarToRow and the vanilla Spark plan's ColumnarToRow count is not smaller than the Gluten plan's. | true |
+| spark.gluten.sql.columnar.force.hashagg | Force the use of hash aggregate to replace sort aggregate. | true |
+| spark.gluten.sql.columnar.vanillaReaders | Enable vanilla Spark's vectorized reader. Please note it may bring performance overhead due to extra data transitions. We recommend disabling it if most queries can be fully offloaded to Gluten. | false |
+| spark.gluten.sql.native.bloomFilter | Enable or Disable native runtime bloom filter. | true |
+| spark.gluten.sql.native.arrow.reader.enabled | Enable or Disable the native Arrow reader for the CSV file format | false |
+| spark.gluten.shuffleWriter.bufferSize | Set the number of buffer rows for the shuffle writer | value of spark.gluten.sql.columnar.maxBatchSize |
+| spark.gluten.loadLibFromJar | Controls whether to load the dynamic link library from a packed jar for gluten/cpp. Not applicable to static builds and the ClickHouse backend. | false |
+| spark.gluten.loadLibOS | When `spark.gluten.loadLibFromJar` is true, manually specify the system OS for library loading, e.g., CentOS | |
+| spark.gluten.loadLibOSVersion | Manually specify the system OS version for library loading, e.g., if `spark.gluten.loadLibOS` is CentOS, this config can be 7 | |
+| spark.gluten.expression.blacklist | A blacklist of expressions to skip transforming, multiple values separated by commas. | |
+| spark.gluten.sql.columnar.fallback.expressions.threshold | Fall back filter/project if the height of the expression tree reaches this threshold, since Spark codegen can bring better performance for such cases. | 50 |
+| spark.gluten.sql.cartesianProductTransformerEnabled | Config to enable CartesianProductExecTransformer. | true |
+| spark.gluten.sql.broadcastNestedLoopJoinTransformerEnabled | Config to enable BroadcastNestedLoopJoinExecTransformer. | true |
+| spark.gluten.sql.cacheWholeStageTransformerContext | When true, `WholeStageTransformer` will cache the `WholeStageTransformerContext` when executing. It is used to get the Substrait plan node and native plan string. | false |
+| spark.gluten.sql.injectNativePlanStringToExplain | When true, Gluten will inject the native plan tree into the explain string inside `WholeStageTransformerContext`. | false |
+| spark.gluten.sql.fallbackRegexpExpressions | When true, Gluten will fall back all regexp expressions to avoid any incompatibility risk. | false |
 
 ## Velox Parameters
 
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 70b85165c37b4..3cea5e76a8373 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -197,25 +197,29 @@ object OffloadJoin {
     val rightBuildable =
       BackendsApiManager.getSettings.supportHashBuildJoinTypeOnRight(shj.joinType)
     if (!leftBuildable) {
-      BuildRight
-    } else if (!rightBuildable) {
-      BuildLeft
-    } else {
-      shj.logicalLink match {
-        case Some(join: Join) =>
-          val leftSize = join.left.stats.sizeInBytes
-          val rightSize = join.right.stats.sizeInBytes
-          val leftRowCount = join.left.stats.rowCount
-          val rightRowCount = join.right.stats.rowCount
-          if (rightSize == leftSize && rightRowCount.isDefined && leftRowCount.isDefined) {
-            if (rightRowCount.get <= leftRowCount.get) BuildRight
-            else BuildLeft
-          } else if (rightSize <= leftSize) BuildRight
+      return BuildRight
+    }
+    if (!rightBuildable) {
+      return BuildLeft
+    }
+    // Both left and right are buildable. Find out the better one.
+    if (!GlutenConfig.getConf.shuffledHashJoinOptimizeBuildSide) {
+      return shj.buildSide
+    }
+    shj.logicalLink match {
+      case Some(join: Join) =>
+        val leftSize = join.left.stats.sizeInBytes
+        val rightSize = join.right.stats.sizeInBytes
+        val leftRowCount = join.left.stats.rowCount
+        val rightRowCount = join.right.stats.rowCount
+        if (rightSize == leftSize && rightRowCount.isDefined && leftRowCount.isDefined) {
+          if (rightRowCount.get <= leftRowCount.get) BuildRight
          else BuildLeft
-          // Only the ShuffledHashJoinExec generated directly in some spark tests is not link
-          // logical plan, such as OuterJoinSuite.
-        case _ => shj.buildSide
-      }
+        } else if (rightSize <= leftSize) BuildRight
+        else BuildLeft
+        // Only ShuffledHashJoinExec nodes generated directly in some Spark tests
+        // (e.g. OuterJoinSuite) are not linked to a logical plan.
+      case _ => shj.buildSide
     }
   }
 }
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index df4b64c74ea01..a49582bb55ccb 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -70,6 +70,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def enableColumnarShuffledHashJoin: Boolean = conf.getConf(COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED)
 
+  def shuffledHashJoinOptimizeBuildSide: Boolean =
+    conf.getConf(COLUMNAR_SHUFFLED_HASH_JOIN_OPTIMIZE_BUILD_SIDE)
+
   def enableNativeColumnarToRow: Boolean = conf.getConf(COLUMNAR_COLUMNAR_TO_ROW_ENABLED)
 
   def forceShuffledHashJoin: Boolean = conf.getConf(COLUMNAR_FPRCE_SHUFFLED_HASH_JOIN_ENABLED)
@@ -928,6 +931,13 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(true)
 
+  val COLUMNAR_SHUFFLED_HASH_JOIN_OPTIMIZE_BUILD_SIDE =
+    buildConf("spark.gluten.sql.columnar.shuffledHashJoin.optimizeBuildSide")
+      .internal()
+      .doc("Whether to allow Gluten to choose an optimal build side for shuffled hash join.")
+      .booleanConf
+      .createWithDefault(true)
+
   val COLUMNAR_COLUMNAR_TO_ROW_ENABLED =
     buildConf("spark.gluten.sql.columnar.columnarToRow")
      .internal()
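
Usage sketch (illustrative, not part of the patch): once this change is applied, the new option can be toggled per session. The Gluten config keys come from the patch itself; the session settings, memory sizes, and demo tables below are assumptions for the example, not requirements introduced by this change.

```scala
import org.apache.spark.sql.SparkSession

object ShjBuildSideDemo {
  def main(args: Array[String]): Unit = {
    // Assumes the Gluten jar is on the driver/executor classpath and a native
    // backend (e.g. Velox) is available; memory sizes are placeholders.
    val spark = SparkSession.builder()
      .appName("shj-build-side-demo")
      .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
      .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
      .config("spark.memory.offHeap.enabled", "true")
      .config("spark.memory.offHeap.size", "2g")
      .getOrCreate()

    // Two demo tables of different sizes so the build-side choice matters.
    spark.range(1000000L).toDF("id").createOrReplaceTempView("t1")
    spark.range(1000L).toDF("id").createOrReplaceTempView("t2")

    // Prefer shuffled hash join so the new option actually takes effect.
    spark.conf.set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true")

    // Default behavior: Gluten may pick the build side from plan statistics.
    spark.conf.set("spark.gluten.sql.columnar.shuffledHashJoin.optimizeBuildSide", "true")
    spark.sql("SELECT * FROM t1 JOIN t2 ON t1.id = t2.id").explain()

    // Disabled: follow vanilla Spark's build side choice
    // (unless that side cannot be built natively).
    spark.conf.set("spark.gluten.sql.columnar.shuffledHashJoin.optimizeBuildSide", "false")
    spark.sql("SELECT * FROM t1 JOIN t2 ON t1.id = t2.id").explain()

    spark.stop()
  }
}
```

Comparing the two `explain()` outputs should show whether the shuffled hash join's build side changes between the two settings.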