From c47112f6a5415ce13b43348e0453513da043fcff Mon Sep 17 00:00:00 2001 From: Xiduo You Date: Thu, 4 Jan 2024 14:20:53 +0800 Subject: [PATCH] [VL] Support spark file commit protocol (#4264) --- .../backendsapi/velox/IteratorApiImpl.scala | 5 + .../SparkWriteFilesCommitProtocol.scala | 111 +++++++++ .../VeloxColumnarWriteFilesExec.scala | 217 +++++++++++------- cpp/core/compute/Runtime.h | 3 + cpp/core/jni/JniWrapper.cc | 16 ++ cpp/velox/compute/VeloxPlanConverter.cc | 3 +- cpp/velox/compute/VeloxPlanConverter.h | 1 + cpp/velox/compute/VeloxRuntime.cc | 9 +- cpp/velox/compute/VeloxRuntime.h | 2 + cpp/velox/substrait/SubstraitToVeloxPlan.cc | 13 +- cpp/velox/substrait/SubstraitToVeloxPlan.h | 6 +- .../substrait/SubstraitToVeloxPlanValidator.h | 2 +- cpp/velox/tests/RuntimeTest.cc | 3 + ...Substrait2VeloxValuesNodeConversionTest.cc | 2 +- .../tests/VeloxSubstraitRoundTripTest.cc | 4 +- .../substrait/rel/RelBuilder.java | 3 +- .../substrait/rel/WriteRelNode.java | 6 - .../backendsapi/IteratorApi.scala | 6 + .../execution/WriteFilesExecTransformer.scala | 75 ++---- .../vectorized/NativePlanEvaluator.java | 5 + .../vectorized/PlanEvaluatorJniWrapper.java | 2 + .../clickhouse/ClickHouseTestSettings.scala | 1 + .../spark/sql/sources/GlutenInsertSuite.scala | 22 ++ 23 files changed, 357 insertions(+), 160 deletions(-) create mode 100644 backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala index 80d96e27ad49..baa2e904c315 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala @@ -116,6 +116,11 @@ class IteratorApiImpl extends IteratorApi with Logging { (paths, starts, lengths, partitionColumns) } + override def injectWriteFilesTempPath(path: String): Unit = { + val transKernel = NativePlanEvaluator.create() + transKernel.injectWriteFilesTempPath(path) + } + /** * Generate Iterator[ColumnarBatch] for first stage. * diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala new file mode 100644 index 000000000000..9f5e80427fdf --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}
+import org.apache.spark.sql.execution.datasources.WriteJobDescription
+import org.apache.spark.util.Utils
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import java.lang.reflect.Field
+
+/**
+ * A wrapper for [[HadoopMapReduceCommitProtocol]]. This class only affects the task-side commit
+ * process, e.g. `setupTask`, `newTaskAttemptTempPath`, `commitTask` and `abortTask`. The job-side
+ * commit process is still handled by vanilla Spark on the driver.
+ */
+class SparkWriteFilesCommitProtocol(
+    jobTrackerID: String,
+    description: WriteJobDescription,
+    committer: FileCommitProtocol)
+  extends Logging {
+  assert(committer.isInstanceOf[HadoopMapReduceCommitProtocol])
+
+  private val sparkStageId = TaskContext.get().stageId()
+  private val sparkPartitionId = TaskContext.get().partitionId()
+  private val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue
+  private val jobId = createJobID(jobTrackerID, sparkStageId)
+
+  private val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
+  private val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
+
+  // Set up the attempt context required by the output committer.
+  private val taskAttemptContext: TaskAttemptContext = {
+    // Set up the configuration object
+    val hadoopConf = description.serializableHadoopConf.value
+    hadoopConf.set("mapreduce.job.id", jobId.toString)
+    hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
+    hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
+    hadoopConf.setBoolean("mapreduce.task.ismap", true)
+    hadoopConf.setInt("mapreduce.task.partition", 0)
+
+    new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
+  }
+
+  private lazy val internalCommitter: OutputCommitter = {
+    val field: Field = classOf[HadoopMapReduceCommitProtocol].getDeclaredField("committer")
+    field.setAccessible(true)
+    field.get(committer).asInstanceOf[OutputCommitter]
+  }
+
+  def setupTask(): Unit = {
+    committer.setupTask(taskAttemptContext)
+  }
+
+  def getJobId: String = jobId.toString
+
+  def newTaskAttemptTempPath(): String = {
+    assert(internalCommitter != null)
+    val stagingDir: Path = internalCommitter match {
+      // FileOutputCommitter has its own staging path, known as the "work path".
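+      // Illustration only (the exact layout depends on the Hadoop version and committer
+      // algorithm): the work path usually resolves to a task-attempt directory such as
+      // <outputPath>/_temporary/<appAttemptId>/_temporary/<taskAttemptId>, and files written
+      // there are moved into <outputPath> when the task and job commit.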
+ case f: FileOutputCommitter => + new Path(Option(f.getWorkPath).map(_.toString).getOrElse(description.path)) + case _ => + new Path(description.path) + } + stagingDir.toString + } + + def commitTask(): Unit = { + val (_, taskCommitTime) = Utils.timeTakenMs { + committer.commitTask(taskAttemptContext) + } + + // Just for update task commit time + description.statsTrackers.foreach { + stats => stats.newTaskInstance().getFinalStats(taskCommitTime) + } + } + + def abortTask(): Unit = { + committer.abortTask(taskAttemptContext) + } + + // Copied from `SparkHadoopWriterUtils.createJobID` to be compatible with multi-version + private def createJobID(jobTrackerID: String, id: Int): JobID = { + if (id < 0) { + throw new IllegalArgumentException("Job number is negative") + } + new JobID(jobTrackerID, id) + } +} diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala index 18447ceb5dd6..dc9d6e693baa 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala @@ -16,22 +16,30 @@ */ package org.apache.spark.sql.execution +import io.glutenproject.backendsapi.BackendsApiManager import io.glutenproject.columnarbatch.ColumnarBatches -import io.glutenproject.execution.WriteFilesExecTransformer import io.glutenproject.extension.GlutenPlan import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators -import org.apache.spark.TaskContext +import org.apache.spark.{Partition, SparkException, TaskContext, TaskOutputFileAlreadyExistException} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage +import org.apache.spark.internal.io.SparkHadoopWriterUtils import org.apache.spark.rdd.RDD +import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericInternalRow} import org.apache.spark.sql.connector.write.WriterCommitMessage import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, ExecutedWriteSummary, FileFormat, PartitioningUtils, WriteFilesExec, WriteFilesSpec, WriteTaskResult} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.hadoop.fs.FileAlreadyExistsException + +import java.util.Date import scala.collection.mutable @@ -73,6 +81,131 @@ case class VeloxWriteFilesMetrics( // Velox write files metrics end +/** + * This RDD is used to make sure we have injected staging write path before initializing the native + * plan, and support Spark file commit protocol. + */ +class VeloxColumnarWriteFilesRDD( + var prev: RDD[ColumnarBatch], + writeFilesSpec: WriteFilesSpec, + jobTrackerID: String) + extends RDD[WriterCommitMessage](prev) { + + private def collectNativeWriteFilesMetrics(cb: ColumnarBatch): WriteTaskResult = { + // Currently, the cb contains three columns: row, fragments, and context. + // The first row in the row column contains the number of written numRows. + // The fragments column contains detailed information about the file writes. 
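+    // Illustration only (the JSON is produced by the Velox writer and parsed into
+    // VeloxWriteFilesMetrics below): a fragments cell may look roughly like
+    //   {"name": "part1=1/part2=1",
+    //    "fileWriteInfos": [{"targetFileName": "part-xxx.parquet", "fileSize": 1024, ...}]}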
+ val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb) + assert(loadedCb.numCols() == 3) + val numWrittenRows = loadedCb.column(0).getLong(0) + + var updatedPartitions = Set.empty[String] + val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() + var numBytes = 0L + val objectMapper = new ObjectMapper() + objectMapper.registerModule(DefaultScalaModule) + for (i <- 0 until loadedCb.numRows() - 1) { + val fragments = loadedCb.column(1).getUTF8String(i + 1) + val metrics = objectMapper + .readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics]) + logDebug(s"Velox write files metrics: $metrics") + + val fileWriteInfos = metrics.fileWriteInfos + assert(fileWriteInfos.length == 1) + val fileWriteInfo = fileWriteInfos.head + numBytes += fileWriteInfo.fileSize + val targetFileName = fileWriteInfo.targetFileName + val outputPath = writeFilesSpec.description.path + + // part1=1/part2=1 + val partitionFragment = metrics.name + // Write a non-partitioned table + if (partitionFragment != "") { + updatedPartitions += partitionFragment + val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName + val customOutputPath = writeFilesSpec.description.customPartitionLocations.get( + PartitioningUtils.parsePathFragment(partitionFragment)) + if (customOutputPath.isDefined) { + addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName + } + } + } + + // Reports bytesWritten and recordsWritten to the Spark output metrics. + Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { + outputMetrics => + outputMetrics.setBytesWritten(numBytes) + outputMetrics.setRecordsWritten(numWrittenRows) + } + + val partitionsInternalRows = updatedPartitions.map { + part => + val parts = new Array[Any](1) + parts(0) = part + new GenericInternalRow(parts) + }.toSeq + val stats = BasicWriteTaskStats( + partitions = partitionsInternalRows, + numFiles = loadedCb.numRows() - 1, + numBytes = numBytes, + numRows = numWrittenRows) + val summary = + ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) + + WriteTaskResult(new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), summary) + } + + override def compute(split: Partition, context: TaskContext): Iterator[WriterCommitMessage] = { + val commitProtocol = new SparkWriteFilesCommitProtocol( + jobTrackerID, + writeFilesSpec.description, + writeFilesSpec.committer) + + commitProtocol.setupTask() + val writePath = commitProtocol.newTaskAttemptTempPath() + logDebug(s"Velox staging write path: $writePath") + var resultColumnarBatch: ColumnarBatch = null + try { + Utils.tryWithSafeFinallyAndFailureCallbacks(block = { + BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath) + + // Initialize the native plan + val iter = firstParent[ColumnarBatch].iterator(split, context) + assert(iter.hasNext) + resultColumnarBatch = iter.next() + commitProtocol.commitTask() + })( + catchBlock = { + // If there is an error, abort the task + commitProtocol.abortTask() + logError(s"Job ${commitProtocol.getJobId} aborted.") + } + ) + } catch { + case e: FetchFailedException => + throw e + case f: FileAlreadyExistsException if SQLConf.get.fastFailFileFormatOutput => + throw new TaskOutputFileAlreadyExistException(f) + case t: Throwable => + throw new SparkException( + s"Task failed while writing rows to staging path: $writePath, " + + s"output path: ${writeFilesSpec.description.path}", + t) + } + + 
assert(resultColumnarBatch != null) + val writeTaskResult = collectNativeWriteFilesMetrics(resultColumnarBatch) + Iterator.single(writeTaskResult) + } + + override protected def getPartitions: Array[Partition] = firstParent[ColumnarBatch].partitions + + override def clearDependencies(): Unit = { + super.clearDependencies() + prev = null + } +} + class VeloxColumnarWriteFilesExec( child: SparkPlan, fileFormat: FileFormat, @@ -89,84 +222,8 @@ class VeloxColumnarWriteFilesExec( override def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = { assert(child.supportsColumnar) - - // We need to pass the WritePath to the Velox TableWriter in the doTransform - // method of the WriteTransformer. However, the WritePath is not accessible - // during the planning phase in the WriteTransformer, and can only be obtained - // during the actual execution, specifically in the doExecuteWrite method of - // ColumnarWriteFilesExec, where it is available within the WriteFilesSpec. - // Therefore, we use this hack method to pass the writePath. - WriteFilesExecTransformer.withWriteFilePath(writeFilesSpec.description.path) { - child.executeColumnar().mapPartitionsInternal { - iter => - // Currently, the cb contains three columns: row, fragments, and context. - // The first row in the row column contains the number of written numRows. - // The fragments column contains detailed information about the file writes. - assert(iter.hasNext) - val cb = iter.next() - val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb) - assert(loadedCb.numCols() == 3) - val numWrittenRows = loadedCb.column(0).getLong(0) - - var updatedPartitions = Set.empty[String] - val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() - var numBytes = 0L - val objectMapper = new ObjectMapper() - objectMapper.registerModule(DefaultScalaModule) - for (i <- 0 until loadedCb.numRows() - 1) { - val fragments = loadedCb.column(1).getUTF8String(i + 1) - val metrics = objectMapper - .readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics]) - logDebug(s"Velox write files metrics: $metrics") - - val fileWriteInfos = metrics.fileWriteInfos - assert(fileWriteInfos.length == 1) - val fileWriteInfo = fileWriteInfos.head - numBytes += fileWriteInfo.fileSize - val targetFileName = fileWriteInfo.targetFileName - val outputPath = writeFilesSpec.description.path - - // part1=1/part2=1 - val partitionFragment = metrics.name - // write a non-partitioned table - if (partitionFragment != "") { - updatedPartitions += partitionFragment - val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName - val customOutputPath = writeFilesSpec.description.customPartitionLocations.get( - PartitioningUtils.parsePathFragment(partitionFragment)) - if (customOutputPath.isDefined) { - addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName - } - } - } - - // Reports bytesWritten and recordsWritten to the Spark output metrics. 
- Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { - outputMetrics => - outputMetrics.setBytesWritten(numBytes) - outputMetrics.setRecordsWritten(numWrittenRows) - } - - val partitionsInternalRows = updatedPartitions.map { - part => - val parts = new Array[Any](1) - parts(0) = part - new GenericInternalRow(parts) - }.toSeq - val stats = BasicWriteTaskStats( - partitions = partitionsInternalRows, - numFiles = loadedCb.numRows() - 1, - numBytes = numBytes, - numRows = numWrittenRows) - val summary = - ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) - - val result = WriteTaskResult( - new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), - summary) - Iterator.single(result) - } - } + val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date()) + new VeloxColumnarWriteFilesRDD(child.executeColumnar(), writeFilesSpec, jobTrackerID) } override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExec = diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index f188e24203d1..5d86656037dc 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -69,6 +69,8 @@ class Runtime : public std::enable_shared_from_this { virtual std::string planString(bool details, const std::unordered_map& sessionConf) = 0; + virtual void injectWriteFilesTempPath(const std::string& path) = 0; + // Just for benchmark ::substrait::Plan& getPlan() { return substraitPlan_; @@ -136,6 +138,7 @@ class Runtime : public std::enable_shared_from_this { protected: std::unique_ptr objStore_ = ObjectStore::create(); ::substrait::Plan substraitPlan_; + std::optional writeFilesTempPath_; SparkTaskInfo taskInfo_; // Session conf map const std::unordered_map confMap_; diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 3974ec8363cb..c3edf4a4c8e3 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -349,6 +349,22 @@ JNIEXPORT jstring JNICALL Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapp JNI_METHOD_END(nullptr) } +JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapper_injectWriteFilesTempPath( // NOLINT + JNIEnv* env, + jobject wrapper, + jbyteArray path) { + JNI_METHOD_START + + auto len = env->GetArrayLength(path); + jbyte* bytes = env->GetByteArrayElements(path, 0); + std::string pathStr(reinterpret_cast(bytes), len); + auto ctx = gluten::getRuntime(env, wrapper); + ctx->injectWriteFilesTempPath(pathStr); + env->ReleaseByteArrayElements(path, bytes, JNI_ABORT); + + JNI_METHOD_END() +} + JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWithIterator( // NOLINT JNIEnv* env, diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index 45b2927a4ff1..4d181e4ffd43 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -33,10 +33,11 @@ VeloxPlanConverter::VeloxPlanConverter( const std::vector>& inputIters, velox::memory::MemoryPool* veloxPool, const std::unordered_map& confMap, + const std::optional writeFilesTempPath, bool validationMode) : inputIters_(inputIters), validationMode_(validationMode), - substraitVeloxPlanConverter_(veloxPool, confMap, validationMode), + substraitVeloxPlanConverter_(veloxPool, confMap, writeFilesTempPath, validationMode), pool_(veloxPool) {} void VeloxPlanConverter::setInputPlanNode(const ::substrait::WriteRel& writeRel) { diff --git a/cpp/velox/compute/VeloxPlanConverter.h 
b/cpp/velox/compute/VeloxPlanConverter.h index 01fd9bcfa4e6..f8833728f65c 100644 --- a/cpp/velox/compute/VeloxPlanConverter.h +++ b/cpp/velox/compute/VeloxPlanConverter.h @@ -33,6 +33,7 @@ class VeloxPlanConverter { const std::vector>& inputIters, facebook::velox::memory::MemoryPool* veloxPool, const std::unordered_map& confMap, + const std::optional writeFilesTempPath = std::nullopt, bool validationMode = false); std::shared_ptr toVeloxPlan(::substrait::Plan& substraitPlan); diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 66f0962a47d7..73de78fa6c11 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -74,11 +74,15 @@ void VeloxRuntime::getInfoAndIds( std::string VeloxRuntime::planString(bool details, const std::unordered_map& sessionConf) { std::vector> inputs; auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool(); - VeloxPlanConverter veloxPlanConverter(inputs, veloxMemoryPool.get(), sessionConf, true); + VeloxPlanConverter veloxPlanConverter(inputs, veloxMemoryPool.get(), sessionConf, std::nullopt, true); auto veloxPlan = veloxPlanConverter.toVeloxPlan(substraitPlan_); return veloxPlan->toString(details, true); } +void VeloxRuntime::injectWriteFilesTempPath(const std::string& path) { + writeFilesTempPath_ = path; +} + std::shared_ptr VeloxRuntime::createResultIterator( MemoryManager* memoryManager, const std::string& spillDir, @@ -88,7 +92,8 @@ std::shared_ptr VeloxRuntime::createResultIterator( LOG(INFO) << "VeloxRuntime session config:" << printConfig(confMap_); } - VeloxPlanConverter veloxPlanConverter(inputs, getLeafVeloxPool(memoryManager).get(), sessionConf); + VeloxPlanConverter veloxPlanConverter( + inputs, getLeafVeloxPool(memoryManager).get(), sessionConf, writeFilesTempPath_); veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_); // Scan node can be required. diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index 2d5d727624f2..3ae83f66b5a3 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -109,6 +109,8 @@ class VeloxRuntime final : public Runtime { std::string planString(bool details, const std::unordered_map& sessionConf) override; + void injectWriteFilesTempPath(const std::string& path) override; + std::shared_ptr getVeloxPlan() { return veloxPlan_; } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index fd3fd86bf7c9..0826ca050ce0 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -539,11 +539,12 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: } } - std::vector writePath; - writePath.reserve(1); - VELOX_CHECK(writeRel.named_table().names().size() == 1) - for (const auto& name : writeRel.named_table().names()) { - writePath.emplace_back(name); + std::string writePath; + if (writeFilesTempPath_.has_value()) { + writePath = writeFilesTempPath_.value(); + } else { + VELOX_CHECK(validationMode_, "WriteRel should have the write path before initializing the plan."); + writePath = ""; } // spark default compression code is snappy. @@ -582,7 +583,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: inputType->children(), partitionedKey, nullptr /*bucketProperty*/, - makeLocationHandle(writePath[0]), + makeLocationHandle(writePath), dwio::common::FileFormat::PARQUET, // Currently only support parquet format. 
compressionCodec)), (partitionedKey.size() > 0) ? true : false, diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index ffb973ee01e6..1d90bfbbb80a 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -58,8 +58,9 @@ class SubstraitToVeloxPlanConverter { SubstraitToVeloxPlanConverter( memory::MemoryPool* pool, const std::unordered_map& confMap = {}, + const std::optional writeFilesTempPath = std::nullopt, bool validationMode = false) - : pool_(pool), confMap_(confMap), validationMode_(validationMode) {} + : pool_(pool), confMap_(confMap), writeFilesTempPath_(writeFilesTempPath), validationMode_(validationMode) {} /// Used to convert Substrait WriteRel into Velox PlanNode. core::PlanNodePtr toVeloxPlan(const ::substrait::WriteRel& writeRel); @@ -541,6 +542,9 @@ class SubstraitToVeloxPlanConverter { /// A map of custom configs. std::unordered_map confMap_; + /// The temporary path used to write files. + std::optional writeFilesTempPath_; + /// A flag used to specify validation. bool validationMode_ = false; }; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h index ad237f7a701b..b4412d860962 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h @@ -27,7 +27,7 @@ namespace gluten { class SubstraitToVeloxPlanValidator { public: SubstraitToVeloxPlanValidator(memory::MemoryPool* pool, core::ExecCtx* execCtx) - : pool_(pool), execCtx_(execCtx), planConverter_(pool_, confMap_, true) {} + : pool_(pool), execCtx_(execCtx), planConverter_(pool_, confMap_, std::nullopt, true) {} /// Used to validate whether the computing of this Write is supported. 
bool validate(const ::substrait::WriteRel& writeRel); diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc index c6af63b8e0c2..4860155e3e21 100644 --- a/cpp/velox/tests/RuntimeTest.cc +++ b/cpp/velox/tests/RuntimeTest.cc @@ -89,6 +89,9 @@ class DummyRuntime final : public Runtime { std::string planString(bool details, const std::unordered_map& sessionConf) override { throw GlutenException("Not yet implemented"); } + void injectWriteFilesTempPath(const std::string& path) override { + throw GlutenException("Not yet implemented"); + } private: ResourceMap> resultIteratorHolder_; diff --git a/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc b/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc index 0c50d39c0d71..75099db95976 100644 --- a/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc +++ b/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc @@ -42,7 +42,7 @@ TEST_F(Substrait2VeloxValuesNodeConversionTest, valuesNode) { JsonToProtoConverter::readFromFile(planPath, substraitPlan); std::unordered_map sessionConf = {}; std::shared_ptr planConverter_ = - std::make_shared(pool_.get(), sessionConf, true); + std::make_shared(pool_.get(), sessionConf, std::nullopt, true); auto veloxPlan = planConverter_->toVeloxPlan(substraitPlan); RowVectorPtr expectedData = makeRowVector( diff --git a/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc b/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc index fc302c8d7839..cf5b72b4d366 100644 --- a/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc +++ b/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc @@ -67,7 +67,7 @@ class VeloxSubstraitRoundTripTest : public OperatorTestBase { auto substraitPlan = veloxConvertor_->toSubstrait(arena, plan); std::unordered_map sessionConf = {}; std::shared_ptr substraitConverter_ = - std::make_shared(pool_.get(), sessionConf, true); + std::make_shared(pool_.get(), sessionConf, std::nullopt, true); // Convert Substrait Plan to the same Velox Plan. auto samePlan = substraitConverter_->toVeloxPlan(substraitPlan); @@ -87,7 +87,7 @@ class VeloxSubstraitRoundTripTest : public OperatorTestBase { auto substraitPlan = veloxConvertor_->toSubstrait(arena, plan); std::unordered_map sessionConf = {}; std::shared_ptr substraitConverter_ = - std::make_shared(pool_.get(), sessionConf, true); + std::make_shared(pool_.get(), sessionConf, std::nullopt, true); // Convert Substrait Plan to the same Velox Plan. 
auto samePlan = substraitConverter_->toVeloxPlan(substraitPlan); diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java index 1c590cd785e9..0584317bd736 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java @@ -184,12 +184,11 @@ public static RelNode makeWriteRel( List types, List names, List columnTypeNodes, - String writePath, AdvancedExtensionNode extensionNode, SubstraitContext context, Long operatorId) { context.registerRelToOperator(operatorId); - return new WriteRelNode(input, types, names, columnTypeNodes, writePath, extensionNode); + return new WriteRelNode(input, types, names, columnTypeNodes, extensionNode); } public static RelNode makeSortRel( diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java index fae8deeb5d24..6b9c5e4e9dc9 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java @@ -35,7 +35,6 @@ public class WriteRelNode implements RelNode, Serializable { private final List types = new ArrayList<>(); private final List names = new ArrayList<>(); - private final String writePath; private final List columnTypeNodes = new ArrayList<>(); private final AdvancedExtensionNode extensionNode; @@ -45,13 +44,11 @@ public class WriteRelNode implements RelNode, Serializable { List types, List names, List partitionColumnTypeNodes, - String writePath, AdvancedExtensionNode extensionNode) { this.input = input; this.types.addAll(types); this.names.addAll(names); this.columnTypeNodes.addAll(partitionColumnTypeNodes); - this.writePath = writePath; this.extensionNode = extensionNode; } @@ -79,9 +76,6 @@ public Rel toProtobuf() { writeBuilder.setTableSchema(nStructBuilder); NamedObjectWrite.Builder nameObjectWriter = NamedObjectWrite.newBuilder(); - if (writePath != "") { - nameObjectWriter.addNames(writePath); - } if (extensionNode != null) { nameObjectWriter.setAdvancedExtension(extensionNode.toProtobuf()); diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala index 03100cfa6e0f..688755fb8ae2 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala @@ -44,6 +44,12 @@ trait IteratorApi { partitionSchema: StructType, fileFormat: ReadFileFormat): SplitInfo + /** + * Inject the task attempt temporary path for native write files, this method should be called + * before `genFirstStageIterator` or `genFinalStageIterator` + */ + def injectWriteFilesTempPath(path: String): Unit = throw new UnsupportedOperationException() + /** * Generate Iterator[ColumnarBatch] for first stage. 
("first" means it does not depend on other * SCAN inputs) diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala index 350f65251d25..907f5c1ac0b3 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala @@ -25,20 +25,22 @@ import io.glutenproject.substrait.SubstraitContext import io.glutenproject.substrait.extensions.ExtensionBuilder import io.glutenproject.substrait.rel.{RelBuilder, RelNode} -import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.vectorized.ColumnarBatch import com.google.protobuf.{Any, StringValue} import scala.collection.JavaConverters._ import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` +/** + * Note that, the output staging path is set by `VeloxColumnarWriteFilesExec`, each task should have + * its own staging path. + */ case class WriteFilesExecTransformer( child: SparkPlan, fileFormat: FileFormat, @@ -77,7 +79,6 @@ case class WriteFilesExecTransformer( def getRelNode( context: SubstraitContext, originalInputAttributes: Seq[Attribute], - writePath: String, operatorId: Long, input: RelNode, validation: Boolean): RelNode = { @@ -103,34 +104,22 @@ case class WriteFilesExecTransformer( val nameList = ConverterUtils.collectAttributeNames(inputAttributes.toSeq) - - if (!validation) { - val extensionNode = ExtensionBuilder.makeAdvancedExtension( + val extensionNode = if (!validation) { + ExtensionBuilder.makeAdvancedExtension( genWriteParameters(), createEnhancement(originalInputAttributes)) - RelBuilder.makeWriteRel( - input, - typeNodes, - nameList, - columnTypeNodes, - writePath, - extensionNode, - context, - operatorId) } else { // Use a extension node to send the input types through Substrait plan for validation. 
- val extensionNode = - ExtensionBuilder.makeAdvancedExtension(createEnhancement(originalInputAttributes)) - RelBuilder.makeWriteRel( - input, - typeNodes, - nameList, - columnTypeNodes, - writePath, - extensionNode, - context, - operatorId) + ExtensionBuilder.makeAdvancedExtension(createEnhancement(originalInputAttributes)) } + RelBuilder.makeWriteRel( + input, + typeNodes, + nameList, + columnTypeNodes, + extensionNode, + context, + operatorId) } override protected def doValidateInternal(): ValidationResult = { @@ -148,49 +137,19 @@ case class WriteFilesExecTransformer( val substraitContext = new SubstraitContext val operatorId = substraitContext.nextOperatorId(this.nodeName) - - val relNode = - getRelNode(substraitContext, child.output, "", operatorId, null, validation = true) + val relNode = getRelNode(substraitContext, child.output, operatorId, null, validation = true) doNativeValidation(substraitContext, relNode) } override def doTransform(context: SubstraitContext): TransformContext = { - val writePath = WriteFilesExecTransformer.getWriteFilePath val childCtx = child.asInstanceOf[TransformSupport].doTransform(context) - val operatorId = context.nextOperatorId(this.nodeName) - - val currRel = - getRelNode(context, child.output, writePath, operatorId, childCtx.root, validation = false) + val currRel = getRelNode(context, child.output, operatorId, childCtx.root, validation = false) assert(currRel != null, "Write Rel should be valid") TransformContext(childCtx.outputAttributes, output, currRel) } - override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { - throw new UnsupportedOperationException(s"This operator doesn't support doExecuteColumnar().") - } - override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExecTransformer = copy(child = newChild) } - -object WriteFilesExecTransformer { - private val writeFilePathThreadLocal = new ThreadLocal[String] - - def withWriteFilePath[T](path: String)(f: => T): T = { - val origin = writeFilePathThreadLocal.get() - writeFilePathThreadLocal.set(path) - try { - f - } finally { - writeFilePathThreadLocal.set(origin) - } - } - - def getWriteFilePath: String = { - val writeFilePath = writeFilePathThreadLocal.get() - assert(writeFilePath != null) - writeFilePath - } -} diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java index 0c7d0bccb81d..eac84f5ae3db 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java @@ -31,6 +31,7 @@ import org.apache.spark.util.SparkDirectoryUtil; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Optional; import java.util.Set; @@ -58,6 +59,10 @@ public NativePlanValidationInfo doNativeValidateWithFailureReason(byte[] subPlan return jniWrapper.nativeValidateWithFailureReason(subPlan); } + public void injectWriteFilesTempPath(String path) { + jniWrapper.injectWriteFilesTempPath(path.getBytes(StandardCharsets.UTF_8)); + } + // Used by WholeStageTransform to create the native computing pipeline and // return a columnar result iterator. 
public GeneralOutIterator createKernelWithBatchIterator( diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java b/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java index 3af3e8924c94..46115c191555 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java @@ -56,6 +56,8 @@ public long handle() { public native String nativePlanString(byte[] substraitPlan, Boolean details); + public native void injectWriteFilesTempPath(byte[] path); + /** * Create a native compute kernel and return a columnar result iterator. * diff --git a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala index cee0417df763..b751588e245d 100644 --- a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala @@ -1782,6 +1782,7 @@ class ClickHouseTestSettings extends BackendTestSettings { enableSuite[GlutenFilteredScanSuite] enableSuite[GlutenFiltersSuite] enableSuite[GlutenInsertSuite] + .excludeByPrefix("Gluten: ") enableSuite[GlutenPartitionedWriteSuite] enableSuite[GlutenPathOptionSuite] enableSuite[GlutenPrunedScanSuite] diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala index a34e6c065c79..e2a8996007b5 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala @@ -20,11 +20,14 @@ import org.apache.spark.SparkConf import org.apache.spark.executor.OutputMetrics import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, VeloxColumnarWriteFilesExec} import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.util.QueryExecutionListener +import java.io.File + class GlutenInsertSuite extends InsertSuite with GlutenSQLTestsBaseTrait { override def sparkConf: SparkConf = { @@ -73,10 +76,29 @@ class GlutenInsertSuite extends InsertSuite with GlutenSQLTestsBaseTrait { assert(sqlMetrics("numOutputBytes").value > 0) assert(sqlMetrics("numFiles").value == 1) + checkAnswer(spark.sql("SELECT * FROM pt"), Row(1, "a", "a") :: Row(2, "b", "a") :: Nil) } finally { spark.sparkContext.removeSparkListener(taskListener) spark.listenerManager.unregister(queryListener) } } } + + test("Cleanup staging files if job is failed") { + withTable("t") { + spark.sql("CREATE TABLE t (c1 int, c2 string) USING PARQUET") + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(new File(table.location).list().length == 0) + + intercept[Exception] { + spark.sql( + """ + |INSERT INTO TABLE t + |SELECT id, assert_true(SPARK_PARTITION_ID() = 1) FROM range(1, 3, 1, 2) + |""".stripMargin + ) + } + assert(new File(table.location).list().length == 0) + } + } }
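
A minimal sketch of the task-side flow introduced by this patch (the driver-side job commit stays
in vanilla Spark's FileFormatWriter and appears only as a trailing comment for context):

    // Inside VeloxColumnarWriteFilesRDD.compute, once per task
    val commitProtocol = new SparkWriteFilesCommitProtocol(
      jobTrackerID, writeFilesSpec.description, writeFilesSpec.committer)
    commitProtocol.setupTask()                               // Hadoop OutputCommitter.setupTask
    val writePath = commitProtocol.newTaskAttemptTempPath()  // staging dir for the native writer
    BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath)
    val cb = firstParent[ColumnarBatch].iterator(split, context).next() // runs the Velox write plan
    commitProtocol.commitTask()                              // promotes staged files on task commit
    // Driver side (vanilla Spark): committer.setupJob(...) runs before the tasks and
    // committer.commitJob(...) finalizes the output with the collected WriteTaskResults.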