
Commit

Revert Revert "[GLUTEN-8080][CH]Support function transform_keys/transform_values" (#8277)

* Reapply "[GLUTEN-8080][CH]Support function transform_keys/transform_values (#8…"

* fix building

* reapply transform_keys/transform_values
taiyang-li authored Dec 19, 2024
1 parent 9bb426e commit dcd356c
Showing 13 changed files with 133 additions and 7 deletions.
@@ -203,8 +203,6 @@ object CHExpressionUtil {
TO_UTC_TIMESTAMP -> UtcTimestampValidator(),
FROM_UTC_TIMESTAMP -> UtcTimestampValidator(),
STACK -> DefaultValidator(),
TRANSFORM_KEYS -> DefaultValidator(),
TRANSFORM_VALUES -> DefaultValidator(),
RAISE_ERROR -> DefaultValidator(),
WIDTH_BUCKET -> DefaultValidator()
)
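
Removing the two DefaultValidator entries is what re-enables offload: these entries appear to mark transform_keys/transform_values as unsupported on the ClickHouse backend, forcing a fallback to vanilla Spark. An illustrative Spark SQL query (not part of the diff) that should now be planned through the ClickHouse backend instead of falling back:

-- Illustration only; assumes a Spark session with the Gluten ClickHouse backend enabled.
SELECT
  transform_keys(map(1, 'a', 2, 'b'), (k, v) -> k + 10),           -- {11:'a', 12:'b'}
  transform_values(map(1, 'a', 2, 'b'), (k, v) -> concat(v, '!'))  -- {1:'a!', 2:'b!'}
FROM range(3);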
@@ -860,4 +860,16 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS
val sql = "select cast(id % 2 = 1 as string) from range(10)"
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}

test("Test transform_keys/transform_values") {
val sql = """
|select
| transform_keys(map_from_arrays(array(id+1, id+2, id+3),
| array(1, id+2, 3)), (k, v) -> k + 1),
| transform_values(map_from_arrays(array(id+1, id+2, id+3),
| array(1, id+2, 3)), (k, v) -> v + 1)
|from range(10)
|""".stripMargin
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
}
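
The new test compares Gluten's results against vanilla Spark for every id in range(10). As a sanity check, the same shape of query can be run for a single row in a plain spark-sql shell; a sketch for id = 0, with the expected maps worked out by hand:

-- Sketch only; mirrors the test query for id = 0 (range(1)).
-- map_from_arrays(array(1, 2, 3), array(1, 2, 3)) = {1:1, 2:2, 3:3}
SELECT
  transform_keys(map_from_arrays(array(id + 1, id + 2, id + 3),
                                 array(1, id + 2, 3)), (k, v) -> k + 1),  -- {2:1, 3:2, 4:3}
  transform_values(map_from_arrays(array(id + 1, id + 2, id + 3),
                                   array(1, id + 2, 3)), (k, v) -> v + 1) -- {1:2, 2:3, 3:4}
FROM range(1);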
@@ -22,6 +22,7 @@
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Disks/ObjectStorages/MetadataOperationsHolder.h>
#include <rocksdb/db.h>
#include <shared_mutex>

namespace local_engine
{
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -48,6 +48,7 @@ void registerGlutenHDFSObjectStorage(DB::ObjectStorageFactory & factory);
void registerGlutenDisks(bool global_skip_access_check)
{
auto & factory = DB::DiskFactory::instance();
auto & object_factory = DB::ObjectStorageFactory::instance();

#if USE_AWS_S3
auto creator = [global_skip_access_check](
@@ -90,7 +91,6 @@ void registerGlutenDisks(bool global_skip_access_check)
return disk;
};

auto & object_factory = DB::ObjectStorageFactory::instance();

registerGlutenS3ObjectStorage(object_factory);
factory.registerDiskType("s3_gluten", creator); /// For compatibility
2 changes: 0 additions & 2 deletions cpp-ch/local-engine/Parser/FunctionParser.cpp
@@ -181,9 +181,7 @@ FunctionParserPtr FunctionParserFactory::get(const String & name, ParserContextP
{
auto res = tryGet(name, ctx);
if (!res)
{
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unknown function parser {}", name);
}

return res;
}
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeFunction.h>
#include <DataTypes/DataTypeNullable.h>
#include <Parser/FunctionParser.h>
#include <Parser/TypeParser.h>
#include <Parser/scalar_function_parser/lambdaFunction.h>
#include <Common/BlockTypeUtils.h>
#include <Common/CHUtil.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <DataTypes/DataTypeMap.h>
#include <Functions/FunctionHelpers.h>

namespace DB::ErrorCodes
{
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}

namespace local_engine
{

template <bool transform_keys = true>
class FunctionParserMapTransformImpl : public FunctionParser
{
public:
static constexpr auto name = transform_keys ? "transform_keys" : "transform_values";
String getName() const override { return name; }

explicit FunctionParserMapTransformImpl(ParserContextPtr parser_context_) : FunctionParser(parser_context_) {}
~FunctionParserMapTransformImpl() override = default;

const DB::ActionsDAG::Node *
parse(const substrait::Expression_ScalarFunction & substrait_func, DB::ActionsDAG & actions_dag) const override
{
/// Parse spark transform_keys(map, func) as CH mapFromArrays(arrayMap(func, cast(map as array)), mapValues(map))
/// Parse spark transform_values(map, func) as CH mapFromArrays(mapKeys(map), arrayMap(func, cast(map as array)))
auto parsed_args = parseFunctionArguments(substrait_func, actions_dag);
if (parsed_args.size() != 2)
throw DB::Exception(DB::ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "{} function must have two arguments", getName());

auto lambda_args = collectLambdaArguments(parser_context, substrait_func.arguments()[1].value().scalar_function());
if (lambda_args.size() != 2)
throw DB::Exception(
DB::ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "The lambda function in {} must have two arguments", getName());

const auto * map_node = parsed_args[0];
const auto * func_node = parsed_args[1];
const auto & map_type = map_node->result_type;
auto array_type = checkAndGetDataType<DataTypeMap>(removeNullable(map_type).get())->getNestedType();
if (map_type->isNullable())
array_type = std::make_shared<DataTypeNullable>(array_type);
const auto * array_node = ActionsDAGUtil::convertNodeTypeIfNeeded(actions_dag, map_node, array_type);
const auto * transformed_node = toFunctionNode(actions_dag, "arrayMap", {func_node, array_node});

const DB::ActionsDAG::Node * result_node = nullptr;
if constexpr (transform_keys)
{
const auto * nontransformed_node = toFunctionNode(actions_dag, "mapValues", {parsed_args[0]});
result_node = toFunctionNode(actions_dag, "mapFromArrays", {transformed_node, nontransformed_node});
}
else
{
const auto * nontransformed_node = toFunctionNode(actions_dag, "mapKeys", {parsed_args[0]});
result_node = toFunctionNode(actions_dag, "mapFromArrays", {nontransformed_node, transformed_node});
}
return convertNodeTypeIfNeeded(substrait_func, result_node, actions_dag);
}
};

using FunctionParserTransformKeys = FunctionParserMapTransformImpl<true>;
using FunctionParserTransformValues = FunctionParserMapTransformImpl<false>;

static FunctionParserRegister<FunctionParserTransformKeys> register_transform_keys;
static FunctionParserRegister<FunctionParserTransformValues> register_transform_values;
}
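
The comment in parse() summarizes the rewrite: the lambda is applied through arrayMap, the untouched side of the map is recovered with mapValues (or mapKeys), and mapFromArrays zips the two arrays back into a map. A hand-written ClickHouse query with the same shape, shown here as a sketch (for readability it feeds arrayMap with mapKeys/mapValues, whereas the parser above feeds the lambda with the map cast to an array):

-- ClickHouse SQL sketch of Spark's transform_keys(m, (k, v) -> k + 1);
-- illustrative only, not the literal expression emitted by this parser.
SELECT mapFromArrays(
           arrayMap((k, v) -> k + 1, mapKeys(m), mapValues(m)),  -- transformed keys
           mapValues(m))                                         -- original values
FROM (SELECT map(1, 10, 2, 20) AS m);
-- expected result: {2:10, 3:20}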
@@ -33,6 +33,7 @@
#include <IO/SeekableReadBuffer.h>
#include <IO/SharedThreadPools.h>
#include <IO/SplittableBzip2ReadBuffer.h>
#include <IO/ParallelReadBuffer.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Cache/FileCacheSettings.h>
@@ -16,13 +16,15 @@
*/
#include "TextFormatFile.h"

#if USE_HIVE
#include <memory>

#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/Impl/HiveTextRowInputFormat.h>
#include <Poco/URI.h>


namespace local_engine
{

@@ -73,3 +75,4 @@ FormatFile::InputFormatPtr TextFormatFile::createInputFormat(const DB::Block & h
}

}
#endif
@@ -16,8 +16,11 @@
*/
#pragma once

#include <memory>
#include "config.h"


#if USE_HIVE
#include <memory>
#include <Storages/SubstraitSource/FormatFile.h>

namespace local_engine
@@ -43,3 +46,4 @@ class TextFormatFile : public FormatFile
};

}
#endif
@@ -166,6 +166,10 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("aggregate function - array for non-primitive type")
.exclude("SPARK-14393: values generated by non-deterministic functions shouldn't change after coalesce or union")
.exclude("SPARK-24734: Fix containsNull of Concat for array type")
.exclude("transform keys function - primitive data types")
.exclude("transform keys function - Invalid lambda functions and exceptions")
.exclude("transform values function - test primitive data types")
.exclude("transform values function - test empty")
enableSuite[GlutenDataFrameHintSuite]
enableSuite[GlutenDataFrameImplicitsSuite]
enableSuite[GlutenDataFrameJoinSuite].exclude(
@@ -184,6 +184,10 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("aggregate function - array for non-primitive type")
.exclude("SPARK-14393: values generated by non-deterministic functions shouldn't change after coalesce or union")
.exclude("SPARK-24734: Fix containsNull of Concat for array type")
.exclude("transform keys function - primitive data types")
.exclude("transform keys function - Invalid lambda functions and exceptions")
.exclude("transform values function - test primitive data types")
.exclude("transform values function - test empty")
enableSuite[GlutenDataFrameHintSuite]
enableSuite[GlutenDataFrameImplicitsSuite]
enableSuite[GlutenDataFrameJoinSuite].exclude(
@@ -186,6 +186,10 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("aggregate function - array for non-primitive type")
.exclude("SPARK-14393: values generated by non-deterministic functions shouldn't change after coalesce or union")
.exclude("SPARK-24734: Fix containsNull of Concat for array type")
.exclude("transform keys function - primitive data types")
.exclude("transform keys function - Invalid lambda functions and exceptions")
.exclude("transform values function - test primitive data types")
.exclude("transform values function - test empty")
enableSuite[GlutenDataFrameHintSuite]
enableSuite[GlutenDataFrameImplicitsSuite]
enableSuite[GlutenDataFrameJoinSuite].exclude(
@@ -186,6 +186,10 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("aggregate function - array for non-primitive type")
.exclude("SPARK-14393: values generated by non-deterministic functions shouldn't change after coalesce or union")
.exclude("SPARK-24734: Fix containsNull of Concat for array type")
.exclude("transform keys function - primitive data types")
.exclude("transform keys function - Invalid lambda functions and exceptions")
.exclude("transform values function - test primitive data types")
.exclude("transform values function - test empty")
enableSuite[GlutenDataFrameHintSuite]
enableSuite[GlutenDataFrameImplicitsSuite]
enableSuite[GlutenDataFrameJoinSuite].exclude(
