Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-7205] [VL] Optimize row to column for scalar type #7206

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 260 additions & 0 deletions cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,239 @@

using namespace facebook::velox;
namespace gluten {
namespace {

inline int64_t calculateBitSetWidthInBytes(int32_t numFields) {
return ((numFields + 63) / 64) * 8;
}

inline int64_t getFieldOffset(int64_t nullBitsetWidthInBytes, int32_t index) {
return nullBitsetWidthInBytes + 8L * index;
}

inline bool isNull(uint8_t* buffer_address, int32_t index) {
int64_t mask = 1L << (index & 0x3f); // mod 64 and shift
int64_t wordOffset = (index >> 6) * 8;
int64_t value = *((int64_t*)(buffer_address + wordOffset));
return (value & mask) != 0;
}

int32_t getTotalStringSize(
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress) {
size_t size = 0;
for (auto pos = 0; pos < numRows; pos++) {
if (isNull(memoryAddress + offsets[pos], columnIdx)) {
continue;
}

int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset);
int32_t length = static_cast<int32_t>(offsetAndSize);
if (!StringView::isInline(length)) {
size += length;
}
}
return size;
}

template <TypeKind Kind>
VectorPtr createFlatVector(
const TypePtr& type,
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress,
memory::MemoryPool* pool) {
using T = typename TypeTraits<Kind>::NativeType;
auto typeWidth = sizeof(T);
auto column = BaseVector::create<FlatVector<T>>(type, numRows, pool);
auto rawValues = column->template mutableRawValues<uint8_t>();
auto shift = __builtin_ctz((uint32_t)typeWidth);
for (auto pos = 0; pos < numRows; pos++) {
if (!isNull(memoryAddress + offsets[pos], columnIdx)) {
const uint8_t* srcptr = (memoryAddress + offsets[pos] + fieldOffset);
uint8_t* destptr = rawValues + (pos << shift);
memcpy(destptr, srcptr, typeWidth);
} else {
column->setNull(pos, true);
}
}
return column;
}

template <>
VectorPtr createFlatVector<TypeKind::HUGEINT>(
const TypePtr& type,
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress,
memory::MemoryPool* pool) {
auto column = BaseVector::create<FlatVector<int128_t>>(type, numRows, pool);
auto rawValues = column->mutableRawValues<uint8_t>();
auto typeWidth = sizeof(int128_t);
auto shift = __builtin_ctz((uint32_t)typeWidth);
for (auto pos = 0; pos < numRows; pos++) {
if (!isNull(memoryAddress + offsets[pos], columnIdx)) {
uint8_t* destptr = rawValues + (pos << shift);
int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset);
int32_t length = static_cast<int32_t>(offsetAndSize);
int32_t wordoffset = static_cast<int32_t>(offsetAndSize >> 32);
uint8_t bytesValue[length];
memcpy(bytesValue, memoryAddress + offsets[pos] + wordoffset, length);
uint8_t bytesValue2[16]{};
for (int k = length - 1; k >= 0; k--) {
bytesValue2[length - 1 - k] = bytesValue[k];
}
if (int8_t(bytesValue[0]) < 0) {
memset(bytesValue2 + length, 255, 16 - length);
}
memcpy(destptr, bytesValue2, typeWidth);
} else {
column->setNull(pos, true);
}
}
return column;
}

template <>
VectorPtr createFlatVector<TypeKind::BOOLEAN>(
const TypePtr& type,
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress,
memory::MemoryPool* pool) {
auto column = BaseVector::create<FlatVector<bool>>(type, numRows, pool);
auto rawValues = column->mutableRawValues<uint64_t>();
for (auto pos = 0; pos < numRows; pos++) {
if (!isNull(memoryAddress + offsets[pos], columnIdx)) {
bool value = *(bool*)(memoryAddress + offsets[pos] + fieldOffset);
bits::setBit(rawValues, pos, value);
} else {
column->setNull(pos, true);
}
}
return column;
}

template <>
VectorPtr createFlatVector<TypeKind::TIMESTAMP>(
const TypePtr& type,
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress,
memory::MemoryPool* pool) {
auto column = BaseVector::create<FlatVector<Timestamp>>(type, numRows, pool);
for (auto pos = 0; pos < numRows; pos++) {
if (!isNull(memoryAddress + offsets[pos], columnIdx)) {
int64_t value = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset);
column->set(pos, Timestamp::fromMicros(value));
} else {
column->setNull(pos, true);
}
}
return column;
}

