diff --git a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc index 75fa3c3d3262..5dec7db23af4 100644 --- a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc +++ b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc @@ -22,6 +22,239 @@ using namespace facebook::velox; namespace gluten { +namespace { + +inline int64_t calculateBitSetWidthInBytes(int32_t numFields) { + return ((numFields + 63) / 64) * 8; +} + +inline int64_t getFieldOffset(int64_t nullBitsetWidthInBytes, int32_t index) { + return nullBitsetWidthInBytes + 8L * index; +} + +inline bool isNull(uint8_t* buffer_address, int32_t index) { + int64_t mask = 1L << (index & 0x3f); // mod 64 and shift + int64_t wordOffset = (index >> 6) * 8; + int64_t value = *((int64_t*)(buffer_address + wordOffset)); + return (value & mask) != 0; +} + +int32_t getTotalStringSize( + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress) { + size_t size = 0; + for (auto pos = 0; pos < numRows; pos++) { + if (isNull(memoryAddress + offsets[pos], columnIdx)) { + continue; + } + + int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset); + int32_t length = static_cast(offsetAndSize); + if (!StringView::isInline(length)) { + size += length; + } + } + return size; +} + +template +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + using T = typename TypeTraits::NativeType; + auto typeWidth = sizeof(T); + auto column = BaseVector::create>(type, numRows, pool); + auto rawValues = column->template mutableRawValues(); + auto shift = __builtin_ctz((uint32_t)typeWidth); + for (auto pos = 0; pos < numRows; pos++) { + if (!isNull(memoryAddress + offsets[pos], columnIdx)) { + const uint8_t* srcptr = (memoryAddress + offsets[pos] + fieldOffset); + uint8_t* destptr = rawValues + (pos << shift); + memcpy(destptr, srcptr, typeWidth); + } else { + column->setNull(pos, true); + } + } + return column; +} + +template <> +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + auto column = BaseVector::create>(type, numRows, pool); + auto rawValues = column->mutableRawValues(); + auto typeWidth = sizeof(int128_t); + auto shift = __builtin_ctz((uint32_t)typeWidth); + for (auto pos = 0; pos < numRows; pos++) { + if (!isNull(memoryAddress + offsets[pos], columnIdx)) { + uint8_t* destptr = rawValues + (pos << shift); + int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset); + int32_t length = static_cast(offsetAndSize); + int32_t wordoffset = static_cast(offsetAndSize >> 32); + uint8_t bytesValue[length]; + memcpy(bytesValue, memoryAddress + offsets[pos] + wordoffset, length); + uint8_t bytesValue2[16]{}; + for (int k = length - 1; k >= 0; k--) { + bytesValue2[length - 1 - k] = bytesValue[k]; + } + if (int8_t(bytesValue[0]) < 0) { + memset(bytesValue2 + length, 255, 16 - length); + } + memcpy(destptr, bytesValue2, typeWidth); + } else { + column->setNull(pos, true); + } + } + return column; +} + +template <> +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + auto column = BaseVector::create>(type, numRows, pool); + auto rawValues = column->mutableRawValues(); + for (auto pos = 0; pos < numRows; pos++) { + if (!isNull(memoryAddress + offsets[pos], columnIdx)) { + bool value = *(bool*)(memoryAddress + offsets[pos] + fieldOffset); + bits::setBit(rawValues, pos, value); + } else { + column->setNull(pos, true); + } + } + return column; +} + +template <> +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + auto column = BaseVector::create>(type, numRows, pool); + for (auto pos = 0; pos < numRows; pos++) { + if (!isNull(memoryAddress + offsets[pos], columnIdx)) { + int64_t value = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset); + column->set(pos, Timestamp::fromMicros(value)); + } else { + column->setNull(pos, true); + } + } + return column; +} + +VectorPtr createFlatVectorStringView( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + auto column = BaseVector::create>(type, numRows, pool); + auto size = getTotalStringSize(columnIdx, numRows, fieldOffset, offsets, memoryAddress); + char* rawBuffer = column->getRawStringBufferWithSpace(size, true); + for (auto pos = 0; pos < numRows; pos++) { + if (!isNull(memoryAddress + offsets[pos], columnIdx)) { + int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset); + int32_t length = static_cast(offsetAndSize); + int32_t wordoffset = static_cast(offsetAndSize >> 32); + auto valueSrcPtr = memoryAddress + offsets[pos] + wordoffset; + if (StringView::isInline(length)) { + column->set(pos, StringView(reinterpret_cast(valueSrcPtr), length)); + } else { + memcpy(rawBuffer, valueSrcPtr, length); + column->setNoCopy(pos, StringView(rawBuffer, length)); + rawBuffer += length; + } + } else { + column->setNull(pos, true); + } + } + return column; +} + +template <> +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + return createFlatVectorStringView(type, columnIdx, numRows, fieldOffset, offsets, memoryAddress, pool); +} + +template <> +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + return createFlatVectorStringView(type, columnIdx, numRows, fieldOffset, offsets, memoryAddress, pool); +} + +template <> +VectorPtr createFlatVector( + const TypePtr& /*type*/, + int32_t /*columnIdx*/, + int32_t numRows, + int64_t /*fieldOffset*/, + std::vector& /*offsets*/, + uint8_t* /*memoryAddress*/, + memory::MemoryPool* pool) { + auto nulls = allocateNulls(numRows, pool, bits::kNull); + return std::make_shared>( + pool, + UNKNOWN(), + nulls, + numRows, + nullptr, // values + std::vector{}); // stringBuffers +} + +bool supporteType(const RowTypePtr rowType) { + for (auto i = 0; i < rowType->size(); i++) { + auto kind = rowType->childAt(i)->kind(); + switch (kind) { + case TypeKind::ARRAY: + case TypeKind::MAP: + case TypeKind::ROW: + return false; + default: + break; + } + } + return true; +} + +} // namespace VeloxRowToColumnarConverter::VeloxRowToColumnarConverter( struct ArrowSchema* cSchema, std::shared_ptr memoryPool) @@ -32,6 +265,9 @@ VeloxRowToColumnarConverter::VeloxRowToColumnarConverter( std::shared_ptr VeloxRowToColumnarConverter::convert(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress) { + if (supporteType(asRowType(rowType_))) { + return convertPrimitive(numRows, rowLength, memoryAddress); + } std::vector> data; int64_t offset = 0; for (auto i = 0; i < numRows; i++) { @@ -41,4 +277,28 @@ VeloxRowToColumnarConverter::convert(int64_t numRows, int64_t* rowLength, uint8_ auto vp = row::UnsafeRowDeserializer::deserialize(data, rowType_, pool_.get()); return std::make_shared(std::dynamic_pointer_cast(vp)); } + +std::shared_ptr +VeloxRowToColumnarConverter::convertPrimitive(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress) { + auto numFields = rowType_->size(); + int64_t nullBitsetWidthInBytes = calculateBitSetWidthInBytes(numFields); + std::vector offsets; + offsets.resize(numRows); + for (auto i = 1; i < numRows; i++) { + offsets[i] = offsets[i - 1] + rowLength[i - 1]; + } + + std::vector columns; + columns.resize(numFields); + + for (auto i = 0; i < numFields; i++) { + auto fieldOffset = getFieldOffset(nullBitsetWidthInBytes, i); + auto& type = rowType_->childAt(i); + columns[i] = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( + createFlatVector, type->kind(), type, i, numRows, fieldOffset, offsets, memoryAddress, pool_.get()); + } + + auto rowVector = std::make_shared(pool_.get(), rowType_, BufferPtr(nullptr), numRows, std::move(columns)); + return std::make_shared(rowVector); +} } // namespace gluten diff --git a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h index 30006c4f0757..c064b9d3c091 100644 --- a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h +++ b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h @@ -33,7 +33,8 @@ class VeloxRowToColumnarConverter final : public RowToColumnarConverter { std::shared_ptr convert(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress); - protected: + private: + std::shared_ptr convertPrimitive(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress); facebook::velox::TypePtr rowType_; std::shared_ptr pool_; };