Skip to content

Commit

Permalink
[VL] Refactor split buffer allocation (#3177)
Browse files Browse the repository at this point in the history
This PR is a code cleanup; it has no impact on performance.
Extract the logic duplicated between resizePartitionBuffer and allocatePartitionBuffer.
In the prealloc-partition-buffer path, use resizePartitionBuffer instead of allocatePartitionBuffer when the buffer data must be preserved for the next split, and remove the resize branch from allocatePartitionBuffer.
  • Loading branch information
marin-ma authored Nov 9, 2023
1 parent c7780fe commit 5ee4e34
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 115 deletions.
205 changes: 94 additions & 111 deletions cpp/velox/shuffle/VeloxShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -880,11 +880,11 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
auto& dstAddrs = partitionValidityAddrs_[col];
for (auto& pid : partitionUsed_) {
if (dstAddrs[pid] == nullptr) {
// init bitmap if it's null, initialize the buffer as true
auto newSize = std::max(partition2RowCount_[pid], (uint32_t)options_.buffer_size);
// Init bitmap if it's null.
ARROW_ASSIGN_OR_RAISE(
auto validityBuffer,
arrow::AllocateResizableBuffer(arrow::bit_util::BytesForBits(newSize), partitionBufferPool_.get()));
arrow::AllocateResizableBuffer(
arrow::bit_util::BytesForBits(partition2BufferSize_[pid]), partitionBufferPool_.get()));
dstAddrs[pid] = const_cast<uint8_t*>(validityBuffer->data());
memset(validityBuffer->mutable_data(), 0xff, validityBuffer->capacity());
partitionBuffers_[col][pid][kValidityBufferIndex] = std::move(validityBuffer);
Expand Down Expand Up @@ -937,7 +937,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
multiply = std::min(3, multiply + 1);

auto valueBuffer = std::static_pointer_cast<arrow::ResizableBuffer>(
partitionBuffers_[fixedWidthColumnCount_ + binaryIdx][pid][kValueBufferIndex]);
partitionBuffers_[fixedWidthColumnCount_ + binaryIdx][pid][kBinaryValueBufferIndex]);

RETURN_NOT_OK(valueBuffer->Reserve(capacity));

Expand Down Expand Up @@ -1157,86 +1157,48 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
return arrow::Status::OK();
}

arrow::Status VeloxShuffleWriter::allocatePartitionBuffer(uint32_t partitionId, uint32_t newSize, bool reuseBuffers) {
arrow::Status VeloxShuffleWriter::allocatePartitionBuffer(uint32_t partitionId, uint32_t newSize) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingAllocateBuffer]);

