From c5de44e00ac7e2f4965fd2408664dda1711371f3 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Tue, 20 Aug 2024 18:10:59 +0800 Subject: [PATCH] add checks --- .../gluten/backendsapi/clickhouse/CHBackend.scala | 7 +++++++ .../clickhouse/CHSparkPlanExecApi.scala | 15 +++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 9884a0c6ef39f..ffd9068b166a8 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -407,4 +407,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging { } } + def getBroadcastThreshold: Long = { + val conf = SQLConf.get + conf + .getConf(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD) + .getOrElse(conf.autoBroadcastJoinThreshold) + } + } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 8fdc2645a5fb1..e3ef0547f4ad0 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -18,6 +18,7 @@ package org.apache.gluten.backendsapi.clickhouse import org.apache.gluten.GlutenConfig import org.apache.gluten.backendsapi.{BackendsApiManager, SparkPlanExecApi} +import org.apache.gluten.exception.GlutenException import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.execution._ import org.apache.gluten.expression._ @@ -31,7 +32,7 @@ import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy} import org.apache.gluten.vectorized.CHColumnarBatchSerializer -import org.apache.spark.{ShuffleDependency, SparkException} +import org.apache.spark.ShuffleDependency import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper, HashPartitioningWrapper} @@ -539,9 +540,19 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { CHExecUtil.buildSideRDD(dataSize, newChild).collect val batches = countsAndBytes.map(_._2) + val totalBatchesBytes = batches.map(_.length).sum + if ( + totalBatchesBytes < 0 || + totalBatchesBytes.toLong > CHBackendSettings.getBroadcastThreshold * 2 + ) { + throw new GlutenException( + s"Cannot broadcast the table ($totalBatchesBytes) that is larger than threshold:" + + s" ${CHBackendSettings.getBroadcastThreshold}. Ensure the shuffle written" + + s"bytes is collected properly.") + } val rawSize = dataSize.value if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) { - throw new SparkException( + throw new GlutenException( s"Cannot broadcast the table that is larger than 8GB: ${rawSize >> 30} GB") } val rowCount = countsAndBytes.map(_._1).sum