Skip to content

Commit

Permalink
refine resize partition buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Nov 6, 2023
1 parent 16e2874 commit ac0ba92
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 75 deletions.
149 changes: 79 additions & 70 deletions cpp/velox/shuffle/VeloxShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -880,11 +880,11 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
auto& dstAddrs = partitionValidityAddrs_[col];
for (auto& pid : partitionUsed_) {
if (dstAddrs[pid] == nullptr) {
// init bitmap if it's null, initialize the buffer as true
auto newSize = std::max(partition2RowCount_[pid], (uint32_t)options_.buffer_size);
// Init bitmap if it's null.
ARROW_ASSIGN_OR_RAISE(
auto validityBuffer,
arrow::AllocateResizableBuffer(arrow::bit_util::BytesForBits(newSize), partitionBufferPool_.get()));
arrow::AllocateResizableBuffer(
arrow::bit_util::BytesForBits(partition2BufferSize_[pid]), partitionBufferPool_.get()));
dstAddrs[pid] = const_cast<uint8_t*>(validityBuffer->data());
memset(validityBuffer->mutable_data(), 0xff, validityBuffer->capacity());
partitionBuffers_[col][pid][kValidityBufferIndex] = std::move(validityBuffer);
Expand Down Expand Up @@ -937,7 +937,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
multiply = std::min(3, multiply + 1);

auto valueBuffer = std::static_pointer_cast<arrow::ResizableBuffer>(
partitionBuffers_[fixedWidthColumnCount_ + binaryIdx][pid][kValueBufferIndex]);
partitionBuffers_[fixedWidthColumnCount_ + binaryIdx][pid][kBinaryValueBufferIndex]);

RETURN_NOT_OK(valueBuffer->Reserve(capacity));

Expand Down Expand Up @@ -1157,7 +1157,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
return arrow::Status::OK();
}

arrow::Status VeloxShuffleWriter::allocatePartitionBuffer(uint32_t partitionId, uint32_t newSize, bool reuseBuffers) {
arrow::Status VeloxShuffleWriter::allocatePartitionBuffer(uint32_t partitionId, uint32_t newSize) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingAllocateBuffer]);

// try to allocate new
Expand All @@ -1177,21 +1177,14 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
auto columnIdx = fixedWidthColumnCount_ + binaryIdx;
ARROW_ASSIGN_OR_RAISE(validityBuffer, allocateValidityBuffer(columnIdx, partitionId, newSize));

auto valueBufferSize = calculateValueBufferSizeForBinaryArray(binaryIdx, newSize);
auto valueBufferSize = valueBufferSizeForBinaryArray(binaryIdx, newSize);
auto lengthBufferSize = newSize * kSizeOfBinaryArrayLengthBuffer;

auto& buffers = partitionBuffers_[columnIdx][partitionId];
if (reuseBuffers) {
valueBuffer = std::dynamic_pointer_cast<arrow::ResizableBuffer>(buffers[kValueBufferIndex]);
RETURN_NOT_OK(valueBuffer->Resize(valueBufferSize, /*shrink_to_fit=*/true));
lengthBuffer = std::dynamic_pointer_cast<arrow::ResizableBuffer>(buffers[kLengthBufferIndex]);
RETURN_NOT_OK(lengthBuffer->Resize(lengthBufferSize, /*shrink_to_fit=*/true));
} else {
ARROW_ASSIGN_OR_RAISE(
valueBuffer, arrow::AllocateResizableBuffer(valueBufferSize, partitionBufferPool_.get()));
ARROW_ASSIGN_OR_RAISE(
lengthBuffer, arrow::AllocateResizableBuffer(lengthBufferSize, partitionBufferPool_.get()));
}
ARROW_ASSIGN_OR_RAISE(
valueBuffer, arrow::AllocateResizableBuffer(valueBufferSize, partitionBufferPool_.get()));
ARROW_ASSIGN_OR_RAISE(
lengthBuffer, arrow::AllocateResizableBuffer(lengthBufferSize, partitionBufferPool_.get()));
partitionBinaryAddrs_[binaryIdx][partitionId] =
BinaryBuf(valueBuffer->mutable_data(), lengthBuffer->mutable_data(), valueBufferSize);
buffers = {std::move(validityBuffer), std::move(lengthBuffer), std::move(valueBuffer)};
Expand Down Expand Up @@ -1221,13 +1214,8 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
}

