Commit 24f899e

Pass basic write without partition write

JkSelf committed Nov 21, 2023
1 parent 0eb3c27 commit 24f899e
Showing 19 changed files with 626 additions and 6 deletions.
@@ -34,14 +34,17 @@ import org.apache.spark.shuffle.utils.ShuffleUtil
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.{AggregateFunctionRewriteRule, FunctionIdentifier}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, CreateNamedStruct, ElementAt, Expression, ExpressionInfo, GetArrayItem, GetMapValue, GetStructField, Literal, NamedExpression, StringSplit, StringTrim}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, HLLAdapter}
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarBuildSideRelation, SparkPlan}
import org.apache.spark.sql.execution.{ColumnarBuildSideRelation, ColumnarWriteFilesExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec}
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
import org.apache.spark.sql.execution.joins.BuildSideRelation
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -243,6 +246,22 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {
    ShuffleUtil.genColumnarShuffleWriter(parameters)
  }

  override def createColumnarWriteFilesExec(
      child: SparkPlan,
      fileFormat: FileFormat,
      partitionColumns: Seq[Attribute],
      bucketSpec: Option[BucketSpec],
      options: Map[String, String],
      staticPartitions: TablePartitionSpec): WriteFilesExec = {
    new ColumnarWriteFilesExec(
      child,
      fileFormat,
      partitionColumns,
      bucketSpec,
      options,
      staticPartitions)
  }

  /**
   * Generate ColumnarBatchSerializer for ColumnarShuffleExchangeExec.
   *
@@ -109,6 +109,7 @@ object BackendSettings extends BackendSettingsApi {
      case _ => false
    }
  }

  override def supportWriteExec(): Boolean = true

  override def supportExpandExec(): Boolean = true

@@ -445,14 +445,18 @@ class VeloxDataTypeValidationSuite extends VeloxWholeStageTransformerSuite {
    }
  }

  ignore("Velox Parquet Write") {
  test("Velox Parquet Write") {
    withSQLConf(("spark.gluten.sql.native.writer.enabled", "true")) {
      withTempDir {
        dir =>
          val write_path = dir.toURI.getPath
          val data_path = getClass.getResource("/").getPath + "/data-type-validation-data/type1"
          val df = spark.read.format("parquet").load(data_path)
          val df = spark.read.format("parquet").load(data_path).drop("timestamp")
          df.write.mode("append").format("parquet").save(write_path)
          val parquetDf = spark.read
            .format("parquet")
            .load(write_path)
          checkAnswer(parquetDf, df)
      }
    }

10 changes: 10 additions & 0 deletions cpp/velox/compute/VeloxPlanConverter.cc
@@ -39,6 +39,14 @@ VeloxPlanConverter::VeloxPlanConverter(
      substraitVeloxPlanConverter_(veloxPool, confMap, validationMode),
      pool_(veloxPool) {}

void VeloxPlanConverter::setInputPlanNode(const ::substrait::WriteRel& writeRel) {
  if (writeRel.has_input()) {
    setInputPlanNode(writeRel.input());
  } else {
    throw std::runtime_error("Child expected");
  }
}

void VeloxPlanConverter::setInputPlanNode(const ::substrait::FetchRel& fetchRel) {
  if (fetchRel.has_input()) {
    setInputPlanNode(fetchRel.input());
@@ -176,6 +184,8 @@ void VeloxPlanConverter::setInputPlanNode(const ::substrait::Rel& srel) {
    setInputPlanNode(srel.window());
  } else if (srel.has_generate()) {
    setInputPlanNode(srel.generate());
  } else if (srel.has_write()) {
    setInputPlanNode(srel.write());
  } else {
    throw std::runtime_error("Rel is not supported: " + srel.DebugString());
  }
2 changes: 2 additions & 0 deletions cpp/velox/compute/VeloxPlanConverter.h
@@ -42,6 +42,8 @@ class VeloxPlanConverter {
  }

 private:
  void setInputPlanNode(const ::substrait::WriteRel& writeRel);

  void setInputPlanNode(const ::substrait::FetchRel& fetchRel);

  void setInputPlanNode(const ::substrait::ExpandRel& sExpand);
4 changes: 2 additions & 2 deletions cpp/velox/compute/WholeStageResultIterator.cc
@@ -457,7 +457,7 @@ WholeStageResultIteratorFirstStage::WholeStageResultIteratorFirstStage(
  std::shared_ptr<velox::core::QueryCtx> queryCtx = createNewVeloxQueryCtx();

  task_ = velox::exec::Task::create(
      fmt::format("Gluten {}", taskInfo_.toString()), std::move(planFragment), 0, std::move(queryCtx));
      fmt::format("Gluten_StageId_{}_TaskId_{}", std::to_string(taskInfo_.stageId), std::to_string(taskInfo_.taskId)), std::move(planFragment), 0, std::move(queryCtx));

  if (!task_->supportsSingleThreadedExecution()) {
    throw std::runtime_error("Task doesn't support single thread execution: " + planNode->toString());
@@ -510,7 +510,7 @@ WholeStageResultIteratorMiddleStage::WholeStageResultIteratorMiddleStage(
  std::shared_ptr<velox::core::QueryCtx> queryCtx = createNewVeloxQueryCtx();

  task_ = velox::exec::Task::create(
      fmt::format("Gluten {}", taskInfo_.toString()), std::move(planFragment), 0, std::move(queryCtx));
      fmt::format("Gluten_StageId_{}_TaskId_{}", std::to_string(taskInfo_.stageId), std::to_string(taskInfo_.taskId)), std::move(planFragment), 0, std::move(queryCtx));

  if (!task_->supportsSingleThreadedExecution()) {
    throw std::runtime_error("Task doesn't support single thread execution: " + planNode->toString());
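For reference, a minimal sketch of the new task-name scheme (the stage and task IDs here are made up; note that fmt formats integral arguments directly, so the std::to_string calls above are not strictly needed):

#include <fmt/format.h>
#include <iostream>

int main() {
  // Prints "Gluten_StageId_3_TaskId_42", the same shape as the Task names created above.
  std::cout << fmt::format("Gluten_StageId_{}_TaskId_{}", 3, 42) << std::endl;
  return 0;
}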
141 changes: 141 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -18,6 +18,8 @@
#include "SubstraitToVeloxPlan.h"
#include "TypeUtils.h"
#include "VariantToVectorConverter.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/exec/TableWriter.h"
#include "velox/type/Type.h"

#include "utils/ConfigExtractor.h"
@@ -440,6 +442,143 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
  }
}

std::shared_ptr<connector::hive::LocationHandle> makeLocationHandle(
    std::string targetDirectory,
    std::optional<std::string> writeDirectory = std::nullopt,
    connector::hive::LocationHandle::TableType tableType = connector::hive::LocationHandle::TableType::kNew) {
  return std::make_shared<connector::hive::LocationHandle>(
      targetDirectory, writeDirectory.value_or(targetDirectory), tableType);
}

std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandle(
    const std::vector<std::string>& tableColumnNames,
    const std::vector<TypePtr>& tableColumnTypes,
    const std::vector<std::string>& partitionedBy,
    std::shared_ptr<connector::hive::HiveBucketProperty> bucketProperty,
    std::shared_ptr<connector::hive::LocationHandle> locationHandle,
    const dwio::common::FileFormat tableStorageFormat = dwio::common::FileFormat::PARQUET,
    const std::optional<common::CompressionKind> compressionKind = {}) {
  std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>> columnHandles;
  std::vector<std::string> bucketedBy;
  std::vector<TypePtr> bucketedTypes;
  std::vector<std::shared_ptr<const connector::hive::HiveSortingColumn>> sortedBy;
  if (bucketProperty != nullptr) {
    bucketedBy = bucketProperty->bucketedBy();
    bucketedTypes = bucketProperty->bucketedTypes();
    sortedBy = bucketProperty->sortedBy();
  }
  int32_t numPartitionColumns{0};
  int32_t numSortingColumns{0};
  int32_t numBucketColumns{0};
  for (int i = 0; i < tableColumnNames.size(); ++i) {
    for (int j = 0; j < bucketedBy.size(); ++j) {
      if (bucketedBy[j] == tableColumnNames[i]) {
        ++numBucketColumns;
      }
    }
    for (int j = 0; j < sortedBy.size(); ++j) {
      if (sortedBy[j]->sortColumn() == tableColumnNames[i]) {
        ++numSortingColumns;
      }
    }
    if (std::find(partitionedBy.cbegin(), partitionedBy.cend(), tableColumnNames.at(i)) != partitionedBy.cend()) {
      ++numPartitionColumns;
      columnHandles.push_back(std::make_shared<connector::hive::HiveColumnHandle>(
          tableColumnNames.at(i),
          connector::hive::HiveColumnHandle::ColumnType::kPartitionKey,
          tableColumnTypes.at(i),
          tableColumnTypes.at(i)));
    } else {
      columnHandles.push_back(std::make_shared<connector::hive::HiveColumnHandle>(
          tableColumnNames.at(i),
          connector::hive::HiveColumnHandle::ColumnType::kRegular,
          tableColumnTypes.at(i),
          tableColumnTypes.at(i)));
    }
  }
  VELOX_CHECK_EQ(numPartitionColumns, partitionedBy.size());
  VELOX_CHECK_EQ(numBucketColumns, bucketedBy.size());
  VELOX_CHECK_EQ(numSortingColumns, sortedBy.size());
  return std::make_shared<connector::hive::HiveInsertTableHandle>(
      columnHandles, locationHandle, tableStorageFormat, bucketProperty, compressionKind);
}
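For illustration, a hedged sketch of how the two helpers above compose for an unpartitioned, unbucketed write; the column names, types, and target directory are hypothetical, and Parquet is the default tableStorageFormat:

// Hypothetical usage: a two-column table written as Parquet into a single
// target directory, with no partitioning, bucketing, or sorting.
std::vector<std::string> names{"c0", "c1"};
std::vector<TypePtr> types{BIGINT(), VARCHAR()};
auto insertHandle = makeHiveInsertTableHandle(
    names,
    types,
    /*partitionedBy=*/{},
    /*bucketProperty=*/nullptr,
    makeLocationHandle("/tmp/velox_write_out"));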

core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::WriteRel& writeRel) {
  core::PlanNodePtr childNode;
  if (writeRel.has_input()) {
    childNode = toVeloxPlan(writeRel.input());
  } else {
    VELOX_FAIL("Child Rel is expected in WriteRel.");
  }
  const auto& inputType = childNode->outputType();

  std::vector<std::string> tableColumnNames;
  std::vector<std::string> partitionedKey;
  std::vector<bool> isPartitionColumns;
  tableColumnNames.reserve(writeRel.table_schema().names_size());

  if (writeRel.has_table_schema()) {
    const auto& tableSchema = writeRel.table_schema();
    isPartitionColumns = SubstraitParser::parsePartitionColumns(tableSchema);

    for (const auto& name : tableSchema.names()) {
      tableColumnNames.emplace_back(name);
    }

    for (int i = 0; i < tableSchema.names_size(); i++) {
      if (isPartitionColumns[i]) {
        partitionedKey.emplace_back(tableColumnNames[i]);
      }
    }
  }

  std::vector<std::string> writePath;
  writePath.reserve(1);
  for (const auto& name : writeRel.named_table().names()) {
    std::cout << "the file path when creating write node is " << name << std::flush << std::endl;
    writePath.emplace_back(name);
  }

  std::string format = "dwrf";
  if (writeRel.named_table().has_advanced_extension() &&
      SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isPARQUET=")) {
    format = "parquet";
  }

  // TODO: Do not hard-code the connector ID; allow for connectors other than Hive.
  static const std::string kHiveConnectorId = "test-hive";
  // Check whether the write path is a file; if so, recreate it as a directory.
  // if (writePath[0].substr(0, 4) == "file") {
  //   struct stat buffer;
  //   if (stat(writePath[0].substr(5).c_str(), &buffer) == 0 && S_ISREG(buffer.st_mode)) {
  //     auto command = "rm -rf " + writePath[0].substr(5) + " && mkdir -p " + writePath[0].substr(5);
  //     auto ret = system(command.c_str());
  //     (void)(ret);
  //   }
  // }

  auto outputType = ROW({"rowCount"}, {BIGINT()});

  return std::make_shared<core::TableWriteNode>(
      nextPlanNodeId(),
      inputType,
      tableColumnNames,
      nullptr, /*aggregationNode*/
      std::make_shared<core::InsertTableHandle>(
          kHiveConnectorId,
          makeHiveInsertTableHandle(
              inputType->names(),
              inputType->children(),
              partitionedKey,
              nullptr /*bucketProperty*/,
              makeLocationHandle(writePath[0]))),
      !isPartitionColumns.empty(),
      exec::TableWriteTraits::outputType(nullptr),
      connector::CommitStrategy::kNoCommit,
      childNode);
}
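As a reading aid, a hedged sketch of the WriteRel fields this conversion consumes; the message layout is inferred from the accessors used above, and the values are hypothetical:

::substrait::WriteRel writeRel;
// Child Rel producing the rows to write; required, otherwise VELOX_FAIL fires.
*writeRel.mutable_input() = childRel;
// Output schema; partition flags for these names come from
// SubstraitParser::parsePartitionColumns(table_schema).
writeRel.mutable_table_schema()->add_names("c0");
writeRel.mutable_table_schema()->add_names("c1");
// named_table().names() is read as the target write path(s).
writeRel.mutable_named_table()->add_names("file:/tmp/velox_write_out");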

core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::ExpandRel& expandRel) {
  core::PlanNodePtr childNode;
  if (expandRel.has_input()) {
@@ -993,6 +1132,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
    return toVeloxPlan(rel.fetch());
  } else if (rel.has_window()) {
    return toVeloxPlan(rel.window());
  } else if (rel.has_write()) {
    return toVeloxPlan(rel.write());
  } else {
    VELOX_NYI("Substrait conversion not supported for Rel.");
  }
3 changes: 3 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -58,6 +58,9 @@ class SubstraitToVeloxPlanConverter {
      bool validationMode = false)
      : pool_(pool), confMap_(confMap), validationMode_(validationMode) {}

  /// Used to convert Substrait WriteRel into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::WriteRel& writeRel);

  /// Used to convert Substrait ExpandRel into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::ExpandRel& expandRel);

21 changes: 21 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -354,6 +354,25 @@ bool SubstraitToVeloxPlanValidator::validateExpression(
  }
}

bool SubstraitToVeloxPlanValidator::validate(const ::substrait::WriteRel& writeRel) {
  if (writeRel.has_input() && !validate(writeRel.input())) {
    std::cout << "Validation failed for input type validation in WriteRel." << std::endl;
    return false;
  }

  // Validate the input types carried by the named table's advanced extension.
  if (writeRel.has_named_table()) {
    const auto& extension = writeRel.named_table().advanced_extension();
    std::vector<TypePtr> types;
    if (!validateInputTypes(extension, types)) {
      std::cout << "Validation failed for input types in WriteRel." << std::endl;
      return false;
    }
  }

  return true;
}
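A minimal usage sketch, assuming a memory pool and execution context are already at hand (the constructor signature appears in SubstraitToVeloxPlanValidator.h further below):

SubstraitToVeloxPlanValidator validator(pool, execCtx);
if (!validator.validate(writeRel)) {
  // Unsupported WriteRel; fall back to vanilla Spark execution.
}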

bool SubstraitToVeloxPlanValidator::validate(const ::substrait::FetchRel& fetchRel) {
  RowTypePtr rowType = nullptr;
  // Get and validate the input types from extension.
@@ -1193,6 +1212,8 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::Rel& rel) {
    return validate(rel.fetch());
  } else if (rel.has_window()) {
    return validate(rel.window());
  } else if (rel.has_write()) {
    return validate(rel.write());
  } else {
    return false;
  }
3 changes: 3 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
@@ -29,6 +29,9 @@ class SubstraitToVeloxPlanValidator {
  SubstraitToVeloxPlanValidator(memory::MemoryPool* pool, core::ExecCtx* execCtx)
      : pool_(pool), execCtx_(execCtx), planConverter_(pool_, confMap_, true) {}

  /// Used to validate whether the computing of this Write is supported.
  bool validate(const ::substrait::WriteRel& writeRel);

  /// Used to validate whether the computing of this Limit is supported.
  bool validate(const ::substrait::FetchRel& fetchRel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,31 @@ public static RelNode makeExpandRel(
    return new ExpandRelNode(input, projections);
  }

  public static RelNode makeWriteRel(
      RelNode input,
      List<TypeNode> types,
      List<String> names,
      List<ColumnTypeNode> columnTypeNodes,
      String writePath,
      SubstraitContext context,
      Long operatorId) {
    context.registerRelToOperator(operatorId);
    return new WriteRelNode(input, types, names, columnTypeNodes, writePath);
  }

  public static RelNode makeWriteRel(
      RelNode input,
      List<TypeNode> types,
      List<String> names,
      List<ColumnTypeNode> columnTypeNodes,
      String writePath,
      AdvancedExtensionNode extensionNode,
      SubstraitContext context,
      Long operatorId) {
    context.registerRelToOperator(operatorId);
    return new WriteRelNode(input, types, names, columnTypeNodes, writePath, extensionNode);
  }

  public static RelNode makeSortRel(
      RelNode input,
      List<SortField> sorts,