[VL] Followup for native write files (#4246)
ulysses-you authored Jan 3, 2024
1 parent 72cecaa commit 8797350
Showing 10 changed files with 200 additions and 342 deletions.
4 changes: 0 additions & 4 deletions backends-clickhouse/pom.xml
@@ -156,22 +156,18 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
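A hedged reading of the pom changes in this commit (my inference, not stated in the diff): removing the <scope> element moves these Jackson artifacts from provided/test to Maven's default compile scope, so main (non-test) sources and the runtime classpath see vanilla Jackson, which the rewritten write-files code below relies on when it switches from Spark's shaded Jackson to com.fasterxml.jackson directly. A minimal Scala check of what compile scope enables (object name invented for illustration):

    // Compiles in main sources only once Jackson is on the compile scope
    // (previously these artifacts were scoped provided or test).
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    object JacksonVisibilityCheck { // hypothetical name
      def main(args: Array[String]): Unit = {
        val mapper = new ObjectMapper()
        mapper.registerModule(DefaultScalaModule)
        // Serializing a Scala Map confirms jackson-module-scala is wired in.
        println(mapper.writeValueAsString(Map("ok" -> true)))
      }
    }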
4 changes: 0 additions & 4 deletions backends-velox/pom.xml
@@ -147,22 +147,18 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.jimfs</groupId>
VeloxColumnarWriteFilesExec.scala
@@ -17,35 +17,73 @@
package org.apache.spark.sql.execution

import io.glutenproject.columnarbatch.ColumnarBatches
import io.glutenproject.execution.WriteFilesExecTransformer
import io.glutenproject.extension.GlutenPlan
import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators

import org.apache.spark.TaskContext
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericInternalRow}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, ExecutedWriteSummary, FileFormat, PartitioningUtils, WriteFilesExec, WriteFilesSpec, WriteTaskResult}
import org.apache.spark.sql.vectorized.ColumnarBatch

import shaded.parquet.com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import scala.collection.mutable

// Velox write files metrics start
//
// Follows the JSON emitted by Velox `HiveDataSink::close()`.
// The JSON looks like the following:
// {
// "inMemoryDataSizeInBytes":0,
// "containsNumberedFileNames":true,
// "onDiskDataSizeInBytes":307,
// "fileWriteInfos":[
// {
// "fileSize":307,
// "writeFileName":
// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
// "targetFileName":
// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
// }
// ],
// "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
// "rowCount":1,
// "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
// "updateMode":"NEW",
// "name":"part1=1/part2=1"
// }
case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long)

case class VeloxWriteFilesMetrics(
name: String,
updateMode: String,
writePath: String,
targetPath: String,
fileWriteInfos: Seq[VeloxWriteFilesInfo],
rowCount: Long,
inMemoryDataSizeInBytes: Long,
onDiskDataSizeInBytes: Long,
containsNumberedFileNames: Boolean)

// Velox write files metrics end

class VeloxColumnarWriteFilesExec(
child: SparkPlan,
fileFormat: FileFormat,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
options: Map[String, String],
staticPartitions: TablePartitionSpec)
extends WriteFilesExec(
child,
fileFormat,
partitionColumns,
bucketSpec,
options,
staticPartitions) {
extends WriteFilesExec(child, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
with GlutenPlan {

override lazy val references: AttributeSet = AttributeSet.empty

override def supportsColumnar(): Boolean = true

@@ -58,81 +96,77 @@ class VeloxColumnarWriteFilesExec(
// during the actual execution, specifically in the doExecuteWrite method of
// ColumnarWriteFilesExec, where it is available within the WriteFilesSpec.
// Therefore, we use this hack method to pass the writePath.
child.session.sparkContext.setLocalProperty("writePath", writeFilesSpec.description.path)
child.executeColumnar().mapPartitionsInternal {
iter =>
// Currently, the cb contains three columns: row, fragments, and context.
// The first row of the row column contains the total number of written rows.
// The fragments column contains detailed information about the file writes.
// The JSON looks like the following:
// {
// "inMemoryDataSizeInBytes":0,
// "containsNumberedFileNames":true,
// "onDiskDataSizeInBytes":307,
// "fileWriteInfos":[
// {
// "fileSize":307,
// "writeFileName":
// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
// "targetFileName":
// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
// }
// ],
// "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
// "rowCount":1,
// "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
// "updateMode":"NEW",
// "name":"part1=1/part2=1"
// }
assert(iter.hasNext)
val cb = iter.next()
val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
val numWrittenRows = loadedCb.column(0).getLong(0)

var updatedPartitions = Set.empty[String]
val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
var numBytes = 0L
for (i <- 0 until loadedCb.numRows() - 1) {
val fragments = loadedCb.column(1).getUTF8String(i + 1)
WriteFilesExecTransformer.withWriteFilePath(writeFilesSpec.description.path) {
child.executeColumnar().mapPartitionsInternal {
iter =>
// Currently, the cb contains three columns: row, fragments, and context.
// The first row of the row column contains the total number of written rows.
// The fragments column contains detailed information about the file writes.
assert(iter.hasNext)
val cb = iter.next()
val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
assert(loadedCb.numCols() == 3)
val numWrittenRows = loadedCb.column(0).getLong(0)

var updatedPartitions = Set.empty[String]
val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
var numBytes = 0L
val objectMapper = new ObjectMapper()
val jsonObject = objectMapper.readTree(fragments.toString)

val fileWriteInfos = jsonObject.get("fileWriteInfos").elements()
if (jsonObject.get("fileWriteInfos").elements().hasNext) {
val writeInfo = fileWriteInfos.next();
numBytes += writeInfo.get("fileSize").longValue()
// Get partition information.
if (jsonObject.get("name").textValue().nonEmpty) {
val targetFileName = writeInfo.get("targetFileName").textValue()
val partitionDir = jsonObject.get("name").textValue()
updatedPartitions += partitionDir
val tmpOutputPath =
writeFilesSpec.description.path + "/" + partitionDir + "/" + targetFileName
val absOutputPathObject =
writeFilesSpec.description.customPartitionLocations.get(
PartitioningUtils.parsePathFragment(partitionDir))
if (absOutputPathObject.nonEmpty) {
val absOutputPath = absOutputPathObject.get + "/" + targetFileName
addedAbsPathFiles(tmpOutputPath) = absOutputPath
objectMapper.registerModule(DefaultScalaModule)
for (i <- 0 until loadedCb.numRows() - 1) {
val fragments = loadedCb.column(1).getUTF8String(i + 1)
val metrics = objectMapper
.readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics])
logDebug(s"Velox write files metrics: $metrics")

val fileWriteInfos = metrics.fileWriteInfos
assert(fileWriteInfos.length == 1)
val fileWriteInfo = fileWriteInfos.head
numBytes += fileWriteInfo.fileSize
val targetFileName = fileWriteInfo.targetFileName
val outputPath = writeFilesSpec.description.path

// part1=1/part2=1
val partitionFragment = metrics.name
// An empty partition fragment means a non-partitioned table write.
if (partitionFragment != "") {
updatedPartitions += partitionFragment
val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName
val customOutputPath = writeFilesSpec.description.customPartitionLocations.get(
PartitioningUtils.parsePathFragment(partitionFragment))
if (customOutputPath.isDefined) {
addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName
}
}
}
}

// TODO: need to get the partition Internal row?
val stats = BasicWriteTaskStats(Seq.empty, loadedCb.numRows() - 1, numBytes, numWrittenRows)
val summary =
ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))
// Reports bytesWritten and recordsWritten to the Spark output metrics.
Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach {
outputMetrics =>
outputMetrics.setBytesWritten(numBytes)
outputMetrics.setRecordsWritten(numWrittenRows)
}

val result = WriteTaskResult(
new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
summary)
Iterator.single(result)
}
}
val partitionsInternalRows = updatedPartitions.map {
part =>
val parts = new Array[Any](1)
parts(0) = part
new GenericInternalRow(parts)
}.toSeq
val stats = BasicWriteTaskStats(
partitions = partitionsInternalRows,
numFiles = loadedCb.numRows() - 1,
numBytes = numBytes,
numRows = numWrittenRows)
val summary =
ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))

val result = WriteTaskResult(
new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
summary)
Iterator.single(result)
}
}
}

override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
throw new UnsupportedOperationException(s"This operator doesn't support doExecuteColumnar().")
}

override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExec =
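To make the new decoding path concrete, here is a self-contained sketch (not part of the commit) that feeds the sample JSON from the HiveDataSink::close() comment above through the same Jackson setup; the case classes are copied from the file, the demo object name is invented:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // Copied from VeloxColumnarWriteFilesExec.scala above.
    case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long)
    case class VeloxWriteFilesMetrics(
        name: String,
        updateMode: String,
        writePath: String,
        targetPath: String,
        fileWriteInfos: Seq[VeloxWriteFilesInfo],
        rowCount: Long,
        inMemoryDataSizeInBytes: Long,
        onDiskDataSizeInBytes: Long,
        containsNumberedFileNames: Boolean)

    object VeloxMetricsDemo { // hypothetical driver, for illustration
      def main(args: Array[String]): Unit = {
        // The sample fragment from the comment at the top of the file.
        val json =
          """{"inMemoryDataSizeInBytes":0,"containsNumberedFileNames":true,
            |"onDiskDataSizeInBytes":307,
            |"fileWriteInfos":[{"fileSize":307,
            |"writeFileName":"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
            |"targetFileName":"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"}],
            |"writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
            |"rowCount":1,
            |"targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
            |"updateMode":"NEW","name":"part1=1/part2=1"}""".stripMargin

        // DefaultScalaModule teaches Jackson to bind JSON into Scala case
        // classes and Seq, as doExecuteWrite does per fragments-column row.
        val mapper = new ObjectMapper()
        mapper.registerModule(DefaultScalaModule)
        val metrics = mapper.readValue(json.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics])

        assert(metrics.fileWriteInfos.length == 1)
        // An empty `name` would indicate a non-partitioned write.
        println(s"partition=${metrics.name} bytes=${metrics.fileWriteInfos.head.fileSize}")
      }
    }

The executor-side loop applies this decoding once per written file (rows 1 to n-1 of the batch) and asserts exactly one fileWriteInfos entry per fragment.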
4 changes: 0 additions & 4 deletions gluten-core/pom.xml
@@ -198,22 +198,18 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

WriteFilesExecTransformer.scala
@@ -156,8 +156,7 @@ case class WriteFilesExecTransformer(
}

override def doTransform(context: SubstraitContext): TransformContext = {
val writePath = child.session.sparkContext.getLocalProperty("writePath")
assert(writePath.size > 0)
val writePath = WriteFilesExecTransformer.getWriteFilePath
val childCtx = child.asInstanceOf[TransformSupport].doTransform(context)

val operatorId = context.nextOperatorId(this.nodeName)
@@ -175,3 +174,23 @@
override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExecTransformer =
copy(child = newChild)
}

object WriteFilesExecTransformer {
private val writeFilePathThreadLocal = new ThreadLocal[String]

def withWriteFilePath[T](path: String)(f: => T): T = {
val origin = writeFilePathThreadLocal.get()
writeFilePathThreadLocal.set(path)
try {
f
} finally {
writeFilePathThreadLocal.set(origin)
}
}

def getWriteFilePath: String = {
val writeFilePath = writeFilePathThreadLocal.get()
assert(writeFilePath != null)
writeFilePath
}
}
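The thread-local above replaces the earlier sparkContext.setLocalProperty("writePath", ...) hack: the write path appears to travel from doExecuteWrite to doTransform on the same thread without touching Spark's local properties. A standalone sketch of the same save-and-restore pattern (names invented, not from the commit), showing why restoring in finally matters for nested scopes:

    // Generic version of WriteFilesExecTransformer's scoped thread-local.
    object ScopedPath { // hypothetical name, for illustration
      private val current = new ThreadLocal[String]

      def withPath[T](path: String)(f: => T): T = {
        val previous = current.get() // may be null on first use
        current.set(path)
        try f
        finally current.set(previous) // restore so nested scopes unwind cleanly
      }

      def get: String = {
        val p = current.get()
        assert(p != null, "must be called inside withPath")
        p
      }
    }

    object ScopedPathDemo extends App {
      ScopedPath.withPath("/warehouse/t1") {
        assert(ScopedPath.get == "/warehouse/t1")
        ScopedPath.withPath("/warehouse/t2") {
          assert(ScopedPath.get == "/warehouse/t2") // inner scope wins
        }
        assert(ScopedPath.get == "/warehouse/t1") // outer value restored
      }
    }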
3 changes: 0 additions & 3 deletions gluten-ui/pom.xml
@@ -31,17 +31,14 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

4 changes: 0 additions & 4 deletions gluten-ut/pom.xml
@@ -118,22 +118,18 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatestplus</groupId>
