Skip to content

Commit

Permalink
prealloc sortedBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Jul 26, 2024
1 parent 784cb36 commit 962fc80
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 69 deletions.
6 changes: 3 additions & 3 deletions cpp/velox/shuffle/RadixSort.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class RadixSort {
auto offsets = transformCountsToOffsets(counts, outIndex);

for (auto offset = inIndex; offset < inIndex + numRecords; ++offset) {
auto bucket = (array[offset].value >> (byteIdx * 8)) & 0xff;
auto bucket = (array[offset] >> (byteIdx * 8)) & 0xff;
array[offsets[bucket]++] = array[offset];
}
}
Expand All @@ -112,7 +112,7 @@ class RadixSort {
int64_t bitwiseMax = 0;
int64_t bitwiseMin = -1L;
for (auto offset = 0; offset < numRecords; ++offset) {
auto value = array[offset].value;
auto value = array[offset];
bitwiseMax |= value;
bitwiseMin &= value;
}
Expand All @@ -123,7 +123,7 @@ class RadixSort {
if (((bitsChanged >> (i * 8)) & 0xff) != 0) {
counts[i].resize(256);
for (auto offset = 0; offset < numRecords; ++offset) {
counts[i][(array[offset].value >> (i * 8)) & 0xff]++;
counts[i][(array[offset] >> (i * 8)) & 0xff]++;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/velox/shuffle/VeloxShuffleReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,8 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::deserializeTo
auto buffer = cur->second;
const auto* rawBuffer = buffer->as<char>();
while (rowOffset_ < cur->first && readRows < batchSize_) {
auto rowSize = *(uint32_t*)(rawBuffer + byteOffset_);
byteOffset_ += sizeof(uint32_t);
auto rowSize = *(RowSizeType*)(rawBuffer + byteOffset_) - sizeof(RowSizeType);
byteOffset_ += sizeof(RowSizeType);
data.push_back(std::string_view(rawBuffer + byteOffset_, rowSize));
byteOffset_ += rowSize;
++rowOffset_;
Expand Down
121 changes: 66 additions & 55 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ constexpr uint64_t kMaskLower40Bits = (1UL << 40) - 1;
constexpr uint32_t kPartitionIdStartByteIndex = 5;
constexpr uint32_t kPartitionIdEndByteIndex = 7;

constexpr uint32_t kSortedBufferSize = 1 * 1024 * 1024;

uint64_t toCompactRowId(uint32_t partitionId, uint32_t pageNumber, uint32_t offsetInPage) {
// |63 partitionId(24) |39 pageNumber(13) |26 offsetInPage(27) |
return (uint64_t)partitionId << 40 | (uint64_t)pageNumber << 27 | offsetInPage;
Expand Down Expand Up @@ -80,7 +82,8 @@ arrow::Status VeloxSortShuffleWriter::stop() {
if (offset_ > 0) {
RETURN_NOT_OK(evictAllPartitions());
}
array_ = nullptr;
array_.reset();
sortedBuffer_.reset();
pages_.clear();
pageAddresses_.clear();
RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
Expand All @@ -103,13 +106,18 @@ arrow::Status VeloxSortShuffleWriter::init() {
options_.partitioning == Partitioning::kSingle,
arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition."));
initArray();
sortedBuffer_ = facebook::velox::AlignedBuffer::allocate<char>(kSortedBufferSize, veloxPool_.get());
rawBuffer_ = sortedBuffer_->asMutable<uint8_t>();
return arrow::Status::OK();
}

// Captures the row type of the incoming vectors the first time it is needed.
// While rowType_ is already set this is a no-op. Otherwise it records the row
// type of `rv` and queries CompactRow for the fixed serialized row size; when
// such a fixed size exists it is enlarged by sizeof(RowSizeType).
// NOTE(review): the extra sizeof(RowSizeType) presumably accounts for a size
// prefix stored in front of each serialized row - confirm against insertRows.
void VeloxSortShuffleWriter::initRowType(const facebook::velox::RowVectorPtr& rv) {
  const bool rowTypeUnset = UNLIKELY(!rowType_);
  if (!rowTypeUnset) {
    return;
  }

  rowType_ = facebook::velox::asRowType(rv->type());
  fixedRowSize_ = facebook::velox::row::CompactRow::fixedRowSize(rowType_);
  if (fixedRowSize_.has_value()) {
    fixedRowSize_.value() += sizeof(RowSizeType);
  }
}

Expand Down Expand Up @@ -150,19 +158,24 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr
facebook::velox::row::CompactRow row(vector);

if (!fixedRowSize_) {
rowSizes_.resize(inputRows + 1);
rowSizes_[0] = 0;
rowSize_.resize(inputRows);
rowSizePrefixSum_.resize(inputRows + 1);
rowSizePrefixSum_[0] = 0;
for (auto i = 0; i < inputRows; ++i) {
rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i);
auto rowSize = row.rowSize(i) + sizeof(RowSizeType);
rowSize_[i] = rowSize;
rowSizePrefixSum_[i + 1] = rowSizePrefixSum_[i] + rowSize;
}
} else {
rowSize_.resize(inputRows, *fixedRowSize_);
}

uint32_t rowOffset = 0;
while (rowOffset < inputRows) {
auto remainingRows = inputRows - rowOffset;
auto rows = maxRowsToInsert(rowOffset, remainingRows);
if (rows == 0) {
auto minSizeRequired = fixedRowSize_ ? fixedRowSize_.value() : rowSizes_[rowOffset + 1] - rowSizes_[rowOffset];
auto minSizeRequired = fixedRowSize_ ? fixedRowSize_.value() : rowSize_[rowOffset];
acquireNewBuffer(memLimit, minSizeRequired);
rows = maxRowsToInsert(rowOffset, remainingRows);
ARROW_RETURN_IF(
Expand All @@ -181,10 +194,12 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr
void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows) {
VELOX_CHECK(!pages_.empty());
for (auto i = offset; i < offset + rows; ++i) {
auto pid = row2Partition_[i];
arrayPtr_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_);
// size(RowSize) | bytes
memcpy(currentPage_ + pageCursor_, &rowSize_[i], sizeof(RowSizeType));
pageCursor_ += sizeof(RowSizeType);
auto size = row.serialize(i, currentPage_ + pageCursor_);
arrayPtr_[offset_].value = toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_);
arrayPtr_[offset_].rowSize = size;
++offset_;
pageCursor_ += size;
VELOX_DCHECK_LE(pageCursor_, currenPageSize_);
}
Expand All @@ -205,19 +220,20 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
{
ScopedTimer timer(&sortTime_);
if (options_.useRadixSort) {
begin = RadixSort<Element>::sort(
begin = RadixSort<uint64_t>::sort(
arrayPtr_, arraySize_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex);
} else {
auto ptr = arrayPtr_;
qsort(ptr, numRecords, sizeof(Element), compare);
qsort(ptr, numRecords, sizeof(uint64_t), compare);
(void)ptr;
}
}

auto end = begin + numRecords;
auto cur = begin;
auto pid = extractPartitionId(arrayPtr_[begin].value);
auto pid = extractPartitionId(arrayPtr_[begin]);
while (++cur < end) {
auto curPid = extractPartitionId(arrayPtr_[cur].value);
auto curPid = extractPartitionId(arrayPtr_[cur]);
if (curPid != pid) {
RETURN_NOT_OK(evictPartition(pid, begin, cur));
pid = curPid;
Expand All @@ -226,8 +242,6 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
}
RETURN_NOT_OK(evictPartition(pid, begin, cur));

sortedBuffer_ = nullptr;

if (!stopped_) {
// Preserve the last page for use.
auto numPages = pages_.size();
Expand All @@ -253,43 +267,39 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
}

arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) {
auto payload = prepareToEvict(begin, end);
RETURN_NOT_OK(
partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_));
return arrow::Status::OK();
}

std::unique_ptr<InMemoryPayload> VeloxSortShuffleWriter::prepareToEvict(size_t begin, size_t end) {
ScopedTimer timer(&sortTime_);
// Serialize [begin, end)
uint32_t numRows = end - begin;
uint64_t rawSize = numRows * sizeof(RowSizeType);
for (auto i = begin; i < end; ++i) {
rawSize += arrayPtr_[i].rowSize;
}

if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) {
sortedBuffer_ = facebook::velox::AlignedBuffer::allocate<char>(rawSize, veloxPool_.get());
}
auto* rawBuffer = sortedBuffer_->asMutable<char>();

uint64_t offset = 0;
for (auto i = begin; i < end; ++i) {
// size(RowSize) | bytes
auto size = arrayPtr_[i].rowSize;
memcpy(rawBuffer + offset, &size, sizeof(RowSizeType));
offset += sizeof(RowSizeType);
auto index = extractPageNumberAndOffset(arrayPtr_[i].value);
memcpy(rawBuffer + offset, pageAddresses_[index.first] + index.second, size);
char* addr;
uint32_t size;

auto index = begin;
while (index < end) {
auto pageIndex = extractPageNumberAndOffset(arrayPtr_[index]);
addr = pageAddresses_[pageIndex.first] + pageIndex.second;
size = *(RowSizeType*)addr;
if (offset + size > kSortedBufferSize) {
VELOX_CHECK(offset > 0);
auto payload = std::make_unique<InMemoryPayload>(
index - begin,
nullptr,
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_, offset)});
RETURN_NOT_OK(
partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_));
begin = index;
offset = 0;
}
gluten::fastCopy(rawBuffer_ + offset, addr, size);
offset += size;
index++;
}
VELOX_CHECK_EQ(offset, rawSize);

auto rawData = sortedBuffer_->as<uint8_t>();
std::vector<std::shared_ptr<arrow::Buffer>> buffers;
buffers.push_back(std::make_shared<arrow::Buffer>(rawData, rawSize));

return std::make_unique<InMemoryPayload>(numRows, nullptr, std::move(buffers));
auto payload = std::make_unique<InMemoryPayload>(
end - begin,
nullptr,
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_, offset)});
RETURN_NOT_OK(
partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_));
return arrow::Status::OK();
}

uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows) {
Expand All @@ -301,8 +311,8 @@ uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows)
if (fixedRowSize_) {
return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), rows);
}
auto beginIter = rowSizes_.begin() + 1 + offset;
auto iter = std::upper_bound(beginIter, rowSizes_.end(), remainingBytes);
auto beginIter = rowSizePrefixSum_.begin() + 1 + offset;
auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), remainingBytes);
return iter - beginIter;
}

