diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetSuite.scala
index b3f5f45e5874..a6b1453b4e9b 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetSuite.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetSuite.scala
@@ -2308,5 +2308,16 @@ class GlutenClickHouseTPCHParquetSuite extends GlutenClickHouseTPCHAbstractSuite
     compareResultsAgainstVanillaSpark(select_sql, true, { _ => })
     spark.sql("drop table test_tbl_4085")
   }
+
+  test("GLUTEN-3951: Bug fix floor") {
+    val tbl_create_sql = "create table test_tbl_3951(d double) using parquet"
+    val data_insert_sql = "insert into test_tbl_3951 values(1.0), (2.0), (2.5)"
+    val select_sql =
+      "select floor(d), floor(log10(d-1)), floor(log10(d-2)) from test_tbl_3951"
+    spark.sql(tbl_create_sql)
+    spark.sql(data_insert_sql)
+    compareResultsAgainstVanillaSpark(select_sql, true, { _ => })
+    spark.sql("drop table test_tbl_3951")
+  }
 }
 // scalastyle:on line.size.limit
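Note: the three inserted rows are chosen so the projections hit every special case of floor over doubles. For d = 1.0, log10(d-1) = log10(0) = -Infinity and log10(d-2) = log10(-1) = NaN; vanilla Spark maps both to NULL, which is exactly what the ClickHouse backend previously got wrong. Worked out by hand (not part of the patch), the expected output of select_sql is:

      d   | floor(d) | floor(log10(d-1)) | floor(log10(d-2))
    ------+----------+-------------------+------------------
     1.0  |        1 |              NULL |              NULL
     2.0  |        2 |                 0 |              NULL
     2.5  |        2 |                 0 |                -1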
diff --git a/cpp-ch/local-engine/Functions/SparkFunctionFloor.cpp b/cpp-ch/local-engine/Functions/SparkFunctionFloor.cpp
new file mode 100644
index 000000000000..93cb9c1ba9be
--- /dev/null
+++ b/cpp-ch/local-engine/Functions/SparkFunctionFloor.cpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <Functions/SparkFunctionFloor.h>
+
+namespace local_engine
+{
+
+REGISTER_FUNCTION(SparkFunctionFloor)
+{
+    factory.registerFunction<SparkFunctionFloor>();
+}
+
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Functions/SparkFunctionFloor.h b/cpp-ch/local-engine/Functions/SparkFunctionFloor.h
new file mode 100644
index 000000000000..6f94f61a4093
--- /dev/null
+++ b/cpp-ch/local-engine/Functions/SparkFunctionFloor.h
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <Columns/ColumnNullable.h>
+#include <Columns/ColumnsNumber.h>
+#include <Common/assert_cast.h>
+#include <DataTypes/DataTypeNullable.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <Functions/FunctionFactory.h>
+#include <Functions/FunctionsRound.h>
+
+using namespace DB;
+
+namespace local_engine
+{
+
+template <typename T, ScaleMode scale_mode>
+struct SparkFloatFloorImpl
+{
+private:
+    static_assert(!is_decimal<T>);
+    using Op = FloatRoundingComputation<T, RoundingMode::Floor, scale_mode>;
+    using Data = std::array<T, Op::data_count>;
+
+public:
+    static NO_INLINE void apply(const PaddedPODArray<T> & in, size_t scale, PaddedPODArray<T> & out, PaddedPODArray<UInt8> & null_map)
+    {
+        auto mm_scale = Op::prepare(scale);
+        const size_t data_count = std::tuple_size<Data>();
+        const T * end_in = in.data() + in.size();
+        const T * limit = in.data() + in.size() / data_count * data_count;
+        const T * __restrict p_in = in.data();
+        T * __restrict p_out = out.data();
+        /// Process the bulk of the input in SIMD-friendly chunks of data_count elements.
+        while (p_in < limit)
+        {
+            Op::compute(p_in, mm_scale, p_out);
+            p_in += data_count;
+            p_out += data_count;
+        }
+
+        /// Handle the tail that does not fill a whole chunk through a stack buffer.
+        if (p_in < end_in)
+        {
+            Data tmp_src{{}};
+            Data tmp_dst;
+            size_t tail_size_bytes = (end_in - p_in) * sizeof(*p_in);
+            memcpy(&tmp_src, p_in, tail_size_bytes);
+            Op::compute(reinterpret_cast<T *>(&tmp_src), mm_scale, reinterpret_cast<T *>(&tmp_dst));
+            memcpy(p_out, &tmp_dst, tail_size_bytes);
+        }
+
+        for (size_t i = 0; i < out.size(); ++i)
+            checkAndSetNullable(out[i], null_map[i]);
+    }
+
+    /// Spark returns NULL for floor(NaN) and floor(+/-Infinity); flag those rows in the null map.
+    static void checkAndSetNullable(T & t, UInt8 & null_flag)
+    {
+        UInt8 is_nan = (t != t);
+        UInt8 is_inf = 0;
+        if constexpr (std::is_same_v<T, Float32>)
+            is_inf = ((*reinterpret_cast<const UInt32 *>(&t) & 0b01111111111111111111111111111111)
+                == 0b01111111100000000000000000000000);
+        else if constexpr (std::is_same_v<T, Float64>)
+            is_inf
+                = ((*reinterpret_cast<const UInt64 *>(&t) & 0b0111111111111111111111111111111111111111111111111111111111111111)
+                   == 0b0111111111110000000000000000000000000000000000000000000000000000);
+
+        null_flag = is_nan | is_inf;
+        if (null_flag)
+            t = 0;
+    }
+};
+
+class SparkFunctionFloor : public DB::FunctionFloor
+{
+public:
+    static constexpr auto name = "sparkFloor";
+    static DB::FunctionPtr create(DB::ContextPtr) { return std::make_shared<SparkFunctionFloor>(); }
+    SparkFunctionFloor() = default;
+    ~SparkFunctionFloor() override = default;
+
+    DB::String getName() const override { return name; }
+
+    DB::DataTypePtr getReturnTypeImpl(const DB::DataTypes & arguments) const override
+    {
+        auto result_type = DB::FunctionFloor::getReturnTypeImpl(arguments);
+        return makeNullable(result_type);
+    }
+
+    DB::ColumnPtr executeImpl(const DB::ColumnsWithTypeAndName & arguments, const DB::DataTypePtr & result_type, size_t input_rows) const override
+    {
+        const ColumnWithTypeAndName & first_arg = arguments[0];
+        Scale scale_arg = getScaleArg(arguments);
+        switch (first_arg.type->getTypeId())
+        {
+            case TypeIndex::Float32:
+                return executeInternal<Float32>(first_arg.column, scale_arg);
+            case TypeIndex::Float64:
+                return executeInternal<Float64>(first_arg.column, scale_arg);
+            default:
+                /// Non-floating inputs cannot be NaN/Infinity: reuse the vanilla
+                /// implementation and wrap the result in an all-zero null map.
+                DB::ColumnPtr res = DB::FunctionFloor::executeImpl(arguments, result_type, input_rows);
+                DB::MutableColumnPtr null_map_col = DB::ColumnUInt8::create(first_arg.column->size(), 0);
+                return DB::ColumnNullable::create(std::move(res), std::move(null_map_col));
+        }
+    }
+
+    template <typename T>
+    static ColumnPtr executeInternal(const ColumnPtr & col_arg, const Scale & scale_arg)
+    {
+        const auto * col = checkAndGetColumn<ColumnVector<T>>(col_arg.get());
+        auto col_res = ColumnVector<T>::create(col->size());
+        MutableColumnPtr null_map_col = DB::ColumnUInt8::create(col->size(), 0);
+        PaddedPODArray<T> & vec_res = col_res->getData();
+        PaddedPODArray<UInt8> & null_map_data = assert_cast<ColumnUInt8 *>(null_map_col.get())->getData();
+        if (!vec_res.empty())
+        {
+            if (scale_arg == 0)
+            {
+                size_t scale = 1;
+                SparkFloatFloorImpl<T, ScaleMode::Zero>::apply(col->getData(), scale, vec_res, null_map_data);
+            }
+            else if (scale_arg > 0)
+            {
+                size_t scale = intExp10(scale_arg);
+                SparkFloatFloorImpl<T, ScaleMode::Positive>::apply(col->getData(), scale, vec_res, null_map_data);
+            }
+            else
+            {
+                size_t scale = intExp10(-scale_arg);
+                SparkFloatFloorImpl<T, ScaleMode::Negative>::apply(col->getData(), scale, vec_res, null_map_data);
+            }
+        }
+        return DB::ColumnNullable::create(std::move(col_res), std::move(null_map_col));
+    }
+};
+}
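The bit masks in checkAndSetNullable clear the IEEE-754 sign bit and compare against the all-ones-exponent/zero-mantissa pattern of +/-Infinity, i.e. they are a branch-free equivalent of std::isinf, just as (t != t) is the classic std::isnan test. A minimal standalone sketch (not part of the patch; assumes C++20 for std::bit_cast, which avoids the type-punning reinterpret_cast used above) verifying the equivalence for Float64:

    #include <bit>
    #include <cassert>
    #include <cmath>
    #include <cstdint>
    #include <limits>

    // Same predicate as checkAndSetNullable's Float64 branch:
    // clear the sign bit, then compare with the bit pattern of +Infinity.
    static bool is_inf64(double t)
    {
        return (std::bit_cast<uint64_t>(t) & 0x7fffffffffffffffULL) == 0x7ff0000000000000ULL;
    }

    int main()
    {
        const double values[]
            = {0.0, -2.5, std::numeric_limits<double>::infinity(),
               -std::numeric_limits<double>::infinity(), std::nan("")};
        for (double v : values)
        {
            assert(is_inf64(v) == std::isinf(v)); // mask check matches the libm classification
            assert((v != v) == std::isnan(v));    // NaN is the only value unequal to itself
        }
        return 0;
    }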
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index 006d418e6884..f097aabd3b11 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -76,7 +76,6 @@ static const std::map<std::string, std::string> SCALAR_FUNCTIONS
     {"pmod", "pmod"},
     {"abs", "abs"},
     {"ceil", "ceil"},
-    {"floor", "floor"},
     {"round", "roundHalfUp"},
     {"bround", "roundBankers"},
     {"exp", "exp"},
@@ -180,6 +179,7 @@ static const std::map<std::string, std::string> SCALAR_FUNCTIONS
     {"add_months", "addMonths"},
     {"date_trunc", "dateTrunc"},
     {"floor_datetime", "dateTrunc"},
+    {"floor", "sparkFloor"},
     {"months_between", "sparkMonthsBetween"},
 
     // array functions
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
index 8f796914d1ce..fe36f7cef1af 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
@@ -117,10 +117,8 @@ class ShuffleSplitter;
 using ShuffleSplitterPtr = std::unique_ptr<ShuffleSplitter>;
 class ShuffleSplitter : public ShuffleWriterBase
 {
-private:
-    inline const static std::vector<std::string> compress_methods = {"", "ZSTD", "LZ4"};
-
 public:
+    inline const static std::vector<std::string> compress_methods = {"", "ZSTD", "LZ4"};
     static ShuffleSplitterPtr create(const std::string & short_name, const SplitOptions & options_);
 
     explicit ShuffleSplitter(const SplitOptions & options);
diff --git a/cpp-ch/local-engine/tests/CMakeLists.txt b/cpp-ch/local-engine/tests/CMakeLists.txt
index ee6f3c240563..2b4c45fcb747 100644
--- a/cpp-ch/local-engine/tests/CMakeLists.txt
+++ b/cpp-ch/local-engine/tests/CMakeLists.txt
@@ -71,6 +71,6 @@ else()
         benchmark_parquet_read.cpp
         benchmark_spark_row.cpp
         benchmark_unix_timestamp_function.cpp
-    )
-    target_link_libraries(benchmark_local_engine PRIVATE gluten_clickhouse_backend_libs ch_contrib::gbenchmark_all loggers)
-endif()
\ No newline at end of file
+        benchmark_spark_floor_function.cpp)
+    target_link_libraries(benchmark_local_engine PRIVATE gluten_clickhouse_backend_libs ch_contrib::gbenchmark_all loggers ch_parquet)
+endif()
diff --git a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
index 6bfc443e97e6..df09b8d3d150 100644
--- a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
@@ -51,6 +51,7 @@
 #include
 #include
 #include
+#include
 #include "testConfig.h"
 
 #if defined(__SSE2__)
@@ -131,7 +132,7 @@ DB::ContextMutablePtr global_context;
         param,
         std::move(settings));
     auto snapshot = std::make_shared<StorageSnapshot>(custom_merge_tree, metadata);
-    custom_merge_tree.loadDataParts(false);
+    custom_merge_tree.loadDataParts(false, {});
     for (auto _ : state)
     {
         state.PauseTiming();
@@ -249,7 +250,7 @@ DB::ContextMutablePtr global_context;
         "",
         param,
         std::move(settings));
-    custom_merge_tree.loadDataParts(false);
+    custom_merge_tree.loadDataParts(false, {});
     auto snapshot = std::make_shared<StorageSnapshot>(custom_merge_tree, metadata);
     for (auto _ : state)
     {
@@ -337,7 +338,7 @@ DB::ContextMutablePtr global_context;
         "",
         param,
         std::move(settings));
-    custom_merge_tree.loadDataParts(false);
+    custom_merge_tree.loadDataParts(false, {});
     auto snapshot = std::make_shared<StorageSnapshot>(custom_merge_tree, metadata);
 
     for (auto _ : state)
@@ -1317,7 +1318,7 @@ class FasterCompressedReadBuffer : public FasterCompressedReadBufferBase, public
         param,
         std::move(settings));
     auto snapshot = std::make_shared<StorageSnapshot>(custom_merge_tree, metadata);
-    custom_merge_tree.loadDataParts(false);
+    custom_merge_tree.loadDataParts(false, {});
     for (auto _ : state)
     {
         state.PauseTiming();
@@ -1372,7 +1373,7 @@ MergeTreeWithSnapshot buildMergeTree(NamesAndTypesList names_and_types, std::str
     std::shared_ptr<CustomStorageMergeTree> custom_merge_tree = std::make_shared<CustomStorageMergeTree>(
         DB::StorageID("default", table), relative_path, *metadata, false, global_context, "", param, std::move(settings));
     auto snapshot = std::make_shared<StorageSnapshot>(*custom_merge_tree, metadata);
-    custom_merge_tree->loadDataParts(false);
+    custom_merge_tree->loadDataParts(false, {});
     return MergeTreeWithSnapshot{.merge_tree = custom_merge_tree, .snapshot = snapshot, .columns = names_and_types};
 }
 
@@ -1398,7 +1399,7 @@ QueryPlanPtr joinPlan(QueryPlanPtr left, QueryPlanPtr right, String left_key, St
     join->addDisjunct();
     ASTPtr lkey = std::make_shared<ASTIdentifier>(left_key);
     ASTPtr rkey = std::make_shared<ASTIdentifier>(right_key);
-    join->addOnKeys(lkey, rkey);
+    join->addOnKeys(lkey, rkey, true);
     for (const auto & column : join->columnsFromJoinedTable())
     {
         join->addJoinedColumn(column);
diff --git a/cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp b/cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp
new file mode 100644
index 000000000000..6f88196eaa47
--- /dev/null
+++ b/cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <benchmark/benchmark.h>
+#include <Columns/ColumnsNumber.h>
+#include <Core/Block.h>
+#include <Core/ColumnWithTypeAndName.h>
+#include <DataTypes/DataTypeFactory.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <DataTypes/IDataType.h>
+#include <Functions/FunctionFactory.h>
+#include <Parser/SerializedPlanParser.h>
+
+using namespace DB;
+
+/// Build a single-column block named "d" of the given type, filled with ascending values.
+static Block createDataBlock(const String & type_str, size_t rows)
+{
+    auto type = DataTypeFactory::instance().get(type_str);
+    auto column = type->createColumn();
+    for (size_t i = 0; i < rows; ++i)
+    {
+        if (isInt(type))
+        {
+            column->insert(i);
+        }
+        else if (isFloat(type))
+        {
+            double d = i * 1.0;
+            column->insert(d);
+        }
+    }
+    Block block;
+    block.insert(ColumnWithTypeAndName(std::move(column), type, "d"));
+    return block;
+}
+
+static void BM_CHFloorFunction_For_Int64(benchmark::State & state)
+{
+    auto & factory = FunctionFactory::instance();
+    auto function = factory.get("floor", local_engine::SerializedPlanParser::global_context);
+    Block int64_block = createDataBlock("Int64", 30000000);
+    auto executable = function->build(int64_block.getColumnsWithTypeAndName());
+    for (auto _ : state)
+        [[maybe_unused]] auto result = executable->execute(int64_block.getColumnsWithTypeAndName(), executable->getResultType(), int64_block.rows());
+}
+
+static void BM_CHFloorFunction_For_Float64(benchmark::State & state)
+{
+    auto & factory = FunctionFactory::instance();
+    auto function = factory.get("floor", local_engine::SerializedPlanParser::global_context);
+    Block float64_block = createDataBlock("Float64", 30000000);
+    auto executable = function->build(float64_block.getColumnsWithTypeAndName());
+    for (auto _ : state)
+        [[maybe_unused]] auto result = executable->execute(float64_block.getColumnsWithTypeAndName(), executable->getResultType(), float64_block.rows());
+}
+
+static void BM_SparkFloorFunction_For_Int64(benchmark::State & state)
+{
+    auto & factory = FunctionFactory::instance();
+    auto function = factory.get("sparkFloor", local_engine::SerializedPlanParser::global_context);
+    Block int64_block = createDataBlock("Int64", 30000000);
+    auto executable = function->build(int64_block.getColumnsWithTypeAndName());
+    for (auto _ : state)
+        [[maybe_unused]] auto result = executable->execute(int64_block.getColumnsWithTypeAndName(), executable->getResultType(), int64_block.rows());
+}
+
+static void BM_SparkFloorFunction_For_Float64(benchmark::State & state)
+{
+    auto & factory = FunctionFactory::instance();
+    auto function = factory.get("sparkFloor", local_engine::SerializedPlanParser::global_context);
+    Block float64_block = createDataBlock("Float64", 30000000);
+    auto executable = function->build(float64_block.getColumnsWithTypeAndName());
+    for (auto _ : state)
+        [[maybe_unused]] auto result = executable->execute(float64_block.getColumnsWithTypeAndName(), executable->getResultType(), float64_block.rows());
+}
+
+BENCHMARK(BM_CHFloorFunction_For_Int64)->Unit(benchmark::kMillisecond)->Iterations(10);
+BENCHMARK(BM_CHFloorFunction_For_Float64)->Unit(benchmark::kMillisecond)->Iterations(10);
+BENCHMARK(BM_SparkFloorFunction_For_Int64)->Unit(benchmark::kMillisecond)->Iterations(10);
+BENCHMARK(BM_SparkFloorFunction_For_Float64)->Unit(benchmark::kMillisecond)->Iterations(10);
\ No newline at end of file
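For reference, a hypothetical unit-test-style check (not part of the patch) that mirrors the benchmark's setup and exercises the new NULL semantics end to end; it assumes local_engine::SerializedPlanParser::global_context has been initialized the same way the benchmark binary initializes it:

    #include <cassert>
    #include <cmath>
    #include <limits>
    #include <Columns/ColumnNullable.h>
    #include <Columns/ColumnsNumber.h>
    #include <Common/assert_cast.h>
    #include <Core/Block.h>
    #include <DataTypes/DataTypesNumber.h>
    #include <Functions/FunctionFactory.h>
    #include <Parser/SerializedPlanParser.h>

    using namespace DB;

    static void checkSparkFloorReturnsNulls()
    {
        // One finite row plus the two poison values that Spark maps to NULL.
        auto col = ColumnFloat64::create();
        col->insert(2.5);
        col->insert(std::nan(""));
        col->insert(std::numeric_limits<double>::infinity());
        Block block{{std::move(col), std::make_shared<DataTypeFloat64>(), "d"}};

        auto function = FunctionFactory::instance().get(
            "sparkFloor", local_engine::SerializedPlanParser::global_context);
        auto executable = function->build(block.getColumnsWithTypeAndName());
        auto result = executable->execute(
            block.getColumnsWithTypeAndName(), executable->getResultType(), block.rows());

        // sparkFloor always returns a nullable column; rows 1 and 2 must be NULL.
        const auto & nullable = assert_cast<const ColumnNullable &>(*result);
        assert(!nullable.isNullAt(0) && nullable.isNullAt(1) && nullable.isNullAt(2));
    }

The new cases can also be run in isolation through the existing benchmark binary, e.g. ./benchmark_local_engine --benchmark_filter=SparkFloor (--benchmark_filter is a standard Google Benchmark flag).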