diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 775206c7d8d6c..01d465279887f 100644
--- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -21,11 +21,11 @@ import org.apache.gluten.execution.ColumnarToRowExecBase
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
 import org.apache.spark.sql.delta.constraints.{Constraint, Constraints}
-import org.apache.spark.sql.delta.files.{DelayedCommitProtocol, DeltaFileFormatWriter, MergeTreeCommitProtocol, TransactionalWrite}
+import org.apache.spark.sql.delta.files.{DelayedCommitProtocol, DeltaFileFormatWriter, MergeTreeDelayedCommitProtocol, TransactionalWrite}
 import org.apache.spark.sql.delta.hooks.AutoCompact
 import org.apache.spark.sql.delta.schema.{InnerInvariantViolationException, InvariantViolationException}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -95,10 +95,27 @@ class ClickhouseOptimisticTransaction(
       val (queryExecution, output, generatedColumnConstraints, _) =
         normalizeData(deltaLog, writeOptions, data)
-      val partitioningColumns = getPartitioningColumns(partitionSchema, output)
+      val tableV2 = ClickHouseTableV2.getTable(deltaLog)
+      val bukSpec = if (tableV2.catalogTable.isDefined) {
+        tableV2.bucketOption
+      } else {
+        tableV2.bucketOption.map {
+          bucketSpec =>
+            CatalogUtils.normalizeBucketSpec(
+              tableV2.tableName,
+              output.map(_.name),
+              bucketSpec,
+              spark.sessionState.conf.resolver)
+        }
+      }
 
       val committer =
-        new MergeTreeCommitProtocol("delta-mergetree", outputPath.toString, None, None)
+        new MergeTreeDelayedCommitProtocol(
+          outputPath.toString,
+          None,
+          None,
+          tableV2.dataBaseName,
+          tableV2.tableName)
 
       // val (optionalStatsTracker, _) =
       //   getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
@@ -108,10 +125,15 @@ class ClickhouseOptimisticTransaction(
         Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints
 
       SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
-        val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, output)
-
         val queryPlan = queryExecution.executedPlan
         val newQueryPlan = insertFakeRowAdaptor(queryPlan)
+        assert(output.size == newQueryPlan.output.size)
+        val newOutput = newQueryPlan.output.zip(output).map {
+          case (newAttr, oldAttr) =>
+            oldAttr.withExprId(newAttr.exprId)
+        }
+        val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, newOutput)
+        val partitioningColumns = getPartitioningColumns(partitionSchema, newOutput)
 
         val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
