From fb2e02b9a6032f0130cebfdf9b8016b7c418537e Mon Sep 17 00:00:00 2001
From: guihuawen
Date: Fri, 24 May 2024 10:05:28 +0800
Subject: [PATCH] [SPARK-48378][SQL] Limit the maximum number of dynamic
 partitions

---
 .../datasources/FileFormatDataWriter.scala    | 16 +++++++---------
 .../sql/hive/execution/SaveAsHiveFile.scala   |  3 +--
 2 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
index 22b93959937d5..443759bcb8f3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
@@ -17,8 +17,6 @@
 package org.apache.spark.sql.execution.datasources
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
 import org.apache.hadoop.mapreduce.TaskAttemptContext
@@ -367,7 +365,7 @@ class DynamicPartitionDataSingleWriter(
   private var currentPartitionValues: Option[UnsafeRow] = None
   private var currentBucketId: Option[Int] = None
   private val maxDynamicPartitions: Int = description.maxDynamicPartitions
-  private val concurrentWriters = mutable.HashMap[Option[UnsafeRow], Option[UnsafeRow]]()
+  private val dynamicPartitions = mutable.HashMap[Option[UnsafeRow], Option[UnsafeRow]]()
 
   override def write(record: InternalRow): Unit = {
     val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None
@@ -378,12 +376,12 @@ class DynamicPartitionDataSingleWriter(
     if (isPartitioned && currentPartitionValues != nextPartitionValues) {
       currentPartitionValues = Some(nextPartitionValues.get.copy())
       statsTrackers.foreach(_.newPartition(currentPartitionValues.get))
-      if (maxDynamicPartitions > 0 && !concurrentWriters.contains(nextPartitionValues)) {
-        concurrentWriters.put(nextPartitionValues, nextPartitionValues)
-        if (concurrentWriters.size > maxDynamicPartitions) {
-          throw QueryExecutionErrors.
-            writePartitionExceedConfigSizeWhenDynamicPartitionPerTaskError(
-              maxDynamicPartitions, "max partition num")
+      if (maxDynamicPartitions > 0 && !dynamicPartitions.contains(currentPartitionValues)) {
+        // Key on the copied row: the row returned by getPartitionValues reuses its buffer.
+        dynamicPartitions.put(currentPartitionValues, currentPartitionValues)
+        if (dynamicPartitions.size > maxDynamicPartitions) {
+          throw new IllegalStateException(s"${dynamicPartitions.size} dynamic partitions " +
+            s"have been created, which is more than the allowed maximum of $maxDynamicPartitions")
         }
       }
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index 67a644580cd2e..47d402c2e8b1a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -58,8 +58,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       partitionColumns = partitionAttributes,
       bucketSpec = bucketSpec,
       statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
-      options = options
-    )
+      options = options)
   }
 }
 
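
Reviewer note: as a minimal, standalone sketch of the guard this patch adds (hypothetical
names; PartitionLimiter and its members are illustrative, not Spark API), the per-task
limit amounts to tracking the distinct partition keys one writer task has seen and failing
once a configured cap is exceeded:

  import scala.collection.mutable

  // Illustrative only: tracks distinct partition keys seen by one writer task and
  // fails fast when a new key pushes the count past the configured maximum.
  final class PartitionLimiter(maxDynamicPartitions: Int) {
    private val seenPartitions = mutable.HashSet[String]()

    def observe(partitionKey: String): Unit = {
      // HashSet.add returns true only for a previously unseen key, so the size
      // check runs just once per new partition.
      if (maxDynamicPartitions > 0 && seenPartitions.add(partitionKey) &&
          seenPartitions.size > maxDynamicPartitions) {
        throw new IllegalStateException(
          s"${seenPartitions.size} dynamic partitions have been created, " +
            s"which is more than the allowed maximum of $maxDynamicPartitions")
      }
    }
  }

  object PartitionLimiterDemo extends App {
    val limiter = new PartitionLimiter(maxDynamicPartitions = 2)
    limiter.observe("dt=2024-05-24")  // first distinct partition: ok
    limiter.observe("dt=2024-05-25")  // second distinct partition: ok
    limiter.observe("dt=2024-05-24")  // already seen: no-op
    limiter.observe("dt=2024-05-26")  // third distinct partition: throws
  }

The real writer keys on Option[UnsafeRow] rather than String, and must store a copy of the
projected partition row, since the underlying buffer is reused between records.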