diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala new file mode 100644 index 000000000000..e949bebf236a --- /dev/null +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.gluten.exception.GlutenNotSupportException + +import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.sql.execution.datasources.WriteJobDescription + +object CHDeltaColumnarWrite { + def apply( + jobTrackerID: String, + description: WriteJobDescription, + committer: FileCommitProtocol): CHColumnarWrite[FileCommitProtocol] = + throw new GlutenNotSupportException("Delta Native is not supported in Spark 3.2") +} diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala new file mode 100644 index 000000000000..0a1aee5c4bfb --- /dev/null +++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution + +import org.apache.gluten.exception.GlutenNotSupportException + +import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.sql.execution.datasources.WriteJobDescription + +object CHDeltaColumnarWrite { + def apply( + jobTrackerID: String, + description: WriteJobDescription, + committer: FileCommitProtocol): CHColumnarWrite[FileCommitProtocol] = + throw new GlutenNotSupportException("Delta Native is not supported in Spark 3.3") +} diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala index 6eec68efece3..e023d3d7cbe7 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala @@ -25,17 +25,21 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 import org.apache.spark.sql.delta.constraints.{Constraint, Constraints} -import org.apache.spark.sql.delta.files.MergeTreeCommitProtocol -import org.apache.spark.sql.delta.schema.InvariantViolationException +import org.apache.spark.sql.delta.files.{DelayedCommitProtocol, DeltaFileFormatWriter, MergeTreeCommitProtocol, TransactionalWrite} +import org.apache.spark.sql.delta.hooks.AutoCompact +import org.apache.spark.sql.delta.schema.{InnerInvariantViolationException, InvariantViolationException} import org.apache.spark.sql.delta.sources.DeltaSQLConf -import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} +import org.apache.spark.sql.delta.stats.DeltaJobStatisticsTracker +import org.apache.spark.sql.execution.{CHDelayedCommitProtocol, QueryExecution, SparkPlan, SQLExecution} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec -import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FakeRowAdaptor, FileFormatWriter, WriteJobStatsTracker} +import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FakeRowAdaptor, FileFormatWriter, WriteFiles, WriteJobStatsTracker} import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeFileFormatWriter import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.SerializableConfiguration import org.apache.commons.lang3.exception.ExceptionUtils +import org.apache.hadoop.fs.Path import scala.collection.mutable.ListBuffer @@ -190,4 +194,158 @@ class ClickhouseOptimisticTransaction( super.writeFiles(inputData, writeOptions, additionalConstraints) } } + + private def shouldOptimizeWrite( + writeOptions: Option[DeltaOptions], + sessionConf: SQLConf): Boolean = { + writeOptions + .flatMap(_.optimizeWrite) + .getOrElse(TransactionalWrite.shouldOptimizeWrite(metadata, sessionConf)) + } + + override protected def getCommitter(outputPath: Path): DelayedCommitProtocol = + new CHDelayedCommitProtocol("delta", outputPath.toString, None, deltaDataSubdir) + + override def writeFiles( + inputData: Dataset[_], + writeOptions: Option[DeltaOptions], + isOptimize: Boolean, + additionalConstraints: Seq[Constraint]): Seq[FileAction] = { + + if (isOptimize) + throw new UnsupportedOperationException("Optimize is not supported for ClickHouse") + + hasWritten = true + + 
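+    // From here on this override mirrors Delta's TransactionalWrite.writeFiles, except that the
+    // physical write is planned as a WriteFiles node and executed through DeltaFileFormatWriter,
+    // so the ClickHouse backend can produce the parquet files natively and report them back via
+    // the CHDelayedCommitProtocol committer returned by getCommitter above.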
val spark = inputData.sparkSession + val (data, partitionSchema) = performCDCPartition(inputData) + val outputPath = deltaLog.dataPath + + val fileFormat = deltaLog.fileFormat(protocol, metadata) // TODO support changing formats. + + // Iceberg spec requires partition columns in data files + val writePartitionColumns = IcebergCompat.isAnyEnabled(metadata) + // Retain only a minimal selection of Spark writer options to avoid any potential + // compatibility issues + val options = (writeOptions match { + case None => Map.empty[String, String] + case Some(writeOptions) => + writeOptions.options.filterKeys { + key => + key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) || + key.equalsIgnoreCase(DeltaOptions.COMPRESSION) + }.toMap + }) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString) + + val (normalQueryExecution, output, generatedColumnConstraints, _) = + normalizeData(deltaLog, writeOptions, data) + val partitioningColumns = getPartitioningColumns(partitionSchema, output) + + val logicalPlan = normalQueryExecution.optimizedPlan + val write = + WriteFiles(logicalPlan, fileFormat, partitioningColumns, None, options, Map.empty) + + val queryExecution = new QueryExecution(spark, write) + val committer = getCommitter(outputPath) + + // If Statistics Collection is enabled, then create a stats tracker that will be injected during + // the FileFormatWriter.write call below and will collect per-file stats using + // StatisticsCollection + // val (optionalStatsTracker, _) = + // getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data) + val optionalStatsTracker: Option[DeltaJobStatisticsTracker] = None + + val constraints = + Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints + + SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) { + val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, output) + + val physicalPlan = materializeAdaptiveSparkPlan(queryExecution.executedPlan) + // convertEmptyToNullIfNeeded(queryExecution.executedPlan, partitioningColumns, constraints) + /* val checkInvariants = DeltaInvariantCheckerExec(empty2NullPlan, constraints) + // No need to plan optimized write if the write command is OPTIMIZE, which aims to produce + // evenly-balanced data files already. 
+ val physicalPlan = + if ( + !isOptimize && + shouldOptimizeWrite(writeOptions, spark.sessionState.conf) + ) { + DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog) + } else { + checkInvariants + }*/ + val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer() + + if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) { + val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker( + new SerializableConfiguration(deltaLog.newDeltaHadoopConf()), + BasicWriteJobStatsTracker.metrics) + registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics) + statsTrackers.append(basicWriteJobStatsTracker) + } + + try { + DeltaFileFormatWriter.write( + sparkSession = spark, + plan = physicalPlan, + fileFormat = fileFormat, + committer = committer, + outputSpec = outputSpec, + // scalastyle:off deltahadoopconfiguration + hadoopConf = + spark.sessionState.newHadoopConfWithOptions(metadata.configuration ++ deltaLog.options), + // scalastyle:on deltahadoopconfiguration + partitionColumns = partitioningColumns, + bucketSpec = None, + statsTrackers = optionalStatsTracker.toSeq + ++ statsTrackers, + options = options + ) + } catch { + case InnerInvariantViolationException(violationException) => + // Pull an InvariantViolationException up to the top level if it was the root cause. + throw violationException + } + } + + var resultFiles = + (if (optionalStatsTracker.isDefined) { + committer.addedStatuses.map { + a => + a.copy(stats = + optionalStatsTracker.map(_.recordedStats(a.toPath.getName)).getOrElse(a.stats)) + } + } else { + committer.addedStatuses + }) + .filter { + // In some cases, we can write out an empty `inputData`. Some examples of this (though, they + // may be fixed in the future) are the MERGE command when you delete with empty source, or + // empty target, or on disjoint tables. This is hard to catch before the write without + // collecting the DF ahead of time. Instead, we can return only the AddFiles that + // a) actually add rows, or + // b) don't have any stats so we don't know the number of rows at all + case a: AddFile => a.numLogicalRecords.forall(_ > 0) + case _ => true + } + + // add [[AddFile.Tags.ICEBERG_COMPAT_VERSION.name]] tags to addFiles + if (IcebergCompatV2.isEnabled(metadata)) { + resultFiles = resultFiles.map { + addFile => + val tags = if (addFile.tags != null) addFile.tags else Map.empty[String, String] + addFile.copy(tags = tags + (AddFile.Tags.ICEBERG_COMPAT_VERSION.name -> "2")) + } + } + + if (resultFiles.nonEmpty && !isOptimize) registerPostCommitHook(AutoCompact) + + resultFiles.toSeq ++ committer.changeFiles + } + + private def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match { + case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan + case p: SparkPlan => p + } } diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala new file mode 100644 index 000000000000..66f502038fcd --- /dev/null +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.exception.GlutenNotSupportException + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.sql.delta.files.DelayedCommitProtocol +import org.apache.spark.sql.execution.datasources.{ExecutedWriteSummary, WriteJobDescription, WriteTaskResult} +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils + +import org.apache.hadoop.mapreduce.TaskAttemptContext + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +/** A Wrapper of [[DelayedCommitProtocol]] for accessing protected methods and fields. */ +class CHDelayedCommitProtocol( + jobId: String, + val outputPath: String, + randomPrefixLength: Option[Int], + subdir: Option[String]) + extends DelayedCommitProtocol(jobId, outputPath, randomPrefixLength, subdir) { + + override def getFileName( + taskContext: TaskAttemptContext, + ext: String, + partitionValues: Map[String, String]): String = { + super.getFileName(taskContext, ext, partitionValues) + } + + def updateAddedFiles(files: Seq[(Map[String, String], String)]): Unit = { + assert(addedFiles.isEmpty) + addedFiles ++= files + } + + override def parsePartitions(dir: String): Map[String, String] = + super.parsePartitions(dir) +} + +case class CHDelayedCommitProtocolWrite( + override val jobTrackerID: String, + override val description: WriteJobDescription, + override val committer: CHDelayedCommitProtocol) + extends CHColumnarWrite[CHDelayedCommitProtocol] + with Logging { + + override def doSetupNativeTask(): Unit = { + assert(description.path == committer.outputPath) + val nameSpec = CreateFileNameSpec(taskAttemptContext, description) + val writePath = description.path + val writeFileName = committer.getFileName(taskAttemptContext, nameSpec.suffix, Map.empty) + logDebug(s"Native staging write path: $writePath and file name: $writeFileName") + BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, writeFileName) + } + + private def doCollectNativeResult( + cb: ColumnarBatch): Option[(Seq[(Map[String, String], String)], ExecutedWriteSummary)] = { + val numFiles = cb.numRows() + // Write an empty iterator + if (numFiles == 0) { + None + } else { + val file_col = cb.column(0) + val partition_col = cb.column(1) + val count_col = cb.column(2) + + val partitions: mutable.Set[String] = mutable.Set[String]() + val addedFiles: ArrayBuffer[(Map[String, String], String)] = + new ArrayBuffer[(Map[String, String], String)] + + var numWrittenRows: Long = 0 + Range(0, cb.numRows()).foreach { + i => + val fileName = file_col.getUTF8String(i).toString + val partition = partition_col.getUTF8String(i).toString + if (partition == "__NO_PARTITION_ID__") { + addedFiles.append((Map.empty[String, String], fileName)) + } else { + val partitionValues = 
committer.parsePartitions(partition) + addedFiles.append((partitionValues, s"$partition/$fileName")) + } + numWrittenRows += count_col.getLong(i) + } + val updatedPartitions = partitions.toSet + Some( + ( + addedFiles.toSeq, + ExecutedWriteSummary( + updatedPartitions = updatedPartitions, + stats = Seq(CreateBasicWriteTaskStats(numFiles, updatedPartitions, numWrittenRows))))) + } + } + + override def commitTask(batch: ColumnarBatch): Option[WriteTaskResult] = { + doCollectNativeResult(batch).map { + case (addedFiles, summary) => + require(addedFiles.nonEmpty, "No files to commit") + + committer.updateAddedFiles(addedFiles) + + val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs { + committer.commitTask(taskAttemptContext) + } + + // Just for update task commit time + description.statsTrackers.foreach { + stats => stats.newTaskInstance().getFinalStats(taskCommitTime) + } + WriteTaskResult(taskCommitMessage, summary) + } + } +} + +object CHDeltaColumnarWrite { + def apply( + jobTrackerID: String, + description: WriteJobDescription, + committer: FileCommitProtocol): CHColumnarWrite[FileCommitProtocol] = committer match { + case c: CHDelayedCommitProtocol => + CHDelayedCommitProtocolWrite(jobTrackerID, description, c) + .asInstanceOf[CHColumnarWrite[FileCommitProtocol]] + case _ => + throw new GlutenNotSupportException( + s"Unsupported committer type: ${committer.getClass.getSimpleName}") + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala index db9bba5f170a..d608734307fb 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala @@ -178,6 +178,6 @@ class CHColumnarShuffleWriter[K, V]( } // VisibleForTesting - def getPartitionLengths: Array[Long] = partitionLengths + def getPartitionLengths(): Array[Long] = partitionLengths } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala new file mode 100644 index 000000000000..6a5c19a4f939 --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution + +import org.apache.gluten.backendsapi.BackendsApiManager + +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, HadoopMapReduceCommitProtocol} +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, ExecutedWriteSummary, PartitioningUtils, WriteJobDescription, WriteTaskResult} +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobID, OutputCommitter, TaskAttemptContext, TaskAttemptID, TaskID, TaskType} +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl + +import java.lang.reflect.Field + +import scala.collection.mutable + +trait CHColumnarWrite[T <: FileCommitProtocol] { + + def description: WriteJobDescription + def jobTrackerID: String + def committer: T + def doSetupNativeTask(): Unit + + def setupTask(): Unit = { + committer.setupTask(taskAttemptContext) + doSetupNativeTask() + } + + def abortTask(): Unit = { + committer.abortTask(taskAttemptContext) + } + def commitTask(batch: ColumnarBatch): Option[WriteTaskResult] + + lazy val (taskAttemptContext: TaskAttemptContext, jobId: String) = { + // Copied from `SparkHadoopWriterUtils.createJobID` to be compatible with multi-version + def createJobID(jobTrackerID: String, id: Int): JobID = { + if (id < 0) { + throw new IllegalArgumentException("Job number is negative") + } + new JobID(jobTrackerID, id) + } + + val sparkStageId: Int = TaskContext.get().stageId() + val sparkPartitionId: Int = TaskContext.get().partitionId() + val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue + val jobID = createJobID(jobTrackerID, sparkStageId) + val taskId = new TaskID(jobID, TaskType.MAP, sparkPartitionId) + val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber) + + // Set up the configuration object + val hadoopConf = description.serializableHadoopConf.value + hadoopConf.set("mapreduce.job.id", jobID.toString) + hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString) + hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString) + hadoopConf.setBoolean("mapreduce.task.ismap", true) + hadoopConf.setInt("mapreduce.task.partition", 0) + + (new TaskAttemptContextImpl(hadoopConf, taskAttemptId), jobID.toString) + } +} + +object CreateFileNameSpec { + def apply(taskContext: TaskAttemptContext, description: WriteJobDescription): FileNameSpec = { + val fileCounter = 0 + val suffix = f".c$fileCounter%03d" + + description.outputWriterFactory.getFileExtension(taskContext) + FileNameSpec("", suffix) + } +} + +object CreateBasicWriteTaskStats { + def apply( + numFiles: Int, + updatedPartitions: Set[String], + numWrittenRows: Long): BasicWriteTaskStats = { + val partitionsInternalRows = updatedPartitions.map { + part => + val parts = new Array[Any](1) + parts(0) = part + new GenericInternalRow(parts) + }.toSeq + BasicWriteTaskStats( + partitions = partitionsInternalRows, + numFiles = numFiles, + numBytes = 101, + numRows = numWrittenRows) + } +} + +/** [[HadoopMapReduceAdapter]] for [[HadoopMapReduceCommitProtocol]]. 
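+ * Reflection is used to reach the protected `committer` field and `getFilename` method, so the
+ * native write can reuse Spark's staging directory (the FileOutputCommitter "work path") and its
+ * task output file naming scheme.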
*/ +case class HadoopMapReduceAdapter(sparkCommitter: HadoopMapReduceCommitProtocol) { + private lazy val committer: OutputCommitter = { + val field: Field = classOf[HadoopMapReduceCommitProtocol].getDeclaredField("committer") + field.setAccessible(true) + field.get(sparkCommitter).asInstanceOf[OutputCommitter] + } + private lazy val GetFilename = { + val m = classOf[HadoopMapReduceCommitProtocol] + .getDeclaredMethod("getFilename", classOf[TaskAttemptContext], classOf[FileNameSpec]) + m.setAccessible(true) + m + } + + private def newTaskAttemptTempPath(defaultPath: String): String = { + assert(committer != null) + val stagingDir: Path = committer match { + // For FileOutputCommitter it has its own staging path called "work path". + case f: FileOutputCommitter => + new Path(Option(f.getWorkPath).map(_.toString).getOrElse(defaultPath)) + case _ => + new Path(defaultPath) + } + stagingDir.toString + } + + private def getFilename(taskContext: TaskAttemptContext, spec: FileNameSpec): String = { + GetFilename.invoke(sparkCommitter, taskContext, spec).asInstanceOf[String] + } + + def getTaskAttemptTempPathAndFilename( + taskContext: TaskAttemptContext, + description: WriteJobDescription): (String, String) = { + val stageDir = newTaskAttemptTempPath(description.path) + val filename = getFilename(taskContext, CreateFileNameSpec(taskContext, description)) + (stageDir, filename) + } +} + +case class HadoopMapReduceCommitProtocolWrite( + override val jobTrackerID: String, + override val description: WriteJobDescription, + override val committer: HadoopMapReduceCommitProtocol) + extends CHColumnarWrite[HadoopMapReduceCommitProtocol] + with Logging { + + private lazy val adapter: HadoopMapReduceAdapter = HadoopMapReduceAdapter(committer) + + /** + * This function is used in [[CHColumnarWriteFilesRDD]] to inject the staging write path before + * initializing the native plan and collect native write files metrics for each backend. 
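+   * The temp path and file name are resolved from the wrapped Spark committer through
+   * [[HadoopMapReduceAdapter]] and then handed to the backend with injectWriteFilesTempPath.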
+ */ + override def doSetupNativeTask(): Unit = { + val (writePath, writeFileName) = + adapter.getTaskAttemptTempPathAndFilename(taskAttemptContext, description) + logDebug(s"Native staging write path: $writePath and file name: $writeFileName") + BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, writeFileName) + } + + def doCollectNativeResult(cb: ColumnarBatch): Option[WriteTaskResult] = { + val numFiles = cb.numRows() + // Write an empty iterator + if (numFiles == 0) { + None + } else { + val file_col = cb.column(0) + val partition_col = cb.column(1) + val count_col = cb.column(2) + + val outputPath = description.path + val partitions: mutable.Set[String] = mutable.Set[String]() + val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() + + var numWrittenRows: Long = 0 + Range(0, cb.numRows()).foreach { + i => + val targetFileName = file_col.getUTF8String(i).toString + val partition = partition_col.getUTF8String(i).toString + if (partition != "__NO_PARTITION_ID__") { + partitions += partition + val tmpOutputPath = outputPath + "/" + partition + "/" + targetFileName + val customOutputPath = + description.customPartitionLocations.get( + PartitioningUtils.parsePathFragment(partition)) + if (customOutputPath.isDefined) { + addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName + } + } + numWrittenRows += count_col.getLong(i) + } + + val updatedPartitions = partitions.toSet + val summary = + ExecutedWriteSummary( + updatedPartitions = updatedPartitions, + stats = Seq(CreateBasicWriteTaskStats(numFiles, updatedPartitions, numWrittenRows))) + Some( + WriteTaskResult( + new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), + summary)) + } + } + + override def commitTask(batch: ColumnarBatch): Option[WriteTaskResult] = { + doCollectNativeResult(batch).map( + nativeWriteTaskResult => { + val (_, taskCommitTime) = Utils.timeTakenMs { + committer.commitTask(taskAttemptContext) + } + + // Just for update task commit time + description.statsTrackers.foreach { + stats => stats.newTaskInstance().getFinalStats(taskCommitTime) + } + nativeWriteTaskResult + }) + } +} + +object CHColumnarWrite { + def apply( + jobTrackerID: String, + description: WriteJobDescription, + committer: FileCommitProtocol): CHColumnarWrite[FileCommitProtocol] = committer match { + case h: HadoopMapReduceCommitProtocol => + HadoopMapReduceCommitProtocolWrite(jobTrackerID, description, h) + .asInstanceOf[CHColumnarWrite[FileCommitProtocol]] + case other => CHDeltaColumnarWrite(jobTrackerID, description, other) + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala index 52019d770945..bf051671fbba 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala @@ -16,16 +16,13 @@ */ package org.apache.spark.sql.execution -import org.apache.gluten.backendsapi.BackendsApiManager - import org.apache.spark.{Partition, SparkException, TaskContext, TaskOutputFileAlreadyExistException} import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} -import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.rdd.RDD import org.apache.spark.shuffle.FetchFailedException import 
org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.write.WriterCommitMessage import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.SQLConf @@ -33,11 +30,10 @@ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils import org.apache.hadoop.fs.FileAlreadyExistsException +import org.apache.hadoop.mapreduce.TaskAttemptContext import java.util.Date -import scala.collection.mutable - /** * This RDD is used to make sure we have injected staging write path before initializing the native * plan, and support Spark file commit protocol. @@ -49,60 +45,6 @@ class CHColumnarWriteFilesRDD( jobTrackerID: String) extends RDD[WriterCommitMessage](prev) { - private def collectNativeResult(cb: ColumnarBatch): Option[WriteTaskResult] = { - val numFiles = cb.numRows() - // Write an empty iterator - if (numFiles == 0) { - None - } else { - val file_col = cb.column(0) - val partition_col = cb.column(1) - val count_col = cb.column(2) - - val outputPath = description.path - var updatedPartitions = Set.empty[String] - val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() - - val write_stats = Range(0, cb.numRows()).map { - i => - val targetFileName = file_col.getUTF8String(i).toString - val partition = partition_col.getUTF8String(i).toString - if (partition != "__NO_PARTITION_ID__") { - updatedPartitions += partition - val tmpOutputPath = outputPath + "/" + partition + "/" + targetFileName - val customOutputPath = - description.customPartitionLocations.get( - PartitioningUtils.parsePathFragment(partition)) - if (customOutputPath.isDefined) { - addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName - } - } - count_col.getLong(i) - } - - val partitionsInternalRows = updatedPartitions.map { - part => - val parts = new Array[Any](1) - parts(0) = part - new GenericInternalRow(parts) - }.toSeq - - val numWrittenRows = write_stats.sum - val stats = BasicWriteTaskStats( - partitions = partitionsInternalRows, - numFiles = numFiles, - numBytes = 101, - numRows = numWrittenRows) - val summary = - ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) - - Some( - WriteTaskResult( - new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), - summary)) - } - } - private def reportTaskMetrics(writeTaskResult: WriteTaskResult): Unit = { val stats = writeTaskResult.summary.stats.head.asInstanceOf[BasicWriteTaskStats] val (numBytes, numWrittenRows) = (stats.numBytes, stats.numRows) @@ -116,11 +58,12 @@ class CHColumnarWriteFilesRDD( } private def writeFilesForEmptyIterator( - commitProtocol: SparkWriteFilesCommitProtocol): WriteTaskResult = { - val taskAttemptContext = commitProtocol.taskAttemptContext + taskAttemptContext: TaskAttemptContext, + sparkPartitionId: Int + ): WriteTaskResult = { val dataWriter = - if (commitProtocol.sparkPartitionId != 0) { + if (sparkPartitionId != 0) { // In case of empty job, leave first partition to save meta for file format like parquet. 
new EmptyDirectoryDataWriter(description, taskAttemptContext, committer) } else if (description.partitionColumns.isEmpty) { @@ -135,39 +78,35 @@ class CHColumnarWriteFilesRDD( } override def compute(split: Partition, context: TaskContext): Iterator[WriterCommitMessage] = { - val commitProtocol = new SparkWriteFilesCommitProtocol(jobTrackerID, description, committer) + val commitProtocol = CHColumnarWrite(jobTrackerID, description, committer) commitProtocol.setupTask() - val writePath = commitProtocol.newTaskAttemptTempPath() - val writeFileName = commitProtocol.getFilename - logDebug(s"Native staging write path: $writePath and file name: $writeFileName") - var writeTaskResult: WriteTaskResult = null try { Utils.tryWithSafeFinallyAndFailureCallbacks(block = { - BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, writeFileName) // Initialize the native plan val iter = firstParent[ColumnarBatch].iterator(split, context) assert(iter.hasNext) val resultColumnarBatch = iter.next() assert(resultColumnarBatch != null) - val nativeWriteTaskResult = collectNativeResult(resultColumnarBatch) - if (nativeWriteTaskResult.isEmpty) { - // If we are writing an empty iterator, then velox would do nothing. - // Here we fallback to use vanilla Spark write files to generate an empty file for - // metadata only. - writeTaskResult = writeFilesForEmptyIterator(commitProtocol) - // We have done commit task inside `writeFilesForEmptyIterator`. - } else { - writeTaskResult = nativeWriteTaskResult.get - commitProtocol.commitTask() - } + val writeTaskResult = commitProtocol + .commitTask(resultColumnarBatch) + .orElse({ + // If we are writing an empty iterator, then gluten backend would do nothing. + // Here we fallback to use vanilla Spark write files to generate an empty file for + // metadata only. + Some(writeFilesForEmptyIterator(commitProtocol.taskAttemptContext, context.partitionId)) + // We have done commit task inside `writeFilesForEmptyIterator`. 
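+            // `orElse` always supplies a result here, so the `.get` below cannot throw.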
+ }) + .get + reportTaskMetrics(writeTaskResult) + Iterator.single(writeTaskResult) })( catchBlock = { // If there is an error, abort the task commitProtocol.abortTask() - logError(s"Job ${commitProtocol.getJobId} aborted.") + logError(s"Job ${commitProtocol.jobId} aborted.") } ) } catch { @@ -177,14 +116,9 @@ class CHColumnarWriteFilesRDD( throw new TaskOutputFileAlreadyExistException(f) case t: Throwable => throw new SparkException( - s"Task failed while writing rows to staging path: $writePath, " + - s"output path: ${description.path}", + s"Task failed while writing rows to output path: ${description.path}", t) } - - assert(writeTaskResult != null) - reportTaskMetrics(writeTaskResult) - Iterator.single(writeTaskResult) } override protected def getPartitions: Array[Partition] = firstParent[ColumnarBatch].partitions diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala index 17eb0ed0b037..7526e6d3d70d 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala @@ -127,7 +127,7 @@ object CHExecUtil extends Logging { result } - override def next: UnsafeRow = { + override def next(): UnsafeRow = { if (rowId >= rows) throw new NoSuchElementException val (offset, length) = (rowInfo.offsets(rowId), rowInfo.lengths(rowId)) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala index 8f8351baeae1..d6f9a0162216 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala @@ -1251,7 +1251,7 @@ class GlutenClickHouseDeltaParquetWriteSuite runTPCHQueryBySQL(1, sqlStr) { _ => {} } } - test("test parquet optimize basic") { + testSparkVersionLE33("test parquet optimize basic") { withSQLConf("spark.databricks.delta.optimize.maxFileSize" -> "20000000") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_delta_parquet_optimize; @@ -1286,7 +1286,7 @@ class GlutenClickHouseDeltaParquetWriteSuite } } - test("test parquet optimize partitioned by one low card column") { + testSparkVersionLE33("test parquet optimize partitioned by one low card column") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_delta_parquet_optimize_p2; |""".stripMargin) @@ -1325,7 +1325,7 @@ class GlutenClickHouseDeltaParquetWriteSuite assert(ret2.apply(0).get(0) == 600572) } - test("test parquet optimize parallel delete") { + testSparkVersionLE33("test parquet optimize parallel delete") { withSQLConf("spark.databricks.delta.vacuum.parallelDelete.enabled" -> "true") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_delta_parquet_optimize_p4; @@ -1356,7 +1356,7 @@ class GlutenClickHouseDeltaParquetWriteSuite } } - test("test parquet optimize with the path based table") { + testSparkVersionLE33("test parquet optimize with the path based table") { val dataPath = s"$basePath/lineitem_delta_parquet_optimize_path_based" clearDataPath(dataPath) withSQLConf( @@ -1372,14 +1372,16 @@ class GlutenClickHouseDeltaParquetWriteSuite .mode(SaveMode.Append) .save(dataPath) + assert(countFiles(new File(dataPath)) === 51) + val clickhouseTable = 
DeltaTable.forPath(spark, dataPath) clickhouseTable.optimize().executeCompaction() clickhouseTable.vacuum(0.0) if (sparkVersion.equals("3.2")) { - assert(countFiles(new File(dataPath)) == 27) + assert(countFiles(new File(dataPath)) === 27) } else { - assert(countFiles(new File(dataPath)) == 29) + assert(countFiles(new File(dataPath)) === 29) } val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect() diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala index 1f99947e5b96..11710a7589da 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala @@ -187,7 +187,7 @@ class GlutenClickHouseNativeWriteTableSuite checkNative: Boolean = true): Unit = nativeWrite { format => val (table_name, table_create_sql, insert_sql) = f(format) - withDestinationTable(table_name, table_create_sql) { + withDestinationTable(table_name, Option(table_create_sql)) { checkInsertQuery(insert_sql, checkNative) Option(extraCheck).foreach(_(table_name, format)) } @@ -218,15 +218,36 @@ class GlutenClickHouseNativeWriteTableSuite } test("supplier: csv to parquet- insert overwrite local directory") { + val partitionNumber = 7 withSource(supplierDF, "supplier") { - nativeWrite { - format => + nativeWrite2( + format => { val sql = s"""insert overwrite local directory |'$basePath/test_insert_into_${format}_supplier' - |stored as $format select * from supplier""".stripMargin - checkInsertQuery(sql, checkNative = true) - } + |stored as $format + |select /*+ REPARTITION($partitionNumber) */ * from supplier""".stripMargin + (s"test_insert_into_${format}_supplier", null, sql) + }, + (table_name, format) => { + // spark 3.2 without orc or parquet suffix + val files = recursiveListFiles(new File(s"$basePath/$table_name")) + .map(_.getName) + .filterNot(s => s.endsWith(s".crc") || s.equals("_SUCCESS")) + + lazy val fileNames = { + val dir = s"$basePath/$table_name" + recursiveListFiles(new File(dir)) + .map(f => f.getAbsolutePath.stripPrefix(dir)) + .sorted + .mkString("\n") + } + + lazy val errorMessage = + s"Search $basePath/$table_name with suffix .$format, all files: \n $fileNames" + assert(files.length === partitionNumber, errorMessage) + } + ) } } @@ -851,7 +872,7 @@ class GlutenClickHouseNativeWriteTableSuite val table_name = "t_" + format withDestinationTable( table_name, - s"create table $table_name (id int, str string) stored as $format") { + Some(s"create table $table_name (id int, str string) stored as $format")) { checkInsertQuery( s"insert overwrite table $table_name " + "select id, cast(id as string) from range(10) union all " + diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala b/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala index fc30d151b675..4bee3f1771a9 100644 --- a/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala +++ b/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala @@ -62,9 +62,10 @@ trait NativeWriteChecker spark.sql(sqlStr) } - def withDestinationTable(table: String, createTableSql: String = "select 1")(f: => Unit): Unit = { + def withDestinationTable(table: String, createTableSql: Option[String] 
= None)( + f: => Unit): Unit = { spark.sql(s"drop table IF EXISTS $table") - spark.sql(s"$createTableSql") + createTableSql.foreach(spark.sql) f } diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala similarity index 82% rename from gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala rename to backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala index 5e3ab83e32e7..845f2f98fb8c 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.TaskContext import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, HadoopMapReduceCommitProtocol} +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol} import org.apache.spark.sql.execution.datasources.WriteJobDescription import org.apache.spark.util.Utils @@ -41,9 +41,9 @@ class SparkWriteFilesCommitProtocol( extends Logging { assert(committer.isInstanceOf[HadoopMapReduceCommitProtocol]) - val sparkStageId: Int = TaskContext.get().stageId() - val sparkPartitionId: Int = TaskContext.get().partitionId() - private val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue + val sparkStageId = TaskContext.get().stageId() + val sparkPartitionId = TaskContext.get().partitionId() + val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue private val jobId = createJobID(jobTrackerID, sparkStageId) private val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) @@ -68,21 +68,6 @@ class SparkWriteFilesCommitProtocol( field.get(committer).asInstanceOf[OutputCommitter] } - private lazy val internalGetFilename = { - val m = classOf[HadoopMapReduceCommitProtocol] - .getDeclaredMethod("getFilename", classOf[TaskAttemptContext], classOf[FileNameSpec]) - m.setAccessible(true) - m - } - - def getFilename: String = { - val fileCounter = 0 - val suffix = f".c$fileCounter%03d" + - description.outputWriterFactory.getFileExtension(taskAttemptContext) - val fileNameSpec = FileNameSpec("", suffix) - internalGetFilename.invoke(committer, taskAttemptContext, fileNameSpec).asInstanceOf[String] - } - def setupTask(): Unit = { committer.setupTask(taskAttemptContext) } diff --git a/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala b/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala index 727613f563f0..d981de8046a9 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala @@ -49,7 +49,7 @@ object GlutenPlanModel { override protected def doExecute(): RDD[InternalRow] = throw new IllegalStateException() override def output: Seq[Attribute] = metadata.schema().output - override def supportsColumnar(): Boolean = { + override def supportsColumnar: Boolean = { batchType != Convention.BatchType.None } diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala index 
7f6fd01ac47f..9c54653d9d4a 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala @@ -49,7 +49,7 @@ abstract class ColumnarWriteFilesExec protected ( override lazy val references: AttributeSet = AttributeSet.empty - override def supportsColumnar(): Boolean = true + override def supportsColumnar: Boolean = true override def output: Seq[Attribute] = Seq.empty
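For reference, below is a minimal standalone sketch (plain Scala, no Spark or Gluten dependencies) of the bookkeeping that the new `doCollectNativeResult` methods perform on the native result batch, which carries one row per written file with a file name, a partition id, and a row count. The names `NativeWriteRow` and `collectNativeResult` are illustrative only and not part of the Gluten API; partition parsing is simplified compared to `DelayedCommitProtocol.parsePartitions`.

final case class NativeWriteRow(fileName: String, partitionId: String, rowCount: Long)

object NativeWriteResultSketch {
  private val NoPartition = "__NO_PARTITION_ID__"

  /** Returns (added files as (partitionValues, relativePath), updated partitions, total rows). */
  def collectNativeResult(rows: Seq[NativeWriteRow])
      : (Seq[(Map[String, String], String)], Set[String], Long) = {
    val addedFiles = rows.map {
      case NativeWriteRow(file, NoPartition, _) =>
        // Un-partitioned output: no partition values, file sits directly under the output path.
        (Map.empty[String, String], file)
      case NativeWriteRow(file, partition, _) =>
        // "p1=a/p2=b" -> Map("p1" -> "a", "p2" -> "b"); simplified stand-in for parsePartitions.
        val partitionValues = partition
          .split("/")
          .map(_.split("=", 2))
          .collect { case Array(k, v) => k -> v }
          .toMap
        (partitionValues, s"$partition/$file")
    }
    val updatedPartitions =
      rows.collect { case r if r.partitionId != NoPartition => r.partitionId }.toSet
    (addedFiles, updatedPartitions, rows.map(_.rowCount).sum)
  }

  def main(args: Array[String]): Unit = {
    val batch = Seq(
      NativeWriteRow("part-00000.parquet", "__NO_PARTITION_ID__", 100L),
      NativeWriteRow("part-00001.parquet", "l_shipdate=1996-01-01", 42L))
    println(collectNativeResult(batch))
  }
}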