@@ -148,7 +170,6 @@ class ClickhouseOptimisticTransaction(
         })
 
         try {
-          val tableV2 = ClickHouseTableV2.getTable(deltaLog)
           val format = tableV2.getFileFormat(protocol, metadata)
           GlutenWriterColumnarRules.injectSparkLocalProperty(spark, Some(format.shortName()))
           MergeTreeFileFormatWriter.write(
@@ -163,7 +184,7 @@
               .newHadoopConfWithOptions(metadata.configuration ++ deltaLog.options),
             // scalastyle:on deltahadoopconfiguration
             partitionColumns = partitioningColumns,
-            bucketSpec = tableV2.bucketOption,
+            bucketSpec = bukSpec,
             statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
             options = options,
             constraints = constraints
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/files/MergeTreeCommitProtocol.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/files/MergeTreeCommitProtocol.scala
deleted file mode 100644
index 33dbce138a426..0000000000000
--- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/files/MergeTreeCommitProtocol.scala
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.delta.files
-
-// scalastyle:off import.ordering.noEmptyLine
-import java.util.UUID
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.sql.delta.DeltaErrors
-import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction}
-import org.apache.spark.sql.delta.commands.cdc.CDCReader.{CDC_LOCATION, CDC_PARTITION_COL}
-import org.apache.spark.sql.delta.util.{DateFormatter, PartitionUtils, TimestampFormatter, Utils => DeltaUtils}
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.FileCommitProtocol
-import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
-import org.apache.spark.sql.catalyst.expressions.Cast
-import org.apache.spark.sql.types.StringType
-
-/**
- * This file is copied from the DelayedCommitProtocol of the Delta 3.2.0
- * and renamed to MergeTreeCommitProtocol.
- * It is modified to overcome the following issues:
- *   1. the function commitTask will return TaskCommitMessage(Nil),
- *      the FileStatus list will be get from the CH backend.
- */
-
-/**
- * Writes out the files to `path` and returns a list of them in `addedStatuses`. Includes
- * special handling for partitioning on [[CDC_PARTITION_COL]] for
- * compatibility between enabled and disabled CDC; partitions with a value of false in this
- * column produce no corresponding partitioning directory.
- * @param path The base path files will be written
- * @param randomPrefixLength The length of random subdir name under 'path' that files been written
- * @param subdir The immediate subdir under path; If randomPrefixLength and subdir both exist, file
- *               path will be path/subdir/[rand str of randomPrefixLength]/file
- */
-class MergeTreeCommitProtocol(
-    jobId: String,
-    path: String,
-    randomPrefixLength: Option[Int],
-    subdir: Option[String])
-  extends FileCommitProtocol with Serializable with Logging {
-  // Track the list of files added by a task, only used on the executors.
-  @transient protected var addedFiles: ArrayBuffer[(Map[String, String], String)] = _
-
-  // Track the change files added, only used on the driver. Files are sorted between this buffer
-  // and addedStatuses based on the value of the [[CDC_TYPE_COLUMN_NAME]] partition column - a
-  // file goes to addedStatuses if the value is CDC_TYPE_NOT_CDC and changeFiles otherwise.
-  @transient val changeFiles = new ArrayBuffer[AddCDCFile]
-
-  // Track the overall files added, only used on the driver.
-  //
-  // In rare cases, some of these AddFiles can be empty (i.e. contain no logical records).
-  // If the caller wishes to have only non-empty AddFiles, they must collect stats and perform
-  // the filter themselves. See TransactionalWrite::writeFiles. This filter will be best-effort,
-  // since there's no guarantee the stats will exist.
-  @transient val addedStatuses = new ArrayBuffer[AddFile]
-
-  val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
-
-  // Constants for CDC partition manipulation. Used only in newTaskTempFile(), but we define them
-  // here to avoid building a new redundant regex for every file.
-  protected val cdcPartitionFalse = s"${CDC_PARTITION_COL}=false"
-  protected val cdcPartitionTrue = s"${CDC_PARTITION_COL}=true"
-  protected val cdcPartitionTrueRegex = cdcPartitionTrue.r
-
-  override def setupJob(jobContext: JobContext): Unit = {
-
-  }
-
-  /**
-   * Commits a job after the writes succeed. Must be called on the driver. Partitions the written
-   * files into [[AddFile]]s and [[AddCDCFile]]s as these metadata actions are treated differently
-   * by [[TransactionalWrite]] (i.e. AddFile's may have additional statistics injected)
-   */
-  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
-    val (addFiles, changeFiles) = taskCommits.flatMap(_.obj.asInstanceOf[Seq[_]])
-      .partition {
-        case _: AddFile => true
-        case _: AddCDCFile => false
-        case other =>
-          throw DeltaErrors.unrecognizedFileAction(s"$other", s"${other.getClass}")
-      }
-
-    // we cannot add type information above because of type erasure
-    addedStatuses ++= addFiles.map(_.asInstanceOf[AddFile])
-    this.changeFiles ++= changeFiles.map(_.asInstanceOf[AddCDCFile]).toArray[AddCDCFile]
-  }
-
-  override def abortJob(jobContext: JobContext): Unit = {
-    // TODO: Best effort cleanup
-  }
-
-  override def setupTask(taskContext: TaskAttemptContext): Unit = {
-    addedFiles = new ArrayBuffer[(Map[String, String], String)]
-  }
-
-  protected def getFileName(
-      taskContext: TaskAttemptContext,
-      ext: String,
-      partitionValues: Map[String, String]): String = {
-    // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
-    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
-    // the file name is fine and won't overflow.
-    val split = taskContext.getTaskAttemptID.getTaskID.getId
-    val uuid = UUID.randomUUID.toString
-    // CDC files (CDC_PARTITION_COL = true) are named with "cdc-..." instead of "part-...".
-    if (partitionValues.get(CDC_PARTITION_COL).contains("true")) {
-      f"cdc-$split%05d-$uuid$ext"
-    } else {
-      f"part-$split%05d-$uuid$ext"
-    }
-  }
-
-  protected def parsePartitions(dir: String): Map[String, String] = {
-    // TODO: timezones?
-    // TODO: enable validatePartitionColumns?
-    val dateFormatter = DateFormatter()
-    val timestampFormatter =
-      TimestampFormatter(timestampPartitionPattern, java.util.TimeZone.getDefault)
-    val parsedPartition =
-      PartitionUtils
-        .parsePartition(
-          new Path(dir),
-          typeInference = false,
-          Set.empty,
-          Map.empty,
-          validatePartitionColumns = false,
-          java.util.TimeZone.getDefault,
-          dateFormatter,
-          timestampFormatter)
-        ._1
-        .get
-    parsedPartition
-      .columnNames
-      .zip(
-        parsedPartition
-          .literals
-          .map(l => Cast(l, StringType).eval())
-          .map(Option(_).map(_.toString).orNull))
-      .toMap
-  }
-
-  /**
-   * Notifies the commit protocol to add a new file, and gets back the full path that should be
-   * used.
-   *
-   * Includes special logic for CDC files and paths. Specifically, if the directory `dir` contains
-   * the CDC partition `__is_cdc=true` then
-   *   - the file name begins with `cdc-` instead of `part-`
-   *   - the directory has the `__is_cdc=true` partition removed and is placed in the `_changed_data`
-   *     folder
-   */
-  override def newTaskTempFile(
-      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
-    val partitionValues = dir.map(parsePartitions).getOrElse(Map.empty[String, String])
-    val filename = getFileName(taskContext, ext, partitionValues)
-    val relativePath = randomPrefixLength.map { prefixLength =>
-      DeltaUtils.getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
-    }.orElse {
-      dir // or else write into the partition directory if it is partitioned
-    }.map { subDir =>
-      // Do some surgery on the paths we write out to eliminate the CDC_PARTITION_COL. Non-CDC
-      // data is written to the base location, while CDC data is written to a special folder
-      // _change_data.
-      // The code here gets a bit complicated to accommodate two corner cases: an empty subdir
-      // can't be passed to new Path() at all, and a single-level subdir won't have a trailing
-      // slash.
-      if (subDir == cdcPartitionFalse) {
-        new Path(filename)
-      } else if (subDir.startsWith(cdcPartitionTrue)) {
-        val cleanedSubDir = cdcPartitionTrueRegex.replaceFirstIn(subDir, CDC_LOCATION)
-        new Path(cleanedSubDir, filename)
-      } else if (subDir.startsWith(cdcPartitionFalse)) {
-        // We need to remove the trailing slash in addition to the directory - otherwise
-        // it'll be interpreted as an absolute path and fail.
-        val cleanedSubDir = subDir.stripPrefix(cdcPartitionFalse + "/")
-        new Path(cleanedSubDir, filename)
-      } else {
-        new Path(subDir, filename)
-      }
-    }.getOrElse(new Path(filename)) // or directly write out to the output path
-
-    val relativePathWithSubdir = subdir.map(new Path(_, relativePath)).getOrElse(relativePath)
-    addedFiles.append((partitionValues, relativePathWithSubdir.toUri.toString))
-    new Path(path, relativePathWithSubdir).toString
-  }
-
-  override def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
-    throw DeltaErrors.unsupportedAbsPathAddFile(s"$this")
-  }
-
-  protected def buildActionFromAddedFile(
-      f: (Map[String, String], String),
-      stat: FileStatus,
-      taskContext: TaskAttemptContext): FileAction = {
-    // The partitioning in the Delta log action will be read back as part of the data, so our
-    // virtual CDC_PARTITION_COL needs to be stripped out.
-    val partitioning = f._1.filter { case (k, v) => k != CDC_PARTITION_COL }
-    f._1.get(CDC_PARTITION_COL) match {
-      case Some("true") =>
-        val partitioning = f._1.filter { case (k, v) => k != CDC_PARTITION_COL }
-        AddCDCFile(f._2, partitioning, stat.getLen)
-      case _ =>
-        val addFile = AddFile(f._2, partitioning, stat.getLen, stat.getModificationTime, true)
-        addFile
-    }
-  }
-
-  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
-    // --- modified start
-    /* if (addedFiles.nonEmpty) {
-      val fs = new Path(path, addedFiles.head._2).getFileSystem(taskContext.getConfiguration)
-      val statuses: Seq[FileAction] = addedFiles.map { f =>
-        // scalastyle:off pathfromuri
-        val filePath = new Path(path, new Path(new URI(f._2)))
-        // scalastyle:on pathfromuri
-        val stat = fs.getFileStatus(filePath)
-
-        buildActionFromAddedFile(f, stat, taskContext)
-      }.toSeq
-
-      new TaskCommitMessage(statuses)
-    } else {
-      new TaskCommitMessage(Nil)
-    } */
-    // --- modified end
-    new TaskCommitMessage(Nil)
-  }
-
-  override def abortTask(taskContext: TaskAttemptContext): Unit = {
-    // TODO: we can also try delete the addedFiles as a best-effort cleanup.
-  }
-}
-
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/files/MergeTreeDelayedCommitProtocol.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/files/MergeTreeDelayedCommitProtocol.scala
new file mode 100644
index 0000000000000..f2c22234a6922
--- /dev/null
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/files/MergeTreeDelayedCommitProtocol.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.spark.sql.delta.files + +class MergeTreeDelayedCommitProtocol( + val outputPath: String, + randomPrefixLength: Option[Int], + subdir: Option[String], + val database: String, + val tableName: String) + extends DelayedCommitProtocol("delta-mergetree", outputPath, randomPrefixLength, subdir) + with MergeTreeFileCommitProtocol {} diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala index 57c1a7ce815ab..8f3c105c3a334 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala @@ -20,140 +20,27 @@ import org.apache.gluten.execution.datasource.GlutenFormatFactory import org.apache.spark.internal.Logging import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec} -import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.connector.write.DataWriter -import org.apache.spark.sql.delta.actions.AddFile import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOutputWriterSpec import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StringType -import org.apache.spark.util.Utils import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.TaskAttemptContext import scala.collection.mutable -/** - * Abstract class for writing out data in a single Spark task. Exceptions thrown by the - * implementation of this trait will automatically trigger task aborts. - */ -abstract class MergeTreeFileFormatDataWriter( - description: WriteJobDescription, - taskAttemptContext: TaskAttemptContext, - committer: FileCommitProtocol, - customMetrics: Map[String, SQLMetric]) - extends DataWriter[InternalRow] { - - /** - * Max number of files a single task writes out due to file size. In most cases the number of - * files written should be very small. This is just a safe guard to protect some really bad - * settings, e.g. maxRecordsPerFile = 1. - */ - protected val MAX_FILE_COUNTER: Int = 1000 * 1000 - protected val updatedPartitions: mutable.Set[String] = mutable.Set[String]() - protected var currentWriter: OutputWriter = _ - - protected val returnedMetrics: mutable.Map[String, AddFile] = mutable.HashMap[String, AddFile]() - - /** Trackers for computing various statistics on the data as it's being written out. */ - protected val statsTrackers: Seq[WriteTaskStatsTracker] = - description.statsTrackers.map(_.newTaskInstance()) - - /** Release resources of `currentWriter`. */ - protected def releaseCurrentWriter(): Unit = { - if (currentWriter != null) { - try { - currentWriter.close() - statsTrackers.foreach(_.closeFile(currentWriter.path())) - currentWriter - .asInstanceOf[MergeTreeOutputWriter] - .getAddFiles - .foreach(addFile => returnedMetrics.put(addFile.path, addFile)) - } finally { - currentWriter = null - } - } - } - - /** Release all resources. 
-  protected def releaseResources(): Unit = {
-    // Call `releaseCurrentWriter()` by default, as this is the only resource to be released.
-    releaseCurrentWriter()
-  }
-
-  /** Writes a record. */
-  def write(record: InternalRow): Unit
-
-  def writeWithMetrics(record: InternalRow, count: Long): Unit = {
-    if (count % CustomMetrics.NUM_ROWS_PER_UPDATE == 0) {
-      CustomMetrics.updateMetrics(currentMetricsValues, customMetrics)
-    }
-    write(record)
-  }
-
-  /** Write an iterator of records. */
-  def writeWithIterator(iterator: Iterator[InternalRow]): Unit = {
-    var count = 0L
-    while (iterator.hasNext) {
-      writeWithMetrics(iterator.next(), count)
-      count += 1
-    }
-    CustomMetrics.updateMetrics(currentMetricsValues, customMetrics)
-  }
-
-  /**
-   * Returns the summary of relative information which includes the list of partition strings
-   * written out. The list of partitions is sent back to the driver and used to update the catalog.
-   * Other information will be sent back to the driver too and used to e.g. update the metrics in
-   * UI.
-   */
-  override def commit(): WriteTaskResult = {
-    releaseResources()
-    val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
-      // committer.commitTask(taskAttemptContext)
-      val statuses = returnedMetrics.map(_._2).toSeq
-      new TaskCommitMessage(statuses)
-    }
-
-    val summary = ExecutedWriteSummary(
-      updatedPartitions = updatedPartitions.toSet,
-      stats = statsTrackers.map(_.getFinalStats(taskCommitTime)))
-    WriteTaskResult(taskCommitMessage, summary)
-  }
-
-  def abort(): Unit = {
-    try {
-      releaseResources()
-    } finally {
-      committer.abortTask(taskAttemptContext)
-    }
-  }
-
-  override def close(): Unit = {}
-}
-
-/** FileFormatWriteTask for empty partitions */
-class MergeTreeEmptyDirectoryDataWriter(
-    description: WriteJobDescription,
-    taskAttemptContext: TaskAttemptContext,
-    committer: FileCommitProtocol,
-    customMetrics: Map[String, SQLMetric] = Map.empty
-) extends MergeTreeFileFormatDataWriter(description, taskAttemptContext, committer, customMetrics) {
-  override def write(record: InternalRow): Unit = {}
-}
-
 /** Writes data to a single directory (used for non-dynamic-partition writes). */
