diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 9884a0c6ef39f..ffd9068b166a8 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -407,4 +407,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging { } } + def getBroadcastThreshold: Long = { + val conf = SQLConf.get + conf + .getConf(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD) + .getOrElse(conf.autoBroadcastJoinThreshold) + } + } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 8fdc2645a5fb1..6bd200ed51ee6 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -539,6 +539,15 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { CHExecUtil.buildSideRDD(dataSize, newChild).collect val batches = countsAndBytes.map(_._2) + val totalBatchesBytes = batches.map(_.length).sum + if ( + totalBatchesBytes < 0 || totalBatchesBytes.toLong > CHBackendSettings.getBroadcastThreshold + ) { + throw new SparkException( + s"Cannot broadcast the table (${totalBatchesBytes}) that is larger than threshold:" + + s" ${CHBackendSettings.getBroadcastThreshold}. Ensure the shuffle written" + + s"bytes is collected properly.") + } val rawSize = dataSize.value if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) { throw new SparkException(