Skip to content

Commit

Permalink
[GLUTEN-3951][CH]Bug fix floor diff (#3956)
Browse files Browse the repository at this point in the history
What changes were proposed in this pull request?
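Add a Spark-compatible floor function, `sparkFloor`, to the ClickHouse backend and map Spark's `floor` to it. For Float32/Float64 input, a result that is NaN or ±Infinity is returned as NULL; for other input types the ClickHouse `floor` result is wrapped in a Nullable column with no NULLs.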

(Fixes: #3951)

How was this patch tested?
Tested by unit tests (UT).

端到端性能测试
数据类型为 Int64,表结构为 test_tbl(d Int64),测试 SQL:select count(1) from test_tbl where floor(d) > 1,数据总量 3000 万行,测试三次
PR 改动前:1.13s, 0.92s, 0.985s
PR 改动后: 1.064s, 1.077s, 0.984s

数据类型为 Float64,表结构为 test_tbl(d Float64),测试 SQL:select count(1) from test_tbl where floor(d) > 1,数据总量 3000 万行,测试三次
PR 改动前: 1.417s, 1.386s, 1.426s
PR 改动后:1.568s, 1.476s, 1.508s

可见对于Int64类型来说,改动前后性能基本持平;对于float64类型来说,大约有7.6%的性能回退,主要是来自于针对数据中可能出现NaN 以及INF 的情况进行了判断和赋值。

benchmark 性能测试
使用开发的 benchmark_spark_floor_function.cpp 来测试

Int64类型测试
对于CH 的Floor函数, 结果如下
image
对于新开发的Floor函数,结果如下
image

Float64类型测试
对于CH的Floor函数,结果如下
image
对于新开发的Floor函数,结果如下
image

可见对于Int64,大概有 3%左右的回退,对于Float64类型 大概有70%左右的回退

Spark UT 关于Floor 函数的测试,会通过 org.apache.spark.sql.GlutenMathFunctionsSuite 这个测试来完成,已开启
  • Loading branch information
KevinyhZou authored Jan 2, 2024
1 parent fcb31fc commit 9b6beac
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2308,5 +2308,16 @@ class GlutenClickHouseTPCHParquetSuite extends GlutenClickHouseTPCHAbstractSuite
compareResultsAgainstVanillaSpark(select_sql, true, { _ => })
spark.sql("drop table test_tbl_4085")
}

test("GLUTEN-3951: Bug fix floor") {
  // With d in {1.0, 2.0, 2.5}, `d-1` and `d-2` are zero or negative for some rows,
  // so log10 is evaluated outside its positive domain and floor() receives those
  // results. The query output must still match vanilla Spark.
  val tbl_create_sql = "create table test_tbl_3951(d double) using parquet"
  val data_insert_sql = "insert into test_tbl_3951 values(1.0), (2.0), (2.5)"
  val select_sql =
    "select floor(d), floor(log10(d-1)), floor(log10(d-2)) from test_tbl_3951"
  spark.sql(tbl_create_sql)
  try {
    spark.sql(data_insert_sql)
    compareResultsAgainstVanillaSpark(select_sql, true, { _ => })
  } finally {
    // Always drop the table, even when the comparison fails, so a failed run
    // does not leave test_tbl_3951 behind for the next one.
    spark.sql("drop table test_tbl_3951")
  }
}
}
// scalastyle:on line.size.limit
28 changes: 28 additions & 0 deletions cpp-ch/local-engine/Functions/SparkFunctionFloor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <Functions/SparkFunctionFloor.h>

namespace local_engine
{

/// Registers SparkFunctionFloor with the function factory, making it available
/// under the name declared by the class ("sparkFloor").
REGISTER_FUNCTION(SparkFunctionFloor)
{
factory.registerFunction<SparkFunctionFloor>();
}

}
143 changes: 143 additions & 0 deletions cpp-ch/local-engine/Functions/SparkFunctionFloor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <Functions/FunctionsRound.h>
#include <Functions/FunctionFactory.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypesNumber.h>
#include <bit>

using namespace DB;

namespace local_engine
{

/// Floor over a floating-point column that additionally reports non-finite results.
///
/// Every input element is floored with FloatRoundingComputation<T, Floor, scale_mode>;
/// afterwards each output element that is NaN or +/-Infinity is flagged in the null map
/// and its value is reset to 0.
///
/// T must not be a decimal type (enforced by the static_assert below).
template <typename T, ScaleMode scale_mode>
struct SparkFloatFloorImpl
{
private:
    static_assert(!is_decimal<T>);
    using Op = FloatRoundingComputation<T, RoundingMode::Floor, scale_mode>;
    using Data = std::array<T, Op::data_count>;

public:
    /// Floors `in` into `out` and fills `null_map`.
    ///
    /// @param in        source values.
    /// @param scale     scale factor handed to Op::prepare().
    /// @param out       destination; indexed over [0, out.size()) and written through
    ///                  out.data() for in.size() elements, so it must already be sized.
    /// @param null_map  set to 1 for each i < out.size() whose result is NaN or +/-Infinity,
    ///                  0 otherwise; must hold at least out.size() entries.
    static NO_INLINE void apply(const PaddedPODArray<T> & in, size_t scale, PaddedPODArray<T> & out, PaddedPODArray<UInt8> & null_map)
    {
        auto mm_scale = Op::prepare(scale);
        const size_t data_count = std::tuple_size<Data>();
        const T * end_in = in.data() + in.size();
        /// End of the part of the input that consists of whole groups of data_count elements.
        const T * limit = in.data() + in.size() / data_count * data_count;
        const T * __restrict p_in = in.data();
        T * __restrict p_out = out.data();

        /// Main loop: each Op::compute call advances both pointers by data_count elements.
        while (p_in < limit)
        {
            Op::compute(p_in, mm_scale, p_out);
            p_in += data_count;
            p_out += data_count;
        }

        /// Tail: fewer than data_count elements remain. Run them through zero-padded
        /// scratch buffers so Op::compute never touches memory past the end of the
        /// input, then copy back only the bytes that belong to real elements.
        if (p_in < end_in)
        {
            Data tmp_src{{}};
            Data tmp_dst;
            size_t tail_size_bytes = (end_in - p_in) * sizeof(*p_in);
            memcpy(tmp_src.data(), p_in, tail_size_bytes);
            Op::compute(tmp_src.data(), mm_scale, tmp_dst.data());
            memcpy(p_out, tmp_dst.data(), tail_size_bytes);
        }

        for (size_t i = 0; i < out.size(); ++i)
            checkAndSetNullable(out[i], null_map[i]);
    }

    /// Sets `null_flag` to 1 and `t` to 0 when `t` is NaN or +/-Infinity; otherwise sets
    /// `null_flag` to 0 and leaves `t` unchanged.
    ///
    /// For float and double the test is done on the bit pattern: in IEEE 754 a value is
    /// NaN or infinite exactly when all exponent bits are set. The bits are obtained with
    /// memcpy rather than by dereferencing a reinterpret_cast'ed pointer, which would
    /// violate the strict-aliasing rule. For any other T only the NaN check (t != t) applies.
    static void checkAndSetNullable(T & t, UInt8 & null_flag)
    {
        if constexpr (std::is_same_v<T, float>)
        {
            constexpr uint32_t exponent_mask = 0x7F800000U;
            uint32_t bits;
            memcpy(&bits, &t, sizeof(bits));
            null_flag = (bits & exponent_mask) == exponent_mask;
        }
        else if constexpr (std::is_same_v<T, double>)
        {
            constexpr uint64_t exponent_mask = 0x7FF0000000000000ULL;
            uint64_t bits;
            memcpy(&bits, &t, sizeof(bits));
            null_flag = (bits & exponent_mask) == exponent_mask;
        }
        else
        {
            null_flag = (t != t);
        }

        if (null_flag)
            t = 0;
    }
};

/// `floor` variant registered as "sparkFloor".
///
/// The return type is always the Nullable wrapper of what DB::FunctionFloor would return.
/// Float32/Float64 inputs go through SparkFloatFloorImpl, which fills the null map;
/// every other input type is delegated to DB::FunctionFloor and paired with an all-zero
/// null map.
class SparkFunctionFloor : public DB::FunctionFloor
{
public:
    static constexpr auto name = "sparkFloor";

    static DB::FunctionPtr create(DB::ContextPtr) { return std::make_shared<SparkFunctionFloor>(); }

    SparkFunctionFloor() = default;
    ~SparkFunctionFloor() override = default;

    DB::String getName() const override { return name; }

    /// Wraps the base function's return type in Nullable.
    DB::DataTypePtr getReturnTypeImpl(const DB::DataTypes & arguments) const override
    {
        return makeNullable(DB::FunctionFloor::getReturnTypeImpl(arguments));
    }

    /// Dispatches on the type of the first argument; see the class comment.
    DB::ColumnPtr executeImpl(const DB::ColumnsWithTypeAndName & arguments, const DB::DataTypePtr & result_type, size_t input_rows) const override
    {
        const ColumnWithTypeAndName & value_arg = arguments[0];
        Scale scale_arg = getScaleArg(arguments);
        const auto value_type_id = value_arg.type->getTypeId();

        if (value_type_id == TypeIndex::Float32)
            return executeInternal<Float32>(value_arg.column, scale_arg);
        if (value_type_id == TypeIndex::Float64)
            return executeInternal<Float64>(value_arg.column, scale_arg);

        /// Non-float input: reuse the base implementation and attach a null map of zeros.
        DB::ColumnPtr nested = DB::FunctionFloor::executeImpl(arguments, result_type, input_rows);
        DB::MutableColumnPtr no_nulls = DB::ColumnUInt8::create(value_arg.column->size(), 0);
        return DB::ColumnNullable::create(std::move(nested), std::move(no_nulls));
    }

    /// Floors a ColumnVector<T> and returns the result as a ColumnNullable.
    /// The sign of `scale_arg` selects the ScaleMode; the scale passed on is
    /// 1 for zero and 10^|scale_arg| otherwise.
    template <typename T>
    static ColumnPtr executeInternal(const ColumnPtr & col_arg, const Scale & scale_arg)
    {
        const auto * source = checkAndGetColumn<ColumnVector<T>>(col_arg.get());
        auto floored = ColumnVector<T>::create(source->size());
        MutableColumnPtr null_map_col = DB::ColumnUInt8::create(source->size(), 0);

        PaddedPODArray<T> & floored_data = floored->getData();
        PaddedPODArray<UInt8> & null_map_data = assert_cast<ColumnVector<UInt8> *>(null_map_col.get())->getData();

        if (!floored_data.empty())
        {
            if (scale_arg > 0)
            {
                size_t scale = intExp10(scale_arg);
                SparkFloatFloorImpl<T, ScaleMode::Positive>::apply(source->getData(), scale, floored_data, null_map_data);
            }
            else if (scale_arg < 0)
            {
                size_t scale = intExp10(-scale_arg);
                SparkFloatFloorImpl<T, ScaleMode::Negative>::apply(source->getData(), scale, floored_data, null_map_data);
            }
            else
            {
                size_t scale = 1;
                SparkFloatFloorImpl<T, ScaleMode::Zero>::apply(source->getData(), scale, floored_data, null_map_data);
            }
        }

        return DB::ColumnNullable::create(std::move(floored), std::move(null_map_col));
    }
};
}
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ static const std::map<std::string, std::string> SCALAR_FUNCTIONS
{"pmod", "pmod"},
{"abs", "abs"},
{"ceil", "ceil"},
{"floor", "floor"},
{"round", "roundHalfUp"},
{"bround", "roundBankers"},
{"exp", "exp"},
Expand Down Expand Up @@ -180,6 +179,7 @@ static const std::map<std::string, std::string> SCALAR_FUNCTIONS
{"add_months", "addMonths"},
{"date_trunc", "dateTrunc"},
{"floor_datetime", "dateTrunc"},
{"floor", "sparkFloor"},
{"months_between", "sparkMonthsBetween"},

// array functions
Expand Down
4 changes: 1 addition & 3 deletions cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,8 @@ class ShuffleSplitter;
using ShuffleSplitterPtr = std::unique_ptr<ShuffleSplitter>;
class ShuffleSplitter : public ShuffleWriterBase
{
private:
inline const static std::vector<std::string> compress_methods = {"", "ZSTD", "LZ4"};

public:
inline const static std::vector<std::string> compress_methods = {"", "ZSTD", "LZ4"};
static ShuffleSplitterPtr create(const std::string & short_name, const SplitOptions & options_);

explicit ShuffleSplitter(const SplitOptions & options);
Expand Down
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ else()
benchmark_parquet_read.cpp
benchmark_spark_row.cpp
benchmark_unix_timestamp_function.cpp
)
target_link_libraries(benchmark_local_engine PRIVATE gluten_clickhouse_backend_libs ch_contrib::gbenchmark_all loggers)
endif()
benchmark_spark_floor_function.cpp)
target_link_libraries(benchmark_local_engine PRIVATE gluten_clickhouse_backend_libs ch_contrib::gbenchmark_all loggers ch_parquet)
endif()
13 changes: 7 additions & 6 deletions cpp-ch/local-engine/tests/benchmark_local_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include <Common/PODArray_fwd.h>
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
#include <Compression/CompressedReadBuffer.h>
#include "testConfig.h"

#if defined(__SSE2__)
Expand Down Expand Up @@ -131,7 +132,7 @@ DB::ContextMutablePtr global_context;
param,
std::move(settings));
auto snapshot = std::make_shared<StorageSnapshot>(custom_merge_tree, metadata);
custom_merge_tree.loadDataParts(false);
custom_merge_tree.loadDataParts(false, {});
for (auto _ : state)
{
state.PauseTiming();
Expand Down Expand Up @@ -249,7 +250,7 @@ DB::ContextMutablePtr global_context;
"",
param,
std::move(settings));
custom_merge_tree.loadDataParts(false);
custom_merge_tree.loadDataParts(false, {});
auto snapshot = std::make_shared<StorageSnapshot>(custom_merge_tree, metadata);
for (auto _ : state)
{
Expand Down Expand Up @@ -337,7 +338,7 @@ DB::ContextMutablePtr global_context;
"",
param,
std::move(settings));
custom_merge_tree.loadDataParts(false);
custom_merge_tree.loadDataParts(false, {});
auto snapshot = std::make_shared<StorageSnapshot>(custom_merge_tree, metadata);

for (auto _ : state)
Expand Down Expand Up @@ -1317,7 +1318,7 @@ class FasterCompressedReadBuffer : public FasterCompressedReadBufferBase, public
param,
std::move(settings));
auto snapshot = std::make_shared<StorageSnapshot>(custom_merge_tree, metadata);
custom_merge_tree.loadDataParts(false);
custom_merge_tree.loadDataParts(false, {});
for (auto _ : state)
{
state.PauseTiming();
Expand Down Expand Up @@ -1372,7 +1373,7 @@ MergeTreeWithSnapshot buildMergeTree(NamesAndTypesList names_and_types, std::str
std::shared_ptr<local_engine::CustomStorageMergeTree> custom_merge_tree = std::make_shared<local_engine::CustomStorageMergeTree>(
DB::StorageID("default", table), relative_path, *metadata, false, global_context, "", param, std::move(settings));
auto snapshot = std::make_shared<StorageSnapshot>(*custom_merge_tree, metadata);
custom_merge_tree->loadDataParts(false);
custom_merge_tree->loadDataParts(false, {});
return MergeTreeWithSnapshot{.merge_tree = custom_merge_tree, .snapshot = snapshot, .columns = names_and_types};
}

Expand All @@ -1398,7 +1399,7 @@ QueryPlanPtr joinPlan(QueryPlanPtr left, QueryPlanPtr right, String left_key, St
join->addDisjunct();
ASTPtr lkey = std::make_shared<ASTIdentifier>(left_key);
ASTPtr rkey = std::make_shared<ASTIdentifier>(right_key);
join->addOnKeys(lkey, rkey);
join->addOnKeys(lkey, rkey, true);
for (const auto & column : join->columnsFromJoinedTable())
{
join->addJoinedColumn(column);
Expand Down
98 changes: 98 additions & 0 deletions cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <Core/Block.h>
#include <Columns/IColumn.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeFactory.h>
#include <Functions/FunctionFactory.h>
#include <Functions/SparkFunctionFloor.h>
#include <Functions/FunctionsRound.h>
#include <Parser/SerializedPlanParser.h>
#include <benchmark/benchmark.h>

using namespace DB;

/// Builds a block with a single column named "d" of the type named by `type_str`.
///
/// When isInt(type) holds, row i receives the value i; otherwise, when isFloat(type)
/// holds, row i receives i converted to double. For any other type nothing is inserted
/// and the column stays empty.
static Block createDataBlock(String type_str, size_t rows)
{
    auto type = DataTypeFactory::instance().get(type_str);
    auto column = type->createColumn();

    /// The type does not change while filling, so classify it once rather than per row.
    const bool fill_ints = isInt(type);
    const bool fill_floats = isFloat(type);

    for (size_t i = 0; i < rows; ++i)
    {
        if (fill_ints)
        {
            column->insert(i);
        }
        else if (fill_floats)
        {
            double d = i * 1.0;
            column->insert(d);
        }
    }

    Block block;
    block.insert(ColumnWithTypeAndName(std::move(column), type, "d"));
    /// Return the local by name: wrapping it in std::move would disable copy elision.
    return block;
}

/// Times the function registered as "floor" on a 30,000,000-row Int64 block.
/// The block and the executable function are built once; only execute() runs per iteration.
static void BM_CHFloorFunction_For_Int64(benchmark::State & state)
{
    using namespace DB;
    auto & function_factory = FunctionFactory::instance();
    auto floor_resolver = function_factory.get("floor", local_engine::SerializedPlanParser::global_context);
    Block input = createDataBlock("Int64", 30000000);
    auto floor_fn = floor_resolver->build(input.getColumnsWithTypeAndName());
    for (auto _ : state)
    {
        [[maybe_unused]] auto floored
            = floor_fn->execute(input.getColumnsWithTypeAndName(), floor_fn->getResultType(), input.rows());
    }
}

/// Times the function registered as "floor" on a 30,000,000-row Float64 block.
/// The block and the executable function are built once; only execute() runs per iteration.
static void BM_CHFloorFunction_For_Float64(benchmark::State & state)
{
    using namespace DB;
    auto & function_factory = FunctionFactory::instance();
    auto floor_resolver = function_factory.get("floor", local_engine::SerializedPlanParser::global_context);
    Block input = createDataBlock("Float64", 30000000);
    auto floor_fn = floor_resolver->build(input.getColumnsWithTypeAndName());
    for (auto _ : state)
    {
        [[maybe_unused]] auto floored
            = floor_fn->execute(input.getColumnsWithTypeAndName(), floor_fn->getResultType(), input.rows());
    }
}

/// Times the function registered as "sparkFloor" on a 30,000,000-row Int64 block.
/// The block and the executable function are built once; only execute() runs per iteration.
static void BM_SparkFloorFunction_For_Int64(benchmark::State & state)
{
    using namespace DB;
    auto & function_factory = FunctionFactory::instance();
    auto floor_resolver = function_factory.get("sparkFloor", local_engine::SerializedPlanParser::global_context);
    Block input = createDataBlock("Int64", 30000000);
    auto floor_fn = floor_resolver->build(input.getColumnsWithTypeAndName());
    for (auto _ : state)
    {
        [[maybe_unused]] auto floored
            = floor_fn->execute(input.getColumnsWithTypeAndName(), floor_fn->getResultType(), input.rows());
    }
}

/// Times the function registered as "sparkFloor" on a 30,000,000-row Float64 block.
/// The block and the executable function are built once; only execute() runs per iteration.
static void BM_SparkFloorFunction_For_Float64(benchmark::State & state)
{
    using namespace DB;
    auto & function_factory = FunctionFactory::instance();
    auto floor_resolver = function_factory.get("sparkFloor", local_engine::SerializedPlanParser::global_context);
    Block input = createDataBlock("Float64", 30000000);
    auto floor_fn = floor_resolver->build(input.getColumnsWithTypeAndName());
    for (auto _ : state)
    {
        [[maybe_unused]] auto floored
            = floor_fn->execute(input.getColumnsWithTypeAndName(), floor_fn->getResultType(), input.rows());
    }
}

/// Register the four floor benchmarks. Each reports in milliseconds and runs a fixed
/// 10 iterations, so the "floor" and "sparkFloor" variants are measured the same way.
BENCHMARK(BM_CHFloorFunction_For_Int64)->Unit(benchmark::kMillisecond)->Iterations(10);
BENCHMARK(BM_CHFloorFunction_For_Float64)->Unit(benchmark::kMillisecond)->Iterations(10);
BENCHMARK(BM_SparkFloorFunction_For_Int64)->Unit(benchmark::kMillisecond)->Iterations(10);
BENCHMARK(BM_SparkFloorFunction_For_Float64)->Unit(benchmark::kMillisecond)->Iterations(10);

0 comments on commit 9b6beac

Please sign in to comment.