Skip to content

Commit

Permalink
[GLUTEN-3582][CH] Fix bug for decimal and float type (apache#6925)
Browse files Browse the repository at this point in the history
* [GLUTEN-3582][CH] Fix bug for decimal and float type

* fix style

* fix velox compile issue by introducing def enableSuite(suiteName: String) to avoid explicitly refer test suite

* Spark 35 CI Pipeline is not ready, let's ignore it first
  • Loading branch information
baibaichen authored and shamirchen committed Oct 14, 2024
1 parent 6ad373a commit 1c1a4b7
Show file tree
Hide file tree
Showing 14 changed files with 403 additions and 30 deletions.
7 changes: 3 additions & 4 deletions cpp-ch/local-engine/Common/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

#pragma once

#include <base/unit.h>
#include <base/types.h>
#include <Interpreters/Context.h>
#include <base/types.h>
#include <base/unit.h>

namespace local_engine
{
Expand Down Expand Up @@ -104,7 +104,7 @@ struct JoinConfig
bool prefer_multi_join_on_clauses = true;
size_t multi_join_on_clauses_build_side_rows_limit = 10000000;

static JoinConfig loadFromContext(DB::ContextPtr context)
static JoinConfig loadFromContext(const DB::ContextPtr & context)
{
JoinConfig config;
config.prefer_multi_join_on_clauses = context->getConfigRef().getBool(PREFER_MULTI_JOIN_ON_CLAUSES, true);
Expand Down Expand Up @@ -198,4 +198,3 @@ struct GlutenJobSchedulerConfig
}
};
}

2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/JoinRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q
google::protobuf::StringValue optimization_info;
optimization_info.ParseFromString(join.advanced_extension().optimization().value());
auto join_opt_info = JoinOptimizationInfo::parse(optimization_info.value());
LOG_ERROR(getLogger("JoinRelParser"), "optimizaiton info:{}", optimization_info.value());
LOG_DEBUG(getLogger("JoinRelParser"), "optimization info:{}", optimization_info.value());
auto storage_join = join_opt_info.is_broadcast ? BroadCastJoinBuilder::getJoin(join_opt_info.storage_join_key) : nullptr;
if (storage_join)
{
Expand Down
96 changes: 74 additions & 22 deletions cpp-ch/local-engine/Storages/Parquet/ParquetConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,23 @@ template <typename PhysicalType>
struct ToParquet
{
using T = typename PhysicalType::c_type;
T as(const DB::Field & value, const parquet::ColumnDescriptor &)
T as(const DB::Field & value, const parquet::ColumnDescriptor & s)
{
if constexpr (std::is_same_v<PhysicalType, parquet::Int32Type>)
return static_cast<T>(value.safeGet<Int64>());
if (s.logical_type()->is_decimal())
{
if constexpr (std::is_same_v<PhysicalType, parquet::Int32Type>)
{
const auto v = value.safeGet<DB::DecimalField<DB::Decimal32>>();
return v.getValue().value;
}
if constexpr (std::is_same_v<PhysicalType, parquet::Int64Type>)
{
const auto v = value.safeGet<DB::DecimalField<DB::Decimal64>>();
return v.getValue().value;
}
}
// parquet::BooleanType, parquet::Int64Type, parquet::FloatType, parquet::DoubleType
return value.safeGet<T>(); // FLOAT, DOUBLE, INT64
return value.safeGet<T>(); // FLOAT, DOUBLE, INT64, Int32
}
};

Expand All @@ -57,36 +68,52 @@ struct ToParquet<parquet::ByteArrayType>
}
};

template <typename T>
parquet::FixedLenByteArray convertField(const DB::Field & value, uint8_t * buf, size_t type_length)
{
assert(sizeof(T) >= type_length);

T val = value.safeGet<DB::DecimalField<DB::Decimal<T>>>().getValue().value;
std::reverse(reinterpret_cast<char *>(&val), reinterpret_cast<char *>(&val) + sizeof(T));
const int offset = sizeof(T) - type_length;

memcpy(buf, reinterpret_cast<char *>(&val) + offset, type_length);
return parquet::FixedLenByteArray(buf);
}