auto& buffers = partitionBuffers_[fixedWidthIdx][partitionId];
if (reuseBuffers) {
valueBuffer = std::dynamic_pointer_cast<arrow::ResizableBuffer>(buffers[1]);
RETURN_NOT_OK(valueBuffer->Resize(valueBufferSize, /*shrink_to_fit=*/true));
} else {
ARROW_ASSIGN_OR_RAISE(
valueBuffer, arrow::AllocateResizableBuffer(valueBufferSize, partitionBufferPool_.get()));
}
ARROW_ASSIGN_OR_RAISE(
valueBuffer, arrow::AllocateResizableBuffer(valueBufferSize, partitionBufferPool_.get()));
partitionFixedWidthValueAddrs_[fixedWidthIdx][partitionId] = valueBuffer->mutable_data();
buffers = {std::move(validityBuffer), std::move(valueBuffer)};

Expand Down Expand Up @@ -1310,12 +1298,13 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
}
// offset buffer
ARROW_RETURN_IF(
!buffers[kLengthBufferIndex], arrow::Status::Invalid("Offset buffer of binary array is null."));
!buffers[kBinaryLengthBufferIndex], arrow::Status::Invalid("Offset buffer of binary array is null."));
allBuffers.push_back(
arrow::SliceBuffer(buffers[kLengthBufferIndex], 0, numRows * kSizeOfBinaryArrayLengthBuffer));
ARROW_RETURN_IF(!buffers[kValueBufferIndex], arrow::Status::Invalid("Value buffer of binary array is null."));
arrow::SliceBuffer(buffers[kBinaryLengthBufferIndex], 0, numRows * kSizeOfBinaryArrayLengthBuffer));
ARROW_RETURN_IF(
!buffers[kBinaryValueBufferIndex], arrow::Status::Invalid("Value buffer of binary array is null."));
// value buffer
allBuffers.push_back(arrow::SliceBuffer(buffers[kValueBufferIndex], 0, binaryBuf.valueOffset));
allBuffers.push_back(arrow::SliceBuffer(buffers[kBinaryValueBufferIndex], 0, binaryBuf.valueOffset));

if (reuseBuffers) {
// Set the first value offset to 0.
Expand Down Expand Up @@ -1344,21 +1333,22 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
allBuffers.push_back(nullptr);
}
// value buffer
ARROW_RETURN_IF(!buffers[1], arrow::Status::Invalid("Value buffer of fixed-width array is null."));
std::shared_ptr<arrow::Buffer> valueBuffer;
auto& valueBuffer = buffers[kFixedWidthValueBufferIndex];
ARROW_RETURN_IF(!valueBuffer, arrow::Status::Invalid("Value buffer of fixed-width array is null."));
std::shared_ptr<arrow::Buffer> slicedValueBuffer;
if (arrowColumnTypes_[i]->id() == arrow::BooleanType::type_id) {
valueBuffer = arrow::SliceBuffer(buffers[1], 0, arrow::bit_util::BytesForBits(numRows));
slicedValueBuffer = arrow::SliceBuffer(valueBuffer, 0, arrow::bit_util::BytesForBits(numRows));
} else if (veloxColumnTypes_[i]->isShortDecimal()) {
valueBuffer =
arrow::SliceBuffer(buffers[1], 0, numRows * (arrow::bit_width(arrow::Int64Type::type_id) >> 3));
slicedValueBuffer =
arrow::SliceBuffer(valueBuffer, 0, numRows * (arrow::bit_width(arrow::Int64Type::type_id) >> 3));
} else if (veloxColumnTypes_[i]->kind() == facebook::velox::TypeKind::TIMESTAMP) {
valueBuffer = arrow::SliceBuffer(
buffers[1], 0, facebook::velox::BaseVector::byteSize<facebook::velox::Timestamp>(numRows));
slicedValueBuffer = arrow::SliceBuffer(
valueBuffer, 0, facebook::velox::BaseVector::byteSize<facebook::velox::Timestamp>(numRows));
} else {
valueBuffer =
arrow::SliceBuffer(buffers[1], 0, numRows * (arrow::bit_width(arrowColumnTypes_[i]->id()) >> 3));
slicedValueBuffer =
arrow::SliceBuffer(valueBuffer, 0, numRows * (arrow::bit_width(arrowColumnTypes_[i]->id()) >> 3));
}
allBuffers.push_back(std::move(valueBuffer));
allBuffers.push_back(std::move(slicedValueBuffer));
if (!reuseBuffers) {
partitionValidityAddrs_[fixedWidthIdx][partitionId] = nullptr;
partitionFixedWidthValueAddrs_[fixedWidthIdx][partitionId] = nullptr;
Expand Down Expand Up @@ -1475,14 +1465,17 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
return arrow::Status::OK();
}

