[SPARK-48378][SQL] Limit the maximum number of dynamic partitions
guihuawen committed May 24, 2024
1 parent 438b9a8 commit fb2e02b
Showing 2 changed files with 8 additions and 11 deletions.
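
For context, here is a minimal standalone sketch (not Spark's actual writer API) of the per-task cap this commit introduces: each task records every distinct dynamic-partition value it sees and fails once the count passes a configured limit. The PartitionLimiter class, the String partition keys, and the IllegalStateException are illustrative assumptions; the real change below keys a mutable.HashMap by Option[UnsafeRow] inside DynamicPartitionDataSingleWriter and reports the overflow via an assert.

// Illustrative sketch only -- not the Spark writer itself.
import scala.collection.mutable

class PartitionLimiter(maxDynamicPartitions: Int) {
  // Every distinct partition value this task has created so far.
  private val dynamicPartitions = mutable.HashSet[String]()

  def recordPartition(partitionValue: String): Unit = {
    // HashSet.add returns true only when the value has not been seen before.
    if (maxDynamicPartitions > 0 && dynamicPartitions.add(partitionValue)) {
      if (dynamicPartitions.size > maxDynamicPartitions) {
        throw new IllegalStateException(
          s"${dynamicPartitions.size} dynamic partitions have been created, " +
            s"which is more than $maxDynamicPartitions")
      }
    }
  }
}

object PartitionLimiterDemo extends App {
  val limiter = new PartitionLimiter(maxDynamicPartitions = 2)
  limiter.recordPartition("dt=2024-05-01")
  limiter.recordPartition("dt=2024-05-02")
  limiter.recordPartition("dt=2024-05-03") // fails here: 3 distinct partitions exceed the cap of 2
}
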
@@ -17,8 +17,6 @@
 package org.apache.spark.sql.execution.datasources

 import scala.collection.mutable
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.conf.HiveConf

 import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
 import org.apache.hadoop.mapreduce.TaskAttemptContext
@@ -367,7 +365,7 @@ class DynamicPartitionDataSingleWriter(
   private var currentPartitionValues: Option[UnsafeRow] = None
   private var currentBucketId: Option[Int] = None
   private val maxDynamicPartitions: Int = description.maxDynamicPartitions
-  private val concurrentWriters = mutable.HashMap[Option[UnsafeRow], Option[UnsafeRow]]()
+  private val dynamicPartitions = mutable.HashMap[Option[UnsafeRow], Option[UnsafeRow]]()

   override def write(record: InternalRow): Unit = {
     val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None
@@ -378,12 +376,12 @@
       if (isPartitioned && currentPartitionValues != nextPartitionValues) {
         currentPartitionValues = Some(nextPartitionValues.get.copy())
         statsTrackers.foreach(_.newPartition(currentPartitionValues.get))
-        if (maxDynamicPartitions > 0 && !concurrentWriters.contains(nextPartitionValues)) {
-          concurrentWriters.put(nextPartitionValues, nextPartitionValues)
-          if (concurrentWriters.size > maxDynamicPartitions) {
-            throw QueryExecutionErrors.
-              writePartitionExceedConfigSizeWhenDynamicPartitionPerTaskError(
-                maxDynamicPartitions, "max partition num")
+        if (maxDynamicPartitions > 0 && !dynamicPartitions.contains(nextPartitionValues)) {
+          dynamicPartitions.put(nextPartitionValues, nextPartitionValues)
+          if (dynamicPartitions.size > maxDynamicPartitions) {
+            assert(dynamicPartitions.size < maxDynamicPartitions,
+              dynamicPartitions.size + " dynamic partitions have been created " +
+                " which is more than " + maxDynamicPartitions)
           }
         }
       }
@@ -58,8 +58,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       partitionColumns = partitionAttributes,
       bucketSpec = bucketSpec,
       statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
-      options = options
-    )
+      options = options)
   }
 }