// try to allocate new
auto numFields = schema_->num_fields();
assert(numFields == arrowColumnTypes_.size());
for (auto i = 0; i < simpleColumnIndices_.size(); ++i) {
auto columnType = schema_->field(simpleColumnIndices_[i])->type()->id();
auto& buffers = partitionBuffers_[i][partitionId];

auto fixedWidthIdx = 0;
auto binaryIdx = 0;
for (auto i = 0; i < numFields; ++i) {
switch (arrowColumnTypes_[i]->id()) {
std::shared_ptr<arrow::ResizableBuffer> validityBuffer{};
ARROW_ASSIGN_OR_RAISE(validityBuffer, allocateValidityBuffer(i, partitionId, newSize));
switch (columnType) {
// binary types
case arrow::BinaryType::type_id:
case arrow::StringType::type_id: {
std::shared_ptr<arrow::ResizableBuffer> validityBuffer{};
std::shared_ptr<arrow::ResizableBuffer> lengthBuffer{};
std::shared_ptr<arrow::ResizableBuffer> valueBuffer{};

auto columnIdx = fixedWidthColumnCount_ + binaryIdx;
ARROW_ASSIGN_OR_RAISE(validityBuffer, allocateValidityBuffer(columnIdx, partitionId, newSize));
auto binaryIdx = i - fixedWidthColumnCount_;

auto valueBufferSize = calculateValueBufferSizeForBinaryArray(binaryIdx, newSize);
std::shared_ptr<arrow::ResizableBuffer> lengthBuffer{};
auto lengthBufferSize = newSize * kSizeOfBinaryArrayLengthBuffer;
ARROW_ASSIGN_OR_RAISE(
lengthBuffer, arrow::AllocateResizableBuffer(lengthBufferSize, partitionBufferPool_.get()));

std::shared_ptr<arrow::ResizableBuffer> valueBuffer{};
auto valueBufferSize = valueBufferSizeForBinaryArray(binaryIdx, newSize);
ARROW_ASSIGN_OR_RAISE(
valueBuffer, arrow::AllocateResizableBuffer(valueBufferSize, partitionBufferPool_.get()));

auto& buffers = partitionBuffers_[columnIdx][partitionId];
if (reuseBuffers) {
valueBuffer = std::dynamic_pointer_cast<arrow::ResizableBuffer>(buffers[kValueBufferIndex]);
RETURN_NOT_OK(valueBuffer->Resize(valueBufferSize, /*shrink_to_fit=*/true));
lengthBuffer = std::dynamic_pointer_cast<arrow::ResizableBuffer>(buffers[kLengthBufferIndex]);
RETURN_NOT_OK(lengthBuffer->Resize(lengthBufferSize, /*shrink_to_fit=*/true));
} else {
ARROW_ASSIGN_OR_RAISE(
valueBuffer, arrow::AllocateResizableBuffer(valueBufferSize, partitionBufferPool_.get()));
ARROW_ASSIGN_OR_RAISE(
lengthBuffer, arrow::AllocateResizableBuffer(lengthBufferSize, partitionBufferPool_.get()));
}
partitionBinaryAddrs_[binaryIdx][partitionId] =
BinaryBuf(valueBuffer->mutable_data(), lengthBuffer->mutable_data(), valueBufferSize);
buffers = {std::move(validityBuffer), std::move(lengthBuffer), std::move(valueBuffer)};

binaryIdx++;
break;
}
case arrow::StructType::type_id:
case arrow::MapType::type_id:
case arrow::ListType::type_id:
break;
default: {
std::shared_ptr<arrow::ResizableBuffer> validityBuffer{};
default: { // fixed-width types
std::shared_ptr<arrow::ResizableBuffer> valueBuffer{};

ARROW_ASSIGN_OR_RAISE(validityBuffer, allocateValidityBuffer(fixedWidthIdx, partitionId, newSize));

int64_t valueBufferSize = 0;
if (arrowColumnTypes_[i]->id() == arrow::BooleanType::type_id) {
valueBufferSize = arrow::bit_util::BytesForBits(newSize);
} else if (veloxColumnTypes_[i]->isShortDecimal()) {
valueBufferSize = newSize * (arrow::bit_width(arrow::Int64Type::type_id) >> 3);
} else if (veloxColumnTypes_[i]->kind() == facebook::velox::TypeKind::TIMESTAMP) {
valueBufferSize = facebook::velox::BaseVector::byteSize<facebook::velox::Timestamp>(newSize);
} else {
valueBufferSize = newSize * (arrow::bit_width(arrowColumnTypes_[i]->id()) >> 3);
}

auto& buffers = partitionBuffers_[fixedWidthIdx][partitionId];
if (reuseBuffers) {
valueBuffer = std::dynamic_pointer_cast<arrow::ResizableBuffer>(buffers[1]);
RETURN_NOT_OK(valueBuffer->Resize(valueBufferSize, /*shrink_to_fit=*/true));
} else {
ARROW_ASSIGN_OR_RAISE(
valueBuffer, arrow::AllocateResizableBuffer(valueBufferSize, partitionBufferPool_.get()));
}
partitionFixedWidthValueAddrs_[fixedWidthIdx][partitionId] = valueBuffer->mutable_data();
ARROW_ASSIGN_OR_RAISE(
valueBuffer,
arrow::AllocateResizableBuffer(
valueBufferSizeForFixedWidthArray(i, newSize), partitionBufferPool_.get()));
partitionFixedWidthValueAddrs_[i][partitionId] = valueBuffer->mutable_data();
buffers = {std::move(validityBuffer), std::move(valueBuffer)};

fixedWidthIdx++;
break;
}
}
}

partition2BufferSize_[partitionId] = newSize;
return arrow::Status::OK();
}
Expand Down Expand Up @@ -1310,12 +1272,13 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
}
// offset buffer
ARROW_RETURN_IF(
!buffers[kLengthBufferIndex], arrow::Status::Invalid("Offset buffer of binary array is null."));
!buffers[kBinaryLengthBufferIndex], arrow::Status::Invalid("Offset buffer of binary array is null."));
allBuffers.push_back(
arrow::SliceBuffer(buffers[kLengthBufferIndex], 0, numRows * kSizeOfBinaryArrayLengthBuffer));
ARROW_RETURN_IF(!buffers[kValueBufferIndex], arrow::Status::Invalid("Value buffer of binary array is null."));
arrow::SliceBuffer(buffers[kBinaryLengthBufferIndex], 0, numRows * kSizeOfBinaryArrayLengthBuffer));
ARROW_RETURN_IF(
!buffers[kBinaryValueBufferIndex], arrow::Status::Invalid("Value buffer of binary array is null."));
// value buffer
allBuffers.push_back(arrow::SliceBuffer(buffers[kValueBufferIndex], 0, binaryBuf.valueOffset));
allBuffers.push_back(arrow::SliceBuffer(buffers[kBinaryValueBufferIndex], 0, binaryBuf.valueOffset));

