Skip to content

Commit

Permalink
Merge branch 'apache:main' into gayangya/offload_project
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyangxiaozhu authored Jun 28, 2024
2 parents 1ca60e2 + b6776b6 commit 3a63372
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 36 deletions.
1 change: 0 additions & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ static const std::map<std::string, std::string> SCALAR_FUNCTIONS
{"replace", "replaceAll"},
{"regexp_replace", "replaceRegexpAll"},
{"regexp_extract_all", "regexpExtractAllSpark"},
{"chr", "char"},
{"rlike", "match"},
{"ascii", "ascii"},
{"split", "splitByRegexp"},
Expand Down
71 changes: 71 additions & 0 deletions cpp-ch/local-engine/Parser/scalar_function_parser/chr.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <Parser/FunctionParser.h>
#include <DataTypes/IDataType.h>
#include <Common/CHUtil.h>
#include <Core/Field.h>

namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
}

namespace local_engine
{
class FunctionParserChr : public FunctionParser
{
public:
explicit FunctionParserChr(SerializedPlanParser * plan_parser_) : FunctionParser(plan_parser_) { }
~FunctionParserChr() override = default;
static constexpr auto name = "chr";
String getName() const override { return name; }

const ActionsDAG::Node * parse(
const substrait::Expression_ScalarFunction & substrait_func,
ActionsDAGPtr & actions_dag) const override
{
auto parsed_args = parseFunctionArguments(substrait_func, "", actions_dag);
if (parsed_args.size() != 1)
throw Exception(DB::ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} requires two or three arguments", getName());

/*
parse chr(number) as if(number < 0, '', convertCharset(char(0, number), 'unicode', 'utf-8'))
*/
const auto & num_arg = parsed_args[0];
const auto * const_zero_node = addColumnToActionsDAG(actions_dag, std::make_shared<DataTypeInt32>(), 0);
const auto * const_empty_node = addColumnToActionsDAG(actions_dag, std::make_shared<DataTypeString>(), "");
const auto * const_four_node = addColumnToActionsDAG(actions_dag, std::make_shared<DataTypeInt32>(), 4);
const auto * const_unicode_node = addColumnToActionsDAG(actions_dag, std::make_shared<DataTypeString>(), "unicode");
const auto * const_utf8_node = addColumnToActionsDAG(actions_dag, std::make_shared<DataTypeString>(), "utf-8");

const auto * less_node = toFunctionNode(actions_dag, "less", {num_arg, const_zero_node});

const auto * char_node = toFunctionNode(actions_dag, "char", {const_zero_node, num_arg});
const auto * convert_charset_node = toFunctionNode(actions_dag, "convertCharset", {char_node, const_unicode_node, const_utf8_node});

const auto * if_node = toFunctionNode(actions_dag, "if", {less_node, const_empty_node, convert_charset_node});
const auto * result_node = convertNodeTypeIfNeeded(substrait_func, if_node, actions_dag);
return result_node;
}
};

static FunctionParserRegister<FunctionParserChr> register_chr;
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,18 +348,15 @@ case class OffloadFilter() extends OffloadSingleNode with LogLevelUtil {
// Push down the left conditions in Filter into FileSourceScan.
val newChild: SparkPlan = filter.child match {
case scan @ (_: FileSourceScanExec | _: BatchScanExec) =>
if (TransformHints.isTransformable(scan)) {
if (TransformHints.maybeTransformable(scan)) {
val newScan =
FilterHandler.pushFilterToScan(filter.condition, scan)
newScan match {
case ts: TransformSupport if ts.doValidate().isValid => ts
// TODO remove the call
case _ => replace.doReplace(scan)
case _ => scan
}
} else {
replace.doReplace(scan)
}
case _ => replace.doReplace(filter.child)
} else scan
case _ => filter.child
}
logDebug(s"Columnar Processing for ${filter.getClass} is currently supported.")
BackendsApiManager.getSparkPlanExecApiInstance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,12 @@ object TransformHints {
}

/**
* NOTE: To be deprecated. Do not create new usages of this method.
*
* Since it's usually not safe to consider a plan "transformable" during validation phase. Another
* validation rule could turn "transformable" to "non-transformable" before implementing the plan
* within Gluten transformers.
* If true, it implies the plan maybe transformable during validation phase but not guaranteed,
* since another validation rule could turn it to "non-transformable" before implementing the plan
* within Gluten transformers. If false, the plan node will be guaranteed fallback to Vanilla plan
* node while being implemented.
*/
def isTransformable(plan: SparkPlan): Boolean = {
getHintOption(plan) match {
case None => true
case _ => false
}
}
def maybeTransformable(plan: SparkPlan): Boolean = !isNotTransformable(plan)

def tag(plan: SparkPlan, hint: TransformHint): Unit = {
val mergedHint = getHintOption(plan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class RewriteSparkPlanRulesManager private (rewriteRules: Seq[RewriteSingleNode]
extends Rule[SparkPlan] {

private def mayNeedRewrite(plan: SparkPlan): Boolean = {
TransformHints.isTransformable(plan) && {
TransformHints.maybeTransformable(plan) && {
plan match {
case _: SortExec => true
case _: TakeOrderedAndProjectExec => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ case class GlutenFallbackReporter(glutenConfig: GlutenConfig, spark: SparkSessio
logicalPlan.setTagValue(FALLBACK_REASON_TAG, newReason)
}
case TRANSFORM_UNSUPPORTED(_, _) =>
logFallbackReason(validationLogLevel, p.nodeName, "unknown reason")
case _ =>
throw new IllegalStateException("Unreachable code")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,6 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("Substring")
.exclude("string substring_index function")
.exclude("ascii for string")
.exclude("string for ascii")
.exclude("base64/unbase64 for string")
.exclude("encode/decode for string")
.exclude("overlay for string")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,6 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("string substring_index function")
.exclude("SPARK-40213: ascii for Latin-1 Supplement characters")
.exclude("ascii for string")
.exclude("string for ascii")
.exclude("base64/unbase64 for string")
.exclude("encode/decode for string")
.exclude("overlay for string")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.{FileSourceScanExecTransformer, FilterExecTransformer}
import org.apache.gluten.utils.BackendTestUtils

import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.GlutenSQLTestsBaseTrait
Expand Down Expand Up @@ -108,7 +108,7 @@ class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenS
METADATA_FILE_MODIFICATION_TIME,
"age")
dfWithMetadata.collect
if (BackendTestUtils.isVeloxBackendLoaded()) {
if (BackendsApiManager.getSettings.supportNativeMetadataColumns()) {
checkOperatorMatch[FileSourceScanExecTransformer](dfWithMetadata)
} else {
checkOperatorMatch[FileSourceScanExec](dfWithMetadata)
Expand All @@ -133,7 +133,7 @@ class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenS
.where(Column(METADATA_FILE_NAME) === f0((METADATA_FILE_NAME)))
val ret = filterDF.collect
assert(ret.size == 1)
if (BackendTestUtils.isVeloxBackendLoaded()) {
if (BackendsApiManager.getSettings.supportNativeMetadataColumns()) {
checkOperatorMatch[FileSourceScanExecTransformer](filterDF)
} else {
checkOperatorMatch[FileSourceScanExec](filterDF)
Expand All @@ -149,7 +149,7 @@ class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenS
Row(f1(METADATA_FILE_PATH))
)
)
if (BackendTestUtils.isVeloxBackendLoaded()) {
if (BackendsApiManager.getSettings.supportNativeMetadataColumns()) {
checkOperatorMatch[FileSourceScanExecTransformer](filterDF)
} else {
checkOperatorMatch[FileSourceScanExec](filterDF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,6 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("string substring_index function")
.exclude("SPARK-40213: ascii for Latin-1 Supplement characters")
.exclude("ascii for string")
.exclude("string for ascii")
.exclude("base64/unbase64 for string")
.exclude("encode/decode for string")
.exclude("Levenshtein distance")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.{FileSourceScanExecTransformer, FilterExecTransformer}
import org.apache.gluten.utils.BackendTestUtils

import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.GlutenSQLTestsBaseTrait
Expand Down Expand Up @@ -109,7 +109,7 @@ class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenS
METADATA_FILE_MODIFICATION_TIME,
"age")
dfWithMetadata.collect
if (BackendTestUtils.isVeloxBackendLoaded()) {
if (BackendsApiManager.getSettings.supportNativeMetadataColumns()) {
checkOperatorMatch[FileSourceScanExecTransformer](dfWithMetadata)
} else {
checkOperatorMatch[FileSourceScanExec](dfWithMetadata)
Expand All @@ -134,7 +134,7 @@ class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenS
.where(Column(METADATA_FILE_NAME) === f0((METADATA_FILE_NAME)))
val ret = filterDF.collect
assert(ret.size == 1)
if (BackendTestUtils.isVeloxBackendLoaded()) {
if (BackendsApiManager.getSettings.supportNativeMetadataColumns()) {
checkOperatorMatch[FileSourceScanExecTransformer](filterDF)
} else {
checkOperatorMatch[FileSourceScanExec](filterDF)
Expand All @@ -150,7 +150,7 @@ class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenS
Row(f1(METADATA_FILE_PATH))
)
)
if (BackendTestUtils.isVeloxBackendLoaded()) {
if (BackendsApiManager.getSettings.supportNativeMetadataColumns()) {
checkOperatorMatch[FileSourceScanExecTransformer](filterDF)
} else {
checkOperatorMatch[FileSourceScanExec](filterDF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,6 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("string substring_index function")
.exclude("SPARK-40213: ascii for Latin-1 Supplement characters")
.exclude("ascii for string")
.exclude("string for ascii")
.exclude("base64/unbase64 for string")
.exclude("encode/decode for string")
.exclude("Levenshtein distance")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.{FileSourceScanExecTransformer, FilterExecTransformer}
import org.apache.gluten.utils.BackendTestUtils

import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.GlutenSQLTestsBaseTrait
Expand Down Expand Up @@ -108,7 +108,7 @@ class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenS
METADATA_FILE_MODIFICATION_TIME,
"age")
dfWithMetadata.collect
if (BackendTestUtils.isVeloxBackendLoaded()) {
if (BackendsApiManager.getSettings.supportNativeMetadataColumns()) {
checkOperatorMatch[FileSourceScanExecTransformer](dfWithMetadata)
} else {
checkOperatorMatch[FileSourceScanExec](dfWithMetadata)
Expand All @@ -133,7 +133,7 @@ class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenS
.where(Column(METADATA_FILE_NAME) === f0((METADATA_FILE_NAME)))
val ret = filterDF.collect
assert(ret.size == 1)
if (BackendTestUtils.isVeloxBackendLoaded()) {
if (BackendsApiManager.getSettings.supportNativeMetadataColumns()) {
checkOperatorMatch[FileSourceScanExecTransformer](filterDF)
} else {
checkOperatorMatch[FileSourceScanExec](filterDF)
Expand All @@ -149,7 +149,7 @@ class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenS
Row(f1(METADATA_FILE_PATH))
)
)
if (BackendTestUtils.isVeloxBackendLoaded()) {
if (BackendsApiManager.getSettings.supportNativeMetadataColumns()) {
checkOperatorMatch[FileSourceScanExecTransformer](filterDF)
} else {
checkOperatorMatch[FileSourceScanExec](filterDF)
Expand Down

0 comments on commit 3a63372

Please sign in to comment.