-class MergeTreeSingleDirectoryDataWriter(
+class SingleDirectoryDataWriter(
     description: WriteJobDescription,
     taskAttemptContext: TaskAttemptContext,
     committer: FileCommitProtocol,
     customMetrics: Map[String, SQLMetric] = Map.empty)
-  extends MergeTreeFileFormatDataWriter(description, taskAttemptContext, committer, customMetrics) {
+  extends FileFormatDataWriter(description, taskAttemptContext, committer, customMetrics) {
   private var fileCounter: Int = _
   private var recordsInFile: Long = _
   // Initialize currentWriter and statsTrackers
@@ -165,16 +52,10 @@ class MergeTreeSingleDirectoryDataWriter(
     val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
 
     val currentPath =
-      committer.newTaskTempFile(
-        taskAttemptContext,
-        None,
-        FileNameSpec(
-          "",
-          f"-c$fileCounter%03d" +
-            ext))
+      committer.newTaskTempFile(taskAttemptContext, None, f"-c$fileCounter%03d" + ext)
 
     currentWriter = description.outputWriterFactory.newInstance(
-      path = description.path,
+      path = currentPath,
      dataSchema = description.dataColumns.toStructType,
      context = taskAttemptContext)
 
@@ -207,18 +88,18 @@ class MergeTreeSingleDirectoryDataWriter(
  * Holds common logic for writing data with dynamic partition writes, meaning it can write to
  * multiple directories (partitions) or files (bucketing).
  */
-abstract class MergeTreeBaseDynamicPartitionDataWriter(
+abstract class BaseDynamicPartitionDataWriter(
     description: WriteJobDescription,
     taskAttemptContext: TaskAttemptContext,
     committer: FileCommitProtocol,
     customMetrics: Map[String, SQLMetric])
-  extends MergeTreeFileFormatDataWriter(description, taskAttemptContext, committer, customMetrics) {
+  extends FileFormatDataWriter(description, taskAttemptContext, committer, customMetrics) {
 
-  /** Flag saying whether the data to be written out is partitioned. */
-  protected val isPartitioned: Boolean = description.partitionColumns.nonEmpty
+  /** Flag saying whether or not the data to be written out is partitioned. */
+  protected val isPartitioned = description.partitionColumns.nonEmpty
 
-  /** Flag saying whether the data to be written out is bucketed. */
-  protected val isBucketed: Boolean = description.bucketSpec.isDefined
+  /** Flag saying whether or not the data to be written out is bucketed. */
+  protected val isBucketed = description.bucketSpec.isDefined
 
   assert(
     isPartitioned || isBucketed,
@@ -298,11 +179,10 @@ abstract class MergeTreeBaseDynamicPartitionDataWriter(
       releaseCurrentWriter()
     }
 
-    val partDir =
-      partitionValues.map(getPartitionPath(_)).map(str => new Path(str).toUri.toASCIIString)
+    val partDir = partitionValues.map(getPartitionPath(_))
     partDir.foreach(updatedPartitions.add)
 
-    val bucketIdStr = bucketId.map(id => f"$id%05d").getOrElse("")
+    val bucketIdStr = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
 
     // The prefix and suffix must be in a form that matches our bucketing format. See BucketingUtils
     // for details. The prefix is required to represent bucket id when writing Hive-compatible
@@ -324,15 +204,8 @@ abstract class MergeTreeBaseDynamicPartitionDataWriter(
       committer.newTaskTempFile(taskAttemptContext, partDir, fileNameSpec)
     }
 
-    taskAttemptContext.getConfiguration.set(
-      "mapreduce.task.gluten.mergetree.partition.dir",
-      partDir.getOrElse(""))
-    taskAttemptContext.getConfiguration.set(
-      "mapreduce.task.gluten.mergetree.bucketid.str",
-      bucketIdStr)
-
     currentWriter = description.outputWriterFactory.newInstance(
-      path = description.path,
+      path = currentPath,
      dataSchema = description.dataColumns.toStructType,
      context = taskAttemptContext)
 
@@ -384,12 +257,12 @@ abstract class MergeTreeBaseDynamicPartitionDataWriter(
  * writing. The records to be written are required to be sorted on partition and/or bucket column(s)
  * before writing.
  */
-class MergeTreeDynamicPartitionDataSingleWriter(
+class DynamicPartitionDataSingleWriter(
     description: WriteJobDescription,
     taskAttemptContext: TaskAttemptContext,
     committer: FileCommitProtocol,
     customMetrics: Map[String, SQLMetric] = Map.empty)
-  extends MergeTreeBaseDynamicPartitionDataWriter(
+  extends BaseDynamicPartitionDataWriter(
     description,
     taskAttemptContext,
     committer,
@@ -435,11 +308,7 @@ class MergeTreeDynamicPartitionDataSingleWriter(
       case fakeRow: FakeRow =>
         if (fakeRow.batch.numRows() > 0) {
           val blockStripes = GlutenFormatFactory.rowSplitter
-            .splitBlockByPartitionAndBucket(
-              fakeRow,
-              partitionColIndice,
-              isBucketed,
-              reserve_partition_columns = true)
+            .splitBlockByPartitionAndBucket(fakeRow, partitionColIndice, isBucketed)
 
           val iter = blockStripes.iterator()
           while (iter.hasNext) {
@@ -478,17 +347,13 @@ class MergeTreeDynamicPartitionDataSingleWriter(
  *
 * Caller is expected to call `writeWithIterator()` instead of `write()` to write records.
  */
-class MergeTreeDynamicPartitionDataConcurrentWriter(
+class DynamicPartitionDataConcurrentWriter(
     description: WriteJobDescription,
     taskAttemptContext: TaskAttemptContext,
     committer: FileCommitProtocol,
     concurrentOutputWriterSpec: ConcurrentOutputWriterSpec,
     customMetrics: Map[String, SQLMetric] = Map.empty)
-  extends MergeTreeBaseDynamicPartitionDataWriter(
-    description,
-    taskAttemptContext,
-    committer,
-    customMetrics)
+  extends BaseDynamicPartitionDataWriter(description, taskAttemptContext, committer, customMetrics)
   with Logging {
 
   /** Wrapper class to index a unique concurrent output writer. */
@@ -522,10 +387,6 @@ class MergeTreeDynamicPartitionDataConcurrentWriter(
     if (status.outputWriter != null) {
       try {
         status.outputWriter.close()
-        status.outputWriter
-          .asInstanceOf[MergeTreeOutputWriter]
-          .getAddFiles
-          .foreach(addFile => returnedMetrics.put(addFile.path, addFile))
       } finally {
         status.outputWriter = null
       }
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
index 9a64c7fdf743f..beb73a372556e 100644
--- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
@@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.datasources.v1.clickhouse
 
 import org.apache.gluten.execution.datasource.GlutenFormatFactory
-import org.apache.gluten.memory.CHThreadGroup
 
 import org.apache.spark.{SparkException, TaskContext, TaskOutputFileAlreadyExistException}
 import org.apache.spark.internal.Logging
@@ -32,9 +31,10 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.delta.constraints.Constraint
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.{BucketingUtils, DataSourceUtils, EmptyDirectoryDataWriter, FileFormat, IFakeRowAdaptor, WriteJobDescription, WriteJobStatsTracker, WriterBucketSpec, WriteTaskResult}
 import org.apache.spark.sql.execution.datasources.FileFormatWriter.{processStats, ConcurrentOutputWriterSpec, OutputSpec}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StringType
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 import org.apache.hadoop.conf.Configuration
@@ -64,42 +64,78 @@ object MergeTreeFileFormatWriter extends Logging {
       constraints: Seq[Constraint],
       numStaticPartitionCols: Int = 0): Set[String] = {
 
-    assert(plan.isInstanceOf[IFakeRowAdaptor])
+    val nativeEnabled =
+      "true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")
+    val staticPartitionWriteOnly =
+      "true" == sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")
+
+    if (nativeEnabled) {
+      logInfo("Use Gluten partition write for hive")
+      assert(plan.isInstanceOf[IFakeRowAdaptor])
+    }
 
     val job = Job.getInstance(hadoopConf)
     job.setOutputKeyClass(classOf[Void])
     job.setOutputValueClass(classOf[InternalRow])
-
-    val outputPath = new Path(outputSpec.outputPath)
-    val outputPathName = outputPath.toString
-
-    FileOutputFormat.setOutputPath(job, outputPath)
+    FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
     val partitionSet = AttributeSet(partitionColumns)
     // cleanup the internal metadata information of
     // the file source metadata attribute if any before write out
-    // val finalOutputSpec = outputSpec.copy(outputColumns = outputSpec.outputColumns
-    //   .map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation))
-    val finalOutputSpec = outputSpec.copy(outputColumns = outputSpec.outputColumns)
+    val finalOutputSpec = outputSpec.copy(outputColumns = outputSpec.outputColumns
+      .map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation))
     val dataColumns = finalOutputSpec.outputColumns.filterNot(partitionSet.contains)
 
-    // TODO: check whether it needs to use `convertEmptyToNullIfNeeded` to convert empty to null
-    val empty2NullPlan = plan // convertEmptyToNullIfNeeded(plan, partitionColumns, constraints)
+    var needConvert = false
+    val projectList: Seq[NamedExpression] = plan.output.map {
+      case p if partitionSet.contains(p) && p.dataType == StringType && p.nullable =>
+        needConvert = true
+        Alias(Empty2Null(p), p.name)()
+      case attr => attr
+    }
+
+    val empty2NullPlan = if (staticPartitionWriteOnly && nativeEnabled) {
+      // Velox backend only support static partition write.
+      // And no need to add sort operator for static partition write.
+      plan
+    } else {
+      if (needConvert) ProjectExec(projectList, plan) else plan
+    }
 
     val writerBucketSpec = bucketSpec.map {
       spec =>
-        val bucketColumns =
-          spec.bucketColumnNames.map(c => dataColumns.find(_.name.equalsIgnoreCase(c)).get)
-        // Spark bucketed table: use `HashPartitioning.partitionIdExpression` as bucket id
-        // expression, so that we can guarantee the data distribution is same between shuffle and
-        // bucketed data source, which enables us to only shuffle one side when join a bucketed
-        // table and a normal one.
-        val bucketIdExpression =
-          HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
-        WriterBucketSpec(bucketIdExpression, (_: Int) => "")
+        val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
+
+        if (
+          options.getOrElse(BucketingUtils.optionForHiveCompatibleBucketWrite, "false") ==
+            "true"
+        ) {
+          // Hive bucketed table: use `HiveHash` and bitwise-and as bucket id expression.
+          // Without the extra bitwise-and operation, we can get wrong bucket id when hash value of
+          // columns is negative. See Hive implementation in
+          // `org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#getBucketNumber()`.
+          val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue))
+          val bucketIdExpression = Pmod(hashId, Literal(spec.numBuckets))
+
+          // The bucket file name prefix is following Hive, Presto and Trino conversion, so this
+          // makes sure Hive bucketed table written by Spark, can be read by other SQL engines.
+          //
+          // Hive: `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`.
+          // Trino: `io.trino.plugin.hive.BackgroundHiveSplitLoader#BUCKET_PATTERNS`.
+          val fileNamePrefix = (bucketId: Int) => f"$bucketId%05d_0_"
+          WriterBucketSpec(bucketIdExpression, fileNamePrefix)
+        } else {
+          // Spark bucketed table: use `HashPartitioning.partitionIdExpression` as bucket id
+          // expression, so that we can guarantee the data distribution is same between shuffle and
+          // bucketed data source, which enables us to only shuffle one side when join a bucketed
+          // table and a normal one.
+          val bucketIdExpression =
+            HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
+          WriterBucketSpec(bucketIdExpression, (_: Int) => "")
+        }
     }
 
     val sortColumns = bucketSpec.toSeq.flatMap {
-      spec => spec.sortColumnNames.map(c => dataColumns.find(_.name.equalsIgnoreCase(c)).get)
+      spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get)
     }
 
     val caseInsensitiveOptions = CaseInsensitiveMap(options)
@@ -118,7 +154,7 @@ object MergeTreeFileFormatWriter extends Logging {
       dataColumns = dataColumns,
       partitionColumns = partitionColumns,
       bucketSpec = writerBucketSpec,
-      path = outputPathName,
+      path = finalOutputSpec.outputPath,
       customPartitionLocations = finalOutputSpec.customPartitionLocations,
       maxRecordsPerFile = caseInsensitiveOptions
         .get("maxRecordsPerFile")
@@ -159,11 +195,9 @@ object MergeTreeFileFormatWriter extends Logging {
     if (writerBucketSpec.isDefined) {
       // We need to add the bucket id expression to the output of the sort plan,
       // so that we can use backend to calculate the bucket id for each row.
-      val bucketValueExpr = bindReferences(
-        Seq(writerBucketSpec.get.bucketIdExpression),
-        finalOutputSpec.outputColumns)
-      wrapped =
-        ProjectExec(wrapped.output :+ Alias(bucketValueExpr.head, "__bucket_value__")(), wrapped)
+      wrapped = ProjectExec(
+        wrapped.output :+ Alias(writerBucketSpec.get.bucketIdExpression, "__bucket_value__")(),
+        wrapped)
       // TODO: to optimize, bucket value is computed twice here
     }
 
@@ -173,7 +207,11 @@ object MergeTreeFileFormatWriter extends Logging {
 
     try {
       val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
-        nativeWrap(empty2NullPlan)
+        if (!nativeEnabled || (staticPartitionWriteOnly && nativeEnabled)) {
+          (empty2NullPlan.execute(), None)
+        } else {
+          nativeWrap(empty2NullPlan)
+        }
       } else {
         // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
         // the physical plan may have different attribute ids due to optimizer removing some
@@ -185,7 +223,7 @@ object MergeTreeFileFormatWriter extends Logging {
 
         val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
         var concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
-        if (concurrentWritersEnabled) {
+        if (nativeEnabled && concurrentWritersEnabled) {
           log.warn(
             s"spark.sql.maxConcurrentOutputFileWriters(being set to $maxWriters) will be " +
               "ignored when native writer is being active. No concurrent Writers.")
@@ -197,7 +235,16 @@
             empty2NullPlan.execute(),
             Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())))
         } else {
-          nativeWrap(sortPlan)
+          if (staticPartitionWriteOnly && nativeEnabled) {
+            // remove the sort operator for static partition write.
+            (empty2NullPlan.execute(), None)
+          } else {
+            if (!nativeEnabled) {
+              (sortPlan.execute(), None)
+            } else {
+              nativeWrap(sortPlan)
+            }
+          }
         }
       }
 
@@ -252,7 +299,8 @@ object MergeTreeFileFormatWriter extends Logging {
   }
   // scalastyle:on argcount
 
-  def executeTask(
+  /** Writes data out in a single Spark task. */
+  private def executeTask(
       description: WriteJobDescription,
       jobIdInstant: Long,
       sparkStageId: Int,
@@ -260,9 +308,8 @@
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[InternalRow],
-      concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec]
-  ): WriteTaskResult = {
-    CHThreadGroup.registerNewThreadGroup()
+      concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec]): WriteTaskResult = {
+
     val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId)
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
     val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
@@ -286,22 +333,19 @@
       if (sparkPartitionId != 0 && !iterator.hasNext) {
         // In case of empty job,
         // leave first partition to save meta for file format like parquet/orc.
-        new MergeTreeEmptyDirectoryDataWriter(description, taskAttemptContext, committer)
+        new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
       } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
-        new MergeTreeSingleDirectoryDataWriter(description, taskAttemptContext, committer)
+        new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
       } else {
         concurrentOutputWriterSpec match {
           case Some(spec) =>
-            new MergeTreeDynamicPartitionDataConcurrentWriter(
+            new DynamicPartitionDataConcurrentWriter(
               description,
              taskAttemptContext,
              committer,
              spec)
          case _ =>
-            new MergeTreeDynamicPartitionDataSingleWriter(
-              description,
-              taskAttemptContext,
-              committer)
+            new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
        }
      }