[SPARK-48378][SQL] Limit the maximum number of dynamic partitions
guihuawen committed May 21, 2024
1 parent 2bf4346 commit 438b9a8
Showing 5 changed files with 41 additions and 4 deletions.
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources

import scala.collection.mutable

import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.TaskAttemptContext
@@ -364,6 +366,8 @@ class DynamicPartitionDataSingleWriter(

private var currentPartitionValues: Option[UnsafeRow] = None
private var currentBucketId: Option[Int] = None
private val maxDynamicPartitions: Int = description.maxDynamicPartitions
// Distinct partition values written by this task so far, tracked only to
// enforce `maxDynamicPartitions` (a value <= 0 disables the check).
private val seenPartitionValues = mutable.HashSet[Option[UnsafeRow]]()

override def write(record: InternalRow): Unit = {
val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None
@@ -374,6 +378,14 @@
if (isPartitioned && currentPartitionValues != nextPartitionValues) {
  currentPartitionValues = Some(nextPartitionValues.get.copy())
  statsTrackers.foreach(_.newPartition(currentPartitionValues.get))
  // Enforce the per-task cap on distinct dynamic partitions. The copied row in
  // `currentPartitionValues` is stored rather than the transient
  // `nextPartitionValues`, whose backing buffer may be reused by later records.
  if (maxDynamicPartitions > 0 && seenPartitionValues.add(currentPartitionValues)) {
    if (seenPartitionValues.size > maxDynamicPartitions) {
      throw QueryExecutionErrors.
        writePartitionExceedConfigSizeWhenDynamicPartitionPerTaskError(
          maxDynamicPartitions, "max partition num")
    }
  }
}
if (isBucketed) {
  currentBucketId = nextBucketId
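
The guard above only needs to know how many distinct partition values one task has emitted, so a set plus a size check is enough. Below is a minimal, self-contained sketch of the same bookkeeping outside Spark; the names PartitionLimitGuard and TooManyDynamicPartitionsException are illustrative, and partition values are plain strings instead of UnsafeRow.

import scala.collection.mutable

// Illustrative stand-in for the per-task bookkeeping in
// DynamicPartitionDataSingleWriter.write; not part of the patch itself.
final class TooManyDynamicPartitionsException(limit: Int)
  extends RuntimeException(s"more than $limit dynamic partitions in one task")

final class PartitionLimitGuard(maxDynamicPartitions: Int) {
  private val seen = mutable.HashSet[String]()

  // A non-positive limit disables the check, matching the patch.
  def record(partitionValue: String): Unit = {
    if (maxDynamicPartitions > 0 && seen.add(partitionValue) &&
        seen.size > maxDynamicPartitions) {
      throw new TooManyDynamicPartitionsException(maxDynamicPartitions)
    }
  }
}

object PartitionLimitGuardDemo {
  def main(args: Array[String]): Unit = {
    val guard = new PartitionLimitGuard(maxDynamicPartitions = 2)
    guard.record("p=a")
    guard.record("p=a") // a repeated value does not count twice
    guard.record("p=b")
    guard.record("p=c") // third distinct value: throws
  }
}

Because only membership and size matter, a HashSet keyed by the copied partition row is sufficient in the writer as well.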
@@ -604,7 +616,8 @@ class WriteJobDescription(
val customPartitionLocations: Map[TablePartitionSpec, String],
val maxRecordsPerFile: Long,
val timeZoneId: String,
val statsTrackers: Seq[WriteJobStatsTracker])
val statsTrackers: Seq[WriteJobStatsTracker],
val maxDynamicPartitions: Int = 0)
extends Serializable {

assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ dataColumns),
@@ -134,7 +134,9 @@ object FileFormatWriter extends Logging {
.getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile),
timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
.getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone),
statsTrackers = statsTrackers
statsTrackers = statsTrackers,
maxDynamicPartitions = caseInsensitiveOptions.get("maxDynamicPartitions")
.map(_.toInt).getOrElse(0)
)
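
The limit travels to the writer as a plain string entry in the writer options, so a missing key has to fall back to the disabled default of 0 rather than failing every non-Hive write. A small sketch of that round trip; the object and method names are illustrative, while the "maxDynamicPartitions" key is the one used by the patch.

object MaxDynamicPartitionsOption {
  // A missing key falls back to 0, which WriteJobDescription treats as "no limit".
  def read(options: Map[String, String]): Int =
    options.get("maxDynamicPartitions").map(_.toInt).getOrElse(0)

  def main(args: Array[String]): Unit = {
    // Only the Hive insert path adds the key; every other write leaves it out.
    val hiveOptions = Map("maxDynamicPartitions" -> "1000")
    assert(read(hiveOptions) == 1000)
    assert(read(Map.empty[String, String]) == 0)
  }
}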

// We should first sort by dynamic partition columns, then bucket id, and finally sorting
@@ -65,6 +65,20 @@ class FileFormatWriterSuite
}
}

test("test 1") {
withTable("t1", "t2") {
Seq((0, "p1"), (1, "p2"), (2, "p3")).toDF("id", "p")
.write.partitionBy("p").saveAsTable("t1")
checkAnswer(spark.table("t1").sort("id"), Seq(Row(0, "p1"), Row(1, "p2"), Row(2, "p3")))
withTempDir { tempDir =>
sql(s"create table t2(id long, p string) using parquet " +
s"partitioned by (p) location '${tempDir.toURI}'")
sql("set hive.exec.max.dynamic.partitions=2")
sql("insert overwrite table t2 partition(p) select id, p from t1")
}
}
}
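
A variant of the test above could assert the expected failure instead of letting it propagate. This is a sketch only: it assumes the insert actually routes through the new per-task limit check and that the raised error message mentions partitions.

test("SPARK-48378: exceeding the dynamic partition limit fails the insert") {
  withTable("t1", "t2") {
    Seq((0, "p1"), (1, "p2"), (2, "p3")).toDF("id", "p")
      .write.partitionBy("p").saveAsTable("t1")
    withTempDir { tempDir =>
      sql(s"create table t2(id long, p string) using parquet " +
        s"partitioned by (p) location '${tempDir.toURI}'")
      sql("set hive.exec.max.dynamic.partitions=2")
      // Assumption: the limit is enforced on this path and surfaces as an exception.
      val e = intercept[Exception] {
        sql("insert overwrite table t2 partition(p) select id, p from t1")
      }
      assert(e.getMessage.toLowerCase.contains("partition"))
    }
  }
}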

test("SPARK-33904: save and insert into a table in a namespace of spark_catalog") {
val ns = "spark_catalog.ns"
withNamespace(ns) {
@@ -139,7 +139,14 @@ case class InsertIntoHiveTable(
outputLocation = tmpLocation.toString,
partitionAttributes = partitionColumns,
bucketSpec = bucketSpec,
options = options)
// Propagate Hive's dynamic-partition cap to the file writer for overwrite
// inserts into external tables; other inserts keep the options untouched.
options = if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
  val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
  val maxDynamicPartitions = hadoopConf.getInt(maxDynamicPartitionsKey,
    HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
  options ++ Map("maxDynamicPartitions" -> String.valueOf(maxDynamicPartitions))
} else {
  options
})
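
The branch above boils down to a keyed lookup of Hive's hive.exec.max.dynamic.partitions from the job's Hadoop configuration, falling back to Hive's built-in default when the key is unset. A standalone sketch of that lookup, assuming a stock HiveConf on the classpath; the object name DynamicPartitionCapLookup is illustrative.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf

object DynamicPartitionCapLookup {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Resolves to "hive.exec.max.dynamic.partitions".
    val key = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
    // Use Hive's own default when the job configuration does not set the key.
    val cap = conf.getInt(key, HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
    println(s"$key = $cap")

    // A session override such as SET hive.exec.max.dynamic.partitions=100
    // ends up in the job configuration and takes precedence.
    conf.setInt(key, 100)
    println(s"$key = ${conf.getInt(key, cap)}")
  }
}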

if (partition.nonEmpty) {
if (numDynamicPartitions > 0) {
@@ -58,7 +58,8 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
partitionColumns = partitionAttributes,
bucketSpec = bucketSpec,
statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
options = options)
options = options
)
}
}