template <>
struct ToParquet<parquet::FLBAType>
{
uint8_t buf[256];
uint8_t buf[16];
using T = parquet::FixedLenByteArray;
T as(const DB::Field & value, const parquet::ColumnDescriptor & descriptor)
{
if (value.getType() != DB::Field::Types::Decimal128)
throw DB::Exception(
DB::ErrorCodes::LOGICAL_ERROR, "Field type '{}' for FIXED_LEN_BYTE_ARRAY is not supported", value.getTypeName());
static_assert(sizeof(Int128) <= sizeof(buf));
if (descriptor.type_length() > sizeof(Int128))
if (value.getType() == DB::Field::Types::Decimal256)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Field type '{}' is not supported", value.getTypeName());

static_assert(sizeof(Int128) == sizeof(buf));

if (descriptor.type_length() > sizeof(buf))
throw DB::Exception(
DB::ErrorCodes::LOGICAL_ERROR,
"descriptor.type_length() = {} , which is > {}, e.g. sizeof(Int128)",
"descriptor.type_length() = {} , which is > {}, e.g. sizeof(buf)",
descriptor.type_length(),
sizeof(Int128));
Int128 val = value.safeGet<DB::DecimalField<DB::Decimal128>>().getValue();
std::reverse(reinterpret_cast<char *>(&val), reinterpret_cast<char *>(&val) + sizeof(val));
const int offset = sizeof(Int128) - descriptor.type_length();
memcpy(buf, reinterpret_cast<char *>(&val) + offset, descriptor.type_length());
return parquet::FixedLenByteArray(buf);
sizeof(buf));

if (value.getType() == DB::Field::Types::Decimal32)
return convertField<Int32>(value, buf, descriptor.type_length());
if (value.getType() == DB::Field::Types::Decimal64)
return convertField<Int64>(value, buf, descriptor.type_length());

return convertField<Int128>(value, buf, descriptor.type_length());
}
};

