diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala index db77ce2ae93ec..38cfe013868f3 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala @@ -42,7 +42,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ColumnarBuildSideRelation, SparkPlan} -import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.NativeWritePostRule import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec import org.apache.spark.sql.execution.joins.BuildSideRelation import org.apache.spark.sql.execution.metric.SQLMetric @@ -463,7 +462,7 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi { * @return */ override def genExtendedColumnarPostRules(): List[SparkSession => Rule[SparkPlan]] = { - List(spark => NativeWritePostRule(spark)) + List() } /** diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala deleted file mode 100644 index b893e89270d50..0000000000000 --- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.datasources - -import org.apache.spark.{SparkContext, TaskContext} -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker._ -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.util.SerializableConfiguration - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} - -import java.io.FileNotFoundException -import java.nio.charset.StandardCharsets - -import scala.collection.mutable - -/** - * Simple metrics collected during an instance of [[FileFormatDataWriter]]. These were first - * introduced in https://github.com/apache/spark/pull/18159 (SPARK-20703). - */ -case class BasicWriteTaskStats( - partitions: Seq[InternalRow], - numFiles: Int, - numBytes: Long, - numRows: Long) - extends WriteTaskStats - -/** Simple [[WriteTaskStatsTracker]] implementation that produces [[BasicWriteTaskStats]]. */ -class BasicWriteTaskStatsTracker( - hadoopConf: Configuration, - taskCommitTimeMetric: Option[SQLMetric] = None) - extends WriteTaskStatsTracker - with Logging { - - private[this] val partitions: mutable.ArrayBuffer[InternalRow] = mutable.ArrayBuffer.empty - private[this] var numFiles: Int = 0 - private[this] var numSubmittedFiles: Int = 0 - private[this] var numBytes: Long = 0L - private[this] var numRows: Long = 0L - - private[this] val submittedFiles = mutable.HashSet[String]() - - /** - * Get the size of the file expected to have been written by a worker. - * @param filePath - * path to the file - * @return - * the file size or None if the file was not found. - */ - private def getFileSize(filePath: String): Option[Long] = { - val path = new Path(filePath) - val fs = path.getFileSystem(hadoopConf) - getFileSize(fs, path) - } - - /** - * Get the size of the file expected to have been written by a worker. This supports the XAttr in - * HADOOP-17414 when the "magic committer" adds a custom HTTP header to the a zero byte marker. If - * the output file as returned by getFileStatus > 0 then the length if returned. For zero-byte - * files, the (optional) Hadoop FS API getXAttr() is invoked. If a parseable, non-negative length - * can be retrieved, this is returned instead of the length. - * @return - * the file size or None if the file was not found. - */ - private[datasources] def getFileSize(fs: FileSystem, path: Path): Option[Long] = { - // the normal file status probe. - try { - val len = fs.getFileStatus(path).getLen - if (len > 0) { - return Some(len) - } - } catch { - case e: FileNotFoundException => - // may arise against eventually consistent object stores. - logDebug(s"File $path is not yet visible", e) - return None - } - - // Output File Size is 0. Look to see if it has an attribute - // declaring a future-file-length. - // Failure of API call, parsing, invalid value all return the - // 0 byte length. - - var len = 0L - try { - val attr = fs.getXAttr(path, BasicWriteJobStatsTracker.FILE_LENGTH_XATTR) - if (attr != null && attr.nonEmpty) { - val str = new String(attr, StandardCharsets.UTF_8) - logDebug(s"File Length statistics for $path retrieved from XAttr: $str") - // a non-empty header was found. parse to a long via the java class - val l = java.lang.Long.parseLong(str) - if (l > 0) { - len = l - } else { - logDebug("Ignoring negative value in XAttr file length") - } - } - } catch { - case e: NumberFormatException => - // warn but don't dump the whole stack - logInfo( - s"Failed to parse" + - s" ${BasicWriteJobStatsTracker.FILE_LENGTH_XATTR}:$e;" + - s" bytes written may be under-reported"); - case e: UnsupportedOperationException => - // this is not unusual; ignore - logDebug(s"XAttr not supported on path $path", e); - case e: Exception => - // Something else. Log at debug and continue. - logDebug(s"XAttr processing failure on $path", e); - } - Some(len) - } - - override def newPartition(partitionValues: InternalRow): Unit = { - partitions.append(partitionValues) - } - - override def newFile(filePath: String): Unit = { - submittedFiles += filePath - numSubmittedFiles += 1 - } - - override def closeFile(filePath: String): Unit = { - updateFileStats(filePath) - submittedFiles.remove(filePath) - } - - private def updateFileStats(filePath: String): Unit = { - getFileSize(filePath).foreach { - len => - numBytes += len - numFiles += 1 - } - } - - override def newRow(filePath: String, row: InternalRow): Unit = row match { - case fake: FakeRow => - numRows += fake.batch.numRows() - case _ => numRows += 1 - } - - override def getFinalStats(taskCommitTime: Long): WriteTaskStats = { - submittedFiles.foreach(updateFileStats) - submittedFiles.clear() - - // Reports bytesWritten and recordsWritten to the Spark output metrics. - Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { - outputMetrics => - outputMetrics.setBytesWritten(numBytes) - outputMetrics.setRecordsWritten(numRows) - } - - if (numSubmittedFiles != numFiles) { - logWarning( - s"Expected $numSubmittedFiles files, but only saw $numFiles. " + - "This could be due to the output format not writing empty files, " + - "or files being not immediately visible in the filesystem.") - } - taskCommitTimeMetric.foreach(_ += taskCommitTime) - BasicWriteTaskStats(partitions.toSeq, numFiles, numBytes, numRows) - } -} - -/** - * Simple [[WriteJobStatsTracker]] implementation that's serializable, capable of instantiating - * [[BasicWriteTaskStatsTracker]] on executors and processing the [[BasicWriteTaskStats]] they - * produce by aggregating the metrics and posting them as DriverMetricUpdates. - */ -class BasicWriteJobStatsTracker( - serializableHadoopConf: SerializableConfiguration, - @transient val driverSideMetrics: Map[String, SQLMetric], - taskCommitTimeMetric: SQLMetric) - extends WriteJobStatsTracker { - - def this(serializableHadoopConf: SerializableConfiguration, metrics: Map[String, SQLMetric]) = { - this(serializableHadoopConf, metrics - TASK_COMMIT_TIME, metrics(TASK_COMMIT_TIME)) - } - - override def newTaskInstance(): WriteTaskStatsTracker = { - new BasicWriteTaskStatsTracker(serializableHadoopConf.value, Some(taskCommitTimeMetric)) - } - - override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = { - val sparkContext = SparkContext.getActive.get - val partitionsSet: mutable.Set[InternalRow] = mutable.HashSet.empty - var numFiles: Long = 0L - var totalNumBytes: Long = 0L - var totalNumOutput: Long = 0L - - val basicStats = stats.map(_.asInstanceOf[BasicWriteTaskStats]) - - basicStats.foreach { - summary => - partitionsSet ++= summary.partitions - numFiles += summary.numFiles - totalNumBytes += summary.numBytes - totalNumOutput += summary.numRows - } - - driverSideMetrics(JOB_COMMIT_TIME).add(jobCommitTime) - driverSideMetrics(NUM_FILES_KEY).add(numFiles) - driverSideMetrics(NUM_OUTPUT_BYTES_KEY).add(totalNumBytes) - driverSideMetrics(NUM_OUTPUT_ROWS_KEY).add(totalNumOutput) - driverSideMetrics(NUM_PARTS_KEY).add(partitionsSet.size) - - val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, driverSideMetrics.values.toList) - } -} - -object BasicWriteJobStatsTracker { - private val NUM_FILES_KEY = "numFiles" - private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes" - private val NUM_OUTPUT_ROWS_KEY = "numOutputRows" - private val NUM_PARTS_KEY = "numParts" - val TASK_COMMIT_TIME = "taskCommitTime" - val JOB_COMMIT_TIME = "jobCommitTime" - - /** XAttr key of the data length header added in HADOOP-17414. */ - val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length" - - def metrics: Map[String, SQLMetric] = { - val sparkContext = SparkContext.getActive.get - Map( - NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of written files"), - NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext, "written output"), - NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part"), - TASK_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "task commit time"), - JOB_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "job commit time") - ) - } -} diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala deleted file mode 100644 index 2d54cffd7ef9d..0000000000000 --- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala +++ /dev/null @@ -1,675 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.datasources - -import io.glutenproject.execution.datasource.GlutenRowSplitter - -import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec} -import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage} -import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOutputWriterSpec -import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StringType -import org.apache.spark.util.{SerializableConfiguration, Utils} - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce.TaskAttemptContext - -import scala.collection.mutable - -/** - * Abstract class for writing out data in a single Spark task. Exceptions thrown by the - * implementation of this trait will automatically trigger task aborts. - */ -abstract class FileFormatDataWriter( - description: WriteJobDescription, - taskAttemptContext: TaskAttemptContext, - committer: FileCommitProtocol, - customMetrics: Map[String, SQLMetric]) - extends DataWriter[InternalRow] { - - /** - * Max number of files a single task writes out due to file size. In most cases the number of - * files written should be very small. This is just a safe guard to protect some really bad - * settings, e.g. maxRecordsPerFile = 1. - */ - protected val MAX_FILE_COUNTER: Int = 1000 * 1000 - protected val updatedPartitions: mutable.Set[String] = mutable.Set[String]() - protected var currentWriter: OutputWriter = _ - - /** Trackers for computing various statistics on the data as it's being written out. */ - protected val statsTrackers: Seq[WriteTaskStatsTracker] = - description.statsTrackers.map(_.newTaskInstance()) - - /** Release resources of `currentWriter`. */ - protected def releaseCurrentWriter(): Unit = { - if (currentWriter != null) { - try { - currentWriter.close() - statsTrackers.foreach(_.closeFile(currentWriter.path())) - } finally { - currentWriter = null - } - } - } - - /** Release all resources. */ - protected def releaseResources(): Unit = { - // Call `releaseCurrentWriter()` by default, as this is the only resource to be released. - releaseCurrentWriter() - } - - /** Writes a record. */ - def write(record: InternalRow): Unit - - def writeWithMetrics(record: InternalRow, count: Long): Unit = { - if (count % CustomMetrics.NUM_ROWS_PER_UPDATE == 0) { - CustomMetrics.updateMetrics(currentMetricsValues, customMetrics) - } - write(record) - } - - /** Write an iterator of records. */ - def writeWithIterator(iterator: Iterator[InternalRow]): Unit = { - var count = 0L - while (iterator.hasNext) { - writeWithMetrics(iterator.next(), count) - count += 1 - } - CustomMetrics.updateMetrics(currentMetricsValues, customMetrics) - } - - /** - * Returns the summary of relative information which includes the list of partition strings - * written out. The list of partitions is sent back to the driver and used to update the catalog. - * Other information will be sent back to the driver too and used to e.g. update the metrics in - * UI. - */ - override def commit(): WriteTaskResult = { - releaseResources() - val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs { - committer.commitTask(taskAttemptContext) - } - val summary = ExecutedWriteSummary( - updatedPartitions = updatedPartitions.toSet, - stats = statsTrackers.map(_.getFinalStats(taskCommitTime))) - WriteTaskResult(taskCommitMessage, summary) - } - - def abort(): Unit = { - try { - releaseResources() - } finally { - committer.abortTask(taskAttemptContext) - } - } - - override def close(): Unit = {} -} - -/** FileFormatWriteTask for empty partitions */ -class EmptyDirectoryDataWriter( - description: WriteJobDescription, - taskAttemptContext: TaskAttemptContext, - committer: FileCommitProtocol, - customMetrics: Map[String, SQLMetric] = Map.empty -) extends FileFormatDataWriter(description, taskAttemptContext, committer, customMetrics) { - override def write(record: InternalRow): Unit = {} -} - -/** Writes data to a single directory (used for non-dynamic-partition writes). */ -class SingleDirectoryDataWriter( - description: WriteJobDescription, - taskAttemptContext: TaskAttemptContext, - committer: FileCommitProtocol, - customMetrics: Map[String, SQLMetric] = Map.empty) - extends FileFormatDataWriter(description, taskAttemptContext, committer, customMetrics) { - private var fileCounter: Int = _ - private var recordsInFile: Long = _ - // Initialize currentWriter and statsTrackers - newOutputWriter() - - private def newOutputWriter(): Unit = { - recordsInFile = 0 - releaseResources() - - val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext) - val currentPath = - committer.newTaskTempFile(taskAttemptContext, None, f"-c$fileCounter%03d" + ext) - - currentWriter = description.outputWriterFactory.newInstance( - path = currentPath, - dataSchema = description.dataColumns.toStructType, - context = taskAttemptContext) - - statsTrackers.foreach(_.newFile(currentPath)) - } - - private def updateRecordsInFile(record: InternalRow): Unit = record match { - case fake: FakeRow => - recordsInFile += fake.batch.numRows() - case _ => recordsInFile += 1 - } - - override def write(record: InternalRow): Unit = { - if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) { - fileCounter += 1 - assert( - fileCounter < MAX_FILE_COUNTER, - s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER") - - newOutputWriter() - } - - currentWriter.write(record) - statsTrackers.foreach(_.newRow(currentWriter.path, record)) - updateRecordsInFile(record) - } -} - -/** - * Holds common logic for writing data with dynamic partition writes, meaning it can write to - * multiple directories (partitions) or files (bucketing). - */ -abstract class BaseDynamicPartitionDataWriter( - description: WriteJobDescription, - taskAttemptContext: TaskAttemptContext, - committer: FileCommitProtocol, - customMetrics: Map[String, SQLMetric]) - extends FileFormatDataWriter(description, taskAttemptContext, committer, customMetrics) { - - /** Flag saying whether or not the data to be written out is partitioned. */ - protected val isPartitioned = description.partitionColumns.nonEmpty - - /** Flag saying whether or not the data to be written out is bucketed. */ - protected val isBucketed = description.bucketSpec.isDefined - - assert( - isPartitioned || isBucketed, - s"""DynamicPartitionWriteTask should be used for writing out data that's either - |partitioned or bucketed. In this case neither is true. - |WriteJobDescription: $description - """.stripMargin - ) - - /** Number of records in current file. */ - protected var recordsInFile: Long = _ - - /** - * File counter for writing current partition or bucket. For same partition or bucket, we may have - * more than one file, due to number of records limit per file. - */ - protected var fileCounter: Int = _ - - /** Extracts the partition values out of an input row. */ - protected lazy val getPartitionValues: InternalRow => UnsafeRow = { - val proj = UnsafeProjection.create(description.partitionColumns, description.allColumns) - row => proj(row) - } - - /** Expression that given partition columns builds a path string like: col1=val/col2=val/... */ - private lazy val partitionPathExpression: Expression = Concat( - description.partitionColumns.zipWithIndex.flatMap { - case (c, i) => - val partitionName = ScalaUDF( - ExternalCatalogUtils.getPartitionPathString _, - StringType, - Seq(Literal(c.name), Cast(c, StringType, Option(description.timeZoneId)))) - if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR), partitionName) - }) - - /** - * Evaluates the `partitionPathExpression` above on a row of `partitionValues` and returns the - * partition string. - */ - private lazy val getPartitionPath: InternalRow => String = { - val proj = UnsafeProjection.create(Seq(partitionPathExpression), description.partitionColumns) - row => proj(row).getString(0) - } - - /** Given an input row, returns the corresponding `bucketId` */ - protected lazy val getBucketId: InternalRow => Int = { - val proj = - UnsafeProjection.create( - Seq(description.bucketSpec.get.bucketIdExpression), - description.allColumns) - row => proj(row).getInt(0) - } - - /** Returns the data columns to be written given an input row */ - protected val getOutputRow = - UnsafeProjection.create(description.dataColumns, description.allColumns) - - /** - * Opens a new OutputWriter given a partition key and/or a bucket id. If bucket id is specified, - * we will append it to the end of the file name, but before the file extension, e.g. - * part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet - * - * @param partitionValues - * the partition which all tuples being written by this OutputWriter belong to - * @param bucketId - * the bucket which all tuples being written by this OutputWriter belong to - * @param closeCurrentWriter - * close and release resource for current writer - */ - protected def renewCurrentWriter( - partitionValues: Option[InternalRow], - bucketId: Option[Int], - closeCurrentWriter: Boolean): Unit = { - - recordsInFile = 0 - if (closeCurrentWriter) { - releaseCurrentWriter() - } - - val partDir = partitionValues.map(getPartitionPath(_)) - partDir.foreach(updatedPartitions.add) - - val bucketIdStr = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") - - // The prefix and suffix must be in a form that matches our bucketing format. See BucketingUtils - // for details. The prefix is required to represent bucket id when writing Hive-compatible - // bucketed table. - val prefix = bucketId match { - case Some(id) => description.bucketSpec.get.bucketFileNamePrefix(id) - case _ => "" - } - val suffix = f"$bucketIdStr.c$fileCounter%03d" + - description.outputWriterFactory.getFileExtension(taskAttemptContext) - val fileNameSpec = FileNameSpec(prefix, suffix) - - val customPath = partDir.flatMap { - dir => description.customPartitionLocations.get(PartitioningUtils.parsePathFragment(dir)) - } - val currentPath = if (customPath.isDefined) { - committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, fileNameSpec) - } else { - committer.newTaskTempFile(taskAttemptContext, partDir, fileNameSpec) - } - - currentWriter = description.outputWriterFactory.newInstance( - path = currentPath, - dataSchema = description.dataColumns.toStructType, - context = taskAttemptContext) - - statsTrackers.foreach(_.newFile(currentPath)) - } - - /** - * Open a new output writer when number of records exceeding limit. - * - * @param partitionValues - * the partition which all tuples being written by this `OutputWriter` belong to - * @param bucketId - * the bucket which all tuples being written by this `OutputWriter` belong to - */ - protected def renewCurrentWriterIfTooManyRecords( - partitionValues: Option[InternalRow], - bucketId: Option[Int]): Unit = { - // Exceeded the threshold in terms of the number of records per file. - // Create a new file by increasing the file counter. - fileCounter += 1 - assert( - fileCounter < MAX_FILE_COUNTER, - s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER") - renewCurrentWriter(partitionValues, bucketId, closeCurrentWriter = true) - } - - protected def updateRecordsInFile(record: InternalRow): Unit = record match { - case fake: FakeRow => - recordsInFile += fake.batch.numRows() - case _ => recordsInFile += 1 - } - - /** - * Writes the given record with current writer. - * - * @param record - * The record to write - */ - protected def writeRecord(record: InternalRow): Unit = { - val outputRow = getOutputRow(record) - currentWriter.write(outputRow) - statsTrackers.foreach(_.newRow(currentWriter.path, outputRow)) - recordsInFile += 1 - } -} - -/** - * Dynamic partition writer with single writer, meaning only one writer is opened at any time for - * writing. The records to be written are required to be sorted on partition and/or bucket column(s) - * before writing. - */ -class DynamicPartitionDataSingleWriter( - description: WriteJobDescription, - taskAttemptContext: TaskAttemptContext, - committer: FileCommitProtocol, - customMetrics: Map[String, SQLMetric] = Map.empty) - extends BaseDynamicPartitionDataWriter( - description, - taskAttemptContext, - committer, - customMetrics) { - - private var currentPartitionValues: Option[UnsafeRow] = None - private var currentBucketId: Option[Int] = None - - private val partitionColIndice: Array[Int] = - description.partitionColumns.flatMap { - pcol => - description.allColumns.zipWithIndex.collect { - case (acol, index) if acol.name == pcol.name && acol.exprId == pcol.exprId => index - } - }.toArray - - private def beforeWrite(record: InternalRow): Unit = { - val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None - val nextBucketId = if (isBucketed) Some(getBucketId(record)) else None - - if (currentPartitionValues != nextPartitionValues || currentBucketId != nextBucketId) { - // See a new partition or bucket - write to a new partition dir (or a new bucket file). - if (isPartitioned && currentPartitionValues != nextPartitionValues) { - currentPartitionValues = Some(nextPartitionValues.get.copy()) - statsTrackers.foreach(_.newPartition(currentPartitionValues.get)) - } - if (isBucketed) { - currentBucketId = nextBucketId - } - - fileCounter = 0 - renewCurrentWriter(currentPartitionValues, currentBucketId, closeCurrentWriter = true) - } else if ( - description.maxRecordsPerFile > 0 && - recordsInFile >= description.maxRecordsPerFile - ) { - renewCurrentWriterIfTooManyRecords(currentPartitionValues, currentBucketId) - } - } - - override def write(record: InternalRow): Unit = { - record match { - case fakeRow: FakeRow => - if (fakeRow.batch.numRows() > 0) { - val blockStripes = GlutenRowSplitter.getInstance - .splitBlockByPartitionAndBucket(fakeRow, partitionColIndice, isBucketed) - - val iter = blockStripes.iterator() - while (iter.hasNext) { - val blockStripe = iter.next() - val headingRow = blockStripe.getHeadingRow - beforeWrite(headingRow) - writeStripe(new FakeRow(blockStripe.getColumnarBatch)) - } - blockStripes.release() - } - case _ => - beforeWrite(record) - writeRecord(record) - } - } - - protected def writeStripe(record: InternalRow): Unit = { - currentWriter.write(record) - statsTrackers.foreach(_.newRow(currentWriter.path, record)) - updateRecordsInFile(record) - } -} - -/** - * Dynamic partition writer with concurrent writers, meaning multiple concurrent writers are opened - * for writing. - * - * The process has the following steps: - * - Step 1: Maintain a map of output writers per each partition and/or bucket columns. Keep all - * writers opened and write rows one by one. - * - Step 2: If number of concurrent writers exceeds limit, sort rest of rows on partition and/or - * bucket column(s). Write rows one by one, and eagerly close the writer when finishing each - * partition and/or bucket. - * - * Caller is expected to call `writeWithIterator()` instead of `write()` to write records. - */ -class DynamicPartitionDataConcurrentWriter( - description: WriteJobDescription, - taskAttemptContext: TaskAttemptContext, - committer: FileCommitProtocol, - concurrentOutputWriterSpec: ConcurrentOutputWriterSpec, - customMetrics: Map[String, SQLMetric] = Map.empty) - extends BaseDynamicPartitionDataWriter(description, taskAttemptContext, committer, customMetrics) - with Logging { - - /** Wrapper class to index a unique concurrent output writer. */ - private case class WriterIndex(var partitionValues: Option[UnsafeRow], var bucketId: Option[Int]) - - /** Wrapper class for status of a unique concurrent output writer. */ - private class WriterStatus( - var outputWriter: OutputWriter, - var recordsInFile: Long, - var fileCounter: Int) - - /** - * State to indicate if we are falling back to sort-based writer. Because we first try to use - * concurrent writers, its initial value is false. - */ - private var sorted: Boolean = false - private val concurrentWriters = mutable.HashMap[WriterIndex, WriterStatus]() - - /** - * The index for current writer. Intentionally make the index mutable and reusable. Avoid JVM GC - * issue when many short-living `WriterIndex` objects are created if switching between concurrent - * writers frequently. - */ - private val currentWriterId = WriterIndex(None, None) - - /** Release resources for all concurrent output writers. */ - override protected def releaseResources(): Unit = { - currentWriter = null - concurrentWriters.values.foreach( - status => { - if (status.outputWriter != null) { - try { - status.outputWriter.close() - } finally { - status.outputWriter = null - } - } - }) - concurrentWriters.clear() - } - - override def write(record: InternalRow): Unit = { - val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None - val nextBucketId = if (isBucketed) Some(getBucketId(record)) else None - - if ( - currentWriterId.partitionValues != nextPartitionValues || - currentWriterId.bucketId != nextBucketId - ) { - // See a new partition or bucket - write to a new partition dir (or a new bucket file). - if (currentWriter != null) { - if (!sorted) { - // Update writer status in concurrent writers map, because the writer is probably needed - // again later for writing other rows. - updateCurrentWriterStatusInMap() - } else { - // Remove writer status in concurrent writers map and release current writer resource, - // because the writer is not needed any more. - concurrentWriters.remove(currentWriterId) - releaseCurrentWriter() - } - } - - if (isBucketed) { - currentWriterId.bucketId = nextBucketId - } - if (isPartitioned && currentWriterId.partitionValues != nextPartitionValues) { - currentWriterId.partitionValues = Some(nextPartitionValues.get.copy()) - if (!concurrentWriters.contains(currentWriterId)) { - statsTrackers.foreach(_.newPartition(currentWriterId.partitionValues.get)) - } - } - setupCurrentWriterUsingMap() - } - - if ( - description.maxRecordsPerFile > 0 && - recordsInFile >= description.maxRecordsPerFile - ) { - renewCurrentWriterIfTooManyRecords(currentWriterId.partitionValues, currentWriterId.bucketId) - // Update writer status in concurrent writers map, as a new writer is created. - updateCurrentWriterStatusInMap() - } - writeRecord(record) - } - - /** Write iterator of records with concurrent writers. */ - override def writeWithIterator(iterator: Iterator[InternalRow]): Unit = { - var count = 0L - while (iterator.hasNext && !sorted) { - writeWithMetrics(iterator.next(), count) - count += 1 - } - CustomMetrics.updateMetrics(currentMetricsValues, customMetrics) - - if (iterator.hasNext) { - count = 0L - clearCurrentWriterStatus() - val sorter = concurrentOutputWriterSpec.createSorter() - val sortIterator = sorter.sort(iterator.asInstanceOf[Iterator[UnsafeRow]]) - while (sortIterator.hasNext) { - writeWithMetrics(sortIterator.next(), count) - count += 1 - } - CustomMetrics.updateMetrics(currentMetricsValues, customMetrics) - } - } - - /** Update current writer status in map. */ - private def updateCurrentWriterStatusInMap(): Unit = { - val status = concurrentWriters(currentWriterId) - status.outputWriter = currentWriter - status.recordsInFile = recordsInFile - status.fileCounter = fileCounter - } - - /** Retrieve writer in map, or create a new writer if not exists. */ - private def setupCurrentWriterUsingMap(): Unit = { - if (concurrentWriters.contains(currentWriterId)) { - val status = concurrentWriters(currentWriterId) - currentWriter = status.outputWriter - recordsInFile = status.recordsInFile - fileCounter = status.fileCounter - } else { - fileCounter = 0 - renewCurrentWriter( - currentWriterId.partitionValues, - currentWriterId.bucketId, - closeCurrentWriter = false) - if (!sorted) { - assert( - concurrentWriters.size <= concurrentOutputWriterSpec.maxWriters, - s"Number of concurrent output file writers is ${concurrentWriters.size} " + - s" which is beyond max value ${concurrentOutputWriterSpec.maxWriters}" - ) - } else { - assert( - concurrentWriters.size <= concurrentOutputWriterSpec.maxWriters + 1, - s"Number of output file writers after sort is ${concurrentWriters.size} " + - s" which is beyond max value ${concurrentOutputWriterSpec.maxWriters + 1}" - ) - } - concurrentWriters.put( - currentWriterId.copy(), - new WriterStatus(currentWriter, recordsInFile, fileCounter)) - if (concurrentWriters.size >= concurrentOutputWriterSpec.maxWriters && !sorted) { - // Fall back to sort-based sequential writer mode. - logInfo( - s"Number of concurrent writers ${concurrentWriters.size} reaches the threshold. " + - "Fall back from concurrent writers to sort-based sequential writer. You may change " + - s"threshold with configuration ${SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS.key}") - sorted = true - } - } - } - - /** Clear the current writer status in map. */ - private def clearCurrentWriterStatus(): Unit = { - if (currentWriterId.partitionValues.isDefined || currentWriterId.bucketId.isDefined) { - updateCurrentWriterStatusInMap() - } - currentWriterId.partitionValues = None - currentWriterId.bucketId = None - currentWriter = null - recordsInFile = 0 - fileCounter = 0 - } -} - -/** - * Bucketing specification for all the write tasks. - * - * @param bucketIdExpression - * Expression to calculate bucket id based on bucket column(s). - * @param bucketFileNamePrefix - * Prefix of output file name based on bucket id. - */ -case class WriterBucketSpec(bucketIdExpression: Expression, bucketFileNamePrefix: Int => String) - -/** A shared job description for all the write tasks. */ -class WriteJobDescription( - val uuid: String, // prevent collision between different (appending) write jobs - val serializableHadoopConf: SerializableConfiguration, - val outputWriterFactory: OutputWriterFactory, - val allColumns: Seq[Attribute], - val dataColumns: Seq[Attribute], - val partitionColumns: Seq[Attribute], - val bucketSpec: Option[WriterBucketSpec], - val path: String, - val customPartitionLocations: Map[TablePartitionSpec, String], - val maxRecordsPerFile: Long, - val timeZoneId: String, - val statsTrackers: Seq[WriteJobStatsTracker]) - extends Serializable { - - assert( - AttributeSet(allColumns) == AttributeSet(partitionColumns ++ dataColumns), - s""" - |All columns: ${allColumns.mkString(", ")} - |Partition columns: ${partitionColumns.mkString(", ")} - |Data columns: ${dataColumns.mkString(", ")} - """.stripMargin - ) -} - -/** The result of a successful write task. */ -case class WriteTaskResult(commitMsg: TaskCommitMessage, summary: ExecutedWriteSummary) - extends WriterCommitMessage - -/** - * Wrapper class for the metrics of writing data out. - * - * @param updatedPartitions - * the partitions updated during writing data out. Only valid for dynamic partition. - * @param stats - * one `WriteTaskStats` object for every `WriteJobStatsTracker` that the job had. - */ -case class ExecutedWriteSummary(updatedPartitions: Set[String], stats: Seq[WriteTaskStats]) diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala deleted file mode 100644 index cbdf13159c082..0000000000000 --- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ /dev/null @@ -1,466 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.datasources - -import org.apache.spark._ -import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} -import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} -import org.apache.spark.sql.connector.write.WriterCommitMessage -import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter} -import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.unsafe.types.UTF8String -import org.apache.spark.util.{SerializableConfiguration, Utils} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileAlreadyExistsException, Path} -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl - -import java.util.{Date, UUID} - -/** A helper object for writing FileFormat data out to a location. */ -object FileFormatWriter extends Logging { - - /** Describes how output files should be placed in the filesystem. */ - case class OutputSpec( - outputPath: String, - customPartitionLocations: Map[TablePartitionSpec, String], - outputColumns: Seq[Attribute]) - - /** Describes how concurrent output writers should be executed. */ - case class ConcurrentOutputWriterSpec( - maxWriters: Int, - createSorter: () => UnsafeExternalRowSorter) - - /** - * A variable used in tests to check whether the output ordering of the query matches the required - * ordering of the write command. - */ - private[sql] var outputOrderingMatched: Boolean = false - - /** A variable used in tests to check the final executed plan. */ - private[sql] var executedPlan: Option[SparkPlan] = None - - // scalastyle:off argcount - /** - * Basic work flow of this command is: - * 1. Driver side setup, including output committer initialization and data source specific - * preparation work for the write job to be issued. 2. Issues a write job consists of one or - * more executor side tasks, each of which writes all rows within an RDD partition. 3. If no - * exception is thrown in a task, commits that task, otherwise aborts that task; If any - * exception is thrown during task commitment, also aborts that task. 4. If all tasks are - * committed, commit the job, otherwise aborts the job; If any exception is thrown during job - * commitment, also aborts the job. 5. If the job is successfully committed, perform - * post-commit operations such as processing statistics. - * @return - * The set of all partition paths that were updated during this write job. - */ - def write( - sparkSession: SparkSession, - plan: SparkPlan, - fileFormat: FileFormat, - committer: FileCommitProtocol, - outputSpec: OutputSpec, - hadoopConf: Configuration, - partitionColumns: Seq[Attribute], - bucketSpec: Option[BucketSpec], - statsTrackers: Seq[WriteJobStatsTracker], - options: Map[String, String], - numStaticPartitionCols: Int = 0): Set[String] = { - require(partitionColumns.size >= numStaticPartitionCols) - - val job = Job.getInstance(hadoopConf) - job.setOutputKeyClass(classOf[Void]) - job.setOutputValueClass(classOf[InternalRow]) - FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath)) - - val partitionSet = AttributeSet(partitionColumns) - // cleanup the internal metadata information of - // the file source metadata attribute if any before write out - val finalOutputSpec = outputSpec.copy(outputColumns = outputSpec.outputColumns - .map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation)) - val dataColumns = finalOutputSpec.outputColumns.filterNot(partitionSet.contains) - - val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, dataColumns, options) - val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns) - - val caseInsensitiveOptions = CaseInsensitiveMap(options) - - val dataSchema = dataColumns.toStructType - DataSourceUtils.verifySchema(fileFormat, dataSchema) - DataSourceUtils.checkFieldNames(fileFormat, dataSchema) - // Note: prepareWrite has side effect. It sets "job". - val outputWriterFactory = - fileFormat.prepareWrite(sparkSession, job, caseInsensitiveOptions, dataSchema) - - val description = new WriteJobDescription( - uuid = UUID.randomUUID.toString, - serializableHadoopConf = new SerializableConfiguration(job.getConfiguration), - outputWriterFactory = outputWriterFactory, - allColumns = finalOutputSpec.outputColumns, - dataColumns = dataColumns, - partitionColumns = partitionColumns, - bucketSpec = writerBucketSpec, - path = finalOutputSpec.outputPath, - customPartitionLocations = finalOutputSpec.customPartitionLocations, - maxRecordsPerFile = caseInsensitiveOptions - .get("maxRecordsPerFile") - .map(_.toLong) - .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile), - timeZoneId = caseInsensitiveOptions - .get(DateTimeUtils.TIMEZONE_OPTION) - .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone), - statsTrackers = statsTrackers - ) - - // We should first sort by dynamic partition columns, then bucket id, and finally sorting - // columns. - val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++ - writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns - val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan) - - // SPARK-40588: when planned writing is disabled and AQE is enabled, - // plan contains an AdaptiveSparkPlanExec, which does not know - // its final plan's ordering, so we have to materialize that plan first - // it is fine to use plan further down as the final plan is cached in that plan - def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match { - case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan - case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan)) - } - - // the sort order doesn't matter - val actualOrdering = writeFilesOpt - .map(_.child) - .getOrElse(materializeAdaptiveSparkPlan(plan)) - .outputOrdering - val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering) - - SQLExecution.checkSQLExecutionId(sparkSession) - - // propagate the description UUID into the jobs, so that committers - // get an ID guaranteed to be unique. - job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid) - - // When `PLANNED_WRITE_ENABLED` is true, the optimizer rule V1Writes will add logical sort - // operator based on the required ordering of the V1 write command. So the output - // ordering of the physical plan should always match the required ordering. Here - // we set the variable to verify this behavior in tests. - // There are two cases where FileFormatWriter still needs to add physical sort: - // 1) When the planned write config is disabled. - // 2) When the concurrent writers are enabled (in this case the required ordering of a - // V1 write command will be empty). - if (Utils.isTesting) outputOrderingMatched = orderingMatched - - if (writeFilesOpt.isDefined) { - // build `WriteFilesSpec` for `WriteFiles` - val concurrentOutputWriterSpecFunc = (plan: SparkPlan) => { - val sortPlan = createSortPlan(plan, requiredOrdering, outputSpec) - createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns) - } - val writeSpec = WriteFilesSpec( - description = description, - committer = committer, - concurrentOutputWriterSpecFunc = concurrentOutputWriterSpecFunc - ) - executeWrite(sparkSession, plan, writeSpec, job) - } else { - executeWrite( - sparkSession, - plan, - job, - description, - committer, - outputSpec, - requiredOrdering, - partitionColumns, - sortColumns, - orderingMatched) - } - } - // scalastyle:on argcount - - private def executeWrite( - sparkSession: SparkSession, - plan: SparkPlan, - job: Job, - description: WriteJobDescription, - committer: FileCommitProtocol, - outputSpec: OutputSpec, - requiredOrdering: Seq[Expression], - partitionColumns: Seq[Attribute], - sortColumns: Seq[Attribute], - orderingMatched: Boolean): Set[String] = { - val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns) - val empty2NullPlan = if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan - - writeAndCommit(job, description, committer) { - val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) { - (empty2NullPlan, None) - } else { - val sortPlan = createSortPlan(empty2NullPlan, requiredOrdering, outputSpec) - val concurrentOutputWriterSpec = - createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns) - if (concurrentOutputWriterSpec.isDefined) { - (empty2NullPlan, concurrentOutputWriterSpec) - } else { - (sortPlan, concurrentOutputWriterSpec) - } - } - - // In testing, this is the only way to get hold of the actually executed plan written to file - if (Utils.isTesting) executedPlan = Some(planToExecute) - - val rdd = planToExecute.execute() - - // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single - // partition rdd to make sure we at least set up one write task to write the metadata. - val rddWithNonEmptyPartitions = if (rdd.partitions.length == 0) { - sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1) - } else { - rdd - } - - val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date()) - val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length) - sparkSession.sparkContext.runJob( - rddWithNonEmptyPartitions, - (taskContext: TaskContext, iter: Iterator[InternalRow]) => { - executeTask( - description = description, - jobTrackerID = jobTrackerID, - sparkStageId = taskContext.stageId(), - sparkPartitionId = taskContext.partitionId(), - sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE, - committer, - iterator = iter, - concurrentOutputWriterSpec = concurrentOutputWriterSpec - ) - }, - rddWithNonEmptyPartitions.partitions.indices, - (index, res: WriteTaskResult) => { - committer.onTaskCommit(res.commitMsg) - ret(index) = res - } - ) - ret - } - } - - private def writeAndCommit( - job: Job, - description: WriteJobDescription, - committer: FileCommitProtocol)(f: => Array[WriteTaskResult]): Set[String] = { - // This call shouldn't be put into the `try` block below because it only initializes and - // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. - committer.setupJob(job) - try { - val ret = f - val commitMsgs = ret.map(_.commitMsg) - - logInfo(s"Start to commit write Job ${description.uuid}.") - val (_, duration) = Utils.timeTakenMs(committer.commitJob(job, commitMsgs)) - logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.") - - processStats(description.statsTrackers, ret.map(_.summary.stats), duration) - logInfo(s"Finished processing stats for write job ${description.uuid}.") - - // return a set of all the partition paths that were updated during this job - ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty) - } catch { - case cause: Throwable => - logError(s"Aborting job ${description.uuid}.", cause) - committer.abortJob(job) - throw cause - } - } - - /** Write files using [[SparkPlan.executeWrite]] */ - private def executeWrite( - session: SparkSession, - planForWrites: SparkPlan, - writeFilesSpec: WriteFilesSpec, - job: Job): Set[String] = { - val committer = writeFilesSpec.committer - val description = writeFilesSpec.description - - // In testing, this is the only way to get hold of the actually executed plan written to file - if (Utils.isTesting) executedPlan = Some(planForWrites) - - writeAndCommit(job, description, committer) { - val rdd = planForWrites.executeWrite(writeFilesSpec) - val ret = new Array[WriteTaskResult](rdd.partitions.length) - session.sparkContext.runJob( - rdd, - (context: TaskContext, iter: Iterator[WriterCommitMessage]) => { - assert(iter.hasNext) - val commitMessage = iter.next() - assert(!iter.hasNext) - commitMessage - }, - rdd.partitions.indices, - (index, res: WriterCommitMessage) => { - assert(res.isInstanceOf[WriteTaskResult]) - val writeTaskResult = res.asInstanceOf[WriteTaskResult] - committer.onTaskCommit(writeTaskResult.commitMsg) - ret(index) = writeTaskResult - } - ) - ret - } - } - - private def createSortPlan( - plan: SparkPlan, - requiredOrdering: Seq[Expression], - outputSpec: OutputSpec): SortExec = { - // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and - // the physical plan may have different attribute ids due to optimizer removing some - // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch. - val orderingExpr = - bindReferences(requiredOrdering.map(SortOrder(_, Ascending)), outputSpec.outputColumns) - SortExec(orderingExpr, global = false, child = plan) - } - - private def createConcurrentOutputWriterSpec( - sparkSession: SparkSession, - sortPlan: SortExec, - sortColumns: Seq[Attribute]): Option[ConcurrentOutputWriterSpec] = { - val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters - val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty - if (concurrentWritersEnabled) { - Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())) - } else { - None - } - } - - /** Writes data out in a single Spark task. */ - private[spark] def executeTask( - description: WriteJobDescription, - jobTrackerID: String, - sparkStageId: Int, - sparkPartitionId: Int, - sparkAttemptNumber: Int, - committer: FileCommitProtocol, - iterator: Iterator[InternalRow], - concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec]): WriteTaskResult = { - - val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, sparkStageId) - val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) - val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber) - - // Set up the attempt context required to use in the output committer. - val taskAttemptContext: TaskAttemptContext = { - // Set up the configuration object - val hadoopConf = description.serializableHadoopConf.value - hadoopConf.set("mapreduce.job.id", jobId.toString) - hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString) - hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString) - hadoopConf.setBoolean("mapreduce.task.ismap", true) - hadoopConf.setInt("mapreduce.task.partition", 0) - - new TaskAttemptContextImpl(hadoopConf, taskAttemptId) - } - - committer.setupTask(taskAttemptContext) - - val dataWriter = - if (sparkPartitionId != 0 && !iterator.hasNext) { - // In case of empty job, leave first partition to save meta for file format like parquet. - new EmptyDirectoryDataWriter(description, taskAttemptContext, committer) - } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) { - new SingleDirectoryDataWriter(description, taskAttemptContext, committer) - } else { - concurrentOutputWriterSpec match { - case Some(spec) => - new DynamicPartitionDataConcurrentWriter( - description, - taskAttemptContext, - committer, - spec) - case _ => - new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) - } - } - - try { - Utils.tryWithSafeFinallyAndFailureCallbacks(block = { - // Execute the task to write rows out and commit the task. - dataWriter.writeWithIterator(iterator) - dataWriter.commit() - })( - catchBlock = { - // If there is an error, abort the task - dataWriter.abort() - logError(s"Job $jobId aborted.") - }, - finallyBlock = { - dataWriter.close() - }) - } catch { - case e: FetchFailedException => - throw e - case f: FileAlreadyExistsException if SQLConf.get.fastFailFileFormatOutput => - // If any output file to write already exists, it does not make sense to re-run this task. - // We throw the exception and let Executor throw ExceptionFailure to abort the job. - throw new TaskOutputFileAlreadyExistException(f) - case t: Throwable => - throw QueryExecutionErrors.taskFailedWhileWritingRowsError(description.path, t) - } - } - - /** - * For every registered [[WriteJobStatsTracker]], call `processStats()` on it, passing it the - * corresponding [[WriteTaskStats]] from all executors. - */ - private[datasources] def processStats( - statsTrackers: Seq[WriteJobStatsTracker], - statsPerTask: Seq[Seq[WriteTaskStats]], - jobCommitDuration: Long): Unit = { - - val numStatsTrackers = statsTrackers.length - assert( - statsPerTask.forall(_.length == numStatsTrackers), - s"""Every WriteTask should have produced one `WriteTaskStats` object for every tracker. - |There are $numStatsTrackers statsTrackers, but some task returned - |${statsPerTask.find(_.length != numStatsTrackers).get.length} results instead. - """.stripMargin - ) - - val statsPerTracker = if (statsPerTask.nonEmpty) { - statsPerTask.transpose - } else { - statsTrackers.map(_ => Seq.empty) - } - - statsTrackers.zip(statsPerTracker).foreach { - case (statsTracker, stats) => statsTracker.processStats(stats, jobCommitDuration) - } - } -} diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala deleted file mode 100644 index 0994bb6e0ff16..0000000000000 --- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.datasources.orc - -import org.apache.spark.TaskContext -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types._ -import org.apache.spark.util.{SerializableConfiguration, Utils} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.input.FileSplit -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.orc.{OrcUtils => _, _} -import org.apache.orc.OrcConf.COMPRESS -import org.apache.orc.mapred.OrcStruct -import org.apache.orc.mapreduce._ - -import java.io._ - -/** New ORC File Format based on Apache ORC. */ -class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable { - - override def shortName(): String = "orc" - - override def toString: String = "ORC" - - override def hashCode(): Int = getClass.hashCode() - - override def equals(other: Any): Boolean = other.isInstanceOf[OrcFileFormat] - - override def inferSchema( - sparkSession: SparkSession, - options: Map[String, String], - files: Seq[FileStatus]): Option[StructType] = { - OrcUtils.inferSchema(sparkSession, files, options) - } - - override def prepareWrite( - sparkSession: SparkSession, - job: Job, - options: Map[String, String], - dataSchema: StructType): OutputWriterFactory = { - val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) - - val conf = job.getConfiguration - - conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec) - - conf - .asInstanceOf[JobConf] - .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]]) - - val batchSize = sparkSession.sessionState.conf.orcVectorizedWriterBatchSize - - new OutputWriterFactory { - override def newInstance( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): OutputWriter = { - new OrcOutputWriter(path, dataSchema, context, batchSize) - } - - override def getFileExtension(context: TaskAttemptContext): String = { - val compressionExtension: String = { - val name = context.getConfiguration.get(COMPRESS.getAttribute) - OrcUtils.extensionsForCompressionCodecNames.getOrElse(name, "") - } - - compressionExtension + ".orc" - } - } - } - - override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { - val conf = sparkSession.sessionState.conf - conf.orcVectorizedReaderEnabled && - schema.forall( - s => - OrcUtils.supportColumnarReads( - s.dataType, - sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled)) - } - - override def isSplitable( - sparkSession: SparkSession, - options: Map[String, String], - path: Path): Boolean = { - true - } - - /** - * Build the reader. - * - * @note - * It is required to pass FileFormat.OPTION_RETURNING_BATCH in options, to indicate whether the - * reader should return row or columnar output. If the caller can handle both, pass - * FileFormat.OPTION_RETURNING_BATCH -> supportBatch(sparkSession, - * StructType(requiredSchema.fields ++ partitionSchema.fields)) as the option. It should be set - * to "true" only if this reader can support it. - */ - override def buildReaderWithPartitionValues( - sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - - val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) - val sqlConf = sparkSession.sessionState.conf - val capacity = sqlConf.orcVectorizedReaderBatchSize - - // Should always be set by FileSourceScanExec creating this. - // Check conf before checking option, to allow working around an issue by changing conf. - val enableVectorizedReader = sqlConf.orcVectorizedReaderEnabled && - options - .get(FileFormat.OPTION_RETURNING_BATCH) - .getOrElse { - throw new IllegalArgumentException( - "OPTION_RETURNING_BATCH should always be set for OrcFileFormat. " + - "To workaround this issue, set spark.sql.orc.enableVectorizedReader=false.") - } - .equals("true") - if (enableVectorizedReader) { - // If the passed option said that we are to return batches, we need to also be able to - // do this based on config and resultSchema. - assert(supportBatch(sparkSession, resultSchema)) - } - - OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis) - - val broadcastedConf = - sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis - val orcFilterPushDown = sparkSession.sessionState.conf.orcFilterPushDown - - (file: PartitionedFile) => { - val conf = broadcastedConf.value.value - - val filePath = file.toPath - - val fs = filePath.getFileSystem(conf) - val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) - val orcSchema = - Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions))(_.getSchema) - val resultedColPruneInfo = - OrcUtils.requestedColumnIds(isCaseSensitive, dataSchema, requiredSchema, orcSchema, conf) - - if (resultedColPruneInfo.isEmpty) { - Iterator.empty - } else { - // ORC predicate pushdown - if (orcFilterPushDown && filters.nonEmpty) { - val fileSchema = OrcUtils.toCatalystSchema(orcSchema) - OrcFilters.createFilter(fileSchema, filters).foreach { - f => OrcInputFormat.setSearchArgument(conf, f, fileSchema.fieldNames) - } - } - - val (requestedColIds, canPruneCols) = resultedColPruneInfo.get - val resultSchemaString = OrcUtils.orcResultSchemaString( - canPruneCols, - dataSchema, - resultSchema, - partitionSchema, - conf) - assert( - requestedColIds.length == requiredSchema.length, - "[BUG] requested column IDs do not match required schema") - val taskConf = new Configuration(conf) - - val includeColumns = requestedColIds.filter(_ != -1).sorted.mkString(",") - taskConf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, includeColumns) - val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty) - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId) - - if (enableVectorizedReader) { - val batchReader = new OrcColumnarBatchReader(capacity) - // SPARK-23399 Register a task completion listener first to call `close()` in all cases. - // There is a possibility that `initialize` and `initBatch` hit some errors (like OOM) - // after opening a file. - val iter = new RecordReaderIterator(batchReader) - Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) - val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1) - val requestedPartitionColIds = - Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length) - batchReader.initialize(fileSplit, taskAttemptContext) - batchReader.initBatch( - TypeDescription.fromString(resultSchemaString), - resultSchema.fields, - requestedDataColIds, - requestedPartitionColIds, - file.partitionValues) - - iter.asInstanceOf[Iterator[InternalRow]] - } else { - val orcRecordReader = new OrcInputFormat[OrcStruct] - .createRecordReader(fileSplit, taskAttemptContext) - val iter = new RecordReaderIterator[OrcStruct](orcRecordReader) - Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) - - val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes - val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) - val deserializer = new OrcDeserializer(requiredSchema, requestedColIds) - - if (partitionSchema.length == 0) { - iter.map(value => unsafeProjection(deserializer.deserialize(value))) - } else { - val joinedRow = new JoinedRow() - iter.map( - value => - unsafeProjection(joinedRow(deserializer.deserialize(value), file.partitionValues))) - } - } - } - } - } - - override def supportDataType(dataType: DataType): Boolean = dataType match { - case _: AtomicType => true - - case st: StructType => st.forall(f => supportDataType(f.dataType)) - - case ArrayType(elementType, _) => supportDataType(elementType) - - case MapType(keyType, valueType, _) => - supportDataType(keyType) && supportDataType(valueType) - - case udt: UserDefinedType[_] => supportDataType(udt.sqlType) - - case _ => false - } -} diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala deleted file mode 100644 index c44092d719752..0000000000000 --- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ /dev/null @@ -1,515 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.datasources.parquet - -import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection -import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types._ -import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.mapred.FileSplit -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.parquet.filter2.compat.FilterCompat -import org.apache.parquet.filter2.predicate.FilterApi -import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS -import org.apache.parquet.hadoop._ - -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.util.{Failure, Try} - -class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging with Serializable { - - override def shortName(): String = "parquet" - - override def toString: String = "Parquet" - - override def hashCode(): Int = getClass.hashCode() - - override def equals(other: Any): Boolean = other.isInstanceOf[ParquetFileFormat] - - override def prepareWrite( - sparkSession: SparkSession, - job: Job, - options: Map[String, String], - dataSchema: StructType): OutputWriterFactory = { - val sqlConf = sparkSession.sessionState.conf - val parquetOptions = new ParquetOptions(options, sqlConf) - ParquetUtils.prepareWrite(sqlConf, job, dataSchema, parquetOptions) - } - - override def inferSchema( - sparkSession: SparkSession, - parameters: Map[String, String], - files: Seq[FileStatus]): Option[StructType] = { - ParquetUtils.inferSchema(sparkSession, parameters, files) - } - - /** Returns whether the reader can return the rows as batch or not. */ - override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { - val conf = sparkSession.sessionState.conf - ParquetUtils.isBatchReadSupportedForSchema(conf, schema) - } - - override def vectorTypes( - requiredSchema: StructType, - partitionSchema: StructType, - sqlConf: SQLConf): Option[Seq[String]] = { - Option( - Seq.fill(requiredSchema.fields.length)( - if (!sqlConf.offHeapColumnVectorEnabled) { - classOf[OnHeapColumnVector].getName - } else { - classOf[OffHeapColumnVector].getName - } - ) ++ Seq.fill(partitionSchema.fields.length)(classOf[ConstantColumnVector].getName)) - } - - override def isSplitable( - sparkSession: SparkSession, - options: Map[String, String], - path: Path): Boolean = { - true - } - - /** - * Build the reader. - * - * @note - * It is required to pass FileFormat.OPTION_RETURNING_BATCH in options, to indicate whether the - * reader should return row or columnar output. If the caller can handle both, pass - * FileFormat.OPTION_RETURNING_BATCH -> supportBatch(sparkSession, - * StructType(requiredSchema.fields ++ partitionSchema.fields)) as the option. It should be set - * to "true" only if this reader can support it. - */ - override def buildReaderWithPartitionValues( - sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) - hadoopConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, requiredSchema.json) - hadoopConf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, requiredSchema.json) - hadoopConf.set( - SQLConf.SESSION_LOCAL_TIMEZONE.key, - sparkSession.sessionState.conf.sessionLocalTimeZone) - hadoopConf.setBoolean( - SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, - sparkSession.sessionState.conf.nestedSchemaPruningEnabled) - hadoopConf.setBoolean( - SQLConf.CASE_SENSITIVE.key, - sparkSession.sessionState.conf.caseSensitiveAnalysis) - - // Sets flags for `ParquetToSparkSchemaConverter` - hadoopConf.setBoolean( - SQLConf.PARQUET_BINARY_AS_STRING.key, - sparkSession.sessionState.conf.isParquetBinaryAsString) - hadoopConf.setBoolean( - SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, - sparkSession.sessionState.conf.isParquetINT96AsTimestamp) - hadoopConf.setBoolean( - SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key, - sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled) - hadoopConf.setBoolean( - SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, - sparkSession.sessionState.conf.legacyParquetNanosAsLong) - - val broadcastedHadoopConf = - sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - - // TODO: if you move this into the closure it reverts to the default values. - // If true, enable using the custom RecordReader for parquet. This only works for - // a subset of the types (no complex types). - val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields) - val sqlConf = sparkSession.sessionState.conf - val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled - val enableVectorizedReader: Boolean = - ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema) - val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled - val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion - val capacity = sqlConf.parquetVectorizedReaderBatchSize - val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown - val pushDownDate = sqlConf.parquetFilterPushDownDate - val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp - val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal - val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate - val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold - val isCaseSensitive = sqlConf.caseSensitiveAnalysis - val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf) - val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead - val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead - - // Should always be set by FileSourceScanExec creating this. - // Check conf before checking option, to allow working around an issue by changing conf. - val returningBatch = sparkSession.sessionState.conf.parquetVectorizedReaderEnabled && - options - .get(FileFormat.OPTION_RETURNING_BATCH) - .getOrElse { - throw new IllegalArgumentException( - "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. " + - "To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.") - } - .equals("true") - if (returningBatch) { - // If the passed option said that we are to return batches, we need to also be able to - // do this based on config and resultSchema. - assert(supportBatch(sparkSession, resultSchema)) - } - - (file: PartitionedFile) => { - assert(file.partitionValues.numFields == partitionSchema.size) - - val filePath = file.toPath - val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) - - val sharedConf = broadcastedHadoopConf.value.value - - lazy val footerFileMetaData = - ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData - val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( - footerFileMetaData.getKeyValueMetaData.get, - datetimeRebaseModeInRead) - // Try to push down filters when filter push-down is enabled. - val pushed = if (enableParquetFilterPushDown) { - val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = new ParquetFilters( - parquetSchema, - pushDownDate, - pushDownTimestamp, - pushDownDecimal, - pushDownStringPredicate, - pushDownInFilterThreshold, - isCaseSensitive, - datetimeRebaseSpec) - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. - .flatMap(parquetFilters.createFilter(_)) - .reduceOption(FilterApi.and) - } else { - None - } - - // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' - // *only* if the file was created by something other than "parquet-mr", so check the actual - // writer here for this file. We have to do this per-file, as each file in the table may - // have different writers. - // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. - def isCreatedByParquetMr: Boolean = - footerFileMetaData.getCreatedBy().startsWith("parquet-mr") - - val convertTz = - if (timestampConversion && !isCreatedByParquetMr) { - Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) - } else { - None - } - - val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( - footerFileMetaData.getKeyValueMetaData.get, - int96RebaseModeInRead) - - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val hadoopAttemptContext = - new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) - - // Try to push down filters when filter push-down is enabled. - // Notice: This push-down is RowGroups level, not individual records. - if (pushed.isDefined) { - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) - } - val taskContext = Option(TaskContext.get()) - if (enableVectorizedReader) { - val vectorizedReader = new VectorizedParquetRecordReader( - convertTz.orNull, - datetimeRebaseSpec.mode.toString, - datetimeRebaseSpec.timeZone, - int96RebaseSpec.mode.toString, - int96RebaseSpec.timeZone, - enableOffHeapColumnVector && taskContext.isDefined, - capacity - ) - // SPARK-37089: We cannot register a task completion listener to close this iterator here - // because downstream exec nodes have already registered their listeners. Since listeners - // are executed in reverse order of registration, a listener registered here would close the - // iterator while downstream exec nodes are still running. When off-heap column vectors are - // enabled, this can cause a use-after-free bug leading to a segfault. - // - // Instead, we use FileScanRDD's task completion listener to close this iterator. - val iter = new RecordReaderIterator(vectorizedReader) - try { - vectorizedReader.initialize(split, hadoopAttemptContext) - logDebug(s"Appending $partitionSchema ${file.partitionValues}") - vectorizedReader.initBatch(partitionSchema, file.partitionValues) - if (returningBatch) { - vectorizedReader.enableReturningBatches() - } - - // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. - iter.asInstanceOf[Iterator[InternalRow]] - } catch { - case e: Throwable => - // SPARK-23457: In case there is an exception in initialization, close the iterator to - // avoid leaking resources. - iter.close() - throw e - } - } else { - logDebug(s"Falling back to parquet-mr") - // ParquetRecordReader returns InternalRow - val readSupport = new ParquetReadSupport( - convertTz, - enableVectorizedReader = false, - datetimeRebaseSpec, - int96RebaseSpec) - val reader = if (pushed.isDefined && enableRecordFilter) { - val parquetFilter = FilterCompat.get(pushed.get, null) - new ParquetRecordReader[InternalRow](readSupport, parquetFilter) - } else { - new ParquetRecordReader[InternalRow](readSupport) - } - val readerWithRowIndexes = - ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader, requiredSchema) - val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes) - try { - readerWithRowIndexes.initialize(split, hadoopAttemptContext) - - val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes - val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) - - if (partitionSchema.length == 0) { - // There is no partition columns - iter.map(unsafeProjection) - } else { - val joinedRow = new JoinedRow() - iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) - } - } catch { - case e: Throwable => - // SPARK-23457: In case there is an exception in initialization, close the iterator to - // avoid leaking resources. - iter.close() - throw e - } - } - } - } - - override def supportDataType(dataType: DataType): Boolean = dataType match { - case _: AtomicType => true - - case st: StructType => st.forall(f => supportDataType(f.dataType)) - - case ArrayType(elementType, _) => supportDataType(elementType) - - case MapType(keyType, valueType, _) => - supportDataType(keyType) && supportDataType(valueType) - - case udt: UserDefinedType[_] => supportDataType(udt.sqlType) - - case _ => false - } -} - -object ParquetFileFormat extends Logging { - private[parquet] def readSchema( - footers: Seq[Footer], - sparkSession: SparkSession): Option[StructType] = { - - val converter = new ParquetToSparkSchemaConverter( - sparkSession.sessionState.conf.isParquetBinaryAsString, - sparkSession.sessionState.conf.isParquetINT96AsTimestamp, - inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled, - nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong - ) - - val seen = mutable.HashSet[String]() - val finalSchemas: Seq[StructType] = footers.flatMap { - footer => - val metadata = footer.getParquetMetadata.getFileMetaData - val serializedSchema = metadata.getKeyValueMetaData.asScala.toMap - .get(ParquetReadSupport.SPARK_METADATA_KEY) - if (serializedSchema.isEmpty) { - // Falls back to Parquet schema if no Spark SQL schema found. - Some(converter.convert(metadata.getSchema)) - } else if (!seen.contains(serializedSchema.get)) { - seen += serializedSchema.get - - // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to - // whatever is available. - Some(Try(DataType.fromJson(serializedSchema.get)) - .recover { - case _: Throwable => - logInfo( - "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " + - "falling back to the deprecated DataType.fromCaseClassString parser.") - LegacyTypeStringParser.parseString(serializedSchema.get) - } - .recover { - case cause: Throwable => - logWarning( - s"""Failed to parse serialized Spark schema in Parquet key-value metadata: - |\t$serializedSchema - """.stripMargin, - cause - ) - } - .map(_.asInstanceOf[StructType]) - .getOrElse { - // Falls back to Parquet schema if Spark SQL schema can't be parsed. - converter.convert(metadata.getSchema) - }) - } else { - None - } - } - - finalSchemas.reduceOption { - (left, right) => - try left.merge(right) - catch { - case e: Throwable => - throw QueryExecutionErrors.failedToMergeIncompatibleSchemasError(left, right, e) - } - } - } - - /** - * Reads Parquet footers in multi-threaded manner. If the config - * "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted files when - * reading footers. - */ - private[parquet] def readParquetFootersInParallel( - conf: Configuration, - partFiles: Seq[FileStatus], - ignoreCorruptFiles: Boolean): Seq[Footer] = { - ThreadUtils - .parmap(partFiles, "readingParquetFooters", 8) { - currentFile => - try { - // Skips row group information since we only need the schema. - // ParquetFileReader.readFooter throws RuntimeException, instead of IOException, - // when it can't read the footer. - Some( - new Footer( - currentFile.getPath(), - ParquetFooterReader.readFooter(conf, currentFile, SKIP_ROW_GROUPS))) - } catch { - case e: RuntimeException => - if (ignoreCorruptFiles) { - logWarning(s"Skipped the footer in the corrupted file: $currentFile", e) - None - } else { - throw QueryExecutionErrors.cannotReadFooterForFileError(currentFile.getPath, e) - } - } - } - .flatten - } - - /** - * Figures out a merged Parquet schema with a distributed Spark job. - * - * Note that locality is not taken into consideration here because: - * - * 1. For a single Parquet part-file, in most cases the footer only resides in the last block of - * that file. Thus we only need to retrieve the location of the last block. However, Hadoop - * `FileSystem` only provides API to retrieve locations of all blocks, which can be - * potentially expensive. - * - * 2. This optimization is mainly useful for S3, where file metadata operations can be pretty - * slow. And basically locality is not available when using S3 (you can't run computation on S3 - * nodes). - */ - def mergeSchemasInParallel( - parameters: Map[String, String], - filesToTouch: Seq[FileStatus], - sparkSession: SparkSession): Option[StructType] = { - val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString - val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp - val inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled - val nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong - - val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => { - // Converter used to convert Parquet `MessageType` to Spark SQL `StructType` - val converter = new ParquetToSparkSchemaConverter( - assumeBinaryIsString = assumeBinaryIsString, - assumeInt96IsTimestamp = assumeInt96IsTimestamp, - inferTimestampNTZ = inferTimestampNTZ, - nanosAsLong = nanosAsLong) - - readParquetFootersInParallel(conf, files, ignoreCorruptFiles) - .map(ParquetFileFormat.readSchemaFromFooter(_, converter)) - } - - SchemaMergeUtils.mergeSchemasInParallel(sparkSession, parameters, filesToTouch, reader) - } - - /** - * Reads Spark SQL schema from a Parquet footer. If a valid serialized Spark SQL schema string can - * be found in the file metadata, returns the deserialized [[StructType]], otherwise, returns a - * [[StructType]] converted from the [[org.apache.parquet.schema.MessageType]] stored in this - * footer. - */ - def readSchemaFromFooter(footer: Footer, converter: ParquetToSparkSchemaConverter): StructType = { - val fileMetaData = footer.getParquetMetadata.getFileMetaData - fileMetaData.getKeyValueMetaData.asScala.toMap - .get(ParquetReadSupport.SPARK_METADATA_KEY) - .flatMap(deserializeSchemaString) - .getOrElse(converter.convert(fileMetaData.getSchema)) - } - - private def deserializeSchemaString(schemaString: String): Option[StructType] = { - // Tries to deserialize the schema string as JSON first, then falls back to the case class - // string parser (data generated by older versions of Spark SQL uses this format). - Try(DataType.fromJson(schemaString).asInstanceOf[StructType]) - .recover { - case _: Throwable => - logInfo( - "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " + - "falling back to the deprecated DataType.fromCaseClassString parser.") - LegacyTypeStringParser.parseString(schemaString).asInstanceOf[StructType] - } - .recoverWith { - case cause: Throwable => - logWarning( - "Failed to parse and ignored serialized Spark schema in " + - s"Parquet key-value metadata:\n\t$schemaString", - cause) - Failure(cause) - } - .toOption - } -}