From 23ac4df8616a5d1d6ea3443c3738ba38594520b9 Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Wed, 20 Dec 2023 18:26:16 +0800 Subject: [PATCH 1/5] Add native write support in spark 3.4 --- .../clickhouse/CHSparkPlanExecApi.scala | 13 ++ .../velox/SparkPlanExecApiImpl.scala | 28 ++- .../backendsapi/velox/VeloxBackend.scala | 57 +++++- .../VeloxColumnarWriteFilesExec.scala | 119 ++++++++++++ .../VeloxParquetDataTypeValidationSuite.scala | 10 +- .../VeloxParquetWriteForHiveSuite.scala | 67 +++++-- .../execution/VeloxParquetWriteSuite.scala | 15 +- cpp/velox/compute/VeloxPlanConverter.cc | 10 + cpp/velox/compute/VeloxPlanConverter.h | 2 + cpp/velox/compute/WholeStageResultIterator.cc | 1 + cpp/velox/substrait/SubstraitToVeloxPlan.cc | 147 +++++++++++++++ cpp/velox/substrait/SubstraitToVeloxPlan.h | 3 + .../SubstraitToVeloxPlanValidator.cc | 43 +++++ .../substrait/SubstraitToVeloxPlanValidator.h | 3 + .../substrait/rel/RelBuilder.java | 25 +++ .../substrait/rel/WriteRelNode.java | 114 ++++++++++++ .../substrait/proto/substrait/algebra.proto | 1 + .../backendsapi/BackendSettingsApi.scala | 4 +- .../backendsapi/SparkPlanExecApi.scala | 12 ++ .../execution/WriteFilesExecTransformer.scala | 173 ++++++++++++++++++ .../expression/ConverterUtils.scala | 4 + .../extension/ColumnarOverrides.scala | 19 ++ .../columnar/TransformHintRule.scala | 20 +- gluten-ut/common/pom.xml | 9 + .../spark/sql/GlutenSQLTestsBaseTrait.scala | 9 + .../utils/velox/VeloxTestSettings.scala | 39 +++- .../spark/sql/GlutenSQLQuerySuite.scala | 3 +- .../parquet/GlutenParquetFilterSuite.scala | 3 +- 28 files changed, 920 insertions(+), 33 deletions(-) create mode 100644 backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala create mode 100644 gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java create mode 100644 gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala index a55d7586955b..8a8fe375c252 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -31,6 +31,8 @@ import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper} import org.apache.spark.shuffle.utils.CHShuffleUtil import org.apache.spark.sql.{SparkSession, Strategy} +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.optimizer.BuildSide @@ -40,6 +42,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec +import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec} import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.NativeWritePostRule import org.apache.spark.sql.execution.datasources.v1.ClickHouseFileIndex import 
org.apache.spark.sql.execution.datasources.v2.BatchScanExec @@ -452,6 +455,16 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { CHTruncTimestampTransformer(substraitExprName, format, timestamp, timeZoneId, original) } + override def createColumnarWriteFilesExec( + child: SparkPlan, + fileFormat: FileFormat, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String], + staticPartitions: TablePartitionSpec): WriteFilesExec = { + throw new UnsupportedOperationException("ColumnarWriteFilesExec is not support in ch backend.") + } + /** * Define whether the join operator is fallback because of the join operator is not supported by * backend diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala index 8966af4f34fd..7530af36d648 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala @@ -23,6 +23,7 @@ import io.glutenproject.execution._ import io.glutenproject.expression._ import io.glutenproject.expression.ConverterUtils.FunctionConfig import io.glutenproject.memory.nmm.NativeMemoryManagers +import io.glutenproject.sql.shims.SparkShimLoader import io.glutenproject.substrait.expression.{ExpressionBuilder, ExpressionNode, IfThenNode} import io.glutenproject.vectorized.{ColumnarBatchSerializer, ColumnarBatchSerializerJniWrapper} @@ -34,6 +35,8 @@ import org.apache.spark.shuffle.utils.ShuffleUtil import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.{AggregateFunctionRewriteRule, FunctionIdentifier} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, CreateNamedStruct, ElementAt, Expression, ExpressionInfo, GetArrayItem, GetMapValue, GetStructField, Literal, NamedExpression, StringSplit, StringTrim} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, HLLAdapter} import org.apache.spark.sql.catalyst.optimizer.BuildSide @@ -41,7 +44,8 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{ColumnarBuildSideRelation, SparkPlan} +import org.apache.spark.sql.execution.{ColumnarBuildSideRelation, SparkPlan, VeloxColumnarWriteFilesExec} +import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec} import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.NativeWritePostRule import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec import org.apache.spark.sql.execution.joins.BuildSideRelation @@ -245,6 +249,22 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi { ShuffleUtil.genColumnarShuffleWriter(parameters) } + override def createColumnarWriteFilesExec( + child: SparkPlan, + fileFormat: FileFormat, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String], + staticPartitions: TablePartitionSpec): WriteFilesExec = { + new VeloxColumnarWriteFilesExec( + child, + 
fileFormat, + partitionColumns, + bucketSpec, + options, + staticPartitions) + } + /** * Generate ColumnarBatchSerializer for ColumnarShuffleExchangeExec. * @@ -477,7 +497,11 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi { * @return */ override def genExtendedColumnarPostRules(): List[SparkSession => Rule[SparkPlan]] = { - List(spark => NativeWritePostRule(spark)) + if (SparkShimLoader.getSparkShims.getShimDescriptor.toString.equals("3.4.1")) { + List() + } else { + List(spark => NativeWritePostRule(spark)) + } } /** diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala index ebd15861a611..534bff9391ad 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala @@ -29,7 +29,8 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand -import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand +import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.expression.UDFResolver import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -114,6 +115,60 @@ object BackendSettings extends BackendSettingsApi { } } + override def supportFileFormatWrite(format: FileFormat, fields: Array[StructField]): Boolean = { + // Validate if all types are supported. + def validateTypes(fields: Array[StructField]): Boolean = { + + val unsupportedDataTypes = + fields.flatMap { + field => + field.dataType match { + case _: TimestampType => Some("TimestampType") + case struct: StructType if validateTypes(struct.fields) => + Some("StructType(TimestampType)") + case array: ArrayType if array.elementType.isInstanceOf[TimestampType] => + Some("MapType(TimestampType)") + case map: MapType + if (map.keyType.isInstanceOf[TimestampType] || + map.valueType.isInstanceOf[TimestampType]) => + Some("MapType(TimestampType)") + case _ => None + } + } + + for (unsupportedDataType <- unsupportedDataTypes) { + // scalastyle:off println + println( + s"Validation failed for ${this.getClass.toString} due to:" + + s" data type $unsupportedDataType in file schema.") + // scalastyle:on println + } + + unsupportedDataTypes.isEmpty + } + + def containFieldMetadata(fields: Array[StructField]): Boolean = { + if (fields.exists(!_.metadata.equals(Metadata.empty))) false else true + } + + format match { + case _: ParquetFileFormat => + validateTypes(fields) && + containFieldMetadata(fields) + case _ => false + } + } + override def supportWriteExec(): Boolean = { + // Velox doesn't support brotli and lzo. 
+ if ( + SQLConf.get.parquetCompressionCodec.toLowerCase().equals("brotli") || + SQLConf.get.parquetCompressionCodec.toLowerCase().equals("lzo") + ) { + return false + } + true + } + override def supportExpandExec(): Boolean = true override def supportSortExec(): Boolean = true diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala new file mode 100644 index 000000000000..f11d0abd071e --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import io.glutenproject.columnarbatch.ColumnarBatches +import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators + +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.write.WriterCommitMessage +import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, ExecutedWriteSummary, FileFormat, PartitioningUtils, WriteFilesExec, WriteFilesSpec, WriteTaskResult} +import org.apache.spark.sql.vectorized.ColumnarBatch + +import shaded.parquet.com.fasterxml.jackson.databind.ObjectMapper + +import scala.collection.mutable + +class VeloxColumnarWriteFilesExec( + child: SparkPlan, + fileFormat: FileFormat, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String], + staticPartitions: TablePartitionSpec) + extends WriteFilesExec( + child, + fileFormat, + partitionColumns, + bucketSpec, + options, + staticPartitions) { + + override def supportsColumnar(): Boolean = true + + override def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = { + assert(child.supportsColumnar) + child.session.sparkContext.setLocalProperty("writePath", writeFilesSpec.description.path) + child.executeColumnar().map { + cb => + // Currently, the cb contains three columns: row, fragments, and context. + // The first row in the row column contains the number of written numRows. + // The fragments column contains detailed information about the file writes. + // The detailed fragement is https://github.com/facebookincubator/velox/blob/ + // 6b17ea5100a2713a6ee0252a37ce47cb17f46929/velox/connectors/hive/HiveDataSink.cpp#L508. 
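To make that payload concrete, here is a sketch of one fragments entry in the shape the code below consumes, decoded with Jackson. The field values and the unshaded ObjectMapper import are illustrative assumptions; only the field names (name, fileWriteInfos, targetFileName, fileSize) come from this patch:

    // A fabricated fragment payload, only to illustrate the shape parsed below.
    import com.fasterxml.jackson.databind.ObjectMapper

    val sampleFragment =
      """{"name": "p=1", "fileWriteInfos": [{"targetFileName": "part-0.parquet", "fileSize": 1024}]}"""
    val node = new ObjectMapper().readTree(sampleFragment)
    val partitionDir = node.get("name").textValue() // non-empty only for partitioned writes
    val writeInfo = node.get("fileWriteInfos").elements().next()
    val targetFileName = writeInfo.get("targetFileName").textValue() // "part-0.parquet"
    val fileSize = writeInfo.get("fileSize").longValue() // 1024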
+ val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb) + + val numRows = loadedCb.column(0).getLong(0) + + var updatedPartitions = Set.empty[String] + val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() + var numBytes = 0L + for (i <- 0 until loadedCb.numRows() - 1) { + val fragments = loadedCb.column(1).getUTF8String(i + 1) + val objectMapper = new ObjectMapper() + val jsonObject = objectMapper.readTree(fragments.toString) + + val fileWriteInfos = jsonObject.get("fileWriteInfos").elements() + if (jsonObject.get("fileWriteInfos").elements().hasNext) { + val writeInfo = fileWriteInfos.next(); + numBytes += writeInfo.get("fileSize").longValue() + // Get partition information. + if (jsonObject.get("name").textValue().nonEmpty) { + val targetFileName = writeInfo.get("targetFileName").textValue() + val partitionDir = jsonObject.get("name").textValue() + updatedPartitions += partitionDir + val tmpOutputPath = + writeFilesSpec.description.path + "/" + partitionDir + "/" + targetFileName + val absOutputPathObject = + writeFilesSpec.description.customPartitionLocations.get( + PartitioningUtils.parsePathFragment(partitionDir)) + if (absOutputPathObject.nonEmpty) { + val absOutputPath = absOutputPathObject.get + "/" + targetFileName + addedAbsPathFiles(tmpOutputPath) = absOutputPath + } + } + } + } + + // TODO: need to get the partition Internal row? + val stats = BasicWriteTaskStats(Seq.empty, (numRows - 1).toInt, numBytes, numRows) + val summary = + ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) + + WriteTaskResult( + new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), + summary) + } + } + + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + throw new UnsupportedOperationException(s"This operator doesn't support doExecuteColumnar().") + } + + override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExec = + new VeloxColumnarWriteFilesExec( + newChild, + fileFormat, + partitionColumns, + bucketSpec, + options, + staticPartitions) +} diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxParquetDataTypeValidationSuite.scala b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxParquetDataTypeValidationSuite.scala index 49c9093f199e..ee0fd8a647e1 100644 --- a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxParquetDataTypeValidationSuite.scala +++ b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxParquetDataTypeValidationSuite.scala @@ -445,16 +445,20 @@ class VeloxParquetDataTypeValidationSuite extends VeloxWholeStageTransformerSuit } } - ignore("Velox Parquet Write") { + test("Velox Parquet Write") { withSQLConf(("spark.gluten.sql.native.writer.enabled", "true")) { withTempDir { dir => val write_path = dir.toURI.getPath val data_path = getClass.getResource("/").getPath + "/data-type-validation-data/type1" - val df = spark.read.format("parquet").load(data_path) + // Spark 3.4 native write doesn't support Timestamp type.
+ val df = spark.read.format("parquet").load(data_path).drop("timestamp") df.write.mode("append").format("parquet").save(write_path) + val parquetDf = spark.read + .format("parquet") + .load(write_path) + checkAnswer(parquetDf, df) } } - } } diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala index c11633038582..20c20d8d8161 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.execution +import io.glutenproject.sql.shims.SparkShimLoader + import org.apache.spark.SparkConf import org.apache.spark.internal.config import org.apache.spark.internal.config.UI.UI_ENABLED @@ -97,16 +99,24 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils { _.getMessage.toString.contains("Use Gluten partition write for hive")) == native) } - ignore("test hive static partition write table") { + test("test hive static partition write table") { withTable("t") { spark.sql( "CREATE TABLE t (c int, d long, e long)" + " STORED AS PARQUET partitioned by (c, d)") withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") { - checkNativeStaticPartitionWrite( - "INSERT OVERWRITE TABLE t partition(c=1, d=2)" + - " SELECT 3 as e", - native = true) + if (SparkShimLoader.getSparkVersion.startsWith("3.4")) { + checkNativeStaticPartitionWrite( + "INSERT OVERWRITE TABLE t partition(c=1, d=2)" + + " SELECT 3 as e", + native = false) + } else { + checkNativeStaticPartitionWrite( + "INSERT OVERWRITE TABLE t partition(c=1, d=2)" + + " SELECT 3 as e", + native = true) + } + } checkAnswer(spark.table("t"), Row(3, 1, 2)) } @@ -118,20 +128,33 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils { "CREATE TABLE t (c int, d long, e long)" + " STORED AS PARQUET partitioned by (c, d)") withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") { - checkNativeStaticPartitionWrite( - "INSERT OVERWRITE TABLE t partition(c=1, d)" + - " SELECT 3 as e, 2 as e", - native = false) + if (SparkShimLoader.getSparkVersion.startsWith("3.4")) { + checkNativeStaticPartitionWrite( + "INSERT OVERWRITE TABLE t partition(c=1, d)" + + " SELECT 3 as e, 2 as e", + native = false) + } else { + checkNativeStaticPartitionWrite( + "INSERT OVERWRITE TABLE t partition(c=1, d)" + + " SELECT 3 as e, 2 as e", + native = false) + } + } checkAnswer(spark.table("t"), Row(3, 1, 2)) } } - ignore("test hive write table") { + test("test hive write table") { withTable("t") { spark.sql("CREATE TABLE t (c int) STORED AS PARQUET") withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") { - checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", native = true) + if (SparkShimLoader.getSparkVersion.startsWith("3.4")) { + checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", native = false) + } else { + checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", native = true) + } + } checkAnswer(spark.table("t"), Row(1)) } @@ -142,12 +165,22 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils { f => // compatible with Spark3.3 and later withSQLConf("spark.sql.hive.convertMetastoreInsertDir" -> "false") { - checkNativeWrite( - s""" - |INSERT OVERWRITE DIRECTORY '${f.getCanonicalPath}' STORED AS PARQUET SELECT 1 as c 
- |""".stripMargin, - native = true - ) + if (SparkShimLoader.getSparkVersion.startsWith("3.4")) { + checkNativeWrite( + s""" + |INSERT OVERWRITE DIRECTORY '${f.getCanonicalPath}' STORED AS PARQUET SELECT 1 as c + |""".stripMargin, + native = false + ) + } else { + checkNativeWrite( + s""" + |INSERT OVERWRITE DIRECTORY '${f.getCanonicalPath}' STORED AS PARQUET SELECT 1 as c + |""".stripMargin, + native = true + ) + } + checkAnswer(spark.read.parquet(f.getCanonicalPath), Row(1)) } } diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala index 535cf6354c1b..3e37d6c2c714 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution import io.glutenproject.execution.VeloxWholeStageTransformerSuite +import io.glutenproject.sql.shims.SparkShimLoader import io.glutenproject.utils.FallbackUtil import org.apache.spark.SparkConf @@ -38,7 +39,7 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite { super.sparkConf.set("spark.gluten.sql.native.writer.enabled", "true") } - ignore("test write parquet with compression codec") { + test("test write parquet with compression codec") { // compression codec details see `VeloxParquetDatasource.cc` Seq("snappy", "gzip", "zstd", "lz4", "none", "uncompressed") .foreach { @@ -59,7 +60,13 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite { .save(f.getCanonicalPath) val files = f.list() assert(files.nonEmpty, extension) - assert(files.exists(_.contains(extension)), extension) + + if (!SparkShimLoader.getSparkVersion.startsWith("3.4")) { + assert( + files.exists(_.contains(extension)), + extension + ) // filename changed in spark 3.4. 
+ } val parquetDf = spark.read .format("parquet") @@ -71,7 +78,7 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite { } } - ignore("test ctas") { + test("test ctas") { withTable("velox_ctas") { spark .range(100) @@ -82,7 +89,7 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite { } } - ignore("test parquet dynamic partition write") { + test("test parquet dynamic partition write") { withTempPath { f => val path = f.getCanonicalPath diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index a449b261234e..45b2927a4ff1 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -39,6 +39,14 @@ VeloxPlanConverter::VeloxPlanConverter( substraitVeloxPlanConverter_(veloxPool, confMap, validationMode), pool_(veloxPool) {} +void VeloxPlanConverter::setInputPlanNode(const ::substrait::WriteRel& writeRel) { + if (writeRel.has_input()) { + setInputPlanNode(writeRel.input()); + } else { + throw std::runtime_error("Child expected"); + } +} + void VeloxPlanConverter::setInputPlanNode(const ::substrait::FetchRel& fetchRel) { if (fetchRel.has_input()) { setInputPlanNode(fetchRel.input()); @@ -176,6 +184,8 @@ void VeloxPlanConverter::setInputPlanNode(const ::substrait::Rel& srel) { setInputPlanNode(srel.window()); } else if (srel.has_generate()) { setInputPlanNode(srel.generate()); + } else if (srel.has_write()) { + setInputPlanNode(srel.write()); } else { throw std::runtime_error("Rel is not supported: " + srel.DebugString()); } diff --git a/cpp/velox/compute/VeloxPlanConverter.h b/cpp/velox/compute/VeloxPlanConverter.h index 90c58774aa0d..01fd9bcfa4e6 100644 --- a/cpp/velox/compute/VeloxPlanConverter.h +++ b/cpp/velox/compute/VeloxPlanConverter.h @@ -42,6 +42,8 @@ class VeloxPlanConverter { } private: + void setInputPlanNode(const ::substrait::WriteRel& writeRel); + void setInputPlanNode(const ::substrait::FetchRel& fetchRel); void setInputPlanNode(const ::substrait::ExpandRel& sExpand); diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index ccde0d4b8f56..2c40897a50d4 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -429,6 +429,7 @@ std::shared_ptr WholeStageResultIterator::createConnectorConfig() configs[velox::connector::hive::HiveConfig::kFileColumnNamesReadAsLowerCaseSession] = veloxCfg_->get(kCaseSensitive, false) == false ? 
"true" : "false"; configs[velox::connector::hive::HiveConfig::kArrowBridgeTimestampUnit] = "6"; + configs[velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession] = "400"; return std::make_shared(configs); } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 0d75f9a0bde2..441f06c15299 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -18,10 +18,13 @@ #include "SubstraitToVeloxPlan.h" #include "TypeUtils.h" #include "VariantToVectorConverter.h" +#include "velox/connectors/hive/HiveDataSink.h" +#include "velox/exec/TableWriter.h" #include "velox/type/Type.h" #include "utils/ConfigExtractor.h" +#include #include "config/GlutenConfig.h" namespace gluten { @@ -446,6 +449,148 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: } } +std::shared_ptr makeLocationHandle( + std::string targetDirectory, + std::optional writeDirectory = std::nullopt, + connector::hive::LocationHandle::TableType tableType = connector::hive::LocationHandle::TableType::kExisting) { + return std::make_shared( + targetDirectory, writeDirectory.value_or(targetDirectory), tableType); +} + +std::shared_ptr makeHiveInsertTableHandle( + const std::vector& tableColumnNames, + const std::vector& tableColumnTypes, + const std::vector& partitionedBy, + std::shared_ptr bucketProperty, + std::shared_ptr locationHandle, + const dwio::common::FileFormat tableStorageFormat = dwio::common::FileFormat::PARQUET, + const std::optional compressionKind = {}) { + std::vector> columnHandles; + std::vector bucketedBy; + std::vector bucketedTypes; + std::vector> sortedBy; + if (bucketProperty != nullptr) { + bucketedBy = bucketProperty->bucketedBy(); + bucketedTypes = bucketProperty->bucketedTypes(); + sortedBy = bucketProperty->sortedBy(); + } + int32_t numPartitionColumns{0}; + int32_t numSortingColumns{0}; + int32_t numBucketColumns{0}; + for (int i = 0; i < tableColumnNames.size(); ++i) { + for (int j = 0; j < bucketedBy.size(); ++j) { + if (bucketedBy[j] == tableColumnNames[i]) { + ++numBucketColumns; + } + } + for (int j = 0; j < sortedBy.size(); ++j) { + if (sortedBy[j]->sortColumn() == tableColumnNames[i]) { + ++numSortingColumns; + } + } + if (std::find(partitionedBy.cbegin(), partitionedBy.cend(), tableColumnNames.at(i)) != partitionedBy.cend()) { + ++numPartitionColumns; + columnHandles.push_back(std::make_shared( + tableColumnNames.at(i), + connector::hive::HiveColumnHandle::ColumnType::kPartitionKey, + tableColumnTypes.at(i), + tableColumnTypes.at(i))); + } else { + columnHandles.push_back(std::make_shared( + tableColumnNames.at(i), + connector::hive::HiveColumnHandle::ColumnType::kRegular, + tableColumnTypes.at(i), + tableColumnTypes.at(i))); + } + } + VELOX_CHECK_EQ(numPartitionColumns, partitionedBy.size()); + VELOX_CHECK_EQ(numBucketColumns, bucketedBy.size()); + VELOX_CHECK_EQ(numSortingColumns, sortedBy.size()); + return std::make_shared( + columnHandles, locationHandle, tableStorageFormat, bucketProperty, compressionKind); +} + +core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::WriteRel& writeRel) { + core::PlanNodePtr childNode; + if (writeRel.has_input()) { + childNode = toVeloxPlan(writeRel.input()); + } else { + VELOX_FAIL("Child Rel is expected in WriteRel."); + } + const auto& inputType = childNode->outputType(); + + std::vector tableColumnNames; + std::vector partitionedKey; + std::vector isPartitionColumns; + 
tableColumnNames.reserve(writeRel.table_schema().names_size()); + + if (writeRel.has_table_schema()) { + const auto& tableSchema = writeRel.table_schema(); + isPartitionColumns = SubstraitParser::parsePartitionColumns(tableSchema); + + for (const auto& name : tableSchema.names()) { + tableColumnNames.emplace_back(name); + } + + for (int i = 0; i < tableSchema.names_size(); i++) { + if (isPartitionColumns[i]) { + partitionedKey.emplace_back(tableColumnNames[i]); + } + } + } + + std::vector<std::string> writePath; + writePath.reserve(1); + for (const auto& name : writeRel.named_table().names()) { + writePath.emplace_back(name); + } + + // Spark's default compression codec is snappy. + common::CompressionKind compressionCodec = common::CompressionKind::CompressionKind_SNAPPY; + if (writeRel.named_table().has_advanced_extension()) { + std::cout << "the table has extension" << std::flush << std::endl; + if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isSnappy=")) { + compressionCodec = common::CompressionKind::CompressionKind_SNAPPY; + } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isGzip=")) { + compressionCodec = common::CompressionKind::CompressionKind_GZIP; + } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isLzo=")) { + compressionCodec = common::CompressionKind::CompressionKind_LZO; + } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isLz4=")) { + compressionCodec = common::CompressionKind::CompressionKind_LZ4; + } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isZstd=")) { + compressionCodec = common::CompressionKind::CompressionKind_ZSTD; + } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isNone=")) { + compressionCodec = common::CompressionKind::CompressionKind_NONE; + } else if (SubstraitParser::configSetInOptimization( + writeRel.named_table().advanced_extension(), "isUncompressed=")) { + compressionCodec = common::CompressionKind::CompressionKind_NONE; + } + } + + // Do not hard-code connector ID and allow for connectors other than Hive. + static const std::string kHiveConnectorId = "test-hive"; + + return std::make_shared<core::TableWriteNode>( + nextPlanNodeId(), + inputType, + tableColumnNames, + nullptr, /*aggregationNode*/ + std::make_shared<core::InsertTableHandle>( + kHiveConnectorId, + makeHiveInsertTableHandle( + tableColumnNames, /*inputType->names() column name is different*/ + inputType->children(), + partitionedKey, + nullptr /*bucketProperty*/, + makeLocationHandle(writePath[0]), + dwio::common::FileFormat::PARQUET, // Currently only the Parquet format is supported. + compressionCodec)), + (isPartitionColumns.size() > 0) ?
true : false, + exec::TableWriteTraits::outputType(nullptr), + connector::CommitStrategy::kNoCommit, + childNode); +} + core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::ExpandRel& expandRel) { core::PlanNodePtr childNode; if (expandRel.has_input()) { @@ -1042,6 +1187,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: return toVeloxPlan(rel.fetch()); } else if (rel.has_window()) { return toVeloxPlan(rel.window()); + } else if (rel.has_write()) { + return toVeloxPlan(rel.write()); } else { VELOX_NYI("Substrait conversion not supported for Rel."); } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index bf4f80f34d70..ffb973ee01e6 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -61,6 +61,9 @@ class SubstraitToVeloxPlanConverter { bool validationMode = false) : pool_(pool), confMap_(confMap), validationMode_(validationMode) {} + /// Used to convert Substrait WriteRel into Velox PlanNode. + core::PlanNodePtr toVeloxPlan(const ::substrait::WriteRel& writeRel); + /// Used to convert Substrait ExpandRel into Velox PlanNode. core::PlanNodePtr toVeloxPlan(const ::substrait::ExpandRel& expandRel); diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc index d863c0d87f67..17f4ef56859c 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc @@ -329,6 +329,47 @@ bool SubstraitToVeloxPlanValidator::validateExpression( } } +bool SubstraitToVeloxPlanValidator::validate(const ::substrait::WriteRel& writeRel) { + if (writeRel.has_input() && !validate(writeRel.input())) { + std::cout << "Validation failed for input type validation in WriteRel." << std::endl; + return false; + } + + // validate input datatype + std::vector types; + if (writeRel.has_named_table()) { + const auto& extension = writeRel.named_table().advanced_extension(); + if (!validateInputTypes(extension, types)) { + std::cout << "Validation failed for input types in WriteRel." << std::endl; + return false; + } + } + + // Validate partition key type. + if (writeRel.has_table_schema()) { + const auto& tableSchema = writeRel.table_schema(); + auto isPartitionColumns = SubstraitParser::parsePartitionColumns(tableSchema); + for (auto i = 0; i < types.size(); i++) { + if (isPartitionColumns[i]) { + switch (types[i]->kind()) { + case TypeKind::BOOLEAN: + case TypeKind::TINYINT: + case TypeKind::SMALLINT: + case TypeKind::INTEGER: + case TypeKind::BIGINT: + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: + break; + default: + return false; + } + } + } + } + + return true; +} + bool SubstraitToVeloxPlanValidator::validate(const ::substrait::FetchRel& fetchRel) { RowTypePtr rowType = nullptr; // Get and validate the input types from extension. 
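For readers tracing the compression handshake between the JVM and native sides: the "is<Codec>=" strings matched by configSetInOptimization above are produced on the Scala side by WriteFilesExecTransformer.genWriteParameters later in this patch, which capitalizes Spark's parquet codec name into a "WriteParameters:is<Codec>=1" string. A small hedged sketch of that naming convention (the helper below is illustrative, not part of the patch):

    // Sketch: how a Spark parquet codec name maps onto the flag prefix matched in toVeloxPlan(WriteRel).
    def codecFlag(sparkCodec: String): String = s"is${sparkCodec.toLowerCase.capitalize}="

    assert(codecFlag("zstd") == "isZstd=")
    assert(
      Seq("snappy", "gzip", "lzo", "lz4", "zstd", "none", "uncompressed").map(codecFlag) ==
        Seq("isSnappy=", "isGzip=", "isLzo=", "isLz4=", "isZstd=", "isNone=", "isUncompressed="))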
@@ -1185,6 +1226,8 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::Rel& rel) { return validate(rel.fetch()); } else if (rel.has_window()) { return validate(rel.window()); + } else if (rel.has_write()) { + return validate(rel.write()); } else { return false; } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h index d5d76a4dc1c1..ad237f7a701b 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h @@ -29,6 +29,9 @@ class SubstraitToVeloxPlanValidator { SubstraitToVeloxPlanValidator(memory::MemoryPool* pool, core::ExecCtx* execCtx) : pool_(pool), execCtx_(execCtx), planConverter_(pool_, confMap_, true) {} + /// Used to validate whether the computing of this Write is supported. + bool validate(const ::substrait::WriteRel& writeRel); + /// Used to validate whether the computing of this Limit is supported. bool validate(const ::substrait::FetchRel& fetchRel); diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java index 1bbd64adc20c..a3432f905852 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java @@ -179,6 +179,31 @@ public static RelNode makeExpandRel( return new ExpandRelNode(input, projections); } + public static RelNode makeWriteRel( + RelNode input, + List types, + List names, + List columnTypeNodes, + String writePath, + SubstraitContext context, + Long operatorId) { + context.registerRelToOperator(operatorId); + return new WriteRelNode(input, types, names, columnTypeNodes, writePath); + } + + public static RelNode makeWriteRel( + RelNode input, + List types, + List names, + List columnTypeNodes, + String writePath, + AdvancedExtensionNode extensionNode, + SubstraitContext context, + Long operatorId) { + context.registerRelToOperator(operatorId); + return new WriteRelNode(input, types, names, columnTypeNodes, writePath, extensionNode); + } + public static RelNode makeSortRel( RelNode input, List sorts, diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java new file mode 100644 index 000000000000..d50c283bd902 --- /dev/null +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.glutenproject.substrait.rel; + +import io.glutenproject.substrait.extensions.AdvancedExtensionNode; +import io.glutenproject.substrait.type.ColumnTypeNode; +import io.glutenproject.substrait.type.TypeNode; + +import io.substrait.proto.NamedObjectWrite; +import io.substrait.proto.NamedStruct; +import io.substrait.proto.Rel; +import io.substrait.proto.Type; +import io.substrait.proto.WriteRel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class WriteRelNode implements RelNode, Serializable { + private final RelNode input; + private final List<TypeNode> types = new ArrayList<>(); + private final List<String> names = new ArrayList<>(); + + private final String writePath; + private final List<ColumnTypeNode> columnTypeNodes = new ArrayList<>(); + + private final AdvancedExtensionNode extensionNode; + + WriteRelNode( + RelNode input, + List<TypeNode> types, + List<String> names, + List<ColumnTypeNode> partitionColumnTypeNodes, + String writePath, + AdvancedExtensionNode extensionNode) { + this.input = input; + this.types.addAll(types); + this.names.addAll(names); + this.columnTypeNodes.addAll(partitionColumnTypeNodes); + this.writePath = writePath; + this.extensionNode = extensionNode; + } + + WriteRelNode( + RelNode input, + List<TypeNode> types, + List<String> names, + List<ColumnTypeNode> partitionColumnTypeNodes, + String writePath) { + this.input = input; + this.types.addAll(types); + this.names.addAll(names); + this.columnTypeNodes.addAll(partitionColumnTypeNodes); + this.writePath = writePath; + this.extensionNode = null; + } + + @Override + public Rel toProtobuf() { + + WriteRel.Builder writeBuilder = WriteRel.newBuilder(); + + Type.Struct.Builder structBuilder = Type.Struct.newBuilder(); + for (TypeNode typeNode : types) { + structBuilder.addTypes(typeNode.toProtobuf()); + } + + NamedStruct.Builder nStructBuilder = NamedStruct.newBuilder(); + nStructBuilder.setStruct(structBuilder.build()); + for (String name : names) { + nStructBuilder.addNames(name); + } + if (!columnTypeNodes.isEmpty()) { + for (ColumnTypeNode columnTypeNode : columnTypeNodes) { + nStructBuilder.addColumnTypes(columnTypeNode.toProtobuf()); + } + } + + writeBuilder.setTableSchema(nStructBuilder); + + NamedObjectWrite.Builder nameObjectWriter = NamedObjectWrite.newBuilder(); + if (!writePath.isEmpty()) { + nameObjectWriter.addNames(writePath); + } + + if (extensionNode != null) { + nameObjectWriter.setAdvancedExtension(extensionNode.toProtobuf()); + } + + writeBuilder.setNamedTable(nameObjectWriter); + + if (input != null) { + writeBuilder.setInput(input.toProtobuf()); + } + + Rel.Builder builder = Rel.newBuilder(); + builder.setWrite(writeBuilder.build()); + return builder.build(); + } +} diff --git a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto index 6bebe6496497..ef1542c08c9b 100644 --- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto +++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto @@ -444,6 +444,7 @@ message Rel { ExpandRel expand = 15; WindowRel window = 16; GenerateRel generate = 17; + WriteRel write = 18; } } diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala index 10071c8599b2..1fb50fae75b1 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala @@ -26,7
+26,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand -import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand +import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand} import org.apache.spark.sql.types.StructField trait BackendSettingsApi { @@ -35,6 +35,8 @@ trait BackendSettingsApi { fields: Array[StructField], partTable: Boolean, paths: Seq[String]): ValidationResult = ValidationResult.ok + def supportFileFormatWrite(format: FileFormat, fields: Array[StructField]): Boolean = false + def supportWriteExec(): Boolean = false def supportExpandExec(): Boolean = false def supportSortExec(): Boolean = false def supportSortMergeJoinExec(): Boolean = true diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala index a5c4e820da2b..419851fdad96 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala @@ -27,6 +27,8 @@ import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriter import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.optimizer.BuildSide @@ -35,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec} import org.apache.spark.sql.execution.joins.BuildSideRelation import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.hive.HiveTableScanExecTransformer @@ -195,6 +198,15 @@ trait SparkPlanExecApi { numOutputRows: SQLMetric, dataSize: SQLMetric): BuildSideRelation + /** Create ColumnarWriteFilesExec */ + def createColumnarWriteFilesExec( + child: SparkPlan, + fileFormat: FileFormat, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String], + staticPartitions: TablePartitionSpec): WriteFilesExec + /** * Generate extended DataSourceV2 Strategies. Currently only for ClickHouse backend. * diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala new file mode 100644 index 000000000000..0ad8a5d1b339 --- /dev/null +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.execution + +import io.glutenproject.backendsapi.BackendsApiManager +import io.glutenproject.expression.ConverterUtils +import io.glutenproject.extension.ValidationResult +import io.glutenproject.metrics.MetricsUpdater +import io.glutenproject.substrait.`type`.{ColumnTypeNode, TypeBuilder} +import io.glutenproject.substrait.SubstraitContext +import io.glutenproject.substrait.extensions.ExtensionBuilder +import io.glutenproject.substrait.rel.{RelBuilder, RelNode} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.vectorized.ColumnarBatch + +import com.google.protobuf.{Any, StringValue} + +import scala.collection.JavaConverters._ +import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` + +case class WriteFilesExecTransformer( + child: SparkPlan, + fileFormat: FileFormat, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String], + staticPartitions: TablePartitionSpec) + extends UnaryExecNode + with UnaryTransformSupport { + override def metricsUpdater(): MetricsUpdater = null + + override def output: Seq[Attribute] = Seq.empty + + def genWriteParameters(): Any = { + val compressionCodec = if (options.get("parquet.compression").nonEmpty) { + options.get("parquet.compression").get.toLowerCase().capitalize + } else SQLConf.get.parquetCompressionCodec.toLowerCase().capitalize + + val writeParametersStr = new StringBuffer("WriteParameters:") + writeParametersStr.append("is").append(compressionCodec).append("=1").append("\n") + val message = StringValue + .newBuilder() + .setValue(writeParametersStr.toString) + .build() + BackendsApiManager.getTransformerApiInstance.getPackMessage(message) + } + + def createEnhancement(output: Seq[Attribute]): com.google.protobuf.Any = { + val inputTypeNodes = output.map { + attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable) + } + + BackendsApiManager.getTransformerApiInstance.getPackMessage( + TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf) + } + + def getRelNode( + context: SubstraitContext, + originalInputAttributes: Seq[Attribute], + writePath: String, + operatorId: Long, + input: RelNode, + validation: Boolean): RelNode = { + val typeNodes = ConverterUtils.collectAttributeTypeNodes(originalInputAttributes) + + val columnTypeNodes = new java.util.ArrayList[ColumnTypeNode]() + val inputAttributes = new java.util.ArrayList[Attribute]() + val childSize = this.child.output.size + val childOutput = this.child.output + for (i <- 0 until childSize) { + val partitionCol = 
partitionColumns.find(_.exprId == childOutput(i).exprId) + if (partitionCol.nonEmpty) { + columnTypeNodes.add(new ColumnTypeNode(1)) + // "aggregate with partition group by can be pushed down" + // test partitionKey("p") is different with + // data columns("P"). + inputAttributes.add(partitionCol.get) + } else { + columnTypeNodes.add(new ColumnTypeNode(0)) + inputAttributes.add(originalInputAttributes(i)) + } + } + + val nameList = + ConverterUtils.collectAttributeNames(inputAttributes.toSeq) + + if (!validation) { + val extensionNode = ExtensionBuilder.makeAdvancedExtension( + genWriteParameters(), + createEnhancement(originalInputAttributes)) + RelBuilder.makeWriteRel( + input, + typeNodes, + nameList, + columnTypeNodes, + writePath, + extensionNode, + context, + operatorId) + } else { + // Use a extension node to send the input types through Substrait plan for validation. + val extensionNode = + ExtensionBuilder.makeAdvancedExtension(createEnhancement(originalInputAttributes)) + RelBuilder.makeWriteRel( + input, + typeNodes, + nameList, + columnTypeNodes, + writePath, + extensionNode, + context, + operatorId) + } + } + + override protected def doValidateInternal(): ValidationResult = { + if ( + !BackendsApiManager.getSettings.supportWriteExec() || !BackendsApiManager.getSettings + .supportFileFormatWrite(fileFormat, child.output.toStructType.fields) || bucketSpec.nonEmpty + ) { + return ValidationResult.notOk("Current backend does not support Write") + } + + val substraitContext = new SubstraitContext + val operatorId = substraitContext.nextOperatorId(this.nodeName) + + val relNode = + getRelNode(substraitContext, child.output, "", operatorId, null, validation = true) + + doNativeValidation(substraitContext, relNode) + } + + override def doTransform(context: SubstraitContext): TransformContext = { +// val writePath = ColumnarWriteFilesExec.writePath.get() + val writePath = child.session.sparkContext.getLocalProperty("writePath") + val childCtx = child.asInstanceOf[TransformSupport].doTransform(context) + + val operatorId = context.nextOperatorId(this.nodeName) + + val currRel = + getRelNode(context, child.output, writePath, operatorId, childCtx.root, validation = false) + assert(currRel != null, "Write Rel should be valid") + TransformContext(childCtx.outputAttributes, output, currRel) + } + + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + throw new UnsupportedOperationException(s"This operator doesn't support doExecuteColumnar().") + } + + override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExecTransformer = + copy(child = newChild) +} diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/ConverterUtils.scala b/gluten-core/src/main/scala/io/glutenproject/expression/ConverterUtils.scala index 018484c7c1fa..c0eaf72d4ebe 100644 --- a/gluten-core/src/main/scala/io/glutenproject/expression/ConverterUtils.scala +++ b/gluten-core/src/main/scala/io/glutenproject/expression/ConverterUtils.scala @@ -126,6 +126,10 @@ object ConverterUtils extends Logging { collectAttributeNamesDFS(attributes)(attr => normalizeColName(attr.name)) } + def collectAttributeNames(attributes: Seq[Attribute]): JList[String] = { + collectAttributeNamesDFS(attributes)(_.name) + } + private def collectAttributeNamesDFS(attributes: Seq[Attribute])( f: Attribute => String): JList[String] = { val nameList = new JArrayList[String]() diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala 
b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala index c2d1053c7844..dcfb85176cf0 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec +import org.apache.spark.sql.execution.datasources.WriteFilesExec import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins._ @@ -370,6 +371,24 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean) val child = replaceWithTransformerPlan(plan.child) logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") ExpandExecTransformer(plan.projections, plan.output, child) + case plan: WriteFilesExec => + val child = replaceWithTransformerPlan(plan.child) + logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") + val writeTransformer = WriteFilesExecTransformer( + child, + plan.fileFormat, + plan.partitionColumns, + plan.bucketSpec, + plan.options, + plan.staticPartitions) + BackendsApiManager.getSparkPlanExecApiInstance.createColumnarWriteFilesExec( + writeTransformer, + plan.fileFormat, + plan.partitionColumns, + plan.bucketSpec, + plan.options, + plan.staticPartitions + ) case plan: SortExec => val child = replaceWithTransformerPlan(plan.child) logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala b/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala index d463c309a82c..9343e54739cb 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, QueryStageExec} import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} +import org.apache.spark.sql.execution.datasources.WriteFilesExec import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins._ @@ -267,7 +268,7 @@ case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { TransformHints.tagNotTransformable(p, "at least one of its children has empty output") p.children.foreach { child => - if (child.output.isEmpty) { + if (child.output.isEmpty && !child.isInstanceOf[WriteFilesExec]) { TransformHints.tagNotTransformable( child, "at least one of its children has empty output") @@ -361,6 +362,7 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { val enableTakeOrderedAndProject: Boolean = !scanOnly && columnarConf.enableTakeOrderedAndProject && enableColumnarSort && enableColumnarLimit && enableColumnarShuffle && enableColumnarProject + val enableColumnarWrite: Boolean = columnarConf.enableNativeWriter def apply(plan: SparkPlan): SparkPlan = { 
addTransformableTags(plan) @@ -513,6 +515,22 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { val transformer = ExpandExecTransformer(plan.projections, plan.output, plan.child) TransformHints.tag(plan, transformer.doValidate().toTransformHint) } + + case plan: WriteFilesExec => + if (!enableColumnarWrite) { + TransformHints.tagNotTransformable( + plan, + "columnar Write is not enabled in WriteFilesExec") + } else { + val transformer = WriteFilesExecTransformer( + plan.child, + plan.fileFormat, + plan.partitionColumns, + plan.bucketSpec, + plan.options, + plan.staticPartitions) + TransformHints.tag(plan, transformer.doValidate().toTransformHint) + } case plan: SortExec => if (!enableColumnarSort) { TransformHints.tagNotTransformable(plan, "columnar Sort is not enabled in SortExec") diff --git a/gluten-ut/common/pom.xml b/gluten-ut/common/pom.xml index a812408e833c..801b04c82119 100644 --- a/gluten-ut/common/pom.xml +++ b/gluten-ut/common/pom.xml @@ -14,6 +14,15 @@ jar Gluten Unit Test Common + + + io.glutenproject + spark-sql-columnar-shims-common + ${project.version} + provided + + + target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes diff --git a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala index 8cfeaf2b90c1..2a00b44a2e4d 100644 --- a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala +++ b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql import io.glutenproject.GlutenConfig +import io.glutenproject.sql.shims.SparkShimLoader import io.glutenproject.utils.{BackendTestUtils, SystemParameters} import org.apache.spark.SparkConf @@ -58,6 +59,14 @@ trait GlutenSQLTestsBaseTrait extends SharedSparkSession with GlutenTestsBaseTra // .set("spark.sql.optimizer.excludedRules", ConstantFolding.ruleName + "," + // NullPropagation.ruleName) + if ( + BackendTestUtils.isVeloxBackendLoaded() && + SparkShimLoader.getSparkVersion.startsWith("3.4") + ) { + // Enable velox native write in spark 3.4 + conf.set("spark.gluten.sql.native.writer.enabled", "true") + } + if (BackendTestUtils.isCHBackendLoaded()) { conf .set("spark.io.compression.codec", "LZ4") diff --git a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index 177eccc3f3f6..088244578aef 100644 --- a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -691,6 +691,10 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenParquetInteroperabilitySuite] .exclude("parquet timestamp conversion") enableSuite[GlutenParquetIOSuite] + // Velox doesn't write file metadata into parquet file. + .exclude("Write Spark version into Parquet metadata") + // Spark except exception but not occur in velox. + .exclude("SPARK-7837 Do not close output writer twice when commitTask() fails") // Disable Spark's vectorized reading tests. 
.exclude("Standard mode - fixed-length decimals") .exclude("Legacy mode - fixed-length decimals") @@ -732,6 +736,8 @@ class VeloxTestSettings extends BackendTestSettings { .exclude(("Various inferred partition value types")) enableSuite[GlutenParquetProtobufCompatibilitySuite] enableSuite[GlutenParquetV1QuerySuite] + // Velox convert the null as minimum value of int, which cause the partition dir is not align with spark. + .exclude("SPARK-11997 parquet with null partition values") // Only for testing a type mismatch issue caused by hive (before hive 2.2). // Only reproducible when spark.sql.parquet.enableVectorizedReader=true. .exclude("SPARK-16632: read Parquet int32 as ByteType and ShortType") @@ -751,6 +757,8 @@ class VeloxTestSettings extends BackendTestSettings { .exclude( "SPARK-26677: negated null-safe equality comparison should not filter matched row groups") enableSuite[GlutenParquetV2QuerySuite] + // Velox convert the null as minimum value of int, which cause the partition dir is not align with spark. + .exclude("SPARK-11997 parquet with null partition values") // Only for testing a type mismatch issue caused by hive (before hive 2.2). // Only reproducible when spark.sql.parquet.enableVectorizedReader=true. .exclude("SPARK-16632: read Parquet int32 as ByteType and ShortType") @@ -769,10 +777,14 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenParquetV1SchemaPruningSuite] enableSuite[GlutenParquetV2SchemaPruningSuite] enableSuite[GlutenParquetRebaseDatetimeV1Suite] + // Velox doesn't write file metadata into parquet file. + .excludeByPrefix("SPARK-33163, SPARK-37705: write the metadata keys") // jar path and ignore PARQUET_REBASE_MODE_IN_READ, rewrite some .excludeByPrefix("SPARK-31159") .excludeByPrefix("SPARK-35427") enableSuite[GlutenParquetRebaseDatetimeV2Suite] + // Velox doesn't write file metadata into parquet file. + .excludeByPrefix("SPARK-33163, SPARK-37705: write the metadata keys") // jar path and ignore PARQUET_REBASE_MODE_IN_READ .excludeByPrefix("SPARK-31159") .excludeByPrefix("SPARK-35427") @@ -795,6 +807,8 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenDataSourceStrategySuite] enableSuite[GlutenDataSourceSuite] enableSuite[GlutenFileFormatWriterSuite] + // Velox doesn't write file if the data is null. + .exclude("empty file should be skipped while write to file") enableSuite[GlutenFileIndexSuite] enableSuite[GlutenFileMetadataStructSuite] .exclude("SPARK-41896: Filter on row_index and a stored column at the same time") @@ -807,8 +821,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("nested column: Max(top level column) not push down") .exclude("nested column: Count(nested sub-field) not push down") enableSuite[GlutenParquetCodecSuite] - // Unsupported compression codec. - .exclude("write and read - file source parquet - codec: lz4") enableSuite[GlutenOrcCodecSuite] enableSuite[GlutenFileSourceStrategySuite] // Plan comparison. @@ -954,12 +966,17 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenFilteredScanSuite] enableSuite[GlutenFiltersSuite] enableSuite[GlutenInsertSuite] + // Spark except exception but not occur in velox. 
+ .exclude("SPARK-35106: Throw exception when rename custom partition paths returns false") + .exclude("Stop task set if FileAlreadyExistsException was thrown") .exclude("INSERT rows, ALTER TABLE ADD COLUMNS with DEFAULTs, then SELECT them") .exclude("SPARK-39557 INSERT INTO statements with tables with array defaults") .exclude("SPARK-39557 INSERT INTO statements with tables with struct defaults") .exclude("SPARK-39557 INSERT INTO statements with tables with map defaults") .exclude("SPARK-39844 Restrict adding DEFAULT columns for existing tables to certain sources") enableSuite[GlutenPartitionedWriteSuite] + // Velox doesn't support maxRecordsPerFile parameter. + .exclude("maxRecordsPerFile setting in non-partitioned write path") enableSuite[GlutenPathOptionSuite] enableSuite[GlutenPrunedScanSuite] enableSuite[GlutenResolvedDataSourceSuite] @@ -1094,6 +1111,9 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("Check schemas for expression examples") enableSuite[GlutenExtraStrategiesSuite] enableSuite[GlutenFileBasedDataSourceSuite] + // The following suites failed because velox will not write empty data frame. + .excludeByPrefix("SPARK-15474 Write and read back non-empty schema with empty dataframe ") + .excludeByPrefix("SPARK-23271 empty RDD when saved should write a metadata only file ") // test data path is jar path, rewrite .exclude("Option recursiveFileLookup: disable partition inferring") // gluten executor exception cannot get in driver, rewrite @@ -1139,12 +1159,16 @@ class VeloxTestSettings extends BackendTestSettings { // following UT is removed in spark3.3.1 // enableSuite[GlutenSimpleShowCreateTableSuite] enableSuite[GlutenFileSourceSQLInsertTestSuite] + // velox convert string null as -1583242847, which is not same with spark. + .exclude("SPARK-30844: static partition should also follow StoreAssignmentPolicy") .exclude( "SPARK-41982: treat the partition field as string literal when keepPartitionSpecAsStringLiteral is enabled") enableSuite[GlutenDSV2SQLInsertTestSuite] .exclude( "SPARK-41982: treat the partition field as string literal when keepPartitionSpecAsStringLiteral is enabled") enableSuite[GlutenSQLQuerySuite] + // Velox doesn't support spark.sql.optimizer.metadataOnly config. + .exclude("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") // Decimal precision exceeds. .exclude("should be able to resolve a persistent view") // Unstable. Needs to be fixed. @@ -1170,8 +1194,19 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("SPARK-38173: Quoted column cannot be recognized correctly when quotedRegexColumnNames is true") enableSuite[GlutenSQLQueryTestSuite] enableSuite[GlutenStatisticsCollectionSuite] + // The following five unit tests failed after enabling native table write, because the velox convert the null to Timestamp + // with minimum value of int, which will cause overflow when calling toMicro() method. 
+ .exclude("column stats collection for null columns") + .exclude("store and retrieve column stats in different time zones") + .exclude( + "SPARK-38140: describe column stats (min, max) for timestamp column: desc results should be consistent with the written value if writing and desc happen in the same time zone") + .exclude( + "SPARK-38140: describe column stats (min, max) for timestamp column: desc should show different results if writing in UTC and desc in other time zones") + .exclude("Gluten - store and retrieve column stats in different time zones") .exclude("SPARK-33687: analyze all tables in a specific database") enableSuite[GlutenSubquerySuite] + // Velox doesn't write file if the data is null. + .exclude("SPARK-42745: Improved AliasAwareOutputExpression works with DSv2") .excludeByPrefix( "SPARK-26893" // Rewrite this test because it checks Spark's physical operators. ) diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala index ebb44545ca14..f482c6c56f7e 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala @@ -54,7 +54,8 @@ class GlutenSQLQuerySuite extends SQLQuerySuite with GlutenSQLTestsTrait { } - test( + // Velox throw exception : An unsupported nested encoding was found. + ignore( GlutenTestConstants.GLUTEN_TEST + "SPARK-33338: GROUP BY using literal map should not fail") { withTable("t") { diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala index 2532436aa206..6f423124e622 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala @@ -266,7 +266,8 @@ abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ } } - test(GlutenTestConstants.GLUTEN_TEST + "Support Parquet column index") { + // Velox doesn't support ParquetOutputFormat.PAGE_SIZE and ParquetOutputFormat.BLOCK_SIZE. 
+ ignore(GlutenTestConstants.GLUTEN_TEST + "Support Parquet column index") { // block 1: // null count min max // page-0 0 0 99 From 54deba674a95a436fcf8cea686db10a8e680100a Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Thu, 21 Dec 2023 15:42:09 +0800 Subject: [PATCH 2/5] Resolve comments --- .../velox/SparkPlanExecApiImpl.scala | 3 +- .../backendsapi/velox/VeloxBackend.scala | 86 +++++++------ .../VeloxColumnarWriteFilesExec.scala | 116 +++++++++++------- cpp/velox/substrait/SubstraitToVeloxPlan.cc | 23 ++-- .../SubstraitToVeloxPlanValidator.cc | 4 +- .../substrait/rel/RelBuilder.java | 12 -- .../substrait/rel/WriteRelNode.java | 14 --- .../backendsapi/BackendSettingsApi.scala | 3 +- .../execution/WriteFilesExecTransformer.scala | 19 +-- 9 files changed, 143 insertions(+), 137 deletions(-) diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala index 7530af36d648..5655056335a4 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala @@ -497,7 +497,8 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi { * @return */ override def genExtendedColumnarPostRules(): List[SparkSession => Rule[SparkPlan]] = { - if (SparkShimLoader.getSparkShims.getShimDescriptor.toString.equals("3.4.1")) { + val Array(major, minor, _) = SparkShimLoader.getSparkShims.getShimDescriptor.toString.split('.') + if (major.toInt > 3 || (major.toInt == 3 && (minor.toInt >= 4))) { List() } else { List(spark => NativeWritePostRule(spark)) diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala index 534bff9391ad..a1396ff089df 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala @@ -115,58 +115,54 @@ object BackendSettings extends BackendSettingsApi { } } - override def supportFileFormatWrite(format: FileFormat, fields: Array[StructField]): Boolean = { - // Validate if all types are supported. - def validateTypes(fields: Array[StructField]): Boolean = { - - val unsupportedDataTypes = - fields.flatMap { - field => - field.dataType match { - case _: TimestampType => Some("TimestampType") - case struct: StructType if validateTypes(struct.fields) => - Some("StructType(TimestampType)") - case array: ArrayType if array.elementType.isInstanceOf[TimestampType] => - Some("MapType(TimestampType)") - case map: MapType - if (map.keyType.isInstanceOf[TimestampType] || - map.valueType.isInstanceOf[TimestampType]) => - Some("MapType(TimestampType)") - case _ => None - } - } - - for (unsupportedDataType <- unsupportedDataTypes) { - // scalastyle:off println - println( - s"Validation failed for ${this.getClass.toString} due to:" + - s" data type $unsupportedDataType in file schema.") - // scalastyle:on println + override def supportWriteExec(format: FileFormat, fields: Array[StructField]): Option[String] = { + def validateCompressionCodec(): Option[String] = { + // Velox doesn't support brotli and lzo. 
+ val unSupportedCompressions = Set("brotli, lzo") + if (unSupportedCompressions.contains(SQLConf.get.parquetCompressionCodec.toLowerCase())) { + Some("brotli or lzo compression codec is not support in velox backend.") + } else { + None } - - unsupportedDataTypes.isEmpty } - def containFieldMetadata(fields: Array[StructField]): Boolean = { - if (fields.exists(!_.metadata.equals(Metadata.empty))) false else true + // Validate if all types are supported. + def validateDateTypes(fields: Array[StructField]): Option[String] = { + fields.flatMap { + field => + field.dataType match { + case _: TimestampType => Some("TimestampType") + case struct: StructType if validateDateTypes(struct.fields).nonEmpty => + Some("StructType(TimestampType)") + case array: ArrayType if array.elementType.isInstanceOf[TimestampType] => + Some("MapType(TimestampType)") + case map: MapType + if (map.keyType.isInstanceOf[TimestampType] || + map.valueType.isInstanceOf[TimestampType]) => + Some("MapType(TimestampType)") + case _ => None + } + }.headOption } - format match { - case _: ParquetFileFormat => - validateTypes(fields) && - containFieldMetadata(fields) - case _ => false + def validateFieldMetadata(fields: Array[StructField]): Option[String] = { + if (fields.exists(!_.metadata.equals(Metadata.empty))) { + Some("StructField contain the metadata information.") + } else None } - } - override def supportWriteExec(): Boolean = { - // Velox doesn't support brotli and lzo. - if ( - SQLConf.get.parquetCompressionCodec.toLowerCase().equals("brotli") || - SQLConf.get.parquetCompressionCodec.toLowerCase().equals("lzo") - ) { - return false + + def validateFileFormat(format: FileFormat): Option[String] = { + format match { + case _: ParquetFileFormat => None + case _: FileFormat => Some("Only parquet fileformat is supported in native write.") + } } - true + + val fileFormat = validateFileFormat(format) + val metadata = validateFieldMetadata(fields) + val dataTypes = validateDateTypes(fields) + val compression = validateCompressionCodec() + compression.orElse(dataTypes).orElse(metadata).orElse(fileFormat) } override def supportExpandExec(): Boolean = true diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala index f11d0abd071e..0f9dda5e44d0 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala @@ -51,56 +51,88 @@ class VeloxColumnarWriteFilesExec( override def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = { assert(child.supportsColumnar) + + // We need to pass the WritePath to the Velox TableWriter in the doTransform + // method of the WriteTransformer. However, the WritePath is not accessible + // during the planning phase in the WriteTransformer, and can only be obtained + // during the actual execution, specifically in the doExecuteWrite method of + // ColumnarWriteFilesExec, where it is available within the WriteFilesSpec. + // Therefore, we use this hack method to pass the writePath. child.session.sparkContext.setLocalProperty("writePath", writeFilesSpec.description.path) - child.executeColumnar().map { - cb => + child.executeColumnar().mapPartitionsInternal { + iter => // Currently, the cb contains three columns: row, fragments, and context. 
- // The first row in the row column contains the number of written numRows. + // The first row in the row column contains the number of written numRows. // The fragments column contains detailed information about the file writes. - // The detailed fragement is https://github.com/facebookincubator/velox/blob/ - // 6b17ea5100a2713a6ee0252a37ce47cb17f46929/velox/connectors/hive/HiveDataSink.cpp#L508. - val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb) - - val numRows = loadedCb.column(0).getLong(0) - - var updatedPartitions = Set.empty[String] - val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() - var numBytes = 0L - for (i <- 0 until loadedCb.numRows() - 1) { - val fragments = loadedCb.column(1).getUTF8String(i + 1) - val objectMapper = new ObjectMapper() - val jsonObject = objectMapper.readTree(fragments.toString) - - val fileWriteInfos = jsonObject.get("fileWriteInfos").elements() - if (jsonObject.get("fileWriteInfos").elements().hasNext) { - val writeInfo = fileWriteInfos.next(); - numBytes += writeInfo.get("fileSize").size() - // Get partition informations. - if (jsonObject.get("name").textValue().nonEmpty) { - val targetFileName = writeInfo.get("targetFileName").textValue() - val partitionDir = jsonObject.get("name").textValue() - updatedPartitions += partitionDir - val tmpOutputPath = - writeFilesSpec.description.path + "/" + partitionDir + "/" + targetFileName - val absOutputPathObject = - writeFilesSpec.description.customPartitionLocations.get( - PartitioningUtils.parsePathFragment(partitionDir)) - if (absOutputPathObject.nonEmpty) { - val absOutputPath = absOutputPathObject.get + "/" + targetFileName - addedAbsPathFiles(tmpOutputPath) = absOutputPath + // The json can be as following: + // { + // "inMemoryDataSizeInBytes":0, + // "containsNumberedFileNames":true, + // "onDiskDataSizeInBytes":307, + // "fileWriteInfos":[ + // { + // "fileSize":307, + // "writeFileName": + // "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet", + // "targetFileName": + // "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet" + // } + // ], + // "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1", + // "rowCount":1, + // "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1", + // "updateMode":"NEW", + // "name":"part1=1/part2=1" + // } + if (iter.hasNext) { + val cb = iter.next() + val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb) + + val numRows = loadedCb.column(0).getLong(0) + + var updatedPartitions = Set.empty[String] + val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() + var numBytes = 0L + for (i <- 0 until loadedCb.numRows() - 1) { + val fragments = loadedCb.column(1).getUTF8String(i + 1) + val objectMapper = new ObjectMapper() + val jsonObject = objectMapper.readTree(fragments.toString) + + val fileWriteInfos = jsonObject.get("fileWriteInfos").elements() + if (jsonObject.get("fileWriteInfos").elements().hasNext) { + val writeInfo = fileWriteInfos.next(); + numBytes += writeInfo.get("fileSize").size() + // Get partition information. 
+ if (jsonObject.get("name").textValue().nonEmpty) { + val targetFileName = writeInfo.get("targetFileName").textValue() + val partitionDir = jsonObject.get("name").textValue() + updatedPartitions += partitionDir + val tmpOutputPath = + writeFilesSpec.description.path + "/" + partitionDir + "/" + targetFileName + val absOutputPathObject = + writeFilesSpec.description.customPartitionLocations.get( + PartitioningUtils.parsePathFragment(partitionDir)) + if (absOutputPathObject.nonEmpty) { + val absOutputPath = absOutputPathObject.get + "/" + targetFileName + addedAbsPathFiles(tmpOutputPath) = absOutputPath + } } } } - } - // TODO: need to get the partition Internal row? - val stats = BasicWriteTaskStats(Seq.empty, (numRows - 1).toInt, numBytes, numRows) - val summary = - ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) + // TODO: need to get the partition Internal row? + val stats = BasicWriteTaskStats(Seq.empty, (numRows - 1).toInt, numBytes, numRows) + val summary = + ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) + + val result = WriteTaskResult( + new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), + summary) + Iterator.single(result) + } else { + Iterator.empty + } - WriteTaskResult( - new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), - summary) } } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 441f06c15299..2372112b4830 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -466,6 +466,7 @@ std::shared_ptr makeHiveInsertTableHandl const dwio::common::FileFormat tableStorageFormat = dwio::common::FileFormat::PARQUET, const std::optional compressionKind = {}) { std::vector> columnHandles; + columnHandles.reserve(tableColumnNames.size()); std::vector bucketedBy; std::vector bucketedTypes; std::vector> sortedBy; @@ -524,23 +525,23 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: std::vector isPartitionColumns; tableColumnNames.reserve(writeRel.table_schema().names_size()); - if (writeRel.has_table_schema()) { - const auto& tableSchema = writeRel.table_schema(); - isPartitionColumns = SubstraitParser::parsePartitionColumns(tableSchema); + VELOX_CHECK(writeRel.has_table_schema(), "WriteRel should have the table schema to store the column information"); + const auto& tableSchema = writeRel.table_schema(); + isPartitionColumns = SubstraitParser::parsePartitionColumns(tableSchema); - for (const auto& name : tableSchema.names()) { - tableColumnNames.emplace_back(name); - } + for (const auto& name : tableSchema.names()) { + tableColumnNames.emplace_back(name); + } - for (int i = 0; i < tableSchema.names_size(); i++) { - if (isPartitionColumns[i]) { - partitionedKey.emplace_back(tableColumnNames[i]); - } + for (int i = 0; i < tableSchema.names_size(); i++) { + if (isPartitionColumns[i]) { + partitionedKey.emplace_back(tableColumnNames[i]); } } std::vector writePath; writePath.reserve(1); + VELOX_CHECK(writeRel.named_table().names().size() == 1) for (const auto& name : writeRel.named_table().names()) { writePath.emplace_back(name); } @@ -585,7 +586,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: makeLocationHandle(writePath[0]), dwio::common::FileFormat::PARQUET, // Currently only support parquet format. compressionCodec)), - (isPartitionColumns.size() > 0) ? true : false, + (partitionedKey.size() > 0) ? 
true : false, exec::TableWriteTraits::outputType(nullptr), connector::CommitStrategy::kNoCommit, childNode); diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc index 17f4ef56859c..d5451d8d74db 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc @@ -331,7 +331,7 @@ bool SubstraitToVeloxPlanValidator::validateExpression( bool SubstraitToVeloxPlanValidator::validate(const ::substrait::WriteRel& writeRel) { if (writeRel.has_input() && !validate(writeRel.input())) { - std::cout << "Validation failed for input type validation in WriteRel." << std::endl; + logValidateMsg("Validation failed for input type validation in WriteRel."); return false; } @@ -340,7 +340,7 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::WriteRel& writeR if (writeRel.has_named_table()) { const auto& extension = writeRel.named_table().advanced_extension(); if (!validateInputTypes(extension, types)) { - std::cout << "Validation failed for input types in WriteRel." << std::endl; + logValidateMsg("Validation failed for input type validation in WriteRel."); return false; } } diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java index a3432f905852..1c590cd785e9 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java @@ -179,18 +179,6 @@ public static RelNode makeExpandRel( return new ExpandRelNode(input, projections); } - public static RelNode makeWriteRel( - RelNode input, - List types, - List names, - List columnTypeNodes, - String writePath, - SubstraitContext context, - Long operatorId) { - context.registerRelToOperator(operatorId); - return new WriteRelNode(input, types, names, columnTypeNodes, writePath); - } - public static RelNode makeWriteRel( RelNode input, List types, diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java index d50c283bd902..fae8deeb5d24 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java @@ -55,20 +55,6 @@ public class WriteRelNode implements RelNode, Serializable { this.extensionNode = extensionNode; } - WriteRelNode( - RelNode input, - List types, - List names, - List partitionColumnTypeNodes, - String writePath) { - this.input = input; - this.types.addAll(types); - this.names.addAll(names); - this.columnTypeNodes.addAll(partitionColumnTypeNodes); - this.writePath = writePath; - this.extensionNode = null; - } - @Override public Rel toProtobuf() { diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala index 1fb50fae75b1..b86f7b31de11 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala @@ -35,8 +35,7 @@ trait BackendSettingsApi { fields: Array[StructField], partTable: Boolean, paths: Seq[String]): ValidationResult = ValidationResult.ok - def supportFileFormatWrite(format: FileFormat, fields: Array[StructField]): Boolean = false - def supportWriteExec(): Boolean 
= false + def supportWriteExec(format: FileFormat, fields: Array[StructField]): Option[String] def supportExpandExec(): Boolean = false def supportSortExec(): Boolean = false def supportSortMergeJoinExec(): Boolean = true diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala index 0ad8a5d1b339..67c8fdd4115b 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala @@ -19,7 +19,7 @@ package io.glutenproject.execution import io.glutenproject.backendsapi.BackendsApiManager import io.glutenproject.expression.ConverterUtils import io.glutenproject.extension.ValidationResult -import io.glutenproject.metrics.MetricsUpdater +import io.glutenproject.metrics.{MetricsUpdater, NoopMetricsUpdater} import io.glutenproject.substrait.`type`.{ColumnTypeNode, TypeBuilder} import io.glutenproject.substrait.SubstraitContext import io.glutenproject.substrait.extensions.ExtensionBuilder @@ -48,7 +48,7 @@ case class WriteFilesExecTransformer( staticPartitions: TablePartitionSpec) extends UnaryExecNode with UnaryTransformSupport { - override def metricsUpdater(): MetricsUpdater = null + override def metricsUpdater(): MetricsUpdater = NoopMetricsUpdater override def output: Seq[Attribute] = Seq.empty @@ -135,11 +135,14 @@ case class WriteFilesExecTransformer( } override protected def doValidateInternal(): ValidationResult = { - if ( - !BackendsApiManager.getSettings.supportWriteExec() || !BackendsApiManager.getSettings - .supportFileFormatWrite(fileFormat, child.output.toStructType.fields) || bucketSpec.nonEmpty - ) { - return ValidationResult.notOk("Current backend does not support Write") + val supportedWrite = + BackendsApiManager.getSettings.supportWriteExec(fileFormat, child.output.toStructType.fields) + if (supportedWrite.nonEmpty) { + return ValidationResult.notOk("Unsupported native write: " + supportedWrite.get) + } + + if (bucketSpec.nonEmpty) { + return ValidationResult.notOk("Unsupported native write: bucket write is not supported.") } val substraitContext = new SubstraitContext @@ -152,8 +155,8 @@ case class WriteFilesExecTransformer( } override def doTransform(context: SubstraitContext): TransformContext = { -// val writePath = ColumnarWriteFilesExec.writePath.get() val writePath = child.session.sparkContext.getLocalProperty("writePath") + assert(writePath.size > 0) val childCtx = child.asInstanceOf[TransformSupport].doTransform(context) val operatorId = context.nextOperatorId(this.nodeName) From 9f72037b19f465e2cf66a9279ec4fbd57b7e374d Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Tue, 26 Dec 2023 20:13:53 +0800 Subject: [PATCH 3/5] Resolve comments --- .../backendsapi/clickhouse/CHBackend.scala | 5 ++ .../backendsapi/velox/VeloxBackend.scala | 10 ++- .../VeloxColumnarWriteFilesExec.scala | 84 +++++++++---------- cpp/velox/substrait/SubstraitToVeloxPlan.cc | 21 +++-- .../SubstraitToVeloxPlanValidator.cc | 2 +- docs/developers/SubstraitModifications.md | 1 + .../backendsapi/BackendSettingsApi.scala | 2 +- .../execution/WriteFilesExecTransformer.scala | 13 +-- 8 files changed, 71 insertions(+), 67 deletions(-) diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala index 5c0cf21f43c2..cfc4d5c91e1a 100644 
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.aggregate.HashAggregateExec +import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} @@ -269,4 +270,8 @@ object CHBackendSettings extends BackendSettingsApi with Logging { SparkEnv.get.conf .getLong(GLUTEN_MAX_SHUFFLE_READ_BYTES, GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT) } + + override def supportWriteFilesExec( + format: FileFormat, + fields: Array[StructField]): Option[String] = None } diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala index a1396ff089df..3fa5f92aed61 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala @@ -115,7 +115,9 @@ object BackendSettings extends BackendSettingsApi { } } - override def supportWriteExec(format: FileFormat, fields: Array[StructField]): Option[String] = { + override def supportWriteFilesExec( + format: FileFormat, + fields: Array[StructField]): Option[String] = { def validateCompressionCodec(): Option[String] = { // Velox doesn't support brotli and lzo. val unSupportedCompressions = Set("brotli, lzo") @@ -135,10 +137,10 @@ object BackendSettings extends BackendSettingsApi { case struct: StructType if validateDateTypes(struct.fields).nonEmpty => Some("StructType(TimestampType)") case array: ArrayType if array.elementType.isInstanceOf[TimestampType] => - Some("MapType(TimestampType)") + Some("ArrayType(TimestampType)") case map: MapType - if (map.keyType.isInstanceOf[TimestampType] || - map.valueType.isInstanceOf[TimestampType]) => + if map.keyType.isInstanceOf[TimestampType] || + map.valueType.isInstanceOf[TimestampType] => Some("MapType(TimestampType)") case _ => None } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala index 0f9dda5e44d0..b516d547170d 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala @@ -84,55 +84,51 @@ class VeloxColumnarWriteFilesExec( // "updateMode":"NEW", // "name":"part1=1/part2=1" // } - if (iter.hasNext) { - val cb = iter.next() - val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb) - - val numRows = loadedCb.column(0).getLong(0) - - var updatedPartitions = Set.empty[String] - val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() - var numBytes = 0L - for (i <- 0 until loadedCb.numRows() - 1) { - val fragments = loadedCb.column(1).getUTF8String(i + 1) - val objectMapper = new ObjectMapper() - val jsonObject = objectMapper.readTree(fragments.toString) - - val fileWriteInfos = jsonObject.get("fileWriteInfos").elements() 
- if (jsonObject.get("fileWriteInfos").elements().hasNext) { - val writeInfo = fileWriteInfos.next(); - numBytes += writeInfo.get("fileSize").size() - // Get partition information. - if (jsonObject.get("name").textValue().nonEmpty) { - val targetFileName = writeInfo.get("targetFileName").textValue() - val partitionDir = jsonObject.get("name").textValue() - updatedPartitions += partitionDir - val tmpOutputPath = - writeFilesSpec.description.path + "/" + partitionDir + "/" + targetFileName - val absOutputPathObject = - writeFilesSpec.description.customPartitionLocations.get( - PartitioningUtils.parsePathFragment(partitionDir)) - if (absOutputPathObject.nonEmpty) { - val absOutputPath = absOutputPathObject.get + "/" + targetFileName - addedAbsPathFiles(tmpOutputPath) = absOutputPath - } + assert(iter.hasNext) + val cb = iter.next() + val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb) + + val numRows = loadedCb.column(0).getLong(0) + + var updatedPartitions = Set.empty[String] + val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() + var numBytes = 0L + for (i <- 0 until loadedCb.numRows() - 1) { + val fragments = loadedCb.column(1).getUTF8String(i + 1) + val objectMapper = new ObjectMapper() + val jsonObject = objectMapper.readTree(fragments.toString) + + val fileWriteInfos = jsonObject.get("fileWriteInfos").elements() + if (jsonObject.get("fileWriteInfos").elements().hasNext) { + val writeInfo = fileWriteInfos.next(); + numBytes += writeInfo.get("fileSize").longValue() + // Get partition information. + if (jsonObject.get("name").textValue().nonEmpty) { + val targetFileName = writeInfo.get("targetFileName").textValue() + val partitionDir = jsonObject.get("name").textValue() + updatedPartitions += partitionDir + val tmpOutputPath = + writeFilesSpec.description.path + "/" + partitionDir + "/" + targetFileName + val absOutputPathObject = + writeFilesSpec.description.customPartitionLocations.get( + PartitioningUtils.parsePathFragment(partitionDir)) + if (absOutputPathObject.nonEmpty) { + val absOutputPath = absOutputPathObject.get + "/" + targetFileName + addedAbsPathFiles(tmpOutputPath) = absOutputPath } } } - - // TODO: need to get the partition Internal row? - val stats = BasicWriteTaskStats(Seq.empty, (numRows - 1).toInt, numBytes, numRows) - val summary = - ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) - - val result = WriteTaskResult( - new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), - summary) - Iterator.single(result) - } else { - Iterator.empty } + // TODO: need to get the partition Internal row? 
+ val stats = BasicWriteTaskStats(Seq.empty, (numRows - 1).toInt, numBytes, numRows) + val summary = + ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) + + val result = WriteTaskResult( + new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), + summary) + Iterator.single(result) } } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 2372112b4830..55d5c6ba9eb0 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -24,7 +24,6 @@ #include "utils/ConfigExtractor.h" -#include #include "config/GlutenConfig.h" namespace gluten { @@ -450,9 +449,10 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: } std::shared_ptr makeLocationHandle( - std::string targetDirectory, - std::optional writeDirectory = std::nullopt, - connector::hive::LocationHandle::TableType tableType = connector::hive::LocationHandle::TableType::kExisting) { + const std::string& targetDirectory, + const std::optional& writeDirectory = std::nullopt, + const connector::hive::LocationHandle::TableType& tableType = + connector::hive::LocationHandle::TableType::kExisting) { return std::make_shared( targetDirectory, writeDirectory.value_or(targetDirectory), tableType); } @@ -461,10 +461,10 @@ std::shared_ptr makeHiveInsertTableHandl const std::vector& tableColumnNames, const std::vector& tableColumnTypes, const std::vector& partitionedBy, - std::shared_ptr bucketProperty, - std::shared_ptr locationHandle, - const dwio::common::FileFormat tableStorageFormat = dwio::common::FileFormat::PARQUET, - const std::optional compressionKind = {}) { + const std::shared_ptr& bucketProperty, + const std::shared_ptr& locationHandle, + const dwio::common::FileFormat& tableStorageFormat = dwio::common::FileFormat::PARQUET, + const std::optional& compressionKind = {}) { std::vector> columnHandles; columnHandles.reserve(tableColumnNames.size()); std::vector bucketedBy; @@ -491,13 +491,13 @@ std::shared_ptr makeHiveInsertTableHandl } if (std::find(partitionedBy.cbegin(), partitionedBy.cend(), tableColumnNames.at(i)) != partitionedBy.cend()) { ++numPartitionColumns; - columnHandles.push_back(std::make_shared( + columnHandles.emplace_back(std::make_shared( tableColumnNames.at(i), connector::hive::HiveColumnHandle::ColumnType::kPartitionKey, tableColumnTypes.at(i), tableColumnTypes.at(i))); } else { - columnHandles.push_back(std::make_shared( + columnHandles.emplace_back(std::make_shared( tableColumnNames.at(i), connector::hive::HiveColumnHandle::ColumnType::kRegular, tableColumnTypes.at(i), @@ -549,7 +549,6 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: // spark default compression code is snappy. 
common::CompressionKind compressionCodec = common::CompressionKind::CompressionKind_SNAPPY; if (writeRel.named_table().has_advanced_extension()) { - std::cout << "the table has extension" << std::flush << std::endl; if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isSnappy=")) { compressionCodec = common::CompressionKind::CompressionKind_SNAPPY; } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isGzip=")) { diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc index d5451d8d74db..5bc424c0d840 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc @@ -335,7 +335,7 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::WriteRel& writeR return false; } - // validate input datatype + // Validate input data type. std::vector types; if (writeRel.has_named_table()) { const auto& extension = writeRel.named_table().advanced_extension(); diff --git a/docs/developers/SubstraitModifications.md b/docs/developers/SubstraitModifications.md index 15f52a4f6a0f..1d97d58c7b63 100644 --- a/docs/developers/SubstraitModifications.md +++ b/docs/developers/SubstraitModifications.md @@ -25,6 +25,7 @@ changed `Unbounded` in `WindowFunction` into `Unbounded_Preceding` and `Unbounde * Added `ExpandRel`([#1361](https://github.com/oap-project/gluten/pull/1361)). * Added `GenerateRel`([#574](https://github.com/oap-project/gluten/pull/574)). * Added `PartitionColumn` in `LocalFiles`([#2405](https://github.com/oap-project/gluten/pull/2405)). +* Added `WriteRel` ([#3690](https://github.com/oap-project/gluten/pull/3690)). ## Modifications to type.proto diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala index b86f7b31de11..983755b747a8 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala @@ -35,7 +35,7 @@ trait BackendSettingsApi { fields: Array[StructField], partTable: Boolean, paths: Seq[String]): ValidationResult = ValidationResult.ok - def supportWriteExec(format: FileFormat, fields: Array[StructField]): Option[String] + def supportWriteFilesExec(format: FileFormat, fields: Array[StructField]): Option[String] def supportExpandExec(): Boolean = false def supportSortExec(): Boolean = false def supportSortMergeJoinExec(): Boolean = true diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala index 67c8fdd4115b..042abe30b564 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala @@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.vectorized.ColumnarBatch @@ -46,8 +46,7 @@ case class WriteFilesExecTransformer( bucketSpec: Option[BucketSpec], options: Map[String, String], staticPartitions: TablePartitionSpec) - extends UnaryExecNode - with UnaryTransformSupport { + extends UnaryTransformSupport { override def metricsUpdater(): MetricsUpdater = NoopMetricsUpdater override def output: Seq[Attribute] = Seq.empty @@ -63,7 +62,7 @@ case class WriteFilesExecTransformer( .newBuilder() .setValue(writeParametersStr.toString) .build() - BackendsApiManager.getTransformerApiInstance.getPackMessage(message) + BackendsApiManager.getTransformerApiInstance.packPBMessage(message) } def createEnhancement(output: Seq[Attribute]): com.google.protobuf.Any = { @@ -71,7 +70,7 @@ case class WriteFilesExecTransformer( attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable) } - BackendsApiManager.getTransformerApiInstance.getPackMessage( + BackendsApiManager.getTransformerApiInstance.packPBMessage( TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf) } @@ -136,7 +135,9 @@ case class WriteFilesExecTransformer( override protected def doValidateInternal(): ValidationResult = { val supportedWrite = - BackendsApiManager.getSettings.supportWriteExec(fileFormat, child.output.toStructType.fields) + BackendsApiManager.getSettings.supportWriteFilesExec( + fileFormat, + child.output.toStructType.fields) if (supportedWrite.nonEmpty) { return ValidationResult.notOk("Unsupported native write: " + supportedWrite.get) } From f19cf3adfcef4420ea504422da08d957b041d951 Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Tue, 26 Dec 2023 22:46:44 +0800 Subject: [PATCH 4/5] code format --- .../io/glutenproject/backendsapi/clickhouse/CHBackend.scala | 2 +- cpp/velox/compute/WholeStageResultIterator.cc | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala index cfc4d5c91e1a..712ef6af7b0c 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala @@ -270,7 +270,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { SparkEnv.get.conf .getLong(GLUTEN_MAX_SHUFFLE_READ_BYTES, GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT) } - + override def supportWriteFilesExec( format: FileFormat, fields: Array[StructField]): Option[String] = None diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 2c40897a50d4..7cbf383e4d4c 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -430,6 +430,7 @@ std::shared_ptr WholeStageResultIterator::createConnectorConfig() veloxCfg_->get(kCaseSensitive, false) == false ? 
"true" : "false"; configs[velox::connector::hive::HiveConfig::kArrowBridgeTimestampUnit] = "6"; configs[velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession] = "400"; + configs[velox::connector::hive::HiveConfig::kPartitionPathAsLowerCaseSession] = "false"; return std::make_shared(configs); } From 2c1086392a2b6215eff4fd5c921f321d3489373d Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Wed, 27 Dec 2023 18:06:24 +0800 Subject: [PATCH 5/5] Resolve comments --- .../execution/VeloxColumnarWriteFilesExec.scala | 5 ++--- .../VeloxParquetDataTypeValidationSuite.scala | 2 +- .../VeloxParquetWriteForHiveSuite.scala | 16 ++++------------ 3 files changed, 7 insertions(+), 16 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala index b516d547170d..50111ccb4ec5 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala @@ -87,8 +87,7 @@ class VeloxColumnarWriteFilesExec( assert(iter.hasNext) val cb = iter.next() val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb) - - val numRows = loadedCb.column(0).getLong(0) + val numWrittenRows = loadedCb.column(0).getLong(0) var updatedPartitions = Set.empty[String] val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() @@ -121,7 +120,7 @@ class VeloxColumnarWriteFilesExec( } // TODO: need to get the partition Internal row? - val stats = BasicWriteTaskStats(Seq.empty, (numRows - 1).toInt, numBytes, numRows) + val stats = BasicWriteTaskStats(Seq.empty, loadedCb.numRows() - 1, numBytes, numWrittenRows) val summary = ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxParquetDataTypeValidationSuite.scala b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxParquetDataTypeValidationSuite.scala index ee0fd8a647e1..e30fd6b7b643 100644 --- a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxParquetDataTypeValidationSuite.scala +++ b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxParquetDataTypeValidationSuite.scala @@ -451,7 +451,7 @@ class VeloxParquetDataTypeValidationSuite extends VeloxWholeStageTransformerSuit dir => val write_path = dir.toURI.getPath val data_path = getClass.getResource("/").getPath + "/data-type-validation-data/type1" - // Spark 3.4 native write doesn't support Timestamp type. + // Velox native write doesn't support Timestamp type. 
val df = spark.read.format("parquet").load(data_path).drop("timestamp") df.write.mode("append").format("parquet").save(write_path) val parquetDf = spark.read diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala index 20c20d8d8161..c76818a1657c 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala @@ -128,18 +128,10 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils { "CREATE TABLE t (c int, d long, e long)" + " STORED AS PARQUET partitioned by (c, d)") withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") { - if (SparkShimLoader.getSparkVersion.startsWith("3.4")) { - checkNativeStaticPartitionWrite( - "INSERT OVERWRITE TABLE t partition(c=1, d)" + - " SELECT 3 as e, 2 as e", - native = false) - } else { - checkNativeStaticPartitionWrite( - "INSERT OVERWRITE TABLE t partition(c=1, d)" + - " SELECT 3 as e, 2 as e", - native = false) - } - + checkNativeStaticPartitionWrite( + "INSERT OVERWRITE TABLE t partition(c=1, d)" + + " SELECT 3 as e, 2 as e", + native = false) } checkAnswer(spark.table("t"), Row(3, 1, 2)) }
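For reference, a minimal end-to-end sketch (not part of the patch series) of how the native write path added above is exercised from user code. It assumes a Gluten + Velox deployment on Spark 3.4; the plugin class, off-heap memory settings, output path and sample data are illustrative assumptions, while spark.gluten.sql.native.writer.enabled is the flag used throughout this series.

import org.apache.spark.sql.SparkSession

object NativeWriteSketch {
  def main(args: Array[String]): Unit = {
    // Assumed Gluten setup: the plugin class and off-heap memory settings are the
    // usual Gluten prerequisites, not something introduced by this patch.
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("gluten-native-write-sketch")
      .config("spark.plugins", "io.glutenproject.GlutenPlugin")
      .config("spark.memory.offHeap.enabled", "true")
      .config("spark.memory.offHeap.size", "1g")
      // Flag enabled by this patch series for Spark 3.4 with the Velox backend.
      .config("spark.gluten.sql.native.writer.enabled", "true")
      .getOrCreate()

    import spark.implicits._

    // With the flag on and validation passing, planning replaces WriteFilesExec
    // with VeloxColumnarWriteFilesExec, so the parquet files below are written
    // by the Velox table writer.
    val df = Seq((3, 1L, 2L), (4, 1L, 5L)).toDF("e", "c", "d")
    df.write.mode("overwrite").partitionBy("c", "d").parquet("/tmp/gluten_native_write_sketch")

    // Read back to check the partition layout produced by the native writer.
    spark.read.parquet("/tmp/gluten_native_write_sketch").show()

    spark.stop()
  }
}

The write itself needs no extra user configuration: as the comment in VeloxColumnarWriteFilesExec explains, the target directory is handed to the native table writer through the "writePath" local property at execution time.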