
[VL] Support spark file commit protocol (apache#4264)
ulysses-you authored Jan 4, 2024
1 parent a3be911 commit c47112f
Showing 23 changed files with 357 additions and 160 deletions.
@@ -116,6 +116,11 @@ class IteratorApiImpl extends IteratorApi with Logging {
(paths, starts, lengths, partitionColumns)
}

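/**
 * Injects the staging write path for a native file write. This is expected to be called before
 * the native plan is created, i.e. before the first ColumnarBatch iterator is requested (see
 * VeloxColumnarWriteFilesRDD).
 */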
override def injectWriteFilesTempPath(path: String): Unit = {
val transKernel = NativePlanEvaluator.create()
transKernel.injectWriteFilesTempPath(path)
}

/**
* Generate Iterator[ColumnarBatch] for first stage.
*
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}
import org.apache.spark.sql.execution.datasources.WriteJobDescription
import org.apache.spark.util.Utils

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import java.lang.reflect.Field

/**
* A wrapper for [[HadoopMapReduceCommitProtocol]]. This class only affects the task-side commit
* process, e.g. `setupTask`, `newTaskAttemptTempPath`, `commitTask` and `abortTask`. The job
* commit process happens on the vanilla Spark driver side.
*/
class SparkWriteFilesCommitProtocol(
jobTrackerID: String,
description: WriteJobDescription,
committer: FileCommitProtocol)
extends Logging {
assert(committer.isInstanceOf[HadoopMapReduceCommitProtocol])

private val sparkStageId = TaskContext.get().stageId()
private val sparkPartitionId = TaskContext.get().partitionId()
private val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue
private val jobId = createJobID(jobTrackerID, sparkStageId)

private val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
private val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)

// Set up the attempt context required to use in the output committer.
private val taskAttemptContext: TaskAttemptContext = {
// Set up the configuration object
val hadoopConf = description.serializableHadoopConf.value
hadoopConf.set("mapreduce.job.id", jobId.toString)
hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
hadoopConf.setBoolean("mapreduce.task.ismap", true)
hadoopConf.setInt("mapreduce.task.partition", 0)

new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
}

private lazy val internalCommitter: OutputCommitter = {
val field: Field = classOf[HadoopMapReduceCommitProtocol].getDeclaredField("committer")
field.setAccessible(true)
field.get(committer).asInstanceOf[OutputCommitter]
}

def setupTask(): Unit = {
committer.setupTask(taskAttemptContext)
}

def getJobId: String = jobId.toString

def newTaskAttemptTempPath(): String = {
assert(internalCommitter != null)
val stagingDir: Path = internalCommitter match {
// For FileOutputCommitter it has its own staging path called "work path".
case f: FileOutputCommitter =>
new Path(Option(f.getWorkPath).map(_.toString).getOrElse(description.path))
case _ =>
new Path(description.path)
}
stagingDir.toString
}

def commitTask(): Unit = {
val (_, taskCommitTime) = Utils.timeTakenMs {
committer.commitTask(taskAttemptContext)
}

// Just to update the task commit time.
description.statsTrackers.foreach {
stats => stats.newTaskInstance().getFinalStats(taskCommitTime)
}
}

def abortTask(): Unit = {
committer.abortTask(taskAttemptContext)
}

// Copied from `SparkHadoopWriterUtils.createJobID` to stay compatible with multiple Spark versions.
private def createJobID(jobTrackerID: String, id: Int): JobID = {
if (id < 0) {
throw new IllegalArgumentException("Job number is negative")
}
new JobID(jobTrackerID, id)
}
}
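
For reference, here is a minimal sketch, not part of this commit, of the task-side sequence the protocol above is designed to drive; it mirrors VeloxColumnarWriteFilesRDD.compute further below. The helper name runTaskSideWrite and the elided write step are illustrative only, and the snippet assumes the same imports as the file above.

def runTaskSideWrite(
    jobTrackerID: String,
    description: WriteJobDescription,
    committer: FileCommitProtocol): Unit = {
  val protocol = new SparkWriteFilesCommitProtocol(jobTrackerID, description, committer)
  protocol.setupTask()
  // The staging directory returned here is what gets injected into the native write plan.
  val stagingPath = protocol.newTaskAttemptTempPath()
  try {
    // ... write the output files under stagingPath (done by the native Velox writer) ...
    protocol.commitTask()
  } catch {
    case t: Throwable =>
      protocol.abortTask()
      throw t
  }
}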
@@ -16,22 +16,30 @@
*/
package org.apache.spark.sql.execution

import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.columnarbatch.ColumnarBatches
import io.glutenproject.execution.WriteFilesExecTransformer
import io.glutenproject.extension.GlutenPlan
import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators

import org.apache.spark.TaskContext
import org.apache.spark.{Partition, SparkException, TaskContext, TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.internal.io.SparkHadoopWriterUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericInternalRow}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, ExecutedWriteSummary, FileFormat, PartitioningUtils, WriteFilesExec, WriteFilesSpec, WriteTaskResult}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hadoop.fs.FileAlreadyExistsException

import java.util.Date

import scala.collection.mutable

@@ -73,6 +81,131 @@ case class VeloxWriteFilesMetrics(

// Velox write files metrics end

/**
* This RDD is used to make sure the staging write path has been injected before the native plan
* is initialized, and to support the Spark file commit protocol.
*/
class VeloxColumnarWriteFilesRDD(
var prev: RDD[ColumnarBatch],
writeFilesSpec: WriteFilesSpec,
jobTrackerID: String)
extends RDD[WriterCommitMessage](prev) {

private def collectNativeWriteFilesMetrics(cb: ColumnarBatch): WriteTaskResult = {
// Currently, the cb contains three columns: row, fragments, and context.
// The first cell of the row column holds the number of written rows.
// The fragments column contains detailed information about the file writes.
val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
assert(loadedCb.numCols() == 3)
val numWrittenRows = loadedCb.column(0).getLong(0)

var updatedPartitions = Set.empty[String]
val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
var numBytes = 0L
val objectMapper = new ObjectMapper()
objectMapper.registerModule(DefaultScalaModule)
for (i <- 0 until loadedCb.numRows() - 1) {
val fragments = loadedCb.column(1).getUTF8String(i + 1)
val metrics = objectMapper
.readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics])
logDebug(s"Velox write files metrics: $metrics")

val fileWriteInfos = metrics.fileWriteInfos
assert(fileWriteInfos.length == 1)
val fileWriteInfo = fileWriteInfos.head
numBytes += fileWriteInfo.fileSize
val targetFileName = fileWriteInfo.targetFileName
val outputPath = writeFilesSpec.description.path

// The partition fragment, e.g. part1=1/part2=1; it is empty for a non-partitioned table write.
val partitionFragment = metrics.name
if (partitionFragment != "") {
updatedPartitions += partitionFragment
val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName
val customOutputPath = writeFilesSpec.description.customPartitionLocations.get(
PartitioningUtils.parsePathFragment(partitionFragment))
if (customOutputPath.isDefined) {
addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName
}
}
}

// Reports bytesWritten and recordsWritten to the Spark output metrics.
Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach {
outputMetrics =>
outputMetrics.setBytesWritten(numBytes)
outputMetrics.setRecordsWritten(numWrittenRows)
}

val partitionsInternalRows = updatedPartitions.map {
part =>
val parts = new Array[Any](1)
parts(0) = part
new GenericInternalRow(parts)
}.toSeq
val stats = BasicWriteTaskStats(
partitions = partitionsInternalRows,
numFiles = loadedCb.numRows() - 1,
numBytes = numBytes,
numRows = numWrittenRows)
val summary =
ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))

WriteTaskResult(new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), summary)
}

override def compute(split: Partition, context: TaskContext): Iterator[WriterCommitMessage] = {
val commitProtocol = new SparkWriteFilesCommitProtocol(
jobTrackerID,
writeFilesSpec.description,
writeFilesSpec.committer)

commitProtocol.setupTask()
val writePath = commitProtocol.newTaskAttemptTempPath()
logDebug(s"Velox staging write path: $writePath")
var resultColumnarBatch: ColumnarBatch = null
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath)

// Initialize the native plan
val iter = firstParent[ColumnarBatch].iterator(split, context)
assert(iter.hasNext)
resultColumnarBatch = iter.next()
commitProtocol.commitTask()
})(
catchBlock = {
// If there is an error, abort the task
commitProtocol.abortTask()
logError(s"Job ${commitProtocol.getJobId} aborted.")
}
)
} catch {
case e: FetchFailedException =>
throw e
case f: FileAlreadyExistsException if SQLConf.get.fastFailFileFormatOutput =>
throw new TaskOutputFileAlreadyExistException(f)
case t: Throwable =>
throw new SparkException(
s"Task failed while writing rows to staging path: $writePath, " +
s"output path: ${writeFilesSpec.description.path}",
t)
}

assert(resultColumnarBatch != null)
val writeTaskResult = collectNativeWriteFilesMetrics(resultColumnarBatch)
Iterator.single(writeTaskResult)
}

override protected def getPartitions: Array[Partition] = firstParent[ColumnarBatch].partitions

override def clearDependencies(): Unit = {
super.clearDependencies()
prev = null
}
}
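
To make the metrics parsing in collectNativeWriteFilesMetrics concrete, here is a small, self-contained sketch, not part of this commit, that deserializes a fragments payload of the shape the code above expects. Only the fields the code actually reads (name, fileWriteInfos.targetFileName, fileWriteInfos.fileSize) are modeled; the SketchFileWriteInfo/SketchWriteMetrics classes and the sample JSON values are illustrative, and the real payload produced by the Velox writer may carry additional fields.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Illustrative stand-ins for the fields read above; not the real VeloxWriteFilesMetrics.
case class SketchFileWriteInfo(targetFileName: String, fileSize: Long)
case class SketchWriteMetrics(name: String, fileWriteInfos: Seq[SketchFileWriteInfo])

object FragmentsParseSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    // A partitioned write: "name" carries the partition fragment, e.g. part1=1/part2=1.
    val fragments =
      """{"name":"part1=1/part2=1","fileWriteInfos":[{"targetFileName":"part-00000-xyz.parquet","fileSize":1024}]}"""
    val metrics = mapper.readValue(fragments.getBytes("UTF-8"), classOf[SketchWriteMetrics])
    println(s"${metrics.name} -> ${metrics.fileWriteInfos.head.targetFileName} " +
      s"(${metrics.fileWriteInfos.head.fileSize} bytes)")
  }
}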

class VeloxColumnarWriteFilesExec(
child: SparkPlan,
fileFormat: FileFormat,
@@ -89,84 +222,8 @@ class VeloxColumnarWriteFilesExec(

override def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
assert(child.supportsColumnar)

// We need to pass the WritePath to the Velox TableWriter in the doTransform
// method of the WriteTransformer. However, the WritePath is not accessible
// during the planning phase in the WriteTransformer, and can only be obtained
// during the actual execution, specifically in the doExecuteWrite method of
// ColumnarWriteFilesExec, where it is available within the WriteFilesSpec.
// Therefore, we use this hack method to pass the writePath.
WriteFilesExecTransformer.withWriteFilePath(writeFilesSpec.description.path) {
child.executeColumnar().mapPartitionsInternal {
iter =>
// Currently, the cb contains three columns: row, fragments, and context.
// The first row in the row column contains the number of written numRows.
// The fragments column contains detailed information about the file writes.
assert(iter.hasNext)
val cb = iter.next()
val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
assert(loadedCb.numCols() == 3)
val numWrittenRows = loadedCb.column(0).getLong(0)

var updatedPartitions = Set.empty[String]
val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
var numBytes = 0L
val objectMapper = new ObjectMapper()
objectMapper.registerModule(DefaultScalaModule)
for (i <- 0 until loadedCb.numRows() - 1) {
val fragments = loadedCb.column(1).getUTF8String(i + 1)
val metrics = objectMapper
.readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics])
logDebug(s"Velox write files metrics: $metrics")

val fileWriteInfos = metrics.fileWriteInfos
assert(fileWriteInfos.length == 1)
val fileWriteInfo = fileWriteInfos.head
numBytes += fileWriteInfo.fileSize
val targetFileName = fileWriteInfo.targetFileName
val outputPath = writeFilesSpec.description.path

// part1=1/part2=1
val partitionFragment = metrics.name
// write a non-partitioned table
if (partitionFragment != "") {
updatedPartitions += partitionFragment
val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName
val customOutputPath = writeFilesSpec.description.customPartitionLocations.get(
PartitioningUtils.parsePathFragment(partitionFragment))
if (customOutputPath.isDefined) {
addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName
}
}
}

// Reports bytesWritten and recordsWritten to the Spark output metrics.
Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach {
outputMetrics =>
outputMetrics.setBytesWritten(numBytes)
outputMetrics.setRecordsWritten(numWrittenRows)
}

val partitionsInternalRows = updatedPartitions.map {
part =>
val parts = new Array[Any](1)
parts(0) = part
new GenericInternalRow(parts)
}.toSeq
val stats = BasicWriteTaskStats(
partitions = partitionsInternalRows,
numFiles = loadedCb.numRows() - 1,
numBytes = numBytes,
numRows = numWrittenRows)
val summary =
ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))

val result = WriteTaskResult(
new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
summary)
Iterator.single(result)
}
}
val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
new VeloxColumnarWriteFilesRDD(child.executeColumnar(), writeFilesSpec, jobTrackerID)
}

override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExec =
3 changes: 3 additions & 0 deletions cpp/core/compute/Runtime.h
@@ -69,6 +69,8 @@ class Runtime : public std::enable_shared_from_this<Runtime> {

virtual std::string planString(bool details, const std::unordered_map<std::string, std::string>& sessionConf) = 0;

virtual void injectWriteFilesTempPath(const std::string& path) = 0;

// Just for benchmark
::substrait::Plan& getPlan() {
return substraitPlan_;
@@ -136,6 +138,7 @@
protected:
std::unique_ptr<ObjectStore> objStore_ = ObjectStore::create();
::substrait::Plan substraitPlan_;
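// Intended to hold the staging write path injected from the JVM side via injectWriteFilesTempPath().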
std::optional<std::string> writeFilesTempPath_;
SparkTaskInfo taskInfo_;
// Session conf map
const std::unordered_map<std::string, std::string> confMap_;