arrow::Status VeloxShuffleWriter::resizePartitionBuffer(uint32_t partitionId, int64_t newSize) {
arrow::Status VeloxShuffleWriter::resizePartitionBuffer(uint32_t partitionId, int64_t newSize, bool preserveData) {
for (auto i = 0; i < simpleColumnIndices_.size(); ++i) {
auto columnType = schema_->field(simpleColumnIndices_[i])->type()->id();
auto& buffers = partitionBuffers_[i][partitionId];

// resize validity
if (buffers[kValidityBufferIndex]) {
auto& validityBuffer = buffers[kValidityBufferIndex];
// Handle validity buffer first.
auto& validityBuffer = buffers[kValidityBufferIndex];
if (!preserveData) {
ARROW_ASSIGN_OR_RAISE(validityBuffer, allocateValidityBuffer(i, partitionId, newSize));
} else if (buffers[kValidityBufferIndex]) {
// Resize validity.
auto filled = validityBuffer->capacity();
RETURN_NOT_OK(validityBuffer->Resize(arrow::bit_util::BytesForBits(newSize)));
partitionValidityAddrs_[i][partitionId] = validityBuffer->mutable_data();
Expand All @@ -1493,42 +1486,39 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
}
}

// shrink value buffer if fixed-width, offset & value buffers if binary
// Resize value buffer if fixed-width, offset & value buffers if binary.
switch (columnType) {
// binary types
case arrow::BinaryType::type_id:
case arrow::StringType::type_id: {
auto& lengthBuffer = buffers[kLengthBufferIndex];
// Resize length buffer.
auto& lengthBuffer = buffers[kBinaryLengthBufferIndex];
ARROW_RETURN_IF(!lengthBuffer, arrow::Status::Invalid("Offset buffer of binary array is null."));
RETURN_NOT_OK(lengthBuffer->Resize(newSize * kSizeOfBinaryArrayLengthBuffer));

// Resize value buffer.
auto binaryIdx = i - fixedWidthColumnCount_;
auto& binaryBuf = partitionBinaryAddrs_[binaryIdx][partitionId];
auto& valueBuffer = buffers[kValueBufferIndex];
auto& valueBuffer = buffers[kBinaryValueBufferIndex];
ARROW_RETURN_IF(!valueBuffer, arrow::Status::Invalid("Value buffer of binary array is null."));
auto binaryNewSize = calculateValueBufferSizeForBinaryArray(binaryIdx, newSize);
auto valueBufferSize = std::max(binaryBuf.valueOffset, binaryNewSize);
// Determine the new Size for value buffer.
auto valueBufferSize = valueBufferSizeForBinaryArray(binaryIdx, newSize);
auto valueOffset = 0;
// If preserve data, the new valueBufferSize should not be smaller than the current offset.
if (preserveData) {
valueBufferSize = std::max(binaryBuf.valueOffset, valueBufferSize);
valueOffset = binaryBuf.valueOffset;
}
RETURN_NOT_OK(valueBuffer->Resize(valueBufferSize));

binaryBuf = BinaryBuf(
valueBuffer->mutable_data(), lengthBuffer->mutable_data(), valueBufferSize, binaryBuf.valueOffset);
binaryBuf =
BinaryBuf(valueBuffer->mutable_data(), lengthBuffer->mutable_data(), valueBufferSize, valueOffset);
break;
}
default: { // fixed-width types
uint64_t valueBufferSize = 0;
auto columnIndex = simpleColumnIndices_[i];
if (arrowColumnTypes_[columnIndex]->id() == arrow::BooleanType::type_id) {
valueBufferSize = arrow::bit_util::BytesForBits(newSize);
} else if (veloxColumnTypes_[columnIndex]->isShortDecimal()) {
valueBufferSize = newSize * (arrow::bit_width(arrow::Int64Type::type_id) >> 3);
} else if (veloxColumnTypes_[columnIndex]->kind() == facebook::velox::TypeKind::TIMESTAMP) {
valueBufferSize = facebook::velox::BaseVector::byteSize<facebook::velox::Timestamp>(newSize);
} else {
valueBufferSize = newSize * (arrow::bit_width(arrowColumnTypes_[columnIndex]->id()) >> 3);
}
auto& valueBuffer = buffers[1];
auto& valueBuffer = buffers[kFixedWidthValueBufferIndex];
ARROW_RETURN_IF(!valueBuffer, arrow::Status::Invalid("Value buffer of fixed-width array is null."));
RETURN_NOT_OK(valueBuffer->Resize(valueBufferSize));
RETURN_NOT_OK(valueBuffer->Resize(valueBufferSizeForFixedWidthArray(i, newSize)));
partitionFixedWidthValueAddrs_[i][partitionId] = valueBuffer->mutable_data();
break;
}
Expand Down Expand Up @@ -1558,7 +1548,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
if (newSize == 0) {
return resetPartitionBuffer(partitionId);
}
return resizePartitionBuffer(partitionId, newSize);
return resizePartitionBuffer(partitionId, newSize, /*preserveData=*/true);
}

arrow::Result<int64_t> VeloxShuffleWriter::shrinkPartitionBuffers() {
Expand All @@ -1574,10 +1564,25 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
return shrunken;
}

uint64_t VeloxShuffleWriter::calculateValueBufferSizeForBinaryArray(uint32_t binaryIdx, int64_t newSize) {
// Computes the value-buffer size, in bytes, to use for binary column `binaryIdx` when the partition
// buffer should hold `newSize` rows: binaryArrayTotalSizeBytes_[binaryIdx] divided by
// totalInputNumRows_ (integer division, rounded up), multiplied by newSize, plus 1024 extra bytes.
uint64_t VeloxShuffleWriter::valueBufferSizeForBinaryArray(uint32_t binaryIdx, int64_t newSize) {
  // Rounded-up bytes-per-row ratio.
  const auto bytesPerRow = (binaryArrayTotalSizeBytes_[binaryIdx] + totalInputNumRows_ - 1) / totalInputNumRows_;
  return bytesPerRow * newSize + 1024;
}

// Returns the number of bytes the value buffer of fixed-width column `fixedWidthIdx` needs in order
// to hold `newSize` rows. The width is chosen per type:
//   - boolean:       bit-packed, BytesForBits(newSize)
//   - short decimal: newSize times the byte width of an Arrow Int64
//   - timestamp:     BaseVector::byteSize<Timestamp>(newSize)
//   - anything else: newSize times the byte width of the column's Arrow type
uint64_t VeloxShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t fixedWidthIdx, int64_t newSize) {
  // Map the fixed-width slot to its index in the per-column type tables.
  const auto columnIdx = simpleColumnIndices_[fixedWidthIdx];

  if (arrowColumnTypes_[columnIdx]->id() == arrow::BooleanType::type_id) {
    return arrow::bit_util::BytesForBits(newSize);
  }
  if (veloxColumnTypes_[columnIdx]->isShortDecimal()) {
    // bit_width >> 3 converts bits to bytes.
    return newSize * (arrow::bit_width(arrow::Int64Type::type_id) >> 3);
  }
  if (veloxColumnTypes_[columnIdx]->kind() == facebook::velox::TypeKind::TIMESTAMP) {
    return facebook::velox::BaseVector::byteSize<facebook::velox::Timestamp>(newSize);
  }
  return newSize * (arrow::bit_width(arrowColumnTypes_[columnIdx]->id()) >> 3);
}

void VeloxShuffleWriter::stat() const {
#if VELOX_SHUFFLE_WRITER_LOG_FLAG
for (int i = CpuWallTimingBegin; i != CpuWallTimingEnd; ++i) {
Expand Down Expand Up @@ -1742,17 +1747,17 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
// Make sure the size to be allocated is larger than the size to be filled.
if (partition2BufferSize_[pid] == 0) {
// Allocate buffer if it's not yet allocated.
RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize, false));
RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize));
} else if (beyondThreshold(pid, newSize)) {
if (newSize <= partitionBufferIdxBase_[pid]) {
// If the newSize is smaller, cache the buffered data and reuse and shrink the buffer.
RETURN_NOT_OK(evictPartitionBuffer(pid, newSize, true));
RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize, true));
RETURN_NOT_OK(resizePartitionBuffer(pid, newSize, /*preserveData=*/false));
} else {
// If the newSize is larger, check if alreadyFilled + toBeFilled <= newSize
if (partitionBufferIdxBase_[pid] + partition2RowCount_[pid] <= newSize) {
// If so, keep the data in buffers and resize buffers.
RETURN_NOT_OK(resizePartitionBuffer(pid, newSize)); // resize
RETURN_NOT_OK(resizePartitionBuffer(pid, newSize, /*preserveData=*/true));
// Because inputHasNull_ is updated every time split is called, and resizePartitionBuffer won't allocate
// validity buffer.
RETURN_NOT_OK(updateValidityBuffers(pid, newSize));
Expand All @@ -1762,7 +1767,11 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
// Else free and allocate new buffers.
bool reuseBuffers = newSize <= partition2BufferSize_[pid];
RETURN_NOT_OK(evictPartitionBuffer(pid, newSize, reuseBuffers));
RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize, reuseBuffers));
if (reuseBuffers) {
RETURN_NOT_OK(resizePartitionBuffer(pid, newSize, /*preserveData=*/false));
} else {
RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize));
}
}
}
} else if (partitionBufferIdxBase_[pid] + partition2RowCount_[pid] > partition2BufferSize_[pid]) {
Expand All @@ -1771,11 +1780,11 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
if (newSize > partition2BufferSize_[pid]) {
// If the partition size after split is already larger than allocated buffer size, need reallocate.
RETURN_NOT_OK(evictPartitionBuffer(pid, newSize, false));
RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize, false));
RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize));
} else {
// Partition size after split is smaller than buffer size. Reuse the buffers.
RETURN_NOT_OK(evictPartitionBuffer(pid, newSize, true));
// Reset validity buffer for reuse.
// Reset validity buffer for reallocate.
RETURN_NOT_OK(resetValidityBuffer(pid));
}
}
Expand Down
20 changes: 15 additions & 5 deletions cpp/velox/shuffle/VeloxShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ enum SplitState { kInit, kPreAlloc, kSplit, kStop };
enum EvictState { kEvictable, kUnevictable };