if (reuseBuffers) {
// Set the first value offset to 0.
Expand Down Expand Up @@ -1344,21 +1307,22 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
allBuffers.push_back(nullptr);
}
// value buffer
ARROW_RETURN_IF(!buffers[1], arrow::Status::Invalid("Value buffer of fixed-width array is null."));
std::shared_ptr<arrow::Buffer> valueBuffer;
auto& valueBuffer = buffers[kFixedWidthValueBufferIndex];
ARROW_RETURN_IF(!valueBuffer, arrow::Status::Invalid("Value buffer of fixed-width array is null."));
std::shared_ptr<arrow::Buffer> slicedValueBuffer;
if (arrowColumnTypes_[i]->id() == arrow::BooleanType::type_id) {
valueBuffer = arrow::SliceBuffer(buffers[1], 0, arrow::bit_util::BytesForBits(numRows));
slicedValueBuffer = arrow::SliceBuffer(valueBuffer, 0, arrow::bit_util::BytesForBits(numRows));
} else if (veloxColumnTypes_[i]->isShortDecimal()) {
valueBuffer =
arrow::SliceBuffer(buffers[1], 0, numRows * (arrow::bit_width(arrow::Int64Type::type_id) >> 3));
slicedValueBuffer =
arrow::SliceBuffer(valueBuffer, 0, numRows * (arrow::bit_width(arrow::Int64Type::type_id) >> 3));
} else if (veloxColumnTypes_[i]->kind() == facebook::velox::TypeKind::TIMESTAMP) {
valueBuffer = arrow::SliceBuffer(
buffers[1], 0, facebook::velox::BaseVector::byteSize<facebook::velox::Timestamp>(numRows));
slicedValueBuffer = arrow::SliceBuffer(
valueBuffer, 0, facebook::velox::BaseVector::byteSize<facebook::velox::Timestamp>(numRows));
} else {
valueBuffer =
arrow::SliceBuffer(buffers[1], 0, numRows * (arrow::bit_width(arrowColumnTypes_[i]->id()) >> 3));
slicedValueBuffer =
arrow::SliceBuffer(valueBuffer, 0, numRows * (arrow::bit_width(arrowColumnTypes_[i]->id()) >> 3));
}
allBuffers.push_back(std::move(valueBuffer));
allBuffers.push_back(std::move(slicedValueBuffer));
if (!reuseBuffers) {
partitionValidityAddrs_[fixedWidthIdx][partitionId] = nullptr;
partitionFixedWidthValueAddrs_[fixedWidthIdx][partitionId] = nullptr;
Expand Down Expand Up @@ -1475,14 +1439,17 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
return arrow::Status::OK();
}

