Skip to content

Commit

Permalink
add checks
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Aug 20, 2024
1 parent db7d4be commit c5de44e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -407,4 +407,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}
}

def getBroadcastThreshold: Long = {
val conf = SQLConf.get
conf
.getConf(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD)
.getOrElse(conf.autoBroadcastJoinThreshold)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.{BackendsApiManager, SparkPlanExecApi}
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.expression._
Expand All @@ -31,7 +32,7 @@ import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
import org.apache.gluten.vectorized.CHColumnarBatchSerializer

import org.apache.spark.{ShuffleDependency, SparkException}
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper, HashPartitioningWrapper}
Expand Down Expand Up @@ -539,9 +540,19 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHExecUtil.buildSideRDD(dataSize, newChild).collect

val batches = countsAndBytes.map(_._2)
val totalBatchesBytes = batches.map(_.length).sum
if (
totalBatchesBytes < 0 ||
totalBatchesBytes.toLong > CHBackendSettings.getBroadcastThreshold * 2
) {
throw new GlutenException(
s"Cannot broadcast the table ($totalBatchesBytes) that is larger than threshold:" +
s" ${CHBackendSettings.getBroadcastThreshold}. Ensure the shuffle written" +
s"bytes is collected properly.")
}
val rawSize = dataSize.value
if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) {
throw new SparkException(
throw new GlutenException(
s"Cannot broadcast the table that is larger than 8GB: ${rawSize >> 30} GB")
}
val rowCount = countsAndBytes.map(_._1).sum
Expand Down

0 comments on commit c5de44e

Please sign in to comment.