class VeloxShuffleWriter final : public ShuffleWriter {
enum { kValidityBufferIndex = 0, kLengthBufferIndex = 1, kValueBufferIndex = 2 };
// Slot positions inside a partition's per-column buffer list.
// Fixed-width columns store {validity, value}; binary columns store {validity, length, value},
// so slot 1 is the value buffer for fixed-width columns and the length buffer for binary columns.
enum {
  kValidityBufferIndex = 0,
  kFixedWidthValueBufferIndex = 1,
  kBinaryLengthBufferIndex = 1,
  kBinaryValueBufferIndex = 2
};

public:
struct BinaryBuf {
Expand Down Expand Up @@ -227,7 +232,7 @@ class VeloxShuffleWriter final : public ShuffleWriter {
arrow::Result<std::shared_ptr<arrow::ResizableBuffer>>
allocateValidityBuffer(uint32_t col, uint32_t partitionId, uint32_t newSize);

arrow::Status allocatePartitionBuffer(uint32_t partitionId, uint32_t newSize, bool reuseBuffers);
arrow::Status allocatePartitionBuffer(uint32_t partitionId, uint32_t newSize);

arrow::Status splitFixedWidthValueBuffer(const facebook::velox::RowVector& rv);

Expand Down Expand Up @@ -274,7 +279,7 @@ class VeloxShuffleWriter final : public ShuffleWriter {

arrow::Result<std::shared_ptr<arrow::Buffer>> generateComplexTypeBuffers(facebook::velox::RowVectorPtr vector);

arrow::Status resetValidityBuffer(uint32_t partitionId);
// Resets the validity buffer(s) belonging to partition `partitionId`.
// The argument is a partition id (the pre-allocation path calls resetValidityBuffer(pid)),
// not a buffer collection, so it is named like the other per-partition helpers.
arrow::Status resetValidityBuffer(uint32_t partitionId);

arrow::Result<int64_t> shrinkPartitionBuffersMinSize(int64_t size);

Expand All @@ -286,9 +291,14 @@ class VeloxShuffleWriter final : public ShuffleWriter {

arrow::Status resetPartitionBuffer(uint32_t partitionId);

arrow::Status resizePartitionBuffer(uint32_t partitionId, int64_t newSize);
// Resizes the validity, value and (for binary columns) length buffers of each simple column of
// partition `partitionId` so that they can hold `newSize` rows.
//   - preserveData == true: data already written to the buffers is kept. For binary columns the value
//     buffer is never made smaller than the current value offset.
//   - preserveData == false: existing contents are treated as disposable. Validity buffers are
//     re-allocated and the binary value offset restarts at 0.
// Note: when preserveData is false and newSize is larger than the current size, resizing can introduce
// an unnecessary memory copy. In that case use allocatePartitionBuffer instead, which frees the current
// buffers and allocates new ones.
arrow::Status resizePartitionBuffer(uint32_t partitionId, int64_t newSize, bool preserveData);

uint64_t valueBufferSizeForBinaryArray(uint32_t binaryIdx, int64_t newSize);

uint64_t calculateValueBufferSizeForBinaryArray(uint32_t binaryIdx, int64_t newSize);
uint64_t valueBufferSizeForFixedWidthArray(uint32_t fixedWidthIndex, int64_t newSize);

void calculateSimpleColumnBytes();

Expand Down

0 comments on commit ac0ba92

Please sign in to comment.