Expand All @@ -328,17 +338,18 @@ void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) {
auto newSize = newArraySize(rows);
if (newSize > arraySize_) {
// May trigger spill.
auto newSizeBytes = newSize * sizeof(Element);
auto newSizeBytes = newSize * sizeof(uint64_t);
auto newArray = facebook::velox::AlignedBuffer::allocate<char>(newSizeBytes, veloxPool_.get());
// Re-check whether the current array already satisfies the requirement (the allocation above may have triggered a spill).
if (newArraySize(rows) > arraySize_) {
auto newPtr = newArray->asMutable<Element>();
auto newPtr = newArray->asMutable<uint64_t>();
if (offset_ > 0) {
gluten::fastCopy(newPtr, arrayPtr_, arraySize_ * sizeof(Element));
gluten::fastCopy(newPtr, arrayPtr_, offset_ * sizeof(uint64_t));
}
arraySize_ = newSize;
arrayPtr_ = newPtr;
array_ = std::move(newArray);
array_.reset();
array_.swap(newArray);
}
}
}
Expand All @@ -355,8 +366,8 @@ uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t rows) {

void VeloxSortShuffleWriter::initArray() {
arraySize_ = options_.sortBufferInitialSize;
array_ = facebook::velox::AlignedBuffer::allocate<char>(arraySize_ * sizeof(Element), veloxPool_.get());
arrayPtr_ = array_->asMutable<Element>();
array_ = facebook::velox::AlignedBuffer::allocate<char>(arraySize_ * sizeof(uint64_t), veloxPool_.get());
arrayPtr_ = array_->asMutable<uint64_t>();
}

int64_t VeloxSortShuffleWriter::peakBytesAllocated() const {
Expand All @@ -373,6 +384,6 @@ int64_t VeloxSortShuffleWriter::totalC2RTime() const {

int VeloxSortShuffleWriter::compare(const void* a, const void* b) {
// No two values are equal, so 0 is never returned.
return ((Element*)a)->value > ((Element*)b)->value ? 1 : -1;
return *(uint64_t*)a > *(uint64_t*)b ? 1 : -1;
}
} // namespace gluten
13 changes: 4 additions & 9 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end);

std::unique_ptr<InMemoryPayload> prepareToEvict(size_t begin, size_t end);

uint32_t maxRowsToInsert(uint32_t offset, uint32_t rows);

void acquireNewBuffer(int64_t memLimit, uint64_t minSizeRequired);
Expand All @@ -89,16 +87,11 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

void initArray();

struct Element {
uint64_t value;
uint32_t rowSize;
};

static int compare(const void* a, const void* b);

// Stores compact row id -> row
facebook::velox::BufferPtr array_;
Element* arrayPtr_;
uint64_t* arrayPtr_;
uint32_t arraySize_;
uint32_t offset_{0};

Expand All @@ -111,6 +104,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
uint32_t currenPageSize_;

facebook::velox::BufferPtr sortedBuffer_;
uint8_t* rawBuffer_;

// Row ID -> Partition ID
// subscript: The index of row in the current input RowVector
Expand All @@ -120,7 +114,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

std::shared_ptr<const facebook::velox::RowType> rowType_;
std::optional<int32_t> fixedRowSize_;
std::vector<uint64_t> rowSizes_;
std::vector<RowSizeType> rowSize_;
std::vector<uint64_t> rowSizePrefixSum_;

int64_t c2rTime_{0};
int64_t sortTime_{0};
Expand Down

0 comments on commit 962fc80

Please sign in to comment.