Skip to content

Commit

Permalink
radix sort
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Jul 24, 2024
1 parent ece1d4f commit bf5ad1e
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 27 deletions.
157 changes: 157 additions & 0 deletions cpp/velox/shuffle/RadixSort.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

namespace gluten {

/**
 * Least-significant-digit (LSD) radix sort over an indexable array of 64-bit keys.
 *
 * The sort works on one byte of the key at a time and ping-pongs the records between the first
 * half of the array ([0, numRecords)) and the scratch half ([numRecords, 2 * numRecords)).
 * Each per-byte pass is stable, so records that compare equal on the sorted bytes keep their
 * original relative order.
 *
 * @tparam SortArray random-access container supporting operator[] and size(), whose elements
 * are convertible to uint64_t (e.g. std::vector<uint64_t>).
 */
template <typename SortArray>
class RadixSort {
 public:
  /**
   * Sorts the first numRecords elements of the array by the key bytes in
   * [startByteIndex, endByteIndex], using least-significant-digit radix sort. This routine
   * assumes the array has extra space after the records at least equal to the number of
   * records. The sort is destructive and may relocate the data positioned within the array.
   *
   * @param array array of 64-bit elements followed by at least that many scratch slots.
   * @param numRecords number of data records in the array. Must be non-negative and fit in
   * int32_t, because the returned start index is either 0 or numRecords.
   * @param startByteIndex the first byte (in range [0, 7]) to sort each element by, counting
   * from the least significant byte.
   * @param endByteIndex the last byte (in range [0, 7]) to sort each element by, counting from
   * the least significant byte. Must be greater than startByteIndex.
   *
   * @return The starting index of the sorted data within the given array: 0 if an even number
   * of byte passes ran (including none), numRecords otherwise. This is returned instead of
   * always copying the data back to position zero, for efficiency.
   */
  static int32_t sort(SortArray& array, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) {
    assert(startByteIndex >= 0 && "startByteIndex should >= 0");
    assert(endByteIndex <= 7 && "endByteIndex should <= 7");
    assert(endByteIndex > startByteIndex);
    // The result is reported as int32_t, so a larger record count could not be represented.
    assert(numRecords >= 0 && numRecords <= INT32_MAX && "numRecords should fit in int32_t");
    assert(static_cast<uint64_t>(numRecords) * 2 <= static_cast<uint64_t>(array.size()));

    int64_t inIndex = 0;
    int64_t outIndex = numRecords;

    if (numRecords > 0) {
      auto counts = getCounts(array, numRecords, startByteIndex, endByteIndex);

      for (int32_t byteIdx = startByteIndex; byteIdx <= endByteIndex; ++byteIdx) {
        // An empty histogram means every record has the same value at this byte: skip the pass.
        if (!counts[byteIdx].empty()) {
          sortAtByte(array, numRecords, counts[byteIdx], byteIdx, inIndex, outIndex);
          // The output half of this pass is the input half of the next one.
          std::swap(inIndex, outIndex);
        }
      }
    }

    return static_cast<int32_t>(inIndex);
  }

 private:
  /**
   * Performs a partial sort by copying data into destination offsets for each byte value at the
   * specified byte offset.
   *
   * @param array array to partially sort.
   * @param numRecords number of data records in the array.
   * @param counts counts for each byte value. This routine destructively modifies this vector.
   * @param byteIdx the byte of the element to sort at, counting from the least significant byte.
   * @param inIndex the starting index in the array where input data is located.
   * @param outIndex the starting index where sorted output data should be written.
   */
  static void sortAtByte(
      SortArray& array,
      int64_t numRecords,
      std::vector<int64_t>& counts,
      int32_t byteIdx,
      int64_t inIndex,
      int64_t outIndex) {
    assert(counts.size() == 256);

    auto& offsets = transformCountsToOffsets(counts, outIndex);

    const int32_t shift = byteIdx * 8;
    const int64_t inEnd = inIndex + numRecords;
    for (int64_t offset = inIndex; offset < inEnd; ++offset) {
      const auto bucket = (static_cast<uint64_t>(array[offset]) >> shift) & 0xff;
      // Each bucket's cursor advances as records are placed, preserving input order (stable).
      array[offsets[bucket]++] = array[offset];
    }
  }

