From dd39c4faacb0e0a63a60183503aa836eeb0a2db6 Mon Sep 17 00:00:00 2001 From: Zhichao Zhang Date: Fri, 13 Sep 2024 13:08:10 +0800 Subject: [PATCH] fix split files --- .../org/apache/gluten/utils/CHInputPartitionsUtil.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala index 0f35ff66d4d1e..777b32123c2b6 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala @@ -91,15 +91,17 @@ case class CHInputPartitionsUtil( .sortBy(_.length)(implicitly[Ordering[Long]].reverse) val totalCores = SparkResourceUtil.getTotalCores(relation.sparkSession.sessionState.conf) - val fileCntPerPartition = math.ceil((splitFiles.size * 1.0) / totalCores).toInt + val isAllSmallFiles = splitFiles.forall(_.length < maxSplitBytes) val fileCntThreshold = relation.sparkSession.sessionState.conf .getConfString( CHBackendSettings.GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD, CHBackendSettings.GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD_DEFAULT ) .toInt + val fileCntPerPartition = + math.ceil((splitFiles.size * 1.0) / (totalCores * fileCntThreshold)).toInt - if (fileCntThreshold > 0 && fileCntPerPartition > fileCntThreshold) { + if (fileCntThreshold > 0 && isAllSmallFiles && fileCntPerPartition <= fileCntThreshold) { getFilePartitionsByFileCnt(splitFiles, fileCntPerPartition) } else { FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)