[GLUTEN-6067][VL] [Part 3-1] Refactor: Rename VeloxColumnarWriteFilesExec to ColumnarWriteFilesExec #6403

Merged · 3 commits · Jul 19, 2024
--- CHNativeExpressionEvaluator ---
@@ -81,6 +81,11 @@ private Map<String, String> getNativeBackendConf() {
BackendsApiManager.getSettings().getBackendConfigPrefix(), SQLConf.get().getAllConfs());
}

public static void injectWriteFilesTempPath(String path, String fileName) {
throw new UnsupportedOperationException(
"injectWriteFilesTempPath Not supported in CHNativeExpressionEvaluator");
}

// Used by WholeStageTransform to create the native computing pipeline and
// return a columnar result iterator.
public BatchIterator createKernelWithBatchIterator(
--- CHBackendSettings ---
@@ -25,13 +25,11 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}

@@ -286,13 +284,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
.getLong(GLUTEN_MAX_SHUFFLE_READ_BYTES, GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT)
}

override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
options: Map[String, String]): ValidationResult =
ValidationResult.failed("CH backend is unsupported.")

override def enableNativeWriteFiles(): Boolean = {
GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
}
--- CHIteratorApi ---
@@ -290,6 +290,10 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
None,
createNativeIterator(splitInfoByteArray, wsPlan, materializeInput, inputIterators))
}

override def injectWriteFilesTempPath(path: String, fileName: String): Unit = {
CHNativeExpressionEvaluator.injectWriteFilesTempPath(path, fileName)
}
}

class CollectMetricIterator(
--- CHSparkPlanExecApi ---
@@ -37,8 +37,6 @@ import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriter
import org.apache.spark.shuffle.utils.CHShuffleUtil
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, CollectList, CollectSet}
import org.apache.spark.sql.catalyst.optimizer.BuildSide
@@ -49,7 +47,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, WriteJobDescription}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
@@ -145,10 +143,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
}

child match {
case scan: FileSourceScanExec if (checkMergeTreeFileFormat(scan.relation)) =>
case scan: FileSourceScanExec if checkMergeTreeFileFormat(scan.relation) =>
// For the validation phase of the AddFallbackTagRule
CHFilterExecTransformer(condition, child)
case scan: FileSourceScanExecTransformerBase if (checkMergeTreeFileFormat(scan.relation)) =>
case scan: FileSourceScanExecTransformerBase if checkMergeTreeFileFormat(scan.relation) =>
// For the transform phase, the FileSourceScanExec is already transformed
CHFilterExecTransformer(condition, child)
case _ =>
@@ -395,7 +393,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
left: ExpressionTransformer,
right: ExpressionTransformer,
original: GetMapValue): ExpressionTransformer =
GetMapValueTransformer(substraitExprName, left, right, false, original)
GetMapValueTransformer(substraitExprName, left, right, failOnError = false, original)

/**
* Generate ShuffleDependency for ColumnarShuffleExchangeExec.
@@ -669,15 +667,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHRegExpReplaceTransformer(substraitExprName, children, expr)
}

override def createColumnarWriteFilesExec(
child: SparkPlan,
fileFormat: FileFormat,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
options: Map[String, String],
staticPartitions: TablePartitionSpec): SparkPlan = {
throw new GlutenNotSupportException("ColumnarWriteFilesExec is not support in ch backend.")
}
def createBackendWrite(description: WriteJobDescription): BackendWrite =
throw new UnsupportedOperationException("createBackendWrite is not supported in ch backend.")

override def createColumnarArrowEvalPythonExec(
udfs: Seq[PythonUDF],
--- VeloxBackendSettings ---
@@ -495,8 +495,6 @@ object VeloxBackendSettings extends BackendSettingsApi {

override def staticPartitionWriteOnly(): Boolean = true

override def supportTransformWriteFiles: Boolean = true

override def allowDecimalArithmetic: Boolean = true

override def enableNativeWriteFiles(): Boolean = {
--- VeloxIteratorApi ---
@@ -161,7 +161,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
(paths, starts, lengths, fileSizes, modificationTimes, partitionColumns, metadataColumns)
}

override def injectWriteFilesTempPath(path: String): Unit = {
override def injectWriteFilesTempPath(path: String, fileName: String): Unit = {
val transKernel = NativePlanEvaluator.create()
transKernel.injectWriteFilesTempPath(path)
}
@@ -171,7 +171,7 @@
inputPartition: BaseGlutenPartition,
context: TaskContext,
pipelineTime: SQLMetric,
updateInputMetrics: (InputMetricsWrapper) => Unit,
updateInputMetrics: InputMetricsWrapper => Unit,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()): Iterator[ColumnarBatch] = {
--- VeloxSparkPlanExecApi ---
@@ -39,8 +39,6 @@ import org.apache.spark.shuffle.utils.ShuffleUtil
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.BuildSide
@@ -50,7 +48,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.WriteJobDescription
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelationBroadcastMode}
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -550,20 +548,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
ShuffleUtil.genColumnarShuffleWriter(parameters)
}

override def createColumnarWriteFilesExec(
child: SparkPlan,
fileFormat: FileFormat,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
options: Map[String, String],
staticPartitions: TablePartitionSpec): SparkPlan = {
VeloxColumnarWriteFilesExec(
child,
fileFormat,
partitionColumns,
bucketSpec,
options,
staticPartitions)
override def createBackendWrite(description: WriteJobDescription): BackendWrite = {
VeloxBackendWrite(description)
}

override def createColumnarArrowEvalPythonExec(
--- VeloxBackendWrite (new file) ---
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators

import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.vectorized.ColumnarBatch

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import scala.collection.mutable

// Velox write files metrics start
//
// Follows the code in Velox `HiveDataSink::close()`.
// The JSON looks like the following:
// {
// "inMemoryDataSizeInBytes":0,
// "containsNumberedFileNames":true,
// "onDiskDataSizeInBytes":307,
// "fileWriteInfos":[
// {
// "fileSize":307,
// "writeFileName":
// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
// "targetFileName":
// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
// }
// ],
// "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
// "rowCount":1,
// "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
// "updateMode":"NEW",
// "name":"part1=1/part2=1"
// }
case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long)

case class VeloxWriteFilesMetrics(
name: String,
updateMode: String,
writePath: String,
targetPath: String,
fileWriteInfos: Seq[VeloxWriteFilesInfo],
rowCount: Long,
inMemoryDataSizeInBytes: Long,
onDiskDataSizeInBytes: Long,
containsNumberedFileNames: Boolean)

// Velox write files metrics end

case class VeloxBackendWrite(description: WriteJobDescription) extends BackendWrite with Logging {

override def collectNativeWriteFilesMetrics(cb: ColumnarBatch): Option[WriteTaskResult] = {
// Currently, the cb contains three columns: row, fragments, and context.
// The first row of the row column contains the number of written rows.
// The fragments column contains detailed information about the file writes.
val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
assert(loadedCb.numCols() == 3)
val numWrittenRows = loadedCb.column(0).getLong(0)

var updatedPartitions = Set.empty[String]
val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
var numBytes = 0L
val objectMapper = new ObjectMapper()
objectMapper.registerModule(DefaultScalaModule)
for (i <- 0 until loadedCb.numRows() - 1) {
val fragments = loadedCb.column(1).getUTF8String(i + 1)
val metrics = objectMapper
.readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics])
logDebug(s"Velox write files metrics: $metrics")

val fileWriteInfos = metrics.fileWriteInfos
assert(fileWriteInfos.length == 1)
val fileWriteInfo = fileWriteInfos.head
numBytes += fileWriteInfo.fileSize
val targetFileName = fileWriteInfo.targetFileName
val outputPath = description.path

// part1=1/part2=1
val partitionFragment = metrics.name
// Write a partitioned table
if (partitionFragment != "") {
updatedPartitions += partitionFragment
val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName
val customOutputPath = description.customPartitionLocations.get(
PartitioningUtils.parsePathFragment(partitionFragment))
if (customOutputPath.isDefined) {
addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName
}
}
}

val numFiles = loadedCb.numRows() - 1
val partitionsInternalRows = updatedPartitions.map {
part =>
val parts = new Array[Any](1)
parts(0) = part
new GenericInternalRow(parts)
}.toSeq
val stats = BasicWriteTaskStats(
partitions = partitionsInternalRows,
numFiles = numFiles,
numBytes = numBytes,
numRows = numWrittenRows)
val summary =
ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))

// Write an empty iterator
if (numFiles == 0) {
None
} else {
Some(
WriteTaskResult(
new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
summary))
}
}
}
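For reference, the sample `fragments` payload quoted in the header comment deserializes with the same Jackson setup used in `collectNativeWriteFilesMetrics`. The following is a minimal standalone sketch, not part of this patch: the object name is made up, and it assumes the VeloxWriteFilesMetrics and VeloxWriteFilesInfo case classes above are on the classpath.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object VeloxWriteMetricsParseExample {
  def main(args: Array[String]): Unit = {
    // The sample payload from the comment at the top of this file.
    val fragmentJson =
      """{
        |  "name": "part1=1/part2=1",
        |  "updateMode": "NEW",
        |  "writePath": "file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
        |  "targetPath": "file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
        |  "fileWriteInfos": [{
        |    "fileSize": 307,
        |    "writeFileName": "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
        |    "targetFileName": "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
        |  }],
        |  "rowCount": 1,
        |  "inMemoryDataSizeInBytes": 0,
        |  "onDiskDataSizeInBytes": 307,
        |  "containsNumberedFileNames": true
        |}""".stripMargin

    // Same ObjectMapper configuration and readValue call as collectNativeWriteFilesMetrics above.
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    val metrics = mapper.readValue(fragmentJson.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics])

    // The partition fragment and target file name are what the task commit message is built from.
    println(metrics.name)                               // part1=1/part2=1
    println(metrics.fileWriteInfos.head.targetFileName)
    println(metrics.onDiskDataSizeInBytes)              // 307
  }
}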
--- VeloxParquetWriteForHiveSuite ---
@@ -29,7 +29,7 @@ import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.util.QueryExecutionListener

class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
private var _spark: SparkSession = null
private var _spark: SparkSession = _

override protected def beforeAll(): Unit = {
super.beforeAll()
@@ -86,7 +86,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
if (!nativeUsed) {
nativeUsed = if (isSparkVersionGE("3.4")) {
qe.executedPlan.find(_.isInstanceOf[VeloxColumnarWriteFilesExec]).isDefined
qe.executedPlan.find(_.isInstanceOf[ColumnarWriteFilesExec]).isDefined
} else {
qe.executedPlan.find(_.isInstanceOf[FakeRowAdaptor]).isDefined
}
--- BackendSettingsApi ---
@@ -129,8 +129,6 @@ trait BackendSettingsApi {

def staticPartitionWriteOnly(): Boolean = false

def supportTransformWriteFiles: Boolean = false

def requiredInputFilePaths(): Boolean = false

// TODO: Move this to test settings as used in UT only.
--- IteratorApi ---
@@ -47,8 +47,13 @@ trait IteratorApi {
/**
* Inject the task attempt temporary path for native write files. This method should be called
* before `genFirstStageIterator` or `genFinalStageIterator`.
* @param path
*   the temporary directory for the native write pipeline
* @param fileName
*   the file name for the native write pipeline; the backend may generate it by itself.
*/
def injectWriteFilesTempPath(path: String): Unit = throw new UnsupportedOperationException()
def injectWriteFilesTempPath(path: String, fileName: String): Unit =
throw new UnsupportedOperationException()

/**
* Generate Iterator[ColumnarBatch] for first stage. ("first" means it does not depend on other
@@ -58,7 +63,7 @@
inputPartition: BaseGlutenPartition,
context: TaskContext,
pipelineTime: SQLMetric,
updateInputMetrics: (InputMetricsWrapper) => Unit,
updateInputMetrics: InputMetricsWrapper => Unit,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
--- SparkPlanExecApi ---
@@ -39,8 +39,8 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.{BackendWrite, ColumnarWriteFilesExec, FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan}
import org.apache.spark.sql.execution.datasources.{FileFormat, WriteJobDescription}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.BuildSideRelation
@@ -388,7 +388,18 @@ trait SparkPlanExecApi {
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
options: Map[String, String],
staticPartitions: TablePartitionSpec): SparkPlan
staticPartitions: TablePartitionSpec): SparkPlan = {
ColumnarWriteFilesExec(
child,
fileFormat,
partitionColumns,
bucketSpec,
options,
staticPartitions)
}

/** Create BackendWrite */
def createBackendWrite(description: WriteJobDescription): BackendWrite
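For illustration only, not part of this diff: with the shared ColumnarWriteFilesExec default above, a new backend only has to supply a BackendWrite. `MyBackendWrite` is a made-up name, and the sketch assumes collectNativeWriteFilesMetrics is the trait's only abstract member, as the VeloxBackendWrite implementation above suggests.

import org.apache.spark.sql.execution.BackendWrite
import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.vectorized.ColumnarBatch

case class MyBackendWrite(description: WriteJobDescription) extends BackendWrite {
  override def collectNativeWriteFilesMetrics(cb: ColumnarBatch): Option[WriteTaskResult] = {
    // Translate the backend's commit batch into Spark's WriteTaskResult here,
    // as VeloxBackendWrite does; returning None means the task wrote no files.
    None
  }
}

// In the backend's SparkPlanExecApi implementation:
//   override def createBackendWrite(description: WriteJobDescription): BackendWrite =
//     MyBackendWrite(description)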

/** Create ColumnarArrowEvalPythonExec, for velox backend */
def createColumnarArrowEvalPythonExec(