  /**
   * Computes a value histogram for each byte in the given array.
   *
   * @param array array to count records in.
   * @param numRecords number of data records in the array.
   * @param startByteIndex the first byte to compute counts for (the prior are skipped).
   * @param endByteIndex the last byte to compute counts for.
   *
   * @return a vector of eight count vectors, one for each byte starting from the least
   * significant byte. An entry has 256 elements if that byte needs sorting and is empty
   * otherwise.
   */
  static std::vector<std::vector<int64_t>>
  getCounts(SortArray& array, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) {
    std::vector<std::vector<int64_t>> counts(8);

    // Optimization: do a fast pre-pass to determine which byte indices we can skip for sorting.
    // If all the byte values at a particular index are the same we don't need to count it.
    uint64_t bitwiseMax = 0;
    uint64_t bitwiseMin = ~uint64_t{0};
    // The index is int64_t (not a deduced int) so it cannot overflow before reaching numRecords.
    for (int64_t offset = 0; offset < numRecords; ++offset) {
      const auto value = static_cast<uint64_t>(array[offset]);
      bitwiseMax |= value;
      bitwiseMin &= value;
    }
    // A bit is set here iff at least two records disagree on it.
    const uint64_t bitsChanged = bitwiseMin ^ bitwiseMax;

    // Compute counts for each byte index.
    for (int32_t byteIdx = startByteIndex; byteIdx <= endByteIndex; ++byteIdx) {
      const int32_t shift = byteIdx * 8;
      if (((bitsChanged >> shift) & 0xff) != 0) {
        auto& histogram = counts[byteIdx];
        histogram.resize(256);
        for (int64_t offset = 0; offset < numRecords; ++offset) {
          ++histogram[(static_cast<uint64_t>(array[offset]) >> shift) & 0xff];
        }
      }
    }

    return counts;
  }

