Skip to content

Commit

Permalink
Update VeloxColumnarToRowConverter.cc
Browse files Browse the repository at this point in the history
  • Loading branch information
FelixYBW authored Aug 1, 2024
1 parent 05a7763 commit a140b8f
Showing 1 changed file with 22 additions and 32 deletions.
54 changes: 22 additions & 32 deletions cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,57 +28,47 @@ using namespace facebook;

namespace gluten {

// NOTE(review): this listing is a rendered commit diff without +/- markers, so
// the two versions of refreshStates() appear interleaved (both a three-argument
// and a two-argument signature are listed, and several statements exist once
// per version). Lines using rowId/memoryThreshold and lines using
// startRow/memThreshold_ presumably belong to the pre- and post-commit version
// respectively -- confirm against the repository before copying code from here.
//
// refreshStates() wraps `rowVector` in an UnsafeRowFast serializer, works out
// how many rows beginning at the start row fit within the memory threshold
// (stored in numRows_), makes sure the output buffer exists and is large
// enough, and zero-fills it.
void VeloxColumnarToRowConverter::refreshStates(
facebook::velox::RowVectorPtr rowVector,
int64_t rowId,
int64_t memoryThreshold) {
void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr rowVector, int64_t startRow) {
auto vectorLength = rowVector->size();
numCols_ = rowVector->childrenSize();

// Serializer used below for per-row size queries (fast_->rowSize).
fast_ = std::make_unique<velox::row::UnsafeRowFast>(rowVector);

size_t totalMemorySize = 0;
// fixedRowSize() yields a value only when every row has the same size; in that
// case the row count is a simple division instead of a per-row scan.
if (auto fixedRowSize = velox::row::UnsafeRowFast::fixedRowSize(velox::asRowType(rowVector->type()))) {
if (memoryThreshold < fixedRowSize.value()) {
memoryThreshold = fixedRowSize.value();
LOG(WARNING) << "spark.gluten.sql.columnarToRowMemoryThreshold(" + velox::succinctBytes(memoryThreshold) +
") is too small, it can't hold even one row(" + velox::succinctBytes(fixedRowSize.value()) + ")";
}
// Raise the threshold so that it can hold at least one row.
// Fixed: this statement used `==`, a comparison whose result was discarded,
// so memThreshold_ was never updated and the division below could give 0 rows.
memThreshold_ = std::max<int64_t>(memThreshold_, fixedRowSize.value());
auto rowSize = fixedRowSize.value();
numRows_ = std::min<int64_t>(memoryThreshold / rowSize, vectorLength - rowId);
totalMemorySize = rowSize * numRows_;
numRows_ = std::min<int64_t>(memThreshold_ / rowSize, vectorLength - startRow);
} else {
int64_t i = rowId;
for (; i < vectorLength; ++i) {
auto rowSize = fast_->rowSize(i);
if (UNLIKELY(totalMemorySize + rowSize > memoryThreshold)) {
if (i == rowId) {
memoryThreshold = rowSize;
LOG(WARNING) << "spark.gluten.sql.columnarToRowMemoryThreshold(" + velox::succinctBytes(memoryThreshold) +
") is too small, it can't hold even one row(" + velox::succinctBytes(rowSize) + ")";
}
// Calculate the first row size
int64_t totalMemorySize = fast_->rowSize(startRow);

// The first row is always accepted; keep adding following rows until the next
// one would push the running total past the threshold.
auto endRow = startRow + 1;
for (; endRow < vectorLength; ++endRow) {
auto rowSize = fast_->rowSize(endRow);
if (UNLIKELY(totalMemorySize + rowSize > memThreshold_)) {
break;
} else {
totalMemorySize += rowSize;
}
}
numRows_ = i - rowId;
// Make sure the threshold is larger than the first row size
memThreshold_ = std::max<int64_t>(totalMemorySize, memThreshold_);
numRows_ = endRow - startRow;
}

// Allocate the output buffer on first use, or grow it when its capacity is
// smaller than what is needed.
if (veloxBuffers_ == nullptr) {
veloxBuffers_ = velox::AlignedBuffer::allocate<uint8_t>(memoryThreshold, veloxPool_.get());
if (nullptr == veloxBuffers_) {
veloxBuffers_ = velox::AlignedBuffer::allocate<uint8_t>(memThreshold_, veloxPool_.get());
} else if (veloxBuffers_->capacity() < memThreshold_) {
velox::AlignedBuffer::reallocate<uint8_t>(&veloxBuffers_, memThreshold_);
}
if (veloxBuffers_->capacity() < totalMemorySize) {
velox::AlignedBuffer::reallocate<uint8_t>(&veloxBuffers_, totalMemorySize);
}


// Zero the buffer so that bytes not written by serialization are 0.
bufferAddress_ = veloxBuffers_->asMutable<uint8_t>();
memset(bufferAddress_, 0, sizeof(int8_t) * totalMemorySize);
memset(bufferAddress_, 0, sizeof(int8_t) * memThreshold_);
}

void VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb, int64_t rowId, int64_t memoryThreshold) {
void VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb, int64_t startRow) {
auto veloxBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
refreshStates(veloxBatch->getRowVector(), rowId, memoryThreshold);
refreshStates(veloxBatch->getRowVector(), startRow);

// Initialize the offsets_ , lengths_
lengths_.clear();
Expand All @@ -88,7 +78,7 @@ void VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb, int

size_t offset = 0;
for (auto i = 0; i < numRows_; ++i) {
auto rowSize = fast_->serialize(rowId + i, (char*)(bufferAddress_ + offset));
auto rowSize = fast_->serialize(startRow + i, (char*)(bufferAddress_ + offset));
lengths_[i] = rowSize;
if (i > 0) {
offsets_[i] = offsets_[i - 1] + lengths_[i - 1];
Expand Down

0 comments on commit a140b8f

Please sign in to comment.