From 24f899e8356d899454550c7cb41baf4a51e37912 Mon Sep 17 00:00:00 2001 From: Jia Date: Tue, 21 Nov 2023 07:13:25 +0000 Subject: [PATCH] Pass basic write without partition write --- .../velox/SparkPlanExecApiImpl.scala | 21 ++- .../backendsapi/velox/VeloxBackend.scala | 1 + .../VeloxDataTypeValidationSuite.scala | 8 +- cpp/velox/compute/VeloxPlanConverter.cc | 10 ++ cpp/velox/compute/VeloxPlanConverter.h | 2 + cpp/velox/compute/WholeStageResultIterator.cc | 4 +- cpp/velox/substrait/SubstraitToVeloxPlan.cc | 141 ++++++++++++++++ cpp/velox/substrait/SubstraitToVeloxPlan.h | 3 + .../SubstraitToVeloxPlanValidator.cc | 21 +++ .../substrait/SubstraitToVeloxPlanValidator.h | 3 + .../substrait/rel/RelBuilder.java | 25 +++ .../substrait/rel/WriteRelNode.java | 112 +++++++++++++ .../substrait/proto/substrait/algebra.proto | 1 + .../backendsapi/BackendSettingsApi.scala | 1 + .../backendsapi/SparkPlanExecApi.scala | 12 ++ .../execution/WriteFilesExecTransformer.scala | 153 ++++++++++++++++++ .../extension/ColumnarOverrides.scala | 19 +++ .../columnar/TransformHintRule.scala | 20 ++- .../execution/ColumnarWriteFilesExec.scala | 75 +++++++++ 19 files changed, 626 insertions(+), 6 deletions(-) create mode 100644 gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java create mode 100644 gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala create mode 100644 gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala index 38cfe013868f3..beeacefb0e04c 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala @@ -34,6 +34,8 @@ import org.apache.spark.shuffle.utils.ShuffleUtil import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.{AggregateFunctionRewriteRule, FunctionIdentifier} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, CreateNamedStruct, ElementAt, Expression, ExpressionInfo, GetArrayItem, GetMapValue, GetStructField, Literal, NamedExpression, StringSplit, StringTrim} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, HLLAdapter} import org.apache.spark.sql.catalyst.optimizer.BuildSide @@ -41,7 +43,8 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{ColumnarBuildSideRelation, SparkPlan} +import org.apache.spark.sql.execution.{ColumnarBuildSideRelation, ColumnarWriteFilesExec, SparkPlan} +import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec} import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec import org.apache.spark.sql.execution.joins.BuildSideRelation import org.apache.spark.sql.execution.metric.SQLMetric @@ -243,6 +246,22 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi { 
ShuffleUtil.genColumnarShuffleWriter(parameters) } + override def createColumnarWriteFilesExec( + child: SparkPlan, + fileFormat: FileFormat, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String], + staticPartitions: TablePartitionSpec): WriteFilesExec = { + new ColumnarWriteFilesExec( + child, + fileFormat, + partitionColumns, + bucketSpec, + options, + staticPartitions) + } + /** * Generate ColumnarBatchSerializer for ColumnarShuffleExchangeExec. * diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala index 40d42b90873ec..963d6f7c3e35b 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala @@ -109,6 +109,7 @@ object BackendSettings extends BackendSettingsApi { case _ => false } } + override def supportWriteExec(): Boolean = true override def supportExpandExec(): Boolean = true diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxDataTypeValidationSuite.scala b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxDataTypeValidationSuite.scala index 130a05f901948..7031dbb44922b 100644 --- a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxDataTypeValidationSuite.scala +++ b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxDataTypeValidationSuite.scala @@ -445,14 +445,18 @@ class VeloxDataTypeValidationSuite extends VeloxWholeStageTransformerSuite { } } - ignore("Velox Parquet Write") { + test("Velox Parquet Write") { withSQLConf(("spark.gluten.sql.native.writer.enabled", "true")) { withTempDir { dir => val write_path = dir.toURI.getPath val data_path = getClass.getResource("/").getPath + "/data-type-validation-data/type1" - val df = spark.read.format("parquet").load(data_path) + val df = spark.read.format("parquet").load(data_path).drop("timestamp") df.write.mode("append").format("parquet").save(write_path) + val parquetDf = spark.read + .format("parquet") + .load(write_path) + checkAnswer(parquetDf, df) } } diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index a449b261234eb..45b2927a4ff10 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -39,6 +39,14 @@ VeloxPlanConverter::VeloxPlanConverter( substraitVeloxPlanConverter_(veloxPool, confMap, validationMode), pool_(veloxPool) {} +void VeloxPlanConverter::setInputPlanNode(const ::substrait::WriteRel& writeRel) { + if (writeRel.has_input()) { + setInputPlanNode(writeRel.input()); + } else { + throw std::runtime_error("Child expected"); + } +} + void VeloxPlanConverter::setInputPlanNode(const ::substrait::FetchRel& fetchRel) { if (fetchRel.has_input()) { setInputPlanNode(fetchRel.input()); @@ -176,6 +184,8 @@ void VeloxPlanConverter::setInputPlanNode(const ::substrait::Rel& srel) { setInputPlanNode(srel.window()); } else if (srel.has_generate()) { setInputPlanNode(srel.generate()); + } else if (srel.has_write()) { + setInputPlanNode(srel.write()); } else { throw std::runtime_error("Rel is not supported: " + srel.DebugString()); } diff --git a/cpp/velox/compute/VeloxPlanConverter.h b/cpp/velox/compute/VeloxPlanConverter.h index 90c58774aa0dc..01fd9bcfa4e62 100644 --- a/cpp/velox/compute/VeloxPlanConverter.h +++ b/cpp/velox/compute/VeloxPlanConverter.h @@ -42,6 +42,8 @@ class 
VeloxPlanConverter { } private: + void setInputPlanNode(const ::substrait::WriteRel& writeRel); + void setInputPlanNode(const ::substrait::FetchRel& fetchRel); void setInputPlanNode(const ::substrait::ExpandRel& sExpand); diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index f62564cd2e82b..ec5313cb37d4e 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -457,7 +457,7 @@ WholeStageResultIteratorFirstStage::WholeStageResultIteratorFirstStage( std::shared_ptr queryCtx = createNewVeloxQueryCtx(); task_ = velox::exec::Task::create( - fmt::format("Gluten {}", taskInfo_.toString()), std::move(planFragment), 0, std::move(queryCtx)); + fmt::format("Gluten_StageId_{}_TaskId_{}", std::to_string(taskInfo_.stageId), std::to_string(taskInfo_.taskId)), std::move(planFragment), 0, std::move(queryCtx)); if (!task_->supportsSingleThreadedExecution()) { throw std::runtime_error("Task doesn't support single thread execution: " + planNode->toString()); @@ -510,7 +510,7 @@ WholeStageResultIteratorMiddleStage::WholeStageResultIteratorMiddleStage( std::shared_ptr queryCtx = createNewVeloxQueryCtx(); task_ = velox::exec::Task::create( - fmt::format("Gluten {}", taskInfo_.toString()), std::move(planFragment), 0, std::move(queryCtx)); + fmt::format("Gluten_StageId_{}_TaskId_{}", std::to_string(taskInfo_.stageId), std::to_string(taskInfo_.taskId)), std::move(planFragment), 0, std::move(queryCtx)); if (!task_->supportsSingleThreadedExecution()) { throw std::runtime_error("Task doesn't support single thread execution: " + planNode->toString()); diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index f38c2eefaccaa..bd53ddd7ad4e5 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -18,6 +18,8 @@ #include "SubstraitToVeloxPlan.h" #include "TypeUtils.h" #include "VariantToVectorConverter.h" +#include "velox/connectors/hive/HiveDataSink.h" +#include "velox/exec/TableWriter.h" #include "velox/type/Type.h" #include "utils/ConfigExtractor.h" @@ -440,6 +442,143 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: } } +std::shared_ptr makeLocationHandle( + std::string targetDirectory, + std::optional writeDirectory = std::nullopt, + connector::hive::LocationHandle::TableType tableType = connector::hive::LocationHandle::TableType::kNew) { + return std::make_shared( + targetDirectory, writeDirectory.value_or(targetDirectory), tableType); +} + +std::shared_ptr makeHiveInsertTableHandle( + const std::vector& tableColumnNames, + const std::vector& tableColumnTypes, + const std::vector& partitionedBy, + std::shared_ptr bucketProperty, + std::shared_ptr locationHandle, + const dwio::common::FileFormat tableStorageFormat = dwio::common::FileFormat::PARQUET, + const std::optional compressionKind = {}) { + std::vector> columnHandles; + std::vector bucketedBy; + std::vector bucketedTypes; + std::vector> sortedBy; + if (bucketProperty != nullptr) { + bucketedBy = bucketProperty->bucketedBy(); + bucketedTypes = bucketProperty->bucketedTypes(); + sortedBy = bucketProperty->sortedBy(); + } + int32_t numPartitionColumns{0}; + int32_t numSortingColumns{0}; + int32_t numBucketColumns{0}; + for (int i = 0; i < tableColumnNames.size(); ++i) { + for (int j = 0; j < bucketedBy.size(); ++j) { + if (bucketedBy[j] == tableColumnNames[i]) { + ++numBucketColumns; + } + } + for (int j = 0; j < 
sortedBy.size(); ++j) { + if (sortedBy[j]->sortColumn() == tableColumnNames[i]) { + ++numSortingColumns; + } + } + if (std::find(partitionedBy.cbegin(), partitionedBy.cend(), tableColumnNames.at(i)) != partitionedBy.cend()) { + ++numPartitionColumns; + columnHandles.push_back(std::make_shared( + tableColumnNames.at(i), + connector::hive::HiveColumnHandle::ColumnType::kPartitionKey, + tableColumnTypes.at(i), + tableColumnTypes.at(i))); + } else { + columnHandles.push_back(std::make_shared( + tableColumnNames.at(i), + connector::hive::HiveColumnHandle::ColumnType::kRegular, + tableColumnTypes.at(i), + tableColumnTypes.at(i))); + } + } + VELOX_CHECK_EQ(numPartitionColumns, partitionedBy.size()); + VELOX_CHECK_EQ(numBucketColumns, bucketedBy.size()); + VELOX_CHECK_EQ(numSortingColumns, sortedBy.size()); + return std::make_shared( + columnHandles, locationHandle, tableStorageFormat, bucketProperty, compressionKind); +} + +core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::WriteRel& writeRel) { + core::PlanNodePtr childNode; + if (writeRel.has_input()) { + childNode = toVeloxPlan(writeRel.input()); + } else { + VELOX_FAIL("Child Rel is expected in WriteRel."); + } + const auto& inputType = childNode->outputType(); + + std::vector tableColumnNames; + std::vector partitionedKey; + std::vector isPartitionColumns; + tableColumnNames.reserve(writeRel.table_schema().names_size()); + + if (writeRel.has_table_schema()) { + const auto& tableSchema = writeRel.table_schema(); + isPartitionColumns = SubstraitParser::parsePartitionColumns(tableSchema); + + for (const auto& name : tableSchema.names()) { + tableColumnNames.emplace_back(name); + } + + for (int i = 0; i < tableSchema.names_size(); i++) { + if (isPartitionColumns[i]) { + partitionedKey.emplace_back(tableColumnNames[i]); + } + } + } + + std::vector writePath; + writePath.reserve(1); + for (const auto& name : writeRel.named_table().names()) { + std::cout << "the file path when creating write node is " << name << std::flush << std::endl; + writePath.emplace_back(name); + } + + std::string format = "dwrf"; + if (writeRel.named_table().has_advanced_extension() && + SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isPARQUET=")) { + format = "parquet"; + } + + // Do not hard-code connector ID and allow for connectors other than Hive. + static const std::string kHiveConnectorId = "test-hive"; + // check whether the write path is file, if yes, create it as a directory + // if (writePath[0].substr(0, 4) == "file") { + // struct stat buffer; + // if (stat(writePath[0].substr(5).c_str(), &buffer) == 0 && S_ISREG(buffer.st_mode)) { + // auto command = "rm -rf " + writePath[0].substr(5) + " && mkdir -p " + writePath[0].substr(5); + + // auto ret = system(command.c_str()); + // (void)(ret); + // } + // } + + auto outputType = ROW({"rowCount"}, {BIGINT()}); + + return std::make_shared( + nextPlanNodeId(), + inputType, + tableColumnNames, + nullptr, /*aggregationNode*/ + std::make_shared( + kHiveConnectorId, + makeHiveInsertTableHandle( + inputType->names(), + inputType->children(), + partitionedKey, + nullptr /*bucketProperty*/, + makeLocationHandle(writePath[0]))), + (isPartitionColumns.size() > 0) ? 
true : false, + exec::TableWriteTraits::outputType(nullptr), + connector::CommitStrategy::kNoCommit, + childNode); +} + core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::ExpandRel& expandRel) { core::PlanNodePtr childNode; if (expandRel.has_input()) { @@ -993,6 +1132,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: return toVeloxPlan(rel.fetch()); } else if (rel.has_window()) { return toVeloxPlan(rel.window()); + } else if (rel.has_write()) { + return toVeloxPlan(rel.write()); } else { VELOX_NYI("Substrait conversion not supported for Rel."); } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index f8ad7d0727252..6f37d183d843c 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -58,6 +58,9 @@ class SubstraitToVeloxPlanConverter { bool validationMode = false) : pool_(pool), confMap_(confMap), validationMode_(validationMode) {} + /// Used to convert Substrait WriteRel into Velox PlanNode. + core::PlanNodePtr toVeloxPlan(const ::substrait::WriteRel& writeRel); + /// Used to convert Substrait ExpandRel into Velox PlanNode. core::PlanNodePtr toVeloxPlan(const ::substrait::ExpandRel& expandRel); diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc index e5a45951a6bd9..afc5ae8ceca36 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc @@ -354,6 +354,25 @@ bool SubstraitToVeloxPlanValidator::validateExpression( } } +bool SubstraitToVeloxPlanValidator::validate(const ::substrait::WriteRel& writeRel) { + if (writeRel.has_input() && !validate(writeRel.input())) { + std::cout << "Validation failed for input type validation in WriteRel." << std::endl; + return false; + } + + // validate input datatype + if (writeRel.has_named_table()) { + const auto& extension = writeRel.named_table().advanced_extension(); + std::vector types; + if (!validateInputTypes(extension, types)) { + std::cout << "Validation failed for input types in WriteRel." << std::endl; + return false; + } + } + + return true; +} + bool SubstraitToVeloxPlanValidator::validate(const ::substrait::FetchRel& fetchRel) { RowTypePtr rowType = nullptr; // Get and validate the input types from extension. @@ -1193,6 +1212,8 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::Rel& rel) { return validate(rel.fetch()); } else if (rel.has_window()) { return validate(rel.window()); + } else if (rel.has_write()) { + return validate(rel.write()); } else { return false; } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h index d5d76a4dc1c18..ad237f7a701b4 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h @@ -29,6 +29,9 @@ class SubstraitToVeloxPlanValidator { SubstraitToVeloxPlanValidator(memory::MemoryPool* pool, core::ExecCtx* execCtx) : pool_(pool), execCtx_(execCtx), planConverter_(pool_, confMap_, true) {} + /// Used to validate whether the computing of this Write is supported. + bool validate(const ::substrait::WriteRel& writeRel); + /// Used to validate whether the computing of this Limit is supported. 
bool validate(const ::substrait::FetchRel& fetchRel); diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java index 8dfb2f4a20afc..dbb60e5ae1fbe 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java @@ -201,6 +201,31 @@ public static RelNode makeExpandRel( return new ExpandRelNode(input, projections); } + public static RelNode makeWriteRel( + RelNode input, + List types, + List names, + List columnTypeNodes, + String writePath, + SubstraitContext context, + Long operatorId) { + context.registerRelToOperator(operatorId); + return new WriteRelNode(input, types, names, columnTypeNodes, writePath); + } + + public static RelNode makeWriteRel( + RelNode input, + List types, + List names, + List columnTypeNodes, + String writePath, + AdvancedExtensionNode extensionNode, + SubstraitContext context, + Long operatorId) { + context.registerRelToOperator(operatorId); + return new WriteRelNode(input, types, names, columnTypeNodes, writePath, extensionNode); + } + public static RelNode makeSortRel( RelNode input, List sorts, diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java new file mode 100644 index 0000000000000..d699ce026d5a7 --- /dev/null +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/WriteRelNode.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.glutenproject.substrait.rel; + +import io.glutenproject.substrait.extensions.AdvancedExtensionNode; +import io.glutenproject.substrait.type.ColumnTypeNode; +import io.glutenproject.substrait.type.TypeNode; + +import io.substrait.proto.NamedObjectWrite; +import io.substrait.proto.NamedStruct; +import io.substrait.proto.Rel; +import io.substrait.proto.Type; +import io.substrait.proto.WriteRel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class WriteRelNode implements RelNode, Serializable { + private final RelNode input; + private final List types = new ArrayList<>(); + private final List names = new ArrayList<>(); + + private final String writePath; + private final List columnTypeNodes = new ArrayList<>(); + + private final AdvancedExtensionNode extensionNode; + + WriteRelNode( + RelNode input, + List types, + List names, + List partitionColumnTypeNodes, + String writePath, + AdvancedExtensionNode extensionNode) { + this.input = input; + this.types.addAll(types); + this.names.addAll(names); + this.columnTypeNodes.addAll(partitionColumnTypeNodes); + this.writePath = writePath; + this.extensionNode = extensionNode; + } + + WriteRelNode( + RelNode input, + List types, + List names, + List partitionColumnTypeNodes, + String writePath) { + this.input = input; + this.types.addAll(types); + this.names.addAll(names); + this.columnTypeNodes.addAll(partitionColumnTypeNodes); + this.writePath = writePath; + this.extensionNode = null; + } + + @Override + public Rel toProtobuf() { + + WriteRel.Builder writeBuilder = WriteRel.newBuilder(); + + Type.Struct.Builder structBuilder = Type.Struct.newBuilder(); + for (TypeNode typeNode : types) { + structBuilder.addTypes(typeNode.toProtobuf()); + } + + NamedStruct.Builder nStructBuilder = NamedStruct.newBuilder(); + nStructBuilder.setStruct(structBuilder.build()); + for (String name : names) { + nStructBuilder.addNames(name); + } + if (!columnTypeNodes.isEmpty()) { + for (ColumnTypeNode columnTypeNode : columnTypeNodes) { + nStructBuilder.addColumnTypes(columnTypeNode.toProtobuf()); + } + } + + writeBuilder.setTableSchema(nStructBuilder); + if (writePath != "") { + NamedObjectWrite.Builder nameObjectWriter = NamedObjectWrite.newBuilder(); + nameObjectWriter.addNames(writePath); + if (extensionNode != null) { + nameObjectWriter.setAdvancedExtension(extensionNode.toProtobuf()); + } + + writeBuilder.setNamedTable(nameObjectWriter); + } + + if (input != null) { + writeBuilder.setInput(input.toProtobuf()); + } + + Rel.Builder builder = Rel.newBuilder(); + builder.setWrite(writeBuilder.build()); + return builder.build(); + } +} diff --git a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto index 6bebe6496497a..ef1542c08c9b3 100644 --- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto +++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto @@ -444,6 +444,7 @@ message Rel { ExpandRel expand = 15; WindowRel window = 16; GenerateRel generate = 17; + WriteRel write = 18; } } diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala index fcd1bbfe84533..ada1da5a4ba4e 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala @@ -34,6 
+34,7 @@ trait BackendSettingsApi { fields: Array[StructField], partTable: Boolean, paths: Seq[String]): Boolean = false + def supportWriteExec(): Boolean = false def supportExpandExec(): Boolean = false def supportSortExec(): Boolean = false def supportSortMergeJoinExec(): Boolean = true diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala index f910e16d9a0bf..8913754c89762 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala @@ -27,6 +27,8 @@ import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriter import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.optimizer.BuildSide @@ -35,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec} import org.apache.spark.sql.execution.joins.BuildSideRelation import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.hive.HiveTableScanExecTransformer @@ -181,6 +184,15 @@ trait SparkPlanExecApi { numOutputRows: SQLMetric, dataSize: SQLMetric): BuildSideRelation + /** Create broadcast relation for BroadcastExchangeExec */ + def createColumnarWriteFilesExec( + child: SparkPlan, + fileFormat: FileFormat, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String], + staticPartitions: TablePartitionSpec): WriteFilesExec + /** * Generate extended DataSourceV2 Strategies. Currently only for ClickHouse backend. * diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala new file mode 100644 index 0000000000000..3b0c86d6250aa --- /dev/null +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.glutenproject.execution
+
+import io.glutenproject.backendsapi.BackendsApiManager
+import io.glutenproject.expression.ConverterUtils
+import io.glutenproject.extension.ValidationResult
+import io.glutenproject.metrics.MetricsUpdater
+import io.glutenproject.substrait.`type`.{ColumnTypeNode, TypeBuilder, TypeNode}
+import io.glutenproject.substrait.SubstraitContext
+import io.glutenproject.substrait.extensions.ExtensionBuilder
+import io.glutenproject.substrait.rel.{RelBuilder, RelNode}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import com.google.protobuf.Any
+
+import java.util
+
+case class WriteFilesExecTransformer(
+    child: SparkPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    staticPartitions: TablePartitionSpec)
+  extends UnaryExecNode
+  with UnaryTransformSupport {
+  override def metricsUpdater(): MetricsUpdater = null
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  def getRelNode(
+      context: SubstraitContext,
+      originalInputAttributes: Seq[Attribute],
+      writePath: String,
+      operatorId: Long,
+      input: RelNode,
+      validation: Boolean): RelNode = {
+    val typeNodes = ConverterUtils.collectAttributeTypeNodes(originalInputAttributes)
+    val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(originalInputAttributes)
+
+    val columnTypeNodes = new java.util.ArrayList[ColumnTypeNode]()
+    for (attr <- output) {
+      if (partitionColumns.exists(_.name.equals(attr.name))) {
+        columnTypeNodes.add(new ColumnTypeNode(1))
+      } else {
+        columnTypeNodes.add(new ColumnTypeNode(0))
+      }
+    }
+
+    if (!validation) {
+      RelBuilder.makeWriteRel(
+        input,
+        typeNodes,
+        nameList,
+        columnTypeNodes,
+        writePath,
+        context,
+        operatorId)
+    } else {
+      // Use an extension node to send the input types through the Substrait plan for validation.
+      val inputTypeNodeList = new java.util.ArrayList[TypeNode]()
+      for (attr <- originalInputAttributes) {
+        inputTypeNodeList.add(ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
+      }
+      val extensionNode = ExtensionBuilder.makeAdvancedExtension(
+        Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf))
+      RelBuilder.makeWriteRel(
+        input,
+        typeNodes,
+        nameList,
+        columnTypeNodes,
+        writePath,
+        extensionNode,
+        context,
+        operatorId)
+    }
+  }
+
+  override protected def doValidateInternal(): ValidationResult = {
+    if (!BackendsApiManager.getSettings.supportWriteExec()) {
+      return ValidationResult.notOk("Current backend does not support write")
+    }
+
+    val substraitContext = new SubstraitContext
+    val operatorId = substraitContext.nextOperatorId(this.nodeName)
+
+    val relNode =
+      getRelNode(substraitContext, child.output, "", operatorId, null, validation = true)
+
+    doNativeValidation(substraitContext, relNode)
+  }
+
+  override def doTransform(context: SubstraitContext): TransformContext = {
+    val writePath = child.session.sparkContext.getLocalProperty("writePath")
+    val childCtx = child match {
+      case c: TransformSupport =>
+        c.doTransform(context)
+      case _ =>
+        null
+    }
+
+    val operatorId = context.nextOperatorId(this.nodeName)
+
+    val (currRel, inputAttributes) = if (childCtx != null) {
+      (
+        getRelNode(context, child.output, writePath, operatorId, childCtx.root, validation = false),
+        childCtx.outputAttributes)
+    } else {
+      // This means the input is just an iterator, so a ReadRel will be created as the child.
+      // Prepare the input schema.
+      val attrList = new util.ArrayList[Attribute]()
+      for (attr <- child.output) {
+        attrList.add(attr)
+      }
+      val readRel = RelBuilder.makeReadRel(attrList, context, operatorId)
+      (
+        getRelNode(context, child.output, writePath, operatorId, readRel, validation = false),
+        child.output)
+    }
+    assert(currRel != null, "Write Rel should be valid")
+    TransformContext(inputAttributes, output, currRel)
+  }
+
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    throw new UnsupportedOperationException(s"This operator doesn't support doExecuteColumnar().")
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExecTransformer =
+    copy(child = newChild)
+}
diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
index 3c12d0c56e226..dd25d4bc9f962 100644
--- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.execution.datasources.WriteFilesExec
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.joins._
@@ -375,6 +376,24 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean)
         val child = replaceWithTransformerPlan(plan.child)
         logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
         ExpandExecTransformer(plan.projections, plan.output, child)
+      case plan: WriteFilesExec
=> + val child = replaceWithTransformerPlan(plan.child) + logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") + val writeTransformer = WriteFilesExecTransformer( + child, + plan.fileFormat, + plan.partitionColumns, + plan.bucketSpec, + plan.options, + plan.staticPartitions) + BackendsApiManager.getSparkPlanExecApiInstance.createColumnarWriteFilesExec( + writeTransformer, + plan.fileFormat, + plan.partitionColumns, + plan.bucketSpec, + plan.options, + plan.staticPartitions + ) case plan: SortExec => val child = replaceWithTransformerPlan(plan.child) logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala b/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala index 7bfcd35382c3a..af77f794651fa 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, QueryStageExec} import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} +import org.apache.spark.sql.execution.datasources.WriteFilesExec import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins._ @@ -258,7 +259,7 @@ case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { TransformHints.tagNotTransformable(p, "at least one of its children has empty output") p.children.foreach { child => - if (child.output.isEmpty) { + if (child.output.isEmpty && !child.isInstanceOf[WriteFilesExec]) { TransformHints.tagNotTransformable( child, "at least one of its children has empty output") @@ -306,6 +307,7 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { val enableTakeOrderedAndProject: Boolean = !scanOnly && columnarConf.enableTakeOrderedAndProject && enableColumnarSort && enableColumnarLimit && enableColumnarShuffle && enableColumnarProject + val enableColumnarWrite: Boolean = columnarConf.enableNativeWriter def apply(plan: SparkPlan): SparkPlan = { addTransformableTags(plan) @@ -464,6 +466,22 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { val transformer = ExpandExecTransformer(plan.projections, plan.output, plan.child) TransformHints.tag(plan, transformer.doValidate().toTransformHint) } + + case plan: WriteFilesExec => + if (!enableColumnarWrite) { + TransformHints.tagNotTransformable( + plan, + "columnar Write is not enabled in WriteFilesExec") + } else { + val transformer = WriteFilesExecTransformer( + plan.child, + plan.fileFormat, + plan.partitionColumns, + plan.bucketSpec, + plan.options, + plan.staticPartitions) + TransformHints.tag(plan, transformer.doValidate().toTransformHint) + } case plan: SortExec => if (!enableColumnarSort) { TransformHints.tagNotTransformable(plan, "columnar Sort is not enabled in SortExec") diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala new file mode 100644 index 0000000000000..cfdb41b19dc88 --- /dev/null +++ 
b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import io.glutenproject.columnarbatch.ColumnarBatches +import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators + +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.write.WriterCommitMessage +import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, ExecutedWriteSummary, FileFormat, WriteFilesExec, WriteFilesSpec, WriteTaskResult} +import org.apache.spark.sql.vectorized.ColumnarBatch + +class ColumnarWriteFilesExec( + child: SparkPlan, + fileFormat: FileFormat, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String], + staticPartitions: TablePartitionSpec) + extends WriteFilesExec( + child, + fileFormat, + partitionColumns, + bucketSpec, + options, + staticPartitions) { + + override def supportsColumnar(): Boolean = true + + override def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = { + assert(child.supportsColumnar) + + child.session.sparkContext.setLocalProperty("writePath", writeFilesSpec.description.path) + child.executeColumnar().map { + cb => + val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb) + val numRows = loadedCb.column(0).getLong(0) + // TODO: need to get the partitions, numFiles, numBytes from cb. + val stats = BasicWriteTaskStats(Seq.empty, 0, 0, numRows) + val summary = ExecutedWriteSummary(updatedPartitions = Set.empty, stats = Seq(stats)) + WriteTaskResult(new TaskCommitMessage(Map.empty -> Set.empty), summary) + } + } + + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + throw new UnsupportedOperationException(s"This operator doesn't support doExecuteColumnar().") + } + + override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExec = + new ColumnarWriteFilesExec( + newChild, + fileFormat, + partitionColumns, + bucketSpec, + options, + staticPartitions) +}
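
The following is a minimal usage sketch, not part of the patch, showing how the native write path introduced here is expected to be exercised end to end. It is grounded in the updated VeloxDataTypeValidationSuite test: only the spark.gluten.sql.native.writer.enabled key, the Parquet round trip, and the dropped timestamp column come from the patch itself; the session setup, input path, and output path are placeholders.

import org.apache.spark.sql.SparkSession

object NativeParquetWriteExample {
  def main(args: Array[String]): Unit = {
    // Assumes a Spark session already configured with the Gluten plugin and Velox backend.
    val spark = SparkSession.builder()
      .appName("gluten-native-write-example")
      .getOrCreate()

    // Enable the Velox-backed writer; when this is unset or "false", the vanilla Spark
    // WriteFilesExec is kept and WriteFilesExecTransformer is never planned.
    spark.conf.set("spark.gluten.sql.native.writer.enabled", "true")

    val inputPath = "/path/to/input-parquet"   // placeholder
    val outputPath = "/path/to/output-parquet" // placeholder

    // The patched test drops the timestamp column before writing, so this sketch assumes
    // the initial native writer does not handle timestamp data yet.
    val df = spark.read.parquet(inputPath).drop("timestamp")
    df.write.mode("append").parquet(outputPath)

    // Read the written files back and compare row counts as a basic sanity check.
    val written = spark.read.parquet(outputPath)
    assert(written.count() == df.count())

    spark.stop()
  }
}

As the patch title states, partition writes are not handled yet, so the sketch writes an unpartitioned, unbucketed table.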