  /**
   * Transforms counts into the output positions for each byte value, i.e. replaces each count
   * with the exclusive prefix sum of the counts before it, shifted by outputOffset.
   *
   * @param counts counts for each byte value. This routine destructively modifies this vector.
   * @param outputOffset index in the array (in elements) where the output region starts.
   *
   * @return the input counts vector, now holding the first output index of every bucket.
   */
  static std::vector<int64_t>& transformCountsToOffsets(std::vector<int64_t>& counts, int64_t outputOffset) {
    assert(counts.size() == 256);

    int64_t pos = outputOffset;
    for (auto& entry : counts) {
      const int64_t count = entry;
      entry = pos;
      pos += count;
    }

    return counts;
  }
};

} // namespace gluten
5 changes: 2 additions & 3 deletions cpp/velox/shuffle/VeloxShuffleReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,8 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::deserializeTo
auto buffer = cur->second;
const auto* rawBuffer = buffer->as<char>();
while (rowOffset_ < cur->first && readRows < batchSize_) {
auto rowSize = *(uint32_t*)(rawBuffer + byteOffset_);
byteOffset_ += sizeof(uint32_t);
data.push_back(std::string_view(rawBuffer + byteOffset_, rowSize));
auto rowSize = *(RowSizeType*)(rawBuffer + byteOffset_);
data.push_back(std::string_view(rawBuffer + byteOffset_ + sizeof(RowSizeType), rowSize - sizeof(RowSizeType)));
byteOffset_ += rowSize;
++rowOffset_;
++readRows;
Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/shuffle/VeloxShuffleReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "operators/serializer/VeloxColumnarBatchSerializer.h"
#include "shuffle/Payload.h"
#include "shuffle/ShuffleReader.h"
#include "shuffle/VeloxSortShuffleWriter.h"
#include "velox/type/Type.h"
#include "velox/vector/ComplexVector.h"

Expand Down Expand Up @@ -64,6 +65,8 @@ class VeloxHashShuffleReaderDeserializer final : public ColumnarBatchIterator {

class VeloxSortShuffleReaderDeserializer final : public ColumnarBatchIterator {
public:
using RowSizeType = VeloxSortShuffleWriter::RowSizeType;

VeloxSortShuffleReaderDeserializer(
std::shared_ptr<arrow::io::InputStream> in,
const std::shared_ptr<arrow::Schema>& schema,
Expand Down
53 changes: 34 additions & 19 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ namespace gluten {
namespace {
constexpr uint32_t kMaskLower27Bits = (1 << 27) - 1;
constexpr uint64_t kMaskLower40Bits = (1UL << 40) - 1;
constexpr uint32_t kPartitionIdStartByteIndex = 5;
constexpr uint32_t kPartitionIdEndByteIndex = 7;

uint64_t toCompactRowId(uint32_t partitionId, uint32_t pageNumber, uint32_t offsetInPage) {
// |63 partitionId(24) |39 inputIndex(13) |26 rowIndex(27) |
Expand Down Expand Up @@ -101,13 +103,17 @@ arrow::Status VeloxSortShuffleWriter::init() {
options_.partitioning == Partitioning::kSingle,
arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition."));
array_.resize(initialSize_);
partitionRawSize_.resize(numPartitions_, 0);
return arrow::Status::OK();
}

// Captures the row type of the given vector the first time it is called (while rowType_ is
// still unset) and derives the fixed per-row size from it. Later calls do nothing.
void VeloxSortShuffleWriter::initRowType(const facebook::velox::RowVectorPtr& rv) {
  if (UNLIKELY(rowType_ == nullptr)) {
    rowType_ = facebook::velox::asRowType(rv->type());
    fixedRowSize_ = facebook::velox::row::CompactRow::fixedRowSize(rowType_);
    // When a fixed row size is reported, enlarge it by sizeof(RowSizeType).
    // NOTE(review): presumably this accounts for a RowSizeType length prefix stored in front
    // of every serialized row - confirm against insertRows.
    if (fixedRowSize_.has_value()) {
      fixedRowSize_.value() += sizeof(RowSizeType);
    }
  }
}

Expand Down Expand Up @@ -151,7 +157,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr
rowSizes_.resize(inputRows + 1);
rowSizes_[0] = 0;
for (auto i = 0; i < inputRows; ++i) {
rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i);
rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i) + sizeof(RowSizeType);
}
}

Expand All @@ -177,8 +183,15 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u
// Allocate newArray can trigger spill.
growArrayIfNecessary(rows);
for (auto i = offset; i < offset + rows; ++i) {
auto size = row.serialize(i, currentPage_ + pageCursor_);
array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size};
auto pid = row2Partition_[i];
array_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_);

// (RowSizeType)size | serialized row
auto dest = currentPage_ + pageCursor_;
RowSizeType size = sizeof(RowSizeType) + row.serialize(i, dest + sizeof(RowSizeType));
memcpy(dest, &size, sizeof(RowSizeType));

partitionRawSize_[pid] += size;
pageCursor_ += size;
}
}
Expand All @@ -192,17 +205,23 @@ arrow::Status VeloxSortShuffleWriter::maybeSpill(int32_t nextRows) {
}

arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
auto numRecords = offset_;
int32_t begin = 0;
{
ScopedTimer timer(&sortTime_);
// TODO: Add radix sort to align with Spark.
std::sort(array_.begin(), array_.begin() + offset_);
if (useRadixSort_) {
begin = RadixSort<SortArray>::sort(array_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex);
} else {
std::sort(array_.begin(), array_.begin() + numRecords);
}
}

size_t begin = 0;
size_t cur = 0;
auto pid = extractPartitionId(array_[begin].first);
while (++cur < offset_) {
auto curPid = extractPartitionId(array_[cur].first);
auto end = begin + numRecords;
auto cur = begin;
auto pid = extractPartitionId(array_[begin]);
while (++cur < end) {
auto curPid = extractPartitionId(array_[cur]);
if (curPid != pid) {
RETURN_NOT_OK(evictPartition(pid, begin, cur));
pid = curPid;
Expand Down Expand Up @@ -230,10 +249,8 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) {
// Serialize [begin, end)
uint32_t numRows = end - begin;
uint64_t rawSize = numRows * sizeof(RowSizeType);
for (auto i = begin; i < end; ++i) {
rawSize += array_[i].second;
}
uint64_t rawSize = partitionRawSize_[partitionId];
partitionRawSize_[partitionId] = 0;

if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) {
sortedBuffer_ = nullptr;
Expand All @@ -243,12 +260,10 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_

uint64_t offset = 0;
for (auto i = begin; i < end; ++i) {
// size(size_t) | bytes
auto size = array_[i].second;
memcpy(rawBuffer + offset, &size, sizeof(RowSizeType));
offset += sizeof(RowSizeType);
auto index = extractPageNumberAndOffset(array_[i].first);
memcpy(rawBuffer + offset, pageAddresses_[index.first] + index.second, size);
auto index = extractPageNumberAndOffset(array_[i]);
const auto* src = pageAddresses_[index.first] + index.second;
auto size = *(RowSizeType*)(src);
memcpy(rawBuffer + offset, src, size);
offset += size;
}
VELOX_CHECK_EQ(offset, rawSize);
Expand Down
13 changes: 8 additions & 5 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include "shuffle/RadixSort.h"
#include "shuffle/VeloxShuffleWriter.h"

#include <arrow/status.h>
Expand All @@ -31,6 +32,8 @@ namespace gluten {

class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
public:
using RowSizeType = uint32_t;

static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create(
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
Expand Down Expand Up @@ -80,10 +83,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

void growArrayIfNecessary(uint32_t rows);

using RowSizeType = uint32_t;
using ElementType = std::pair<uint64_t, RowSizeType>;
using Allocator = facebook::velox::StlAllocator<ElementType>;
using SortArray = std::vector<ElementType, Allocator>;
using Allocator = facebook::velox::StlAllocator<uint64_t>;
using SortArray = std::vector<uint64_t, Allocator>;

std::unique_ptr<facebook::velox::HashStringAllocator> allocator_;
// Stores compact row id -> row
Expand All @@ -98,7 +99,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

// FIXME: Use configuration to replace hardcode.
uint32_t initialSize_ = 4096;
bool useRadixSort_ = false;
bool useRadixSort_ = true;

facebook::velox::BufferPtr sortedBuffer_;

Expand All @@ -108,6 +109,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
// Updated for each input RowVector.
std::vector<uint32_t> row2Partition_;

std::vector<uint64_t> partitionRawSize_;

std::shared_ptr<const facebook::velox::RowType> rowType_;
std::optional<int32_t> fixedRowSize_;
std::vector<uint64_t> rowSizes_;
Expand Down

0 comments on commit bf5ad1e

Please sign in to comment.