From b204d2d3371d36816038c12a219e76b42c6875d2 Mon Sep 17 00:00:00 2001
From: Chang Chen
Date: Thu, 18 Jul 2024 14:18:42 +0800
Subject: [PATCH 1/3] Rename VeloxColumnarWriteFilesExec to GlutenColumnarWriteFilesExec, and move it to gluten-core

1. Return GlutenColumnarWriteFilesExec at SparkPlanExecApi
2. Move SparkWriteFilesCommitProtocol to gluten-core
3. SparkWriteFilesCommitProtocol supports getFilename from the internal committer
4. Remove supportTransformWriteFiles from BackendSettingsApi
5. injectWriteFilesTempPath now also takes a fileName

--- .../CHNativeExpressionEvaluator.java | 5 + .../backendsapi/clickhouse/CHBackend.scala | 9 -- .../clickhouse/CHIteratorApi.scala | 4 + .../clickhouse/CHSparkPlanExecApi.scala | 21 +-- .../backendsapi/velox/VeloxBackend.scala | 2 - .../backendsapi/velox/VeloxIteratorApi.scala | 4 +- .../velox/VeloxSparkPlanExecApi.scala | 20 +-- .../sql/execution/VeloxBackendWrite.scala | 138 +++++++++++++++++ .../VeloxParquetWriteForHiveSuite.scala | 4 +- .../backendsapi/BackendSettingsApi.scala | 2 - .../gluten/backendsapi/IteratorApi.scala | 9 +- .../gluten/backendsapi/SparkPlanExecApi.scala | 17 ++- .../columnar/validator/Validators.scala | 3 +- .../GlutenColumnarWriteFilesExec.scala | 143 +++--------------- .../SparkWriteFilesCommitProtocol.scala | 23 ++- .../GlutenV1WriteCommandSuite.scala | 10 +- .../spark/sql/sources/GlutenInsertSuite.scala | 20 +-- .../GlutenColumnarWriteTestSupport.scala | 6 +- 18 files changed, 241 insertions(+), 199 deletions(-) create mode 100644 backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala rename backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala => gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenColumnarWriteFilesExec.scala (67%) rename {backends-velox => gluten-core}/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala (82%) diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java index b8b4138dc8c0..45a9f01f7337 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java @@ -81,6 +81,11 @@ private Map getNativeBackendConf() { BackendsApiManager.getSettings().getBackendConfigPrefix(), SQLConf.get().getAllConfs()); } + public static void injectWriteFilesTempPath(String path, String fileName) { + throw new UnsupportedOperationException( + "injectWriteFilesTempPath Not supported in CHNativeExpressionEvaluator"); + } + // Used by WholeStageTransform to create the native computing pipeline and // return a columnar result iterator.
public BatchIterator createKernelWithBatchIterator( diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 341a3e0f0a52..07129e69a987 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -25,13 +25,11 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._ import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.aggregate.HashAggregateExec -import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} @@ -286,13 +284,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging { .getLong(GLUTEN_MAX_SHUFFLE_READ_BYTES, GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT) } - override def supportWriteFilesExec( - format: FileFormat, - fields: Array[StructField], - bucketSpec: Option[BucketSpec], - options: Map[String, String]): ValidationResult = - ValidationResult.failed("CH backend is unsupported.") - override def enableNativeWriteFiles(): Boolean = { GlutenConfig.getConf.enableNativeWriter.getOrElse(false) } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 4b9ec739028f..6c86583f4c7c 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -290,6 +290,10 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { None, createNativeIterator(splitInfoByteArray, wsPlan, materializeInput, inputIterators)) } + + override def injectWriteFilesTempPath(path: String, fileName: String): Unit = { + CHNativeExpressionEvaluator.injectWriteFilesTempPath(path, fileName) + } } class CollectMetricIterator( diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 547458d8c254..920c61cd4e80 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -37,8 +37,6 @@ import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriter import org.apache.spark.shuffle.utils.CHShuffleUtil import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite} -import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, CollectList, CollectSet} import org.apache.spark.sql.catalyst.optimizer.BuildSide @@ -49,7 +47,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.delta.files.TahoeFileIndex import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec -import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation} +import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, WriteJobDescription} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} @@ -145,10 +143,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { } child match { - case scan: FileSourceScanExec if (checkMergeTreeFileFormat(scan.relation)) => + case scan: FileSourceScanExec if checkMergeTreeFileFormat(scan.relation) => // For the validation phase of the AddFallbackTagRule CHFilterExecTransformer(condition, child) - case scan: FileSourceScanExecTransformerBase if (checkMergeTreeFileFormat(scan.relation)) => + case scan: FileSourceScanExecTransformerBase if checkMergeTreeFileFormat(scan.relation) => // For the transform phase, the FileSourceScanExec is already transformed CHFilterExecTransformer(condition, child) case _ => @@ -395,7 +393,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { left: ExpressionTransformer, right: ExpressionTransformer, original: GetMapValue): ExpressionTransformer = - GetMapValueTransformer(substraitExprName, left, right, false, original) + GetMapValueTransformer(substraitExprName, left, right, failOnError = false, original) /** * Generate ShuffleDependency for ColumnarShuffleExchangeExec. 
@@ -669,15 +667,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { CHRegExpReplaceTransformer(substraitExprName, children, expr) } - override def createColumnarWriteFilesExec( - child: SparkPlan, - fileFormat: FileFormat, - partitionColumns: Seq[Attribute], - bucketSpec: Option[BucketSpec], - options: Map[String, String], - staticPartitions: TablePartitionSpec): SparkPlan = { - throw new GlutenNotSupportException("ColumnarWriteFilesExec is not support in ch backend.") - } + def createBackendWrite(description: WriteJobDescription): BackendWrite = + throw new UnsupportedOperationException("createBackendWrite is not supported in ch backend.") override def createColumnarArrowEvalPythonExec( udfs: Seq[PythonUDF], diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index b0692816af8c..9339473015f2 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -495,8 +495,6 @@ object VeloxBackendSettings extends BackendSettingsApi { override def staticPartitionWriteOnly(): Boolean = true - override def supportTransformWriteFiles: Boolean = true - override def allowDecimalArithmetic: Boolean = true override def enableNativeWriteFiles(): Boolean = { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 613e539456ec..1c7f913481ec 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -161,7 +161,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { (paths, starts, lengths, fileSizes, modificationTimes, partitionColumns, metadataColumns) } - override def injectWriteFilesTempPath(path: String): Unit = { + override def injectWriteFilesTempPath(path: String, fileName: String): Unit = { val transKernel = NativePlanEvaluator.create() transKernel.injectWriteFilesTempPath(path) } @@ -171,7 +171,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { inputPartition: BaseGlutenPartition, context: TaskContext, pipelineTime: SQLMetric, - updateInputMetrics: (InputMetricsWrapper) => Unit, + updateInputMetrics: InputMetricsWrapper => Unit, updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()): Iterator[ColumnarBatch] = { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 37b46df3e23d..098d65a96e0e 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -39,8 +39,6 @@ import org.apache.spark.shuffle.utils.ShuffleUtil import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder -import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ 
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.optimizer.BuildSide @@ -50,7 +48,7 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.WriteJobDescription import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelationBroadcastMode} import org.apache.spark.sql.execution.metric.SQLMetric @@ -550,20 +548,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { ShuffleUtil.genColumnarShuffleWriter(parameters) } - override def createColumnarWriteFilesExec( - child: SparkPlan, - fileFormat: FileFormat, - partitionColumns: Seq[Attribute], - bucketSpec: Option[BucketSpec], - options: Map[String, String], - staticPartitions: TablePartitionSpec): SparkPlan = { - VeloxColumnarWriteFilesExec( - child, - fileFormat, - partitionColumns, - bucketSpec, - options, - staticPartitions) + override def createBackendWrite(description: WriteJobDescription): BackendWrite = { + VeloxBackendWrite(description) } override def createColumnarArrowEvalPythonExec( diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala new file mode 100644 index 000000000000..5d47aff04d59 --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution + +import org.apache.gluten.columnarbatch.ColumnarBatches +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.vectorized.ColumnarBatch + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import scala.collection.mutable + +// Velox write files metrics start +// +// Follows the code in velox `HiveDataSink::close()` +// The json can be as following: +// { +// "inMemoryDataSizeInBytes":0, +// "containsNumberedFileNames":true, +// "onDiskDataSizeInBytes":307, +// "fileWriteInfos":[ +// { +// "fileSize":307, +// "writeFileName": +// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet", +// "targetFileName": +// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet" +// } +// ], +// "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1", +// "rowCount":1, +// "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1", +// "updateMode":"NEW", +// "name":"part1=1/part2=1" +// } +case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long) + +case class VeloxWriteFilesMetrics( + name: String, + updateMode: String, + writePath: String, + targetPath: String, + fileWriteInfos: Seq[VeloxWriteFilesInfo], + rowCount: Long, + inMemoryDataSizeInBytes: Long, + onDiskDataSizeInBytes: Long, + containsNumberedFileNames: Boolean) + +// Velox write files metrics end + +case class VeloxBackendWrite(description: WriteJobDescription) extends BackendWrite with Logging { + + override def collectNativeWriteFilesMetrics(cb: ColumnarBatch): Option[WriteTaskResult] = { + // Currently, the cb contains three columns: row, fragments, and context. + // The first row in the row column contains the number of written numRows. + // The fragments column contains detailed information about the file writes. 
+ val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb) + assert(loadedCb.numCols() == 3) + val numWrittenRows = loadedCb.column(0).getLong(0) + + var updatedPartitions = Set.empty[String] + val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() + var numBytes = 0L + val objectMapper = new ObjectMapper() + objectMapper.registerModule(DefaultScalaModule) + for (i <- 0 until loadedCb.numRows() - 1) { + val fragments = loadedCb.column(1).getUTF8String(i + 1) + val metrics = objectMapper + .readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics]) + logDebug(s"Velox write files metrics: $metrics") + + val fileWriteInfos = metrics.fileWriteInfos + assert(fileWriteInfos.length == 1) + val fileWriteInfo = fileWriteInfos.head + numBytes += fileWriteInfo.fileSize + val targetFileName = fileWriteInfo.targetFileName + val outputPath = description.path + + // part1=1/part2=1 + val partitionFragment = metrics.name + // Write a partitioned table + if (partitionFragment != "") { + updatedPartitions += partitionFragment + val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName + val customOutputPath = description.customPartitionLocations.get( + PartitioningUtils.parsePathFragment(partitionFragment)) + if (customOutputPath.isDefined) { + addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName + } + } + } + + val numFiles = loadedCb.numRows() - 1 + val partitionsInternalRows = updatedPartitions.map { + part => + val parts = new Array[Any](1) + parts(0) = part + new GenericInternalRow(parts) + }.toSeq + val stats = BasicWriteTaskStats( + partitions = partitionsInternalRows, + numFiles = numFiles, + numBytes = numBytes, + numRows = numWrittenRows) + val summary = + ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) + + // Write an empty iterator + if (numFiles == 0) { + None + } else { + Some( + WriteTaskResult( + new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), + summary)) + } + } +} diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala index 731f5ef4845c..576714c27a24 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.util.QueryExecutionListener class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils { - private var _spark: SparkSession = null + private var _spark: SparkSession = _ override protected def beforeAll(): Unit = { super.beforeAll() @@ -86,7 +86,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils { override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { if (!nativeUsed) { nativeUsed = if (isSparkVersionGE("3.4")) { - qe.executedPlan.find(_.isInstanceOf[VeloxColumnarWriteFilesExec]).isDefined + qe.executedPlan.find(_.isInstanceOf[GlutenColumnarWriteFilesExec]).isDefined } else { qe.executedPlan.find(_.isInstanceOf[FakeRowAdaptor]).isDefined } diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index 
8b4c18b01970..358043cc5f6b 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -129,8 +129,6 @@ trait BackendSettingsApi { def staticPartitionWriteOnly(): Boolean = false - def supportTransformWriteFiles: Boolean = false - def requiredInputFilePaths(): Boolean = false // TODO: Move this to test settings as used in UT only. diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index 53dc8f47861f..495b91c50757 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -47,8 +47,13 @@ trait IteratorApi { /** * Inject the task attempt temporary path for native write files, this method should be called * before `genFirstStageIterator` or `genFinalStageIterator` + * @param path + * is the temporary directory for native write pipeline + * @param fileName + * is the file name for native write pipeline, backend could generate it by itself. */ - def injectWriteFilesTempPath(path: String): Unit = throw new UnsupportedOperationException() + def injectWriteFilesTempPath(path: String, fileName: String): Unit = + throw new UnsupportedOperationException() /** * Generate Iterator[ColumnarBatch] for first stage. ("first" means it does not depend on other @@ -58,7 +63,7 @@ trait IteratorApi { inputPartition: BaseGlutenPartition, context: TaskContext, pipelineTime: SQLMetric, - updateInputMetrics: (InputMetricsWrapper) => Unit, + updateInputMetrics: InputMetricsWrapper => Unit, updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, inputIterators: Seq[Iterator[ColumnarBatch]] = Seq() diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index 58a08192dec9..0bb23b7ce655 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -39,8 +39,8 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan} -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.{BackendWrite, FileSourceScanExec, GenerateExec, GlutenColumnarWriteFilesExec, LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.datasources.{FileFormat, WriteJobDescription} import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.joins.BuildSideRelation @@ -388,7 +388,18 @@ trait SparkPlanExecApi { partitionColumns: Seq[Attribute], bucketSpec: Option[BucketSpec], options: Map[String, String], - staticPartitions: TablePartitionSpec): SparkPlan + staticPartitions: TablePartitionSpec): SparkPlan = { + GlutenColumnarWriteFilesExec( + child, + fileFormat, + partitionColumns, + bucketSpec, + options, + staticPartitions) + } + + /** Create BackendWrite */ + def createBackendWrite(description: 
WriteJobDescription): BackendWrite /** Create ColumnarArrowEvalPythonExec, for velox backend */ def createColumnarArrowEvalPythonExec( diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index 903723ccb56b..a85cb163ceaa 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -141,8 +141,7 @@ object Validators { override def validate(plan: SparkPlan): Validator.OutCome = plan match { case p: ShuffleExchangeExec if !settings.supportColumnarShuffleExec() => fail(p) case p: SortMergeJoinExec if !settings.supportSortMergeJoinExec() => fail(p) - case p: WriteFilesExec - if !(settings.enableNativeWriteFiles() && settings.supportTransformWriteFiles) => + case p: WriteFilesExec if !settings.enableNativeWriteFiles() => fail(p) case p: SortAggregateExec if !settings.replaceSortAggWithHashAgg => fail(p) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenColumnarWriteFilesExec.scala similarity index 67% rename from backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala rename to gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenColumnarWriteFilesExec.scala index c87b8d4f688d..0291450d7f61 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenColumnarWriteFilesExec.scala @@ -17,150 +17,47 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.columnarbatch.ColumnarBatches import org.apache.gluten.exception.GlutenException import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.{Partition, SparkException, TaskContext, TaskOutputFileAlreadyExistException} import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} -import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.rdd.RDD import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericInternalRow} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet} import org.apache.spark.sql.connector.write.WriterCommitMessage import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.hadoop.fs.FileAlreadyExistsException import java.util.Date -import scala.collection.mutable - -// Velox write files metrics start -// -// Follows the code in velox `HiveDataSink::close()` -// The json can be as following: -// { -// "inMemoryDataSizeInBytes":0, -// "containsNumberedFileNames":true, -// 
"onDiskDataSizeInBytes":307, -// "fileWriteInfos":[ -// { -// "fileSize":307, -// "writeFileName": -// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet", -// "targetFileName": -// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet" -// } -// ], -// "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1", -// "rowCount":1, -// "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1", -// "updateMode":"NEW", -// "name":"part1=1/part2=1" -// } -case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long) - -case class VeloxWriteFilesMetrics( - name: String, - updateMode: String, - writePath: String, - targetPath: String, - fileWriteInfos: Seq[VeloxWriteFilesInfo], - rowCount: Long, - inMemoryDataSizeInBytes: Long, - onDiskDataSizeInBytes: Long, - containsNumberedFileNames: Boolean) - -// Velox write files metrics end +/** + * This trait is used in [[GlutenColumnarWriteFilesRDD]] to inject the staging write path before + * initializing the native plan and collect native write files metrics for each backend. + */ +trait BackendWrite { + def collectNativeWriteFilesMetrics(batch: ColumnarBatch): Option[WriteTaskResult] +} /** * This RDD is used to make sure we have injected staging write path before initializing the native * plan, and support Spark file commit protocol. */ -class VeloxColumnarWriteFilesRDD( +class GlutenColumnarWriteFilesRDD( var prev: RDD[ColumnarBatch], description: WriteJobDescription, committer: FileCommitProtocol, jobTrackerID: String) extends RDD[WriterCommitMessage](prev) { - private def collectNativeWriteFilesMetrics(cb: ColumnarBatch): Option[WriteTaskResult] = { - // Currently, the cb contains three columns: row, fragments, and context. - // The first row in the row column contains the number of written numRows. - // The fragments column contains detailed information about the file writes. 
- val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb) - assert(loadedCb.numCols() == 3) - val numWrittenRows = loadedCb.column(0).getLong(0) - - var updatedPartitions = Set.empty[String] - val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() - var numBytes = 0L - val objectMapper = new ObjectMapper() - objectMapper.registerModule(DefaultScalaModule) - for (i <- 0 until loadedCb.numRows() - 1) { - val fragments = loadedCb.column(1).getUTF8String(i + 1) - val metrics = objectMapper - .readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics]) - logDebug(s"Velox write files metrics: $metrics") - - val fileWriteInfos = metrics.fileWriteInfos - assert(fileWriteInfos.length == 1) - val fileWriteInfo = fileWriteInfos.head - numBytes += fileWriteInfo.fileSize - val targetFileName = fileWriteInfo.targetFileName - val outputPath = description.path - - // part1=1/part2=1 - val partitionFragment = metrics.name - // Write a partitioned table - if (partitionFragment != "") { - updatedPartitions += partitionFragment - val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName - val customOutputPath = description.customPartitionLocations.get( - PartitioningUtils.parsePathFragment(partitionFragment)) - if (customOutputPath.isDefined) { - addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName - } - } - } - - val numFiles = loadedCb.numRows() - 1 - val partitionsInternalRows = updatedPartitions.map { - part => - val parts = new Array[Any](1) - parts(0) = part - new GenericInternalRow(parts) - }.toSeq - val stats = BasicWriteTaskStats( - partitions = partitionsInternalRows, - numFiles = numFiles, - numBytes = numBytes, - numRows = numWrittenRows) - val summary = - ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) - - // Write an empty iterator - if (numFiles == 0) { - None - } else { - Some( - WriteTaskResult( - new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), - summary)) - } - } - private def reportTaskMetrics(writeTaskResult: WriteTaskResult): Unit = { val stats = writeTaskResult.summary.stats.head.asInstanceOf[BasicWriteTaskStats] val (numBytes, numWrittenRows) = (stats.numBytes, stats.numRows) @@ -194,21 +91,25 @@ class VeloxColumnarWriteFilesRDD( override def compute(split: Partition, context: TaskContext): Iterator[WriterCommitMessage] = { val commitProtocol = new SparkWriteFilesCommitProtocol(jobTrackerID, description, committer) + val backendWrite = + BackendsApiManager.getSparkPlanExecApiInstance.createBackendWrite(description) commitProtocol.setupTask() val writePath = commitProtocol.newTaskAttemptTempPath() - logDebug(s"Velox staging write path: $writePath") + val writeFileName = commitProtocol.getFilename + logDebug(s"Native staging write path: $writePath and file name: $writeFileName") + var writeTaskResult: WriteTaskResult = null try { Utils.tryWithSafeFinallyAndFailureCallbacks(block = { - BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath) + BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, writeFileName) // Initialize the native plan val iter = firstParent[ColumnarBatch].iterator(split, context) assert(iter.hasNext) val resultColumnarBatch = iter.next() assert(resultColumnarBatch != null) - val nativeWriteTaskResult = collectNativeWriteFilesMetrics(resultColumnarBatch) + val nativeWriteTaskResult = 
backendWrite.collectNativeWriteFilesMetrics(resultColumnarBatch) if (nativeWriteTaskResult.isEmpty) { // If we are writing an empty iterator, then velox would do nothing. // Here we fallback to use vanilla Spark write files to generate an empty file for @@ -255,7 +156,7 @@ class VeloxColumnarWriteFilesRDD( // we need to expose a dummy child (as right child) with type "WriteFilesExec" to let Spark // choose the new write code path (version >= 3.4). The actual plan to write is the left child // of this operator. -case class VeloxColumnarWriteFilesExec private ( +case class GlutenColumnarWriteFilesExec private ( override val left: SparkPlan, override val right: SparkPlan, fileFormat: FileFormat, @@ -265,7 +166,7 @@ case class VeloxColumnarWriteFilesExec private ( staticPartitions: TablePartitionSpec) extends BinaryExecNode with GlutenPlan - with VeloxColumnarWriteFilesExec.ExecuteWriteCompatible { + with GlutenColumnarWriteFilesExec.ExecuteWriteCompatible { val child: SparkPlan = left @@ -316,7 +217,7 @@ case class VeloxColumnarWriteFilesExec private ( // partition rdd to make sure we at least set up one write task to write the metadata. writeFilesForEmptyRDD(description, committer, jobTrackerID) } else { - new VeloxColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID) + new GlutenColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID) } } override protected def withNewChildrenInternal( @@ -325,7 +226,7 @@ case class VeloxColumnarWriteFilesExec private ( copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, staticPartitions) } -object VeloxColumnarWriteFilesExec { +object GlutenColumnarWriteFilesExec { def apply( child: SparkPlan, @@ -333,7 +234,7 @@ object VeloxColumnarWriteFilesExec { partitionColumns: Seq[Attribute], bucketSpec: Option[BucketSpec], options: Map[String, String], - staticPartitions: TablePartitionSpec): VeloxColumnarWriteFilesExec = { + staticPartitions: TablePartitionSpec): GlutenColumnarWriteFilesExec = { // This is a workaround for FileFormatWriter#write. 
Vanilla Spark (version >= 3.4) requires for // a plan that has at least one node exactly of type `WriteFilesExec` that is a Scala // case-class, to decide to choose new `#executeWrite` code path over the legacy `#execute` @@ -352,7 +253,7 @@ object VeloxColumnarWriteFilesExec { options, staticPartitions) - VeloxColumnarWriteFilesExec( + GlutenColumnarWriteFilesExec( child, right, fileFormat, diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala similarity index 82% rename from backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala rename to gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala index 845f2f98fb8c..5e3ab83e32e7 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.TaskContext import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol} +import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, HadoopMapReduceCommitProtocol} import org.apache.spark.sql.execution.datasources.WriteJobDescription import org.apache.spark.util.Utils @@ -41,9 +41,9 @@ class SparkWriteFilesCommitProtocol( extends Logging { assert(committer.isInstanceOf[HadoopMapReduceCommitProtocol]) - val sparkStageId = TaskContext.get().stageId() - val sparkPartitionId = TaskContext.get().partitionId() - val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue + val sparkStageId: Int = TaskContext.get().stageId() + val sparkPartitionId: Int = TaskContext.get().partitionId() + private val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue private val jobId = createJobID(jobTrackerID, sparkStageId) private val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) @@ -68,6 +68,21 @@ class SparkWriteFilesCommitProtocol( field.get(committer).asInstanceOf[OutputCommitter] } + private lazy val internalGetFilename = { + val m = classOf[HadoopMapReduceCommitProtocol] + .getDeclaredMethod("getFilename", classOf[TaskAttemptContext], classOf[FileNameSpec]) + m.setAccessible(true) + m + } + + def getFilename: String = { + val fileCounter = 0 + val suffix = f".c$fileCounter%03d" + + description.outputWriterFactory.getFileExtension(taskAttemptContext) + val fileNameSpec = FileNameSpec("", suffix) + internalGetFilename.invoke(committer, taskAttemptContext, fileNameSpec).asInstanceOf[String] + } + def setupTask(): Unit = { committer.setupTask(taskAttemptContext) } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala index 3d277b94cc3e..14605c403f4c 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala @@ -21,7 +21,7 @@ import org.apache.gluten.execution.SortExecTransformer import org.apache.spark.sql.GlutenSQLTestsBaseTrait import org.apache.spark.sql.catalyst.expressions.{Ascending, 
AttributeReference, NullsFirst, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} -import org.apache.spark.sql.execution.{QueryExecution, SortExec, VeloxColumnarWriteFilesExec} +import org.apache.spark.sql.execution.{GlutenColumnarWriteFilesExec, QueryExecution, SortExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, StringType} @@ -122,8 +122,8 @@ class GlutenV1WriteCommandSuite val executedPlan = FileFormatWriter.executedPlan.get val plan = if (enabled) { - assert(executedPlan.isInstanceOf[VeloxColumnarWriteFilesExec]) - executedPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child + assert(executedPlan.isInstanceOf[GlutenColumnarWriteFilesExec]) + executedPlan.asInstanceOf[GlutenColumnarWriteFilesExec].child } else { executedPlan.transformDown { case a: AdaptiveSparkPlanExec => a.executedPlan } } @@ -204,8 +204,8 @@ class GlutenV1WriteCommandSuite val executedPlan = FileFormatWriter.executedPlan.get val plan = if (enabled) { - assert(executedPlan.isInstanceOf[VeloxColumnarWriteFilesExec]) - executedPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child + assert(executedPlan.isInstanceOf[GlutenColumnarWriteFilesExec]) + executedPlan.asInstanceOf[GlutenColumnarWriteFilesExec].child } else { executedPlan.transformDown { case a: AdaptiveSparkPlanExec => a.executedPlan } } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala index ca4b3740a7bc..6db1f66845ac 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.executor.OutputMetrics import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, VeloxColumnarWriteFilesExec} +import org.apache.spark.sql.execution.{CommandResultExec, GlutenColumnarWriteFilesExec, QueryExecution} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.metric.SQLMetric @@ -60,13 +60,13 @@ class GlutenInsertSuite super.afterAll() } - private def checkAndGetWriteFiles(df: DataFrame): VeloxColumnarWriteFilesExec = { + private def checkAndGetWriteFiles(df: DataFrame): GlutenColumnarWriteFilesExec = { val writeFiles = stripAQEPlan( df.queryExecution.executedPlan .asInstanceOf[CommandResultExec] .commandPhysicalPlan).children.head - assert(writeFiles.isInstanceOf[VeloxColumnarWriteFilesExec]) - writeFiles.asInstanceOf[VeloxColumnarWriteFilesExec] + assert(writeFiles.isInstanceOf[GlutenColumnarWriteFilesExec]) + writeFiles.asInstanceOf[GlutenColumnarWriteFilesExec] } testGluten("insert partition table") { @@ -405,7 +405,7 @@ class GlutenInsertSuite withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -425,7 +425,7 @@ class GlutenInsertSuite withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - 
Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -452,7 +452,7 @@ class GlutenInsertSuite withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -474,7 +474,7 @@ class GlutenInsertSuite withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -501,7 +501,7 @@ class GlutenInsertSuite withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -571,7 +571,7 @@ class GlutenInsertSuite withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } diff --git a/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala b/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala index c7ad606bcf8d..28f315da49e7 100644 --- a/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala +++ b/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala @@ -16,12 +16,12 @@ */ package org.apache.gluten -import org.apache.spark.sql.execution.{SparkPlan, VeloxColumnarWriteFilesExec} +import org.apache.spark.sql.execution.{SparkPlan, GlutenColumnarWriteFilesExec} trait GlutenColumnarWriteTestSupport { def checkWriteFilesAndGetChild(sparkPlan: SparkPlan): SparkPlan = { - assert(sparkPlan.isInstanceOf[VeloxColumnarWriteFilesExec]) - sparkPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child + assert(sparkPlan.isInstanceOf[GlutenColumnarWriteFilesExec]) + sparkPlan.asInstanceOf[GlutenColumnarWriteFilesExec].child } } From 197ddcddb43e9d5d4cd4b761076d5a630b6df38b Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Thu, 18 Jul 2024 10:28:24 +0800 Subject: [PATCH 2/3] support pass format to backend --- .../CartesianProductExecTransformer.scala | 10 ++-- .../apache/gluten/execution/JoinUtils.scala | 55 ++++++++----------- .../execution/WriteFilesExecTransformer.scala | 41 +++++++------- .../apache/gluten/utils/SubstraitUtil.scala | 29 ++++++++++ 4 files changed, 76 insertions(+), 59 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala index 0dd110fa542f..28bf1eeabd23 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala @@ -86,12 +86,10 @@ case class CartesianProductExecTransformer( val (inputRightRelNode, inputRightOutput) = (rightPlanContext.root, rightPlanContext.outputAttributes) - val expressionNode = condition.map { - expr => - ExpressionConverter - .replaceWithExpressionTransformer(expr, inputLeftOutput ++ inputRightOutput) - .doTransform(context.registeredFunction) - } + val 
expressionNode = + condition.map { + SubstraitUtil.toSubstraitExpression(_, inputLeftOutput ++ inputRightOutput, context) + } val extensionNode = JoinUtils.createExtensionNode(inputLeftOutput ++ inputRightOutput, validation = false) diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala index 9dd73800e29b..12d08518509a 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala @@ -16,13 +16,12 @@ */ package org.apache.gluten.execution -import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.expression.{AttributeReferenceTransformer, ConverterUtils, ExpressionConverter} -import org.apache.gluten.substrait.`type`.TypeBuilder +import org.apache.gluten.expression.{AttributeReferenceTransformer, ExpressionConverter} import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode} import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, ExtensionBuilder} import org.apache.gluten.substrait.rel.{RelBuilder, RelNode} +import org.apache.gluten.utils.SubstraitUtil import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans._ @@ -34,21 +33,11 @@ import io.substrait.proto.{CrossRel, JoinRel} import scala.collection.JavaConverters._ object JoinUtils { - private def createEnhancement(output: Seq[Attribute]): com.google.protobuf.Any = { - val inputTypeNodes = output.map { - attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable) - } - // Normally the enhancement node is only used for plan validation. But here the enhancement - // is also used in execution phase. In this case an empty typeUrlPrefix need to be passed, - // so that it can be correctly parsed into json string on the cpp side. - BackendsApiManager.getTransformerApiInstance.packPBMessage( - TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf) - } def createExtensionNode(output: Seq[Attribute], validation: Boolean): AdvancedExtensionNode = { // Use field [enhancement] in a extension node for input type validation. if (validation) { - ExtensionBuilder.makeAdvancedExtension(createEnhancement(output)) + ExtensionBuilder.makeAdvancedExtension(SubstraitUtil.createEnhancement(output)) } else { null } @@ -58,7 +47,7 @@ object JoinUtils { !keyExprs.forall(_.isInstanceOf[AttributeReference]) } - def createPreProjectionIfNeeded( + private def createPreProjectionIfNeeded( keyExprs: Seq[Expression], inputNode: RelNode, inputNodeOutput: Seq[Attribute], @@ -131,17 +120,17 @@ object JoinUtils { } } - def createJoinExtensionNode( + private def createJoinExtensionNode( joinParameters: Any, output: Seq[Attribute]): AdvancedExtensionNode = { // Use field [optimization] in a extension node // to send some join parameters through Substrait plan. - val enhancement = createEnhancement(output) + val enhancement = SubstraitUtil.createEnhancement(output) ExtensionBuilder.makeAdvancedExtension(joinParameters, enhancement) } // Return the direct join output. 
- protected def getDirectJoinOutput( + private def getDirectJoinOutput( joinType: JoinType, leftOutput: Seq[Attribute], rightOutput: Seq[Attribute]): (Seq[Attribute], Seq[Attribute]) = { @@ -164,7 +153,7 @@ object JoinUtils { } } - protected def getDirectJoinOutputSeq( + private def getDirectJoinOutputSeq( joinType: JoinType, leftOutput: Seq[Attribute], rightOutput: Seq[Attribute]): Seq[Attribute] = { @@ -209,8 +198,8 @@ object JoinUtils { validation) // Combine join keys to make a single expression. - val joinExpressionNode = (streamedKeys - .zip(buildKeys)) + val joinExpressionNode = streamedKeys + .zip(buildKeys) .map { case ((leftKey, leftType), (rightKey, rightType)) => HashJoinLikeExecTransformer.makeEqualToExpression( @@ -225,12 +214,10 @@ object JoinUtils { HashJoinLikeExecTransformer.makeAndExpression(l, r, substraitContext.registeredFunction)) // Create post-join filter, which will be computed in hash join. - val postJoinFilter = condition.map { - expr => - ExpressionConverter - .replaceWithExpressionTransformer(expr, streamedOutput ++ buildOutput) - .doTransform(substraitContext.registeredFunction) - } + val postJoinFilter = + condition.map { + SubstraitUtil.toSubstraitExpression(_, streamedOutput ++ buildOutput, substraitContext) + } // Create JoinRel. val joinRel = RelBuilder.makeJoinRel( @@ -340,12 +327,14 @@ object JoinUtils { joinParameters: Any, validation: Boolean = false ): RelNode = { - val expressionNode = condition.map { - expr => - ExpressionConverter - .replaceWithExpressionTransformer(expr, inputStreamedOutput ++ inputBuildOutput) - .doTransform(substraitContext.registeredFunction) - } + val expressionNode = + condition.map { + SubstraitUtil.toSubstraitExpression( + _, + inputStreamedOutput ++ inputBuildOutput, + substraitContext) + } + val extensionNode = createJoinExtensionNode(joinParameters, inputStreamedOutput ++ inputBuildOutput) diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala index d78f21beaabf..d2ec994ba64b 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala @@ -21,10 +21,11 @@ import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.expression.ConverterUtils import org.apache.gluten.extension.ValidationResult import org.apache.gluten.metrics.MetricsUpdater -import org.apache.gluten.substrait.`type`.{ColumnTypeNode, TypeBuilder} +import org.apache.gluten.substrait.`type`.ColumnTypeNode import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.extensions.ExtensionBuilder import org.apache.gluten.substrait.rel.{RelBuilder, RelNode} +import org.apache.gluten.utils.SubstraitUtil import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec @@ -32,7 +33,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.types.MetadataBuilder import com.google.protobuf.{Any, StringValue} @@ -40,7 +43,6 @@ import 
org.apache.parquet.hadoop.ParquetOutputFormat import java.util.Locale -import scala.collection.JavaConverters._ import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` /** @@ -56,7 +58,7 @@ case class WriteFilesExecTransformer( staticPartitions: TablePartitionSpec) extends UnaryTransformSupport { // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks. - @transient override lazy val metrics = + @transient override lazy val metrics: Map[String, SQLMetric] = BackendsApiManager.getMetricsApiInstance.genWriteFilesTransformerMetrics(sparkContext) override def metricsUpdater(): MetricsUpdater = @@ -66,11 +68,18 @@ case class WriteFilesExecTransformer( private val caseInsensitiveOptions = CaseInsensitiveMap(options) - def genWriteParameters(): Any = { + private def genWriteParameters(): Any = { + val fileFormatStr = fileFormat match { + case register: DataSourceRegister => + register.shortName + case _ => "UnknownFileFormat" + } val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(caseInsensitiveOptions).capitalize val writeParametersStr = new StringBuffer("WriteParameters:") - writeParametersStr.append("is").append(compressionCodec).append("=1").append("\n") + writeParametersStr.append("is").append(compressionCodec).append("=1") + writeParametersStr.append(";format=").append(fileFormatStr).append("\n") + val message = StringValue .newBuilder() .setValue(writeParametersStr.toString) @@ -78,15 +87,6 @@ case class WriteFilesExecTransformer( BackendsApiManager.getTransformerApiInstance.packPBMessage(message) } - def createEnhancement(output: Seq[Attribute]): com.google.protobuf.Any = { - val inputTypeNodes = output.map { - attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable) - } - - BackendsApiManager.getTransformerApiInstance.packPBMessage( - TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf) - } - def getRelNode( context: SubstraitContext, originalInputAttributes: Seq[Attribute], @@ -118,10 +118,11 @@ case class WriteFilesExecTransformer( val extensionNode = if (!validation) { ExtensionBuilder.makeAdvancedExtension( genWriteParameters(), - createEnhancement(originalInputAttributes)) + SubstraitUtil.createEnhancement(originalInputAttributes)) } else { // Use a extension node to send the input types through Substrait plan for validation. 
- ExtensionBuilder.makeAdvancedExtension(createEnhancement(originalInputAttributes)) + ExtensionBuilder.makeAdvancedExtension( + SubstraitUtil.createEnhancement(originalInputAttributes)) } RelBuilder.makeWriteRel( input, @@ -133,7 +134,7 @@ case class WriteFilesExecTransformer( operatorId) } - private def getFinalChildOutput(): Seq[Attribute] = { + private def getFinalChildOutput: Seq[Attribute] = { val metadataExclusionList = conf .getConf(GlutenConfig.NATIVE_WRITE_FILES_COLUMN_METADATA_EXCLUSION_LIST) .split(",") @@ -143,7 +144,7 @@ case class WriteFilesExecTransformer( } override protected def doValidateInternal(): ValidationResult = { - val finalChildOutput = getFinalChildOutput() + val finalChildOutput = getFinalChildOutput val validationResult = BackendsApiManager.getSettings.supportWriteFilesExec( fileFormat, @@ -165,7 +166,7 @@ case class WriteFilesExecTransformer( val childCtx = child.asInstanceOf[TransformSupport].transform(context) val operatorId = context.nextOperatorId(this.nodeName) val currRel = - getRelNode(context, getFinalChildOutput(), operatorId, childCtx.root, validation = false) + getRelNode(context, getFinalChildOutput, operatorId, childCtx.root, validation = false) assert(currRel != null, "Write Rel should be valid") TransformContext(childCtx.outputAttributes, output, currRel) } @@ -196,7 +197,7 @@ object WriteFilesExecTransformer { "__file_source_generated_metadata_col" ) - def removeMetadata(attr: Attribute, metadataExclusionList: Seq[String]): Attribute = { + private def removeMetadata(attr: Attribute, metadataExclusionList: Seq[String]): Attribute = { val metadataKeys = INTERNAL_METADATA_KEYS ++ metadataExclusionList attr.withMetadata { var builder = new MetadataBuilder().withMetadata(attr.metadata) diff --git a/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala b/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala index e8e7ce06feaf..c641cb44891d 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala @@ -16,10 +16,19 @@ */ package org.apache.gluten.utils +import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter} +import org.apache.gluten.substrait.`type`.TypeBuilder +import org.apache.gluten.substrait.SubstraitContext +import org.apache.gluten.substrait.expression.ExpressionNode + +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter} import io.substrait.proto.{CrossRel, JoinRel} +import scala.collection.JavaConverters._ + object SubstraitUtil { def toSubstrait(sparkJoin: JoinType): JoinRel.JoinType = sparkJoin match { case _: InnerLike => @@ -55,4 +64,24 @@ object SubstraitUtil { case _ => CrossRel.JoinType.UNRECOGNIZED } + + def createEnhancement(output: Seq[Attribute]): com.google.protobuf.Any = { + val inputTypeNodes = output.map { + attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable) + } + // Normally the enhancement node is only used for plan validation. But here the enhancement + // is also used in execution phase. In this case an empty typeUrlPrefix need to be passed, + // so that it can be correctly parsed into json string on the cpp side. 
+ BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf) + } + + def toSubstraitExpression( + expr: Expression, + attributeSeq: Seq[Attribute], + context: SubstraitContext): ExpressionNode = { + ExpressionConverter + .replaceWithExpressionTransformer(expr, attributeSeq) + .doTransform(context.registeredFunction) + } } From cd7cdd06e831dafd89253af319abc8087f2f8ba6 Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Fri, 19 Jul 2024 11:59:15 +0800 Subject: [PATCH 3/3] Rename GlutenColumnarWriteFilesExec to ColumnarWriteFilesExec and GlutenColumnarWriteFilesRDD to ColumnarWriteFilesRDD --- .../VeloxParquetWriteForHiveSuite.scala | 2 +- .../gluten/backendsapi/SparkPlanExecApi.scala | 4 ++-- ...esExec.scala => ColumnarWriteFilesExec.scala} | 16 ++++++++-------- .../datasources/GlutenV1WriteCommandSuite.scala | 10 +++++----- .../spark/sql/sources/GlutenInsertSuite.scala | 8 ++++---- .../gluten/GlutenColumnarWriteTestSupport.scala | 6 +++--- 6 files changed, 23 insertions(+), 23 deletions(-) rename gluten-core/src/main/scala/org/apache/spark/sql/execution/{GlutenColumnarWriteFilesExec.scala => ColumnarWriteFilesExec.scala} (95%) diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala index 576714c27a24..412548de9c44 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala @@ -86,7 +86,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils { override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { if (!nativeUsed) { nativeUsed = if (isSparkVersionGE("3.4")) { - qe.executedPlan.find(_.isInstanceOf[GlutenColumnarWriteFilesExec]).isDefined + qe.executedPlan.find(_.isInstanceOf[ColumnarWriteFilesExec]).isDefined } else { qe.executedPlan.find(_.isInstanceOf[FakeRowAdaptor]).isDefined } diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index 0bb23b7ce655..273443f647ab 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{BackendWrite, FileSourceScanExec, GenerateExec, GlutenColumnarWriteFilesExec, LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.{BackendWrite, ColumnarWriteFilesExec, FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan} import org.apache.spark.sql.execution.datasources.{FileFormat, WriteJobDescription} import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec @@ -389,7 +389,7 @@ trait SparkPlanExecApi { bucketSpec: Option[BucketSpec], options: Map[String, String], staticPartitions: TablePartitionSpec): SparkPlan = { - GlutenColumnarWriteFilesExec( + ColumnarWriteFilesExec( child, fileFormat, partitionColumns, 
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenColumnarWriteFilesExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala similarity index 95% rename from gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenColumnarWriteFilesExec.scala rename to gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala index 0291450d7f61..6f04b84804c5 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenColumnarWriteFilesExec.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala @@ -40,7 +40,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException import java.util.Date /** - * This trait is used in [[GlutenColumnarWriteFilesRDD]] to inject the staging write path before + * This trait is used in [[ColumnarWriteFilesRDD]] to inject the staging write path before * initializing the native plan and collect native write files metrics for each backend. */ trait BackendWrite { @@ -51,7 +51,7 @@ trait BackendWrite { * This RDD is used to make sure we have injected staging write path before initializing the native * plan, and support Spark file commit protocol. */ -class GlutenColumnarWriteFilesRDD( +class ColumnarWriteFilesRDD( var prev: RDD[ColumnarBatch], description: WriteJobDescription, committer: FileCommitProtocol, @@ -156,7 +156,7 @@ class GlutenColumnarWriteFilesRDD( // we need to expose a dummy child (as right child) with type "WriteFilesExec" to let Spark // choose the new write code path (version >= 3.4). The actual plan to write is the left child // of this operator. -case class GlutenColumnarWriteFilesExec private ( +case class ColumnarWriteFilesExec private ( override val left: SparkPlan, override val right: SparkPlan, fileFormat: FileFormat, @@ -166,7 +166,7 @@ case class GlutenColumnarWriteFilesExec private ( staticPartitions: TablePartitionSpec) extends BinaryExecNode with GlutenPlan - with GlutenColumnarWriteFilesExec.ExecuteWriteCompatible { + with ColumnarWriteFilesExec.ExecuteWriteCompatible { val child: SparkPlan = left @@ -217,7 +217,7 @@ case class GlutenColumnarWriteFilesExec private ( // partition rdd to make sure we at least set up one write task to write the metadata. writeFilesForEmptyRDD(description, committer, jobTrackerID) } else { - new GlutenColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID) + new ColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID) } } override protected def withNewChildrenInternal( @@ -226,7 +226,7 @@ case class GlutenColumnarWriteFilesExec private ( copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, staticPartitions) } -object GlutenColumnarWriteFilesExec { +object ColumnarWriteFilesExec { def apply( child: SparkPlan, @@ -234,7 +234,7 @@ object GlutenColumnarWriteFilesExec { partitionColumns: Seq[Attribute], bucketSpec: Option[BucketSpec], options: Map[String, String], - staticPartitions: TablePartitionSpec): GlutenColumnarWriteFilesExec = { + staticPartitions: TablePartitionSpec): ColumnarWriteFilesExec = { // This is a workaround for FileFormatWriter#write. 
Vanilla Spark (version >= 3.4) requires for // a plan that has at least one node exactly of type `WriteFilesExec` that is a Scala // case-class, to decide to choose new `#executeWrite` code path over the legacy `#execute` @@ -253,7 +253,7 @@ object GlutenColumnarWriteFilesExec { options, staticPartitions) - GlutenColumnarWriteFilesExec( + ColumnarWriteFilesExec( child, right, fileFormat, diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala index 14605c403f4c..726ace3a15f1 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala @@ -21,7 +21,7 @@ import org.apache.gluten.execution.SortExecTransformer import org.apache.spark.sql.GlutenSQLTestsBaseTrait import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} -import org.apache.spark.sql.execution.{GlutenColumnarWriteFilesExec, QueryExecution, SortExec} +import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, QueryExecution, SortExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, StringType} @@ -122,8 +122,8 @@ class GlutenV1WriteCommandSuite val executedPlan = FileFormatWriter.executedPlan.get val plan = if (enabled) { - assert(executedPlan.isInstanceOf[GlutenColumnarWriteFilesExec]) - executedPlan.asInstanceOf[GlutenColumnarWriteFilesExec].child + assert(executedPlan.isInstanceOf[ColumnarWriteFilesExec]) + executedPlan.asInstanceOf[ColumnarWriteFilesExec].child } else { executedPlan.transformDown { case a: AdaptiveSparkPlanExec => a.executedPlan } } @@ -204,8 +204,8 @@ class GlutenV1WriteCommandSuite val executedPlan = FileFormatWriter.executedPlan.get val plan = if (enabled) { - assert(executedPlan.isInstanceOf[GlutenColumnarWriteFilesExec]) - executedPlan.asInstanceOf[GlutenColumnarWriteFilesExec].child + assert(executedPlan.isInstanceOf[ColumnarWriteFilesExec]) + executedPlan.asInstanceOf[ColumnarWriteFilesExec].child } else { executedPlan.transformDown { case a: AdaptiveSparkPlanExec => a.executedPlan } } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala index 6db1f66845ac..5c60115c5e1d 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.executor.OutputMetrics import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.{CommandResultExec, GlutenColumnarWriteFilesExec, QueryExecution} +import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, QueryExecution} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.metric.SQLMetric @@ -60,13 +60,13 @@ class 
GlutenInsertSuite super.afterAll() } - private def checkAndGetWriteFiles(df: DataFrame): GlutenColumnarWriteFilesExec = { + private def checkAndGetWriteFiles(df: DataFrame): ColumnarWriteFilesExec = { val writeFiles = stripAQEPlan( df.queryExecution.executedPlan .asInstanceOf[CommandResultExec] .commandPhysicalPlan).children.head - assert(writeFiles.isInstanceOf[GlutenColumnarWriteFilesExec]) - writeFiles.asInstanceOf[GlutenColumnarWriteFilesExec] + assert(writeFiles.isInstanceOf[ColumnarWriteFilesExec]) + writeFiles.asInstanceOf[ColumnarWriteFilesExec] } testGluten("insert partition table") { diff --git a/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala b/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala index 28f315da49e7..68c0e1c932b5 100644 --- a/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala +++ b/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala @@ -16,12 +16,12 @@ */ package org.apache.gluten -import org.apache.spark.sql.execution.{SparkPlan, GlutenColumnarWriteFilesExec} +import org.apache.spark.sql.execution.{SparkPlan, ColumnarWriteFilesExec} trait GlutenColumnarWriteTestSupport { def checkWriteFilesAndGetChild(sparkPlan: SparkPlan): SparkPlan = { - assert(sparkPlan.isInstanceOf[GlutenColumnarWriteFilesExec]) - sparkPlan.asInstanceOf[GlutenColumnarWriteFilesExec].child + assert(sparkPlan.isInstanceOf[ColumnarWriteFilesExec]) + sparkPlan.asInstanceOf[ColumnarWriteFilesExec].child } }
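
Reviewer note (not part of the patch): the sketch below illustrates how a test suite could exercise the renamed ColumnarWriteFilesExec through the GlutenColumnarWriteTestSupport trait above. It follows the same unwrapping pattern as GlutenInsertSuite#checkAndGetWriteFiles; the package, suite name, base traits and query are hypothetical, and whether the write is actually planned as ColumnarWriteFilesExec depends on running Spark >= 3.4 with the backend's native writer enabled.

package org.apache.gluten.example // hypothetical package, for illustration only

import org.apache.gluten.GlutenColumnarWriteTestSupport

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.{CommandResultExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical suite: only the call pattern matters; it mirrors the suites
// touched by this patch (GlutenInsertSuite / GlutenV1WriteCommandSuite).
class ExampleColumnarWriteSuite
  extends QueryTest
  with SharedSparkSession
  with AdaptiveSparkPlanHelper
  with GlutenColumnarWriteTestSupport {

  test("INSERT is planned as ColumnarWriteFilesExec on Spark >= 3.4") {
    withTable("t") {
      sql("CREATE TABLE t (c1 INT, c2 STRING) USING parquet")
      val df = sql("INSERT INTO t VALUES (1, 'a')")
      // A SQL command is wrapped in CommandResultExec; unwrap it and strip AQE
      // the same way GlutenInsertSuite#checkAndGetWriteFiles does.
      val writeFiles: SparkPlan = stripAQEPlan(
        df.queryExecution.executedPlan
          .asInstanceOf[CommandResultExec]
          .commandPhysicalPlan).children.head
      // Asserts writeFiles is a ColumnarWriteFilesExec and returns its native child,
      // i.e. the backend's columnar write pipeline, which can be inspected further.
      val nativeChild = checkWriteFilesAndGetChild(writeFiles)
      assert(nativeChild != null)
    }
  }
}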