arrow::Status VeloxShuffleWriter::resizePartitionBuffer(uint32_t partitionId, int64_t newSize) {
arrow::Status VeloxShuffleWriter::resizePartitionBuffer(uint32_t partitionId, int64_t newSize, bool preserveData) {
for (auto i = 0; i < simpleColumnIndices_.size(); ++i) {
auto columnType = schema_->field(simpleColumnIndices_[i])->type()->id();
auto& buffers = partitionBuffers_[i][partitionId];

// resize validity
if (buffers[kValidityBufferIndex]) {
auto& validityBuffer = buffers[kValidityBufferIndex];
// Handle validity buffer first.
auto& validityBuffer = buffers[kValidityBufferIndex];
if (!preserveData) {
ARROW_ASSIGN_OR_RAISE(validityBuffer, allocateValidityBuffer(i, partitionId, newSize));
} else if (buffers[kValidityBufferIndex]) {
// Resize validity.
auto filled = validityBuffer->capacity();
RETURN_NOT_OK(validityBuffer->Resize(arrow::bit_util::BytesForBits(newSize)));
partitionValidityAddrs_[i][partitionId] = validityBuffer->mutable_data();
Expand All @@ -1493,42 +1460,39 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
}
}

// shrink value buffer if fixed-width, offset & value buffers if binary
// Resize value buffer if fixed-width, offset & value buffers if binary.
switch (columnType) {
// binary types
case arrow::BinaryType::type_id:
case arrow::StringType::type_id: {
auto& lengthBuffer = buffers[kLengthBufferIndex];
// Resize length buffer.
auto& lengthBuffer = buffers[kBinaryLengthBufferIndex];
ARROW_RETURN_IF(!lengthBuffer, arrow::Status::Invalid("Offset buffer of binary array is null."));
RETURN_NOT_OK(lengthBuffer->Resize(newSize * kSizeOfBinaryArrayLengthBuffer));

// Resize value buffer.
auto binaryIdx = i - fixedWidthColumnCount_;
auto& binaryBuf = partitionBinaryAddrs_[binaryIdx][partitionId];
auto& valueBuffer = buffers[kValueBufferIndex];
auto& valueBuffer = buffers[kBinaryValueBufferIndex];
ARROW_RETURN_IF(!valueBuffer, arrow::Status::Invalid("Value buffer of binary array is null."));
auto binaryNewSize = calculateValueBufferSizeForBinaryArray(binaryIdx, newSize);
auto valueBufferSize = std::max(binaryBuf.valueOffset, binaryNewSize);
// Determine the new Size for value buffer.
auto valueBufferSize = valueBufferSizeForBinaryArray(binaryIdx, newSize);
auto valueOffset = 0;
// If preserve data, the new valueBufferSize should not be smaller than the current offset.
if (preserveData) {
valueBufferSize = std::max(binaryBuf.valueOffset, valueBufferSize);
valueOffset = binaryBuf.valueOffset;
}
RETURN_NOT_OK(valueBuffer->Resize(valueBufferSize));

binaryBuf = BinaryBuf(
valueBuffer->mutable_data(), lengthBuffer->mutable_data(), valueBufferSize, binaryBuf.valueOffset);
binaryBuf =
BinaryBuf(valueBuffer->mutable_data(), lengthBuffer->mutable_data(), valueBufferSize, valueOffset);
break;
}
default: { // fixed-width types
uint64_t valueBufferSize = 0;
auto columnIndex = simpleColumnIndices_[i];
if (arrowColumnTypes_[columnIndex]->id() == arrow::BooleanType::type_id) {
valueBufferSize = arrow::bit_util::BytesForBits(newSize);
} else if (veloxColumnTypes_[columnIndex]->isShortDecimal()) {
valueBufferSize = newSize * (arrow::bit_width(arrow::Int64Type::type_id) >> 3);
} else if (veloxColumnTypes_[columnIndex]->kind() == facebook::velox::TypeKind::TIMESTAMP) {
valueBufferSize = facebook::velox::BaseVector::byteSize<facebook::velox::Timestamp>(newSize);
} else {
valueBufferSize = newSize * (arrow::bit_width(arrowColumnTypes_[columnIndex]->id()) >> 3);
}
auto& valueBuffer = buffers[1];
auto& valueBuffer = buffers[kFixedWidthValueBufferIndex];
ARROW_RETURN_IF(!valueBuffer, arrow::Status::Invalid("Value buffer of fixed-width array is null."));
RETURN_NOT_OK(valueBuffer->Resize(valueBufferSize));
RETURN_NOT_OK(valueBuffer->Resize(valueBufferSizeForFixedWidthArray(i, newSize)));
partitionFixedWidthValueAddrs_[i][partitionId] = valueBuffer->mutable_data();
break;
}
Expand Down Expand Up @@ -1558,7 +1522,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
if (newSize == 0) {
return resetPartitionBuffer(partitionId);
}
return resizePartitionBuffer(partitionId, newSize);
return resizePartitionBuffer(partitionId, newSize, /*preserveData=*/true);
}

arrow::Result<int64_t> VeloxShuffleWriter::shrinkPartitionBuffers() {
Expand All @@ -1574,10 +1538,25 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
return shrunken;
}

uint64_t VeloxShuffleWriter::calculateValueBufferSizeForBinaryArray(uint32_t binaryIdx, int64_t newSize) {
// Estimates the value-buffer size (in bytes) to allocate for a binary column's
// next `newSize` rows: ceil(totalBytesSeen / totalRowsSeen) — the observed
// average bytes per row for this column — multiplied by newSize, plus a fixed
// 1024-byte slack to reduce reallocation on slightly larger-than-average rows.
// NOTE(review): assumes totalInputNumRows_ > 0 when called — TODO confirm callers
// never invoke this before any input rows have been seen (division by zero).
uint64_t VeloxShuffleWriter::valueBufferSizeForBinaryArray(uint32_t binaryIdx, int64_t newSize) {
return (binaryArrayTotalSizeBytes_[binaryIdx] + totalInputNumRows_ - 1) / totalInputNumRows_ * newSize + 1024;
}

// Computes the value-buffer size (in bytes) needed to hold `newSize` rows of the
// fixed-width column at `fixedWidthIdx` (an index into simpleColumnIndices_).
// Size depends on the column's physical layout:
//   - boolean: bit-packed, one bit per row;
//   - short decimal: stored as 64-bit integers;
//   - timestamp: Velox Timestamp representation;
//   - otherwise: the Arrow type's bit width, in bytes, per row.
uint64_t VeloxShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t fixedWidthIdx, int64_t newSize) {
  const auto columnIdx = simpleColumnIndices_[fixedWidthIdx];
  const auto arrowTypeId = arrowColumnTypes_[columnIdx]->id();
  if (arrowTypeId == arrow::BooleanType::type_id) {
    // Booleans are bit-packed: round row count up to whole bytes.
    return arrow::bit_util::BytesForBits(newSize);
  }
  if (veloxColumnTypes_[columnIdx]->isShortDecimal()) {
    // Short decimals occupy one int64 per row.
    return newSize * (arrow::bit_width(arrow::Int64Type::type_id) >> 3);
  }
  if (veloxColumnTypes_[columnIdx]->kind() == facebook::velox::TypeKind::TIMESTAMP) {
    return facebook::velox::BaseVector::byteSize<facebook::velox::Timestamp>(newSize);
  }
  // Generic fixed-width type: bit width converted to bytes, per row.
  return newSize * (arrow::bit_width(arrowTypeId) >> 3);
}

void VeloxShuffleWriter::stat() const {
#if VELOX_SHUFFLE_WRITER_LOG_FLAG
for (int i = CpuWallTimingBegin; i != CpuWallTimingEnd; ++i) {
Expand Down Expand Up @@ -1742,17 +1721,17 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
// Make sure the size to be allocated is larger than the size to be filled.
if (partition2BufferSize_[pid] == 0) {
// Allocate buffer if it's not yet allocated.
RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize, false));
RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize));
} else if (beyondThreshold(pid, newSize)) {
if (newSize <= partitionBufferIdxBase_[pid]) {
// If the newSize is smaller, cache the buffered data and reuse and shrink the buffer.
RETURN_NOT_OK(evictPartitionBuffer(pid, newSize, true));
RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize, true));
RETURN_NOT_OK(resizePartitionBuffer(pid, newSize, /*preserveData=*/false));
} else {
// If the newSize is larger, check if alreadyFilled + toBeFilled <= newSize
if (partitionBufferIdxBase_[pid] + partition2RowCount_[pid] <= newSize) {
// If so, keep the data in buffers and resize buffers.
RETURN_NOT_OK(resizePartitionBuffer(pid, newSize)); // resize
RETURN_NOT_OK(resizePartitionBuffer(pid, newSize, /*preserveData=*/true));
// Because inputHasNull_ is updated every time split is called, and resizePartitionBuffer won't allocate
// validity buffer.
RETURN_NOT_OK(updateValidityBuffers(pid, newSize));
Expand All @@ -1762,7 +1741,11 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
// Else free and allocate new buffers.
bool reuseBuffers = newSize <= partition2BufferSize_[pid];
RETURN_NOT_OK(evictPartitionBuffer(pid, newSize, reuseBuffers));
RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize, reuseBuffers));
if (reuseBuffers) {
RETURN_NOT_OK(resizePartitionBuffer(pid, newSize, /*preserveData=*/false));
} else {
RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize));
}
}
}
} else if (partitionBufferIdxBase_[pid] + partition2RowCount_[pid] > partition2BufferSize_[pid]) {
Expand All @@ -1771,11 +1754,11 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
if (newSize > partition2BufferSize_[pid]) {
// If the partition size after split is already larger than allocated buffer size, need reallocate.
RETURN_NOT_OK(evictPartitionBuffer(pid, newSize, false));
RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize, false));
RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize));
} else {
// Partition size after split is smaller than buffer size. Reuse the buffers.
RETURN_NOT_OK(evictPartitionBuffer(pid, newSize, true));
// Reset validity buffer for reuse.
// Reset validity buffer for reallocate.
RETURN_NOT_OK(resetValidityBuffer(pid));
}
}
Expand Down
Loading

0 comments on commit 5ee4e34

Please sign in to comment.