Skip to content

Commit

Permalink
remove check, it's not safe
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Aug 21, 2024
1 parent e7bbd40 commit 4574871
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,4 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}
}
}

/**
 * Resolves the broadcast threshold from the active [[SQLConf]].
 *
 * Reads the `ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD` entry first; when that lookup yields no
 * value, the session's `autoBroadcastJoinThreshold` is used as the fallback.
 *
 * @return
 *   the threshold value taken from the configuration
 */
def getBroadcastThreshold: Long = {
  val sqlConf = SQLConf.get
  val adaptiveThreshold = sqlConf.getConf(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD)
  // The fallback is only evaluated when the adaptive entry provides nothing.
  adaptiveThreshold.getOrElse(sqlConf.autoBroadcastJoinThreshold)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
import org.apache.gluten.vectorized.CHColumnarBatchSerializer

import org.apache.spark.ShuffleDependency
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper, HashPartitioningWrapper}
Expand Down Expand Up @@ -70,7 +71,7 @@ import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

class CHSparkPlanExecApi extends SparkPlanExecApi {
class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {

/** The columnar-batch type this backend is using. */
override def batchType: Convention.BatchType = CHBatch
Expand Down Expand Up @@ -540,22 +541,16 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHExecUtil.buildSideRDD(dataSize, newChild).collect

val batches = countsAndBytes.map(_._2)
val totalBatchesBytes = batches.map(_.length).sum
// totalBatchesBytes could be larger than the shuffle written bytes, so we double the threshold
// here.
if (
totalBatchesBytes < 0 ||
totalBatchesBytes.toLong > CHBackendSettings.getBroadcastThreshold * 2
) {
val totalBatchesSize = batches.map(_.length).sum
val rawSize = dataSize.value
if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) {
throw new GlutenException(
s"Cannot broadcast the table ($totalBatchesBytes) that is larger than threshold:" +
s" ${CHBackendSettings.getBroadcastThreshold}. Ensure the shuffle written" +
s"bytes is collected properly.")
s"Cannot broadcast the table that is larger than 8GB: $rawSize bytes")
}
val rawSize = dataSize.value
if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) {
if ((rawSize == 0 && totalBatchesSize != 0) || totalBatchesSize < 0) {
throw new GlutenException(
s"Cannot broadcast the table that is larger than 8GB: ${rawSize >> 30} GB")
s"Invalid rawSize($rawSize) or totalBatchesSize ($totalBatchesSize). Ensure the shuffle" +
s" written bytes is correct.")
}
val rowCount = countsAndBytes.map(_._1).sum
numOutputRows += rowCount
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_
// AQE depends on total_bytes_written; if the value is wrong, it will generate an inappropriate plan.
// Add a log here as a reminder of this.
if (!result.total_bytes_written)
LOG_WARNING(getLogger("_CHShuffleSplitterJniWrapper"), "total_bytes_written is 0, something may be wrong");
LOG_WARNING(getLogger("CHShuffleSplitterJniWrapper"), "total_bytes_written is 0, something may be wrong");

jobject split_result = env->NewObject(
split_result_class,
Expand Down

0 comments on commit 4574871

Please sign in to comment.