
Commit

Check the number of dynamic partitions written by a task against hive.exec.max.dynamic.partitions
guihuawen committed May 20, 2024
1 parent 4811269 commit 2190ced
Showing 6 changed files with 55 additions and 12 deletions.
QueryExecutionErrors.scala
@@ -2678,6 +2678,18 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
cause = null)
}

def writePartitionExceedConfigSizeWhenDynamicPartitionPerTaskError(
maxDynamicPartitions: Int,
maxDynamicPartitionsKey: String): Throwable = {
new SparkException(
errorClass = "_LEGACY_ERROR_TEMP_BY_TASK_2277",
messageParameters = Map(
"maxDynamicPartitionsKey" -> maxDynamicPartitionsKey,
"maxDynamicPartitions" -> maxDynamicPartitions.toString),
cause = null)
}


def invalidNumberFormatError(
valueType: String, input: String, format: String): SparkIllegalArgumentException = {
new SparkIllegalArgumentException(
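The new helper appears to be a per-task variant of the existing writePartitionExceedConfigSizeWhenDynamicPartitionError (note the _BY_TASK_ infix in the error class name); its message template would still need to be registered alongside the other legacy entries in the error-class JSON resource. A minimal sketch of how a caller sees it, with illustrative values that are not part of this commit:

// Illustrative only: invoking the new helper directly; the limit and key are example values.
import org.apache.spark.SparkException
import org.apache.spark.sql.errors.QueryExecutionErrors

val err: Throwable = QueryExecutionErrors
  .writePartitionExceedConfigSizeWhenDynamicPartitionPerTaskError(
    maxDynamicPartitions = 1000,
    maxDynamicPartitionsKey = "hive.exec.max.dynamic.partitions")
assert(err.isInstanceOf[SparkException])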
FileFormatDataWriter.scala
@@ -17,10 +17,9 @@
package org.apache.spark.sql.execution.datasources

import scala.collection.mutable

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
@@ -29,6 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOutputWriterSpec
import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric}
import org.apache.spark.sql.internal.SQLConf
@@ -346,6 +346,8 @@ class DynamicPartitionDataSingleWriter(

private var currentPartitionValues: Option[UnsafeRow] = None
private var currentBucketId: Option[Int] = None
private val maxDynamicPartitions: Int = description.maxDynamicPartitions
// Distinct partition values written by this task, used to enforce maxDynamicPartitions.
private val seenPartitions = mutable.HashSet[Option[UnsafeRow]]()

override def write(record: InternalRow): Unit = {
val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None
@@ -356,6 +358,14 @@
if (isPartitioned && currentPartitionValues != nextPartitionValues) {
currentPartitionValues = Some(nextPartitionValues.get.copy())
statsTrackers.foreach(_.newPartition(currentPartitionValues.get))
// Track each distinct partition value this task has written and fail fast once the
// count exceeds the configured limit. Store the copied row, since the projected
// partition row is reused between records.
if (maxDynamicPartitions > 0 && seenPartitions.add(currentPartitionValues)) {
if (seenPartitions.size > maxDynamicPartitions) {
throw QueryExecutionErrors
.writePartitionExceedConfigSizeWhenDynamicPartitionPerTaskError(
maxDynamicPartitions, "hive.exec.max.dynamic.partitions")
}
}
}
if (isBucketed) {
currentBucketId = nextBucketId
@@ -585,7 +595,8 @@ class WriteJobDescription(
val customPartitionLocations: Map[TablePartitionSpec, String],
val maxRecordsPerFile: Long,
val timeZoneId: String,
val statsTrackers: Seq[WriteJobStatsTracker])
val statsTrackers: Seq[WriteJobStatsTracker],
val maxDynamicPartitions: Int = 0)
extends Serializable {

assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ dataColumns),
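At its core, the change makes the single-partition writer count the distinct partition values it has emitted and abort once that count passes the configured cap. A standalone sketch of the same idea under hypothetical names (PartitionLimitTracker is illustrative, not part of the writer API):

import scala.collection.mutable

// Count distinct partition keys seen by one task and fail once a limit is exceeded.
class PartitionLimitTracker(maxDynamicPartitions: Int) {
  private val seen = mutable.HashSet[String]()

  def record(partitionKey: String): Unit = {
    // add() returns true only for keys not seen before, so the size check runs
    // once per new partition rather than once per row.
    if (maxDynamicPartitions > 0 && seen.add(partitionKey)) {
      if (seen.size > maxDynamicPartitions) {
        throw new IllegalStateException(
          s"Task produced ${seen.size} dynamic partitions, exceeding the limit of $maxDynamicPartitions")
      }
    }
  }
}

// With a limit of 2, the third distinct value ("p=3") triggers the failure.
val tracker = new PartitionLimitTracker(2)
Seq("p=1", "p=2", "p=1", "p=3").foreach(tracker.record)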
FileFormatWriter.scala
@@ -136,7 +136,9 @@ object FileFormatWriter extends Logging {
.getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile),
timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
.getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone),
statsTrackers = statsTrackers
statsTrackers = statsTrackers,
maxDynamicPartitions = caseInsensitiveOptions.get("maxDynamicPartitions")
.map(_.toInt).getOrElse(0) // absent option means the per-task check is disabled
)

// We should first sort by dynamic partition columns, then bucket id, and finally sorting
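The limit travels as a plain string entry in the write options, so FileFormatWriter has to parse it back and treat a missing entry as "check disabled". A small sketch of that round trip (a plain Map stands in here for the case-insensitive options map):

// Illustrative only: writing and reading back the "maxDynamicPartitions" option.
val withLimit = Map("maxDynamicPartitions" -> String.valueOf(1000))
val withoutLimit = Map.empty[String, String]

def parseLimit(options: Map[String, String]): Int =
  options.get("maxDynamicPartitions").map(_.toInt).getOrElse(0) // 0 = disabled

assert(parseLimit(withLimit) == 1000)
assert(parseLimit(withoutLimit) == 0)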
FileFormatWriterSuite.scala
@@ -65,6 +65,20 @@ class FileFormatWriterSuite
}
}

// Writes three dynamic partitions while hive.exec.max.dynamic.partitions is set to 2.
test("dynamic partition insert with hive.exec.max.dynamic.partitions set to 2") {
withTable("t1", "t2") {
Seq((0, "p1"), (1, "p2"), (2, "p3")).toDF("id", "p")
.write.partitionBy("p").saveAsTable("t1")
checkAnswer(spark.table("t1").sort("id"), Seq(Row(0, "p1"), Row(1, "p2"), Row(2, "p3")))
withTempDir { tempDir =>
sql(s"create table t2(id long, p string) using parquet " +
s"partitioned by (p) location '${tempDir.toURI}'")
sql("set hive.exec.max.dynamic.partitions=2")
sql("insert overwrite table t2 partition(p) select id, p from t1")
}
}
}

test("SPARK-33904: save and insert into a table in a namespace of spark_catalog") {
val ns = "spark_catalog.ns"
withNamespace(ns) {
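The new test above only issues the insert and passes as long as nothing throws; if the intent is to prove that exceeding the limit fails the write, a stricter variant would assert the failure explicitly. A hedged sketch, assuming the per-task check actually fires on this insert path and surfaces as a SparkException:

// Sketch only, not part of this commit: assert the failure instead of merely running the insert.
val e = intercept[org.apache.spark.SparkException] {
  sql("insert overwrite table t2 partition(p) select id, p from t1")
}
assert(e.getMessage != null)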
InsertIntoHiveTable.scala
@@ -129,13 +129,9 @@ case class InsertIntoHiveTable(
tmpLocation: Path,
child: SparkPlan): Unit = {

val numDynamicPartitions = partition.values.count(_.isEmpty)
val partitionSpec = getPartitionSpec(partition)
var checkMaxDynamicPartitions = false
if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
val maxDynamicPartitions = hadoopConf.getInt(maxDynamicPartitionsKey,
HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
options ++ Map("maxDynamicPartitions" -> String.valueOf(maxDynamicPartitions))
checkMaxDynamicPartitions = true
}

val writtenParts = saveAsHiveFile(
@@ -146,7 +142,14 @@
outputLocation = tmpLocation.toString,
partitionAttributes = partitionColumns,
bucketSpec = bucketSpec,
options = options)
options = if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
val maxDynamicPartitions = hadoopConf.getInt(maxDynamicPartitionsKey,
HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
options ++ Map("maxDynamicPartitions" -> String.valueOf(maxDynamicPartitions))
} else {
options
})

if (partition.nonEmpty) {
if (numDynamicPartitions > 0) {
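For context, the cap is resolved once on the driver from the Hadoop configuration and then forwarded to the writer through the options map. The lookup in isolation looks roughly like the sketch below (in stock Hive the default for hive.exec.max.dynamic.partitions is 1000, which comes from HiveConf rather than from this commit):

// Illustrative only: resolving the dynamic partition cap the way InsertIntoHiveTable does.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf

val hadoopConf = new Configuration()
val key = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname // "hive.exec.max.dynamic.partitions"
val maxDynamicPartitions =
  hadoopConf.getInt(key, HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
// The value is then forwarded as options ++ Map("maxDynamicPartitions" -> maxDynamicPartitions.toString).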
SaveAsHiveFile.scala
@@ -58,7 +58,8 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
partitionColumns = partitionAttributes,
bucketSpec = bucketSpec,
statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
options = options)
options = options
)
}
}
