diff --git a/utils/local-engine/Builder/SerializedPlanBuilder.cpp b/utils/local-engine/Builder/SerializedPlanBuilder.cpp
index b51a4a810555..f9b257528fce 100644
--- a/utils/local-engine/Builder/SerializedPlanBuilder.cpp
+++ b/utils/local-engine/Builder/SerializedPlanBuilder.cpp
@@ -267,6 +267,7 @@ std::shared_ptr<substrait::Type> SerializedPlanBuilder::buildType(const DB::DataTypePtr & ch_type)
         const auto precision = getDecimalPrecision(*ch_type_without_nullable);
         if (scale == 0 && precision == 0)
             throw Exception(ErrorCodes::UNKNOWN_TYPE, "Spark doesn't support converting from {}", ch_type->getName());
+        res->mutable_decimal()->set_nullability(type_nullability);
         res->mutable_decimal()->set_scale(scale);
         res->mutable_decimal()->set_precision(precision);
diff --git a/utils/local-engine/Parser/CHColumnToSparkRow.cpp b/utils/local-engine/Parser/CHColumnToSparkRow.cpp
index b1f9b7c58959..10e9a06b2f8d 100644
--- a/utils/local-engine/Parser/CHColumnToSparkRow.cpp
+++ b/utils/local-engine/Parser/CHColumnToSparkRow.cpp
@@ -107,14 +107,31 @@ static void writeVariableLengthNonNullableValue(
 {
     const auto type_without_nullable{std::move(removeNullable(col.type))};
     const bool use_raw_data = BackingDataLengthCalculator::isDataTypeSupportRawData(type_without_nullable);
+    const bool big_endian = BackingDataLengthCalculator::isBigEndianInSparkRow(type_without_nullable);
     VariableLengthDataWriter writer(col.type, buffer_address, offsets, buffer_cursor);
     if (use_raw_data)
     {
-        for (size_t i = 0; i < static_cast<size_t>(num_rows); i++)
+        if (!big_endian)
         {
-            StringRef str = col.column->getDataAt(i);
-            int64_t offset_and_size = writer.writeUnalignedBytes(i, str.data, str.size, 0);
-            memcpy(buffer_address + offsets[i] + field_offset, &offset_and_size, 8);
+            for (size_t i = 0; i < static_cast<size_t>(num_rows); i++)
+            {
+                StringRef str = col.column->getDataAt(i);
+                int64_t offset_and_size = writer.writeUnalignedBytes(i, str.data, str.size, 0);
+                memcpy(buffer_address + offsets[i] + field_offset, &offset_and_size, 8);
+            }
         }
+        else
+        {
+            Field field;
+            for (size_t i = 0; i < static_cast<size_t>(num_rows); i++)
+            {
+                StringRef str_view = col.column->getDataAt(i);
+                String buf(str_view.data, str_view.size);
+                auto * decimal128 = reinterpret_cast<Decimal128 *>(buf.data());
+                BackingDataLengthCalculator::swapBytes(*decimal128);
+                int64_t offset_and_size = writer.writeUnalignedBytes(i, buf.data(), buf.size(), 0);
+                memcpy(buffer_address + offsets[i] + field_offset, &offset_and_size, 8);
+            }
+        }
     }
     else
@@ -143,6 +160,7 @@ static void writeVariableLengthNullableValue(
     const auto & nested_column = nullable_column->getNestedColumn();
     const auto type_without_nullable{std::move(removeNullable(col.type))};
     const bool use_raw_data = BackingDataLengthCalculator::isDataTypeSupportRawData(type_without_nullable);
+    const bool big_endian = BackingDataLengthCalculator::isBigEndianInSparkRow(type_without_nullable);
     VariableLengthDataWriter writer(col.type, buffer_address, offsets, buffer_cursor);
     if (use_raw_data)
     {
@@ -150,12 +168,23 @@ static void writeVariableLengthNullableValue(
         {
             if (null_map[i])
                 bitSet(buffer_address + offsets[i], col_index);
-            else
+            else if (!big_endian)
             {
                 StringRef str = nested_column.getDataAt(i);
                 int64_t offset_and_size = writer.writeUnalignedBytes(i, str.data, str.size, 0);
                 memcpy(buffer_address + offsets[i] + field_offset, &offset_and_size, 8);
             }
+            else
+            {
+                Field field;
+                nested_column.get(i, field);
+                StringRef str_view = nested_column.getDataAt(i);
+                String buf(str_view.data, str_view.size);
+                auto * decimal128 = reinterpret_cast<Decimal128 *>(buf.data());
+                BackingDataLengthCalculator::swapBytes(*decimal128);
+                int64_t offset_and_size = writer.writeUnalignedBytes(i, buf.data(), buf.size(), 0);
+                memcpy(buffer_address + offsets[i] + field_offset, &offset_and_size, 8);
+            }
         }
     }
     else
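
Why the byte swap: Spark's UnsafeRow stores the unscaled value of a wide decimal as the big-endian two's-complement bytes of a java.math.BigInteger, while ClickHouse's Decimal128 wraps a native Int128 whose limb array keeps the most-significant limb first, each limb being a little-endian uint64_t. Below is a minimal standalone sketch of the conversion the writers above perform; `Int128Model` is a hypothetical stand-in for DB::Decimal128, and it assumes a little-endian host and the GCC/Clang builtin __builtin_bswap64.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Stand-in for DB::Decimal128's backing Int128: two 64-bit limbs kept in
// big-endian limb order (items[0] is the most-significant limb), each limb
// a native little-endian uint64_t.
struct Int128Model
{
    uint64_t items[2];
};

// Mirrors BackingDataLengthCalculator::swapBytes: byte-swapping each limb in
// place turns the in-memory image into Spark's big-endian byte layout,
// because the limbs are already ordered most-significant first.
static void swapBytes(Int128Model & x)
{
    for (size_t i = 0; i != 2; ++i)
        x.items[i] = __builtin_bswap64(x.items[i]);
}

int main()
{
    // The 128-bit value 0x0102...0F10, split into high and low limbs.
    Int128Model v{{0x0102030405060708ULL, 0x090A0B0C0D0E0F10ULL}};
    swapBytes(v);

    unsigned char bytes[16];
    std::memcpy(bytes, &v, sizeof(bytes));
    // On a little-endian host the buffer now reads 0x01, 0x02, ..., 0x10,
    // i.e. the big-endian two's-complement form Spark expects.
    assert(bytes[0] == 0x01 && bytes[7] == 0x08 && bytes[15] == 0x10);
    return 0;
}
```

Because the limbs already sit high-first, swapping bytes within each limb is all it takes to produce the row's big-endian image; exchanging the limbs as well would double-reverse.
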
@@ -344,9 +373,7 @@ int64_t SparkRowInfo::getTotalBytes() const

 std::unique_ptr<SparkRowInfo> CHColumnToSparkRow::convertCHColumnToSparkRow(const Block & block)
 {
     if (!block.columns())
-    {
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "A block with empty columns");
-    }
     std::unique_ptr<SparkRowInfo> spark_row_info = std::make_unique<SparkRowInfo>(block);
     spark_row_info->setBufferAddress(reinterpret_cast<char *>(alloc(spark_row_info->getTotalBytes(), 64)));
@@ -509,6 +536,30 @@ bool BackingDataLengthCalculator::isDataTypeSupportRawData(const DB::DataTypePtr & type_without_nullable)
     return isFixedLengthDataType(type_without_nullable) || which.isStringOrFixedString() || which.isDecimal128();
 }

+bool BackingDataLengthCalculator::isBigEndianInSparkRow(const DB::DataTypePtr & type_without_nullable)
+{
+    const WhichDataType which(type_without_nullable);
+    return which.isDecimal128();
+}
+
+void BackingDataLengthCalculator::swapBytes(DB::Decimal128 & decimal128)
+{
+    auto & x = decimal128.value;
+    for (size_t i = 0; i != std::size(x.items); ++i)
+        x.items[i] = __builtin_bswap64(x.items[i]);
+}
+
+Decimal128 BackingDataLengthCalculator::getDecimal128FromBytes(const String & bytes)
+{
+    assert(bytes.size() == 16);
+
+    using base_type = Decimal128::NativeType::base_type;
+    String bytes_copy(bytes);
+    base_type * high = reinterpret_cast<base_type *>(bytes_copy.data() + 8);
+    base_type * low = reinterpret_cast<base_type *>(bytes_copy.data());
+    std::swap(*high, *low);
+    return std::move(*reinterpret_cast<Decimal128 *>(bytes_copy.data()));
+}
+
 VariableLengthDataWriter::VariableLengthDataWriter(
     const DataTypePtr & type_, char * buffer_address_, const std::vector<int64_t> & offsets_, std::vector<int64_t> & buffer_cursor_)
@@ -694,9 +745,10 @@ int64_t VariableLengthDataWriter::write(size_t row_idx, const DB::Field & field, int64_t parent_offset)

     if (which.isDecimal128())
     {
-        // const auto & decimal = field.get<DecimalField<Decimal128>>();
-        // const auto value = decimal.getValue();
-        return writeUnalignedBytes(row_idx, &field.reinterpret<Decimal128>(), sizeof(Decimal128), parent_offset);
+        const auto & decimal_field = field.reinterpret<DecimalField<Decimal128>>();
+        auto decimal128 = decimal_field.getValue();
+        BackingDataLengthCalculator::swapBytes(decimal128);
+        return writeUnalignedBytes(row_idx, reinterpret_cast<const char *>(&decimal128), sizeof(Decimal128), parent_offset);
     }

     if (which.isArray())
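
The new getDecimal128FromBytes goes the other direction for substrait decimal literals, which arrive as 16 little-endian bytes: here nothing needs a per-byte swap, only the two 8-byte halves must trade places to match Int128's high-limb-first array. A standalone sketch follows; `fromLittleEndianBytes` is a hypothetical name, and it uses memcpy where the patch uses reinterpret_cast plus std::swap, with the same effect but without alignment concerns.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

struct Int128Model
{
    uint64_t items[2]; // items[0] = high limb, items[1] = low limb
};

// Mirrors getDecimal128FromBytes: the literal's 16 bytes are little-endian,
// so moving the high 8 bytes in front of the low 8 bytes -- without touching
// the byte order inside each half -- produces the limb layout above.
static Int128Model fromLittleEndianBytes(const std::string & bytes)
{
    assert(bytes.size() == 16);
    Int128Model result;
    std::memcpy(&result.items[0], bytes.data() + 8, 8); // high half -> high limb
    std::memcpy(&result.items[1], bytes.data(), 8);     // low half  -> low limb
    return result;
}

int main()
{
    // Little-endian encoding of the unscaled value 1
    // (e.g. the literal 0.01 for a decimal with scale 2).
    std::string bytes(16, '\0');
    bytes[0] = 1;
    const Int128Model v = fromLittleEndianBytes(bytes);
    assert(v.items[0] == 0 && v.items[1] == 1);
    return 0;
}
```
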
diff --git a/utils/local-engine/Parser/CHColumnToSparkRow.h b/utils/local-engine/Parser/CHColumnToSparkRow.h
index 5b10aa3abe53..4c22167d9014 100644
--- a/utils/local-engine/Parser/CHColumnToSparkRow.h
+++ b/utils/local-engine/Parser/CHColumnToSparkRow.h
@@ -91,6 +91,16 @@ class BackingDataLengthCalculator
     /// If Data Type can use raw data between CH Column and Spark Row if value is not null
     static bool isDataTypeSupportRawData(const DB::DataTypePtr & type_without_nullable);

+    /// If bytes in Spark Row are big-endian. If true, we have to transform them to little-endian afterwards
+    static bool isBigEndianInSparkRow(const DB::DataTypePtr & type_without_nullable);
+
+    /// Convert Field with type Decimal128 to/from buffer in Spark Row (big-endian)
+    static void swapBytes(DB::Decimal128 & decimal128);
+
+    /// Get Decimal128 from substrait decimal literal bytes
+    /// Note: bytes is little-endian, but Int128 is a big-endian array of two little-endian uint64_t limbs
+    static DB::Decimal128 getDecimal128FromBytes(const String & bytes);
+
     static int64_t getOffsetAndSize(int64_t cursor, int64_t size);
     static int64_t extractOffset(int64_t offset_and_size);
     static int64_t extractSize(int64_t offset_and_size);
@@ -120,7 +130,7 @@ class VariableLengthDataWriter
     /// parent_offset: the starting offset of current structure in which we are updating its backing data region
     virtual int64_t write(size_t row_idx, const DB::Field & field, int64_t parent_offset);

-    /// Only support String/FixedString/Decimal32/Decimal64
+    /// Only support String/FixedString/Decimal128
     int64_t writeUnalignedBytes(size_t row_idx, const char * src, size_t size, int64_t parent_offset);
 private:
     int64_t writeArray(size_t row_idx, const DB::Array & array, int64_t parent_offset);
diff --git a/utils/local-engine/Parser/SerializedPlanParser.cpp b/utils/local-engine/Parser/SerializedPlanParser.cpp
index d3600e6a856d..be14b74addbd 100644
--- a/utils/local-engine/Parser/SerializedPlanParser.cpp
+++ b/utils/local-engine/Parser/SerializedPlanParser.cpp
@@ -509,6 +509,7 @@ DataTypePtr SerializedPlanParser::parseType(const substrait::Type & substrait_type)
     {
         UInt32 precision = substrait_type.decimal().precision();
         UInt32 scale = substrait_type.decimal().scale();
+        /*
         if (precision <= DataTypeDecimal32::maxPrecision())
             ch_type = std::make_shared<DataTypeDecimal32>(precision, scale);
         else if (precision <= DataTypeDecimal64::maxPrecision())
@@ -516,8 +517,10 @@ DataTypePtr SerializedPlanParser::parseType(const substrait::Type & substrait_type)
             ch_type = std::make_shared<DataTypeDecimal64>(precision, scale);
         else if (precision <= DataTypeDecimal128::maxPrecision())
             ch_type = std::make_shared<DataTypeDecimal128>(precision, scale);
         else
+        */
+        if (precision > DataTypeDecimal128::maxPrecision())
             throw Exception(ErrorCodes::UNKNOWN_TYPE, "Spark doesn't support decimal type with precision {}", precision);
-
+        ch_type = createDecimal<DataTypeDecimal>(precision, scale);
         ch_type = wrapNullableType(substrait_type.decimal().nullability(), ch_type);
     }
     else if (substrait_type.has_struct_())
@@ -1218,7 +1221,7 @@ std::pair<DataTypePtr, Field> SerializedPlanParser::parseLiteral(const substrait::Expression_Literal & literal)
         else if (precision <= DataTypeDecimal128::maxPrecision())
         {
             type = std::make_shared<DataTypeDecimal128>(precision, scale);
-            auto value = *reinterpret_cast<Decimal128 *>(bytes.data());
+            auto value = BackingDataLengthCalculator::getDecimal128FromBytes(bytes);
             field = DecimalField(value, scale);
         }
         else
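
With the hand-rolled Decimal32/64/128 chain commented out, parseType delegates storage selection to createDecimal and only rejects precisions beyond Decimal128. The mapping it relies on is sketched below with ClickHouse's documented precision bounds (9, 18, and 38 digits; 38 is also Spark's maximum precision); the enum and function are illustrative, not the real API.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <string>

enum class DecimalStorage { Int32, Int64, Int128 };

// Illustrative mapping only; DB::createDecimal<DataTypeDecimal> performs the
// equivalent selection. Decimal32 holds up to 9 digits, Decimal64 up to 18,
// Decimal128 up to 38.
static DecimalStorage storageForPrecision(uint32_t precision)
{
    if (precision <= 9)
        return DecimalStorage::Int32;
    if (precision <= 18)
        return DecimalStorage::Int64;
    if (precision <= 38)
        return DecimalStorage::Int128;
    throw std::invalid_argument("precision " + std::to_string(precision) + " exceeds Decimal128");
}

int main()
{
    assert(storageForPrecision(7) == DecimalStorage::Int32);
    assert(storageForPrecision(18) == DecimalStorage::Int64);
    assert(storageForPrecision(38) == DecimalStorage::Int128);
    return 0;
}
```

Note that only Decimal128 takes the big-endian variable-length path: Spark stores decimals of precision up to 18 as a plain long in the row's fixed-length slot, which is why isBigEndianInSparkRow checks isDecimal128() alone.
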
diff --git a/utils/local-engine/Parser/SparkRowToCHColumn.cpp b/utils/local-engine/Parser/SparkRowToCHColumn.cpp
index 014103394925..845259026fa1 100644
--- a/utils/local-engine/Parser/SparkRowToCHColumn.cpp
+++ b/utils/local-engine/Parser/SparkRowToCHColumn.cpp
@@ -39,8 +39,18 @@ ALWAYS_INLINE static void writeRowToColumns(std::vector<MutableColumnPtr> & columns, const SparkRowReader & spark_row_reader)
     {
         if (spark_row_reader.supportRawData(i))
         {
-            const StringRef str{std::move(spark_row_reader.getStringRef(i))};
-            columns[i]->insertData(str != EMPTY_STRING_REF ? str.data : nullptr, str.size);
+            const StringRef str_ref{std::move(spark_row_reader.getStringRef(i))};
+            if (str_ref == EMPTY_STRING_REF)
+                columns[i]->insertData(nullptr, str_ref.size);
+            else if (!spark_row_reader.isBigEndianInSparkRow(i))
+                columns[i]->insertData(str_ref.data, str_ref.size);
+            else
+            {
+                String str(str_ref.data, str_ref.size);
+                auto * decimal128 = reinterpret_cast<Decimal128 *>(str.data());
+                BackingDataLengthCalculator::swapBytes(*decimal128);
+                columns[i]->insertData(str.data(), str.size());
+            }
         }
         else
             columns[i]->insert(spark_row_reader.getField(i));
@@ -135,9 +145,10 @@ Field VariableLengthDataReader::readDecimal(const char * buffer, size_t length) const

     Decimal128 value;
     memcpy(&value, buffer, length);
+    BackingDataLengthCalculator::swapBytes(value);

     const auto * decimal128_type = typeid_cast<const DataTypeDecimal128 *>(type_without_nullable.get());
-    return std::move(DecimalField(value, decimal128_type->getScale()));
+    return std::move(DecimalField(std::move(value), decimal128_type->getScale()));
 }

 Field VariableLengthDataReader::readString(const char * buffer, size_t length) const
diff --git a/utils/local-engine/Parser/SparkRowToCHColumn.h b/utils/local-engine/Parser/SparkRowToCHColumn.h
index 200194d68518..946b0b4669fd 100644
--- a/utils/local-engine/Parser/SparkRowToCHColumn.h
+++ b/utils/local-engine/Parser/SparkRowToCHColumn.h
@@ -170,6 +170,7 @@ class SparkRowReader
         , bit_set_width_in_bytes(calculateBitSetWidthInBytes(num_fields))
         , field_offsets(num_fields)
         , support_raw_datas(num_fields)
+        , is_big_endians_in_spark_row(num_fields)
         , fixed_length_data_readers(num_fields)
         , variable_length_data_readers(num_fields)
     {
@@ -178,6 +179,7 @@ class SparkRowReader
             const auto type_without_nullable = removeNullable(field_types[ordinal]);
             field_offsets[ordinal] = bit_set_width_in_bytes + ordinal * 8L;
             support_raw_datas[ordinal] = BackingDataLengthCalculator::isDataTypeSupportRawData(type_without_nullable);
+            is_big_endians_in_spark_row[ordinal] = BackingDataLengthCalculator::isBigEndianInSparkRow(type_without_nullable);
             if (BackingDataLengthCalculator::isFixedLengthDataType(type_without_nullable))
                 fixed_length_data_readers[ordinal] = std::make_shared<FixedLengthDataReader>(field_types[ordinal]);
             else if (BackingDataLengthCalculator::isVariableLengthDataType(type_without_nullable))
@@ -198,6 +200,12 @@ class SparkRowReader
         return support_raw_datas[ordinal];
     }

+    bool isBigEndianInSparkRow(int ordinal) const
+    {
+        assertIndexIsValid(ordinal);
+        return is_big_endians_in_spark_row[ordinal];
+    }
+
     std::shared_ptr<FixedLengthDataReader> getFixedLengthDataReader(int ordinal) const
     {
         assertIndexIsValid(ordinal);
@@ -361,6 +369,7 @@ class SparkRowReader
     const int32_t bit_set_width_in_bytes;
     std::vector<int64_t> field_offsets;
     std::vector<bool> support_raw_datas;
+    std::vector<bool> is_big_endians_in_spark_row;
     std::vector<std::shared_ptr<FixedLengthDataReader>> fixed_length_data_readers;
     std::vector<std::shared_ptr<VariableLengthDataReader>> variable_length_data_readers;
diff --git a/utils/local-engine/local_engine_jni.cpp b/utils/local-engine/local_engine_jni.cpp
index 443a94c67b36..02919bcd3828 100644
--- a/utils/local-engine/local_engine_jni.cpp
+++ b/utils/local-engine/local_engine_jni.cpp
@@ -734,7 +734,8 @@ jlong Java_io_glutenproject_vectorized_BlockNativeConverter_convertSparkRowsToCHColumn(
         env->DeleteLocalRef(type);
     }
     local_engine::SparkRowToCHColumn converter;
-    return reinterpret_cast<jlong>(converter.convertSparkRowItrToCHColumn(java_iter, c_names, c_types));
+    auto * block = converter.convertSparkRowItrToCHColumn(java_iter, c_names, c_types);
+    return reinterpret_cast<jlong>(block);
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
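
Since VariableLengthDataReader::readDecimal applies the same swapBytes as the writer, a column converted to Spark rows and read back must come out unchanged. A minimal property check on the same hypothetical limb model as above:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

struct Int128Model
{
    uint64_t items[2];
};

static void swapBytes(Int128Model & x)
{
    for (size_t i = 0; i != 2; ++i)
        x.items[i] = __builtin_bswap64(x.items[i]);
}

int main()
{
    Int128Model v{{0x0123456789ABCDEFULL, 0xFEDCBA9876543210ULL}};
    const Int128Model original = v;

    swapBytes(v); // CHColumnToSparkRow: little-endian limbs -> big-endian row bytes
    swapBytes(v); // SparkRowToCHColumn: big-endian row bytes -> little-endian limbs

    // The swap is an involution, so the round trip is the identity.
    assert(v.items[0] == original.items[0] && v.items[1] == original.items[1]);
    return 0;
}
```
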