[CH-225] Fix decimal bug caused by big-endian encoding in Spark row (#228)

* fix bug in issue 225

* fix bug of issue 225

* finish debug
taiyang-li authored Dec 7, 2022
1 parent ad6dc45 commit 79b4286
Showing 7 changed files with 104 additions and 17 deletions.
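
The heart of the change: Spark's unsafe-row format stores the unscaled value of a high-precision decimal as big-endian bytes, while ClickHouse's Decimal128 is kept in host order, so copying raw bytes between the two corrupts the value. Below is a minimal standalone sketch of the byte swap the patch centralizes in BackingDataLengthCalculator::swapBytes; the struct is a hypothetical stand-in for DB::Decimal128, and it assumes the layout the patch's comments describe (limb array ordered high-first, each uint64_t limb little-endian on the host).

#include <cstdint>
#include <cstdio>

// Hypothetical stand-in for DB::Decimal128's 128-bit storage:
// items[0] holds the high 64 bits, items[1] the low 64 bits.
struct Int128Sketch
{
    uint64_t items[2];
};

// Byte-swap each limb in place; with the high-first limb order this turns
// the 16 bytes into the big-endian form Spark rows expect. The transform
// is its own inverse, so one helper serves both conversion directions.
static void swapBytesSketch(Int128Sketch & x)
{
    for (auto & item : x.items)
        item = __builtin_bswap64(item);
}

int main()
{
    Int128Sketch v{{0x0011223344556677ULL, 0x8899aabbccddeeffULL}};
    swapBytesSketch(v); // host order -> big-endian bytes
    swapBytesSketch(v); // swapping again restores the original value
    std::printf("%016llx %016llx\n",
                static_cast<unsigned long long>(v.items[0]),
                static_cast<unsigned long long>(v.items[1]));
    return 0;
}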
1 change: 1 addition & 0 deletions utils/local-engine/Builder/SerializedPlanBuilder.cpp
@@ -267,6 +267,7 @@ std::shared_ptr<substrait::Type> SerializedPlanBuilder::buildType(const DB::Data
const auto precision = getDecimalPrecision(*ch_type_without_nullable);
if (scale == 0 && precision == 0)
throw Exception(ErrorCodes::UNKNOWN_TYPE, "Spark doesn't support converting from {}", ch_type->getName());

res->mutable_decimal()->set_nullability(type_nullability);
res->mutable_decimal()->set_scale(scale);
res->mutable_decimal()->set_precision(precision);
72 changes: 62 additions & 10 deletions utils/local-engine/Parser/CHColumnToSparkRow.cpp
@@ -107,14 +107,31 @@ static void writeVariableLengthNonNullableValue(
{
const auto type_without_nullable{std::move(removeNullable(col.type))};
const bool use_raw_data = BackingDataLengthCalculator::isDataTypeSupportRawData(type_without_nullable);
const bool big_endian = BackingDataLengthCalculator::isBigEndianInSparkRow(type_without_nullable);
VariableLengthDataWriter writer(col.type, buffer_address, offsets, buffer_cursor);
if (use_raw_data)
{
for (size_t i = 0; i < static_cast<size_t>(num_rows); i++)
if (!big_endian)
{
StringRef str = col.column->getDataAt(i);
int64_t offset_and_size = writer.writeUnalignedBytes(i, str.data, str.size, 0);
memcpy(buffer_address + offsets[i] + field_offset, &offset_and_size, 8);
for (size_t i = 0; i < static_cast<size_t>(num_rows); i++)
{
StringRef str = col.column->getDataAt(i);
int64_t offset_and_size = writer.writeUnalignedBytes(i, str.data, str.size, 0);
memcpy(buffer_address + offsets[i] + field_offset, &offset_and_size, 8);
}
}
else
{
Field field;
for (size_t i = 0; i < static_cast<size_t>(num_rows); i++)
{
StringRef str_view = col.column->getDataAt(i);
String buf(str_view.data, str_view.size);
auto * decimal128 = reinterpret_cast<Decimal128 *>(buf.data());
BackingDataLengthCalculator::swapBytes(*decimal128);
int64_t offset_and_size = writer.writeUnalignedBytes(i, buf.data(), buf.size(), 0);
memcpy(buffer_address + offsets[i] + field_offset, &offset_and_size, 8);
}
}
}
else
@@ -143,19 +160,31 @@
const auto & nested_column = nullable_column->getNestedColumn();
const auto type_without_nullable{std::move(removeNullable(col.type))};
const bool use_raw_data = BackingDataLengthCalculator::isDataTypeSupportRawData(type_without_nullable);
const bool big_endian = BackingDataLengthCalculator::isBigEndianInSparkRow(type_without_nullable);
VariableLengthDataWriter writer(col.type, buffer_address, offsets, buffer_cursor);
if (use_raw_data)
{
for (size_t i = 0; i < static_cast<size_t>(num_rows); i++)
{
if (null_map[i])
bitSet(buffer_address + offsets[i], col_index);
else
else if (!big_endian)
{
StringRef str = nested_column.getDataAt(i);
int64_t offset_and_size = writer.writeUnalignedBytes(i, str.data, str.size, 0);
memcpy(buffer_address + offsets[i] + field_offset, &offset_and_size, 8);
}
else
{
Field field;
nested_column.get(i, field);
StringRef str_view = nested_column.getDataAt(i);
String buf(str_view.data, str_view.size);
auto * decimal128 = reinterpret_cast<Decimal128 *>(buf.data());
BackingDataLengthCalculator::swapBytes(*decimal128);
int64_t offset_and_size = writer.writeUnalignedBytes(i, buf.data(), buf.size(), 0);
memcpy(buffer_address + offsets[i] + field_offset, &offset_and_size, 8);
}
}
}
else
@@ -344,9 +373,7 @@ int64_t SparkRowInfo::getTotalBytes() const
std::unique_ptr<SparkRowInfo> CHColumnToSparkRow::convertCHColumnToSparkRow(const Block & block)
{
if (!block.columns())
{
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "A block with empty columns");
}

std::unique_ptr<SparkRowInfo> spark_row_info = std::make_unique<SparkRowInfo>(block);
spark_row_info->setBufferAddress(reinterpret_cast<char *>(alloc(spark_row_info->getTotalBytes(), 64)));
@@ -509,6 +536,30 @@ bool BackingDataLengthCalculator::isDataTypeSupportRawData(const DB::DataTypePtr
return isFixedLengthDataType(type_without_nullable) || which.isStringOrFixedString() || which.isDecimal128();
}

bool BackingDataLengthCalculator::isBigEndianInSparkRow(const DB::DataTypePtr & type_without_nullable)
{
const WhichDataType which(type_without_nullable);
return which.isDecimal128();
}

void BackingDataLengthCalculator::swapBytes(DB::Decimal128 & decimal128)
{
auto & x = decimal128.value;
for (size_t i = 0; i != std::size(x.items); ++i)
x.items[i] = __builtin_bswap64(x.items[i]);
}

Decimal128 BackingDataLengthCalculator::getDecimal128FromBytes(const String & bytes)
{
assert(bytes.size() == 16);

using base_type = Decimal128::NativeType::base_type;
String bytes_copy(bytes);
base_type * high = reinterpret_cast<base_type *>(bytes_copy.data() + 8);
base_type * low = reinterpret_cast<base_type *>(bytes_copy.data());
std::swap(*high, *low);
return std::move(*reinterpret_cast<Decimal128 *>(bytes_copy.data()));
}

VariableLengthDataWriter::VariableLengthDataWriter(
const DataTypePtr & type_, char * buffer_address_, const std::vector<int64_t> & offsets_, std::vector<int64_t> & buffer_cursor_)
@@ -694,9 +745,10 @@ int64_t VariableLengthDataWriter::write(size_t row_idx, const DB::Field & field,

if (which.isDecimal128())
{
// const auto & decimal = field.get<DecimalField<Decimal128>>();
// const auto value = decimal.getValue();
return writeUnalignedBytes(row_idx, &field.reinterpret<char>(), sizeof(Decimal128), parent_offset);
const auto & decimal_field = field.reinterpret<DecimalField<Decimal128>>();
auto decimal128 = decimal_field.getValue();
BackingDataLengthCalculator::swapBytes(decimal128);
return writeUnalignedBytes(row_idx, reinterpret_cast<const char *>(&decimal128), sizeof(Decimal128), parent_offset);
}

if (which.isArray())
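
A note on the write paths above: rather than mutating the shared ClickHouse column, both writers copy the 16 raw bytes into a scratch String, swap the copy, and pass that to writeUnalignedBytes. A condensed sketch of the pattern, with hypothetical stand-ins for the DB types and the row writer:

#include <cstdint>
#include <string>

// Hypothetical stand-in; the real code works on DB::Decimal128 obtained
// via IColumn::getDataAt and writes through VariableLengthDataWriter.
struct Decimal128Sketch { uint64_t items[2]; };

static void swapBytesSketch(Decimal128Sketch & d)
{
    for (auto & item : d.items)
        item = __builtin_bswap64(item);
}

// Copy-then-swap: the source column stays untouched and only the scratch
// buffer, already in Spark's big-endian layout, is written to the row.
static std::string toSparkRowBytes(const char * raw, size_t size)
{
    std::string buf(raw, size);
    auto * decimal = reinterpret_cast<Decimal128Sketch *>(buf.data());
    swapBytesSketch(*decimal); // host order -> big-endian
    return buf;                // stands in for writeUnalignedBytes(...)
}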
12 changes: 11 additions & 1 deletion utils/local-engine/Parser/CHColumnToSparkRow.h
@@ -91,6 +91,16 @@ class BackingDataLengthCalculator
/// Whether a data type can use raw data directly between CH Column and Spark Row when the value is not null
static bool isDataTypeSupportRawData(const DB::DataTypePtr & type_without_nullable);

/// Whether bytes in the Spark Row are big-endian. If true, we have to transform them to little-endian afterwards
static bool isBigEndianInSparkRow(const DB::DataTypePtr & type_without_nullable);

/// Convert a Field of type Decimal128 to/from a buffer in the Spark Row (big-endian)
static void swapBytes(DB::Decimal128 & decimal128);

/// Get Decimal128 from substrait decimal literal bytes
/// Note: bytes is little-endian, while Int128 holds a big-endian array of two little-endian uint64_t limbs
static DB::Decimal128 getDecimal128FromBytes(const String & bytes);

static int64_t getOffsetAndSize(int64_t cursor, int64_t size);
static int64_t extractOffset(int64_t offset_and_size);
static int64_t extractSize(int64_t offset_and_size);
@@ -120,7 +130,7 @@ class VariableLengthDataWriter
/// parent_offset: the starting offset of current structure in which we are updating it's backing data region
virtual int64_t write(size_t row_idx, const DB::Field & field, int64_t parent_offset);

/// Only support String/FixedString/Decimal32/Decimal64
/// Only support String/FixedString/Decimal128
int64_t writeUnalignedBytes(size_t row_idx, const char * src, size_t size, int64_t parent_offset);
private:
int64_t writeArray(size_t row_idx, const DB::Array & array, int64_t parent_offset);
7 changes: 5 additions & 2 deletions utils/local-engine/Parser/SerializedPlanParser.cpp
@@ -509,15 +509,18 @@ DataTypePtr SerializedPlanParser::parseType(const substrait::Type & substrait_ty
{
UInt32 precision = substrait_type.decimal().precision();
UInt32 scale = substrait_type.decimal().scale();
/*
if (precision <= DataTypeDecimal32::maxPrecision())
ch_type = std::make_shared<DataTypeDecimal32>(precision, scale);
else if (precision <= DataTypeDecimal64::maxPrecision())
ch_type = std::make_shared<DataTypeDecimal64>(precision, scale);
else if (precision <= DataTypeDecimal128::maxPrecision())
ch_type = std::make_shared<DataTypeDecimal128>(precision, scale);
else
*/
if (precision > DataTypeDecimal128::maxPrecision())
throw Exception(ErrorCodes::UNKNOWN_TYPE, "Spark doesn't support decimal type with precision {}", precision);

ch_type = createDecimal<DataTypeDecimal>(precision, scale);
ch_type = wrapNullableType(substrait_type.decimal().nullability(), ch_type);
}
else if (substrait_type.has_struct_())
@@ -1218,7 +1221,7 @@ std::pair<DataTypePtr, Field> SerializedPlanParser::parseLiteral(const substrait
else if (precision <= DataTypeDecimal128::maxPrecision())
{
type = std::make_shared<DataTypeDecimal128>(precision, scale);
auto value = *reinterpret_cast<const Int128 *>(bytes.data());
auto value = BackingDataLengthCalculator::getDecimal128FromBytes(bytes);
field = DecimalField<Decimal128>(value, scale);
}
else
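
For literals, the old code reinterpreted the 16 little-endian bytes directly as an Int128, which mismatches the high-first limb order the header describes. A standalone sketch of what getDecimal128FromBytes does, with the same hypothetical limb struct in place of DB::Decimal128:

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical stand-in: items[0] = high limb, items[1] = low limb,
// each limb little-endian on the host.
struct Int128Sketch { uint64_t items[2]; };

// The substrait decimal literal is 16 little-endian bytes (low half
// first), so placing the two 8-byte halves in reverse order yields the
// high-first limb layout; no per-byte swap is needed because the limbs
// themselves stay little-endian.
static Int128Sketch decimal128FromLEBytes(const std::string & bytes)
{
    assert(bytes.size() == 16);
    Int128Sketch result{};
    std::memcpy(&result.items[1], bytes.data(), 8);     // low 64 bits
    std::memcpy(&result.items[0], bytes.data() + 8, 8); // high 64 bits
    return result;
}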
17 changes: 14 additions & 3 deletions utils/local-engine/Parser/SparkRowToCHColumn.cpp
@@ -39,8 +39,18 @@ ALWAYS_INLINE static void writeRowToColumns(std::vector<MutableColumnPtr> & colu
{
if (spark_row_reader.supportRawData(i))
{
const StringRef str{std::move(spark_row_reader.getStringRef(i))};
columns[i]->insertData(str != EMPTY_STRING_REF ? str.data : nullptr, str.size);
const StringRef str_ref{std::move(spark_row_reader.getStringRef(i))};
if (str_ref == EMPTY_STRING_REF)
columns[i]->insertData(nullptr, str_ref.size);
else if (!spark_row_reader.isBigEndianInSparkRow(i))
columns[i]->insertData(str_ref.data, str_ref.size);
else
{
String str(str_ref.data, str_ref.size);
auto * decimal128 = reinterpret_cast<Decimal128 *>(str.data());
BackingDataLengthCalculator::swapBytes(*decimal128);
columns[i]->insertData(str.data(), str.size());
}
}
else
columns[i]->insert(spark_row_reader.getField(i));
@@ -135,9 +145,10 @@ Field VariableLengthDataReader::readDecimal(const char * buffer, size_t length)

Decimal128 value;
memcpy(&value, buffer, length);
BackingDataLengthCalculator::swapBytes(value);

const auto * decimal128_type = typeid_cast<const DataTypeDecimal128 *>(type_without_nullable.get());
return std::move(DecimalField<Decimal128>(value, decimal128_type->getScale()));
return std::move(DecimalField<Decimal128>(std::move(value), decimal128_type->getScale()));
}

Field VariableLengthDataReader::readString(const char * buffer, size_t length) const
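
readDecimal mirrors the write side: copy the 16 big-endian bytes out of the (possibly unaligned) row buffer into a local value, swap once, and wrap the now host-order value in a DecimalField. A condensed sketch with the hypothetical types used above:

#include <cassert>
#include <cstdint>
#include <cstring>

struct Decimal128Sketch { uint64_t items[2]; };

static void swapBytesSketch(Decimal128Sketch & d)
{
    for (auto & item : d.items)
        item = __builtin_bswap64(item);
}

// Sketch of the readDecimal flow: memcpy into an aligned local first,
// then swap back to host order; the real code wraps the result in
// DecimalField<Decimal128> with the column's scale.
static Decimal128Sketch readDecimalSketch(const char * buffer, size_t length)
{
    assert(length == sizeof(Decimal128Sketch));
    Decimal128Sketch value;
    std::memcpy(&value, buffer, length); // big-endian bytes from the Spark row
    swapBytesSketch(value);              // big-endian -> host order
    return value;
}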
9 changes: 9 additions & 0 deletions utils/local-engine/Parser/SparkRowToCHColumn.h
@@ -170,6 +170,7 @@ class SparkRowReader
, bit_set_width_in_bytes(calculateBitSetWidthInBytes(num_fields))
, field_offsets(num_fields)
, support_raw_datas(num_fields)
, is_big_endians_in_spark_row(num_fields)
, fixed_length_data_readers(num_fields)
, variable_length_data_readers(num_fields)
{
@@ -178,6 +179,7 @@
const auto type_without_nullable = removeNullable(field_types[ordinal]);
field_offsets[ordinal] = bit_set_width_in_bytes + ordinal * 8L;
support_raw_datas[ordinal] = BackingDataLengthCalculator::isDataTypeSupportRawData(type_without_nullable);
is_big_endians_in_spark_row[ordinal] = BackingDataLengthCalculator::isBigEndianInSparkRow(type_without_nullable);
if (BackingDataLengthCalculator::isFixedLengthDataType(type_without_nullable))
fixed_length_data_readers[ordinal] = std::make_shared<FixedLengthDataReader>(field_types[ordinal]);
else if (BackingDataLengthCalculator::isVariableLengthDataType(type_without_nullable))
@@ -198,6 +200,12 @@
return support_raw_datas[ordinal];
}

bool isBigEndianInSparkRow(int ordinal) const
{
assertIndexIsValid(ordinal);
return is_big_endians_in_spark_row[ordinal];
}

std::shared_ptr<FixedLengthDataReader> getFixedLengthDataReader(int ordinal) const
{
assertIndexIsValid(ordinal);
@@ -361,6 +369,7 @@ class SparkRowReader
const int32_t bit_set_width_in_bytes;
std::vector<int64_t> field_offsets;
std::vector<bool> support_raw_datas;
std::vector<bool> is_big_endians_in_spark_row;
std::vector<std::shared_ptr<FixedLengthDataReader>> fixed_length_data_readers;
std::vector<std::shared_ptr<VariableLengthDataReader>> variable_length_data_readers;

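
The reader-side changes are mechanical: isBigEndianInSparkRow is evaluated once per field in the constructor and cached, so the per-row loop does a vector lookup instead of re-inspecting the type. A minimal sketch of that caching pattern, with a hypothetical field descriptor standing in for DB::DataTypePtr:

#include <cstddef>
#include <vector>

// Hypothetical descriptor; the real reader derives the flag from the
// field's data type via BackingDataLengthCalculator::isBigEndianInSparkRow.
struct FieldTypeSketch { bool is_decimal128; };

class SparkRowReaderSketch
{
public:
    explicit SparkRowReaderSketch(const std::vector<FieldTypeSketch> & fields)
        : is_big_endians_in_spark_row(fields.size())
    {
        // One type inspection per field at construction time...
        for (size_t ordinal = 0; ordinal < fields.size(); ++ordinal)
            is_big_endians_in_spark_row[ordinal] = fields[ordinal].is_decimal128;
    }

    // ...so per-row queries are a cheap lookup.
    bool isBigEndianInSparkRow(size_t ordinal) const
    {
        return is_big_endians_in_spark_row.at(ordinal);
    }

private:
    std::vector<bool> is_big_endians_in_spark_row;
};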
3 changes: 2 additions & 1 deletion utils/local-engine/local_engine_jni.cpp
@@ -734,7 +734,8 @@ jlong Java_io_glutenproject_vectorized_BlockNativeConverter_convertSparkRowsToCH
env->DeleteLocalRef(type);
}
local_engine::SparkRowToCHColumn converter;
return reinterpret_cast<jlong>(converter.convertSparkRowItrToCHColumn(java_iter, c_names, c_types));
auto * block = converter.convertSparkRowItrToCHColumn(java_iter, c_names, c_types);
return reinterpret_cast<jlong>(block);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}

