
[GLUTEN-3951][CH] Bug fix floor diff #3956

Merged · 9 commits · Jan 2, 2024
@@ -2308,5 +2308,16 @@ class GlutenClickHouseTPCHParquetSuite extends GlutenClickHouseTPCHAbstractSuite
compareResultsAgainstVanillaSpark(select_sql, true, { _ => })
spark.sql("drop table test_tbl_4085")
}

+test("GLUTEN-3951: Bug fix floor") {
+  val tbl_create_sql = "create table test_tbl_3951(d double) using parquet"
+  val data_insert_sql = "insert into test_tbl_3951 values(1.0), (2.0), (2.5)"
+  val select_sql =
+    "select floor(d), floor(log10(d-1)), floor(log10(d-2)) from test_tbl_3951"
+  spark.sql(tbl_create_sql)
+  spark.sql(data_insert_sql)
+  compareResultsAgainstVanillaSpark(select_sql, true, { _ => })
+  spark.sql("drop table test_tbl_3951")
+}
}
// scalastyle:on line.size.limit
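
The test rows are chosen to push non-finite values through floor: in vanilla Spark, log10 of a non-positive argument is NULL and floor propagates it, while ClickHouse's log10 yields -inf or nan, so the Spark-compatible floor must turn non-finite inputs into NULL for the two engines to agree. A standalone C++ sketch (not part of the PR) of the values the backend's floor actually receives:

#include <cmath>
#include <cstdio>

int main()
{
    // For the inserted rows d = 1.0, 2.0, 2.5:
    //   d=1.0: log10(d-1) = log10(0) = -inf,   log10(d-2) = log10(-1) = nan
    //   d=2.0: log10(d-1) = log10(1) = 0,      log10(d-2) = log10(0)  = -inf
    //   d=2.5: both arguments are positive, so both results are finite
    for (double d : {1.0, 2.0, 2.5})
        std::printf("d=%.1f floor(log10(d-1))=%f floor(log10(d-2))=%f\n",
                    d, std::floor(std::log10(d - 1.0)), std::floor(std::log10(d - 2.0)));
    return 0;
}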
28 changes: 28 additions & 0 deletions cpp-ch/local-engine/Functions/SparkFunctionFloor.cpp
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <Functions/SparkFunctionFloor.h>

namespace local_engine
{

REGISTER_FUNCTION(SparkFunctionFloor)
{
factory.registerFunction<SparkFunctionFloor>();
}

}
143 changes: 143 additions & 0 deletions cpp-ch/local-engine/Functions/SparkFunctionFloor.h
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <Functions/FunctionsRound.h>
#include <Functions/FunctionFactory.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypesNumber.h>
#include <bit>

using namespace DB;

namespace local_engine
{

template <typename T, ScaleMode scale_mode>
struct SparkFloatFloorImpl
{
private:
static_assert(!is_decimal<T>);
using Op = FloatRoundingComputation<T, RoundingMode::Floor, scale_mode>;
using Data = std::array<T, Op::data_count>;
public:
static NO_INLINE void apply(const PaddedPODArray<T> & in, size_t scale, PaddedPODArray<T> & out, PaddedPODArray<UInt8> & null_map)
{
auto mm_scale = Op::prepare(scale);
const size_t data_count = std::tuple_size<Data>();
const T* end_in = in.data() + in.size();
const T* limit = in.data() + in.size() / data_count * data_count;
const T* __restrict p_in = in.data();
T* __restrict p_out = out.data();
while (p_in < limit)
{
Op::compute(p_in, mm_scale, p_out);
p_in += data_count;
p_out += data_count;
}

if (p_in < end_in)
{
Data tmp_src{{}};
Data tmp_dst;
size_t tail_size_bytes = (end_in - p_in) * sizeof(*p_in);
memcpy(&tmp_src, p_in, tail_size_bytes);
Op::compute(reinterpret_cast<T *>(&tmp_src), mm_scale, reinterpret_cast<T *>(&tmp_dst));
memcpy(p_out, &tmp_dst, tail_size_bytes);
}
for (size_t i = 0; i < out.size(); ++i)
checkAndSetNullable(out[i], null_map[i]);
}

static void checkAndSetNullable(T & t, UInt8 & null_flag)
{
UInt8 is_nan = (t != t);
UInt8 is_inf = 0;
if constexpr (std::is_same_v<T, float>)
is_inf = ((*reinterpret_cast<const uint32_t *>(&t) & 0b01111111111111111111111111111111) == 0b01111111100000000000000000000000);
else if constexpr (std::is_same_v<T, double>)
is_inf
= ((*reinterpret_cast<const uint64_t *>(&t) & 0b0111111111111111111111111111111111111111111111111111111111111111)
== 0b0111111111110000000000000000000000000000000000000000000000000000);

null_flag = is_nan | is_inf;
if (null_flag) t = 0;
}
};

class SparkFunctionFloor : public DB::FunctionFloor
{
public:
static constexpr auto name = "sparkFloor";
static DB::FunctionPtr create(DB::ContextPtr) { return std::make_shared<SparkFunctionFloor>(); }
SparkFunctionFloor() = default;
~SparkFunctionFloor() override = default;
DB::String getName() const override { return name; }

DB::DataTypePtr getReturnTypeImpl(const DB::DataTypes & arguments) const override
{
auto result_type = DB::FunctionFloor::getReturnTypeImpl(arguments);
return makeNullable(result_type);
}

DB::ColumnPtr executeImpl(const DB::ColumnsWithTypeAndName & arguments, const DB::DataTypePtr & result_type, size_t input_rows) const override
{
const ColumnWithTypeAndName & first_arg = arguments[0];
Scale scale_arg = getScaleArg(arguments);
switch(first_arg.type->getTypeId())
{
case TypeIndex::Float32:
return executeInternal<Float32>(first_arg.column, scale_arg);
case TypeIndex::Float64:
return executeInternal<Float64>(first_arg.column, scale_arg);
default:
DB::ColumnPtr res = DB::FunctionFloor::executeImpl(arguments, result_type, input_rows);
DB::MutableColumnPtr null_map_col = DB::ColumnUInt8::create(first_arg.column->size(), 0);
return DB::ColumnNullable::create(std::move(res), std::move(null_map_col));
}
}

template<typename T>
static ColumnPtr executeInternal(const ColumnPtr & col_arg, const Scale & scale_arg)
{
const auto * col = checkAndGetColumn<ColumnVector<T>>(col_arg.get());
auto col_res = ColumnVector<T>::create(col->size());
MutableColumnPtr null_map_col = DB::ColumnUInt8::create(col->size(), 0);
PaddedPODArray<T> & vec_res = col_res->getData();
PaddedPODArray<UInt8> & null_map_data = assert_cast<ColumnVector<UInt8> *>(null_map_col.get())->getData();
if (!vec_res.empty())
{
if (scale_arg == 0)
{
size_t scale = 1;
SparkFloatFloorImpl<T, ScaleMode::Zero>::apply(col->getData(), scale, vec_res, null_map_data);
}
else if (scale_arg > 0)
{
size_t scale = intExp10(scale_arg);
SparkFloatFloorImpl<T, ScaleMode::Positive>::apply(col->getData(), scale, vec_res, null_map_data);
}
else
{
size_t scale = intExp10(-scale_arg);
SparkFloatFloorImpl<T, ScaleMode::Negative>::apply(col->getData(), scale, vec_res, null_map_data);
}
}
return DB::ColumnNullable::create(std::move(col_res), std::move(null_map_col));
}
};
}
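
checkAndSetNullable detects NaN via self-inequality (t != t) and +/-Inf by masking off the sign bit and comparing the rest against the IEEE-754 all-ones-exponent, zero-mantissa pattern; flagged rows get a NULL marker and a zeroed payload, matching Spark's FLOOR, which returns NULL for non-finite input. A behaviorally equivalent sketch (standalone, assuming IEEE-754 floats; std::bit_cast from the already-included <bit> header would be the aliasing-safe spelling of the reinterpret_cast mask test):

#include <cmath>
#include <cstdint>

// Equivalent of SparkFloatFloorImpl::checkAndSetNullable: any non-finite
// result is flagged in the null map and its payload zeroed.
template <typename T>
static void checkAndSetNullableSketch(T & t, uint8_t & null_flag)
{
    null_flag = !std::isfinite(t); // true for NaN, +Inf and -Inf
    if (null_flag)
        t = 0;
}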
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -76,7 +76,6 @@ static const std::map<std::string, std::string> SCALAR_FUNCTIONS
{"pmod", "pmod"},
{"abs", "abs"},
{"ceil", "ceil"},
{"floor", "floor"},
{"round", "roundHalfUp"},
{"bround", "roundBankers"},
{"exp", "exp"},
@@ -180,6 +179,7 @@ static const std::map<std::string, std::string> SCALAR_FUNCTIONS
{"add_months", "addMonths"},
{"date_trunc", "dateTrunc"},
{"floor_datetime", "dateTrunc"},
{"floor", "sparkFloor"},
{"months_between", "sparkMonthsBetween"},

// array functions
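This one-line move in SCALAR_FUNCTIONS is the routing half of the fix: the Spark name floor no longer resolves to ClickHouse's built-in floor but to the sparkFloor function registered above, whose result type is nullable. A hypothetical sketch of the translation the map drives (illustration only, not the parser's real code):

#include <map>
#include <string>

// Hypothetical helper: Spark's "floor" now maps to the Spark-compatible
// "sparkFloor" instead of ClickHouse's "floor".
std::string toCHFunctionName(const std::string & spark_name)
{
    static const std::map<std::string, std::string> scalar_functions = {
        {"ceil", "ceil"},
        {"floor", "sparkFloor"}, // was {"floor", "floor"} before this fix
    };
    auto it = scalar_functions.find(spark_name);
    return it == scalar_functions.end() ? spark_name : it->second;
}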
4 changes: 1 addition & 3 deletions cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
@@ -117,10 +117,8 @@ class ShuffleSplitter;
using ShuffleSplitterPtr = std::unique_ptr<ShuffleSplitter>;
class ShuffleSplitter : public ShuffleWriterBase
{
-private:
-inline const static std::vector<std::string> compress_methods = {"", "ZSTD", "LZ4"};
-
public:
+inline const static std::vector<std::string> compress_methods = {"", "ZSTD", "LZ4"};
static ShuffleSplitterPtr create(const std::string & short_name, const SplitOptions & options_);

explicit ShuffleSplitter(const SplitOptions & options);
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/tests/CMakeLists.txt
@@ -71,6 +71,6 @@ else()
benchmark_parquet_read.cpp
benchmark_spark_row.cpp
benchmark_unix_timestamp_function.cpp
-)
-target_link_libraries(benchmark_local_engine PRIVATE gluten_clickhouse_backend_libs ch_contrib::gbenchmark_all loggers)
-endif()
+benchmark_spark_floor_function.cpp)
+target_link_libraries(benchmark_local_engine PRIVATE gluten_clickhouse_backend_libs ch_contrib::gbenchmark_all loggers ch_parquet)
+endif()
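
The new benchmark translation unit joins the existing benchmark_local_engine target, and ch_parquet is added to its link line. Assuming the standard Google Benchmark flags, the floor cases can then be run selectively, e.g. with --benchmark_filter=FloorFunction.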
13 changes: 7 additions & 6 deletions cpp-ch/local-engine/tests/benchmark_local_engine.cpp
@@ -51,6 +51,7 @@
#include <Common/PODArray_fwd.h>
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
+#include <Compression/CompressedReadBuffer.h>
#include "testConfig.h"

#if defined(__SSE2__)
@@ -131,7 +132,7 @@ DB::ContextMutablePtr global_context;
param,
std::move(settings));
auto snapshot = std::make_shared<StorageSnapshot>(custom_merge_tree, metadata);
-custom_merge_tree.loadDataParts(false);
+custom_merge_tree.loadDataParts(false, {});
for (auto _ : state)
{
state.PauseTiming();
@@ -249,7 +250,7 @@ DB::ContextMutablePtr global_context;
"",
param,
std::move(settings));
-custom_merge_tree.loadDataParts(false);
+custom_merge_tree.loadDataParts(false, {});
auto snapshot = std::make_shared<StorageSnapshot>(custom_merge_tree, metadata);
for (auto _ : state)
{
@@ -337,7 +338,7 @@ DB::ContextMutablePtr global_context;
"",
param,
std::move(settings));
-custom_merge_tree.loadDataParts(false);
+custom_merge_tree.loadDataParts(false, {});
auto snapshot = std::make_shared<StorageSnapshot>(custom_merge_tree, metadata);

for (auto _ : state)
@@ -1317,7 +1318,7 @@ class FasterCompressedReadBuffer : public FasterCompressedReadBufferBase, public
param,
std::move(settings));
auto snapshot = std::make_shared<StorageSnapshot>(custom_merge_tree, metadata);
-custom_merge_tree.loadDataParts(false);
+custom_merge_tree.loadDataParts(false, {});
for (auto _ : state)
{
state.PauseTiming();
@@ -1372,7 +1373,7 @@ MergeTreeWithSnapshot buildMergeTree(NamesAndTypesList names_and_types, std::str
std::shared_ptr<local_engine::CustomStorageMergeTree> custom_merge_tree = std::make_shared<local_engine::CustomStorageMergeTree>(
DB::StorageID("default", table), relative_path, *metadata, false, global_context, "", param, std::move(settings));
auto snapshot = std::make_shared<StorageSnapshot>(*custom_merge_tree, metadata);
-custom_merge_tree->loadDataParts(false);
+custom_merge_tree->loadDataParts(false, {});
return MergeTreeWithSnapshot{.merge_tree = custom_merge_tree, .snapshot = snapshot, .columns = names_and_types};
}

@@ -1398,7 +1399,7 @@ QueryPlanPtr joinPlan(QueryPlanPtr left, QueryPlanPtr right, String left_key, St
join->addDisjunct();
ASTPtr lkey = std::make_shared<ASTIdentifier>(left_key);
ASTPtr rkey = std::make_shared<ASTIdentifier>(right_key);
-join->addOnKeys(lkey, rkey);
+join->addOnKeys(lkey, rkey, true);
for (const auto & column : join->columnsFromJoinedTable())
{
join->addJoinedColumn(column);
98 changes: 98 additions & 0 deletions cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <Core/Block.h>
#include <Columns/IColumn.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeFactory.h>
#include <Functions/FunctionFactory.h>
#include <Functions/SparkFunctionFloor.h>
#include <Functions/FunctionsRound.h>
#include <Parser/SerializedPlanParser.h>
#include <benchmark/benchmark.h>

using namespace DB;

static Block createDataBlock(String type_str, size_t rows)
{
auto type = DataTypeFactory::instance().get(type_str);
auto column = type->createColumn();
for (size_t i = 0; i < rows; ++i)
{
if (isInt(type))
{
column->insert(i);
}
else if (isFloat(type))
{
double d = i * 1.0;
column->insert(d);
}
}
Block block;
block.insert(ColumnWithTypeAndName(std::move(column), type, "d"));
return block;
}

static void BM_CHFloorFunction_For_Int64(benchmark::State & state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
auto function = factory.get("floor", local_engine::SerializedPlanParser::global_context);
Block int64_block = createDataBlock("Int64", 30000000);
auto executable = function->build(int64_block.getColumnsWithTypeAndName());
for (auto _ : state)
    [[maybe_unused]] auto result = executable->execute(int64_block.getColumnsWithTypeAndName(), executable->getResultType(), int64_block.rows());
}

static void BM_CHFloorFunction_For_Float64(benchmark::State & state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
auto function = factory.get("floor", local_engine::SerializedPlanParser::global_context);
Block float64_block = createDataBlock("Float64", 30000000);
auto executable = function->build(float64_block.getColumnsWithTypeAndName());
for (auto _ : state)
    [[maybe_unused]] auto result = executable->execute(float64_block.getColumnsWithTypeAndName(), executable->getResultType(), float64_block.rows());
}

static void BM_SparkFloorFunction_For_Int64(benchmark::State & state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
auto function = factory.get("sparkFloor", local_engine::SerializedPlanParser::global_context);
Block int64_block = createDataBlock("Int64", 30000000);
auto executable = function->build(int64_block.getColumnsWithTypeAndName());
for (auto _ : state)
    [[maybe_unused]] auto result = executable->execute(int64_block.getColumnsWithTypeAndName(), executable->getResultType(), int64_block.rows());
}

static void BM_SparkFloorFunction_For_Float64(benchmark::State & state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
auto function = factory.get("sparkFloor", local_engine::SerializedPlanParser::global_context);
Block float64_block = createDataBlock("Float64", 30000000);
auto executable = function->build(float64_block.getColumnsWithTypeAndName());
for (auto _ : state)
    [[maybe_unused]] auto result = executable->execute(float64_block.getColumnsWithTypeAndName(), executable->getResultType(), float64_block.rows());
}

BENCHMARK(BM_CHFloorFunction_For_Int64)->Unit(benchmark::kMillisecond)->Iterations(10);
BENCHMARK(BM_CHFloorFunction_For_Float64)->Unit(benchmark::kMillisecond)->Iterations(10);
BENCHMARK(BM_SparkFloorFunction_For_Int64)->Unit(benchmark::kMillisecond)->Iterations(10);
BENCHMARK(BM_SparkFloorFunction_For_Float64)->Unit(benchmark::kMillisecond)->Iterations(10);
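
A note on what the four cases measure: for Int64 input, sparkFloor falls through to DB::FunctionFloor::executeImpl and merely wraps the result in an all-zeros null map (see SparkFunctionFloor.h above), so the Int64 pair mostly gauges that wrapping overhead, while the Float64 pair additionally exercises the vectorized floor together with the NaN/Inf-to-NULL pass.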