// Int32 Int64 Float Double
template <typename DType, typename Col>
struct ConverterNumeric
{
using From = typename Col::Container::value_type;
using From = typename Col::ValueType;
using To = typename DType::c_type;

const Col & column;
Expand Down Expand Up @@ -119,6 +146,7 @@ using ConverterInt64 = ConverterNumeric<parquet::Int64Type, DB::ColumnVector<Int
using ConverterInt64_u = ConverterNumeric<parquet::Int64Type, DB::ColumnVector<UInt64>>;

using ConverterDouble = ConverterNumeric<parquet::DoubleType, DB::ColumnVector<Float64>>;
using ConverterFloat = ConverterNumeric<parquet::FloatType, DB::ColumnVector<Float32>>;

struct ConverterString
{
Expand All @@ -141,7 +169,7 @@ struct ConverterString

/// Like ConverterNumberAsFixedString, but converts to big-endian. Because that's the byte order
/// Parquet uses for decimal types and literally nothing else, for some reason.
template <typename T>
template <DB::is_decimal T>
struct ConverterDecimal
{
const parquet::ColumnDescriptor & descriptor;
Expand All @@ -165,7 +193,7 @@ struct ConverterDecimal
data_buf.resize(count * sizeof(T));
ptr_buf.resize(count);
memcpy(data_buf.data(), reinterpret_cast<const char *>(column.getData().data() + offset), count * sizeof(T));
const size_t offset_in_buf = sizeof(Int128) - descriptor.type_length();
const size_t offset_in_buf = sizeof(T) - descriptor.type_length();
;
for (size_t i = 0; i < count; ++i)
{
Expand All @@ -176,6 +204,13 @@ struct ConverterDecimal
}
};

using Decimal128ToFLB = ConverterDecimal<DB::Decimal128>;
using Decimal64ToFLB = ConverterDecimal<DB::Decimal64>;
using Decimal32ToFLB = ConverterDecimal<DB::Decimal32>;

using ConverterDecimal32 = ConverterNumeric<parquet::Int32Type, DB::ColumnDecimal<DB::Decimal32>>;
using ConverterDecimal64 = ConverterNumeric<parquet::Int64Type, DB::ColumnDecimal<DB::Decimal64>>;

class BaseConverter
{
public:
Expand Down Expand Up @@ -239,6 +274,8 @@ std::shared_ptr<ParquetConverter<DType>> ParquetConverter<DType>::Make(const DB:
case TypeIndex::UInt32:
result = std::make_shared<ParquetConverterImpl<parquet::Int32Type, ConverterInt32_u>>(ConverterInt32_u(c));
break;
case TypeIndex::Decimal32:
result = std::make_shared<ParquetConverterImpl<parquet::Int32Type, ConverterDecimal32>>(ConverterDecimal32(c));
default:
break;
}
Expand All @@ -251,13 +288,23 @@ std::shared_ptr<ParquetConverter<DType>> ParquetConverter<DType>::Make(const DB:
case TypeIndex::UInt64:
result = std::make_shared<ParquetConverterImpl<parquet::Int64Type, ConverterInt64_u>>(ConverterInt64_u(c));
break;
case TypeIndex::Decimal64:
result = std::make_shared<ParquetConverterImpl<parquet::Int64Type, ConverterDecimal64>>(ConverterDecimal64(c));
default:
break;
}
break;
case parquet::Type::INT96:
break;
case parquet::Type::FLOAT:
switch (c->getDataType())
{
case TypeIndex::Float32:
result = std::make_shared<ParquetConverterImpl<parquet::FloatType, ConverterFloat>>(ConverterFloat(c));
break;
default:
break;
}
break;
case parquet::Type::DOUBLE:
switch (c->getDataType())
Expand All @@ -283,8 +330,13 @@ std::shared_ptr<ParquetConverter<DType>> ParquetConverter<DType>::Make(const DB:
switch (c->getDataType())
{
case TypeIndex::Decimal128:
result = std::make_shared<ParquetConverterImpl<parquet::FLBAType, ConverterDecimal<Decimal128>>>(
ConverterDecimal<Decimal128>(c, desc));
result = std::make_shared<ParquetConverterImpl<parquet::FLBAType, Decimal128ToFLB>>(Decimal128ToFLB(c, desc));
break;
case TypeIndex::Decimal64:
result = std::make_shared<ParquetConverterImpl<parquet::FLBAType, Decimal64ToFLB>>(Decimal64ToFLB(c, desc));
break;
case TypeIndex::Decimal32:
result = std::make_shared<ParquetConverterImpl<parquet::FLBAType, Decimal32ToFLB>>(Decimal32ToFLB(c, desc));
break;
default:
break;
Expand Down
160 changes: 160 additions & 0 deletions cpp-ch/local-engine/tests/decmial_filter_push_down/18_2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
{
"relations": [
{
"root": {
"input": {
"filter": {
"common": {
"direct": {}
},
"input": {
"read": {
"common": {
"direct": {}
},
"baseSchema": {
"names": [
"a"
],
"struct": {
"types": [
{
"decimal": {
"scale": 2,
"precision": 18,
"nullability": "NULLABILITY_NULLABLE"
}
}
]
},
"columnTypes": [
"NORMAL_COL"
]
},
"filter": {
"singularOrList": {
"value": {
"selection": {
"directReference": {
"structField": {}
}
}
},
"options": [
{
"literal": {
"decimal": {
"value": "yAAAAAAAAAAAAAAAAAAAAA==",
"precision": 18,
"scale": 2
}
}
},
{
"literal": {
"decimal": {
"value": "LAEAAAAAAAAAAAAAAAAAAA==",
"precision": 18,
"scale": 2
}
}
},
{
"literal": {
"decimal": {
"value": "kAEAAAAAAAAAAAAAAAAAAA==",
"precision": 18,
"scale": 2
}
}
},
{
"literal": {
"decimal": {
"value": "9AEAAAAAAAAAAAAAAAAAAA==",
"precision": 18,
"scale": 2
}
}
}
]
}
},
"advancedExtension": {
"optimization": {
"@type": "type.googleapis.com/google.protobuf.StringValue",
"value": "isMergeTree=0\n"
}
}
}
},
"condition": {
"singularOrList": {
"value": {
"selection": {
"directReference": {
"structField": {}
}
}
},
"options": [
{
"literal": {
"decimal": {
"value": "yAAAAAAAAAAAAAAAAAAAAA==",
"precision": 18,
"scale": 2
}
}
},
{
"literal": {
"decimal": {
"value": "LAEAAAAAAAAAAAAAAAAAAA==",
"precision": 18,
"scale": 2
}
}
},
{
"literal": {
"decimal": {
"value": "kAEAAAAAAAAAAAAAAAAAAA==",
"precision": 18,
"scale": 2
}
}
},
{
"literal": {
"decimal": {
"value": "9AEAAAAAAAAAAAAAAAAAAA==",
"precision": 18,
"scale": 2
}
}
}
]
}
}
}
},
"names": [
"a#4772"
],
"outputSchema": {
"types": [
{
"decimal": {
"scale": 2,
"precision": 18,
"nullability": "NULLABILITY_NULLABLE"
}
}
],
"nullability": "NULLABILITY_REQUIRED"
}
}
}
]
}
Binary file not shown.
Loading

0 comments on commit 1c1a4b7

Please sign in to comment.