Skip to content

Commit

Permalink
add checks
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Aug 20, 2024
1 parent db7d4be commit 9509d63
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -407,4 +407,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}
}

/**
 * Resolves the broadcast-join threshold from the current [[SQLConf]].
 *
 * The value of `SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD` is returned when that entry
 * holds a value; otherwise the same conf's `autoBroadcastJoinThreshold` is returned.
 *
 * @return the effective broadcast threshold as a `Long`
 */
def getBroadcastThreshold: Long = {
  val sqlConf = SQLConf.get
  val adaptiveThreshold = sqlConf.getConf(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD)
  // `getOrElse` takes its argument by name, so the fallback is read only when needed.
  adaptiveThreshold.getOrElse(sqlConf.autoBroadcastJoinThreshold)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,15 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHExecUtil.buildSideRDD(dataSize, newChild).collect

val batches = countsAndBytes.map(_._2)
val totalBatchesBytes = batches.map(_.length).sum
if (
totalBatchesBytes < 0 || totalBatchesBytes.toLong > CHBackendSettings.getBroadcastThreshold
) {
throw new SparkException(
s"Cannot broadcast the table that is larger than threshold:" +
s" ${CHBackendSettings.getBroadcastThreshold}. Ensure the shuffle written" +
s"bytes is collected properly.")
}
val rawSize = dataSize.value
if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) {
throw new SparkException(
Expand Down

0 comments on commit 9509d63

Please sign in to comment.