diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala
index 1e658acbae0e1..d7a946a849a0f 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
 
@@ -238,4 +239,8 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
   override def requiredInputFilePaths(): Boolean = true
 
   override def enableBloomFilterAggFallbackRule(): Boolean = false
+
+  override def supportWriteFilesExec(
+      format: FileFormat,
+      fields: Array[StructField]): Option[String] = None
 }
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
index 4a84d868f3ce9..d4eb5e17233cf 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
@@ -111,7 +111,9 @@ object BackendSettings extends BackendSettingsApi {
     }
   }
 
-  override def supportWriteExec(format: FileFormat, fields: Array[StructField]): Option[String] = {
+  override def supportWriteFilesExec(
+      format: FileFormat,
+      fields: Array[StructField]): Option[String] = {
     def validateCompressionCodec(): Option[String] = {
       // Velox doesn't support brotli and lzo.
       val unSupportedCompressions = Set("brotli, lzo")
@@ -131,10 +133,10 @@ object BackendSettings extends BackendSettingsApi {
         case struct: StructType if validateDateTypes(struct.fields).nonEmpty =>
           Some("StructType(TimestampType)")
         case array: ArrayType if array.elementType.isInstanceOf[TimestampType] =>
-          Some("MapType(TimestampType)")
+          Some("ArrayType(TimestampType)")
         case map: MapType
-          if (map.keyType.isInstanceOf[TimestampType] ||
-            map.valueType.isInstanceOf[TimestampType]) =>
+          if map.keyType.isInstanceOf[TimestampType] ||
+            map.valueType.isInstanceOf[TimestampType] =>
           Some("MapType(TimestampType)")
         case _ => None
       }
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index 0f9dda5e44d06..b516d547170d1 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -84,55 +84,51 @@ class VeloxColumnarWriteFilesExec(
       //   "updateMode":"NEW",
       //   "name":"part1=1/part2=1"
       // }
-      if (iter.hasNext) {
-        val cb = iter.next()
-        val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
-
-        val numRows = loadedCb.column(0).getLong(0)
-
-        var updatedPartitions = Set.empty[String]
-        val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
-        var numBytes = 0L
-        for (i <- 0 until loadedCb.numRows() - 1) {
-          val fragments = loadedCb.column(1).getUTF8String(i + 1)
-          val objectMapper = new ObjectMapper()
-          val jsonObject = objectMapper.readTree(fragments.toString)
-
-          val fileWriteInfos = jsonObject.get("fileWriteInfos").elements()
-          if (jsonObject.get("fileWriteInfos").elements().hasNext) {
-            val writeInfo = fileWriteInfos.next();
-            numBytes += writeInfo.get("fileSize").size()
-            // Get partition information.
-            if (jsonObject.get("name").textValue().nonEmpty) {
-              val targetFileName = writeInfo.get("targetFileName").textValue()
-              val partitionDir = jsonObject.get("name").textValue()
-              updatedPartitions += partitionDir
-              val tmpOutputPath =
-                writeFilesSpec.description.path + "/" + partitionDir + "/" + targetFileName
-              val absOutputPathObject =
-                writeFilesSpec.description.customPartitionLocations.get(
-                  PartitioningUtils.parsePathFragment(partitionDir))
-              if (absOutputPathObject.nonEmpty) {
-                val absOutputPath = absOutputPathObject.get + "/" + targetFileName
-                addedAbsPathFiles(tmpOutputPath) = absOutputPath
-              }
+      assert(iter.hasNext)
+      val cb = iter.next()
+      val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
+
+      val numRows = loadedCb.column(0).getLong(0)
+
+      var updatedPartitions = Set.empty[String]
+      val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
+      var numBytes = 0L
+      for (i <- 0 until loadedCb.numRows() - 1) {
+        val fragments = loadedCb.column(1).getUTF8String(i + 1)
+        val objectMapper = new ObjectMapper()
+        val jsonObject = objectMapper.readTree(fragments.toString)
+
+        val fileWriteInfos = jsonObject.get("fileWriteInfos").elements()
+        if (jsonObject.get("fileWriteInfos").elements().hasNext) {
+          val writeInfo = fileWriteInfos.next();
+          numBytes += writeInfo.get("fileSize").longValue()
+          // Get partition information.
+          if (jsonObject.get("name").textValue().nonEmpty) {
+            val targetFileName = writeInfo.get("targetFileName").textValue()
+            val partitionDir = jsonObject.get("name").textValue()
+            updatedPartitions += partitionDir
+            val tmpOutputPath =
+              writeFilesSpec.description.path + "/" + partitionDir + "/" + targetFileName
+            val absOutputPathObject =
+              writeFilesSpec.description.customPartitionLocations.get(
+                PartitioningUtils.parsePathFragment(partitionDir))
+            if (absOutputPathObject.nonEmpty) {
+              val absOutputPath = absOutputPathObject.get + "/" + targetFileName
+              addedAbsPathFiles(tmpOutputPath) = absOutputPath
             }
           }
         }
-
-        // TODO: need to get the partition Internal row?
-        val stats = BasicWriteTaskStats(Seq.empty, (numRows - 1).toInt, numBytes, numRows)
-        val summary =
-          ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))
-
-        val result = WriteTaskResult(
-          new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
-          summary)
-        Iterator.single(result)
-      } else {
-        Iterator.empty
       }
+      // TODO: need to get the partition Internal row?
+      val stats = BasicWriteTaskStats(Seq.empty, (numRows - 1).toInt, numBytes, numRows)
+      val summary =
+        ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))
+
+      val result = WriteTaskResult(
+        new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
+        summary)
+      Iterator.single(result)
     }
   }
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 87abef8ba953b..c86da5be25520 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -24,7 +24,6 @@
 
 #include "utils/ConfigExtractor.h"
 
-#include <iostream>
 #include "config/GlutenConfig.h"
 
 namespace gluten {
@@ -450,9 +449,10 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
 }
 
 std::shared_ptr<connector::hive::LocationHandle> makeLocationHandle(
-    std::string targetDirectory,
-    std::optional<std::string> writeDirectory = std::nullopt,
-    connector::hive::LocationHandle::TableType tableType = connector::hive::LocationHandle::TableType::kExisting) {
+    const std::string& targetDirectory,
+    const std::optional<std::string>& writeDirectory = std::nullopt,
+    const connector::hive::LocationHandle::TableType& tableType =
+        connector::hive::LocationHandle::TableType::kExisting) {
   return std::make_shared<connector::hive::LocationHandle>(
       targetDirectory, writeDirectory.value_or(targetDirectory), tableType);
 }
@@ -461,10 +461,10 @@ std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandl
     const std::vector<std::string>& tableColumnNames,
     const std::vector<TypePtr>& tableColumnTypes,
    const std::vector<std::string>& partitionedBy,
-    std::shared_ptr<connector::hive::HiveBucketProperty> bucketProperty,
-    std::shared_ptr<connector::hive::LocationHandle> locationHandle,
-    const dwio::common::FileFormat tableStorageFormat = dwio::common::FileFormat::PARQUET,
-    const std::optional<common::CompressionKind> compressionKind = {}) {
+    const std::shared_ptr<connector::hive::HiveBucketProperty>& bucketProperty,
+    const std::shared_ptr<connector::hive::LocationHandle>& locationHandle,
+    const dwio::common::FileFormat& tableStorageFormat = dwio::common::FileFormat::PARQUET,
+    const std::optional<common::CompressionKind>& compressionKind = {}) {
   std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>> columnHandles;
   columnHandles.reserve(tableColumnNames.size());
   std::vector<std::string> bucketedBy;
@@ -491,13 +491,13 @@ std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandl
     }
     if (std::find(partitionedBy.cbegin(), partitionedBy.cend(), tableColumnNames.at(i)) != partitionedBy.cend()) {
       ++numPartitionColumns;
-      columnHandles.push_back(std::make_shared<connector::hive::HiveColumnHandle>(
+      columnHandles.emplace_back(std::make_shared<connector::hive::HiveColumnHandle>(
           tableColumnNames.at(i),
          connector::hive::HiveColumnHandle::ColumnType::kPartitionKey,
          tableColumnTypes.at(i),
          tableColumnTypes.at(i)));
    } else {
-      columnHandles.push_back(std::make_shared<connector::hive::HiveColumnHandle>(
+      columnHandles.emplace_back(std::make_shared<connector::hive::HiveColumnHandle>(
          tableColumnNames.at(i),
          connector::hive::HiveColumnHandle::ColumnType::kRegular,
          tableColumnTypes.at(i),
@@ -549,7 +549,6 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
   // spark default compression code is snappy.
   common::CompressionKind compressionCodec = common::CompressionKind::CompressionKind_SNAPPY;
   if (writeRel.named_table().has_advanced_extension()) {
-    std::cout << "the table has extension" << std::flush << std::endl;
     if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isSnappy=")) {
       compressionCodec = common::CompressionKind::CompressionKind_SNAPPY;
     } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isGzip=")) {
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index d5451d8d74dbc..5bc424c0d840b 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -335,7 +335,7 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::WriteRel& writeR
     return false;
   }
 
-  // validate input datatype
+  // Validate input data type.
   std::vector<TypePtr> types;
   if (writeRel.has_named_table()) {
     const auto& extension = writeRel.named_table().advanced_extension();
diff --git a/docs/developers/SubstraitModifications.md b/docs/developers/SubstraitModifications.md
index 15f52a4f6a0f6..1d97d58c7b63b 100644
--- a/docs/developers/SubstraitModifications.md
+++ b/docs/developers/SubstraitModifications.md
@@ -25,6 +25,7 @@ changed `Unbounded` in `WindowFunction` into `Unbounded_Preceding` and `Unbounde
 * Added `ExpandRel`([#1361](https://github.com/oap-project/gluten/pull/1361)).
 * Added `GenerateRel`([#574](https://github.com/oap-project/gluten/pull/574)).
 * Added `PartitionColumn` in `LocalFiles`([#2405](https://github.com/oap-project/gluten/pull/2405)).
+* Added `WriteRel` ([#3690](https://github.com/oap-project/gluten/pull/3690)).
 
 ## Modifications to type.proto
 
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
index 0b5ded78284e8..b36057ce9c1cf 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
@@ -34,7 +34,7 @@ trait BackendSettingsApi {
       fields: Array[StructField],
       partTable: Boolean,
       paths: Seq[String]): Boolean = false
-  def supportWriteExec(format: FileFormat, fields: Array[StructField]): Option[String]
+  def supportWriteFilesExec(format: FileFormat, fields: Array[StructField]): Option[String]
   def supportExpandExec(): Boolean = false
   def supportSortExec(): Boolean = false
   def supportSortMergeJoinExec(): Boolean = true
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala
index 67c8fdd4115b7..042abe30b5641 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -46,8 +46,7 @@ case class WriteFilesExecTransformer(
     bucketSpec: Option[BucketSpec],
     options: Map[String, String],
     staticPartitions: TablePartitionSpec)
-  extends UnaryExecNode
-  with UnaryTransformSupport {
+  extends UnaryTransformSupport {
   override def metricsUpdater(): MetricsUpdater = NoopMetricsUpdater
 
   override def output: Seq[Attribute] = Seq.empty
@@ -63,7 +62,7 @@ case class WriteFilesExecTransformer(
       .newBuilder()
       .setValue(writeParametersStr.toString)
       .build()
-    BackendsApiManager.getTransformerApiInstance.getPackMessage(message)
+    BackendsApiManager.getTransformerApiInstance.packPBMessage(message)
   }
 
   def createEnhancement(output: Seq[Attribute]): com.google.protobuf.Any = {
@@ -71,7 +70,7 @@ case class WriteFilesExecTransformer(
       attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)
     }
 
-    BackendsApiManager.getTransformerApiInstance.getPackMessage(
+    BackendsApiManager.getTransformerApiInstance.packPBMessage(
      TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf)
   }
 
@@ -136,7 +135,9 @@ case class WriteFilesExecTransformer(
 
   override protected def doValidateInternal(): ValidationResult = {
     val supportedWrite =
-      BackendsApiManager.getSettings.supportWriteExec(fileFormat, child.output.toStructType.fields)
+      BackendsApiManager.getSettings.supportWriteFilesExec(
+        fileFormat,
+        child.output.toStructType.fields)
     if (supportedWrite.nonEmpty) {
       return ValidationResult.notOk("Unsupported native write: " + supportedWrite.get)
     }