
Commit

JkSelf committed Dec 26, 2023
1 parent 7073972 commit 230950e
Showing 8 changed files with 71 additions and 67 deletions.
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}

@@ -238,4 +239,8 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
override def requiredInputFilePaths(): Boolean = true

override def enableBloomFilterAggFallbackRule(): Boolean = false

override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField]): Option[String] = None
}
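
For readers unfamiliar with the hook added above: judging from the `doValidateInternal` change later in this commit, `supportWriteFilesExec` returns `Some(reason)` to reject a native write (the reason surfaces in the validation message) and `None` to raise no objection. The sketch below is a hypothetical, standalone illustration of that contract, not the ClickHouse or Velox implementation; the Parquet-only handling and the timestamp check are assumptions made for the example.

```scala
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{StructField, TimestampType}

// Hypothetical mirror of the hook's contract: None = no objection, Some(reason) = fall back.
def supportWriteFilesExec(format: FileFormat, fields: Array[StructField]): Option[String] =
  format match {
    case _: ParquetFileFormat =>
      // Illustrative check only: reject plain timestamp columns.
      fields.collectFirst {
        case f if f.dataType == TimestampType => s"TimestampType column: ${f.name}"
      }
    case other =>
      Some(s"Unsupported file format: ${other.getClass.getSimpleName}")
  }
```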
@@ -111,7 +111,9 @@ object BackendSettings extends BackendSettingsApi {
}
}

override def supportWriteExec(format: FileFormat, fields: Array[StructField]): Option[String] = {
override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField]): Option[String] = {
def validateCompressionCodec(): Option[String] = {
// Velox doesn't support brotli and lzo.
val unSupportedCompressions = Set("brotli, lzo")
@@ -131,10 +133,10 @@
case struct: StructType if validateDateTypes(struct.fields).nonEmpty =>
Some("StructType(TimestampType)")
case array: ArrayType if array.elementType.isInstanceOf[TimestampType] =>
Some("MapType(TimestampType)")
Some("ArrayType(TimestampType)")
case map: MapType
if (map.keyType.isInstanceOf[TimestampType] ||
map.valueType.isInstanceOf[TimestampType]) =>
if map.keyType.isInstanceOf[TimestampType] ||
map.valueType.isInstanceOf[TimestampType] =>
Some("MapType(TimestampType)")
case _ => None
}
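One more note on the hunk above: besides the rename, it corrects the error label for arrays of timestamps (previously reported as `MapType(TimestampType)`) and reformats the `MapType` guard. Below is a hedged, self-contained sketch of how such a nested-timestamp check can be written; `findTimestampField` is a hypothetical helper, not the `validateDateTypes` function referenced in the diff.

```scala
import org.apache.spark.sql.types._

// Hypothetical recursive check: report the first column whose (possibly nested) type
// contains a timestamp, using the same labels as the validation above.
def findTimestampField(fields: Array[StructField]): Option[String] =
  fields.collectFirst {
    case StructField(_, TimestampType, _, _) => "TimestampType"
    case StructField(_, s: StructType, _, _) if findTimestampField(s.fields).nonEmpty =>
      "StructType(TimestampType)"
    case StructField(_, a: ArrayType, _, _) if a.elementType.isInstanceOf[TimestampType] =>
      "ArrayType(TimestampType)"
    case StructField(_, m: MapType, _, _)
        if m.keyType.isInstanceOf[TimestampType] || m.valueType.isInstanceOf[TimestampType] =>
      "MapType(TimestampType)"
  }
```

For instance, calling `findTimestampField` on a schema containing a `map<string, timestamp>` column would return `Some("MapType(TimestampType)")`.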
@@ -84,55 +84,51 @@ class VeloxColumnarWriteFilesExec(
// "updateMode":"NEW",
// "name":"part1=1/part2=1"
// }
if (iter.hasNext) {
val cb = iter.next()
val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)

val numRows = loadedCb.column(0).getLong(0)

var updatedPartitions = Set.empty[String]
val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
var numBytes = 0L
for (i <- 0 until loadedCb.numRows() - 1) {
val fragments = loadedCb.column(1).getUTF8String(i + 1)
val objectMapper = new ObjectMapper()
val jsonObject = objectMapper.readTree(fragments.toString)

val fileWriteInfos = jsonObject.get("fileWriteInfos").elements()
if (jsonObject.get("fileWriteInfos").elements().hasNext) {
val writeInfo = fileWriteInfos.next();
numBytes += writeInfo.get("fileSize").size()
// Get partition information.
if (jsonObject.get("name").textValue().nonEmpty) {
val targetFileName = writeInfo.get("targetFileName").textValue()
val partitionDir = jsonObject.get("name").textValue()
updatedPartitions += partitionDir
val tmpOutputPath =
writeFilesSpec.description.path + "/" + partitionDir + "/" + targetFileName
val absOutputPathObject =
writeFilesSpec.description.customPartitionLocations.get(
PartitioningUtils.parsePathFragment(partitionDir))
if (absOutputPathObject.nonEmpty) {
val absOutputPath = absOutputPathObject.get + "/" + targetFileName
addedAbsPathFiles(tmpOutputPath) = absOutputPath
}
assert(iter.hasNext)
val cb = iter.next()
val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)

val numRows = loadedCb.column(0).getLong(0)

var updatedPartitions = Set.empty[String]
val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
var numBytes = 0L
for (i <- 0 until loadedCb.numRows() - 1) {
val fragments = loadedCb.column(1).getUTF8String(i + 1)
val objectMapper = new ObjectMapper()
val jsonObject = objectMapper.readTree(fragments.toString)

val fileWriteInfos = jsonObject.get("fileWriteInfos").elements()
if (jsonObject.get("fileWriteInfos").elements().hasNext) {
val writeInfo = fileWriteInfos.next();
numBytes += writeInfo.get("fileSize").longValue()
// Get partition information.
if (jsonObject.get("name").textValue().nonEmpty) {
val targetFileName = writeInfo.get("targetFileName").textValue()
val partitionDir = jsonObject.get("name").textValue()
updatedPartitions += partitionDir
val tmpOutputPath =
writeFilesSpec.description.path + "/" + partitionDir + "/" + targetFileName
val absOutputPathObject =
writeFilesSpec.description.customPartitionLocations.get(
PartitioningUtils.parsePathFragment(partitionDir))
if (absOutputPathObject.nonEmpty) {
val absOutputPath = absOutputPathObject.get + "/" + targetFileName
addedAbsPathFiles(tmpOutputPath) = absOutputPath
}
}
}

// TODO: need to get the partition Internal row?
val stats = BasicWriteTaskStats(Seq.empty, (numRows - 1).toInt, numBytes, numRows)
val summary =
ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))

val result = WriteTaskResult(
new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
summary)
Iterator.single(result)
} else {
Iterator.empty
}

// TODO: need to get the partition Internal row?
val stats = BasicWriteTaskStats(Seq.empty, (numRows - 1).toInt, numBytes, numRows)
val summary =
ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))

val result = WriteTaskResult(
new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
summary)
Iterator.single(result)
}
}

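To make the executor-side loop above easier to follow, here is a hedged sketch of the JSON shape it consumes. The concrete values (file name, sizes) are invented for illustration; only the keys (`name`, `updateMode`, `fileWriteInfos`, `targetFileName`, `fileSize`) come from the code and the comment in the diff. Note the actual fixes in this hunk: the byte count now uses `longValue()` instead of `size()` (which returns a node's child count, not a byte count), and the `if (iter.hasNext)` branch becomes an `assert`.

```scala
import com.fasterxml.jackson.databind.ObjectMapper

// An invented fragment in the shape the loop expects; real fragments are emitted by the
// Velox table writer and may carry additional fields.
val fragment =
  """{
    |  "name": "part1=1/part2=1",
    |  "updateMode": "NEW",
    |  "fileWriteInfos": [
    |    { "targetFileName": "part-00000-example.parquet", "fileSize": 1024 }
    |  ]
    |}""".stripMargin

val root = new ObjectMapper().readTree(fragment)
val writeInfos = root.get("fileWriteInfos").elements()
if (writeInfos.hasNext) {
  val info = writeInfos.next()
  val partitionDir = root.get("name").textValue()        // "part1=1/part2=1"
  val fileName = info.get("targetFileName").textValue()  // "part-00000-example.parquet"
  val numBytes = info.get("fileSize").longValue()        // 1024; the old size() call did not return bytes
  println(s"$partitionDir/$fileName -> $numBytes bytes")
}
```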
21 changes: 10 additions & 11 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -24,7 +24,6 @@

#include "utils/ConfigExtractor.h"

#include <iostream>
#include "config/GlutenConfig.h"

namespace gluten {
@@ -450,9 +449,10 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
}

std::shared_ptr<connector::hive::LocationHandle> makeLocationHandle(
std::string targetDirectory,
std::optional<std::string> writeDirectory = std::nullopt,
connector::hive::LocationHandle::TableType tableType = connector::hive::LocationHandle::TableType::kExisting) {
const std::string& targetDirectory,
const std::optional<std::string>& writeDirectory = std::nullopt,
const connector::hive::LocationHandle::TableType& tableType =
connector::hive::LocationHandle::TableType::kExisting) {
return std::make_shared<connector::hive::LocationHandle>(
targetDirectory, writeDirectory.value_or(targetDirectory), tableType);
}
@@ -461,10 +461,10 @@ std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandl
const std::vector<std::string>& tableColumnNames,
const std::vector<TypePtr>& tableColumnTypes,
const std::vector<std::string>& partitionedBy,
std::shared_ptr<connector::hive::HiveBucketProperty> bucketProperty,
std::shared_ptr<connector::hive::LocationHandle> locationHandle,
const dwio::common::FileFormat tableStorageFormat = dwio::common::FileFormat::PARQUET,
const std::optional<common::CompressionKind> compressionKind = {}) {
const std::shared_ptr<connector::hive::HiveBucketProperty>& bucketProperty,
const std::shared_ptr<connector::hive::LocationHandle>& locationHandle,
const dwio::common::FileFormat& tableStorageFormat = dwio::common::FileFormat::PARQUET,
const std::optional<common::CompressionKind>& compressionKind = {}) {
std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>> columnHandles;
columnHandles.reserve(tableColumnNames.size());
std::vector<std::string> bucketedBy;
@@ -491,13 +491,13 @@
}
if (std::find(partitionedBy.cbegin(), partitionedBy.cend(), tableColumnNames.at(i)) != partitionedBy.cend()) {
++numPartitionColumns;
columnHandles.push_back(std::make_shared<connector::hive::HiveColumnHandle>(
columnHandles.emplace_back(std::make_shared<connector::hive::HiveColumnHandle>(
tableColumnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kPartitionKey,
tableColumnTypes.at(i),
tableColumnTypes.at(i)));
} else {
columnHandles.push_back(std::make_shared<connector::hive::HiveColumnHandle>(
columnHandles.emplace_back(std::make_shared<connector::hive::HiveColumnHandle>(
tableColumnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kRegular,
tableColumnTypes.at(i),
@@ -549,7 +549,6 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
// spark default compression code is snappy.
common::CompressionKind compressionCodec = common::CompressionKind::CompressionKind_SNAPPY;
if (writeRel.named_table().has_advanced_extension()) {
std::cout << "the table has extension" << std::flush << std::endl;
if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isSnappy=")) {
compressionCodec = common::CompressionKind::CompressionKind_SNAPPY;
} else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isGzip=")) {
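On the compression handling above: the converter now probes the named table's advanced extension for markers such as `isSnappy=` and `isGzip=`, defaulting to snappy (Spark's default codec). How the Scala side encodes the chosen codec into that extension string is not shown in this diff; the sketch below is only a hypothetical illustration of such a mapping, and the exact marker values (`isSnappy=1`, `isGzip=1`) are assumptions.

```scala
// Hypothetical driver-side mapping from Spark's "compression" write option to the markers
// the C++ converter looks for; the real encoding lives elsewhere in Gluten.
def compressionMarker(options: Map[String, String]): String =
  options.get("compression").map(_.toLowerCase) match {
    case None | Some("snappy") => "isSnappy=1" // snappy is Spark's default codec
    case Some("gzip")          => "isGzip=1"
    case Some(other)           => s"is${other.capitalize}=1" // other codecs may be rejected upstream
  }

// Example: compressionMarker(Map("compression" -> "gzip")) returns "isGzip=1".
```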
2 changes: 1 addition & 1 deletion cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -335,7 +335,7 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::WriteRel& writeR
return false;
}

// validate input datatype
// Validate input data type.
std::vector<TypePtr> types;
if (writeRel.has_named_table()) {
const auto& extension = writeRel.named_table().advanced_extension();
1 change: 1 addition & 0 deletions docs/developers/SubstraitModifications.md
@@ -25,6 +25,7 @@ changed `Unbounded` in `WindowFunction` into `Unbounded_Preceding` and `Unbounded_Following`
* Added `ExpandRel`([#1361](https://github.com/oap-project/gluten/pull/1361)).
* Added `GenerateRel`([#574](https://github.com/oap-project/gluten/pull/574)).
* Added `PartitionColumn` in `LocalFiles`([#2405](https://github.com/oap-project/gluten/pull/2405)).
* Added `WriteRel` ([#3690](https://github.com/oap-project/gluten/pull/3690)).

## Modifications to type.proto

@@ -34,7 +34,7 @@ trait BackendSettingsApi {
fields: Array[StructField],
partTable: Boolean,
paths: Seq[String]): Boolean = false
def supportWriteExec(format: FileFormat, fields: Array[StructField]): Option[String]
def supportWriteFilesExec(format: FileFormat, fields: Array[StructField]): Option[String]
def supportExpandExec(): Boolean = false
def supportSortExec(): Boolean = false
def supportSortMergeJoinExec(): Boolean = true
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -46,8 +46,7 @@ case class WriteFilesExecTransformer(
bucketSpec: Option[BucketSpec],
options: Map[String, String],
staticPartitions: TablePartitionSpec)
extends UnaryExecNode
with UnaryTransformSupport {
extends UnaryTransformSupport {
override def metricsUpdater(): MetricsUpdater = NoopMetricsUpdater

override def output: Seq[Attribute] = Seq.empty
@@ -63,15 +62,15 @@
.newBuilder()
.setValue(writeParametersStr.toString)
.build()
BackendsApiManager.getTransformerApiInstance.getPackMessage(message)
BackendsApiManager.getTransformerApiInstance.packPBMessage(message)
}

def createEnhancement(output: Seq[Attribute]): com.google.protobuf.Any = {
val inputTypeNodes = output.map {
attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)
}

BackendsApiManager.getTransformerApiInstance.getPackMessage(
BackendsApiManager.getTransformerApiInstance.packPBMessage(
TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf)
}

@@ -136,7 +135,9 @@

override protected def doValidateInternal(): ValidationResult = {
val supportedWrite =
BackendsApiManager.getSettings.supportWriteExec(fileFormat, child.output.toStructType.fields)
BackendsApiManager.getSettings.supportWriteFilesExec(
fileFormat,
child.output.toStructType.fields)
if (supportedWrite.nonEmpty) {
return ValidationResult.notOk("Unsupported native write: " + supportedWrite.get)
}
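A final note on the `packPBMessage` rename used in this transformer: the helper wraps a concrete protobuf message into a `google.protobuf.Any` so it can travel inside the Substrait extension. The snippet below shows only the underlying protobuf-java calls; the wrapped `StringValue` and its payload string are assumptions for illustration, and the real code goes through `BackendsApiManager.getTransformerApiInstance.packPBMessage`.

```scala
import com.google.protobuf.{Any => ProtoAny, StringValue}

// Build an illustrative write-parameters message, then pack it into google.protobuf.Any,
// which is the shape createEnhancement and getRelNode expect for extensions.
val writeParameters: StringValue =
  StringValue.newBuilder().setValue("example write parameters").build()
val packed: ProtoAny = ProtoAny.pack(writeParameters)

// The type URL records which message sits inside the Any wrapper.
println(packed.getTypeUrl) // type.googleapis.com/google.protobuf.StringValue
```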
