[VL] Followup for native write files #4246

Merged 1 commit on Jan 3, 2024
4 changes: 0 additions & 4 deletions backends-clickhouse/pom.xml
@@ -156,22 +156,18 @@
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
4 changes: 0 additions & 4 deletions backends-velox/pom.xml
@@ -147,22 +147,18 @@
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.google.jimfs</groupId>
VeloxColumnarWriteFilesExec.scala
@@ -17,35 +17,73 @@
 package org.apache.spark.sql.execution
 
 import io.glutenproject.columnarbatch.ColumnarBatches
+import io.glutenproject.execution.WriteFilesExecTransformer
+import io.glutenproject.extension.GlutenPlan
 import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericInternalRow}
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, ExecutedWriteSummary, FileFormat, PartitioningUtils, WriteFilesExec, WriteFilesSpec, WriteTaskResult}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
-import shaded.parquet.com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
 import scala.collection.mutable
 
+// Velox write files metrics start
+//
+// Follows the code in velox `HiveDataSink::close()`
+// The json can be as following:
+// {
+//   "inMemoryDataSizeInBytes":0,
+//   "containsNumberedFileNames":true,
+//   "onDiskDataSizeInBytes":307,
+//   "fileWriteInfos":[
+//     {
+//       "fileSize":307,
+//       "writeFileName":
+//         "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
+//       "targetFileName":
+//         "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
+//     }
+//   ],
+//   "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
+//   "rowCount":1,
+//   "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
+//   "updateMode":"NEW",
+//   "name":"part1=1/part2=1"
+// }
+case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long)
+
+case class VeloxWriteFilesMetrics(
+    name: String,
+    updateMode: String,
+    writePath: String,
+    targetPath: String,
+    fileWriteInfos: Seq[VeloxWriteFilesInfo],
+    rowCount: Long,
+    inMemoryDataSizeInBytes: Long,
+    onDiskDataSizeInBytes: Long,
+    containsNumberedFileNames: Boolean)
+
+// Velox write files metrics end
+
 class VeloxColumnarWriteFilesExec(
     child: SparkPlan,
     fileFormat: FileFormat,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
     options: Map[String, String],
     staticPartitions: TablePartitionSpec)
-  extends WriteFilesExec(
-    child,
-    fileFormat,
-    partitionColumns,
-    bucketSpec,
-    options,
-    staticPartitions) {
+  extends WriteFilesExec(child, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
+  with GlutenPlan {
+
+  override lazy val references: AttributeSet = AttributeSet.empty
 
   override def supportsColumnar(): Boolean = true

@@ -58,81 +96,77 @@ class VeloxColumnarWriteFilesExec(
     // during the actual execution, specifically in the doExecuteWrite method of
     // ColumnarWriteFilesExec, where it is available within the WriteFilesSpec.
     // Therefore, we use this hack method to pass the writePath.
-    child.session.sparkContext.setLocalProperty("writePath", writeFilesSpec.description.path)
-    child.executeColumnar().mapPartitionsInternal {
-      iter =>
-        // Currently, the cb contains three columns: row, fragments, and context.
-        // The first row in the row column contains the number of written numRows.
-        // The fragments column contains detailed information about the file writes.
-        // The json can be as following:
-        // {
-        //   "inMemoryDataSizeInBytes":0,
-        //   "containsNumberedFileNames":true,
-        //   "onDiskDataSizeInBytes":307,
-        //   "fileWriteInfos":[
-        //     {
-        //       "fileSize":307,
-        //       "writeFileName":
-        //         "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
-        //       "targetFileName":
-        //         "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
-        //     }
-        //   ],
-        //   "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
-        //   "rowCount":1,
-        //   "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
-        //   "updateMode":"NEW",
-        //   "name":"part1=1/part2=1"
-        // }
-        assert(iter.hasNext)
-        val cb = iter.next()
-        val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
-        val numWrittenRows = loadedCb.column(0).getLong(0)
-
-        var updatedPartitions = Set.empty[String]
-        val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
-        var numBytes = 0L
-        for (i <- 0 until loadedCb.numRows() - 1) {
-          val fragments = loadedCb.column(1).getUTF8String(i + 1)
-          val objectMapper = new ObjectMapper()
-          val jsonObject = objectMapper.readTree(fragments.toString)
-
-          val fileWriteInfos = jsonObject.get("fileWriteInfos").elements()
-          if (jsonObject.get("fileWriteInfos").elements().hasNext) {
-            val writeInfo = fileWriteInfos.next();
-            numBytes += writeInfo.get("fileSize").longValue()
-            // Get partition information.
-            if (jsonObject.get("name").textValue().nonEmpty) {
-              val targetFileName = writeInfo.get("targetFileName").textValue()
-              val partitionDir = jsonObject.get("name").textValue()
-              updatedPartitions += partitionDir
-              val tmpOutputPath =
-                writeFilesSpec.description.path + "/" + partitionDir + "/" + targetFileName
-              val absOutputPathObject =
-                writeFilesSpec.description.customPartitionLocations.get(
-                  PartitioningUtils.parsePathFragment(partitionDir))
-              if (absOutputPathObject.nonEmpty) {
-                val absOutputPath = absOutputPathObject.get + "/" + targetFileName
-                addedAbsPathFiles(tmpOutputPath) = absOutputPath
-              }
-            }
-          }
-        }
-
-        // TODO: need to get the partition Internal row?
-        val stats = BasicWriteTaskStats(Seq.empty, loadedCb.numRows() - 1, numBytes, numWrittenRows)
-        val summary =
-          ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))
-        // Reports bytesWritten and recordsWritten to the Spark output metrics.
-        Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach {
-          outputMetrics =>
-            outputMetrics.setBytesWritten(numBytes)
-            outputMetrics.setRecordsWritten(numWrittenRows)
-        }
-
-        val result = WriteTaskResult(
-          new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
-          summary)
-        Iterator.single(result)
-    }
-  }
+    WriteFilesExecTransformer.withWriteFilePath(writeFilesSpec.description.path) {
+      child.executeColumnar().mapPartitionsInternal {
+        iter =>
+          // Currently, the cb contains three columns: row, fragments, and context.
+          // The first row in the row column contains the number of written numRows.
+          // The fragments column contains detailed information about the file writes.
+          assert(iter.hasNext)
+          val cb = iter.next()
+          val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
+          assert(loadedCb.numCols() == 3)
+          val numWrittenRows = loadedCb.column(0).getLong(0)
+
+          var updatedPartitions = Set.empty[String]
+          val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
+          var numBytes = 0L
+          val objectMapper = new ObjectMapper()
+          objectMapper.registerModule(DefaultScalaModule)
+          for (i <- 0 until loadedCb.numRows() - 1) {
+            val fragments = loadedCb.column(1).getUTF8String(i + 1)
+            val metrics = objectMapper
+              .readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics])
+            logDebug(s"Velox write files metrics: $metrics")
+
+            val fileWriteInfos = metrics.fileWriteInfos
+            assert(fileWriteInfos.length == 1)
+            val fileWriteInfo = fileWriteInfos.head
+            numBytes += fileWriteInfo.fileSize
+            val targetFileName = fileWriteInfo.targetFileName
+            val outputPath = writeFilesSpec.description.path
+
+            // part1=1/part2=1
+            val partitionFragment = metrics.name
+            // write a non-partitioned table
+            if (partitionFragment != "") {
+              updatedPartitions += partitionFragment
+              val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName
+              val customOutputPath = writeFilesSpec.description.customPartitionLocations.get(
+                PartitioningUtils.parsePathFragment(partitionFragment))
+              if (customOutputPath.isDefined) {
+                addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName
+              }
+            }
+          }
+
+          val partitionsInternalRows = updatedPartitions.map {
+            part =>
+              val parts = new Array[Any](1)
+              parts(0) = part
+              new GenericInternalRow(parts)
+          }.toSeq
+          val stats = BasicWriteTaskStats(
+            partitions = partitionsInternalRows,
+            numFiles = loadedCb.numRows() - 1,
+            numBytes = numBytes,
+            numRows = numWrittenRows)
+          val summary =
+            ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))
+
+          // Reports bytesWritten and recordsWritten to the Spark output metrics.
+          Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach {
+            outputMetrics =>
+              outputMetrics.setBytesWritten(numBytes)
+              outputMetrics.setRecordsWritten(numWrittenRows)
+          }
+
+          val result = WriteTaskResult(
+            new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
+            summary)
+          Iterator.single(result)
+      }
+    }
+  }
 
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
     throw new UnsupportedOperationException(s"This operator doesn't support doExecuteColumnar().")
   }
 
   override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExec =
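To make the new metrics handling concrete, here is a small, self-contained sketch of the same Jackson + `DefaultScalaModule` deserialization the new code performs on each `fragments` cell. The `MetricsParseDemo` object and its `main` harness are ours for illustration; the two case classes and the sample JSON are copied from the diff above. Note this only works with jackson-databind and jackson-module-scala on the runtime classpath, which is what the pom.xml changes in this PR arrange by dropping the `test`/`provided` scopes.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long)

case class VeloxWriteFilesMetrics(
    name: String,
    updateMode: String,
    writePath: String,
    targetPath: String,
    fileWriteInfos: Seq[VeloxWriteFilesInfo],
    rowCount: Long,
    inMemoryDataSizeInBytes: Long,
    onDiskDataSizeInBytes: Long,
    containsNumberedFileNames: Boolean)

object MetricsParseDemo {
  def main(args: Array[String]): Unit = {
    // Sample fragment, taken from the comment block in the diff above.
    val json =
      """{
        |  "inMemoryDataSizeInBytes":0,
        |  "containsNumberedFileNames":true,
        |  "onDiskDataSizeInBytes":307,
        |  "fileWriteInfos":[{
        |    "fileSize":307,
        |    "writeFileName":"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
        |    "targetFileName":"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
        |  }],
        |  "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
        |  "rowCount":1,
        |  "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
        |  "updateMode":"NEW",
        |  "name":"part1=1/part2=1"
        |}""".stripMargin

    // DefaultScalaModule lets Jackson bind JSON directly onto Scala case
    // classes and Seq, replacing the hand-rolled readTree() navigation
    // that the old code used.
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)

    val metrics =
      mapper.readValue(json.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics])
    assert(metrics.fileWriteInfos.head.fileSize == 307L)
    assert(metrics.name == "part1=1/part2=1") // partition fragment; empty for unpartitioned writes
  }
}
```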
4 changes: 0 additions & 4 deletions gluten-core/pom.xml
@@ -198,22 +198,18 @@
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-      <scope>test</scope>
     </dependency>
   </dependencies>

WriteFilesExecTransformer.scala
@@ -156,8 +156,7 @@ case class WriteFilesExecTransformer(
   }
 
   override def doTransform(context: SubstraitContext): TransformContext = {
-    val writePath = child.session.sparkContext.getLocalProperty("writePath")
-    assert(writePath.size > 0)
+    val writePath = WriteFilesExecTransformer.getWriteFilePath
     val childCtx = child.asInstanceOf[TransformSupport].doTransform(context)
 
     val operatorId = context.nextOperatorId(this.nodeName)
@@ -175,3 +174,23 @@ case class WriteFilesExecTransformer(
   override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExecTransformer =
     copy(child = newChild)
 }
+
+object WriteFilesExecTransformer {
+  private val writeFilePathThreadLocal = new ThreadLocal[String]
+
+  def withWriteFilePath[T](path: String)(f: => T): T = {
+    val origin = writeFilePathThreadLocal.get()
+    writeFilePathThreadLocal.set(path)
+    try {
+      f
+    } finally {
+      writeFilePathThreadLocal.set(origin)
+    }
+  }
+
+  def getWriteFilePath: String = {
+    val writeFilePath = writeFilePathThreadLocal.get()
+    assert(writeFilePath != null)
+    writeFilePath
+  }
+}
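For reference, the save-and-restore dance in `withWriteFilePath` is what makes this thread-local safe under nesting and exceptions: the previous value (possibly null) is put back in `finally`, so an inner scope cannot clobber an outer one, and a throwing body cannot leak a stale path into later queries on the same thread. A standalone sketch of the same pattern (the `ScopedValueDemo` name and its toy values are ours):

```scala
object ScopedValueDemo {
  private val current = new ThreadLocal[String]

  // Mirrors WriteFilesExecTransformer.withWriteFilePath: set, run, restore.
  def withValue[T](value: String)(body: => T): T = {
    val saved = current.get() // null when we are not inside any scope yet
    current.set(value)
    try body
    finally current.set(saved) // restore even if body throws
  }

  def get: String = {
    val v = current.get()
    assert(v != null, "get must be called inside withValue { ... }")
    v
  }

  def main(args: Array[String]): Unit = {
    withValue("outer") {
      withValue("inner") {
        assert(get == "inner")
      }
      assert(get == "outer") // outer value survives the nested scope
    }
  }
}
```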
3 changes: 0 additions & 3 deletions gluten-ui/pom.xml
@@ -31,17 +31,14 @@
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
-      <scope>provided</scope>
     </dependency>
   </dependencies>

4 changes: 0 additions & 4 deletions gluten-ut/pom.xml
@@ -118,22 +118,18 @@
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.scalatestplus</groupId>