Skip to content

Commit

Permalink
fix split files
Browse files Browse the repository at this point in the history
  • Loading branch information
zzcclp committed Sep 13, 2024
1 parent aaff74d commit dd39c4f
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,17 @@ case class CHInputPartitionsUtil(
.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

val totalCores = SparkResourceUtil.getTotalCores(relation.sparkSession.sessionState.conf)
val fileCntPerPartition = math.ceil((splitFiles.size * 1.0) / totalCores).toInt
val isAllSmallFiles = splitFiles.forall(_.length < maxSplitBytes)
val fileCntThreshold = relation.sparkSession.sessionState.conf
.getConfString(
CHBackendSettings.GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD,
CHBackendSettings.GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD_DEFAULT
)
.toInt
val fileCntPerPartition =
math.ceil((splitFiles.size * 1.0) / (totalCores * fileCntThreshold)).toInt

if (fileCntThreshold > 0 && fileCntPerPartition > fileCntThreshold) {
if (fileCntThreshold > 0 && isAllSmallFiles && fileCntPerPartition <= fileCntThreshold) {
getFilePartitionsByFileCnt(splitFiles, fileCntPerPartition)
} else {
FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
Expand Down

0 comments on commit dd39c4f

Please sign in to comment.