VectorPtr createFlatVectorStringView(
const TypePtr& type,
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress,
memory::MemoryPool* pool) {
auto column = BaseVector::create<FlatVector<StringView>>(type, numRows, pool);
auto size = getTotalStringSize(columnIdx, numRows, fieldOffset, offsets, memoryAddress);
char* rawBuffer = column->getRawStringBufferWithSpace(size, true);
for (auto pos = 0; pos < numRows; pos++) {
if (!isNull(memoryAddress + offsets[pos], columnIdx)) {
int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset);
int32_t length = static_cast<int32_t>(offsetAndSize);
int32_t wordoffset = static_cast<int32_t>(offsetAndSize >> 32);
auto valueSrcPtr = memoryAddress + offsets[pos] + wordoffset;
if (StringView::isInline(length)) {
column->set(pos, StringView(reinterpret_cast<char*>(valueSrcPtr), length));
} else {
memcpy(rawBuffer, valueSrcPtr, length);
column->setNoCopy(pos, StringView(rawBuffer, length));
rawBuffer += length;
}
} else {
column->setNull(pos, true);
}
}
return column;
}

template <>
VectorPtr createFlatVector<TypeKind::VARCHAR>(
const TypePtr& type,
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress,
memory::MemoryPool* pool) {
return createFlatVectorStringView(type, columnIdx, numRows, fieldOffset, offsets, memoryAddress, pool);
}

template <>
VectorPtr createFlatVector<TypeKind::VARBINARY>(
const TypePtr& type,
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress,
memory::MemoryPool* pool) {
return createFlatVectorStringView(type, columnIdx, numRows, fieldOffset, offsets, memoryAddress, pool);
}

template <>
VectorPtr createFlatVector<TypeKind::UNKNOWN>(
const TypePtr& /*type*/,
int32_t /*columnIdx*/,
int32_t numRows,
int64_t /*fieldOffset*/,
std::vector<int64_t>& /*offsets*/,
uint8_t* /*memoryAddress*/,
memory::MemoryPool* pool) {
auto nulls = allocateNulls(numRows, pool, bits::kNull);
return std::make_shared<FlatVector<UnknownValue>>(
pool,
UNKNOWN(),
nulls,
numRows,
nullptr, // values
std::vector<BufferPtr>{}); // stringBuffers
}

bool supporteType(const RowTypePtr rowType) {
for (auto i = 0; i < rowType->size(); i++) {
auto kind = rowType->childAt(i)->kind();
switch (kind) {
case TypeKind::ARRAY:
case TypeKind::MAP:
case TypeKind::ROW:
return false;
default:
break;
}
}
return true;
}

} // namespace
VeloxRowToColumnarConverter::VeloxRowToColumnarConverter(
struct ArrowSchema* cSchema,
std::shared_ptr<memory::MemoryPool> memoryPool)
Expand All @@ -32,6 +265,9 @@ VeloxRowToColumnarConverter::VeloxRowToColumnarConverter(

std::shared_ptr<ColumnarBatch>
VeloxRowToColumnarConverter::convert(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress) {
if (supporteType(asRowType(rowType_))) {
return convertPrimitive(numRows, rowLength, memoryAddress);
}
std::vector<std::optional<std::string_view>> data;
int64_t offset = 0;
for (auto i = 0; i < numRows; i++) {
Expand All @@ -41,4 +277,28 @@ VeloxRowToColumnarConverter::convert(int64_t numRows, int64_t* rowLength, uint8_
auto vp = row::UnsafeRowDeserializer::deserialize(data, rowType_, pool_.get());
return std::make_shared<VeloxColumnarBatch>(std::dynamic_pointer_cast<RowVector>(vp));
}

std::shared_ptr<ColumnarBatch>
VeloxRowToColumnarConverter::convertPrimitive(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress) {
auto numFields = rowType_->size();
int64_t nullBitsetWidthInBytes = calculateBitSetWidthInBytes(numFields);
std::vector<int64_t> offsets;
offsets.resize(numRows);
for (auto i = 1; i < numRows; i++) {
offsets[i] = offsets[i - 1] + rowLength[i - 1];
}

std::vector<VectorPtr> columns;
columns.resize(numFields);

for (auto i = 0; i < numFields; i++) {
auto fieldOffset = getFieldOffset(nullBitsetWidthInBytes, i);
auto& type = rowType_->childAt(i);
columns[i] = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
createFlatVector, type->kind(), type, i, numRows, fieldOffset, offsets, memoryAddress, pool_.get());
}

auto rowVector = std::make_shared<RowVector>(pool_.get(), rowType_, BufferPtr(nullptr), numRows, std::move(columns));
return std::make_shared<VeloxColumnarBatch>(rowVector);
}
} // namespace gluten
3 changes: 2 additions & 1 deletion cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class VeloxRowToColumnarConverter final : public RowToColumnarConverter {

std::shared_ptr<ColumnarBatch> convert(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress);

protected:
private:
std::shared_ptr<ColumnarBatch> convertPrimitive(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress);
facebook::velox::TypePtr rowType_;
std::shared_ptr<facebook::velox::memory::MemoryPool> pool_;
};
Expand Down
Loading